skip to Main Content

I want to continuously update a document’s field named value
{'_id': 'count', 'value':0} by a certain number.

My MongoSinkConnector has

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

I’m using a python script to produce messages to the appropiate topic

        self._aio_producer.produce(
            topic='mongo',
            value=json.dumps(
                {
                    "_id":"count",
                    "$inc":{"value":len(task['payload'].split(','))}
                 }
            )
        )

But I get this error on the Kafka Connect standalone process:

Failed to put into the sink the following records: [SinkRecord{kafkaOffset=8174, timestampType=CreateTime} ConnectRecord{topic='mongo', kafkaPartition=1, key=null, keySchema=null, value={_id=count, $inc={value=1}}, valueSchema=null, timestamp=1697153679938, headers=ConnectHeaders(headers=)}] 
(com.mongodb.kafka.connect.sink.MongoSinkTask:244) 
com.mongodb.kafka.connect.sink.dlq.WriteException: v=1, code=52, message=The dollar ($) prefixed field '$inc' in '$inc' is not allowed in the context of an update's replacement document. Consider using an aggregation pipeline with $replaceWith., details={} 

I’ve tried dropping the $inc part, but it seems to just be replacing the document over and over without incrementing the value. Is there any way to increment a value or do I have to write my own custom Class?

2

Answers


  1. Chosen as BEST ANSWER

    Currently the way is to create your own Custom WriteModel Strategy.

        BsonDocument setOnInsertFields =
            new BsonDocument().append(A_FIELD_NAME, aValue).append(B_FIELD_NAME, bValue);
    
        BsonDocument incFields =
            new BsonDocument()
                .append(C_COUNT_FIELD_NAME, cType)
                .append(D_COUNT_FIELD_NAME, DType);
    
        // Create new document with specific update operations
        // such as set or setOnInsert, inc, etc
        BsonDocument newDocument =
            new BsonDocument().append("$setOnInsert", setOnInsertFields).append("$inc", incFields);
    

  2. In general your code should work fine. Here the equivalent in mongosh (JavaScript):

    task = { payload: 'a,b,c' }
    db.collection.insertOne({ _id: 'count', value: 0 })
    
    db.collection.updateOne(
       { _id: 'count' },
       { $inc: { value: task['payload'].split(',').length } }
    )
    
    db.collection.findOne()
    => { _id: 'count', value: 3 }
    

    Have a look at Kafka documentation, perhaps $inc operator is not supported.

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search