Here is the configuration I am using:
{
"name": "mongo-debezium-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.connection.string": "mongodb://mongo:27017/?replicaSet=rs0",
"database.include.list": "sample",
"collection.include.list": "sample.workflows,sample.simulations",
"topic.prefix": "mongo",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": true
}
}
Here is the document that has been inserted in Mongo –
{
"_id": {
"$oid": "676d8e51105e01702fe9496c"
},
"name": "workflow 3"
}
Here is the corresponding Kafka event inspected through Kowl –
{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Json",
"version": 1,
"field": "before"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Json",
"version": 1,
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "array",
"items": {
"type": "string",
"optional": false
},
"optional": true,
"field": "removedFields"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Json",
"version": 1,
"field": "updatedFields"
},
{
"type": "array",
"items": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "field"
},
{
"type": "int32",
"optional": false,
"field": "size"
}
],
"optional": false,
"name": "io.debezium.connector.mongodb.changestream.truncatedarray",
"version": 1
},
"optional": true,
"field": "truncatedArrays"
}
],
"optional": true,
"name": "io.debezium.connector.mongodb.changestream.updatedescription",
"version": 1,
"field": "updateDescription"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,first,first_in_data_collection,last_in_data_collection,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "int64",
"optional": true,
"field": "ts_us"
},
{
"type": "int64",
"optional": true,
"field": "ts_ns"
},
{
"type": "string",
"optional": false,
"field": "collection"
},
{
"type": "int32",
"optional": false,
"field": "ord"
},
{
"type": "string",
"optional": true,
"field": "lsid"
},
{
"type": "int64",
"optional": true,
"field": "txnNumber"
},
{
"type": "int64",
"optional": true,
"field": "wallTime"
}
],
"optional": false,
"name": "io.debezium.connector.mongo.Source",
"field": "source"
},
{
"type": "string",
"optional": true,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}
],
"optional": false,
"name": "mongo.sample.workflows.Envelope"
},
"payload": {
"before": null,
"after": "{\"_id\": {\"$oid\": \"676d8e51105e01702fe9496c\"},\"name\": \"workflow 3\"}",
"updateDescription": null,
"source": {
"version": "3.0.6.Final",
"connector": "mongodb",
"name": "mongo",
"ts_ms": 1735233105000,
"snapshot": "false",
"db": "sample",
"sequence": null,
"ts_us": 1735233105000000,
"ts_ns": 1735233105000000000,
"collection": "workflows",
"ord": 1,
"lsid": null,
"txnNumber": null,
"wallTime": 1735233105571
},
"op": "c",
"ts_ms": 1735233105616,
"transaction": null
}
}
Note that the payload.after field is a String instead of a plain JSON object. Because of this, I cannot apply the unwrap transform with type ExtractNewRecordState. It throws the following error:
org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [source field insertion], found: java.lang.String
What could be the reason behind this problem? It would be really helpful if someone could point me in the right direction for troubleshooting this.
Note: The connector version is 3.0.6.Final.
2 Answers
It turned out that the after field is supposed to be a String according to the documentation. The problem I was facing with the transform was due to using the wrong class. I was using io.debezium.connector.transforms.ExtractNewDocumentState; instead, we have to use io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.
The issue where the after field in your Kafka message appears as a String instead of a JSON object is likely due to the key.converter or value.converter configuration in your Debezium Kafka connector.
There are two possible root causes: (1) the key.converter or value.converter in use, and (2) schema misalignment.
You need to modify your Debezium connector configuration to ensure that the after field is emitted as a JSON object (a Struct) rather than a raw string.
So add the following config to your connector:
Then add the following transformations:
Don't forget to restart the connector after the above changes.
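To make the class fix from the first answer concrete, here is a minimal sketch of how the connector configuration from the question might look with the MongoDB-specific transform wired in. The alias unwrap is an assumption taken from the question's wording ("unwrap transform"), and every other key is copied unchanged from the question. It has not been verified against a running cluster, so treat it as a starting point rather than a confirmed working setup.

```json
{
  "name": "mongo-debezium-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": "1",
    "mongodb.connection.string": "mongodb://mongo:27017/?replicaSet=rs0",
    "database.include.list": "sample",
    "collection.include.list": "sample.workflows,sample.simulations",
    "topic.prefix": "mongo",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": true,
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState"
  }
}
```

The only additions are the two transforms keys. The transform class lives under the mongodb package because, as the first answer notes, the after field of a MongoDB change event is a JSON string rather than a Struct, which is what the generic ExtractNewRecordState transform expects.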