
I have successfully created a MariaDB database connection using Debezium and Kafka.
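(For context, the Debezium connector is registered against the Kafka Connect REST API on the debezium container. The snippet below is only a rough sketch of the kind of registration that produces the mariadb.basecamp.employees topic; the hostnames, credentials, and history-topic name are placeholders inferred from my compose file, not necessarily the exact config.)

import requests

# Rough sketch of a Debezium 1.4 MySQL/MariaDB connector registration.
# Connection details below are placeholders.
connector = {
    "name": "mariadb-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "mariadb",
        "database.port": "3306",
        "database.user": "root",
        "database.password": "password",
        "database.server.id": "1",
        "database.server.name": "mariadb",  # topic prefix -> mariadb.basecamp.employees
        "database.include.list": "basecamp",
        "table.include.list": "basecamp.employees",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.basecamp"
    }
}
requests.post("http://localhost:8083/connectors", json=connector)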

When I tried to stream the topic using PySpark, this is the output I got:

-------------------------------------------
Batch: 0
-------------------------------------------
+------+--------------------------------------------------------------------------------------------------------------------------+
|key   |value                                                                                                                     |
+------+--------------------------------------------------------------------------------------------------------------------------+
||MaxDoe1.4.2.Finalnmysqlmariadbbtruebasecampemployees mysql-bin.000032�r�ȯݭd                   |
||bJanebMary1.4.2.Finalnmysqlmariadbbtruebasecampemployees mysql-bin.000032�r�ȯݭd               |
||nAliceJohnson1.4.2.Finalnmysqlmariadbblastbasecampemployees mysql-bin.000032�r�ȯݭd            |
||MaxDoebMaxxDoe1.4.2.Finalnmysqlmariadb���߭dnfalsebasecampemployees mysql-bin.000032�bu���߭d|
||bMaxxDoeMaxDoe1.4.2.Finalnmysqlmariadb���߭dnfalsebasecampemployees mysql-bin.000032�ru��߭d |
||bMaxxDoeMaxDoe1.4.2.Finalnmysqlmariadb���߭dnfalsebasecampemployees mysql-bin.000032�ru����d|
+------+--------------------------------------------------------------------------------------------------------------------------+

Would this be a problem when I want to load/stream this data into the target database? Basically, what I'm trying to do is: MariaDB database -> Kafka/Debezium -> PySpark -> MariaDB database.
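To give an idea of the last leg of that pipeline: once the values are properly deserialized, the plan is roughly a foreachBatch JDBC write like the sketch below. The target database, table, and credentials are placeholders, and it assumes the MySQL/MariaDB JDBC driver jar is on the Spark classpath (it is mounted into the pyspark container in the compose file further down).

# Sketch only: write each deserialized micro-batch back to MariaDB over JDBC.
def write_to_mariadb(batch_df, batch_id):
    (batch_df.write
        .format("jdbc")
        .option("url", "jdbc:mysql://mariadb:3306/targetdb")  # placeholder target database
        .option("driver", "com.mysql.cj.jdbc.Driver")
        .option("dbtable", "employees_copy")                  # placeholder target table
        .option("user", "root")
        .option("password", "password")
        .mode("append")
        .save())

query = df.writeStream \
    .foreachBatch(write_to_mariadb) \
    .outputMode("append") \
    .start()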

This is the code I got my output with:

#pysparkkafka1

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import traceback
import time

try:
    # Initialize the Spark session
    spark = SparkSession.builder \
        .appName("Kafka Spark Integration") \
        .getOrCreate()

    print("Spark session started")

    # Define Kafka topic and server
    kafka_topic = "mariadb.basecamp.employees"
    kafka_server = "kafka:9092"  # Replace with your Kafka server address if different

    # Print the Kafka topic
    print(f"Reading from Kafka topic: {kafka_topic}")

    # Read from the Kafka topic
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_server) \
        .option("subscribe", kafka_topic) \
        .option("startingOffsets", "earliest") \
        .load()

    print("DataFrame created from Kafka topic")

    # Select key and value and convert from bytes to string
    df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    print("DataFrame transformed")

    # Display the dataframe in console
    query = df.writeStream \
        .outputMode("append") \
        .format("console") \
        .option("truncate", False) \
        .start()

    print("Query started")

    # Timeout for termination
    timeout = 60  # 1 minute timeout
    start_time = time.time()
    while time.time() - start_time < timeout:
        if query.isActive:
            print("Streaming...")
            time.sleep(10)  # Check every 10 seconds
        else:
            break

    query.stop()
    print("Query stopped")
    
except Exception as e:
    print("An error occurred:", e)
    traceback.print_exc()

finally:
    spark.stop()
    print("Spark session stopped")

This is my docker-compose file:

version: '3.8'

services:
  mariadb:
    image: mariadb:10.5
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: password
      MYSQL_DATABASE: mydatabase
      MYSQL_USER: root
      MYSQL_PASSWORD: password
    volumes:
      - ./mariadb-config:/etc/mysql/mariadb.conf.d
      - D:/mariakafka/my.cnf:/etc/mysql/my.cnf
      - ./mysql:/var/lib/mysql
    ports:
      - "3306:3306"

  phpmyadmin:
    image: phpmyadmin
    restart: always
    ports:
      - 8080:80
    environment:
      - PMA_ARBITRARY=1
      - UPLOAD_LIMIT=2G
      - MEMORY_LIMIT=2G

  postgres:
    image: debezium/postgres:13
    restart: always
    volumes:
      - ./postgres:/var/lib/postgresql/data
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=docker
      - POSTGRES_PASSWORD=docker
      - POSTGRES_DB=exampledb

  pgadmin:
    image: dpage/pgadmin4
    restart: always
    environment:
      PGADMIN_DEFAULT_EMAIL: [email protected]
      PGADMIN_DEFAULT_PASSWORD: root
    ports:
      - "5050:80"
    depends_on:
      - postgres
 
  zookeeper:
    image: wurstmeister/zookeeper:latest
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-server:5.5.1
    restart: always
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT"
      KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092"
      KAFKA_LISTENER_NAME: PLAINTEXT
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9991
    ports:
      - "9092:9092"

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.3
    restart: always
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8087,http://localhost:8087
    ports:
      - 8087:8087
    depends_on: [zookeeper, kafka]

  debezium:
    image: debezium/connect:1.4
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_status
      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8087
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8087
      OFFSET_FLUSH_INTERVAL_MS: 60000
      OFFSET_FLUSH_TIMEOUT_MS: 5000
      SHUTDOWN_TIMEOUT: 10000
    ports:
      - 8083:8083
    depends_on:
      - kafka
      - schema-registry
    volumes:
      - D:/mariakafka/kafkaconfig:/kafka/config
      - D:/mariakafka/mysql-connector:/usr/share/java/kafka-connect

  pyspark:
    image: jupyter/pyspark-notebook:latest
    ports:
      - "8888:8888"
    environment:
      - PYSPARK_SUBMIT_ARGS=--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 --jars /home/jovyan/work/mysql-connector-j-9.0.0.jar pyspark-shell
    volumes:
      - D:/mariakafka/pyspark:/home/jovyan/work
      - D:/mysql-connector-j-9.0.0.jar:/home/jovyan/work/mysql-connector-j-9.0.0.jar
    depends_on:
      - kafka

volumes:
  postgres_data:
  mariadb_data:
              
networks:
  flink-network:
    driver: bridge

This is what I was hoping to see. I got this output by exec-ing into the Schema Registry container:

docker exec -it  3179874d15c23934fc55b841a5650d6e07a33a72cbdd74de308615a0c11c45e0 bash

and then

kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic mariadb.basecamp.employees --from-beginning --property schema.registry.url=http://schema-registry:8087

But I don't understand how to replicate it in PySpark:

{"before":null,"after":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Max","last_name":"Doe"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":0,"snapshot":{"string":"true"},"db":"basecamp","table":{"string":"employees"},"server_id":0,"gtid":null,"file":"mysql-bin.000032","pos":342,"row":0,"thread":null,"query":null},"op":"r","ts_ms":{"long":1724124623364},"transaction":null}
{"before":null,"after":{"mariadb.basecamp.employees.Value":{"employee_id":2,"first_name":"Jane","last_name":"Mary"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":0,"snapshot":{"string":"true"},"db":"basecamp","table":{"string":"employees"},"server_id":0,"gtid":null,"file":"mysql-bin.000032","pos":342,"row":0,"thread":null,"query":null},"op":"r","ts_ms":{"long":1724124623367},"transaction":null}
{"before":null,"after":{"mariadb.basecamp.employees.Value":{"employee_id":3,"first_name":"Alice","last_name":"Johnson"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":0,"snapshot":{"string":"last"},"db":"basecamp","table":{"string":"employees"},"server_id":0,"gtid":null,"file":"mysql-bin.000032","pos":342,"row":0,"thread":null,"query":null},"op":"r","ts_ms":{"long":1724124623369},"transaction":null}
{"before":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Max","last_name":"Doe"}},"after":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Maxx","last_name":"Doe"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":1724126944000,"snapshot":{"string":"false"},"db":"basecamp","table":{"string":"employees"},"server_id":1,"gtid":null,"file":"mysql-bin.000032","pos":549,"row":0,"thread":{"long":0},"query":null},"op":"u","ts_ms":{"long":1724126944214},"transaction":null}
{"before":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Maxx","last_name":"Doe"}},"after":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Max","last_name":"Doe"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":1724127005000,"snapshot":{"string":"false"},"db":"basecamp","table":{"string":"employees"},"server_id":1,"gtid":null,"file":"mysql-bin.000032","pos":855,"row":0,"thread":{"long":0},"query":null},"op":"u","ts_ms":{"long":1724127006261},"transaction":null}
{"before":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Maxx","last_name":"Doe"}},"after":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Max","last_name":"Doe"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":1724127005000,"snapshot":{"string":"false"},"db":"basecamp","table":{"string":"employees"},"server_id":1,"gtid":null,"file":"mysql-bin.000032","pos":855,"row":0,"thread":{"long":0},"query":null},"op":"u","ts_ms":{"long":1724127912824},"transaction":null}

2 Answers


  1. Chosen as BEST ANSWER

    This is the code I used to deserialize using Avro:

    #pysparkkafka3
    
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, expr
    from pyspark.sql.avro.functions import from_avro
    from confluent_kafka.schema_registry import SchemaRegistryClient
    import traceback
    import time
    
    # Kafka and Schema Registry configurations
    kafka_topic = "mariadb.basecamp.employees"
    kafka_server = "kafka:9092"
    schema_registry_url = "http://schema-registry:8087"
    schema_registry_subject = f"{kafka_topic}-value"
    
    def get_schema_from_schema_registry(schema_registry_url, schema_registry_subject):
        sr = SchemaRegistryClient({'url': schema_registry_url})
        latest_version = sr.get_latest_version(schema_registry_subject)
        schema_str = latest_version.schema.schema_str
        return sr, schema_str
    
    try:
        # Initialize the Spark session with Avro package
        spark = SparkSession.builder \
            .appName("Kafka Spark Integration") \
            .config("spark.sql.shuffle.partitions", "2") \
            .config("spark.sql.adaptive.enabled", "true") \
            .config("spark.sql.codegen.wholeStage", "false") \
            .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
            .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.5.0") \
            .getOrCreate()
    
        spark.sparkContext.setLogLevel("ERROR")
    
        print("Spark session started")
    
        # Read from Kafka topic
        df = spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", kafka_server) \
            .option("subscribe", kafka_topic) \
            .option("startingOffsets", "earliest") \
            .load()
    
        print("DataFrame created from Kafka topic")
    
        # Strip the 5-byte Confluent wire-format header (1 magic byte + 4-byte schema ID)
        # that the AvroConverter prepends to every message, leaving the raw Avro payload
        df = df.withColumn('fixedValue', expr("substring(value, 6, length(value) - 5)"))
    
        # Get schema from Schema Registry
        _, schema_str = get_schema_from_schema_registry(schema_registry_url, schema_registry_subject)
        print("Schema retrieved from Schema Registry")
    
        # Deserialize data using Avro schema
        decoded_output = df.select(
            from_avro(col("fixedValue"), schema_str).alias("data")
        )
        
        df = decoded_output.select("data.*")
        df.printSchema()
    
        # Display the dataframe in console
        query = df.writeStream \
            .outputMode("append") \
            .format("console") \
            .option("truncate", False) \
            .start()
    
        print("Query started")
    
        # Timeout for termination
        timeout = 60  # 1 minute timeout
        start_time = time.time()
        while time.time() - start_time < timeout:
            if query.isActive:
                print("Streaming...")
                time.sleep(10)  # Check every 10 seconds
            else:
                break
    
        query.stop()
        print("Query stopped")
    
    except Exception as e:
        print("An error occurred:", e)
        traceback.print_exc()
    
    finally:
        spark.stop()
        print("Spark session stopped")
    

  2. You’re casting the data in Spark as a String, not deserializing it as Avro or using the Schema Registry as part of your Spark consumer. Therefore you get unparsable UTF-8 strings.

    You could fix your Debezium config to use JsonConverter rather than Avro; otherwise you’ll have to write a UDF to deserialize the Avro appropriately (see: Integrating Spark Structured Streaming with the Confluent Schema Registry).
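
    If you go the JsonConverter route, parsing in Spark is a from_json call against a struct schema. A rough sketch for this particular topic (it assumes value.converter.schemas.enable=false so the envelope is the top-level JSON object; field names are taken from the output in the question):

    from pyspark.sql.functions import from_json, col
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType

    # Sketch: parse Debezium JSON change events (JsonConverter, schemas disabled).
    row_schema = StructType([
        StructField("employee_id", IntegerType()),
        StructField("first_name", StringType()),
        StructField("last_name", StringType()),
    ])
    envelope_schema = StructType([
        StructField("before", row_schema),
        StructField("after", row_schema),
        StructField("op", StringType()),
        StructField("ts_ms", LongType()),
    ])

    parsed = df.selectExpr("CAST(value AS STRING) AS json") \
        .select(from_json(col("json"), envelope_schema).alias("data")) \
        .select("data.after.*", "data.op")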

    Both Debezium and Confluent offer a JDBC sink as well, so you could also choose to remove Spark from the pipeline entirely.
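
    For that no-Spark option, the sink is registered against the same Connect REST API as the source connector. Below is only a sketch of such a registration; it assumes a JDBC sink connector plugin is actually installed on the Connect worker (the stock debezium/connect:1.4 image does not ship with one), and all connection details are placeholders:

    import requests

    # Sketch: JDBC sink connector that writes the topic straight into MariaDB,
    # using Debezium's ExtractNewRecordState SMT to unwrap the change-event envelope.
    sink = {
        "name": "mariadb-jdbc-sink",
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "topics": "mariadb.basecamp.employees",
            "connection.url": "jdbc:mysql://mariadb:3306/targetdb",  # placeholder target DB
            "connection.user": "root",
            "connection.password": "password",
            "insert.mode": "upsert",
            "pk.mode": "record_key",
            "pk.fields": "employee_id",
            "auto.create": "true",
            "transforms": "unwrap",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
        }
    }
    requests.post("http://localhost:8083/connectors", json=sink)  # adjust host to your Connect worker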
