
I am trying to sink data from Kafka to Postgres, and the following is what I have configured –

My ZooKeeper, Kafka, Schema Registry, and Kafka Connect are all working fine.
I have even configured a MongoDB sink connector, which works fine.

The issue is only with the Postgres connector.

Kafka topic name – data-test-topic
Following is the data on this topic –

Key: "A"
{
    "name": "abhishek",
    "lastname": "Rathore"
}
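
For illustration, a record like this could be produced with the confluent-kafka Python client; the broker address below is an assumption for this sketch –

import json
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})  # assumed broker address

# String key "A", schemaless JSON value, matching the topic data above
value = {"name": "abhishek", "lastname": "Rathore"}
producer.produce("data-test-topic", key="A", value=json.dumps(value))
producer.flush()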

The following is the config I am using for the JDBC sink connector –

{
    "name": "postgres-jdbc-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 1,
        "topics": "data-test-topic",
        "connection.url": "jdbc:postgresql://postgresdb:5432/garuna-rgs",
        "connection.user": "postgres",
        "connection.password": "arathore",
        "auto.create": true,
        "auto.evolve": true,
        "insert.mode": "upsert",
        "table.name.format": "init",
        "pk.mode": "record_key",
        "pk.fields": "name",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter.schemas.enable": false,
        "key.converter.schemas.enable": false
    }
}
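
For reference, a config like this is registered by POSTing it to the Kafka Connect REST API; here is a minimal sketch using Python requests, where the Connect endpoint and the file name are assumptions –

import json
import requests

# "postgres-sink.json" is a hypothetical file holding the JSON config above
with open("postgres-sink.json") as f:
    connector_config = json.load(f)

resp = requests.post(
    "http://localhost:8083/connectors",  # assumed Connect REST endpoint
    json=connector_config,
    headers={"Content-Type": "application/json"},
)
print(resp.status_code, resp.text)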

Structure of the Postgres table init –

  Column  | Type | Collation | Nullable | Default | Storage  | Compression | Stats target | Description 
----------+------+-----------+----------+---------+----------+-------------+--------------+-------------
 name     | text |           | not null |         | extended |             |              | 
 lastname | text |           | not null |         | extended |             |              | 
 key      | text |           | not null |         | extended |             |              | 
Access method: heap
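
For reference, a table matching that structure could have been created with something like the following (a psycopg2 sketch; the connection details are taken from the connector config above) –

import psycopg2

conn = psycopg2.connect(host="postgresdb", port=5432, dbname="garuna-rgs",
                        user="postgres", password="arathore")
with conn, conn.cursor() as cur:
    # Mirrors the three text columns shown in the table description above
    cur.execute("""
        CREATE TABLE IF NOT EXISTS init (
            name     text NOT NULL,
            lastname text NOT NULL,
            key      text NOT NULL
        )
    """)
conn.close()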

The error which the JDBC sink connector produces –

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Value schema must be of type Struct
    at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:86)
    at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:67)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:115)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:90)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
    ... 11 more

I am not able to solve this problem even after trying different config values for the sink connector. Please help.

2 Answers


  1. Chosen as BEST ANSWER

    So I found the following solution, as suggested by @onecricketeer.

    Topic data –

    {
        "schema": {
            "type": "struct",
            "fields": [
                {
                    "field": "name",
                    "type": "string",
                    "optional": false
                },
                {
                    "field": "lastname",
                    "type": "string",
                    "optional": false
                }
            ]
        },
        "payload": {
            "name": "abhishek",
            "lastname": "rathore"
        }
    }
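
    A message in that shape could be produced, for example, with the confluent-kafka Python client (the broker address is an assumption) –

    import json
    from confluent_kafka import Producer

    producer = Producer({"bootstrap.servers": "localhost:9092"})  # assumed broker address

    # schema/payload envelope that JsonConverter expects when
    # value.converter.schemas.enable=true
    record = {
        "schema": {
            "type": "struct",
            "fields": [
                {"field": "name", "type": "string", "optional": False},
                {"field": "lastname", "type": "string", "optional": False},
            ],
        },
        "payload": {"name": "abhishek", "lastname": "rathore"},
    }

    producer.produce("data-test-topic", key="A", value=json.dumps(record))
    producer.flush()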
    

    Sink connector config –

    {
        "name": "postgres-jdbc-sink-connector",
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "tasks.max": 1,
            "topics": "data-test-topic",
            "connection.url": "jdbc:postgresql://postgresdb:5432/garuna-rgs",
            "connection.user": "postgres",
            "connection.password": "arathore",
            "auto.create": true,
            "auto.evolve": true,
            "insert.mode": "upsert",
            "table.name.format": "init",
            "pk.mode": "record_value",
            "pk.fields": "name",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter.schemas.enable": true,
            "key.converter.schemas.enable": false
        }
    }
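
    Note the two changes from the original config: value.converter.schemas.enable is now true, so JsonConverter reads the schema/payload envelope shown above, and pk.mode is record_value, so the upsert key is taken from the name field inside the message value instead of from the record key.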
    

    Now the data is syncing properly with Postgres.
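
    As a quick sanity check, the sunk rows can be queried directly (a psycopg2 sketch; the connection details come from the connector's connection.url, and the hostname may need adjusting outside the Docker network) –

    import psycopg2

    conn = psycopg2.connect(host="postgresdb", port=5432, dbname="garuna-rgs",
                            user="postgres", password="arathore")
    with conn, conn.cursor() as cur:
        cur.execute("SELECT name, lastname FROM init")
        for row in cur.fetchall():
            print(row)
    conn.close()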


  2. As the error says, the connector requires a Struct.

    StringConverter returns a plain String, not a Struct, so the connector cannot tell which fields in your data map to which database columns and types.

    Refer to the Confluent blog on which converter you should be using.

    In particular, org.apache.kafka.connect.json.StringConverter is an invalid class name (StringConverter lives in the org.apache.kafka.connect.storage package), and schemas.enable does nothing for StringConverter, only for JsonConverter, so you need

    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": true,
    

    But this also requires you to change your data to actually have a schema.
