Parallel upload to Amazon S3 with Python, boto and multiprocessing

By Brad Chapman

One challenge with moving analysis pipelines to cloud resources like Amazon EC2 is figuring out the logistics of transferring files. Biological data is big; with the rapid adoption of new machines like the HiSeq and decreasing sequencing costs, the data transfer question isn't going away soon. The use of Amazon in bioinformatics came up during a recent discussion on the BioStar question-and-answer site, where Deepak's answer highlighted the role of parallelizing uploads and downloads to ease this transfer burden. Here I describe a method to improve upload speed by splitting a file into pieces and uploading them over multiple processing cores.

Amazon Simple Storage Service (S3) provides relatively inexpensive cloud storage with its reduced redundancy storage option. S3, like all of Amazon's cloud services, is accessible directly from Python using boto. By combining boto's multipart upload support with Python's built-in multiprocessing module, I'll demonstrate how to maximize transfer speeds and make uploading data less painful. The script is available from GitHub and requires the latest boto from GitHub (2.0b5 or better).

Parallel upload with multiprocessing

The overall process uses boto to connect to an S3 upload bucket, initialize a multipart transfer, split the file into multiple pieces, and then upload these pieces in parallel over multiple cores. Each processing core is passed a set of identifiers for the transfer: the multipart upload identifier (mp.id), the S3 file key name (mp.key_name) and the S3 bucket name (mp.bucket_name).

[sourcecode language="python"]
import boto

conn = boto.connect_s3()
bucket = conn.lookup(bucket_name)
mp = bucket.initiate_multipart_upload(s3_key_name, reduced_redundancy=use_rr)
with multimap(cores) as pmap:
    for _ in pmap(transfer_part, ((mp.id, mp.key_name, mp.bucket_name, i, part)
                                  for (i, part) in
                                  enumerate(split_file(tarball, mb_size, cores)))):
        pass
mp.complete_upload()
[/sourcecode]
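The generator expression in that loop is dense, so here is a minimal sketch of the work items it produces. `build_part_args` is a hypothetical helper, not part of the script, and the identifier values are made up. Each item holds only plain strings and integers, which pickle cleanly when multiprocessing ships them to a worker; presumably that is why the identifiers are passed around rather than the `mp` object itself.

```python
def build_part_args(upload_id, key_name, bucket_name, parts):
    """Yield one work item per file part, numbered from zero."""
    for i, part in enumerate(parts):
        yield (upload_id, key_name, bucket_name, i, part)


# Made-up values, for illustration only.
items = list(build_part_args("example-upload-id", "data.tar.gz", "my-bucket",
                             ["data.tar.gzS3PARTaa", "data.tar.gzS3PARTab"]))
for item in items:
    print(item)
```

Every tuple is independent of the others, so the parts can be handed to workers in any order.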

The split_file function uses the unix split command to divide the file into sections, each of which will be uploaded separately.

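[sourcecode language="python"]
import glob
import os
import subprocess

def split_file(in_file, mb_size, split_num=5):
    # s3_key_name is the same name used in the upload code above.
    prefix = os.path.join(os.path.dirname(in_file),
                          "%sS3PART" % (os.path.basename(s3_key_name)))
    # Aim for two parts per core, between 5MB (the S3 minimum) and 250MB.
    split_size = int(max(min(mb_size / (split_num * 2.0), 250), 5))
    # Skip the split if pieces from an earlier run are already present.
    if not os.path.exists("%saa" % prefix):
        cl = ["split", "-b%sm" % split_size, in_file, prefix]
        subprocess.check_call(cl)
    return sorted(glob.glob("%s*" % prefix))
[/sourcecode]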
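To make the part-size rule concrete, here is a small worked sketch of the arithmetic: two parts per core, capped at 250MB. The helper names `part_size_mb` and `part_count` are hypothetical. The sketch also applies a 5MB floor, which is S3's minimum size for every part except the last.

```python
import math


def part_size_mb(file_mb, cores, max_mb=250, min_mb=5):
    """Part size in MB: two parts per core, clamped to [min_mb, max_mb]."""
    return int(max(min(file_mb / (cores * 2.0), max_mb), min_mb))


def part_count(file_mb, cores):
    """Number of pieces a split at that size would produce."""
    return int(math.ceil(file_mb / float(part_size_mb(file_mb, cores))))


print(part_size_mb(1000, 4), part_count(1000, 4))    # 125 8
print(part_size_mb(10000, 4), part_count(10000, 4))  # 250 40
print(part_size_mb(8, 4), part_count(8, 4))          # 5 2
```

A 1000MB file on 4 cores yields eight 125MB parts, so each core handles two. Once the cap applies, larger files produce more parts rather than bigger ones.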

The multiprocessing aspect is managed using a context manager. The initial multiprocessing pool is set up with a specified number of cores and configured to allow keyboard interrupts. We then return a lazy map function (imap) which can be used just like Python's standard map. This transparently divides the function calls for each file part over all available cores. Finally, the pool is cleaned up when the map is finished running.

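[sourcecode language="python"]
import contextlib
import multiprocessing
from multiprocessing.pool import IMapIterator

@contextlib.contextmanager
def multimap(cores=None):
    if cores is None:
        cores = max(multiprocessing.cpu_count() - 1, 1)
    # Always pass a timeout to next() so Ctrl-C can interrupt the wait.
    def wrapper(func):
        def wrap(self, timeout=None):
            return func(self, timeout=timeout if timeout is not None else 1e100)
        return wrap
    IMapIterator.next = wrapper(IMapIterator.next)
    pool = multiprocessing.Pool(cores)
    try:
        yield pool.imap
    finally:
        # Clean up the workers even if the with-block raised.
        pool.terminate()
[/sourcecode]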
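For readers who want to try the pattern in isolation, here is a self-contained sketch written for Python 3. `pooled_imap` and its `pool_factory` parameter are hypothetical names. It defaults to the thread-backed `multiprocessing.pool.ThreadPool`, which exposes the same `imap` interface as `multiprocessing.Pool`, so the example runs as a plain script; it also leaves out the keyboard-interrupt workaround.

```python
import contextlib
from multiprocessing.pool import ThreadPool


@contextlib.contextmanager
def pooled_imap(workers, pool_factory=ThreadPool):
    """Yield a lazy, order-preserving map backed by a worker pool."""
    pool = pool_factory(workers)
    try:
        yield pool.imap
    finally:
        # Runs whether the with-block finishes normally or raises.
        pool.terminate()


with pooled_imap(2) as pmap:
    lengths = list(pmap(len, ["a", "bb", "ccc"]))
print(lengths)
```

Passing `pool_factory=multiprocessing.Pool` spreads the calls over separate processes, as the script does. In that case the mapped function must be picklable, and the calling code should sit under an `if __name__ == "__main__":` guard on platforms that spawn rather than fork.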

The actual work of transferring each portion of the file is done using two functions. The helper function, mp_from_ids, uses the id information about the bucket, file key and multipart upload id to reconstitute a multipart upload object:

[sourcecode language="python"]
import boto
import boto.s3.multipart

def mp_from_ids(mp_id, mp_keyname, mp_bucketname):
    # Each worker opens its own connection and rebuilds the upload object.
    conn = boto.connect_s3()
    bucket = conn.lookup(mp_bucketname)
    mp = boto.s3.multipart.MultiPartUpload(bucket)
    mp.key_name = mp_keyname
    mp.id = mp_id
    return mp
[/sourcecode]

This object, together with the number of the file part and the file itself, is used to transfer that section of the file. The file part is removed after a successful upload.

[sourcecode language="python"]
import os

@map_wrap
def transfer_part(mp_id, mp_keyname, mp_bucketname, i, part):
    mp = mp_from_ids(mp_id, mp_keyname, mp_bucketname)
    print(" Transferring %s %s" % (i, part))
    # Open in binary mode; S3 part numbers start at 1, so offset the index.
    with open(part, "rb") as t_handle:
        mp.upload_part_from_file(t_handle, i+1)
    os.remove(part)
[/sourcecode]
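The `map_wrap` decorator is not shown in this post. Since imap hands each work item to the function as a single argument, a decorator like it is presumably there to unpack the tuple into positional arguments. Here is a minimal sketch under that assumption; the script's own version may differ, and `describe_part` is a hypothetical stand-in for `transfer_part` that does no uploading.

```python
import functools


def map_wrap(f):
    """Let a multi-argument function accept a single tuple of arguments."""
    @functools.wraps(f)
    def wrapper(args):
        return f(*args)
    return wrapper


@map_wrap
def describe_part(mp_id, key_name, bucket_name, i, part):
    # Stand-in for transfer_part: report what would be uploaded.
    return "part %d of %s/%s: %s" % (i + 1, bucket_name, key_name, part)


# Made-up values, for illustration only.
print(describe_part(("example-upload-id", "data.tar.gz", "my-bucket", 0, "xaa")))
```

With the decorator in place, the function can be passed straight to a map over tuples without changing its signature.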

When all sections, distributed over all processors, are finished, the multipart upload is signaled complete and Amazon finishes the process. Your file is now available on S3.

Parallel download

Download speeds can be maximized by utilizing several existing parallelized accelerators:

Combine these with the uploader to build up a cloud analysis workflow: move your data to S3, run a complex analysis pipeline on EC2, push the results back to S3, and then download them to local machines. Please share other tips and tricks you use to deal with Amazon file transfer in the comments.
