skip to Main Content

How to stop a mongodb changestream temporarily and resume it again?

public Flux<Example> watch() {
  final ChangeStreamOptions changeStreamOptions = ChangeStreamOptions.builder().returnFullDocumentOnUpdate().build();
  return reactiveMongoTemplate.changeStream("collection", changeStreamOptions, Example.class)
          .filter(e -> e.getOperationType() != null)
          .mapNotNull(ChangeStreamEvent::getBody);
}

I’m trying to create a rest endpoint that should be able to stop the changestream for sometime while we do some database maintenance and then invoke the endpoint again to resume the stream from where it left off using resume token.

2

Answers


  1. Chosen as BEST ANSWER

    I found the solution to unsubscribe/stop changestream

    Disposable subscription = service.watch()
           .subscribe(exampleService::doSomething)
    
    // cancel the subscription       
    subscription.dispose();
    

  2. I am not a MongoDB expert but this is what I understood from one, hope I got it right and I am using the plain Java driver API for easier readability:

    // 1. Open, consume, close, save token
    MongoCursor<ChangeStreamDocument<Document>> cursor = inventoryCollection.watch().iterator();
    ChangeStreamDocument<Document> next = cursor.next()
    BsonDocument resumeToken = next.getResumeToken();
    cursor.close();
    
    // 2. Save the resume token in the database, in case your process goes down for any reason during your pause. Otherwise, you will not know where to start resuming.
    ...
    
    // 3. When you want to reopen again start from the DB saved resumeToken
    cursor = inventoryCollection.watch().resumeAfter(resumeToken).iterator();
    

    The time window from the moment you receive the event until you save it should be very small but it may happen the process crashes before you save the continuation _id. If you have operations that are sensitive to that time window, then those operations should be idempotent so that in case you replay an already received event your data will not be affected.

    It would have been nice for the Mongo server to keep track of the current offset for all change streams and uniquely identify clients. This is not possible now and this is why Mongo provides and asks for the resume token.

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search