I have a stack consisting of three Kafka instances, a producer, a consumer, and a Schema Registry.
The producer continuously sends messages to a topic, and the consumer just receives them.
I have created an Avro schema "Person" in the Schema Registry.
What do I have to change in my stack so that Kafka checks whether the messages on the topic match the schema?
Here is my Docker Stack:
version: '3.8'
services:
  kafka1:
    image: confluentinc/cp-kafka
    hostname: kafka1
    ports:
      - "9192:9092"
    volumes:
      - kafka1-data:/var/lib/kafka/data
    environment:
      # Define the names for the broker and controller listeners and their protocols
      KAFKA_INTER_BROKER_LISTENER_NAME: 'BROKER'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT
      # Define the ports used for broker, controller and external
      KAFKA_LISTENERS: BROKER://kafka1:29092,CONTROLLER://kafka1:29093,EXTERNAL://:9092
      # TODO: Adjust advertised address from 'localhost:9192' to the hostname or IP address of the running system
      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka1:29092,EXTERNAL://localhost:9192
      # Configure replication factors for auto-created topics, consumer offsets and the transaction state log
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      # The following parameters are required to run Kafka in KRaft mode (without ZooKeeper)
      KAFKA_PROCESS_ROLES: 'controller,broker'
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
      # TODO: Generate ONE cluster ID using ./bin/kafka-storage.sh random-uuid and set it on all instances
      CLUSTER_ID: 'aiWo7IWazngRchmPES6q5A=='
      # Required to avoid log spamming from MetadataLoader
      KAFKA_LOG4J_LOGGERS: 'org.apache.kafka.image.loader.MetadataLoader=WARN'
      CONFLUENT_SUPPORT_SCHEMA_VALIDATION: "true"
      SCHEMA_REGISTRY_URL: "http://schemaregistry1:8085"
    deploy:
      placement:
        constraints:
          # TODO: Define different constraints (e.g. manager/worker) so that at least two Kafka instances run on different VMs
          - "node.role == manager"
  kafka2:
    image: confluentinc/cp-kafka
    hostname: kafka2
    ports:
      - "9193:9093"
    volumes:
      - kafka2-data:/var/lib/kafka/data
    environment:
      # Define the names for the broker and controller listeners and their protocols
      KAFKA_INTER_BROKER_LISTENER_NAME: 'BROKER'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT
      # Define the ports used for broker, controller and external
      KAFKA_LISTENERS: BROKER://kafka2:29092,CONTROLLER://kafka2:29093,EXTERNAL://:9093
      # TODO: Adjust advertised address from 'localhost:9193' to the hostname or IP address of the running system
      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka2:29092,EXTERNAL://localhost:9193
      # Configure replication factors for auto-created topics, consumer offsets and the transaction state log
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      # The following parameters are required to run Kafka in KRaft mode (without ZooKeeper)
      KAFKA_PROCESS_ROLES: 'controller,broker'
      KAFKA_NODE_ID: 2
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
      # TODO: Generate ONE cluster ID using ./bin/kafka-storage.sh random-uuid and set it on all instances
      CLUSTER_ID: 'aiWo7IWazngRchmPES6q5A=='
      # Required to avoid log spamming from MetadataLoader
      KAFKA_LOG4J_LOGGERS: 'org.apache.kafka.image.loader.MetadataLoader=WARN'
      CONFLUENT_SUPPORT_SCHEMA_VALIDATION: "true"
      SCHEMA_REGISTRY_URL: "http://schemaregistry1:8085"
    deploy:
      placement:
        constraints:
          # TODO: Define different constraints (e.g. manager/worker) so that at least two Kafka instances run on different VMs
          - "node.role == manager"
  kafka3:
    image: confluentinc/cp-kafka
    hostname: kafka3
    ports:
      - "9194:9094"
    volumes:
      - kafka3-data:/var/lib/kafka/data
    environment:
      # Define the names for the broker and controller listeners and their protocols
      KAFKA_INTER_BROKER_LISTENER_NAME: 'BROKER'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT
      # Define the ports used for broker, controller and external
      KAFKA_LISTENERS: BROKER://kafka3:29092,CONTROLLER://kafka3:29093,EXTERNAL://:9094
      # TODO: Adjust advertised address from 'localhost:9194' to the hostname or IP address of the running system
      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka3:29092,EXTERNAL://localhost:9194
      # Configure replication factors for auto-created topics, consumer offsets and the transaction state log
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      # The following parameters are required to run Kafka in KRaft mode (without ZooKeeper)
      KAFKA_PROCESS_ROLES: 'controller,broker'
      KAFKA_NODE_ID: 3
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
      # TODO: Generate ONE cluster ID using ./bin/kafka-storage.sh random-uuid and set it on all instances
      CLUSTER_ID: 'aiWo7IWazngRchmPES6q5A=='
      # Required to avoid log spamming from MetadataLoader
      KAFKA_LOG4J_LOGGERS: 'org.apache.kafka.image.loader.MetadataLoader=WARN'
      CONFLUENT_SUPPORT_SCHEMA_VALIDATION: "true"
      SCHEMA_REGISTRY_URL: "http://schemaregistry1:8085"
    deploy:
      placement:
        constraints:
          # TODO: Define different constraints (e.g. manager/worker) so that at least two Kafka instances run on different VMs
          - "node.role == manager"
  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - "9180:8080"
    environment:
      DYNAMIC_CONFIG_ENABLED: 'true'
      KAFKA_CLUSTERS_0_NAME: digi-production
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:29092,kafka2:29092,kafka3:29092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry1:8085
      KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_USERNAME: admin
      KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_PASSWORD: letmein
    healthcheck:
      test: ["CMD-SHELL", "wget -nv -t1 --spider 'http://localhost:8080'"]
      interval: 10s
      timeout: 10s
      retries: 3
  schemaregistry1:
    image: confluentinc/cp-schema-registry:7.2.1
    hostname: schemaregistry1
    ports:
      - 18085:8085
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:29092,PLAINTEXT://kafka2:29092,PLAINTEXT://kafka3:29092
      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
      SCHEMA_REGISTRY_HOST_NAME: schemaregistry1
      SCHEMA_REGISTRY_LISTENERS: http://schemaregistry1:8085
      # Default credentials: admin/letmein
      #SCHEMA_REGISTRY_AUTHENTICATION_METHOD: BASIC
      #SCHEMA_REGISTRY_AUTHENTICATION_REALM: SchemaRegistryProps
      #SCHEMA_REGISTRY_AUTHENTICATION_ROLES: admin
      #SCHEMA_REGISTRY_OPTS: -Djava.security.auth.login.config=/conf/schema_registry.jaas
      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
  kafka-producer:
    image: confluentinc/cp-kafka
    command: >
      bash -c "while true;
      do echo '{\"id\": 1, \"name\": \"John\", \"age\": 30}' |
      kafka-console-producer --broker-list kafka1:29092 --topic test_topic
      --property value.schema='\"Person\"' && sleep 5;
      done"
  kafka-consumer:
    image: confluentinc/cp-kafka
    command: >
      kafka-console-consumer --bootstrap-server kafka1:29092 --topic test_topic
      --from-beginning --property print.key=true
      --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
      --property value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
      --property schema.registry.url=http://schemaregistry1:8085
      --property schema.registry.user=admin
      --property schema.registry.password=letmein
volumes:
  kafka1-data:
    driver: distributed-docker-volumes
  kafka2-data:
    driver: distributed-docker-volumes
  kafka3-data:
    driver: distributed-docker-volumes
I have already tried the following environment variables, but so far without success:
CONFLUENT_SUPPORT_SCHEMA_VALIDATION: "true"
SCHEMA_REGISTRY_URL: "http://schemaregistry1:8085"
The producer sends to the topic with the flag --property value.schema='"Person"', which is the name of the created schema.
I hope you can help me, and thank you in advance.
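For reference, whether the schema is actually registered under that name can be checked against the Schema Registry REST API (a sketch, assuming the hostname and port from the stack above; the subject name "Person" is the one mentioned in the question):

```shell
# List all registered subjects; the reply is a JSON array such as ["Person"]
curl -s http://schemaregistry1:8085/subjects
# Fetch the latest registered version of the "Person" subject
curl -s http://schemaregistry1:8085/subjects/Person/versions/latest
```

Note that the Avro console clients look up schemas by subject, which by default is the topic name plus a "-key"/"-value" suffix (e.g. test_topic-value), not the bare record name.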
2 Answers
It was pretty simple: I now build my producer on the Kafka Connect image and start kafka-avro-console-producer inside it.
With that, I can send an Avro message that is validated against the schema "test_topic-value". I also pass along that it refers to schema ID 1.
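A minimal sketch of what that producer invocation can look like (assuming a schema is already registered under the subject test_topic-value with ID 1, as described above):

```shell
# kafka-avro-console-producer ships with the Confluent images that bundle
# the Schema Registry tools; it serializes each JSON line as Avro and
# rejects input that does not match the referenced schema.
echo '{"id": 1, "name": "John", "age": 30}' |
  kafka-avro-console-producer \
    --broker-list kafka1:29092 --topic test_topic \
    --property schema.registry.url=http://schemaregistry1:8085 \
    --property value.schema.id=1
```

Alternatively, --property value.schema='<full Avro schema JSON>' can be used instead of value.schema.id to pass the schema inline.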
Kafka itself doesn't check. Only the
cp-server
Enterprise image supports broker-side schema validation, and it requires a paid license when you're using more than one broker. Also note that to set any property in the server.properties file via the container environment, the variable must be prefixed with
KAFKA_
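With cp-server, broker-side validation is then a two-step setup (a sketch; the service name and topic are taken from the stack above): point the broker at the Schema Registry via the prefixed environment variable, and switch validation on per topic.

```shell
# In the compose environment of a cp-server broker (instead of cp-kafka):
#   KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: "http://schemaregistry1:8085"
# (this becomes confluent.schema.registry.url in server.properties)

# Then enable schema validation on the topic itself:
kafka-topics --bootstrap-server kafka1:29092 --create --topic test_topic \
  --config confluent.value.schema.validation=true
```

Without cp-server, validation stays on the client side: producers using the Avro serializer (as in the accepted answer) fail locally when a message does not match the registered schema.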