I have a JDBC sink connector that consumes data produced by a Postgres source connector, and I need to copy the data from my "parametros" source table to my "parametros_sistema" target table.
I'm using the org.apache.kafka.connect.transforms.ReplaceField$Value transform to map the source columns to the target columns, since the column names differ between source and target, but when I start my sink connector I receive this error:

ERROR [collaborator-sink-postgres-connector|task-0] WorkerSinkTask{id=collaborator-sink-postgres-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Table "collab-service"."public"."parametros" is missing and auto-creation is disabled (org.apache.kafka.connect.runtime.WorkerSinkTask:586)

The problem is that I can't set the target table name, so the connector looks for a table with the same name as the source table in my target database. Does anyone know how to fix this?

P.S.: Both source and target are Postgres databases.

My source connector:

debezium/debezium-connector-postgresql:2.2.1

name = collaborator-postgres-connector
connector.class = io.debezium.connector.postgresql.PostgresConnector
tasks.max = 1
topic.prefix = collab-service
database.hostname = host
database.server.name = collaborator-postgres-server
database.port = 5432
database.user = user
database.password = password
database.dbname = sflm_dev
plugin.name = pgoutput
slot.name = collab_test
decimal.handling.mode = double
snapshot.mode = always
schema.name.adjustment.mode = none
table.include.list = public.parametros
database.history.kafka.topic = postgres_history
database.history.kafka.bootstrap.servers = kafka:9092
message.prefix.include.list = after

My sink connector:

confluentinc/kafka-connect-jdbc:10.7.4

name = collaborator-sink-postgres-connector
connector.class = io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max = 1
topics = collab-service.public.parametros
connection.url = host
connection.user = user
connection.password = password
database = collaborator_suite
auto.create = false
insert.mode = upsert
pk.mode = record_key
transforms = timestampConverter,replaceField
transforms.timestampConverter.type = org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.timestampConverter.field = para_dt_cadastro, para_dt_ult_alt
transforms.timestampConverter.target.type = Timestamp
transforms.replaceField.type = org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.replaceField.renames = para_cd_id:pasi_cd_id, para_tx_dominio:pasi_tx_dominio, para_tx_descricao:pasi_tx_descricao, para_tx_valor:pasi_tx_valor, para_tx_tipo:pasi_tx_tipo, para_dt_ult_alt:pasi_dt_ult_alt, para_dt_cadastro:pasi_dt_cadastro, para_dt_cadastro:pasi_dt_cadastro, para_dt_ult_alt:pasi_dt_ult_alt, usua_cd_id_cadastro:usua_cd_id_cadastro, usua_cd_id_ult_alt:usua_cd_id_ult_alt, para_tx_sistema:pasi_tx_sistema
db.timezone = UTC
pk.fields = para_cd_id

Sample of one of my topic’s messages:

[
    {
        "topic": "collab-service.public.parametros",
        "partition": 0,
        "offset": 1,
        "timestamp": 1704296444971,
        "timestampType": "CREATE_TIME",
        "headers": [],
        "key": "Struct{para_cd_id=2}",
        "value": {
            "schema": {
                "type": "struct",
                "fields": [
                    {
                        "type": "struct",
                        "fields": [
                            {
                                "type": "int32",
                                "optional": false,
                                "default": 0,
                                "field": "para_cd_id"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "para_tx_dominio"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "para_tx_descricao"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "para_tx_valor"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "para_tx_tipo"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "name": "io.debezium.time.MicroTimestamp",
                                "version": 1,
                                "field": "para_dt_cadastro"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "name": "io.debezium.time.MicroTimestamp",
                                "version": 1,
                                "field": "para_dt_ult_alt"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "field": "usua_cd_id_cadastro"
                            },
                            {
                                "type": "double",
                                "optional": true,
                                "field": "usua_cd_id_ult_alt"
                            },
                            {
                                "type": "string",
                                "optional": true,
                                "field": "para_tx_sistema"
                            }
                        ],
                        "optional": true,
                        "name": "collab-service.public.parametros.Value",
                        "field": "before"
                    },
                    {
                        "type": "struct",
                        "fields": [
                            {
                                "type": "int32",
                                "optional": false,
                                "default": 0,
                                "field": "para_cd_id"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "para_tx_dominio"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "para_tx_descricao"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "para_tx_valor"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "para_tx_tipo"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "name": "io.debezium.time.MicroTimestamp",
                                "version": 1,
                                "field": "para_dt_cadastro"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "name": "io.debezium.time.MicroTimestamp",
                                "version": 1,
                                "field": "para_dt_ult_alt"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "field": "usua_cd_id_cadastro"
                            },
                            {
                                "type": "double",
                                "optional": true,
                                "field": "usua_cd_id_ult_alt"
                            },
                            {
                                "type": "string",
                                "optional": true,
                                "field": "para_tx_sistema"
                            }
                        ],
                        "optional": true,
                        "name": "collab-service.public.parametros.Value",
                        "field": "after"
                    },
                    {
                        "type": "struct",
                        "fields": [
                            {
                                "type": "string",
                                "optional": false,
                                "field": "version"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "connector"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "name"
                            },
                            {
                                "type": "int64",
                                "optional": false,
                                "field": "ts_ms"
                            },
                            {
                                "type": "string",
                                "optional": true,
                                "name": "io.debezium.data.Enum",
                                "version": 1,
                                "parameters": {
                                    "allowed": "true,last,false,incremental"
                                },
                                "default": "false",
                                "field": "snapshot"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "db"
                            },
                            {
                                "type": "string",
                                "optional": true,
                                "field": "sequence"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "schema"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "table"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "field": "txId"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "field": "lsn"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "field": "xmin"
                            }
                        ],
                        "optional": false,
                        "name": "io.debezium.connector.postgresql.Source",
                        "field": "source"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "op"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "ts_ms"
                    },
                    {
                        "type": "struct",
                        "fields": [
                            {
                                "type": "string",
                                "optional": false,
                                "field": "id"
                            },
                            {
                                "type": "int64",
                                "optional": false,
                                "field": "total_order"
                            },
                            {
                                "type": "int64",
                                "optional": false,
                                "field": "data_collection_order"
                            }
                        ],
                        "optional": true,
                        "name": "event.block",
                        "version": 1,
                        "field": "transaction"
                    }
                ],
                "optional": false,
                "name": "collab-service.public.parametros.Envelope",
                "version": 1
            },
            "payload": {
                "before": null,
                "after": {
                    "para_cd_id": 2,
                    "para_tx_dominio": "SISTEMA_CLOUD_PROJECTID",
                    "para_tx_descricao": "Variável para referenciar o id do projeto na nuvem do Google",
                    "para_tx_valor": "collaborator-364516",
                    "para_tx_tipo": "SISTEMA",
                    "para_dt_cadastro": 1669047531542921,
                    "para_dt_ult_alt": 1669047531542921,
                    "usua_cd_id_cadastro": null,
                    "usua_cd_id_ult_alt": null,
                    "para_tx_sistema": "R2D2"
                },
                "source": {
                    "version": "2.2.1.Final",
                    "connector": "postgresql",
                    "name": "collab-service",
                    "ts_ms": 1704296442190,
                    "snapshot": "last",
                    "db": "sflm_dev",
                    "sequence": "[null,\"17335024878048\"]",
                    "schema": "public",
                    "table": "parametros",
                    "txId": 989004,
                    "lsn": 17335024878048,
                    "xmin": null
                },
                "op": "r",
                "ts_ms": 1704296444402,
                "transaction": null
            }
        }
    }
]

2 Answers


  1. Chosen as BEST ANSWER

    The property table.name.format did the job.


  2. To explain: the property table.name.format is a format string for the destination table name. You can use ${topic} as a placeholder for the originating topic name.

    Example:

    table.name.format=${topic}_sistema
    

    Assuming the topic name is "parametros", it will produce the table name:

    parametros_sistema
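
    For the connectors in the question, the topic is collab-service.public.parametros, so ${topic}_sistema would resolve to collab-service.public.parametros_sistema, which would probably not match the intended table. Since table.name.format does not have to contain the placeholder, a minimal sketch for that case is a literal name. This assumes the target table is parametros_sistema and that it lives in the schema the sink connection resolves by default:

    ```properties
    # Sink connector fragment (sketch, not a full config).
    # Assumption: the destination table is "parametros_sistema" in the
    # connection's default schema; no ${topic} placeholder is used, so every
    # record from the subscribed topic is written to this one table.
    topics = collab-service.public.parametros
    table.name.format = parametros_sistema
    auto.create = false
    ```

    A literal name like this only makes sense while the sink subscribes to a single topic, because all subscribed topics would otherwise be written to the same table.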
    