I’m a newbie to Kafka and started experimenting. I’m using this tutorial https://medium.com/high-alpha/data-stream-processing-for-newbies-with-kafka-ksql-and-postgres-c30309cfaaf8
I ran the following docker-compose.yml
version: '2'
services:
  postgres:
    image: 'debezium/postgres:11'
    environment:
      POSTGRES_PASSWORD: postgres
    # NOTE(review): this overrides the image's built-in config (which already
    # has wal_level=logical). The mounted postgresql.conf MUST also set
    # wal_level=logical, or Debezium fails with
    # "logical decoding requires wal_level >= logical" (see the log below).
    command: postgres -c config_file=/home/config/postgresql.conf
    ports:
      - '5432:5432'
    volumes:
      # Forward slashes are accepted by Docker on Windows and avoid the
      # backslash problems of 'C:\...' paths in YAML (the original paste had
      # lost its separators entirely: 'C:postgres-kafkaconfig').
      # TODO(review): confirm these are the intended host directories.
      - 'C:/postgres-kafka/config:/home/config'
      - 'C:/postgres-kafka/config/pg_hba.conf:/home/config/pg_hba.conf'
      - 'C:/postgres-kafka/data:/home/data/'

  zookeeper:
    image: 'confluentinc/cp-zookeeper:5.1.2'
    environment:
      ZOOKEEPER_CLIENT_PORT: '2181'
      ZOOKEEPER_TICK_TIME: '2000'

  kafka:
    image: 'confluentinc/cp-kafka:5.1.2'
    depends_on:
      - zookeeper
    ports:
      - '9092:9092'
    environment:
      KAFKA_BROKER_ID: '1'
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:9092'
      # Broker-side auto-creation is off: every topic (including the ones
      # Debezium writes to) must be created manually or by Connect.
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'

  schema-registry:
    image: 'confluentinc/cp-schema-registry:5.1.2'
    hostname: schema-registry
    ports:
      - '8082:8082'
    depends_on:
      - zookeeper
      - kafka
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092'
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
      # Non-default port (8081 is the usual default); the connector config
      # below must reference the same port.
      SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8082'

  ksql-server:
    image: 'confluentinc/cp-ksql-server:5.1.2'
    depends_on:
      - kafka
      - schema-registry
    environment:
      KSQL_BOOTSTRAP_SERVERS: 'kafka:9092'
      KSQL_LISTENERS: 'http://0.0.0.0:8088'
      KSQL_KSQL_SCHEMA_REGISTRY_URL: 'http://schema-registry:8082'
      KSQL_KSQL_SERVICE_ID: ksql-server

  connect:
    image: mtpatter/debezium-connect
    depends_on:
      - zookeeper
      - kafka
      - postgres
    ports:
      - '8083:8083'
    environment:
      GROUP_ID: '1'
      CONFIG_STORAGE_TOPIC: my-connect-configs
      OFFSET_STORAGE_TOPIC: my-connect-offsets
      ADVERTISED_HOST_NAME: connect
      BOOTSTRAP_SERVERS: 'kafka:9092'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      # NOTE(review): Connect-side topic creation (topic.creation.*) only
      # exists in Kafka Connect >= 2.6 — confirm the Connect version inside
      # this image actually honors it.
      CONNECT_TOPIC_CREATION_ENABLE: 'true'

  # Runs the KSQL CLI
  ksql-cli:
    # Aligned with the 5.1.2 version used by every other Confluent image
    # in this stack (was 5.2.2).
    image: confluentinc/cp-ksql-cli:5.1.2
    container_name: ksql-cli
    depends_on:
      - kafka
      - ksql-server
    entrypoint: /bin/sh
    tty: true
As described in the example, I created two tables, "admission" and "research", in the "students" database in Postgres and filled them with values copied from CSV files.
After creating the tables I created a connector with the following configuration:
{"name": "postgres-source",
"config": {"connector.class":"io.debezium.connector.postgresql.PostgresConnector",
"tasks.max":"1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "students",
"database.server.name": "dbserver1",
"database.whitelist": "students",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.students",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://schema-registry:8082",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"
}
}
The connector exists, but when I check the topics list, there are no topics for the two Postgres tables. I tried creating the connector first and then the tables, and vice versa, but neither worked.
Somebody suggested disabling automatic topic creation for the Kafka broker and enabling it at the Connect level instead; see the Debezium docs.
By checking the connect logs, I see this error:
2023-11-04 17:18:50,198 ERROR || WorkerSourceTask{id=postgres-source-0} Task threw an uncaught and unrecoverable exception [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: Cannot create replication connection
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.<init>(PostgresReplicationConnection.java:89)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.<init>(PostgresReplicationConnection.java:39)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$ReplicationConnectionBuilder.build(PostgresReplicationConnection.java:409)
at io.debezium.connector.postgresql.PostgresTaskContext.createReplicationConnection(PostgresTaskContext.java:66)
at io.debezium.connector.postgresql.RecordsStreamProducer.<init>(RecordsStreamProducer.java:82)
at io.debezium.connector.postgresql.RecordsSnapshotProducer.<init>(RecordsSnapshotProducer.java:75)
at io.debezium.connector.postgresql.PostgresConnectorTask.createSnapshotProducer(PostgresConnectorTask.java:141)
at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:94)
at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:47)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.debezium.jdbc.JdbcConnectionException: ERROR: logical decoding requires wal_level >= logical
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initReplicationSlot(PostgresReplicationConnection.java:163)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.<init>(PostgresReplicationConnection.java:81)
... 16 more
Caused by: org.postgresql.util.PSQLException: ERROR: logical decoding requires wal_level >= logical
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2440)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2183)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:308)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:441)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:365)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:307)
at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:293)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:270)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:266)
at org.postgresql.replication.fluent.logical.LogicalCreateSlotBuilder.make(LogicalCreateSlotBuilder.java:48)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initReplicationSlot(PostgresReplicationConnection.java:124)
... 17 more
2023-11-04 17:18:50,199 ERROR || WorkerSourceTask{id=postgres-source-0} Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
Any hint is welcome !
2
Answers
Change the parameter `wal_level` to `logical` and restart Postgres. In your setup, that means editing the `postgresql.conf` you mount into the container (your `command:` overrides the image's default config, which already has `wal_level=logical`).
if "auto.topic.creation.enable" config is not set to true, kafka will not create topics automatically. So you have the choice to either activate the creation by setting the config "auto.topic.creation.enable" to true or creating manually the topics needed.