
I am using a MongoDB change stream that listens for changes on a collection and runs a handler function for each change. I want each incoming change to be handled concurrently.

async def handle_collection_changes(
    *,
    change_stream,
    handler,
    handler_args,
    db,
    collection_name,
    service_name,
):
    try:
        for change in change_stream:
            asyncio.create_task(
                handler(change, *handler_args),
            )

    except KeyboardInterrupt:
        log_debug("keyboard interrupt detected, closing stream")
    except Exception as e:
        log_critical(f"unexpected error in change stream: {repr(e)}")

In the above snippet, the handler function never starts executing. However, if I await the task in place, as below, it works:

async def handle_collection_changes(
    *,
    change_stream,
    handler,
    handler_args,
    db,
    collection_name,
    service_name,
):
    try:
        for change in change_stream:
            task = asyncio.create_task(
                handler(change, *handler_args),
            )
            
            await task
    except KeyboardInterrupt:
        log_debug("keyboard interrupt detected, closing stream")
    except Exception as e:
        log_critical(f"unexpected error in change stream: {repr(e)}")

But this doesn’t solve the problem, because now I wait for each task to complete before moving on to the next change.

I also tried something like this:

async def handle_collection_changes(
    *,
    change_stream,
    handler,
    handler_args,
    db,
    collection_name,
    service_name,
):
    try:
        tasks = []

        for change in change_stream:
            # Keep each task so it can be awaited later.
            tasks.append(
                asyncio.create_task(handler(change, *handler_args)),
            )

            save_resume_token(db, collection_name, change["_id"], service_name)

        # Only reached if the change stream loop ever exits.
        for task in tasks:
            await task

    except KeyboardInterrupt:
        log_debug("keyboard interrupt detected, closing stream")
    except Exception as e:
        log_critical(f"unexpected error in change stream: {repr(e)}")

But since the change stream loop never exits, execution never reaches the await statements, so all the tasks are queued but none of them ever start executing.
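To see why the first snippet never runs the handlers, here is a minimal sketch of the same pattern. It assumes a plain Python list in place of the real change stream and uses hypothetical `handler` and `consume` functions; nothing here talks to MongoDB. Because `consume` never awaits anything that suspends, the created tasks stay pending until `main` reaches its first real suspension point.

```python
import asyncio

started = []  # records which handlers have begun running


async def handler(change):
    # Hypothetical stand-in for the real handler: just record the change.
    started.append(change)


async def consume(change_stream):
    # Mirrors the question's loop: create a task per change, never yield.
    tasks = []
    for change in change_stream:
        tasks.append(asyncio.create_task(handler(change)))
    return tasks


async def main():
    # A finite list stands in for the (endless) MongoDB change stream.
    tasks = await consume(["a", "b", "c"])
    before = list(started)        # no handler has run yet
    await asyncio.gather(*tasks)  # first point where main actually yields
    return before, list(started)


before, after = asyncio.run(main())
print("started before yielding:", before)
print("started after gather:", after)
```

With a real change stream the `for` loop never finishes, so that first suspension point is never reached, which matches the behavior described in the question.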


Answers


  1. Calling asyncio.create_task does NOT begin the Task’s execution. The Task is created in the "pending" state and scheduled on the event loop, but it will not start running until your code gives control back to the event loop, which happens when it reaches an await expression that actually suspends.

    In your second code snippet you await the newly created Task. When you do that, your main code is suspended at that point until the Task runs to completion.

    One solution is to await something else, instead of the new Task. For example:

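    async def handle_collection_changes(
        *,
        change_stream,
        handler,
        handler_args,
        db,
        collection_name,
        service_name,
    ):
        # The event loop only keeps weak references to tasks, so hold strong
        # references here until each task finishes.
        background_tasks = set()
        try:
            for change in change_stream:
                task = asyncio.create_task(handler(change, *handler_args))
                background_tasks.add(task)
                task.add_done_callback(background_tasks.discard)
                # Yield to the event loop so the new task (and any other
                # ready tasks) can run up to their next suspension point.
                await asyncio.sleep(0)
        except KeyboardInterrupt:
            log_debug("keyboard interrupt detected, closing stream")
        except Exception as e:
            log_critical(f"unexpected error in change stream: {repr(e)}")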
    

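    asyncio.sleep(0) does not actually wait, but it suspends the current coroutine for one pass of the event loop, which lets any other ready Tasks run. Once those Tasks have run up to their next await that suspends, your main code resumes. So each Task you create in your for loop gets an opportunity to start, and the Tasks run concurrently with one another. One caveat: for change in change_stream is a synchronous iteration, so while it waits for the next change the whole event loop is blocked, and a handler that is suspended in an await can only continue the next time your loop reaches await asyncio.sleep(0).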

    This is not beautiful code, but it will accomplish what you are trying to do with minimal changes.

  2. Why is the change stream loop never exited? If you don’t respond to comments that require a response, you only make it more difficult for people to help you. Also, if you receive one or more answers that resolve your question, you should select the best answer and accept it (see What should I do when someone answers my question?). Once you have enough reputation, you can also upvote questions and answers that you find useful.

    If you are running Python 3.12 or later, you can do:

    async def handle_collection_changes(...):
        loop = asyncio.get_running_loop()
        loop.set_task_factory(asyncio.eager_task_factory)
        # ... rest of the function as before
    

    See asyncio.eager_task_factory. With this factory installed, when you execute task = asyncio.create_task(coro), coro starts running immediately and synchronously inside the create_task call, as if you had called it directly. It runs until it either completes or reaches an await that actually suspends; only then is the task scheduled on the event loop like any regular task. A handler that has suspended this way still needs your main code to yield to the event loop before it can continue. Setting this factory will of course result in all tasks on that loop being created "eagerly". This is generally not a problem, but if it is, instantiate only the tasks that you want to start "eagerly" with:

    loop = asyncio.get_running_loop()
    task = asyncio.Task(coro, loop=loop, eager_start=True)
    
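The `await asyncio.sleep(0)` approach from the first answer can be checked without a database. The sketch below is an illustration under assumptions: a plain list replaces the change stream, `handle_collection_changes` is a simplified hypothetical version that takes only the stream, `handler` is a hypothetical coroutine whose own `asyncio.sleep(0)` stands in for real I/O, and the `background` set follows the reference-keeping pattern recommended in the `asyncio.create_task` documentation. The `events` log shows handler `a` starting before the loop moves on to change `b`.

```python
import asyncio

events = []          # ordered log of what happened, for inspection
background = set()   # strong references so pending tasks are not collected


async def handler(change):
    # Hypothetical handler; the sleep(0) stands in for real async I/O.
    events.append(f"start {change}")
    await asyncio.sleep(0)
    events.append(f"end {change}")


async def handle_collection_changes(change_stream):
    for change in change_stream:
        task = asyncio.create_task(handler(change))
        background.add(task)
        task.add_done_callback(background.discard)
        events.append(f"created {change}")
        # Yield once so the task just created (and other ready tasks) can run.
        await asyncio.sleep(0)


async def main():
    # A finite list stands in for the change stream so the sketch terminates.
    await handle_collection_changes(["a", "b"])
    # Let any handler that is still suspended finish before the loop closes.
    await asyncio.gather(*background)


asyncio.run(main())
print(events)
```

Because the list is consumed instantly, this sketch does not show the blocking wait of a real change stream; it only demonstrates that each yield lets the freshly created handler start.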