
I am doing some heavy processing that needs async methods. One of my methods returns a list of dictionaries that needs to go through heavy processing before being added to another awaitable object, i.e.:

def cpu_bound_task_here(record):
    """Some complicated preprocessing of record."""
    return record

After applying the answer kindly given below, my code now just hangs.

async def fun():
    print("Socket open")
    record_count = 0
    symbol = obj.symbol.replace("-", "").replace("/", "")
    loop = asyncio.get_running_loop()
    await obj.send()

    while True:
        try:
            records = await obj.receive()
            if not records:
                continue

            record_count += len(records)

So what the above function does is stream values asynchronously, do some heavy processing, and push to Redis indefinitely. I made the necessary changes and now I’m stuck.

2 Answers


  1. As that output tells you, run_in_executor returns a Future. You need to await it to get its result.

    record = await loop.run_in_executor(
        None, something_cpu_bound_task_here, record
    )
    

    Note that any arguments to something_cpu_bound_task_here need to be passed to run_in_executor.
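For example, since run_in_executor only accepts positional arguments, keyword arguments can be bound with functools.partial (a minimal standalone sketch; scale and its factor keyword are made-up stand-ins for your task):

```python
import asyncio
from functools import partial

def scale(record, factor=1):
    # stand-in for the CPU-bound work; `factor` is a hypothetical kwarg
    return [x * factor for x in record]

async def main():
    loop = asyncio.get_running_loop()
    # positional args can be passed directly after the callable,
    # but keyword args must be bound with functools.partial
    return await loop.run_in_executor(None, partial(scale, [1, 2, 3], factor=2))

print(asyncio.run(main()))  # [2, 4, 6]
```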

    Additionally, as you’ve mentioned that this is a CPU-bound task, you’ll want to make sure you’re using a concurrent.futures.ProcessPoolExecutor. Unless you’ve called loop.set_default_executor somewhere, the default is an instance of ThreadPoolExecutor.

    with ProcessPoolExecutor() as executor:
        for record in records:
            record = await loop.run_in_executor(
                executor, something_cpu_bound_task_here, record
            )
    

    Finally, your while loop is effectively running synchronously. You need to wait for the future and then for obj.add before moving on to process the next item in records. You might want to restructure your code a bit and use something like gather to allow for some concurrency.

    async def process_record(record, obj, loop, executor):
        record = await loop.run_in_executor(
            executor, something_cpu_bound_task_here, record
        )
        await obj.add(record)
    
    async def fun():
        loop = asyncio.get_running_loop()
        records = await receive()
        with ProcessPoolExecutor() as executor:
            await asyncio.gather(
                *[process_record(record, obj, loop, executor) for record in records]
            )
            
    

    I’m not sure how to handle obj since that isn’t defined in your example, but I’m sure you can figure that out.
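Putting the pieces together with the streaming loop from your question, one possible shape is the sketch below. Since obj isn't defined in your example, its send/receive/add methods are assumptions here, and I've added a `records is None` check as a stand-in for "stream closed" so the loop can end:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def something_cpu_bound_task_here(record):
    # placeholder for the heavy preprocessing
    return record

async def process_record(record, obj, loop, executor):
    record = await loop.run_in_executor(
        executor, something_cpu_bound_task_here, record
    )
    await obj.add(record)

async def fun(obj):
    loop = asyncio.get_running_loop()
    await obj.send()
    with ProcessPoolExecutor() as executor:
        while True:
            records = await obj.receive()
            if records is None:  # assumed "stream closed" sentinel
                break
            if not records:
                continue
            # process the whole batch concurrently before fetching the next one
            await asyncio.gather(
                *(process_record(r, obj, loop, executor) for r in records)
            )
```

This keeps one pool alive for the whole stream instead of recreating it per batch, which matters because starting worker processes is expensive.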

  2. Check out the library Pypeln, it is perfect for streaming tasks between process, thread, and asyncio pools:

    import pypeln as pl
    data = get_iterable()
    data = pl.task.map(f1, data, workers=100) # asyncio
    data = pl.thread.flat_map(f2, data, workers=10)
    data = filter(f3, data)
    data = pl.process.map(f4, data, workers=5, maxsize=200)
    