
I have a stack consisting of three Kafka instances, a producer, a consumer, and a Schema Registry.

The producer continuously sends "Person" messages to a topic, and the consumer simply receives them.

I have created an Avro schema "Person" in the Schema Registry.

What do I have to change in my stack so that Kafka checks whether the messages on the topic match the schema?

Here is my Docker Stack:

version: '3.8'
services:
  kafka1:
    image: confluentinc/cp-kafka
    hostname: kafka1
    ports:
      - "9192:9092"
    volumes:
      - kafka1-data:/var/lib/kafka/data

    environment:
      # Define the listener names for the broker and controller and their protocols
      KAFKA_INTER_BROKER_LISTENER_NAME: 'BROKER'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT

      # Define the ports used for the broker, controller and external listeners
      KAFKA_LISTENERS: BROKER://kafka1:29092,CONTROLLER://kafka1:29093,EXTERNAL://:9092
      # TODO: Adjust advertised IP address from 'localhost:9092' to hostname or ip address of running system
      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka1:29092,EXTERNAL://localhost:9192

      # Configure replication factors for auto-created topics, consumer offsets and the transaction state log
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3

      # The following parameters are required to run Kafka in KRaft mode (without ZooKeeper)
      KAFKA_PROCESS_ROLES: 'controller,broker'
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
      # TODO: Generate ONE cluster ID using ./bin/kafka-storage.sh random-uuid and set it on all instances
      CLUSTER_ID: 'aiWo7IWazngRchmPES6q5A=='

      # Required to avoid log spam from the MetadataLoader
      KAFKA_LOG4J_LOGGERS: 'org.apache.kafka.image.loader.MetadataLoader=WARN'

      CONFLUENT_SUPPORT_SCHEMA_VALIDATION: "true"
      SCHEMA_REGISTRY_URL: "http://schemaregistry1:8085"
      
    deploy:
      placement:
        constraints:
          # TODO: To avoid placing Kafka instances on the same VM, define different placement constraints (e.g. manager vs. worker) so that at least two instances run on different VMs
          - "node.role == manager"

  kafka2:
    image: confluentinc/cp-kafka
    hostname: kafka2
    ports:
      - "9193:9093"
    volumes:
      - kafka2-data:/var/lib/kafka/data

    environment:

      # Define the listener names for the broker and controller and their protocols
      KAFKA_INTER_BROKER_LISTENER_NAME: 'BROKER'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT

      # Define the ports used for the broker, controller and external listeners
      KAFKA_LISTENERS: BROKER://kafka2:29092,CONTROLLER://kafka2:29093,EXTERNAL://:9093
      # TODO: Adjust advertised IP address from 'localhost:9093' to hostname or ip address of running system
      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka2:29092,EXTERNAL://localhost:9193

      # Configure replication factors for auto-created topics, consumer offsets and the transaction state log
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3

      # The following parameters are required to run Kafka in KRaft mode (without ZooKeeper)
      KAFKA_PROCESS_ROLES: 'controller,broker'
      KAFKA_NODE_ID: 2
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
      # TODO: Generate ONE cluster ID using ./bin/kafka-storage.sh random-uuid and set it on all instances
      CLUSTER_ID: 'aiWo7IWazngRchmPES6q5A=='

      # Required to avoid log spam from the MetadataLoader
      KAFKA_LOG4J_LOGGERS: 'org.apache.kafka.image.loader.MetadataLoader=WARN'

      CONFLUENT_SUPPORT_SCHEMA_VALIDATION: "true"
      SCHEMA_REGISTRY_URL: "http://schemaregistry1:8085"
      
    deploy:
      placement:
        constraints:
          # TODO: To avoid placing Kafka instances on the same VM, define different placement constraints (e.g. manager vs. worker) so that at least two instances run on different VMs
          - "node.role == manager"

  kafka3:
    image: confluentinc/cp-kafka
    hostname: kafka3
    ports:
      - "9194:9094"
    volumes:
      - kafka3-data:/var/lib/kafka/data

    environment:

      # Define the listener names for the broker and controller and their protocols
      KAFKA_INTER_BROKER_LISTENER_NAME: 'BROKER'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT

      # Define the ports used for the broker, controller and external listeners
      KAFKA_LISTENERS: BROKER://kafka3:29092,CONTROLLER://kafka3:29093,EXTERNAL://:9094
      # TODO: Adjust advertised IP address from 'localhost:9094' to hostname or ip address of running system
      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka3:29092,EXTERNAL://localhost:9194

      # Configure replication factors for auto-created topics, consumer offsets and the transaction state log
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3

      # The following parameters are required to run Kafka in KRaft mode (without ZooKeeper)
      KAFKA_PROCESS_ROLES: 'controller,broker'
      KAFKA_NODE_ID: 3
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
      # TODO: Generate ONE cluster ID using ./bin/kafka-storage.sh random-uuid and set it on all instances
      CLUSTER_ID: 'aiWo7IWazngRchmPES6q5A=='

      # Required to avoid log spam from the MetadataLoader
      KAFKA_LOG4J_LOGGERS: 'org.apache.kafka.image.loader.MetadataLoader=WARN'

      CONFLUENT_SUPPORT_SCHEMA_VALIDATION: "true"
      SCHEMA_REGISTRY_URL: "http://schemaregistry1:8085"
      
    deploy:
      placement:
        constraints:
          # TODO: To avoid placing Kafka instances on the same VM, define different placement constraints (e.g. manager vs. worker) so that at least two instances run on different VMs
          - "node.role == manager"

  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - "9180:8080"

    environment:
      DYNAMIC_CONFIG_ENABLED: 'true'
      KAFKA_CLUSTERS_0_NAME: digi-production
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:29092,kafka2:29092,kafka3:29092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry1:8085
      KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_USERNAME: admin
      KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_PASSWORD: letmein
      
    healthcheck:
      test: ["CMD-SHELL", "wget -nv -t1 --spider 'http://localhost:8080'"]
      interval: 10s
      timeout: 10s
      retries: 3

  schemaregistry1:
    image: confluentinc/cp-schema-registry:7.2.1
    hostname: schemaregistry1
    ports:
      - 18085:8085
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:29092,PLAINTEXT://kafka2:29092,PLAINTEXT://kafka3:29092
      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
      SCHEMA_REGISTRY_HOST_NAME: schemaregistry1
      SCHEMA_REGISTRY_LISTENERS: http://schemaregistry1:8085

      # Default credentials: admin/letmein
      #SCHEMA_REGISTRY_AUTHENTICATION_METHOD: BASIC
      #SCHEMA_REGISTRY_AUTHENTICATION_REALM: SchemaRegistryProps
      #SCHEMA_REGISTRY_AUTHENTICATION_ROLES: admin
      #SCHEMA_REGISTRY_OPTS: -Djava.security.auth.login.config=/conf/schema_registry.jaas

      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas

  kafka-producer:
    image: confluentinc/cp-kafka
    command: >
      bash -c "while true;
      do echo '{\"id\": 1, \"name\": \"John\", \"age\": 30}' |
      kafka-console-producer --broker-list kafka1:29092 --topic test_topic
      --property value.schema='\"Person\"' && sleep 5;
      done"


  kafka-consumer:
    image: confluentinc/cp-kafka
    command: >
      kafka-console-consumer --bootstrap-server kafka1:29092 --topic test_topic 
      --from-beginning --property print.key=true 
      --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 
      --property value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer 
      --property schema.registry.url=http://schemaregistry1:8085 
      --property schema.registry.user=admin 
      --property schema.registry.password=letmein


volumes:
  kafka1-data:
    driver: distributed-docker-volumes
  kafka2-data:
    driver: distributed-docker-volumes
  kafka3-data:
    driver: distributed-docker-volumes

I have already tried the following environment variables, but so far without success:

CONFLUENT_SUPPORT_SCHEMA_VALIDATION: "true"
SCHEMA_REGISTRY_URL: "http://schemaregistry1:8085"

The producer sends messages with the flag --property value.schema='"Person"', where "Person" is the name of the created schema.

I hope you can help me, and thank you in advance.

2 Answers


  1. Chosen as BEST ANSWER

    It was pretty simple: I now build my producer on the Kafka Connect image, and inside it I start kafka-avro-console-producer.

    kafka-avro-console-producer \
      --topic test_topic \
      --bootstrap-server bootstrapserveradress:9192 \
      --property schema.registry.url=http://schemaregistryadress:18085/ \
      --property auto.register.schemas=false \
      --property key.separator=: \
      --property schema.registry.user=admin \
      --property schema.registry.password=letmein \
      --property value.schema.id=1
    

    After that, I can send an Avro message, which is validated against the schema "test_topic-value". With --property value.schema.id=1 I tell the producer to use the registered schema with ID 1.
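
    For reference, a minimal usage sketch: the hostnames (bootstrapserveradress, schemaregistryadress) are the placeholders from the command above, and the record layout and basic-auth credentials are assumptions based on the rest of this stack. The schema ID can be looked up via the Schema Registry REST API before producing:

    # Find the ID of the schema registered under the subject "test_topic-value"
    curl -u admin:letmein \
      http://schemaregistryadress:18085/subjects/test_topic-value/versions/latest
    # -> {"subject":"test_topic-value","version":1,"id":1,"schema":"..."}

    # Pipe one JSON-encoded record into the Avro console producer;
    # it is serialized and checked against the schema with ID 1
    echo '{"id": 1, "name": "John", "age": 30}' | kafka-avro-console-producer \
      --topic test_topic \
      --bootstrap-server bootstrapserveradress:9192 \
      --property schema.registry.url=http://schemaregistryadress:18085/ \
      --property schema.registry.user=admin \
      --property schema.registry.password=letmein \
      --property value.schema.id=1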


  2. Kafka itself doesn't check. Only the cp-server Enterprise image supports broker-side schema validation, and it requires a paid license when you're using more than one broker.

    But note that to set any server.properties property via the container environment, it must be prefixed with KAFKA_.
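
    For illustration, a minimal sketch of what that would look like, assuming you switch to the confluentinc/cp-server image (kafka1 and schemaregistry1 are the service names from the stack above; the confluent.* settings are Confluent Server's schema-validation properties):

    # In the compose file, use the Enterprise broker image and point it at the registry:
    #   kafka1:
    #     image: confluentinc/cp-server
    #     environment:
    #       # maps to confluent.schema.registry.url in server.properties
    #       KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: "http://schemaregistry1:8085"

    # Validation is then enabled per topic, so the broker rejects records that
    # do not carry a schema ID resolvable in the registry:
    kafka-topics --bootstrap-server kafka1:29092 \
      --create --topic test_topic \
      --partitions 3 --replication-factor 3 \
      --config confluent.value.schema.validation=true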
