
I want to send this simple PySpark DataFrame to Kafka, but I always get an error. I tried it with a simple Python producer script and it works, and the PySpark read stream works too; my only problem is the write to Kafka using PySpark. Can anyone help me, please? (A sketch of that producer is included below, after the PySpark code, for reference.)

from pyspark.sql import SparkSession
import os 

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'

# Create a SparkSession
spark = SparkSession.builder \
    .appName("KafkaSinkExample") \
    .getOrCreate()

# Sample data
data = [("key1", "value1"), ("key2", "value2"), ("key3", "value3")]
df = spark.createDataFrame(data, ["key", "value"])

# Write data to Kafka
(df.write 
    .format("kafka") 
    .option("kafka.bootstrap.servers", "broker:29092") 
    .option("topic", "test") 
    .save())
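
For reference, a minimal sketch of the kind of standalone producer check mentioned above (assuming the kafka-python package; the broker and topic mirror the Spark snippet, and the key/value bytes are placeholders):

# Standalone producer sanity check (sketch).
# Assumes kafka-python is installed; broker and topic match the Spark code above.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="broker:29092")
producer.send("test", key=b"key1", value=b"value1")
producer.flush()
producer.close()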

This is my docker-compose file:

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:7.5.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  pyspark:
    image: jupyter/pyspark-notebook:latest
    container_name: pyspark
    ports:
      - "8888:8888"
    environment:
      - PYSPARK_PYTHON=python3
      - PYSPARK_DRIVER_PYTHON=python3
      - SPARK_HOME=/usr/local/spark
    volumes:
      - ./notebooks:/home/jovyan/work # Mount your notebooks folder or adjust as needed
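
For context, the advertised listeners above mean that clients on the Docker network (such as the pyspark container) reach the broker at broker:29092, while clients running on the host use localhost:9092:

# Bootstrap addresses implied by KAFKA_ADVERTISED_LISTENERS above
# (reference only; pick the one that matches where the client runs).
BOOTSTRAP_INSIDE_DOCKER = "broker:29092"   # PLAINTEXT listener, Docker network
BOOTSTRAP_FROM_HOST = "localhost:9092"     # PLAINTEXT_HOST listener, host machine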


The error that I get:

23/12/25 16:50:56 ERROR TaskSetManager: Task 1 in stage 0.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "/home/jovyan/preprocessing/bing.py", line 23, in <module>
    .save())
     ^^^^^^
  File "/usr/local/spark/python/pyspark/sql/readwriter.py", line 1461, in save
    self._jwrite.save()
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/usr/local/spark/python/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
           ^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o49.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1) (0f08baeba383 executor driver): java.lang.NoSuchMethodError: 'boolean org.apache.spark.sql.catalyst.expressions.Cast$.apply$default$4()'

2 Answers


  1. I had the same problem. If you use the latest PySpark version, 3.5.0, you should edit your packages to:

    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 pyspark-shell'
    
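    A quick way to check which PySpark version is actually running, so that the connector artifact versions in PYSPARK_SUBMIT_ARGS match it (a small sketch):

    import pyspark
    # The spark-sql-kafka / spark-streaming-kafka package versions should match this.
    print(pyspark.__version__)   # e.g. 3.5.0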
  2. os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 pyspark-shell"
    

    This variable defines the dependencies for structured streaming; they are downloaded from the Maven Central repository when you build the Spark session after declaring the variable. This includes 12 jar files.

    This sometimes doesn't work due to firewall restrictions or internet connectivity.

    C:\Users\<user_name>\.ivy2\jars>dir
     Volume in drive C is Windows
     Volume Serial Number is 9AC2-8000
    
     Directory of C:\Users\<user_name>\.ivy2\jars
    
    03-02-2024  02:28    <DIR>          .
    03-02-2024  00:57    <DIR>          ..
    10-07-2014  17:25            33,031 com.google.code.findbugs_jsr305-3.0.0.jar
    17-05-2013  01:35            62,050 commons-logging_commons-logging-1.1.3.jar
    14-08-2021  19:29           145,516 org.apache.commons_commons-pool2-2.11.1.jar
    29-07-2022  19:06        19,458,635 org.apache.hadoop_hadoop-client-api-3.3.4.jar
    29-07-2022  19:09        30,085,504 org.apache.hadoop_hadoop-client-runtime-3.3.4.jar
    26-05-2023  07:43         5,050,443 org.apache.kafka_kafka-clients-3.4.1.jar
    09-09-2023  13:41           432,335 org.apache.spark_spark-sql-kafka-0-10_2.12-3.5.0.jar
    09-09-2023  13:42           141,000 org.apache.spark_spark-streaming-kafka-0-10_2.12-3.5.0.jar
    20-01-1970  20:07            56,808 org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.5.0.jar
    19-06-2021  12:01           682,804 org.lz4_lz4-java-1.8.0.jar
    18-03-2023  01:06            63,635 org.slf4j_slf4j-api-2.0.7.jar
    21-07-2023  23:12         2,058,806 org.xerial.snappy_snappy-java-1.1.10.3.jar
                  12 File(s)     58,270,567 bytes
                   2 Dir(s)  161,241,489,408 bytes free
    

    It is not possible to do structured streaming without building the Spark session with these jar dependencies. Basically, building the Spark session with the "PYSPARK_SUBMIT_ARGS" variable makes these dependencies user-defined. If you want to make them default dependencies, you need to download all of these jars from the Maven Central repo and paste them into Spark's default jars path:

    <path>\spark-3.5.0\jars
    

    If you pasted all of these dependencies into Spark's default jars path, then you don't need to explicitly mention the dependencies using the "PYSPARK_SUBMIT_ARGS" variable.
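
    In case it helps, one way to locate that default jars directory from Python (a sketch, assuming SPARK_HOME is set, as it is in the docker-compose above):

    import os
    # Spark's default jars directory, where the downloaded connector jars can be placed.
    print(os.path.join(os.environ["SPARK_HOME"], "jars"))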

    Once you have done that, you can easily do structured streaming without using "PYSPARK_SUBMIT_ARGS" and without downloading these jars from the internet every time:

    from pyspark.sql import SparkSession
    spark = SparkSession \
        .builder \
        .appName("Sample Ingestion") \
        .master("local[2]").getOrCreate()  # getOrCreate() is required, or the program will throw an error
    df = spark.read.csv('datasets/apple_quality.csv', header=True, escape='"')
    kafka_topic_name = "ingestionSpeedTest"
    query = df \
      .selectExpr("CAST(A_id AS STRING) AS key", "to_json(struct(*)) AS value") \
      .write \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option("topic", kafka_topic_name) \
      .save()
    print("Successfully Ingested :)")
    