
What is the difference between the behavior of resumeAt, which accepts a timestamp to resume notifications from, and resumeToken, which accepts a saved resume token?

return ChangeStreamOptions.builder()
        .filter(Aggregation.newAggregation(Example.class, matchOperationType))
        .resumeAt(Instant.ofEpochSecond(1675303335)) // this is simply a Unix timestamp
        .resumeToken(tokenDoc) // resume token saved from a previous notification
        .returnFullDocumentOnUpdate()
        .build();

If the application crashes or is restarted, wouldn't it be simpler to just pass in a Unix timestamp from a reasonable time in the past (anywhere from a few hours to a few days ago) instead of building application logic to save the token of every last successfully processed message?

2 Answers


  1. Both resumeAt and resumeAfter accept either a BsonTimestamp or a resume token.

    The documentation describes them as:

    resumeAt – Resume the change stream at a given point.
    resumeAfter – Resume the change stream after a given point.

    Effectively the same difference as using > vs >= to compare the timestamp.
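    The > vs >= distinction can be illustrated with a small, self-contained sketch. The Event record and its clusterTime values are hypothetical stand-ins for real change stream notifications; only the comparison semantics are the point:

    ```java
    import java.util.List;
    import java.util.stream.Collectors;

    public class ResumeSemantics {
        // Hypothetical stand-in for a change stream notification.
        record Event(long clusterTime, String op) {}

        // resumeAt semantics: replay events at or after the given point (>=).
        static List<Event> resumeAt(List<Event> log, long point) {
            return log.stream()
                      .filter(e -> e.clusterTime() >= point)
                      .collect(Collectors.toList());
        }

        // resumeAfter semantics: replay only events strictly after the point (>).
        static List<Event> resumeAfter(List<Event> log, long point) {
            return log.stream()
                      .filter(e -> e.clusterTime() > point)
                      .collect(Collectors.toList());
        }

        public static void main(String[] args) {
            List<Event> log = List.of(
                new Event(100, "insert"),
                new Event(200, "update"),
                new Event(300, "delete"));

            System.out.println(resumeAt(log, 200).size());    // 2: includes the event at t=200
            System.out.println(resumeAfter(log, 200).size()); // 1: skips the event at t=200
        }
    }
    ```

    So resuming "at" a point re-delivers the event at that exact point, while resuming "after" it does not.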

  2. I think the resume token is the safer bet, because timestamps depend on clocks.
    Still, if your subsequent operations are idempotent, meaning you have no problem consuming previously seen changes (and this should be the case anyway), resuming from a timestamp should also work fine.

    Example logic:

    // 1. Open the stream, consume one event, close, and keep its token
    MongoCursor<ChangeStreamDocument<Document>> cursor = inventoryCollection.watch().iterator();
    ChangeStreamDocument<Document> next = cursor.next();
    BsonDocument resumeToken = next.getResumeToken();
    cursor.close();
    
    // 2. Save the resume token in the database, in case your process goes down
    //    for any reason during the pause. Otherwise you will not know where to resume.
    ...
    
    // 3. When you want to reopen, start from the resume token saved in the DB
    cursor = inventoryCollection.watch().resumeAfter(dbSavedResumeToken).iterator();
    

    The time window between receiving an event and saving its token should be very small, but there is still a tiny (0.00…1) chance the process crashes before you save the continuation _id. If you have operations that are sensitive to that window, make them idempotent, so that replaying an already received event does not affect your data.
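    The idempotency point can be sketched in plain Java. The IdempotentApply class and the eventId values are hypothetical; in a real consumer the dedup set would be persisted together with your data, and the id would typically be derived from the change event's _id (its resume token):

    ```java
    import java.util.HashSet;
    import java.util.Set;

    public class IdempotentApply {
        // Ids of events already applied (in practice, persisted alongside your data).
        private final Set<String> applied = new HashSet<>();
        private int counter = 0;

        // Applies the event's side effect only once, no matter how often it is replayed.
        public void apply(String eventId) {
            if (applied.add(eventId)) {
                counter++; // the actual side effect
            }
        }

        public int getCounter() { return counter; }

        public static void main(String[] args) {
            IdempotentApply consumer = new IdempotentApply();
            consumer.apply("evt-1");
            consumer.apply("evt-2");
            consumer.apply("evt-1"); // replayed after a crash: ignored
            System.out.println(consumer.getCounter()); // prints 2
        }
    }
    ```

    With side effects guarded like this, replaying a few already-seen events after a crash is harmless, which is what makes resuming from a timestamp acceptable.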

    It would have been nice for the MongoDB server to keep track of the current offset for every change stream and uniquely identify clients. That is not possible today, which is why MongoDB provides, and asks you to supply, the resume token.
