I have successfully created a MariaDB database connection using Debezium and Kafka.
When I try to stream the topic with PySpark,
this is the output that I get:
-------------------------------------------
Batch: 0
-------------------------------------------
+------+--------------------------------------------------------------------------------------------------------------------------+
|key |value |
+------+--------------------------------------------------------------------------------------------------------------------------+
||MaxDoe1.4.2.Finalnmysqlmariadbbtruebasecampemployees mysql-bin.000032�r�ȯݭd |
||bJanebMary1.4.2.Finalnmysqlmariadbbtruebasecampemployees mysql-bin.000032�r�ȯݭd |
||nAliceJohnson1.4.2.Finalnmysqlmariadbblastbasecampemployees mysql-bin.000032�r�ȯݭd |
||MaxDoebMaxxDoe1.4.2.Finalnmysqlmariadb���߭dnfalsebasecampemployees mysql-bin.000032�bu���߭d|
||bMaxxDoeMaxDoe1.4.2.Finalnmysqlmariadb���߭dnfalsebasecampemployees mysql-bin.000032�ru��߭d |
||bMaxxDoeMaxDoe1.4.2.Finalnmysqlmariadb���߭dnfalsebasecampemployees mysql-bin.000032�ru����d|
+------+--------------------------------------------------------------------------------------------------------------------------+
Would this be a problem when I want to load/stream this data into the target database? What I'm basically trying to do is: MariaDB database -> Kafka/Debezium -> PySpark -> MariaDB database.
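Eventually I want to write every micro-batch back to MariaDB over JDBC, roughly like this (just a sketch: parsed_df stands for the parsed stream, and the target table, URL and credentials below are placeholders, not my real config):

# Sketch of the planned write-back step: for every micro-batch, write the
# parsed rows to the target MariaDB database over JDBC.
def write_to_mariadb(batch_df, batch_id):
    (batch_df.write
        .format("jdbc")
        .option("url", "jdbc:mysql://mariadb:3306/targetdb")   # placeholder
        .option("driver", "com.mysql.cj.jdbc.Driver")
        .option("dbtable", "employees_copy")                    # placeholder
        .option("user", "root")
        .option("password", "password")
        .mode("append")
        .save())

query = parsed_df.writeStream.foreachBatch(write_to_mariadb).start()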
This is the code I got that output with:
# pysparkkafka1
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import traceback
import time

try:
    # Initialize the Spark session
    spark = (SparkSession.builder
             .appName("Kafka Spark Integration")
             .getOrCreate())
    print("Spark session started")

    # Define Kafka topic and server
    kafka_topic = "mariadb.basecamp.employees"
    kafka_server = "kafka:9092"  # Replace with your Kafka server address if different

    # Print the Kafka topic
    print(f"Reading from Kafka topic: {kafka_topic}")

    # Read from the Kafka topic
    df = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", kafka_server)
          .option("subscribe", kafka_topic)
          .option("startingOffsets", "earliest")
          .load())
    print("DataFrame created from Kafka topic")

    # Select key and value and convert from bytes to string
    df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    print("DataFrame transformed")

    # Display the dataframe in the console
    query = (df.writeStream
             .outputMode("append")
             .format("console")
             .option("truncate", False)
             .start())
    print("Query started")

    # Timeout for termination
    timeout = 60  # 1 minute timeout
    start_time = time.time()
    while time.time() - start_time < timeout:
        if query.isActive:
            print("Streaming...")
            time.sleep(10)  # Check every 10 seconds
        else:
            break
    query.stop()
    print("Query stopped")

except Exception as e:
    print("An error occurred:", e)
    traceback.print_exc()

finally:
    spark.stop()
    print("Spark session stopped")
This is my docker-compose file:
version: '3.8'
services:
  mariadb:
    image: mariadb:10.5
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: password
      MYSQL_DATABASE: mydatabase
      MYSQL_USER: root
      MYSQL_PASSWORD: password
    volumes:
      - ./mariadb-config:/etc/mysql/mariadb.conf.d
      - D:mariakafkamy.cnf:/etc/mysql/my.cnf
      - ./mysql:/var/lib/mysql
    ports:
      - "3306:3306"
  phpmyadmin:
    image: phpmyadmin
    restart: always
    ports:
      - 8080:80
    environment:
      - PMA_ARBITRARY=1
      - UPLOAD_LIMIT=2G
      - MEMORY_LIMIT=2G
  postgres:
    image: debezium/postgres:13
    restart: always
    volumes:
      - ./postgres:/var/lib/postgresql/data
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=docker
      - POSTGRES_PASSWORD=docker
      - POSTGRES_DB=exampledb
  pgadmin:
    image: dpage/pgadmin4
    restart: always
    environment:
      PGADMIN_DEFAULT_EMAIL: [email protected]
      PGADMIN_DEFAULT_PASSWORD: root
    ports:
      - "5050:80"
    depends_on:
      - postgres
  zookeeper:
    image: wurstmeister/zookeeper:latest
    ports:
      - "2181:2181"
  kafka:
    image: confluentinc/cp-server:5.5.1
    restart: always
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT"
      KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092"
      KAFKA_LISTENER_NAME: PLAINTEXT
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9991
    ports:
      - "9092:9092"
  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.3
    restart: always
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8087,http://localhost:8087
    ports:
      - 8087:8087
    depends_on: [zookeeper, kafka]
  debezium:
    image: debezium/connect:1.4
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_status
      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8087
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8087
      OFFSET_FLUSH_INTERVAL_MS: 60000
      OFFSET_FLUSH_TIMEOUT_MS: 5000
      SHUTDOWN_TIMEOUT: 10000
    ports:
      - 8083:8083
    depends_on:
      - kafka
      - schema-registry
    volumes:
      - D:mariakafkakafkaconfig:/kafka/config
      - D:/mariakafka/mysql-connector:/usr/share/java/kafka-connect
  pyspark:
    image: jupyter/pyspark-notebook:latest
    ports:
      - "8888:8888"
    environment:
      - PYSPARK_SUBMIT_ARGS=--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 --jars /home/jovyan/work/mysql-connector-j-9.0.0.jar pyspark-shell
    volumes:
      - D:/mariakafka/pyspark:/home/jovyan/work
      - D:/mysql-connector-j-9.0.0.jar:/home/jovyan/work/mysql-connector-j-9.0.0.jar
    depends_on:
      - kafka
volumes:
  postgres_data:
  mariadb_data:
networks:
  flink-network:
    driver: bridge
This is what I was hoping to see. I got this output from inside the schema-registry container:
docker exec -it 3179874d15c23934fc55b841a5650d6e07a33a72cbdd74de308615a0c11c45e0 bash
and then
kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic mariadb.basecamp.employees --from-beginning --property schema.registry.url=http://schema-registry:8087
But I don't understand how to replicate it in PySpark:
{"before":null,"after":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Max","last_name":"Doe"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":0,"snapshot":{"string":"true"},"db":"basecamp","table":{"string":"employees"},"server_id":0,"gtid":null,"file":"mysql-bin.000032","pos":342,"row":0,"thread":null,"query":null},"op":"r","ts_ms":{"long":1724124623364},"transaction":null}
{"before":null,"after":{"mariadb.basecamp.employees.Value":{"employee_id":2,"first_name":"Jane","last_name":"Mary"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":0,"snapshot":{"string":"true"},"db":"basecamp","table":{"string":"employees"},"server_id":0,"gtid":null,"file":"mysql-bin.000032","pos":342,"row":0,"thread":null,"query":null},"op":"r","ts_ms":{"long":1724124623367},"transaction":null}
{"before":null,"after":{"mariadb.basecamp.employees.Value":{"employee_id":3,"first_name":"Alice","last_name":"Johnson"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":0,"snapshot":{"string":"last"},"db":"basecamp","table":{"string":"employees"},"server_id":0,"gtid":null,"file":"mysql-bin.000032","pos":342,"row":0,"thread":null,"query":null},"op":"r","ts_ms":{"long":1724124623369},"transaction":null}
{"before":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Max","last_name":"Doe"}},"after":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Maxx","last_name":"Doe"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":1724126944000,"snapshot":{"string":"false"},"db":"basecamp","table":{"string":"employees"},"server_id":1,"gtid":null,"file":"mysql-bin.000032","pos":549,"row":0,"thread":{"long":0},"query":null},"op":"u","ts_ms":{"long":1724126944214},"transaction":null}
{"before":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Maxx","last_name":"Doe"}},"after":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Max","last_name":"Doe"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":1724127005000,"snapshot":{"string":"false"},"db":"basecamp","table":{"string":"employees"},"server_id":1,"gtid":null,"file":"mysql-bin.000032","pos":855,"row":0,"thread":{"long":0},"query":null},"op":"u","ts_ms":{"long":1724127006261},"transaction":null}
{"before":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Maxx","last_name":"Doe"}},"after":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Max","last_name":"Doe"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":1724127005000,"snapshot":{"string":"false"},"db":"basecamp","table":{"string":"employees"},"server_id":1,"gtid":null,"file":"mysql-bin.000032","pos":855,"row":0,"thread":{"long":0},"query":null},"op":"u","ts_ms":{"long":1724127912824},"transaction":null}
2 Answers
This was the code I used to deserialize the Avro data.
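A minimal sketch of that kind of approach, assuming the confluent-kafka Python package is installed in the PySpark container: the Schema Registry deserializer is applied to the raw (un-cast) value bytes through a UDF. The helper names and the caching scheme here are illustrative, and raw_df stands for the Kafka DataFrame before any CAST to string.

# Sketch: decode the Confluent-Avro-encoded Kafka values via the Schema
# Registry and return each Debezium event as a JSON string.
# Assumes the confluent-kafka package is available on the Spark workers.
import json

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

TOPIC = "mariadb.basecamp.employees"
REGISTRY_URL = "http://schema-registry:8087"

def make_avro_udf():
    cache = {}  # build the deserializer lazily on the executor

    def decode(value_bytes):
        if value_bytes is None:
            return None
        if "des" not in cache:
            client = SchemaRegistryClient({"url": REGISTRY_URL})
            cache["des"] = AvroDeserializer(client)
        record = cache["des"](bytes(value_bytes),
                              SerializationContext(TOPIC, MessageField.VALUE))
        return json.dumps(record)

    return udf(decode, StringType())

decoded_df = raw_df.select(make_avro_udf()(raw_df["value"]).alias("value_json"))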
You're casting the data in Spark as a string, not deserializing the Avro or using the Schema Registry as part of your Spark consumer, so you get unparsable UTF-8 strings.
You could fix your Debezium config to use JsonConverter rather than Avro; otherwise you'll have to write a UDF to deserialize the Avro appropriately (see Integrating Spark Structured Streaming with the Confluent Schema Registry).
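For the JsonConverter route, the Spark side becomes a plain from_json parse (a sketch; the schema below only lists the fields visible in your employees output, and it assumes value.converter.schemas.enable=false so the JSON is the bare Debezium envelope rather than a schema/payload wrapper):

# Sketch: parse the Debezium change events once the connector's converters
# are switched to org.apache.kafka.connect.json.JsonConverter.
# df is the Kafka readStream DataFrame.
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import (StructType, StructField, IntegerType,
                               StringType, LongType)

row_schema = StructType([
    StructField("employee_id", IntegerType()),
    StructField("first_name", StringType()),
    StructField("last_name", StringType()),
])

envelope_schema = StructType([
    StructField("before", row_schema),
    StructField("after", row_schema),
    StructField("op", StringType()),
    StructField("ts_ms", LongType()),
])

parsed_df = (df.selectExpr("CAST(value AS STRING) AS json")
               .select(from_json(col("json"), envelope_schema).alias("event"))
               .select("event.op", "event.after.*"))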
Debezium or Confluent offer a JDBC sink as well, so you could also choose to remove Spark entirely.
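For instance, a JDBC sink registered against the Connect REST API on port 8083 could look roughly like this (a sketch: it assumes the Confluent JDBC sink connector jar is on the worker's plugin path, and the connector name, target table and credentials are placeholders):

# Sketch: register a JDBC sink connector on the existing Connect worker so the
# topic is written straight to MariaDB without Spark.
import requests

sink_config = {
    "name": "mariadb-employees-sink",  # placeholder
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "topics": "mariadb.basecamp.employees",
        "connection.url": "jdbc:mysql://mariadb:3306/targetdb",  # placeholder
        "connection.user": "root",
        "connection.password": "password",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "auto.create": "true",
        # Unwrap the Debezium envelope so only the row state is written.
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    },
}

resp = requests.post("http://localhost:8083/connectors", json=sink_config)
print(resp.status_code, resp.text)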