I would like to listen to only three collections in a database: c1, c2 and c3. I was not able to figure out how to limit listening to just these three collections. Below is my code.
- I would like to ignore this error and proceed further. How can I do that? In this case the cursor itself is not getting created.
- As I said previously, is there a way to limit listening to only the c1, c2 and c3 collections on the database side? The code below listens to the whole database and then filters the collections on the Java side.
```java
List<Bson> pipeline = singletonList(match(in("operationType", asList("insert", "delete", "update"))));
MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor;
String resumeTokenStr = getResumeTokenFromS3(cdcConfig);
if (resumeTokenStr == null) {
    // mongoClient.watch() opens a deployment-wide stream, so events from every collection arrive here.
    cursor = mongoClient.watch(pipeline).fullDocument(FullDocument.UPDATE_LOOKUP).cursor();
} else {
    // Resume right after the last event whose token was saved to S3.
    BsonDocument resumeToken = BsonDocument.parse(resumeTokenStr);
    cursor = mongoClient.watch(pipeline)
            .batchSize(1)
            .maxAwaitTime(60, TimeUnit.SECONDS)
            .startAfter(resumeToken)
            .fullDocument(FullDocument.UPDATE_LOOKUP)
            .cursor();
}
return cursor;
```
The above code throws the following error:
com.mongodb.MongoCommandException: Command failed with error 10334 (BSONObjectTooLarge): 'BSONObj size: 16795345 (0x10046D1) is invalid. Size must be between 0 and 16793600(16MB) First element: _id: { _data: "826337A73B0000000A2B022C0100296E5A1004B317A529F739433BA840730515AC0EAC46645F6964006462624E8146E0FB000934F6560004" }' on server crm-mongo-report01.prod.phenom.local:27017. The full response is {"operationTime": {"$timestamp": {"t": 1664707966, "i": 25}}, "ok": 0.0, "errmsg": "BSONObj size: 16795345 (0x10046D1) is invalid. Size must be between 0 and 16793600(16MB) First element: _id: { _data: "826337A73B0000000A2B022C0100296E5A1004B317A529F739433BA840730515AC0EAC46645F6964006462624E8146E0FB000934F6560004" }", "code": 10334, "codeName": "BSONObjectTooLarge", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1664707966, "i": 26}}, "signature": {"hash": {"$binary": {"base64": "NZDJKhCse19Eud88kNh7XRWRgas=", "subType": "00"}}, "keyId": 7113062344413937666}}}
at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:198)
at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:413)
at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:337)
at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:116)
at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:644)
at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:71)
at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:240)
at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:226)
at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:126)
at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:116)
at com.mongodb.internal.connection.DefaultServer$OperationCountTrackingConnection.command(DefaultServer.java:345)
at com.mongodb.internal.operation.CommandOperationHelper.createReadCommandAndExecute(CommandOperationHelper.java:232)
at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$4(CommandOperationHelper.java:214)
at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$2(OperationHelper.java:575)
at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:600)
at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$3(OperationHelper.java:574)
at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:600)
at com.mongodb.internal.operation.OperationHelper.withSourceAndConnection(OperationHelper.java:573)
at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$5(CommandOperationHelper.java:211)
at com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:65)
at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:217)
at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:197)
at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:195)
at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:347)
at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:343)
at com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:538)
at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:343)
at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:58)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:191)
at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:221)
at com.mongodb.client.internal.ChangeStreamIterableImpl.cursor(ChangeStreamIterableImpl.java:174)
at com.company.cdc.services.CDCMain.getCursorAtResumeToken(CdcServiceMain.java:217)
Line 217 points to this line:

```java
cursor = mongoClient.watch(pipeline).batchSize(1).maxAwaitTime(60, TimeUnit.SECONDS).startAfter(resumeToken).fullDocument(FullDocument.UPDATE_LOOKUP).cursor();
```
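The sizes in the message show that the change event missed the limit by only a small margin: 16793600 is 16 MiB plus 16 KiB, and the reported event (0x10046D1, i.e. 16795345 bytes) is 1745 bytes larger than that. With `FullDocument.UPDATE_LOOKUP`, an update event carries the whole current document in addition to its own fields (the `_id` resume token, the namespace, the update description), so a document that is already close to 16 MiB is presumably enough to push the event over. A small Java check of that arithmetic (the class and constant names are illustrative):

```java
// Worked check of the sizes quoted in the BSONObjectTooLarge error message.
public class BsonSizeCheck {
    // 16 MiB: the documented maximum size of a BSON document.
    static final int MAX_USER_SIZE = 16 * 1024 * 1024;
    // 16 MiB + 16 KiB: equals the 16793600 limit quoted in the error.
    static final int MAX_INTERNAL_SIZE = MAX_USER_SIZE + 16 * 1024;
    // Size of the change event reported by the server (0x10046D1).
    static final int EVENT_SIZE = 0x10046D1;

    // How many bytes the event exceeds the quoted limit by.
    static int overage() {
        return EVENT_SIZE - MAX_INTERNAL_SIZE;
    }

    public static void main(String[] args) {
        System.out.println("event size    = " + EVENT_SIZE);        // 16795345
        System.out.println("limit         = " + MAX_INTERNAL_SIZE); // 16793600
        System.out.println("over by bytes = " + overage());         // 1745
    }
}
```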
2 Answers
This is how I ended up solving the problem (in case somebody else is searching for a solution to this problem).
I removed `FullDocument.UPDATE_LOOKUP` when creating the cursor, so now my code looks like this:

```java
cursor = mongoClient.watch(pipeline).batchSize(20000).cursor();
```
This avoided pulling in the gigantic field that could end up in the looked-up document, which was what eventually caused the error. This worked.

In my case I didn't have to listen to updates on the collection that held this bad data. So I modified my cursor to listen only to the databases and collections I was interested in, instead of listening to the entire database and ignoring the unwanted collections later. Below is the code:
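One way to move that filtering to the server (a sketch, not necessarily what the author did) is to add the namespace to the change stream's `$match` stage: every change event carries its database and collection in `ns.db` and `ns.coll`. The sketch below only builds that stage as JSON text with the JDK, so it runs without the driver; the collection names c1, c2, c3 come from the question, while the database name `mydb` and the class name `NamespaceFilter` are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

// Builds a change stream $match stage that keeps only insert/update/delete
// events on one database and a fixed set of collections, so the filtering
// happens on the server instead of in the Java client.
public class NamespaceFilter {

    // Quotes a plain identifier as a JSON string (assumes the names contain
    // no quotes or backslashes, which holds for c1, c2, c3).
    private static String quote(String s) {
        return "\"" + s + "\"";
    }

    // Renders a list of names as a JSON array of strings.
    private static String inList(List<String> values) {
        return values.stream()
                .map(NamespaceFilter::quote)
                .collect(Collectors.joining(", ", "[", "]"));
    }

    static String matchStage(String db, List<String> collections) {
        return "{\"$match\": {"
                + "\"operationType\": {\"$in\": " + inList(List.of("insert", "delete", "update")) + "}, "
                + "\"ns.db\": " + quote(db) + ", "
                + "\"ns.coll\": {\"$in\": " + inList(collections) + "}"
                + "}}";
    }

    public static void main(String[] args) {
        // "mydb" is a placeholder database name.
        System.out.println(matchStage("mydb", List.of("c1", "c2", "c3")));
    }
}
```

With the Java driver, the same stage can be expressed with the builders, e.g. `match(and(in("operationType", asList("insert", "delete", "update")), eq("ns.db", "mydb"), in("ns.coll", asList("c1", "c2", "c3"))))` from `Aggregates` and `Filters`, or parsed with `BsonDocument.parse(...)`. If all three collections live in one database, watching `mongoClient.getDatabase("mydb").watch(pipeline)` instead of `mongoClient.watch(pipeline)` makes the `ns.db` condition unnecessary.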
When writing to the destination, I did a bulk lookup on MongoDB, constructed the full document, and then wrote it. This lazy-lookup approach greatly reduced the memory footprint of the Java program.
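The lazy lookup can be sketched as a buffer that collects the `documentKey._id` of each change event per collection and later turns them into a few bulk queries. This is an illustration under assumptions, not the author's code: the class name `LazyLookupBuffer` and the chunk size are hypothetical, and the MongoDB query itself is left out so the sketch runs with the JDK alone. Each chunk would become one query such as `collection.find(Filters.in("_id", chunk))` with the Java driver.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Buffers document ids from change events per collection so that the full
// documents can be fetched later with one query per chunk of ids, instead of
// having the server attach a full document to every event.
public class LazyLookupBuffer<ID> {
    private final int maxIdsPerQuery;
    // Insertion-ordered for deterministic output; the Set drops duplicate ids
    // when the same document changes several times within one batch.
    private final Map<String, Set<ID>> idsByCollection = new LinkedHashMap<>();

    public LazyLookupBuffer(int maxIdsPerQuery) {
        if (maxIdsPerQuery <= 0) {
            throw new IllegalArgumentException("maxIdsPerQuery must be positive");
        }
        this.maxIdsPerQuery = maxIdsPerQuery;
    }

    // Record one change event: its collection (ns.coll) and documentKey._id.
    public void add(String collection, ID id) {
        idsByCollection.computeIfAbsent(collection, k -> new LinkedHashSet<>()).add(id);
    }

    // Empty the buffer into per-collection chunks of at most maxIdsPerQuery
    // ids; each chunk corresponds to one bulk lookup query.
    public Map<String, List<List<ID>>> drain() {
        Map<String, List<List<ID>>> chunks = new LinkedHashMap<>();
        for (Map.Entry<String, Set<ID>> e : idsByCollection.entrySet()) {
            List<List<ID>> perCollection = new ArrayList<>();
            List<ID> current = new ArrayList<>();
            for (ID id : e.getValue()) {
                current.add(id);
                if (current.size() == maxIdsPerQuery) {
                    perCollection.add(current);
                    current = new ArrayList<>();
                }
            }
            if (!current.isEmpty()) {
                perCollection.add(current);
            }
            chunks.put(e.getKey(), perCollection);
        }
        idsByCollection.clear();
        return chunks;
    }

    public static void main(String[] args) {
        LazyLookupBuffer<Integer> buf = new LazyLookupBuffer<>(2);
        buf.add("c1", 1);
        buf.add("c1", 2);
        buf.add("c1", 1); // a second update of the same document
        buf.add("c1", 3);
        buf.add("c2", 10);
        System.out.println(buf.drain()); // {c1=[[1, 2], [3]], c2=[[10]]}
    }
}
```

Note that a document fetched this way reflects its state at lookup time rather than at the time of the event, and ids that come from delete events will simply not be found.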
The solutions available so far are mostly written in Python, so it was a challenge translating them into Java.
My case is a little different. I have a trigger that syncs documents from one collection to another, with the option to include the fullDocument in the change event enabled. The problem is that the metadata added to the event, combined with the fullDocument, exceeds the maximum BSON document size (16 MB).
I solved it by removing the fullDocument from the trigger and fetching the document from the collection itself with an aggregate: