I have a Kafka cluster deployed by Strimzi, with Apicurio Registry as the Kafka schema registry. I am hoping to use AvroConverter in the JDBC sink connector to sink data from Kafka to TimescaleDB.
Here is my Kafka Connect Dockerfile:
FROM docker.io/alpine:3.17.3 AS builder
USER root:root
RUN mkdir -p /opt/kafka/plugins/ \
  # jdbc-connector-for-apache-kafka
  # https://github.com/aiven/jdbc-connector-for-apache-kafka
  && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.zip https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.zip \
  && unzip jdbc-connector-for-apache-kafka.zip -d /opt/kafka/plugins/ \
  && rm -f jdbc-connector-for-apache-kafka.zip
USER 1001

FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
USER 1001
My Kafka Connect:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: hm-kafka-iot-kafka-connect
  namespace: hm-kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: ghcr.io/hongbo-miao/hm-kafka-iot-kafka-connect:latest
  replicas: 1
  bootstrapServers: hm-kafka-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: hm-kafka-cluster-ca-cert
        certificate: ca.crt
  config:
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: hm-iot-db-credentials-volume
        secret:
          secretName: hm-iot-db-credentials
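Strimzi mounts each externalConfiguration volume under /opt/kafka/external-configuration/<volume-name>/, so the hm-iot-db-credentials secret is expected to carry a properties file roughly like the following (the key names come from the connector config below; the values here are made up):

# iot-db-credentials.properties (illustrative values only)
timescaledb_user=timescale
timescaledb_password=changeme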
My JDBC sink connector:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: hm-motor-jdbc-sink-kafka-connector
  namespace: hm-kafka
  labels:
    strimzi.io/cluster: hm-kafka-iot-kafka-connect
spec:
  class: io.aiven.connect.jdbc.JdbcSinkConnector
  tasksMax: 32
  config:
    connector.class: io.aiven.connect.jdbc.JdbcSinkConnector
    tasks.max: 32
    topics: hm.motor
    connection.url: jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db
    connection.user: "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}"
    connection.password: "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}"
    insert.mode: multi
    batch.size: 100000
    # table
    table.name.format: motor
    # timestamp
    transforms: convertTimestamp
    transforms.convertTimestamp.type: org.apache.kafka.connect.transforms.TimestampConverter$Value
    transforms.convertTimestamp.field: timestamp
    transforms.convertTimestamp.target.type: Timestamp
    # value
    value.converter: io.apicurio.registry.utils.converter.AvroConverter
    value.converter.apicurio.registry.url: http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/registry/v2
    value.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy
    value.converter.apicurio.registry.as-confluent: true
(Note: the apicurio.registry-related config most likely has issues too.)
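For instance, apicurio.registry.global-id with AutoRegisterIdStrategy is 1.x-style serde configuration; in the 2.x serde, auto-registration is a boolean flag, so my understanding is the converter section should look more like this (an unverified sketch):

# sketch of 2.x-style Apicurio converter config (unverified assumption)
value.converter: io.apicurio.registry.utils.converter.AvroConverter
value.converter.apicurio.registry.url: http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/registry/v2
value.converter.apicurio.registry.auto-register: true
value.converter.apicurio.registry.as-confluent: true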
However, I got this error (let's call it Error 1):
Error 1
2023-05-01 07:23:23,849 ERROR [hm-motor-jdbc-sink-kafka-connector|worker] [Worker clientId=connect-1, groupId=connect-cluster] Failed to start connector 'hm-motor-jdbc-sink-kafka-connector' (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [StartAndStopExecutor-connect-1-1]
org.apache.kafka.connect.errors.ConnectException: Failed to start connector: hm-motor-jdbc-sink-kafka-connector
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$startConnector$34(DistributedHerder.java:1800)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:320)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1821)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getConnectorStartingCallable$36(DistributedHerder.java:1827)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value io.apicurio.registry.utils.converter.AvroConverter for configuration value.converter: Class io.apicurio.registry.utils.converter.AvroConverter could not be found.
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:744)
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:490)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:113)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:133)
at org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:232)
at org.apache.kafka.connect.runtime.SinkConnectorConfig.<init>(SinkConnectorConfig.java:85)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:299)
... 6 more
Attempt 1 (to fix Error 1, succeeded)
Based on the error, I added apicurio-registry-utils-converter to my Kafka Connect Dockerfile:
FROM docker.io/alpine:3.17.3 AS builder
USER root:root
RUN mkdir -p /opt/kafka/plugins/ \
  # jdbc-connector-for-apache-kafka
  # https://github.com/aiven/jdbc-connector-for-apache-kafka
  && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.zip https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.zip \
  && unzip jdbc-connector-for-apache-kafka.zip -d /opt/kafka/plugins/ \
  && rm -f jdbc-connector-for-apache-kafka.zip \
  # apicurio-registry-utils-converter
  # https://mvnrepository.com/artifact/io.apicurio/apicurio-registry-utils-converter
  && wget --no-verbose https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-utils-converter/2.4.2.Final/apicurio-registry-utils-converter-2.4.2.Final.jar \
  && mkdir -p /opt/kafka/plugins/apicurio-registry-utils-converter/ \
  && mv apicurio-registry-utils-converter-2.4.2.Final.jar /opt/kafka/plugins/apicurio-registry-utils-converter/
USER 1001

FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
USER 1001
Now it can successfully locate io.apicurio.registry.utils.converter.AvroConverter; however, I got a new error (let's call it Error 2).
Error 2
2023-05-01 06:58:11,129 INFO [hm-motor-jdbc-sink-kafka-connector|task-0] TaskConfig values:
task.class = class io.aiven.connect.jdbc.sink.JdbcSinkTask
(org.apache.kafka.connect.runtime.TaskConfig) [StartAndStopExecutor-connect-1-5]
2023-05-01 06:58:11,129 INFO [hm-motor-jdbc-sink-kafka-connector|task-0] Instantiated task hm-motor-jdbc-sink-kafka-connector-0 with version null of type io.aiven.connect.jdbc.sink.JdbcSinkTask (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-5]
2023-05-01 06:58:11,129 ERROR [hm-motor-jdbc-sink-kafka-connector|task-0] Failed to start task hm-motor-jdbc-sink-kafka-connector-0 (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-5]
java.lang.NoClassDefFoundError: io/apicurio/registry/serde/avro/AvroKafkaSerializer
at io.apicurio.registry.utils.converter.AvroConverter.configure(AvroConverter.java:69)
at org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:324)
at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:618)
at org.apache.kafka.connect.runtime.Worker.startSinkTask(Worker.java:521)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1723)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$31(DistributedHerder.java:1773)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.ClassNotFoundException: io.apicurio.registry.serde.avro.AvroKafkaSerializer
... 10 more
Attempt 2 (to fix Error 2, failed)
Based on the error, I added apicurio-registry-serdes-avro-serde to my Kafka Connect Dockerfile:
FROM docker.io/alpine:3.17.3 AS builder
USER root:root
RUN mkdir -p /opt/kafka/plugins/ \
  # jdbc-connector-for-apache-kafka
  # https://github.com/aiven/jdbc-connector-for-apache-kafka
  && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.zip https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.zip \
  && unzip jdbc-connector-for-apache-kafka.zip -d /opt/kafka/plugins/ \
  && rm -f jdbc-connector-for-apache-kafka.zip \
  # apicurio-registry-utils-converter
  # https://mvnrepository.com/artifact/io.apicurio/apicurio-registry-utils-converter
  && wget --no-verbose https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-utils-converter/2.4.2.Final/apicurio-registry-utils-converter-2.4.2.Final.jar \
  && mkdir -p /opt/kafka/plugins/apicurio-registry-utils-converter/ \
  && mv apicurio-registry-utils-converter-2.4.2.Final.jar /opt/kafka/plugins/apicurio-registry-utils-converter/ \
  # apicurio-registry-serdes-avro-serde
  # https://mvnrepository.com/artifact/io.apicurio/apicurio-registry-serdes-avro-serde
  && wget --no-verbose https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-serdes-avro-serde/2.4.2.Final/apicurio-registry-serdes-avro-serde-2.4.2.Final.jar \
  && mkdir -p /opt/kafka/plugins/apicurio-registry-serdes-avro-serde/ \
  && mv apicurio-registry-serdes-avro-serde-2.4.2.Final.jar /opt/kafka/plugins/apicurio-registry-serdes-avro-serde/
USER 1001

FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
USER 1001
But this time, Error 2 is still there. apicurio-registry-serdes-avro-serde does not seem to be the correct dependency to fix Error 2. What would be the correct dependency? Thanks!
Attempt 3 (different direction)
I have followed @OneCricketeer's suggestion and switched to kafka-connect-avro-converter, used together with Apicurio Registry's Confluent-compatible REST API endpoint /apis/ccompat/v6/. Here is my Kafka Connect Dockerfile for io.confluent.connect.avro.AvroConverter:
FROM docker.io/alpine:3.17.3 AS builder
USER root:root
RUN mkdir -p /opt/kafka/plugins/ \
  # kafka-connect-avro-converter
  # https://www.confluent.io/hub/confluentinc/kafka-connect-avro-converter
  && wget --no-verbose --output-document=kafka-connect-avro-converter.zip https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-avro-converter/versions/7.3.3/confluentinc-kafka-connect-avro-converter-7.3.3.zip \
  && mkdir -p /opt/kafka/plugins/kafka-connect-avro-converter/ \
  && unzip kafka-connect-avro-converter.zip -d /opt/kafka/plugins/kafka-connect-avro-converter/ \
  && rm -f kafka-connect-avro-converter.zip \
  # jdbc-connector-for-apache-kafka
  # https://github.com/aiven/jdbc-connector-for-apache-kafka
  && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.tar https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.tar \
  && mkdir -p /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
  && tar -x -f jdbc-connector-for-apache-kafka.tar -C /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
  && rm -f jdbc-connector-for-apache-kafka.tar
USER 1001

FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
USER 1001
Regarding the corresponding JDBC sink connector config, I have a separate question at org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema for id -xxx.
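For context, the converter part of the connector config in this direction would presumably look like the following (a sketch, assuming the ccompat endpoint; getting this exactly right is what the linked question is about):

# sketch of Confluent-converter config against Apicurio's ccompat API (assumption)
value.converter: io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url: http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6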
UPDATE: I found that the Confluent Avro wire format is different from vanilla Apache Avro, which causes some inconvenience for Spark and other tools. So they are two different directions. Besides the Confluent direction, I will continue looking for a solution in this direction too.
2 Answers
The issue is that I had added the dependency apicurio-registry-utils-converter; however, the correct one is apicurio-registry-distro-connect-converter. So my final Kafka Connect Dockerfile uses that dependency for io.apicurio.registry.utils.converter.AvroConverter. (For comparison, the Attempt 3 Dockerfile above shows the way to use io.confluent.connect.avro.AvroConverter.)
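A minimal sketch of that final Dockerfile, assuming the apicurio-registry-distro-connect-converter tar.gz artifact on Maven Central (the exact artifact file name and layout are my assumption):

FROM docker.io/alpine:3.17.3 AS builder
USER root:root
RUN mkdir -p /opt/kafka/plugins/ \
  # jdbc-connector-for-apache-kafka
  && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.zip https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.zip \
  && unzip jdbc-connector-for-apache-kafka.zip -d /opt/kafka/plugins/ \
  && rm -f jdbc-connector-for-apache-kafka.zip \
  # apicurio-registry-distro-connect-converter (assumed artifact path on Maven Central)
  && wget --no-verbose --output-document=apicurio-registry-distro-connect-converter.tar.gz https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-distro-connect-converter/2.4.2.Final/apicurio-registry-distro-connect-converter-2.4.2.Final.tar.gz \
  && mkdir -p /opt/kafka/plugins/apicurio-registry-distro-connect-converter/ \
  && tar -x -z -f apicurio-registry-distro-connect-converter.tar.gz -C /opt/kafka/plugins/apicurio-registry-distro-connect-converter/ \
  && rm -f apicurio-registry-distro-connect-converter.tar.gz
USER 1001

FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
USER 1001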
The apicurio-registry-serdes-avro-serde dependency is correct for that class, but it should already be part of the Avro converter package.
However, (de)serializer classes are not Connect "plugins" in the same way converters are. You need to export the CLASSPATH environment variable to include the extra directories where you're putting the JAR files.
I'd also suggest not using multi-stage builds here, unless wget and unzip aren't available in the Strimzi image. Plus, Apicurio Registry is compatible with the Confluent converter, so I'd recommend installing that (and the Aiven connector) using Strimzi's build plugins feature anyway.
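For illustration, the Strimzi build feature mentioned above lets Strimzi download and package the plugins itself, roughly like this (a sketch; the output image destination is an assumption):

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: hm-kafka-iot-kafka-connect
  namespace: hm-kafka
spec:
  # ...
  build:
    output:
      type: docker
      image: ghcr.io/hongbo-miao/hm-kafka-iot-kafka-connect:latest  # assumed target image
    plugins:
      - name: kafka-connect-avro-converter
        artifacts:
          - type: zip
            url: https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-avro-converter/versions/7.3.3/confluentinc-kafka-connect-avro-converter-7.3.3.zip
      - name: jdbc-connector-for-apache-kafka
        artifacts:
          - type: zip
            url: https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.zip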