
I’m new to Kafka streaming. I set up a Twitter listener using Python, and it produces to the Kafka server running on localhost:9092. I can consume the stream produced by the listener using a Kafka client tool (Conduktor) and also using the command "bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic twitter --from-beginning".
But when I try to consume the same stream using Spark Structured Streaming, it captures nothing and throws the error: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
See the screenshots below:

  1. Command output – Consumes Data
  2. Jupyter output for spark consumer – Doesn’t consume data

My producer (Twitter listener) code:

import tweepy
from tweepy import Stream
from tweepy.streaming import StreamListener
import pykafka

auth = tweepy.OAuthHandler("**********", "*************")
auth.set_access_token("*************", "***********************")
# session.set('request_token', auth.request_token)
api = tweepy.API(auth)

class KafkaPushListener(StreamListener):
    def __init__(self):
        # localhost:9092 is the default Kafka broker address
        # (not Zookeeper, which listens on 2181 by default)
        self.client = pykafka.KafkaClient("0.0.0.0:9092")
        # Get a producer for the topic named "twitter"
        self.producer = self.client.topics[bytes("twitter", "ascii")].get_producer()

    def on_data(self, data):
        # The producer publishes each tweet received from Twitter;
        # utf-8, since tweet text is not guaranteed to be ASCII
        self.producer.produce(bytes(data, "utf-8"))
        return True

    def on_error(self, status):
        print(status)
        return True

twitter_stream = Stream(auth, KafkaPushListener())
twitter_stream.filter(track=['#fashion'])

Consumer code using Spark Structured Streaming:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "twitter") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

2 Answers


  1. Chosen as BEST ANSWER

    Found what was missing: when I submitted the Spark job, I had to include the dependency package version matching my Spark build. I have Spark 3.0.0, so I included the org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 package.
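    For reference, the spark-submit invocation with that flag might look like the sketch below. The script name twitter_consumer.py is hypothetical, and the package coordinates must match your Spark version (3.0.0 here) and Scala build (2.12):

    ```shell
    # Hypothetical script name; adjust the package version to your
    # Spark version and Scala build before running.
    spark-submit \
      --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 \
      twitter_consumer.py
    ```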


  2. Add a sink; the query will then start consuming data from Kafka.

    Check the code below.

    df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option("subscribe", "twitter") \
      .load()

    # The console sink is used here for debugging; change the
    # format as per your requirement.
    query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
        .writeStream \
        .outputMode("append") \
        .format("console") \
        .start()

    query.awaitTermination()
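    Since the question runs the consumer from Jupyter (where there is no spark-submit command line), the same dependency can alternatively be declared when building the SparkSession, before any Kafka read is attempted. A configuration sketch, assuming Spark 3.0.0 with Scala 2.12 (adjust the coordinates to your build):

    ```python
    from pyspark.sql import SparkSession

    # spark.jars.packages pulls the Kafka integration jar at session
    # start-up; the coordinates must match your Spark/Scala versions.
    spark = (SparkSession.builder
             .appName("twitter-consumer")
             .config("spark.jars.packages",
                     "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0")
             .getOrCreate())
    ```

    This only takes effect if it runs before the session is created; an already-running SparkSession in the notebook must be stopped first.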
    