I have multiple zip files called f1.zip, f2.zip, …, f7.zip, each containing around 200k XML files. I am using the code below to unzip them in parallel with multiprocessing, but even though each file is very small, the read/write overhead makes the unzipping really slow, and I wanted to know if there is a way to use AWS EMR to speed up the unzipping process.
import os, boto3, zipfile
import sys
import multiprocessing
from io import BytesIO
from boto3.s3.transfer import TransferConfig

sys.path.insert(0, './src')

def list_s3_obj(bucket, prefix, extension):
    # List every object under the prefix that ends with the given extension.
    s3 = boto3.resource('s3')
    raw_bucket = s3.Bucket(bucket)
    file_list = []
    for file in raw_bucket.objects.filter(Prefix=prefix):
        if file.key.endswith(extension):
            file_list.append((bucket, file.key))
    return file_list

def unzip_s3_obj(bucket, zip_key, unzip_key):
    print('Unzip | ' + zip_key)
    MB = 1024 ** 2
    config = TransferConfig(
        use_threads=True,
        multipart_threshold=1024 * MB,
        max_concurrency=4,
        io_chunksize=300 * MB,
    )
    s3 = boto3.resource('s3')

    # Download the whole archive into memory.
    buffer = BytesIO()
    zip_obj = s3.Object(bucket_name=bucket, key=zip_key)
    zip_obj.download_fileobj(Fileobj=buffer, Config=config)

    # Re-upload every member of the archive as an individual S3 object.
    z = zipfile.ZipFile(buffer)
    for filename in z.namelist():
        print('Extracting | ' + filename)
        s3.meta.client.upload_fileobj(
            z.open(filename),
            Bucket=bucket,
            Key=unzip_key + filename,
            Config=config
        )
current_key = 'Folder_spot'
bucket = 's3-bucket'
raw_key = f'{current_key}/zip/'
unzip_f1_key = f'{current_key}/unzip/f1/'
unzip_f2_key = f'{current_key}/unzip/f2/'
unzip_f3_key = f'{current_key}/unzip/f3/'
unzip_f4_key = f'{current_key}/unzip/f4/'
unzip_f5_key = f'{current_key}/unzip/f5/'
unzip_f6_key = f'{current_key}/unzip/f6/'
unzip_f7_key = f'{current_key}/unzip/f7/'
def unzip_s3():
    file_list = list_s3_obj(bucket, raw_key, '.zip')

    subprocess_map = [(*e, unzip_f1_key) for e in file_list if 'f1' in e[1].lower()]
    subprocess_map.extend([(*e, unzip_f2_key) for e in file_list if 'f2' in e[1].lower()])
    subprocess_map.extend([(*e, unzip_f3_key) for e in file_list if 'f3' in e[1].lower()])
    subprocess_map.extend([(*e, unzip_f4_key) for e in file_list if 'f4' in e[1].lower()])
    subprocess_map.extend([(*e, unzip_f5_key) for e in file_list if 'f5' in e[1].lower()])
    subprocess_map.extend([(*e, unzip_f6_key) for e in file_list if 'f6' in e[1].lower()])
    subprocess_map.extend([(*e, unzip_f7_key) for e in file_list if 'f7' in e[1].lower()])

    p = multiprocessing.Pool(7)
    p.starmap(unzip_s3_obj, subprocess_map)

unzip_s3()
Also, later on I use the Python library xmltodict to convert the XML files into JSON and join them into batches of 20k records, reducing the quantity of files from 200k to about 10, which speeds up transforming and manipulating them later. But the process of reading 200k XML files again is very slow, and I can't find a code example or solution using EMR.
2 Answers
The following is a solution in Scala. I had to do this before in my job, so I am extracting the relevant bits here.
A few important points to keep in mind.
If possible in your workflow, try to create a tar.gz of your files instead of a zip, because I have only tried it with that format.
Secondly, repartition the RDD (numPartitionsProvided) to a suitably large number so that all your executors are utilized. ZipFileReader.scala:
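Roughly, such a ZipFileReader.scala can look like the minimal sketch below, assuming Spark and the Apache commons-compress library are on the classpath; the input/output paths, the app name and the value of numPartitionsProvided are placeholders you would adapt to your cluster:

// Sketch only: reads one or more tar.gz archives, explodes them into
// (entryName, entryContent) records and spreads the records across executors.
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.commons.compress.utils.IOUtils
import org.apache.spark.sql.SparkSession

import java.nio.charset.StandardCharsets
import scala.collection.mutable.ArrayBuffer

object ZipFileReader {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("TarGzReader").getOrCreate()
    val sc = spark.sparkContext

    val inputPath = "hdfs:///user/hadoop/myfirst.tar.gz"   // placeholder, could also be an s3:// path
    val outputPath = "hdfs:///user/hadoop/extracted"       // placeholder
    val numPartitionsProvided = 200                        // placeholder, tune to your executor count

    // One (archivePath, PortableDataStream) pair per archive.
    val archives = sc.binaryFiles(inputPath)

    val files = archives.flatMap { case (_, stream) =>
      val tarIn = new TarArchiveInputStream(new GzipCompressorInputStream(stream.open()))
      val out = ArrayBuffer.empty[(String, String)]
      var entry = tarIn.getNextTarEntry
      while (entry != null) {
        if (!entry.isDirectory) {
          // IOUtils.toByteArray stops at the end of the current tar entry.
          val bytes = IOUtils.toByteArray(tarIn)
          out += ((entry.getName, new String(bytes, StandardCharsets.UTF_8)))
        }
        entry = tarIn.getNextTarEntry
      }
      tarIn.close()
      out
    }

    // Repartition so all executors are utilized by downstream stages.
    files.repartition(numPartitionsProvided)
      .map { case (name, xml) => xml }
      .saveAsTextFile(outputPath)

    spark.stop()
  }
}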
I have created a tar.gz file using the following command:
tar -czvf myfirst.tar.gz username_files
The username_files folder contains the example files. Upload the myfirst.tar.gz to HDFS locally to try it out and check that everything is working OK.
Actually, porting my previous answer from a tarred, gzipped archive to a zipped archive wasn't that difficult.
Important point to keep in mind:
Repartition the RDD (numPartitionsProvided) to a suitably large number so that all your executors are utilized. ZipFileReader.scala:
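Again, only a minimal sketch of the zipped-archive variant, assuming Spark on EMR can read the bucket directly; the s3:// paths and the numPartitionsProvided value are placeholders:

// Sketch only: reads every .zip archive under the prefix, explodes the entries
// into (entryName, xmlString) records and repartitions them across executors.
import org.apache.spark.sql.SparkSession

import java.util.zip.ZipInputStream
import scala.collection.mutable.ArrayBuffer
import scala.io.Source

object ZipFileReader {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ZipReader").getOrCreate()
    val sc = spark.sparkContext

    val inputPath = "s3://s3-bucket/Folder_spot/zip/*.zip"   // placeholder prefix
    val outputPath = "s3://s3-bucket/Folder_spot/unzipped"   // placeholder
    val numPartitionsProvided = 200                          // placeholder, tune to your cluster

    // One (archivePath, PortableDataStream) pair per zip file.
    val archives = sc.binaryFiles(inputPath)

    val files = archives.flatMap { case (_, stream) =>
      val zipIn = new ZipInputStream(stream.open())
      val out = ArrayBuffer.empty[(String, String)]
      var entry = zipIn.getNextEntry
      while (entry != null) {
        if (!entry.isDirectory) {
          // Source reads only up to the end of the current zip entry.
          val content = Source.fromInputStream(zipIn, "UTF-8").getLines().mkString("\n")
          out += ((entry.getName, content))
        }
        entry = zipIn.getNextEntry
      }
      zipIn.close()
      out
    }

    // Spread the ~200k small records over the cluster before further processing.
    files.repartition(numPartitionsProvided)
      .map { case (name, xml) => xml }
      .saveAsTextFile(outputPath)

    spark.stop()
  }
}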
The username_files folder contains the example files.