
What’s the best way to use the Twitter Stream API with Python to collect tweets in a large area?

I’m interested in geolocation, particularly the nationwide collection of tweets in North America. I’m currently using Python and Tweepy to dump tweets from the Twitter streaming API into a MongoDB database.

I’m currently using the API’s location filter to pull tweets within a bounding box, and then I filter further so that only tweets with coordinates are stored. I’ve found that if my bounding box is large enough, I run into a Python connection error:

raise ProtocolError('Connection broken: %r' % e, e)
requests.packages.urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(0 bytes read)', IncompleteRead(0 bytes read))

I’ve made the bounding box smaller (I’ve successfully tried NYC and NYC + New England), but it seems the error comes back once the bounding box is large enough. I’ve also tried threading, with the intention of running multiple StreamListeners concurrently, but I don’t think the API allows this (I’m getting 420 errors), or at least not in the manner that I’m attempting.

I’m using Tweepy to set up a custom StreamListener class:

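import json

import pymongo
from tweepy import StreamListener  # Tweepy 3.x; StreamListener was removed in Tweepy 4.0

import config  # local module holding the credentials and db_uri


class MyListener(StreamListener):
    """Custom StreamListener that stores geotagged tweets in MongoDB."""

    def __init__(self, api=None):
        super().__init__(api)
        # Open the MongoDB connection once, not on every incoming message
        self.col = pymongo.MongoClient(config.db_uri).twitter.tweets

    def on_data(self, data):
        try:
            decoded_json = json.loads(data)
            # Non-tweet messages (delete or limit notices) have no 'user' key
            if 'user' not in decoded_json:
                return True
            user = decoded_json['user']['screen_name']

            # 'coordinates' is null unless the tweet carries an exact point
            if decoded_json.get('coordinates'):
                self.col.insert_one(decoded_json)
                print("Geolocated tweet saved from user %s" % user)
            else:
                print("No geo data from user %s" % user)
        except Exception as e:
            # Log and keep reading; sleeping here would block the stream
            print("Error on_data: %s" % e)
        return True

    def on_error(self, status):
        print(status)
        if status == 420:
            # Too many connection attempts: each retry lengthens the wait, so stop
            return False
        return True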
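The coordinate check in `on_data` can be pulled out into small pure functions that are easy to test without a live connection. This is a minimal sketch: `extract_point` and `should_store` are hypothetical helper names, and it assumes the v1.1 tweet JSON layout, where `coordinates` is either null or a GeoJSON Point whose array is ordered [longitude, latitude], and where delete/limit notices arriving on the stream carry no `user` key.

```python
from typing import Optional, Tuple


def extract_point(tweet: dict) -> Optional[Tuple[float, float]]:
    """Return (longitude, latitude) if the tweet carries an exact point, else None.

    Assumes 'coordinates' is GeoJSON, so the array is [longitude, latitude].
    """
    coords = tweet.get("coordinates")
    if not coords or coords.get("type") != "Point":
        return None  # field is null, missing, or not a point
    lon, lat = coords["coordinates"]
    return float(lon), float(lat)


def should_store(tweet: dict) -> bool:
    """Keep only real tweets (they have a 'user') that carry a point location."""
    return "user" in tweet and extract_point(tweet) is not None
```

Inside `on_data`, `should_store(decoded_json)` could then decide whether to insert the document.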

This is what my Thread class looks like:

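import threading

from tweepy import Stream


class myThread(threading.Thread):
    """Runs one filtered stream (one bounding box) in its own thread."""

    def __init__(self, threadID, name, streamFilter):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.streamFilter = streamFilter

    def run(self):
        print("Starting " + self.name)
        # Each thread opens its own streaming connection with the same
        # credentials (`auth` is the handler created in main)
        Stream(auth, MyListener()).filter(locations=self.streamFilter)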

And main:

import time

import tweepy
from tweepy import OAuthHandler

import config  # local module holding the credentials

if __name__ == '__main__':

    auth = OAuthHandler(config.consumer_key, config.consumer_secret)
    auth.set_access_token(config.access_token, config.access_secret)
    api = tweepy.API(auth)

    # Bounding boxes, each as [SW longitude, SW latitude, NE longitude, NE latitude]:
    northeast = [-78.44, 40.88, -66.97, 47.64]
    texas = [-107.31, 25.68, -93.25, 36.7]
    california = [-124.63, 32.44, -113.47, 42.2]

    # One thread, and therefore one streaming connection, per bounding box
    northeastThread = myThread(1, "ne-thread", northeast)
    texasThread = myThread(2, "texas-thread", texas)
    caliThread = myThread(3, "cali-thread", california)

    northeastThread.start()
    time.sleep(5)
    texasThread.start()
    time.sleep(10)
    caliThread.start()
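The standard streaming API allowed only one standing connection per account, but its `locations` parameter accepted several bounding boxes (up to 25) in a single request. One alternative to the threads is therefore a single stream over all three boxes, with each tweet tagged by region locally. This is a minimal sketch: `BOXES`, `flatten_boxes` and `region_for` are hypothetical names, and the box values are the ones from main.

```python
from typing import Dict, List, Optional, Sequence, Tuple

# Each box is [sw_lon, sw_lat, ne_lon, ne_lat], the order `locations` expects.
BOXES: Dict[str, List[float]] = {
    "northeast": [-78.44, 40.88, -66.97, 47.64],
    "texas": [-107.31, 25.68, -93.25, 36.7],
    "california": [-124.63, 32.44, -113.47, 42.2],
}


def flatten_boxes(boxes: Dict[str, Sequence[float]]) -> List[float]:
    """Concatenate all boxes into one flat list for a single filter call."""
    flat: List[float] = []
    for box in boxes.values():
        if len(box) != 4:
            raise ValueError("each box needs exactly 4 numbers")
        flat.extend(box)
    return flat


def region_for(point: Tuple[float, float],
               boxes: Dict[str, Sequence[float]]) -> Optional[str]:
    """Return the name of the first box containing (lon, lat), or None."""
    lon, lat = point
    for name, (sw_lon, sw_lat, ne_lon, ne_lat) in boxes.items():
        if sw_lon <= lon <= ne_lon and sw_lat <= lat <= ne_lat:
            return name
    return None
```

With these, one call such as `Stream(auth, MyListener()).filter(locations=flatten_boxes(BOXES))` would replace the three threads. Because the API also matches tweets whose place merely overlaps a box, `region_for` can return None, and the caller has to decide what to do with such tweets.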


Answers


  1. There is nothing bad or unusual about getting a ProtocolError. Connections do break from time to time. You should catch this error in your code and simply restart the stream. All will be good.

    BTW, if you are interrogating the geo field, note that it has been deprecated. The field you want is coordinates. You might also find the place field useful.

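    (The Twitter API docs say that only one standing streaming connection per account is allowed, which would explain the 420 errors you get when running several streams from threads.)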

  2. It seems Twitter allocates one block of tweets when you search for a keyword in a large geographic area (say, a country or a city). I think this can be overcome by running multiple streams concurrently, but as separate programs rather than as threads.

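The first answer's advice (catch the error and restart the stream) can be sketched as a small wrapper with back-off. `run_with_reconnect` is a hypothetical helper, and the linear 250 ms steps capped at 16 s follow the back-off Twitter's streaming documentation suggested for network-level errors; treat every parameter as a tunable assumption rather than a required value.

```python
import time
from typing import Callable, Tuple, Type


def run_with_reconnect(
    start_stream: Callable[[], None],
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    step: float = 0.25,
    cap: float = 16.0,
    max_failures: int = 20,
    healthy_after: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Run a blocking stream call, restarting it whenever it drops.

    Back-off is linear (step, 2*step, ...) and capped at `cap` seconds.
    The failure count resets if a connection stayed up for at least
    `healthy_after` seconds, so only rapid repeated failures give up.
    """
    failures = 0
    while True:
        started = time.monotonic()
        try:
            start_stream()
            return  # the stream ended on its own (e.g. a listener returned False)
        except retry_on as exc:
            if time.monotonic() - started >= healthy_after:
                failures = 0  # the previous connection had been healthy
            failures += 1
            if failures > max_failures:
                raise  # persistent failure: surface the original error
            delay = min(step * failures, cap)
            print("Stream dropped (%r); reconnecting in %.2fs" % (exc, delay))
            sleep(delay)
```

For the stream in the question it might be called as `run_with_reconnect(lambda: Stream(auth, MyListener()).filter(locations=northeast), retry_on=(ProtocolError,))`, where `ProtocolError` is the class named in the question's traceback (`requests.packages.urllib3.exceptions.ProtocolError`). The `sleep` parameter exists only so the back-off can be tested without waiting.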