
Use Case: Given a ~2GB .gz file of newline-delimited JSON, manipulate each line and write the output to a zip file (CSV)

Issue: The environment I am working in has ~1GB of memory, and I do not have traditional access to the file system. The only way I can write to a file is by passing the entire data stream as a single object from memory (I cannot loop over a generator and write to a file).

My approach so far has been to loop through the data in my .gz file, modify the data, then compress it in memory and write it out after all data is processed. When I use chunking and do not manipulate the data this works. However, when I try to do this one line at a time it seems to run indefinitely and does not work.

Example gzip data:

{"ip": "1.1.1.1", "org": "cloudflare"}
{"ip": "2.2.2.2", "org": "chickenNugget"}

Note that this is not true JSON; each line is valid JSON, but the file is NOT a JSON array.

Target Output:

value,description
1.1.1.1, cloudflare
2.2.2.2, chickenNugget

Example that works in a few seconds using chunking:

import gzip

compressed = b''
chunksize = 100 * 1024 * 1024
with gzip.open('latest.json.gz', 'rt', encoding='utf8') as f:
    while True:
        chunk = f.read(chunksize)
        if not chunk:
            break
        compressed += gzip.compress(chunk.encode())

# I am able to use the platform's internal file creation
# process to create a zip from the "compressed" variable - the issue here is that I cannot
# reliably manipulate the data.

What I tried but does NOT work

import gzip
import json

compressed = 'value,description,expiration,active\n'.encode()
with gzip.open('latest.json.gz', 'rt', encoding='utf8') as f:
    for line in f:
        obj = json.loads(line)
        data = f'{obj.get("ip")},{obj.get("organization")},,True\n'
        compressed += gzip.compress(data.encode())

# This code never seems to complete - I gave up after running for 3+ hours

EDIT
When I test the second example in an unconstrained environment, it runs forever as well. However, if I modify the code as below to break after 10k lines, it works as expected:

... 
count = 0
for line in f:
    if count > 10000: break
    ...
    count += 1
   
    

Is there a better way to approach this?

2 Answers


  1. Your compressed += is the culprit. Python is copying compressed every time you concatenate to it. That takes O(n k) time instead of O(n), where n is the total size of the output and k is the number of pieces being concatenated. (Note that k is small for your 100MB chunks, but very large for your lines.) You need to write that data out or do something with it, instead of trying to accumulate it in a giant string in memory.

    If you absolutely cannot avoid making a giant string in memory, then you will need to allocate space for the compressed data as a mutable array and copy it into that space instead of concatenating. In order to not have to know how big the result is ahead of time, once you get a compressed result that would exceed the space you have left, double the size of the space and then copy into that. So long as you double each time (or multiply the size by some factor), the time spent copying will be O(n).
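
    For illustration, a minimal sketch of that grow-and-copy strategy (the GrowBuffer class is made up for this sketch; note that in CPython a plain bytearray already over-allocates when extended, so appending to a bytearray gets you roughly the same amortized behaviour):

    class GrowBuffer:
        """Append-only byte buffer that doubles its capacity when it runs out of room."""
        def __init__(self, capacity=1024):
            self.buf = bytearray(capacity)   # pre-allocated space
            self.used = 0                    # bytes actually written

        def append(self, piece: bytes):
            needed = self.used + len(piece)
            if needed > len(self.buf):
                new_size = len(self.buf)
                while new_size < needed:
                    new_size *= 2            # double until the new piece fits
                new_buf = bytearray(new_size)
                new_buf[:self.used] = self.buf[:self.used]   # one copy per doubling
                self.buf = new_buf
            self.buf[self.used:needed] = piece
            self.used = needed

        def getvalue(self) -> bytes:
            return bytes(self.buf[:self.used])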

    The easiest approach in Python would be to use BytesIO, which should do the above for you. Just write the compressed data to the BytesIO object as if it were a file.

    As I mention in a comment above, do not compress each little line separately as its own gzip stream. You will not be compressing the data; you will be expanding it. Instead create a zlib.compressobj, send the lines to that, and get the compressed data back as it is generated.
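
    Putting BytesIO and a single compressobj together, a rough sketch of that streaming approach (assuming the file name and JSON keys from the question; wbits=31 asks zlib for a gzip-wrapped stream, and the final flush() call emits the trailing bytes of that stream):

    import gzip
    import io
    import json
    import zlib

    out = io.BytesIO()
    comp = zlib.compressobj(wbits=31)    # 31 = 16 + 15: gzip container, max window size

    out.write(comp.compress(b'value,description,expiration,active\n'))
    with gzip.open('latest.json.gz', 'rt', encoding='utf8') as f:
        for line in f:
            obj = json.loads(line)
            row = f'{obj.get("ip")},{obj.get("organization")},,True\n'
            out.write(comp.compress(row.encode()))
    out.write(comp.flush())              # emit whatever the compressor is still holding

    compressed = out.getvalue()          # one bytes object to hand to the platform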

  2. It's impossible to do this within your memory budget, because the size of the converted gzip bytes will still be greater than 1GB. If you remove the curly braces, double quotes, colons, and keys from the decompressed text and recompress the converted text, the size will be near 100% of the original compressed size, never close to 50%, because the characters removed from each line are the same on every line, and text that repetitive already compresses to almost nothing.

    Anyway, you can use memory more effectively and compress faster by using a GzipFile stream like the following. (This compresses the data only once, as Mr. Adler said.)

    import io
    import json
    import gzip

    bio = io.BytesIO()
    N = 4000  # Number of lines sent to the compression stream at once. Experiment and find a good one.

    with gzip.open('latest.json.gz', 'rt') as src_f, \
         gzip.GzipFile('latest.csv.gz', 'wb', fileobj=bio) as dst_f:
        lines = []
        def flush():
            if lines:
                dst_f.write((''.join(lines)).encode())
                lines.clear()
        def write(line):
            lines.append(line)
            if len(lines) >= N:
                flush()

        write('value,description,expiration,active\n')
        for line in src_f:
            obj = json.loads(line)
            write(f'{obj.get("ip")},{obj.get("organization")},,True\n')
        flush()

    compressed = bio.getbuffer()
    ...
    

    You can optimize this using two threads or processes if you can use multiple cores (one for decompressing the data and converting each JSON line to a CSV line, and the other for compressing the data), as in the sketch below.
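
    A rough sketch of that two-thread idea (same hypothetical file names and keys as above; a bounded queue keeps the reader from running far ahead of the compressor, and since CPython's zlib code can release the GIL while compressing, plain threads may overlap at least part of the work):

    import gzip
    import io
    import json
    import queue
    import threading

    bio = io.BytesIO()
    q = queue.Queue(maxsize=8)           # bounded so the reader can't outrun the compressor
    N = 4000

    def produce():
        """Decompress, convert JSON lines to CSV lines, and hand off batches."""
        batch = ['value,description,expiration,active\n']
        with gzip.open('latest.json.gz', 'rt') as src_f:
            for line in src_f:
                obj = json.loads(line)
                batch.append(f'{obj.get("ip")},{obj.get("organization")},,True\n')
                if len(batch) >= N:
                    q.put(''.join(batch).encode())
                    batch = []
        if batch:
            q.put(''.join(batch).encode())
        q.put(None)                      # sentinel: no more data

    def consume():
        """Compress the batches into the in-memory gzip stream."""
        with gzip.GzipFile('latest.csv.gz', 'wb', fileobj=bio) as dst_f:
            while True:
                chunk = q.get()
                if chunk is None:
                    break
                dst_f.write(chunk)

    reader = threading.Thread(target=produce)
    reader.start()
    consume()
    reader.join()

    compressed = bio.getbuffer()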
