
I am very new to Python, so I'm looking for help with this problem. My goal is to collect roughly 10,000 tweets that contain images and save them into a csv file. Since Twitter's rate limit is 450 requests per 15 minutes, ideally I want to automate this process. The guides I've seen only use the tweepy module, but since I didn't quite understand it, I've used the sample Python code given in Twitter's documentation:

import requests
import pandas as pd
import os
import json

# Set your bearer token here (or export BEARER_TOKEN as an environment variable in your terminal)
os.environ['BEARER_TOKEN']=''


def auth():
    return os.environ.get("BEARER_TOKEN")


def create_url():
    query = "has:images lang:en -is:retweet"
    tweet_fields = "tweet.fields=attachments,created_at,author_id"
    expansions = "expansions=attachments.media_keys"
    media_fields = "media.fields=media_key,preview_image_url,type,url"
    max_results = "max_results=100"
    url = "https://api.twitter.com/2/tweets/search/recent?query={}&{}&{}&{}&{}".format(
        query, tweet_fields, expansions, media_fields, max_results
    )
    return url


def create_headers(bearer_token):
    headers = {"Authorization": "Bearer {}".format(bearer_token)}
    return headers


def connect_to_endpoint(url, headers):
    response = requests.request("GET", url, headers=headers)
    print(response.status_code)
    if response.status_code != 200:
        raise Exception(response.status_code, response.text)
    return response.json()

def save_json(file_name, file_content):
    with open(file_name, 'w', encoding='utf-8') as write_file:
        json.dump(file_content, write_file, sort_keys=True, ensure_ascii=False, indent=4)

def main():
    bearer_token = auth()
    url = create_url()
    headers = create_headers(bearer_token)
    json_response = connect_to_endpoint(url, headers)
    
    #Save the data as a json file
    #save_json('collected_tweets.json', json_response)
    
    #save tweets as csv
    #df = pd.json_normalize(data=json_response)
    df1 = pd.DataFrame(json_response['data'])
    df1.to_csv('tweets_data.csv', mode="a")
    df2 = pd.DataFrame(json_response['includes'])
    df2.to_csv('tweets_includes_media.csv', mode="a")
    print(json.dumps(json_response['meta'], sort_keys=True, indent=4))

if __name__ == "__main__":
    main()

How should I alter this code so that it keeps looping within Twitter's v2 rate limits, or would it be better to use tweepy instead?

As a side note, I do realise my code to save as csv has issues, but this is the best I can do right now.

2 Answers


  1. Try using a scheduler:

    import sched
    import time

    scheduler = sched.scheduler(time.time, time.sleep)
    # Schedule connect_to_endpoint(url, headers) from the question to run once, 16 minutes from now
    scheduler.enter(delay=16 * 60, priority=1, action=connect_to_endpoint, argument=(url, headers))
    scheduler.run()
    

    delay is the amount of time between two events, and action is the method to execute every 16 minutes (in this example). Consider the exact time and the exact method to repeat.
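
    To make this repeat within the rate limit, the scheduled action can re-schedule itself each time it runs. Here is a minimal sketch of that idea, assuming the connect_to_endpoint(url, headers) function (and the url/headers built in main()) from the question; the 16-minute delay and the 10,000-tweet stop condition are placeholders:

    import sched
    import time

    scheduler = sched.scheduler(time.time, time.sleep)
    collected = []  # accumulate the "data" entries from each response

    def fetch_and_reschedule():
        json_response = connect_to_endpoint(url, headers)
        collected.extend(json_response.get("data", []))
        # Keep re-scheduling until roughly 10,000 tweets have been collected
        if len(collected) < 10000:
            scheduler.enter(delay=16 * 60, priority=1, action=fetch_and_reschedule)

    # Fire the first request immediately, then every 16 minutes after that
    scheduler.enter(delay=0, priority=1, action=fetch_and_reschedule)
    scheduler.run()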

  2. There are a couple of things to keep in mind here.

    • Tweepy has not been updated to use the new version of Twitter's API (V2), so what you will find most of the time in Twitter's documentation may not correspond to what Tweepy has to offer. Tweepy still works very well with V1; however, some of the tweet-matching functionality may be different, so you just need to be careful.
    • Given the goal you mentioned, it's not clear that you want to use the Recent Search endpoint. For example, it may be easier to start a 1% stream using the sampled stream endpoint. Here is Twitter's example code for that endpoint. The major benefit of this is that you could run it in "the background" (see note below) with a conditional that kills the process once you've collected 10k tweets. That way, you would not need to worry about hitting a tweet limit: Twitter limits you by default to only ~1% of the volume of your query (in your case, "has:images lang:en -is:retweet") and just gathers those tweets in real-time. If you are trying to get the full record of non-retweet, English tweets between two periods of time, you will need to add those points in time to your query and then manage the limits as you requested above. Check out start_time and end_time in the API reference docs. (A minimal sketch of this streaming approach is included after the note below.)

    Note: To run a script in the background, write your program, then execute it with nohup python nameofstreamingcode.py > logfile.log 2>&1 & from the terminal. Any normal terminal output (i.e. print lines and/or errors) will be written to a new file called logfile.log, and the & at the very end of the command makes the process run in the background (so you can close your terminal and come back to it later).
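
    For illustration, here is a minimal sketch of that streaming approach (not Twitter's example code), assuming the create_headers(bearer_token) function from the question. Note that the sampled stream endpoint takes no query; it returns a random ~1% of all tweets, so any filtering for images or language would have to happen client-side or via the filtered stream endpoint instead:

    import json
    import requests

    def collect_from_sampled_stream(headers, target=10000):
        url = "https://api.twitter.com/2/tweets/sample/stream"
        collected = []
        # stream=True keeps the HTTP connection open and yields tweets as they arrive
        with requests.get(url, headers=headers, stream=True) as response:
            for line in response.iter_lines():
                if not line:  # skip keep-alive newlines
                    continue
                collected.append(json.loads(line))
                if len(collected) >= target:  # the "kill the process" condition
                    break
        return collected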

    • To use the Recent Search endpoint, you want to add a good amount to your connect_to_endpoint(url, headers) function. It also relies on another function, pause_until, written for a Twitter V2 API package I am in the process of developing (link to function code); a stand-in sketch of that helper is included after the code below.
    from datetime import datetime  # needed for the resume-time calculations below

    def connect_to_endpoint(url, headers):
        response = requests.request("GET", url, headers=headers)

        # Twitter returns (in the header of the request object) how many
        # requests you have left. Let's use this to our advantage.
        remaining_requests = int(response.headers["x-rate-limit-remaining"])

        # If that number is one, we get the reset time
        #   and wait until then, plus 15 seconds (you're welcome, Twitter).
        # The regular 429 exception is caught below as well,
        #   however, we want to program defensively, where possible.
        if remaining_requests == 1:
            buffer_wait_time = 15
            resume_time = datetime.fromtimestamp( int(response.headers["x-rate-limit-reset"]) + buffer_wait_time )
            print(f"Waiting on Twitter.\n\tResume Time: {resume_time}")
            pause_until(resume_time)  ## Link to this code in above answer

        # We still may get some weird errors from Twitter.
        # We only care about the time-dependent errors (i.e. errors
        #   that Twitter wants us to wait for).
        # Most of these errors can be solved simply by waiting
        #   a little while and pinging Twitter again - so that's what we do.
        if response.status_code != 200:

            # Too many requests error
            if response.status_code == 429:
                buffer_wait_time = 15
                resume_time = datetime.fromtimestamp( int(response.headers["x-rate-limit-reset"]) + buffer_wait_time )
                print(f"Waiting on Twitter.\n\tResume Time: {resume_time}")
                pause_until(resume_time)  ## Link to this code in above answer

            # Twitter internal server error
            elif response.status_code == 500:
                # Twitter needs a break, so we wait 30 seconds
                resume_time = datetime.now().timestamp() + 30
                print(f"Waiting on Twitter.\n\tResume Time: {resume_time}")
                pause_until(resume_time)  ## Link to this code in above answer

            # Twitter service unavailable error
            elif response.status_code == 503:
                # Twitter needs a break, so we wait 30 seconds
                resume_time = datetime.now().timestamp() + 30
                print(f"Waiting on Twitter.\n\tResume Time: {resume_time}")
                pause_until(resume_time)  ## Link to this code in above answer

            # Any other error means we've done something wrong and should exit
            else:
                raise Exception(
                    "Request returned an error: {} {}".format(
                        response.status_code, response.text
                    )
                )

            # We waited out a time-dependent error, so ping Twitter again
            return connect_to_endpoint(url, headers)

        # Each time we get a 200 response, let's exit the function and return the response.json
        if response.ok:
            return response.json()
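
    The pause_until helper used above is the one linked from the package mentioned earlier. If you don't want to pull that in, a minimal stand-in (accepting either a datetime object or a raw Unix timestamp, since the code above passes both) could look like this:

    import time
    from datetime import datetime

    def pause_until(resume_time):
        # Accept either a datetime object or a raw Unix timestamp
        if isinstance(resume_time, datetime):
            resume_time = resume_time.timestamp()
        # Sleep only if the resume time is still in the future
        seconds_to_wait = resume_time - time.time()
        if seconds_to_wait > 0:
            time.sleep(seconds_to_wait)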
    

    Since the full query result will be much larger than the 100 tweets you are requesting per call, you need to keep track of your location in the larger result set. This is done via a next_token.

    Getting the next_token is actually quite easy: simply grab it from the meta field in the response (the last page of results contains no next_token, which is how you know the query has finished). To be clear, you can use the above function like so…

    # Get response
    response = connect_to_endpoint(url, headers)
    # Get next_token
    next_token = response["meta"]["next_token"]
    

    Then this token needs to be passed in the query details, which are contained in the url you create with your create_url() function. That means you’ll also need to update your create_url() function to something like the below…

    def create_url(pagination_token=None):
        query = "has:images lang:en -is:retweet"
        tweet_fields = "tweet.fields=attachments,created_at,author_id"
        expansions = "expansions=attachments.media_keys"
        media_fields = "media.fields=media_key,preview_image_url,type,url"
        max_results = "max_results=100"
        if pagination_token is None:
            url = "https://api.twitter.com/2/tweets/search/recent?query={}&{}&{}&{}&{}".format(
                query, tweet_fields, expansions, media_fields, max_results
            )
        else:
            # The token has to be passed as the next_token query parameter
            next_token = "next_token={}".format(pagination_token)
            url = "https://api.twitter.com/2/tweets/search/recent?query={}&{}&{}&{}&{}&{}".format(
                query, tweet_fields, expansions, media_fields, max_results, next_token
            )
        return url
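
    To be explicit, each follow-up page is then requested by rebuilding the url with the token from the previous response, for example:

    url = create_url(pagination_token=next_token)
    json_response = connect_to_endpoint(url, headers)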
    

    After altering the above functions, your code should flow in the following manner (a sketch of this loop is included after the list).

    1. Make a request
    2. Get next_token from response["meta"]["next_token"]
    3. Update query parameter to include next_token with create_url()
    4. Rinse and repeat until either:
      1. You get to 10k tweets
      2. The query stops returning a next_token
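
    Putting that flow together, a minimal sketch (assuming the altered functions above, and stopping at roughly 10,000 tweets) might look like this:

    bearer_token = auth()
    headers = create_headers(bearer_token)

    all_tweets = []   # "data" entries from every page
    all_media = []    # matching "includes" -> "media" entries
    next_token = None

    while len(all_tweets) < 10000:
        url = create_url(pagination_token=next_token)
        json_response = connect_to_endpoint(url, headers)

        all_tweets.extend(json_response.get("data", []))
        all_media.extend(json_response.get("includes", {}).get("media", []))

        # The last page of results has no next_token, so stop there
        next_token = json_response.get("meta", {}).get("next_token")
        if next_token is None:
            break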

    Final note: I would not try to work with pandas dataframes to write your file. I would create an empty list, append the results from each new query to that list, and then write the final list of dictionary objects to a json file (see this question for details). I’ve learned the hard way that raw tweets and pandas dataframes don’t really play nice. Much better to get used to how json objects and dictionaries work.
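
    For example, a minimal version of that approach, reusing the all_tweets list from the loop sketched above and the json module already imported in the question:

    with open("collected_tweets.json", "w", encoding="utf-8") as write_file:
        json.dump(all_tweets, write_file, ensure_ascii=False, indent=4)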
