What’s the best way to use the Twitter Stream API with Python to collect tweets in a large area?
I’m interested in geolocation, particularly the nationwide collection of tweets in North America. I’m currently using Python and Tweepy to dump tweets from the Twitter streaming API into a MongoDB database.
I’m using the API’s locations filter to pull tweets within a bounding box, and then I filter further to store only tweets with coordinates. I’ve found that if my bounding box is large enough, I run into a Python connection error:
raise ProtocolError('Connection broken: %r' % e, e)
requests.packages.urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(0 bytes read)', IncompleteRead(0 bytes read))
I’ve made the bounding box smaller (I’ve successfully tried NYC and NYC + New England), but it seems like the error returns once the bounding box is large enough. I’ve also tried threading, with the intention of running multiple StreamListeners concurrently, but I don’t think the API allows this (I’m getting 420 errors), or at least not in the manner that I’m attempting.
I’m using Tweepy to set up a custom StreamListener class:
class MyListener(StreamListener):
    """Custom StreamListener for streaming data."""
    # def __init__(self):
    def on_data(self, data):
        try:
            db = pymongo.MongoClient(config.db_uri).twitter
            col = db.tweets
            decoded_json = json.loads(data)
            geo = str(decoded_json['coordinates'])
            user = decoded_json['user']['screen_name']
            if geo != "None":
                col.insert(decoded_json)
                print("Geolocated tweet saved from user %s" % user)
            else:
                print("No geo data from user %s" % user)
            return True
        except BaseException as e:
            print("Error on_data: %s" % str(e))
            time.sleep(5)
            return True
    def on_error(self, status):
        print(status)
        return True
This is what my Thread class looks like:
class myThread(threading.Thread):
    def __init__(self, threadID, name, streamFilter):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.streamFilter = streamFilter
    def run(self):
        print("Starting " + self.name)
        #twitter_stream.filter(locations=self.streamFilter)
        Stream(auth, MyListener()).filter(locations=self.streamFilter)
And main:
if __name__ == '__main__':
    auth = OAuthHandler(config.consumer_key, config.consumer_secret)
    auth.set_access_token(config.access_token, config.access_secret)
    api = tweepy.API(auth)
    twitter_stream = Stream(auth, MyListener())
    # Bounding boxes:
    northeast = [-78.44,40.88,-66.97,47.64]
    texas = [-107.31,25.68,-93.25,36.7]
    california = [-124.63,32.44,-113.47,42.2]
    northeastThread = myThread(1,"ne-thread", northeast)
    texasThread = myThread(2,"texas-thread", texas)
    caliThread = myThread(3,"cali-thread", california)
    northeastThread.start()
    time.sleep(5)
    texasThread.start()
    time.sleep(10)
    caliThread.start()
2 Answers
There is nothing bad or unusual about getting a ProtocolError. Connections do break from time to time. You should catch this error in your code and simply restart the stream. All will be good.
BTW, I noticed you are interrogating the geo field, which has been deprecated. The field you want is coordinates. You might also find place useful.
(The Twitter API docs say multiple streaming connections are not allowed.)
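The catch-and-restart advice could look roughly like the sketch below. It is only an illustration, not Tweepy's own mechanism: run_with_reconnect and its parameters (start_stream, max_restarts, base_delay, max_delay, sleep) are hypothetical names, and the doubling delay is an assumed back-off policy rather than something the API docs prescribe here.

```python
import time


def run_with_reconnect(start_stream, max_restarts=None,
                       base_delay=5.0, max_delay=320.0, sleep=time.sleep):
    """Run start_stream() and restart it each time it raises.

    start_stream is a hypothetical zero-argument callable that blocks
    while the stream is open. Returns the number of restarts once the
    stream ends without an error.
    """
    restarts = 0
    delay = base_delay
    while True:
        try:
            start_stream()
            return restarts  # the stream ended normally
        except Exception as exc:  # e.g. urllib3's ProtocolError
            # KeyboardInterrupt is not an Exception, so Ctrl-C still stops us.
            if max_restarts is not None and restarts >= max_restarts:
                raise
            restarts += 1
            print("Stream error: %s; restarting in %.0f s" % (exc, delay))
            sleep(delay)
            # Assumed policy: double the wait each time, up to max_delay.
            delay = min(delay * 2, max_delay)
```

With the question's objects, usage might be run_with_reconnect(lambda: Stream(auth, MyListener()).filter(locations=northeast)). Catching Exception rather than BaseException keeps Ctrl-C working, which the listener in the question would otherwise swallow.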
It seems Twitter allocates a single block of tweets when you search for a keyword over a large geographic area (say, a country or a city). I think this can be overcome by running multiple streams concurrently, but as separate programs.
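If separate programs under the same credentials run into the same connection limit (the 420 responses in the question), one alternative worth trying is a single stream that covers several regions. The locations filter accepts more than one bounding box, passed to Tweepy as one flat list whose length is a multiple of four. A minimal sketch, where merge_boxes is a hypothetical helper and the boxes are the ones from the question:

```python
from itertools import chain


def merge_boxes(*boxes):
    """Flatten [west, south, east, north] boxes into one flat list."""
    for box in boxes:
        if len(box) != 4:
            raise ValueError("each bounding box needs exactly 4 values")
    return list(chain.from_iterable(boxes))


# Bounding boxes taken from the question.
northeast = [-78.44, 40.88, -66.97, 47.64]
texas = [-107.31, 25.68, -93.25, 36.7]
california = [-124.63, 32.44, -113.47, 42.2]

locations = merge_boxes(northeast, texas, california)

# Assumed usage with the question's auth and listener objects:
# Stream(auth, MyListener()).filter(locations=locations)
```

This keeps everything on one connection, so no threads are needed. Whether it avoids the broken connections seen with one very large box is not something this sketch can promise.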