skip to Main Content

I have some working code that I need to improve the run time on dramatically and I am pretty lost. Essentially, I will get zip folders containing tens of thousands of json files, each containing roughly 1,000 json messages. There are about 15 different types of json objects interspersed in each of these files and some of those objects have lists of dictionaries inside of them while others are pretty simple. I need to read in all the data, parse the objects and pull out the relevant information, and then pass that parsed data back and insert it into a different program using an API for a third party software (kind of a wrapper around a proprietary implementation of SQL).

So I have code that does all of that. The problem is it takes around 4-5 hours to run each time and I need to get that closer to 30 minutes.

My current code relies heavily on asyncio. I use that to get some concurrency, particularly while reading the json files. I have also started to profile my code and have so far moved to using orjson to read in the data from each file and rewrote each of my parser functions in cython to get some improvements on that side as well. However, I use asyncio queues to pass stuff back and forth and my profiler shows a lot of time is spent just in the queue.get and queue.put calls. I also looked into msgspec to improving reading in the json data and while that was faster, it became slower when I had to send the msgspec.Struct objects into my cython code and use them instead of just a dictionary.

So was just hoping for some general help on how to improve this process. I have read about multiprocessing both with multiprocessing.pools and concurrent.futures but both of those turned out to be slower than my current implementation. I was thinking maybe I need to change how I pass stuff through the queues so I passed the full json data for each file instead of each individual message (about 1,000 documents each) but that didn’t help.

I have read so many SO questions/answers but it seems like a lot of people have very uniform json data (not 15 different message types). I looked into batching but I don’t fully understand how that changes things – that was what I was doing using concurrent.futures but again it actually took longer.

Overall I would like to keep it as queues because in the future I would like to run this same process on streaming data, so that part would just take the place of the json reading and instead each message received over the stream would be put into the queue and everything else would work the same.

Some psuedo-code is included below.

main.py

import asyncio
from glob import glob
import orjson
from parser_dispatcher import ParserDispatcher
from sql_dispatcher import SqlDispatcher

async def load_json(file_path, queue):
    async with aiofiles.open(file_path, mode="rb") as f:
        data = await f.read()
        json_data = await asyncio.to_thread(orjson.loads(data))
        for msg in json_data:
            await queue.put(msg)

async def load_all_json_files(base_path, queue):
    file_list = glob(f"{base_path}/*.json")
    tasks = [load_json(file_path, queue) for file_path in file_list]
    await asyncio.gather(*tasks)
    await queue.put(None) # to end the processing

def main()
    base_path = "pathtojsonfolder"
    paser_queue = asyncio.queue()
    sql_queue = asyncio.queue()
    
    parser_dispatch = ParserDispatcher()
    sql_dispatch = SqlDispatcher()

    load_task = load_all_json_files(base_path, parser_queue)
    parser_task = parser_dispatch.process_queue(parser_queue, sql_queue)
    sql_task = sql_dispatch.process_queue(sql_queue)

    await asyncio.gather(load_task, parser_task, sqlr_task)

if __name__ -- "__main__":
    asyncio.run(main))

parser_dispatcher.py

import asyncio
import message_parsers as mp

class ParserDispatcher:
    def __init__(self):
        self.parsers = {
            ("1", "2", "3"): mp.parser1,
            .... etc
        } # this is a dictionary where keys are tuples and values are the parser functions

    def dispatch(self, msg):
        parser_key = tuple(msg.get("type"), msg.get("source"), msg.get("channel"))
        parser = self.parsers.get(parser_key)
        if parser:
            new_msg = parser(msg)
        else:
            new_msg = []
        return new_msg
    
    async def process_queue(self, parser_queue, sql_queue):
        while True:
            msg = await process_queue.get()
            if msg is None:
                await sql_put.put(None)
            process_queue.task_done()
            parsed_messages = self.dispatch(msg)
            for parsed_message in parsed_messages:
                await sql_queue.put(parsed_message)

sql_dispatcher.py

import asycnio
import proprietarySqlLibrary as sql

class SqlDispatcher:
    def __init__(self):
        # do all the connections to the DB in here

    async def process_queue(self, sql_queue):
        while True:
            msg = await sql_queue.get()
            # then go through and add this data to the DB
            # this part is also relatively slow but I'm focusing on the first half for now
            # since I don't have control over the DB stuff

2

Answers


  1. My first instinct is parallel processing from one machine won’t be much faster than single-threaded. I would guess the bulk of the time is spent processing the data rather than reading it from disk (which you could prove/disprove by running your programming and commenting out the pieces which process data, leaving only the pieces which read).

    This question might be relevant to yours. Python 3.13 introduced certain features to getting around the performance limitations of the GIL.

    If you have access to multiple machines you might consider farming out some of the processing to multiple machines using a queue (I like https://python-rq.org/). Even if this does not get you all the way home, the jobs on the queue might read the original files and write pre-digested ones back to disk, allowing your program to process pre-digested files which should theoretically run faster than your current program.

    Login or Signup to reply.
  2. This might improve performance, but whether significantly enough is still an open question:

    The parsing of JSON data is a CPU-bound task and by concurrently doing this parsing in a thread pool will not buy you anything unless orjson is implemented in C (probably) and releases the GIL (very questionable; see this).

    The code should therefore be re-arranged to submit the parsing of messages to a concurrent.futures.ProcessPoolExecutor instance in batches. This is the general idea, which could not be tested since I do not have access to the data. Note that I am not attempting to read in the JSON files concurrently, since it is unclear whether doing so would actually hurt performance instead of helping. You can always modify the code to use multiple tasks to perform this reading.

    ...
    from concurrent.futures import ProcessPoolExecutor
    
    def process_batch(file_data):
        msgs = []
        for data in file_data:
            msgs.extend(orjson.loads(data))
        return msgs
    
    async def load_json(executor, file_data, queue):
        loop = asyncio.get_running_loop()
        msgs = await loop.run_in_executor(executor, process_batch, file_data)
        for msg in msgs:
            await queue.put(msg)
    
    async def load_all_json_files(base_path, queue):
        # If you have the memory, ideally BATCH_SIZE should be:
        # ceil(number_of_files / (4 * number_of_cores)))
        # So if you had 50_000 files and 10 cores, then
        # BATCH_SIZE should be 1250
        
        BATCH_SIZE = 1_000
    
        # Use a multiprocessing pool:
        executor = ProcessPoolExecutor()
    
        tasks = []
        file_data = [] 
        file_list = glob(f"{base_path}/*.json")
    
        for file_path in file_list:
            async with aiofiles.open(file_path, mode="rb") as f:
                file_data.append(await f.read())
            if len(file_data) == BATCH_SIZE:
                tasks.append(asyncio.create_task(load_json(executor, file_data, queue)))
                file_data = []
        if file_data:
            tasks.append(asyncio.create_task(load_json(executor, file_data, queue)))
    
        await asyncio.gather(*tasks)   
        await queue.put(None) # to end the processing
    
    ...
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search