I’m a newbie to Kafka and have started experimenting, following this tutorial: https://medium.com/high-alpha/data-stream-processing-for-newbies-with-kafka-ksql-and-postgres-c30309cfaaf8
I ran the following docker-compose.yml:

version: '2'
services:
  postgres:
    image: 'debezium/postgres:11'
    environment:
      POSTGRES_PASSWORD: postgres
    command: postgres -c config_file=/home/config/postgresql.conf
    ports:
      - '5432:5432'
    volumes:
      - 'C:\postgres-kafka\config:/home/config'
      - 'C:\postgres-kafka\config\pg_hba.conf:/home/config/pg_hba.conf'
      - 'C:\postgres-kafka\data:/home/data/'
  zookeeper:
    image: 'confluentinc/cp-zookeeper:5.1.2'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: 'confluentinc/cp-kafka:5.1.2'
    depends_on:
      - zookeeper
    ports:
      - '9092:9092'
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:9092'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  schema-registry:
    image: 'confluentinc/cp-schema-registry:5.1.2'
    hostname: schema-registry
    ports:
      - '8082:8082'
    depends_on:
      - zookeeper
      - kafka
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092'
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
      SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8082'
  ksql-server:
    image: 'confluentinc/cp-ksql-server:5.1.2'
    depends_on:
      - kafka
      - schema-registry
    environment:
      KSQL_BOOTSTRAP_SERVERS: 'kafka:9092'
      KSQL_LISTENERS: 'http://0.0.0.0:8088'
      KSQL_KSQL_SCHEMA_REGISTRY_URL: 'http://schema-registry:8082'
      KSQL_KSQL_SERVICE_ID: ksql-server
  connect:
    image: mtpatter/debezium-connect
    depends_on:
      - zookeeper
      - kafka
      - postgres
    ports:
      - '8083:8083'
    environment:
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: my-connect-configs
      OFFSET_STORAGE_TOPIC: my-connect-offsets
      ADVERTISED_HOST_NAME: connect
      BOOTSTRAP_SERVERS: 'kafka:9092'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_TOPIC_CREATION_ENABLE: 'true'
  # Runs the KSQL CLI
  ksql-cli:
    image: confluentinc/cp-ksql-cli:5.2.2
    container_name: ksql-cli
    depends_on:
      - kafka
      - ksql-server
    entrypoint: /bin/sh
    tty: true

As described in the tutorial, I created two tables, "admission" and "research", in the "students" database in Postgres and populated them with values copied from CSV files.
After creating the tables I created a connector with the following configuration:

{"name": "postgres-source",
  "config": {"connector.class":"io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max":"1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname" : "students",
    "database.server.name": "dbserver1",
    "database.whitelist": "students",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.students",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://schema-registry:8082",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"   
  }
}

The connector is created successfully, but when I check the topic list, there are no topics for the two Postgres tables. I tried creating the connector first and then the tables, and vice versa, but neither worked.
Somebody suggested disabling automatic topic creation at the Kafka broker level and enabling it at the Connect level instead; see the Debezium docs.
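
For what it's worth, Debezium's Postgres connector writes each table's change stream to a topic named `<database.server.name>.<schema>.<table>`, so the topic names to look for can be sketched like this (the `public` schema is assumed, since the setup above doesn't say otherwise):

```python
# Debezium's Postgres connector names change-event topics as
# <database.server.name>.<schema>.<table>.
def change_topic(server_name: str, schema: str, table: str) -> str:
    return f"{server_name}.{schema}.{table}"

# With the connector config above (tables assumed to live in the
# default "public" schema):
topics = [change_topic("dbserver1", "public", t)
          for t in ("admission", "research")]
print(topics)  # ['dbserver1.public.admission', 'dbserver1.public.research']
```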

By checking the connect logs, I see this error:

2023-11-04 17:18:50,198 ERROR  ||  WorkerSourceTask{id=postgres-source-0} Task threw an uncaught and unrecoverable exception   [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: Cannot create replication connection
        at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.<init>(PostgresReplicationConnection.java:89)
        at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.<init>(PostgresReplicationConnection.java:39)
        at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$ReplicationConnectionBuilder.build(PostgresReplicationConnection.java:409)
        at io.debezium.connector.postgresql.PostgresTaskContext.createReplicationConnection(PostgresTaskContext.java:66)
        at io.debezium.connector.postgresql.RecordsStreamProducer.<init>(RecordsStreamProducer.java:82)
        at io.debezium.connector.postgresql.RecordsSnapshotProducer.<init>(RecordsSnapshotProducer.java:75)
        at io.debezium.connector.postgresql.PostgresConnectorTask.createSnapshotProducer(PostgresConnectorTask.java:141)
        at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:94)
        at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:47)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: io.debezium.jdbc.JdbcConnectionException: ERROR: logical decoding requires wal_level >= logical
        at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initReplicationSlot(PostgresReplicationConnection.java:163)
        at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.<init>(PostgresReplicationConnection.java:81)
        ... 16 more
Caused by: org.postgresql.util.PSQLException: ERROR: logical decoding requires wal_level >= logical
        at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2440)
        at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2183)
        at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:308)
        at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:441)
        at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:365)
        at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:307)
        at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:293)
        at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:270)
        at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:266)
        at org.postgresql.replication.fluent.logical.LogicalCreateSlotBuilder.make(LogicalCreateSlotBuilder.java:48)
        at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initReplicationSlot(PostgresReplicationConnection.java:124)
        ... 17 more
2023-11-04 17:18:50,199 ERROR  ||  WorkerSourceTask{id=postgres-source-0} Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]

Any hint is welcome!

2 Answers


  1. ERROR: logical decoding requires wal_level >= logical

    Change the PostgreSQL parameter wal_level to logical and restart the server.
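
    Since the compose file mounts a custom postgresql.conf into the container, the setting belongs there; a minimal sketch (the sender/slot values are assumptions, but Debezium needs at least one replication slot and WAL sender):

```conf
# postgresql.conf (the file mounted at /home/config/postgresql.conf)
wal_level = logical          # required for logical decoding
max_wal_senders = 4          # allow replication connections
max_replication_slots = 4    # Debezium creates one replication slot
```

    Restart the postgres container after changing it so the new configuration is picked up.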

  2. If the broker config "auto.create.topics.enable" is set to false, Kafka will not create topics automatically. So you have the choice of either re-enabling automatic creation by setting "auto.create.topics.enable" to true on the broker (the compose file above sets KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'), or creating the needed topics manually.
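
    A third option is topic creation at the Connect level, which the compose file already tries via CONNECT_TOPIC_CREATION_ENABLE: 'true'. Note, though, that connector-level topic creation (KIP-158) only shipped with Kafka Connect 2.6, while the cp-*:5.1.2 images bundle Kafka 2.1, so on this stack it has no effect. On a newer Connect image, a sketch of the extra connector properties would be (replication factor and partition count are assumed values for a single-broker setup):

```json
{
  "topic.creation.default.replication.factor": "1",
  "topic.creation.default.partitions": "1"
}
```

    These go alongside the other properties in the connector config, and the worker itself must have topic.creation.enable set to true (its default on versions that support it).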
