I am doing some heavy processing that needs async methods. One of my methods returns a list of dictionaries that needs to go through heavy processing before being added to another awaitable object, e.g.:

```python
def cpu_bound_task_here(record):
    # some complicated preprocessing of record
    return record
```
After applying the answer given below by the kind person, my code now just hangs:
```python
async def fun():
    print("Socket open")
    record_count = 0
    symbol = obj.symbol.replace("-", "").replace("/", "")
    loop = asyncio.get_running_loop()
    await obj.send()
    while True:
        try:
            records = await obj.receive()
            if not records:
                continue
            record_count += len(records)
            for record in records:
                record = loop.run_in_executor(None, cpu_bound_task_here, record)
                await obj.add(record)
        except Exception:
            break
```
What the above function does is stream values asynchronously, run some heavy processing on them, and push them to Redis indefinitely. I made the necessary changes and now I'm stuck.
2 Answers
As that output tells you, `run_in_executor` returns a `Future`. You need to await it to get its result.

Note that any arguments to `something_cpu_bound_task_here` need to be passed to `run_in_executor`.

Additionally, as you've mentioned that this is a CPU-bound task, you'll want to make sure you're using a `concurrent.futures.ProcessPoolExecutor`. Unless you've called `loop.set_default_executor` somewhere, the default is an instance of `ThreadPoolExecutor`.

Finally, your while loop is effectively running synchronously. You need to wait for the future and then for `obj.add` before moving on to process the next item in `records`. You might want to restructure your code a bit and use something like `gather` to allow for some concurrency.

I'm not sure how to handle `obj` since that isn't defined in your example, but I'm sure you can figure that out.

---

Check out the library Pypeln; it is perfect for streaming tasks between process, thread, and asyncio pools.
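Putting those points together, here is a minimal sketch of the restructured loop. Since `obj` isn't defined in the question, a `FakeStream` stub (with invented `batches`/`pushed` attributes) stands in for it purely to make the example self-contained and runnable; the real object's interface may differ:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def cpu_bound_task_here(record):
    # stand-in for the complicated preprocessing of a record
    record["processed"] = True
    return record


class FakeStream:
    """Stub for the undefined `obj` (an assumption, not from the question)."""

    def __init__(self):
        self.pushed = []
        # two batches; the empty one ends the demo loop
        self.batches = [[{"id": 1}, {"id": 2}], []]

    async def send(self):
        pass

    async def receive(self):
        return self.batches.pop(0) if self.batches else []

    async def add(self, record):
        self.pushed.append(record)


async def fun(obj, executor):
    loop = asyncio.get_running_loop()
    await obj.send()
    while True:
        records = await obj.receive()
        if not records:
            break  # the real streaming code would `continue` here instead
        # Submit the CPU-bound step for the whole batch to the process pool,
        # then await all the resulting futures at once with gather.
        futures = [
            loop.run_in_executor(executor, cpu_bound_task_here, r)
            for r in records
        ]
        processed = await asyncio.gather(*futures)
        # Push the processed records concurrently as well.
        await asyncio.gather(*(obj.add(r) for r in processed))
    return obj.pushed


def main():
    obj = FakeStream()
    with ProcessPoolExecutor() as executor:
        return asyncio.run(fun(obj, executor))


if __name__ == "__main__":
    print(main())
```

The `__main__` guard matters: with a `ProcessPoolExecutor`, worker processes may re-import the main module, and the guard prevents them from re-running the pipeline.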