
I have a Kafka cluster deployed by Strimzi, with Apicurio Registry as the Kafka schema registry.

I am hoping to use AvroConverter in the JDBC sink connector to sink data from Kafka to TimescaleDB.

Here is my Kafka Connect Dockerfile:

FROM docker.io/alpine:3.17.3 AS builder
USER root:root
RUN mkdir -p /opt/kafka/plugins/ \
  # jdbc-connector-for-apache-kafka
  # https://github.com/aiven/jdbc-connector-for-apache-kafka
  && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.zip https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.zip \
  && unzip jdbc-connector-for-apache-kafka.zip -d /opt/kafka/plugins/ \
  && rm -f jdbc-connector-for-apache-kafka.zip
USER 1001

FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
USER 1001

My Kafka Connect:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: hm-kafka-iot-kafka-connect
  namespace: hm-kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: ghcr.io/hongbo-miao/hm-kafka-iot-kafka-connect:latest
  replicas: 1
  bootstrapServers: hm-kafka-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: hm-kafka-cluster-ca-cert
        certificate: ca.crt
  config:
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: hm-iot-db-credentials-volume
        secret:
          secretName: hm-iot-db-credentials

My JDBC sink connector:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: hm-motor-jdbc-sink-kafka-connector
  namespace: hm-kafka
  labels:
    strimzi.io/cluster: hm-kafka-iot-kafka-connect
spec:
  class: io.aiven.connect.jdbc.JdbcSinkConnector
  tasksMax: 32
  config:
    connector.class: io.aiven.connect.jdbc.JdbcSinkConnector
    tasks.max: 32
    topics: hm.motor
    connection.url: jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db
    connection.user: "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}"
    connection.password: "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}"
    insert.mode: multi
    batch.size: 100000

    # table
    table.name.format: motor

    # timestamp
    transforms: convertTimestamp
    transforms.convertTimestamp.type: org.apache.kafka.connect.transforms.TimestampConverter$Value
    transforms.convertTimestamp.field: timestamp
    transforms.convertTimestamp.target.type: Timestamp

    # value
    value.converter: io.apicurio.registry.utils.converter.AvroConverter
    value.converter.apicurio.registry.url: http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/registry/v2
    value.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy
    value.converter.apicurio.registry.as-confluent: true

(Note: the config related to apicurio.registry most likely has issues too.)

However, I got this error (let’s call it Error 1):

Error 1

2023-05-01 07:23:23,849 ERROR [hm-motor-jdbc-sink-kafka-connector|worker] [Worker clientId=connect-1, groupId=connect-cluster] Failed to start connector 'hm-motor-jdbc-sink-kafka-connector' (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [StartAndStopExecutor-connect-1-1]
org.apache.kafka.connect.errors.ConnectException: Failed to start connector: hm-motor-jdbc-sink-kafka-connector
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$startConnector$34(DistributedHerder.java:1800)
  at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:320)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1821)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getConnectorStartingCallable$36(DistributedHerder.java:1827)
  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
  at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value io.apicurio.registry.utils.converter.AvroConverter for configuration value.converter: Class io.apicurio.registry.utils.converter.AvroConverter could not be found.
  at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:744)
  at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:490)
  at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)
  at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:113)
  at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:133)
  at org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:232)
  at org.apache.kafka.connect.runtime.SinkConnectorConfig.<init>(SinkConnectorConfig.java:85)
  at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:299)
  ... 6 more

Attempt 1 (to fix Error 1, succeeded)

Based on the error, I added apicurio-registry-utils-converter to my Kafka Connect Dockerfile:

FROM docker.io/alpine:3.17.3 AS builder
USER root:root
RUN mkdir -p /opt/kafka/plugins/ \
  # jdbc-connector-for-apache-kafka
  # https://github.com/aiven/jdbc-connector-for-apache-kafka
  && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.zip https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.zip \
  && unzip jdbc-connector-for-apache-kafka.zip -d /opt/kafka/plugins/ \
  && rm -f jdbc-connector-for-apache-kafka.zip \
  # apicurio-registry-utils-converter
  # https://mvnrepository.com/artifact/io.apicurio/apicurio-registry-utils-converter
  && wget --no-verbose https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-utils-converter/2.4.2.Final/apicurio-registry-utils-converter-2.4.2.Final.jar \
  && mkdir -p /opt/kafka/plugins/apicurio-registry-utils-converter/ \
  && mv apicurio-registry-utils-converter-2.4.2.Final.jar /opt/kafka/plugins/apicurio-registry-utils-converter/
USER 1001

FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
USER 1001

Now it can successfully locate io.apicurio.registry.utils.converter.AvroConverter, but I get a new error (let’s call it Error 2):

Error 2

2023-05-01 06:58:11,129 INFO [hm-motor-jdbc-sink-kafka-connector|task-0] TaskConfig values: 
  task.class = class io.aiven.connect.jdbc.sink.JdbcSinkTask
 (org.apache.kafka.connect.runtime.TaskConfig) [StartAndStopExecutor-connect-1-5]
2023-05-01 06:58:11,129 INFO [hm-motor-jdbc-sink-kafka-connector|task-0] Instantiated task hm-motor-jdbc-sink-kafka-connector-0 with version null of type io.aiven.connect.jdbc.sink.JdbcSinkTask (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-5]
2023-05-01 06:58:11,129 ERROR [hm-motor-jdbc-sink-kafka-connector|task-0] Failed to start task hm-motor-jdbc-sink-kafka-connector-0 (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-5]
java.lang.NoClassDefFoundError: io/apicurio/registry/serde/avro/AvroKafkaSerializer
  at io.apicurio.registry.utils.converter.AvroConverter.configure(AvroConverter.java:69)
  at org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:324)
  at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:618)
  at org.apache.kafka.connect.runtime.Worker.startSinkTask(Worker.java:521)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1723)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$31(DistributedHerder.java:1773)
  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
  at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.ClassNotFoundException: io.apicurio.registry.serde.avro.AvroKafkaSerializer
  ... 10 more

Attempt 2 (to fix Error 2, failed)

Based on the error, I added apicurio-registry-serdes-avro-serde in my Kafka Connect Dockerfile:

FROM docker.io/alpine:3.17.3 AS builder
USER root:root
RUN mkdir -p /opt/kafka/plugins/ \
  # jdbc-connector-for-apache-kafka
  # https://github.com/aiven/jdbc-connector-for-apache-kafka
  && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.zip https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.zip \
  && unzip jdbc-connector-for-apache-kafka.zip -d /opt/kafka/plugins/ \
  && rm -f jdbc-connector-for-apache-kafka.zip \
  # apicurio-registry-utils-converter
  # https://mvnrepository.com/artifact/io.apicurio/apicurio-registry-utils-converter
  && wget --no-verbose https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-utils-converter/2.4.2.Final/apicurio-registry-utils-converter-2.4.2.Final.jar \
  && mkdir -p /opt/kafka/plugins/apicurio-registry-utils-converter/ \
  && mv apicurio-registry-utils-converter-2.4.2.Final.jar /opt/kafka/plugins/apicurio-registry-utils-converter/ \
  # apicurio-registry-serdes-avro-serde
  # https://mvnrepository.com/artifact/io.apicurio/apicurio-registry-serdes-avro-serde
  && wget --no-verbose https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-serdes-avro-serde/2.4.2.Final/apicurio-registry-serdes-avro-serde-2.4.2.Final.jar \
  && mkdir -p /opt/kafka/plugins/apicurio-registry-serdes-avro-serde/ \
  && mv apicurio-registry-serdes-avro-serde-2.4.2.Final.jar /opt/kafka/plugins/apicurio-registry-serdes-avro-serde/
USER 1001

FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
USER 1001

But this time, Error 2 is still there.

apicurio-registry-serdes-avro-serde does not seem to be the correct dependency to fix Error 2. What would be the correct dependency? Thanks!

Attempt 3 (Different direction)

I have followed @OneCricketeer’s suggestion and switched to kafka-connect-avro-converter, using it with Apicurio Registry’s Confluent-compatible REST API endpoint /apis/ccompat/v6/.

Here is my Kafka Connect Dockerfile to use io.confluent.connect.avro.AvroConverter:

FROM docker.io/alpine:3.17.3 AS builder
USER root:root
RUN mkdir -p /opt/kafka/plugins/ \
  # kafka-connect-avro-converter
  # https://www.confluent.io/hub/confluentinc/kafka-connect-avro-converter
  && wget --no-verbose --output-document=kafka-connect-avro-converter.zip https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-avro-converter/versions/7.3.3/confluentinc-kafka-connect-avro-converter-7.3.3.zip \
  && mkdir -p /opt/kafka/plugins/kafka-connect-avro-converter/ \
  && unzip kafka-connect-avro-converter.zip -d /opt/kafka/plugins/kafka-connect-avro-converter/ \
  && rm -f kafka-connect-avro-converter.zip \
  # jdbc-connector-for-apache-kafka
  # https://github.com/aiven/jdbc-connector-for-apache-kafka
  && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.tar https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.tar \
  && mkdir -p /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
  && tar -x -f jdbc-connector-for-apache-kafka.tar -C /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
  && rm -f jdbc-connector-for-apache-kafka.tar
USER 1001

FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
USER 1001

Regarding the corresponding JDBC Sink Connector config, I have a different question at org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema for id -xxx
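
For reference, a minimal sketch of what the converter part of the connector config could look like in this direction. It assumes the Confluent converter's schema.registry.url property is pointed at the /apis/ccompat/v6 endpoint on the same registry host used earlier. It is an illustration only, not a verified working config:

```yaml
# Sketch only: converter section of the KafkaConnector resource.
# Assumption: the registry host is the same one used in the Apicurio config above,
# with the Confluent-compatible path /apis/ccompat/v6 instead of /apis/registry/v2.
spec:
  config:
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6
```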

UPDATE: I found that the Confluent Avro format is different from vanilla Apache Avro, which causes some inconvenience for Spark and other tools. So these are two different directions. Besides the Confluent direction, I will continue looking for a solution in this direction too.

2 Answers


  1. Chosen as BEST ANSWER

    The issue was that I had added the wrong dependency, apicurio-registry-utils-converter.

    The correct one is apicurio-registry-distro-connect-converter.

    So here is my final Kafka Connect Dockerfile to use io.apicurio.registry.utils.converter.AvroConverter:

    FROM docker.io/alpine:3.17.3 AS builder
    USER root:root
    RUN mkdir -p /opt/kafka/plugins/ \
      # apicurio-registry-distro-connect-converter
      # https://mvnrepository.com/artifact/io.apicurio/apicurio-registry-distro-connect-converter
      && wget --no-verbose --output-document=apicurio-registry-distro-connect-converter.tar.gz https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-distro-connect-converter/2.4.2.Final/apicurio-registry-distro-connect-converter-2.4.2.Final.tar.gz \
      && mkdir -p /opt/kafka/plugins/apicurio-registry-distro-connect-converter/ \
      && tar -x -f apicurio-registry-distro-connect-converter.tar.gz -C /opt/kafka/plugins/apicurio-registry-distro-connect-converter/ \
      && rm -f apicurio-registry-distro-connect-converter.tar.gz \
      # jdbc-connector-for-apache-kafka
      # https://github.com/aiven/jdbc-connector-for-apache-kafka
      && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.tar https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.tar \
      && mkdir -p /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
      && tar -x -f jdbc-connector-for-apache-kafka.tar -C /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
      && rm -f jdbc-connector-for-apache-kafka.tar
    USER 1001
    
    FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
    USER root:root
    COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
    USER 1001
    

    For comparison, here is how to use io.confluent.connect.avro.AvroConverter:

    FROM docker.io/alpine:3.17.3 AS builder
    USER root:root
    RUN mkdir -p /opt/kafka/plugins/ \
      # kafka-connect-avro-converter
      # https://www.confluent.io/hub/confluentinc/kafka-connect-avro-converter
      && wget --no-verbose --output-document=kafka-connect-avro-converter.zip https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-avro-converter/versions/7.3.3/confluentinc-kafka-connect-avro-converter-7.3.3.zip \
      && mkdir -p /opt/kafka/plugins/kafka-connect-avro-converter/ \
      && unzip kafka-connect-avro-converter.zip -d /opt/kafka/plugins/kafka-connect-avro-converter/ \
      && rm -f kafka-connect-avro-converter.zip \
      # jdbc-connector-for-apache-kafka
      # https://github.com/aiven/jdbc-connector-for-apache-kafka
      && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.tar https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.tar \
      && mkdir -p /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
      && tar -x -f jdbc-connector-for-apache-kafka.tar -C /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
      && rm -f jdbc-connector-for-apache-kafka.tar
    USER 1001
    
    FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
    USER root:root
    COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
    USER 1001
    

  2. apicurio-registry-serdes-avro-serde is the correct dependency for that class, but it should already be part of the Avro converter package.

    However, (de)serializer classes are not Connect "plugins" in the same way converters are. You need to export the CLASSPATH environment variable so that it includes the extra directories where you’re putting the JAR files.

    I’d also suggest not using multi-stage builds here, unless wget and unzip aren’t available in the Strimzi image. Plus, Apicurio Registry is compatible with the Confluent converter, so I’d recommend installing that (and the Aiven connectors) using the plugins feature anyway.
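
    As a rough sketch of the plugins feature, assuming it refers to the build section of the Strimzi KafkaConnect resource: the two artifact URLs are the ones already used in the question, while the pushSecret name is a hypothetical placeholder and the output image simply reuses the image name from the question. Treat it as a starting point rather than a tested manifest:

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: hm-kafka-iot-kafka-connect
  namespace: hm-kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  bootstrapServers: hm-kafka-kafka-bootstrap:9093
  build:
    output:
      type: docker
      # Assumption: Strimzi builds and pushes the image here, so spec.image is not set by hand.
      image: ghcr.io/hongbo-miao/hm-kafka-iot-kafka-connect:latest
      # Hypothetical secret holding the registry credentials.
      pushSecret: hm-registry-credentials
    plugins:
      # Confluent Avro converter (same ZIP as in the Dockerfile).
      - name: kafka-connect-avro-converter
        artifacts:
          - type: zip
            url: https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-avro-converter/versions/7.3.3/confluentinc-kafka-connect-avro-converter-7.3.3.zip
      # Aiven JDBC connector (ZIP release from the question).
      - name: jdbc-connector-for-apache-kafka
        artifacts:
          - type: zip
            url: https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.zip
```

    The tls and config sections from the question's KafkaConnect resource would carry over unchanged; they are omitted here for brevity.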
