I am trying to sink data from Kafka to Postgres, and the following is what I have configured.
My ZooKeeper, Kafka, Schema Registry and Kafka Connect are all working fine.
I have even configured a Mongo sink connector, which works fine.
The issue is only with the Postgres connector.
Kafka topic name – data-test-topic
Following is the data on this topic –
Key: "A"
{
"name": "abhishek",
"lastname": "Rathore"
}
The following is the config I am using for the JDBC sink connector:
{
"name": "postgres-jdbc-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": 1,
"topics": "data-test-topic",
"connection.url": "jdbc:postgresql://postgresdb:5432/garuna-rgs",
"connection.user": "postgres",
"connection.password": "arathore",
"auto.create": true,
"auto.evolve": true,
"insert.mode": "upsert",
"table.name.format": "init",
"pk.mode": "record_key",
"pk.fields": "name",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable": false,
"key.converter.schemas.enable": false
}
}
Structure of the Postgres table init:
  Column  | Type | Collation | Nullable | Default | Storage  | Compression | Stats target | Description
----------+------+-----------+----------+---------+----------+-------------+--------------+-------------
 name     | text |           | not null |         | extended |             |              |
 lastname | text |           | not null |         | extended |             |              |
 key      | text |           | not null |         | extended |             |              |
Access method: heap
The error that the JDBC sink connector produces:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Value schema must be of type Struct
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:86)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:67)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:115)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
... 11 more
I am not able to solve this problem even after trying different config values for the sink connector. Please help.
2 Answers
I found the following solution, as suggested by @onecricketeer.
topic data -
Sink connector config -
Now the data is syncing properly with Postgres
As the error says, the connector requires a Struct.
StringConverter returns a plain String, not a Struct, and therefore the data cannot be parsed to know what fields in your data match the database columns and types.
Refer to the Confluent blog on which converter you should be using.
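To make the Struct requirement concrete, here is a rough pre-flight check you could run against a sample message before pointing the sink at the topic. It is only a sketch: the helper name has_struct_schema is made up for illustration, and it mimics the "schema"/"payload" envelope that JsonConverter expects when schemas are enabled, not the connector's actual code path.

```python
import json


def has_struct_schema(raw_value: bytes) -> bool:
    """Return True if raw_value is JSON carrying a struct schema and a payload.

    Hypothetical helper: it only approximates what JsonConverter needs
    in order to hand the sink a Struct.
    """
    try:
        doc = json.loads(raw_value)
    except ValueError:
        # Not valid JSON (or not valid UTF-8), so there is nothing to inspect.
        return False
    if not isinstance(doc, dict):
        # A bare JSON string or number has no fields to map to columns.
        return False
    schema = doc.get("schema")
    return (
        isinstance(schema, dict)
        and schema.get("type") == "struct"
        and "payload" in doc
    )


# The value from the question is plain JSON without a schema envelope.
plain = b'{"name": "abhishek", "lastname": "Rathore"}'
print(has_struct_schema(plain))  # False
```

A message that fails this check is the kind of value that ends up as "Value schema must be of type Struct" in the JDBC sink.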
In particular, org.apache.kafka.connect.json.StringConverter is an invalid class name, and schemas.enable does nothing for StringConverter, only for JsonConverter, so you need the latter.
But this also requires you to change your data to actually have a schema.
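As a sketch of what "data with a schema" can look like for JsonConverter, the snippet below wraps the record from the question in a "schema"/"payload" envelope. The helper name wrap_with_schema is hypothetical, and it assumes every field is a required string, which matches the two fields shown in the question but may not fit other data.

```python
import json


def wrap_with_schema(payload: dict) -> dict:
    """Wrap a flat dict of string values in a JsonConverter-style envelope.

    Assumption: every value is a non-null string, so each field is
    declared as a required "string" field.
    """
    for name, value in payload.items():
        if not isinstance(value, str):
            raise TypeError(f"field {name!r} is not a string")
    fields = [
        {"field": name, "type": "string", "optional": False}
        for name in payload
    ]
    return {
        "schema": {"type": "struct", "fields": fields, "optional": False},
        "payload": payload,
    }


record = {"name": "abhishek", "lastname": "Rathore"}
# Print the message value a producer would send to the topic.
print(json.dumps(wrap_with_schema(record), indent=2))
```

Messages shaped like this are meant to be read with value.converter.schemas.enable set to true; with it set to false, JsonConverter does not produce a Struct for the sink.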