I’m new to Kafka streaming. I setup a twitter listener using python and it is running in the localhost:9092 kafka server. I could consume the stream produced by the listener using a kafka client tool (conduktor) and also using the command "bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic twitter –from-beginning"
BUt when i try to consume the same stream using Spark Structured streaming, it is not capturing and throws the error – Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
Find the screenshot below
My Producer or listener code:
auth = tweepy.OAuthHandler("**********", "*************")
auth.set_access_token("*************", "***********************")
# session.set('request_token', auth.request_token)
api = tweepy.API(auth)
class KafkaPushListener(StreamListener):
def __init__(self):
#localhost:9092 = Default Zookeeper Producer Host and Port Adresses
self.client = pykafka.KafkaClient("0.0.0.0:9092")
#Get Producer that has topic name is Twitter
self.producer = self.client.topics[bytes("twitter", "ascii")].get_producer()
def on_data(self, data):
#Producer produces data for consumer
#Data comes from Twitter
self.producer.produce(bytes(data, "ascii"))
return True
def on_error(self, status):
print(status)
return True
twitter_stream = Stream(auth, KafkaPushListener())
twitter_stream.filter(track=['#fashion'])
Consumer access from Spark Structured streaming
df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "twitter")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
2
Answers
Found what was missing, when I submitted the spark-job, I had to include the right dependency package version. I have spark 3.0.0 Therefore, I included - org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 package
Add
sink
It will start consum data from kafka.Check below code.