I have a JDBC sink connector retrieving data from a Postgres source connector, and I need to copy the data from my "parametros" source table to my "parametros_sistema" target table.
I’m using the org.apache.kafka.connect.transforms.ReplaceField$Value transform to map specific source columns to my target columns, since the column names differ between source and target, but when I start my sink connector I receive this error:
ERROR [collaborator-sink-postgres-connector|task-0] WorkerSinkTask{id=collaborator-sink-postgres-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Table "collab-service"."public"."parametros" is missing and auto-creation is disabled (org.apache.kafka.connect.runtime.WorkerSinkTask:586)
The problem is that I can’t find a way to set the target table name, so the connector expects a table with the same name as the source table in my target database. Does anyone know how to fix this?
P.S.: Both source and target are Postgres databases.
My source connector:
debezium/debezium-connector-postgresql:2.2.1
name = collaborator-postgres-connector
connector.class = io.debezium.connector.postgresql.PostgresConnector
tasks.max = 1
topic.prefix = collab-service
database.hostname = host
database.server.name = collaborator-postgres-server
database.port = 5432
database.user = user
database.password = password
database.dbname = sflm_dev
plugin.name = pgoutput
slot.name = collab_test
decimal.handling.mode = double
snapshot.mode = always
schema.name.adjustment.mode = none
table.include.list = public.parametros
database.history.kafka.topic = postgres_history
database.history.kafka.bootstrap.servers = kafka:9092
message.prefix.include.list = after
My sink connector:
confluentinc/kafka-connect-jdbc:10.7.4
name = collaborator-sink-postgres-connector
connector.class = io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max = 1
topics = collab-service.public.parametros
connection.url = host
connection.user = user
connection.password = password
database = collaborator_suite
auto.create = false
insert.mode = upsert
pk.mode = record_key
transforms = timestampConverter,replaceField
transforms.timestampConverter.type = org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.timestampConverter.field = para_dt_cadastro, para_dt_ult_alt
transforms.timestampConverter.target.type = Timestamp
transforms.replaceField.type = org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.replaceField.renames = para_cd_id:pasi_cd_id, para_tx_dominio:pasi_tx_dominio, para_tx_descricao:pasi_tx_descricao, para_tx_valor:pasi_tx_valor, para_tx_tipo:pasi_tx_tipo, para_dt_ult_alt:pasi_dt_ult_alt, para_dt_cadastro:pasi_dt_cadastro, para_dt_cadastro:pasi_dt_cadastro, para_dt_ult_alt:pasi_dt_ult_alt, usua_cd_id_cadastro:usua_cd_id_cadastro, usua_cd_id_ult_alt:usua_cd_id_ult_alt, para_tx_sistema:pasi_tx_sistema
db.timezone = UTC
pk.fields = para_cd_id
Sample of one of my topic’s messages:
[
{
"topic": "collab-service.public.parametros",
"partition": 0,
"offset": 1,
"timestamp": 1704296444971,
"timestampType": "CREATE_TIME",
"headers": [],
"key": "Struct{para_cd_id=2}",
"value": {
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "para_cd_id"
},
{
"type": "string",
"optional": false,
"field": "para_tx_dominio"
},
{
"type": "string",
"optional": false,
"field": "para_tx_descricao"
},
{
"type": "string",
"optional": false,
"field": "para_tx_valor"
},
{
"type": "string",
"optional": false,
"field": "para_tx_tipo"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTimestamp",
"version": 1,
"field": "para_dt_cadastro"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTimestamp",
"version": 1,
"field": "para_dt_ult_alt"
},
{
"type": "int64",
"optional": true,
"field": "usua_cd_id_cadastro"
},
{
"type": "double",
"optional": true,
"field": "usua_cd_id_ult_alt"
},
{
"type": "string",
"optional": true,
"field": "para_tx_sistema"
}
],
"optional": true,
"name": "collab-service.public.parametros.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "para_cd_id"
},
{
"type": "string",
"optional": false,
"field": "para_tx_dominio"
},
{
"type": "string",
"optional": false,
"field": "para_tx_descricao"
},
{
"type": "string",
"optional": false,
"field": "para_tx_valor"
},
{
"type": "string",
"optional": false,
"field": "para_tx_tipo"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTimestamp",
"version": 1,
"field": "para_dt_cadastro"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTimestamp",
"version": 1,
"field": "para_dt_ult_alt"
},
{
"type": "int64",
"optional": true,
"field": "usua_cd_id_cadastro"
},
{
"type": "double",
"optional": true,
"field": "usua_cd_id_ult_alt"
},
{
"type": "string",
"optional": true,
"field": "para_tx_sistema"
}
],
"optional": true,
"name": "collab-service.public.parametros.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "int64",
"optional": true,
"field": "txId"
},
{
"type": "int64",
"optional": true,
"field": "lsn"
},
{
"type": "int64",
"optional": true,
"field": "xmin"
}
],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}
],
"optional": false,
"name": "collab-service.public.parametros.Envelope",
"version": 1
},
"payload": {
"before": null,
"after": {
"para_cd_id": 2,
"para_tx_dominio": "SISTEMA_CLOUD_PROJECTID",
"para_tx_descricao": "Variável para referenciar o id do projeto na nuvem do Google",
"para_tx_valor": "collaborator-364516",
"para_tx_tipo": "SISTEMA",
"para_dt_cadastro": 1669047531542921,
"para_dt_ult_alt": 1669047531542921,
"usua_cd_id_cadastro": null,
"usua_cd_id_ult_alt": null,
"para_tx_sistema": "R2D2"
},
"source": {
"version": "2.2.1.Final",
"connector": "postgresql",
"name": "collab-service",
"ts_ms": 1704296442190,
"snapshot": "last",
"db": "sflm_dev",
"sequence": "[null,\"17335024878048\"]",
"schema": "public",
"table": "parametros",
"txId": 989004,
"lsn": 17335024878048,
"xmin": null
},
"op": "r",
"ts_ms": 1704296444402,
"transaction": null
}
}
}
]
Answers
The property table.name.format did the job.
To explain: table.name.format is a format string for the destination table name. You can use ${topic} as a placeholder for the originating topic name.
Example: assuming the topic name is "parametros", it will produce the table name:
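For the sink connector in the question, a minimal sketch might look like the following. It assumes the format string can also be a literal table name with no ${topic} placeholder, and that the target table parametros_sistema already exists in the target database; both are assumptions for illustration, not something stated in the answer.

```properties
# Sketch only: the first three entries are copied from the sink config in the
# question; table.name.format is the addition.
name = collaborator-sink-postgres-connector
connector.class = io.confluent.connect.jdbc.JdbcSinkConnector
topics = collab-service.public.parametros

# Assumption: a literal value (no ${topic} placeholder) routes every record
# from the subscribed topic to this one table. The table must already exist
# because auto.create = false in the question's config.
table.name.format = parametros_sistema
```

If the property is left unset, the connector falls back to ${topic}, which is why the error message in the question refers to a table named after the source topic.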