I want to continuously update a document’s field named value
{'_id': 'count', 'value':0}
by a certain number.
My MongoSinkConnector has
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy
I’m using a python script to produce messages to the appropiate topic
self._aio_producer.produce(
topic='mongo',
value=json.dumps(
{
"_id":"count",
"$inc":{"value":len(task['payload'].split(','))}
}
)
)
But I get this error on the Kafka Connect standalone process:
Failed to put into the sink the following records: [SinkRecord{kafkaOffset=8174, timestampType=CreateTime} ConnectRecord{topic='mongo', kafkaPartition=1, key=null, keySchema=null, value={_id=count, $inc={value=1}}, valueSchema=null, timestamp=1697153679938, headers=ConnectHeaders(headers=)}]
(com.mongodb.kafka.connect.sink.MongoSinkTask:244)
com.mongodb.kafka.connect.sink.dlq.WriteException: v=1, code=52, message=The dollar ($) prefixed field '$inc' in '$inc' is not allowed in the context of an update's replacement document. Consider using an aggregation pipeline with $replaceWith., details={}
I’ve tried dropping the $inc
part, but it seems to just be replacing the document over and over without incrementing the value. Is there any way to increment a value or do I have to write my own custom Class?
2
Answers
Currently the way is to create your own Custom WriteModel Strategy.
In general your code should work fine. Here the equivalent in
mongosh
(JavaScript):Have a look at Kafka documentation, perhaps
$inc
operator is not supported.