
I’ve been trying to implement a change stream that monitors a Mongo collection for new documents. While it’s simple to set up for catching a single change, I don’t understand how to keep the process running indefinitely.

db = pymongo_util.get_collection("DataDB", "XYZ_Collection")
stream = db.watch(full_document="updateLookup")
document = next(stream)  # blocks here until a change happens
print(document)          # prints the change, and then the program ends

My goal is to create a ‘listener’ for the database: listen for new documents and process them. I’m not sure whether asyncio or threading is the way to go, or whether it’s something basic I’m missing.

It looks like I’m not the only one asking, but none of these questions seem to have an answer:

  1. How to actually use pymongo ChangeStreams with Flask in a non-blocking way?

  2. Watch MongoDB Change Streams in Python asynchronous

2 Answers


  1. Both referenced questions were about how to implement asynchronous behaviour. If you are happy with blocking reads, just loop over the iterator:

    for document in stream:
        print(document)
    

    It won’t run indefinitely, of course, but it will keep running for a significant time. You will need to wrap it in a try/except block to catch cursor errors, and use the stream’s resume_token to continue reading from where it stopped, but that’s another story.

  2. Like this:

    cursor = db.collection.watch()
    while True:
        document = next(cursor)
        # do stuff
    

    If you want it to work like traditional database triggers, you will need to run this continuously. Note that while you can run this as an always-running Python process, such processes die pretty easily. An alternative is to run it every minute or so via a cron job; in that case each run has to stop once it has caught up instead of looping forever.

    You will need to save the collection’s resume token in a separate collection and pass it in the next time you run this process:

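    # assuming you saved the token the last time you ran this
    progress = db.progress_collection.find_one({'collectionName': 'name'})
    resume_token = progress['resumeToken'] if progress else None  # None on the first run

    cursor = db.collection.watch(resume_after=resume_token)

    # do stuff
    resume_token = cursor.resume_token

    # save the token to the collection
    db.progress_collection.update_one(
        {'collectionName': 'name'},
        {'$set': {'resumeToken': resume_token}},
        upsert=True,
    )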
    
    
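For the cron-job approach in the second answer, each run has to return once it has caught up, which a bare `next(cursor)` loop never does. The sketch below assumes PyMongo 3.9 or later, where `ChangeStream.try_next()` waits at most `max_await_time_ms` and returns `None` if no change arrived. The `progress` collection and the `collectionName`/`resumeToken` fields follow the answer's example; `drain_changes`, `handle` and `wait_ms` are hypothetical names.

```python
def drain_changes(collection, progress, handle, name="name", wait_ms=1000):
    """Handle the changes made since the previous run, save the new resume token, and return.

    Meant to be started periodically (e.g. from cron). `progress` is a separate
    collection holding one {"collectionName": ..., "resumeToken": ...} document
    per watched collection; `handle` is a hypothetical per-change callback.
    """
    saved = progress.find_one({"collectionName": name})
    resume_token = saved["resumeToken"] if saved else None  # None on the first run

    processed = 0
    with collection.watch(resume_after=resume_token, max_await_time_ms=wait_ms) as stream:
        while stream.alive:
            # try_next() waits at most wait_ms and returns None if nothing arrived.
            change = stream.try_next()
            if change is None:
                break  # caught up with the collection; end this run
            handle(change)
            processed += 1
        resume_token = stream.resume_token

    # Persist the position so the next run resumes where this one stopped.
    if resume_token is not None:
        progress.update_one(
            {"collectionName": name},
            {"$set": {"resumeToken": resume_token}},
            upsert=True,
        )
    return processed
```

The token is saved once, at the end of a run, so if `handle` fails partway through, the changes since the last saved token are processed again next time. If the stream has no resume token yet, nothing is saved and the next run again starts from the current time, so changes made in between would be missed.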