How to stop a MongoDB change stream temporarily and resume it again?
public Flux<Example> watch() {
    // Ask the server to include the full current document on update events.
    final ChangeStreamOptions changeStreamOptions = ChangeStreamOptions.builder()
            .returnFullDocumentOnUpdate()
            .build();
    return reactiveMongoTemplate.changeStream("collection", changeStreamOptions, Example.class)
            .filter(e -> e.getOperationType() != null)
            // Skip events that carry no document body.
            .mapNotNull(ChangeStreamEvent::getBody);
}
I’m trying to create a REST endpoint that can stop the change stream for some time while we do database maintenance. Invoking the endpoint again should then resume the stream from where it left off, using the resume token.
2 Answers
I found a way to unsubscribe from, and so stop, the change stream.
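The code for this answer is not shown here, so what follows is only a minimal sketch of the general pause/resume idea, not the answerer's solution. It uses plain JDK types so that it runs on its own. StreamOpener and PausableWatcher are hypothetical names introduced for illustration. In the reactive Spring setup from the question, the stop handle would presumably be the Disposable returned by subscribe(...), the token would come from ChangeStreamEvent.getResumeToken(), and it would be passed back through ChangeStreamOptions.builder().resumeAfter(...).

```java
import java.util.Optional;
import java.util.function.Consumer;

/** Hypothetical abstraction over "open a change stream"; not a MongoDB or Spring type. */
interface StreamOpener {
    /**
     * Starts a stream after the given resume token (empty means "from now"),
     * reports the token of every event it delivers, and returns a handle that stops it.
     */
    Runnable open(Optional<String> resumeAfter, Consumer<String> onToken);
}

/** Pauses and resumes one stream, remembering the last resume token seen. */
final class PausableWatcher {
    private final StreamOpener opener;
    private Runnable cancel;   // non-null only while the stream is running
    private String lastToken;  // null until the first event arrives

    PausableWatcher(StreamOpener opener) {
        this.opener = opener;
    }

    /** Starts (or restarts) the stream; returns false if it is already running. */
    synchronized boolean resume() {
        if (cancel != null) {
            return false;
        }
        // Reopen after the last token seen, or from "now" on the very first start.
        cancel = opener.open(Optional.ofNullable(lastToken), this::remember);
        return true;
    }

    /** Stops the stream but keeps the last token; returns false if it was not running. */
    synchronized boolean pause() {
        if (cancel == null) {
            return false;
        }
        cancel.run();
        cancel = null;
        return true;
    }

    synchronized Optional<String> lastToken() {
        return Optional.ofNullable(lastToken);
    }

    private synchronized void remember(String token) {
        lastToken = token;
    }
}
```

A REST controller could call pause() before maintenance and resume() afterwards. The token here lives only in memory, so it is lost if the application restarts; storing it durably would be needed for that case. Resuming also only works while the server's oplog still contains the entry the token points to, so a very long maintenance window may make the token unusable.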
I am not a MongoDB expert, but this is what I understood from one, and I hope I got it right. I am using the plain Java driver API for easier readability.
The time window between receiving an event and saving its resume token (the event's _id) should be very small, but the process may still crash before the token is saved. If some of your operations are sensitive to that window, make them idempotent, so that replaying an event you have already received does not affect your data.
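The crash window described above can be illustrated with a small sketch. It is plain Java with hypothetical types (ChangeEvent, IdempotentProjector), and in-memory fields stand in for the real target store and the durable token storage. The event is applied by overwriting rather than incrementing, so a replay after a crash leaves the data unchanged.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified change event: resume token plus the new value of one document. */
final class ChangeEvent {
    final String token;
    final String docId;
    final int value;

    ChangeEvent(String token, String docId, int value) {
        this.token = token;
        this.docId = docId;
        this.value = value;
    }
}

/** Applies events idempotently and records the resume token only after the apply step. */
final class IdempotentProjector {
    private final Map<String, Integer> state = new HashMap<>(); // stands in for the target store
    private String savedToken;                                   // stands in for a durably stored token

    /** Overwrites rather than increments, so applying the same event twice changes nothing. */
    void apply(ChangeEvent event) {
        state.put(event.docId, event.value);
    }

    /** Call after apply; a crash between the two calls means this event is replayed later. */
    void saveToken(ChangeEvent event) {
        savedToken = event.token;
    }

    Integer valueOf(String docId) {
        return state.get(docId);
    }

    String savedToken() {
        return savedToken;
    }
}
```

If the process dies after apply(e2) but before saveToken(e2), the saved token still points at e1. On restart the stream resumes after e1 and delivers e2 again, and applying it a second time leaves the stored value as it was. An increment-style update would instead be applied twice.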
It would have been nice if the MongoDB server kept track of the current offset of every change stream and uniquely identified its clients. That is not possible at the moment, which is why MongoDB hands out the resume token and asks the client to supply it when resuming.