
I have just started testing Kafka streaming into Spark using the PySpark library.

I am running the whole setup in a Jupyter Notebook.
I am trying to get data from the Twitter Streaming API.

Twitter Streaming Code:

import json
import tweepy
from uuid import uuid4
import time
from kafka import KafkaConsumer
from kafka import KafkaProducer

auth = tweepy.OAuthHandler("key", "key")
auth.set_access_token("token", "token")
api = tweepy.API(auth, wait_on_rate_limit=True, retry_count=3, retry_delay=5,
                 retry_errors=set([401, 404, 500, 503]))

class CustomStreamListener(tweepy.StreamListener):
    def __init__(self, api):
        self.api = api
        # Initialise the StreamListener base class (super() takes the subclass)
        super(CustomStreamListener, self).__init__(api)

    def on_data(self, tweet):
        print(tweet)
        # Kafka producer sends the data to the 'twitter' topic;
        # kafka-python expects bytes unless a value_serializer is configured
        producer.send('twitter', json.dumps(tweet).encode('utf-8'))

    def on_error(self, status_code):
        print(status_code)
        return True # Don't kill the stream

    def on_timeout(self):
        print('on_timeout')
        return True # Don't kill the stream

producer = KafkaProducer(bootstrap_servers='localhost:9092')
sapi = tweepy.streaming.Stream(auth, CustomStreamListener(api))
sapi.filter(track=["#party"])

Spark Streaming Code:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")
sc.setLogLevel("WARN")

streaming_context = StreamingContext(sc, 10)
kafkaStream = KafkaUtils.createStream(streaming_context, 'localhost:2181', 'spark-streaming', {'twitter': 1})  
parsed = kafkaStream.map(lambda v: v)
parsed.count().map(lambda x:'Tweets in this batch: %s' % x).pprint()

streaming_context.start()
streaming_context.awaitTermination()

Output printed:

Time: 2017-09-30 11:21:00


Time: 2017-09-30 11:21:10


Time: 2017-09-30 11:21:20

Which part am I doing wrong?

2 Answers


  1. You can debug the application using the following two steps.

    1) Use a sample consumer such as KafkaWordCount to test whether data is coming in (that is, whether the Kafka topic has messages or not).

    Kafka comes with a command line client that will take input from a file or from standard input and send it out as messages to the Kafka cluster. By default, each line will be sent as a separate message.

    Run the producer and then type a few messages into the console to send to the server.

        kafka-console-producer.sh \
            --broker-list <broker list> \
            --topic <topic name> \
            --property parse.key=true \
            --property key.separator=, \
            --new-producer
    

    Example:

       > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    

    If you see the messages printed, then Kafka has the messages; if not, then your producer is not working.

    2) Turn on logging

      Logger.getLogger("org").setLevel(Level.WARNING);
      Logger.getLogger("akka").setLevel(Level.WARNING);       
      Logger.getLogger("kafka").setLevel(Level.WARNING);
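
For step 1, the same check can be done from Python in the notebook. This is only a minimal sketch, assuming the kafka-python package that the question already uses and a broker on localhost:9092; `describe_message` and `peek_topic` are hypothetical helper names, not part of any library.

```python
def describe_message(topic, partition, offset, value):
    """Format one Kafka record as a single readable line."""
    if value is None:
        text = "<null>"
    else:
        # Message values arrive as raw bytes; decode them for display only
        text = value.decode("utf-8", errors="replace")
    return "%s[%d]@%d: %s" % (topic, partition, offset, text)


def peek_topic(topic, bootstrap_servers="localhost:9092", timeout_ms=5000):
    """Print every message currently in `topic` and return how many were read.

    Hypothetical helper. Assumes kafka-python is installed and a broker is
    reachable at `bootstrap_servers`.
    """
    # Imported here so the formatting helper above works without kafka-python
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id=None,                  # no consumer group, so no committed offsets
        auto_offset_reset="earliest",   # start from the oldest retained message
        enable_auto_commit=False,
        consumer_timeout_ms=timeout_ms  # stop iterating after this long with no data
    )
    count = 0
    try:
        for msg in consumer:
            print(describe_message(msg.topic, msg.partition, msg.offset, msg.value))
            count += 1
    finally:
        consumer.close()
    return count
```

Calling `peek_topic("twitter")` while the tweepy script is running should return a non-zero count if the producer is really writing to the topic. If it returns 0, the problem is on the producer side rather than in the Spark job.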
    
  2. You can also use a GUI tool such as Kafdrop. It came in very handy for me while debugging Kafka messages. You can look not only into the message queues but also at the partitions, their offsets, etc.

    It's a good tool, and you should be able to deploy it easily.

    Here is the link: https://github.com/HomeAdvisor/Kafdrop
