
I have multiple zip files called f1.zip, f2.zip, … f7.zip, each containing around 200k XML files, and I am using the code below to unzip them in parallel with multiprocessing. But even though each file is very small, the read/write overhead makes the unzipping really slow, and I wanted to know if there is a way of using AWS EMR to speed up the unzipping process.

import os, boto3, zipfile
import sys
import multiprocessing
from io import BytesIO
from boto3.s3.transfer import TransferConfig

sys.path.insert(0, './src')

def list_s3_obj(bucket, prefix, extension):
    s3 = boto3.resource('s3')
    raw_bucket = s3.Bucket(bucket)
    file_list = []

    for file in raw_bucket.objects.filter(Prefix=prefix):
        if os.path.join(file.key).endswith(extension):
            file_list.append((bucket, file.key))

    return file_list

def unzip_s3_obj(bucket, zip_key, unzip_key):
    print('Unzip | ' + zip_key)

    MB = 1024 ** 2
    config = TransferConfig(
        use_threads = True,
        multipart_threshold = 1024*MB,
        max_concurrency = 4,
        io_chunksize = 300*MB,
    )
    s3 = boto3.resource('s3')

    buffer = BytesIO()
    zip_obj = s3.Object(bucket_name=bucket, key=zip_key)
    zip_obj.download_fileobj(Fileobj=buffer, Config=config)
    z = zipfile.ZipFile(buffer)

    for filename in z.namelist():
        print('Extracting | ' + filename)
        file_info = z.getinfo(filename)
        s3.meta.client.upload_fileobj(
            z.open(filename),
            Bucket=bucket,
            Key=unzip_key + filename,
            Config=config
        )

current_key = 'Folder_spot'


bucket = 's3-bucket'
raw_key = f'{current_key}/zip/'
unzip_f1_key = f'{current_key}/unzip/f1/'
unzip_f2_key = f'{current_key}/unzip/f2/'
unzip_f3_key = f'{current_key}/unzip/f3/'
unzip_f4_key = f'{current_key}/unzip/f4/'
unzip_f5_key = f'{current_key}/unzip/f5/'
unzip_f6_key = f'{current_key}/unzip/f6/'
unzip_f7_key = f'{current_key}/unzip/f7/'


def unzip_s3():
    file_list = list_s3_obj(bucket, raw_key, '.zip')

    subprocess_map = [ (*e, unzip_f1_key) for e in file_list if 'f1' in e[1].lower()]
    subprocess_map.extend([ (*e, unzip_f2_key) for e in file_list if 'f2' in e[1].lower()])
    subprocess_map.extend([ (*e, unzip_f3_key) for e in file_list if 'f3' in e[1].lower()])
    subprocess_map.extend([ (*e, unzip_f4_key) for e in file_list if 'f4' in e[1].lower()])
    subprocess_map.extend([ (*e, unzip_f5_key) for e in file_list if 'f5' in e[1].lower()])
    subprocess_map.extend([ (*e, unzip_f6_key) for e in file_list if 'f6' in e[1].lower()])
    subprocess_map.extend([ (*e, unzip_f7_key) for e in file_list if 'f7' in e[1].lower()])

    p = multiprocessing.Pool(7)
    p.starmap(unzip_s3_obj, subprocess_map)

if __name__ == '__main__':  # guard needed when multiprocessing uses the spawn start method
    unzip_s3()

Also, I later use the Python library xmltodict to convert the XML files into JSON and join them into files of 20k records each, reducing the number of files from 200k to 10, which speeds up transforming and manipulating them later. But the process of reading the 200k XML files again is very slow, and I can't find a code example or solution using EMR.
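For reference, the XML-to-JSON batching step looks roughly like this (a simplified sketch; the function name, output key layout, and flushing logic here are illustrative, not my exact code):

import json
import boto3
import xmltodict

def xml_to_json_batches(bucket, prefix, out_prefix, batch_size=20000):
    # Read every .xml object under `prefix`, parse it with xmltodict,
    # and write batches of `batch_size` records as single JSON files.
    s3 = boto3.resource('s3')

    def flush(records, n):
        body = json.dumps(records).encode('utf-8')
        s3.Object(bucket, f'{out_prefix}batch_{n:05d}.json').put(Body=body)

    batch, batch_no = [], 0
    for obj in s3.Bucket(bucket).objects.filter(Prefix=prefix):
        if not obj.key.endswith('.xml'):
            continue
        batch.append(xmltodict.parse(obj.get()['Body'].read()))
        if len(batch) >= batch_size:
            flush(batch, batch_no)
            batch, batch_no = [], batch_no + 1
    if batch:  # flush the final partial batch
        flush(batch, batch_no)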

2 Answers


  1. The following is a solution in Scala. I had to do this before in my job, so I am extracting the relevant bits here.

    A few important points to keep in mind:

    If possible in your workflow, try to produce a tar.gz of your files instead of a zip, because I have only tried it with that format.

    Secondly, repartition the RDD (numPartitionsProvided) to a suitably large number so that all of your executors are utilized.

    ZipFileReader.scala

    import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
    import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
    
    import org.apache.spark.input.PortableDataStream
    import org.apache.spark.sql.SparkSession
    
    import java.nio.charset.StandardCharsets
    import scala.util.Try
    
    object ZipFileReader {
    
      def decode(bytes: Array[Byte]) =
        new String(bytes, StandardCharsets.UTF_8)
    
    
      def extractFilesCSV(ps: PortableDataStream, n: Int = 1024) = Try {
        val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open))
        Stream.continually(Option(tar.getNextTarEntry))
          // Read until the next entry is null
          .takeWhile(_.isDefined)
          // flatten
          .flatMap(x => x)
          // Drop directories
          .filter(!_.isDirectory)
          .filter(ele => ele.getName.split("/")(1).endsWith(".csv"))
          .map(e => {
            println("This could be name", e.getName)
            (e.getName.split("/")(1),
              Stream.continually {
                  // Read n bytes
                  val buffer = Array.fill[Byte](n)(-1)
                  val i = tar.read(buffer, 0, n)
                  (i, buffer.take(i))}
                // Take as long as we have read something
                .takeWhile(_._1 > 0)
                .map(_._2)
                .flatten
                .toArray)})
          .toArray
      }
    
    
      def extractFilesJSON(ps: PortableDataStream, n: Int = 1024) = Try {
        val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open))
        Stream.continually(Option(tar.getNextTarEntry))
          // Read until the next entry is null
          .takeWhile(_.isDefined)
          // flatten
          .flatMap(x => x)
          // Drop directories
          .filter(!_.isDirectory)
          .filter(ele => ele.getName.split("/")(1).endsWith(".json"))
          .map(e => {
            println("This could be name", e.getName)
            (e.getName.split("/")(1),
              Stream.continually {
                  // Read n bytes
                  val buffer = Array.fill[Byte](n)(-1)
                  val i = tar.read(buffer, 0, n)
                  (i, buffer.take(i))}
                // Take as long as we have read something
                .takeWhile(_._1 > 0)
                .map(_._2)
                .flatten
                .toArray)})
          .toArray
      }
    
    
      def main(args: Array[String]): Unit = {
    
        val spark = SparkSession.builder().getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        val datafilename = "/zip-loc/myfirst.tar.gz"
        val output_path = "/zip-loc/output1/"
    
    
    
    
        val readInRawCSVFiles = spark.sparkContext.binaryFiles(datafilename).flatMapValues(x =>
          extractFilesCSV(x).toOption).map(ele => ele._2.map(inner_ele => (inner_ele._1, decode(inner_ele._2) )))
    
        val rawDataCSVParallelized = spark.sparkContext.parallelize(readInRawCSVFiles.first())
    
        //make this a large number so that all your executors are utilized
        val numPartitionsProvided = 3
        println("Writing out the files to output_path")
        rawDataCSVParallelized.repartition(numPartitions = numPartitionsProvided).map(ele => ele._2).saveAsTextFile(output_path)
    
    
        // The following code is only for sanity checking and testing everything is going smoothly
        // The following code can be removed from production file.
        rawDataCSVParallelized.foreach(ele => println("fileName within .tar.gz", ele._1))
    
        val countRawDataCSVParallelized = rawDataCSVParallelized.count()
        println(s"Count of the csv input files = $countRawDataCSVParallelized")
    
    
        val dataCollected = rawDataCSVParallelized.collect()
        val username1_csv = dataCollected.filter(ele => ele._1.equalsIgnoreCase("username1.csv")).head
        println("username1_csv")
        println(username1_csv)
        val username2_csv = dataCollected.filter(ele => ele._1.equalsIgnoreCase("username2.csv")).head
        println("username2_csv")
        println(username2_csv)
    
      }
    
    }
    

    I have created a tar.gz file using the following command.

    tar -czvf myfirst.tar.gz username_files

    The username_files folder contains the following files as an example.

    username1.csv
    username2.csv
    username3.csv
    username4.csv
    username5.csv
    username6.csv
    

    Upload myfirst.tar.gz to HDFS locally to try it out and check that everything is working OK.
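    For example, assuming you run this from the directory containing the archive, something like the following should work:

    hdfs dfs -mkdir -p /zip-loc
    hdfs dfs -put myfirst.tar.gz /zip-loc/myfirst.tar.gz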

  2. Actually, porting my previous answer from a tarred, gzipped archive to a zipped archive wasn’t that difficult.

    Important point to keep in mind:

    Repartition the RDD (numPartitionsProvided) to a suitably large number so that all of your executors are utilized.

    ZipFileReader.scala

    import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream
    import org.apache.spark.input.PortableDataStream
    import org.apache.spark.sql.SparkSession
    
    import java.nio.charset.StandardCharsets
    import scala.util.Try
    
    object ZipFileReader {
    
      def decode(bytes: Array[Byte]) =
        new String(bytes, StandardCharsets.UTF_8)
    
    
      def extractZippedFilesCSV(ps: PortableDataStream, n: Int = 1024) = Try {
        val zip = new ZipArchiveInputStream(ps.open)
        Stream.continually(Option(zip.getNextEntry))
          // Read until the next entry is null
          .takeWhile(_.isDefined)
          // flatten
          .flatMap(x => x)
          // Drop directories
          .filter(!_.isDirectory)
          .filter(ele => ele.getName.split("/")(1).endsWith(".csv"))
          .map(e => {
            println("This could be name", e.getName)
            (e.getName.split("/")(1),
              Stream.continually {
                  // Read n bytes
                  val buffer = Array.fill[Byte](n)(-1)
                  val i = zip.read(buffer, 0, n)
                  (i, buffer.take(i))}
                // Take as long as we have read something
                .takeWhile(_._1 > 0)
                .map(_._2)
                .flatten
                .toArray)})
          .toArray
      }
    
    
    
      def main(args: Array[String]): Unit = {
    
        val spark = SparkSession.builder().getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        val datafilename_zipped = "/zip-loc/username_files.zip"
    
        val output_path = "/zip-loc/output1/"
    
    
        val readInRawCSVFiles = spark.sparkContext.binaryFiles(datafilename_zipped).flatMapValues(x =>
          extractZippedFilesCSV(x).toOption).map(ele => ele._2.map(inner_ele => (inner_ele._1, decode(inner_ele._2) )))
    
    
        val rawDataCSVParallelized = spark.sparkContext.parallelize(readInRawCSVFiles.first())
    
        //make this a large number so that all your executors are utilized
        val numPartitionsProvided = 3
        println("Writing out the files to output_path")
        rawDataCSVParallelized.repartition(numPartitions = numPartitionsProvided).map(ele => ele._2).saveAsTextFile(output_path)
    
    
        // The following code is only for sanity checking and testing everything is going smoothly
        // The following code can be removed from production file.
        rawDataCSVParallelized.foreach(ele => println("fileName within .zip", ele._1))
    
        val countRawDataCSVParallelized = rawDataCSVParallelized.count()
        println(s"Count of the csv input files = $countRawDataCSVParallelized")
    
    
        val dataCollected = rawDataCSVParallelized.collect()
        val username1_csv = dataCollected.filter(ele => ele._1.equalsIgnoreCase("username1.csv")).head
        println("username1_csv")
        println(username1_csv)
        val username2_csv = dataCollected.filter(ele => ele._1.equalsIgnoreCase("username2.csv")).head
        println("username2_csv")
        println(username2_csv)
    
      }
    
    }
    

    The username_files folder contains the following files as an example.

    username1.csv
    username2.csv
    username3.csv
    username4.csv
    username5.csv
    username6.csv
    