
I would like to listen to only 3 collections in a database: c1, c2, c3. I was not able to figure out how to limit the change stream to these 3 collections only. Below is my code.

  1. I would like to ignore this error and proceed further. How can I do that? In this case the cursor itself is not getting created.
  2. As I said previously, is there a way to limit listening to the collections c1, c2, c3 only, on the database side? The code below listens to the full database and then filters the collections on the Java side.
        // Match only insert/delete/update events across everything the client is watching.
        List<Bson> pipeline = singletonList(match(in("operationType", asList("insert", "delete", "update"))));
        MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor;
        String resumeTokenStr = getResumeTokenFromS3(cdcConfig);
        if (resumeTokenStr == null) {
            // No saved token: start the change stream from the current point in time.
            cursor = mongoClient.watch(pipeline).fullDocument(FullDocument.UPDATE_LOOKUP).cursor();
        } else {
            // Resume after the last token persisted to S3.
            BsonDocument resumeToken = BsonDocument.parse(resumeTokenStr);
            cursor = mongoClient.watch(pipeline).batchSize(1).maxAwaitTime(60, TimeUnit.SECONDS).startAfter(resumeToken).fullDocument(FullDocument.UPDATE_LOOKUP).cursor();
        }
        return cursor;

The above code throws the error below:

com.mongodb.MongoCommandException: Command failed with error 10334 (BSONObjectTooLarge): 'BSONObj size: 16795345 (0x10046D1) is invalid. Size must be between 0 and 16793600(16MB) First element: _id: { _data: "826337A73B0000000A2B022C0100296E5A1004B317A529F739433BA840730515AC0EAC46645F6964006462624E8146E0FB000934F6560004" }' on server crm-mongo-report01.prod.phenom.local:27017. The full response is {"operationTime": {"$timestamp": {"t": 1664707966, "i": 25}}, "ok": 0.0, "errmsg": "BSONObj size: 16795345 (0x10046D1) is invalid. Size must be between 0 and 16793600(16MB) First element: _id: { _data: "826337A73B0000000A2B022C0100296E5A1004B317A529F739433BA840730515AC0EAC46645F6964006462624E8146E0FB000934F6560004" }", "code": 10334, "codeName": "BSONObjectTooLarge", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1664707966, "i": 26}}, "signature": {"hash": {"$binary": {"base64": "NZDJKhCse19Eud88kNh7XRWRgas=", "subType": "00"}}, "keyId": 7113062344413937666}}}
    at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:198)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:413)
    at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:337)
    at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:116)
    at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:644)
    at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:71)
    at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:240)
    at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:226)
    at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:126)
    at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:116)
    at com.mongodb.internal.connection.DefaultServer$OperationCountTrackingConnection.command(DefaultServer.java:345)
    at com.mongodb.internal.operation.CommandOperationHelper.createReadCommandAndExecute(CommandOperationHelper.java:232)
    at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$4(CommandOperationHelper.java:214)
    at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$2(OperationHelper.java:575)
    at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:600)
    at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$3(OperationHelper.java:574)
    at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:600)
    at com.mongodb.internal.operation.OperationHelper.withSourceAndConnection(OperationHelper.java:573)
    at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$5(CommandOperationHelper.java:211)
    at com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:65)
    at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:217)
    at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:197)
    at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:195)
    at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:347)
    at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:343)
    at com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:538)
    at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:343)
    at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:58)
    at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:191)
    at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:221)
    at com.mongodb.client.internal.ChangeStreamIterableImpl.cursor(ChangeStreamIterableImpl.java:174)
    at com.company.cdc.services.CDCMain.getCursorAtResumeToken(CdcServiceMain.java:217)

Line 217 points to this line: cursor = mongoClient.watch(pipeline).batchSize(1).maxAwaitTime(60, TimeUnit.SECONDS).startAfter(resumeToken).fullDocument(FullDocument.UPDATE_LOOKUP).cursor();

2 Answers


  1. Chosen as BEST ANSWER

    This is how I ended up solving the problem (in case somebody else is searching for a solution to this problem):

    1. Removed FullDocument.UPDATE_LOOKUP when creating the cursor, so the code now looks like cursor = mongoClient.watch(pipeline).batchSize(20000).cursor(). This avoided the gigantic full document that could end up in the change event and eventually error out. This worked.

    2. In my case I did not have to listen to updates from the collections that contained this bad data, so I modified my cursor to listen only to the databases and collections of interest, instead of listening to the entire database and ignoring the unwanted collections later. The code is below.

    3. When writing to the destination I did a bulk lookup on MongoDB, constructed the full documents, and then wrote them. This lazy-lookup approach significantly reduced the memory footprint of the Java program (a sketch of what this can look like follows the code below).

    
        private List<Bson> generatePipeline(CdcConfigs cdcConfig) {
            List<String> whiteListedCollections = getWhitelistedCollection(cdcConfig);
            List<String> whiteListedDbs = getWhitelistedDbs(cdcConfig);
            log.info("whitelisted dbs:" + whiteListedDbs + " coll:" + whiteListedCollections);
            List<Bson> pipeline;
            // Filter on the server side: match the namespace (ns.db / ns.coll) so only
            // events from the whitelisted databases and collections reach the client.
            if (whiteListedDbs.size() > 0)
                pipeline = singletonList(match(and(
                        in("ns.db", whiteListedDbs),
                        in("ns.coll", whiteListedCollections),
                        in("operationType", asList("insert", "delete", "update")))));
            else
                pipeline = singletonList(match(and(
                        in("ns.coll", whiteListedCollections),
                        in("operationType", asList("insert", "delete", "update")))));
            return pipeline;
        }
    
        private MongoChangeStreamCursor<ChangeStreamDocument<Document>> getCursorAtResumeToken(CdcConfigs cdcConfig, MongoClient mongoClient) {
            List<Bson> pipeline = generatePipeline(cdcConfig);
            MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor;
            String resumeTokenStr = getResumeTokenFromS3(cdcConfig);
            if (resumeTokenStr == null) {
    //            cursor = mongoClient.watch(pipeline).fullDocument(FullDocument.UPDATE_LOOKUP).cursor();
                // No FullDocument.UPDATE_LOOKUP any more; the full documents are fetched lazily later (step 3).
                cursor = mongoClient.watch(pipeline).batchSize(20000).cursor();
                log.warn("RESUME TOKEN IS NULL. READING CDC FROM CURRENT TIMESTAMP FROM MONGO DB !!! ");
            } else {
                BsonDocument resumeToken = BsonDocument.parse(resumeTokenStr);
                cursor = mongoClient.watch(pipeline).batchSize(20000).maxAwaitTime(30, TimeUnit.MINUTES).startAfter(resumeToken).cursor();
    //            cursor = mongoClient.watch(pipeline).startAfter(resumeToken).fullDocument(FullDocument.UPDATE_LOOKUP).cursor();
            }
            return cursor;
        }
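
    For step 3, here is a minimal sketch of what the lazy bulk lookup could look like, assuming a helper method bulkLookup and a batch of change events collected from the cursor; the method name, signature, and batching around it are illustrative assumptions, not the original code:

        import static com.mongodb.client.model.Filters.in;

        import com.mongodb.client.MongoCollection;
        import com.mongodb.client.model.changestream.ChangeStreamDocument;
        import org.bson.BsonValue;
        import org.bson.Document;

        import java.util.ArrayList;
        import java.util.List;
        import java.util.stream.Collectors;

        // Instead of attaching the full document to every change event (UPDATE_LOOKUP),
        // collect the _ids of a batch of events and fetch the full documents in one query
        // just before writing to the destination.
        private List<Document> bulkLookup(MongoCollection<Document> collection,
                                          List<ChangeStreamDocument<Document>> batch) {
            List<BsonValue> ids = batch.stream()
                    .map(event -> event.getDocumentKey().get("_id"))
                    .collect(Collectors.toList());
            // One round trip for the whole batch instead of one lookup per event
            // (delete events have nothing left to look up, so they simply are not returned).
            return collection.find(in("_id", ids)).into(new ArrayList<>());
        }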
    
    

    The solutions available so far are more inclined towards Python code, so it was a challenge translating them into Java.


  2. My case is a little bit different. I have a trigger used to align documents from one collection to another, with the option to include the fullDocument in the change event. The problem is that the metadata added to the event, combined with the fullDocument, exceeds the BSON maximum size (16MB).

    I solved it by removing the fullDocument from the trigger and fetching the document from the collection itself with an aggregation:

        collection1.aggregate([
            { $match: { _id: changeEvent.documentKey._id } },
            { $merge: { into: "collection", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
        ]);
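
    Since the question is about the Java driver, a roughly equivalent pipeline there might look like the sketch below. This is an assumption based on the shell pipeline above, not code from this answer: collection1, the target name "collection", and changeEvent (here a ChangeStreamDocument<Document>) are placeholders.

        import static com.mongodb.client.model.Aggregates.match;
        import static com.mongodb.client.model.Aggregates.merge;
        import static com.mongodb.client.model.Filters.eq;
        import static java.util.Arrays.asList;

        import com.mongodb.client.model.MergeOptions;

        // Replace the matched document in the target collection, or insert it if missing,
        // mirroring the $merge options used in the shell pipeline above.
        MergeOptions mergeOptions = new MergeOptions()
                .uniqueIdentifier("_id")
                .whenMatched(MergeOptions.WhenMatched.REPLACE)
                .whenNotMatched(MergeOptions.WhenNotMatched.INSERT);

        collection1.aggregate(asList(
                match(eq("_id", changeEvent.getDocumentKey().get("_id"))),
                merge("collection", mergeOptions)
        )).toCollection();   // toCollection() executes the pipeline so $merge performs the write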
    