skip to Main Content

I have this multithreading script, which operates on a data set. Each thread gets a chunk of the data set, iterates over its data frame, and calls an API (MS Graph Create) for each row.
What I have seen is that my script tends to get stuck when it is almost finished. I am running this on a Linux Ubuntu server with 8 vCPUs. But this happens only when the total dataset size is in the millions (it takes around 9-10 hrs for 2 million records).

I am writing a script (long running) for the first time. Would like to get an opinion if I am doing things correctly.

Please :

  1. I would like to know if my code is the reason why my script hangs.
  2. Have I done multithreading correctly ? Have I created and waited for threads to end correctly ?

UPDATE
Even after applying the answers below, the threads still seem to get stuck at the end.

import logging
import os
import random
import secrets
import string
import subprocess
import sys
import threading
import time

import pandas as pd
import requests
##### ----- Logging Setup -------
# Route root-logger output to pylogs.log as bare messages: every record is a
# caret-delimited row, and the same file is later handed to retry.py.
logging.basicConfig(
    filename="pylogs.log",
    format="%(message)s",
    datefmt="%m/%d/%Y %I:%M:%S %p",
)
# Module-wide handle on the root logger.
logger = logging.getLogger()
# Only ERROR and above are written; row outcomes are emitted via logger.error().
logger.setLevel(logging.ERROR)

#####------ Function Definitions -------
# generates random password
def generateRandomPassword(lengthOfPassword):
    """Return a random password that is ``lengthOfPassword`` characters long.

    The result always contains at least one lowercase letter, one uppercase
    letter, one digit and one punctuation character; the remaining positions
    are drawn from the union of those four classes.

    Args:
        lengthOfPassword: Total number of characters to generate. Must be at
            least 4 so that every character class can be represented.

    Returns:
        The generated password as a ``str``.

    Raises:
        ValueError: If ``lengthOfPassword`` is smaller than 4.
    """
    # NOTE(review): the original body was elided ("logic for random password
    # gen"), which left the def without a body. This implementation is a
    # stand-in built on the already-imported `secrets`/`string` modules;
    # confirm it matches the tenant's password policy.
    pools = (
        string.ascii_lowercase,
        string.ascii_uppercase,
        string.digits,
        string.punctuation,
    )
    if lengthOfPassword < len(pools):
        raise ValueError(
            f"lengthOfPassword must be at least {len(pools)}, got {lengthOfPassword}"
        )
    # Seed with one character per class, then fill the rest from all classes.
    chars = [secrets.choice(pool) for pool in pools]
    alphabet = "".join(pools)
    chars.extend(
        secrets.choice(alphabet) for _ in range(lengthOfPassword - len(pools))
    )
    # Shuffle so the guaranteed characters do not sit at fixed positions.
    secrets.SystemRandom().shuffle(chars)
    return "".join(chars)

# the most important function
#

def _format_log_line(row, outcome):
    """Return the caret-delimited log record for ``row`` ending in ``outcome``.

    Fields 1-10 are joined with ``^``; fields 11-13 are appended without a
    separator, which is the layout the failure records have always used.
    """
    separated = "^".join(f"{row[i]}" for i in range(1, 11))
    packed = "".join(f"{row[i]}" for i in range(11, 14))
    return f"{separated}{packed}^{outcome}"


def createAccounts(splitData, threadID, request_timeout=(10, 60)):
    """Create one MS Graph user per row of ``splitData`` and log every outcome.

    For each row a user-creation request is POSTed to ``graph_api_create``
    using the module-level ``access_token`` and ``tenantName``. The outcome of
    every row (success, throttled, token expired, other error, exception) is
    written through ``logger.error`` as a caret-delimited record. Throttled
    and token-expired rows are not retried here; they are only logged.

    Args:
        splitData: DataFrame chunk to process; rows are read positionally via
            ``itertuples()`` (index 1 = phone, 2 = mail, 3 = given name,
            4 = surname, 5 = display name, up to index 13 for logging).
        threadID: Identifier used only in progress messages.
        request_timeout: Timeout passed to ``requests.post`` (seconds, or a
            ``(connect, read)`` tuple). Without it a stalled connection
            blocks the thread forever, so ``join()`` in the caller never
            returns.

    Returns:
        None.
    """
    batchProgress = 0
    batch_size = splitData.shape[0]
    for row in splitData.itertuples():
        # Reset per row so the except-branch never reads an unbound or stale value.
        status = None
        try:
            headers = {
                "Content-Type": "application/json",
                "Authorization": "Bearer " + access_token,
            }
            passwordLength = random.choice([8, 9, 12, 13, 16])
            password = generateRandomPassword(passwordLength)
            batchProgress += 1
            post_request_body = {
                "accountEnabled": True,
                "displayName": row[5],
                "givenName": row[3],
                "surname": row[4],
                "mobilePhone": row[1],
                "mail": row[2],
                "passwordProfile": {
                    "password": password,
                    "forceChangePasswordNextSignIn": False,
                },
                "state": "",
                "identities": [
                    {
                        "signInType": "emailAddress",
                        "issuer": tenantName,
                        "issuerAssignedId": row[2],
                    }
                ],
            }
            # Add the phone identity only when a phone number exists: the API
            # requires a length between 1 and 64, so it cannot be left empty.
            # The check is on the phone column (row[1]), not the surname.
            phone = row[1]
            if not pd.isna(phone) and str(phone).strip():
                post_request_body["identities"].append(
                    {
                        "signInType": "phoneNumber",
                        "issuer": tenantName,
                        "issuerAssignedId": phone,
                    }
                )
            responseFromApi = requests.post(
                graph_api_create,
                headers=headers,
                json=post_request_body,
                timeout=request_timeout,
            )
            status = responseFromApi.status_code
            if status == 201:  # success
                user_id = responseFromApi.json().get("id")
                print(f" {status} | {batchProgress} / {batch_size} | Success {user_id}")
                errorDict = _format_log_line(row, "Success")
            elif status == 429:  # throttling issues
                print(f"  Thread {threadID} | Throttled by server ! Sleeping for 150 seconds")
                errorDict = _format_log_line(row, "Throttled")
                time.sleep(150)
            elif status == 401:  # token expiry
                print(f"  Thread {threadID} | Token Expired. Getting it back !")
                errorDict = _format_log_line(row, "Token Expired")
                getRefreshToken()
            else:  # any other error
                try:
                    msg = responseFromApi.json().get("error").get("message")
                except Exception as e:
                    msg = f"Error {e}"
                errorDict = _format_log_line(row, msg)
                print(f" {status} | {batchProgress} / {batch_size} | {msg}  {row[2]}")
            logger.error(errorDict)
        except Exception as e:
            # Per-row boundary: record the failure (including request
            # timeouts) and move on so one bad row cannot kill the thread.
            logger.error(_format_log_line(row, f"Exception_{e}"))
            msg = " Error "
            print(f" {status} | {batchProgress} / {batch_size} | {msg}  {row[2]}")
    print(f"Thread {threadID} completed ! {batchProgress} / {batch_size}")
###### ------ Main Script ------
if __name__ == "__main__":
    # Usage: <script> <csv file name> <application id>
    storageFileName = sys.argv[1]
    appId = sys.argv[2]
    # setup credentials
    bigFilePath = f"./{storageFileName}"
    CreatUserUrl = "https://graph.microsoft.com/v1.0/users"
    B2C_Tenant_Name = "tenantName"
    tenantName = B2C_Tenant_Name + ".onmicrosoft.com"
    applicationID = appId
    accessSecret = ""  # will be taken from command line in future revisions
    token_api_body = {
        "grant_type": "client_credentials",
        "scope": "https://graph.microsoft.com/.default",
        "client_Id": applicationID,
        "client_secret": accessSecret,
    }
    # Get initial access token from MS
    print("Connecting to MS Graph API")
    token_api = "https://login.microsoftonline.com/" + tenantName + "/oauth2/v2.0/token"
    response = {}
    # Pre-set both names so a failed token request reaches the abort branch
    # below instead of raising NameError.
    responseFromApi = None
    responseJson = {}
    try:
        # A timeout keeps a stalled connection from blocking start-up forever.
        responseFromApi = requests.post(token_api, data=token_api_body, timeout=(10, 60))
        responseJson = responseFromApi.json()
    except Exception as e:
        print(f"ERROR | Token auth failed {e}")
    # if we get the token proceed else abort
    if responseFromApi is not None and responseFromApi.status_code == 200:
        print(f"Token API Success ! Expires in {responseJson.get('expires_in')} seconds")
        migrationData = pd.read_csv(bigFilePath)
        print(" We got the data from Storage !", migrationData.shape[0])

        # Plain module-level assignment; worker threads read this global.
        access_token = responseJson.get('access_token')
        graph_api_create = "https://graph.microsoft.com/v1.0/users"
        dataSetSize = migrationData.shape[0]

        partitions = 50  # No of partitions # will be taken from command line in future revisions
        size, remainder = divmod(dataSetSize, partitions)  # rows per partition, leftover rows
        print(f"Data Set Size : {dataSetSize} | Per file size = {size} | Total Files = {partitions} |  Remainder: {remainder} | Start...... n")
        ##### ------- Dataset partitioning.
        # `partitions` equal slices, plus one extra slice for leftover rows.
        datasets = []
        range_val = partitions + 1 if remainder != 0 else partitions
        for partition in range(range_val):
            if partition == partitions:
                df = migrationData[size * partition:dataSetSize]
            else:
                df = migrationData[size * partition:size * (partition + 1)]
            datasets.append(df)
        number_of_threads = len(datasets)
        start_time = time.time()
        spawned_threads = []
        ######## ---- Threads are spawned here --------
        for i, chunk in enumerate(datasets):
            t = threading.Thread(target=createAccounts, args=(chunk, i))
            t.start()
            spawned_threads.append(t)
        number_spawned = len(spawned_threads)
        print(f"Started {number_spawned} threads !")
        ###### - Wait for every worker thread to finish ---------
        for thread in spawned_threads:
            thread.join()
        print(f"Done! It took {time.time() - start_time}s to execute")  # time check
        #### ------ Retry Mechanism -----
        print("RETRYING....... !")
        # Argument list instead of a shell string, so the command-line
        # supplied appId is never interpreted by a shell.
        subprocess.run(["python3", "retry.py", "pylogs.log", appId], check=False)
    else:
        print(f"Token Missing ! API response {responseJson}")



2

Answers


  1. unfair use of MS Graph

    Due to possible throttling by the server, the usage of the MS Graph resource might be unfair between threads. I use "fair" in the resource-starvation sense.

    elif(status == 429): #throttling issues
        print(f"  Thread {threadID} | Throttled by server ! Sleeping for 150 seconds")
        errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Throttled'
        time.sleep(150)
    

    One thread making a million calls can get a disproportionate amount of 429 responses each followed by a penalty of 150 seconds. This sleep doesn’t stop the other threads from making calls though and achieving forward progress.

    This would result in one thread lagging far behind the others and giving the appearance of being stuck.

    Login or Signup to reply.
  2. Here’s a refactoring of your code to use the standard library multiprocessing.ThreadPool for simplicity.

    Naturally I couldn’t have tested it since I don’t have your data, but the basic idea should work. I removed the logging and retry stuff, since I really couldn’t understand why you’d need it (but feel free to add it back); this will attempt to retry each row if the problem appears to be transient.

    import random
    import sys
    import time
    from multiprocessing.pool import ThreadPool
    
    import pandas as pd
    import requests
    
    # Single requests.Session used for every HTTP call in this script.
    sess = requests.Session()

    # Placeholders; `main` assigns the real values before any row is processed.
    tenantName = access_token = None
    
    
    
    def submit_user_create(row, timeout=(10, 60)):
        """POST a user-creation request for ``row`` and return the response.

        Args:
            row: Positional record from ``DataFrame.itertuples()`` (index
                1 = phone, 2 = mail, 3 = given name, 4 = surname,
                5 = display name).
            timeout: Timeout passed to ``sess.post`` (seconds, or a
                ``(connect, read)`` tuple). Without one, a stalled connection
                would block the worker thread indefinitely.

        Returns:
            The ``requests.Response`` of the create call; the status code is
            not inspected here.

        Raises:
            requests.RequestException: On network failure or timeout.
        """
        headers = {"Content-Type": "application/json", "Authorization": "Bearer " + access_token}
        # NOTE(review): generateRandomPassword is not defined in this snippet;
        # it is expected to come from the original script.
        password = generateRandomPassword(random.choice([8, 9, 12, 13, 16]))
        post_request_body = {
            "accountEnabled": True,
            "displayName": row[5],
            "givenName": row[3],
            "surname": row[4],
            "mobilePhone": row[1],
            "mail": row[2],
            "passwordProfile": {"password": password, "forceChangePasswordNextSignIn": False},
            "state": "",
            "identities": [{"signInType": "emailAddress", "issuer": tenantName, "issuerAssignedId": row[2]}],
        }
        # Add the phone identity only when a phone number exists: it must be
        # 1-64 characters long, so it cannot be left empty. The check is on
        # the phone column (row[1]), not the surname.
        phone = row[1]
        if not pd.isna(phone) and str(phone).strip():
            post_request_body["identities"].append(
                {"signInType": "phoneNumber", "issuer": tenantName, "issuerAssignedId": phone}
            )
        return sess.post(
            "https://graph.microsoft.com/v1.0/users",
            headers=headers,
            json=post_request_body,
            timeout=timeout,
        )
    
    
    def get_access_token(tenantName, applicationID, accessSecret, timeout=(10, 60)):
        """Request a client-credentials token and return the access token.

        Args:
            tenantName: Tenant domain inserted into the token endpoint URL.
            applicationID: Application (client) id sent in the request body.
            accessSecret: Client secret sent in the request body.
            timeout: Timeout passed to ``sess.post`` (seconds, or a
                ``(connect, read)`` tuple) so a stalled connection cannot
                block start-up forever.

        Returns:
            The ``access_token`` value from the JSON response.

        Raises:
            RuntimeError: If the token endpoint does not answer with HTTP 200.
            requests.RequestException: On network failure or timeout.
        """
        token_api_body = {
            "grant_type": "client_credentials",
            "scope": "https://graph.microsoft.com/.default",
            "client_Id": applicationID,
            "client_secret": accessSecret,
        }
        token_api = f"https://login.microsoftonline.com/{tenantName}/oauth2/v2.0/token"
        resp = sess.post(token_api, data=token_api_body, timeout=timeout)
        if resp.status_code != 200:
            raise RuntimeError(f"Token Missing ! API response {resp.content}")
        # Named `payload` rather than `json` to avoid shadowing the stdlib module name.
        payload = resp.json()
        print(f"Token API Success ! Expires in {payload.get('expires_in')} seconds")
        return payload["access_token"]
    
    
    def _retry_after_seconds(response, default=150):
        """Return the delay from the response's Retry-After header, or ``default``.

        Falls back to ``default`` when the header is missing or is not a whole
        number of seconds; the result is never below one second.
        """
        try:
            return max(1, int(response.headers.get("Retry-After", default)))
        except (TypeError, ValueError):
            return default


    def process_row(row, max_attempts=10):
        """Create the user for ``row``, retrying transient failures.

        Throttling (429), token expiry (401) and network errors are retried,
        up to ``max_attempts`` attempts in total, so that a row which never
        succeeds cannot keep a worker busy forever. Any other status is
        treated as permanent.

        Args:
            row: Positional record from ``DataFrame.itertuples()``.
            max_attempts: Maximum number of submissions before giving up.

        Returns:
            True if the user was created (HTTP 201), False otherwise.
        """
        for attempt in range(1, max_attempts + 1):
            try:
                response = submit_user_create(row)
            except requests.RequestException as exc:
                # Network failure or timeout: report it and try again.
                print(f"Request failed ({exc}) | attempt {attempt} / {max_attempts}  {row[2]}")
                continue
            status = response.status_code

            if status == 201:  # success
                user_id = response.json().get("id")
                print(f"Success {user_id}")
                return True

            if status == 429:  # throttling issues
                # Wait as long as the server asks; 150 s when it does not say.
                delay = _retry_after_seconds(response)
                print(f"Throttled by server ! Sleeping for {delay} seconds")
                time.sleep(delay)
                continue

            if status == 401:  # token expiry?
                print("Token Expired. Getting it back !")
                getRefreshToken()  # TODO
                continue

            try:
                msg = response.json().get("error").get("message")
            except Exception as e:
                msg = f"Error {e}"
            print(f" {status} | {msg}  {row[2]}")
            return False

        print(f"Giving up after {max_attempts} attempts  {row[2]}")
        return False
    
    
    def main():
        """Read the CSV named on the command line and create a user per row.

        ``sys.argv[1]`` is the CSV path and ``sys.argv[2]`` the application
        id. The module-level ``tenantName`` and ``access_token`` are filled in
        first, then rows are handed to a pool of 10 threads and progress is
        printed as each row completes.
        """
        global tenantName, access_token
        # command-line arguments: data file, then application id
        csv_path = sys.argv[1]
        app_id = sys.argv[2]
        # credentials
        tenant_prefix = "tenantName"
        tenantName = f"{tenant_prefix}.onmicrosoft.com"
        secret = ""  # will be taken from command line in future revisions
        access_token = get_access_token(tenantName, app_id, secret)
        frame = pd.read_csv(csv_path)
        total = len(frame)
        began = time.time()
        with ThreadPool(10) as pool:
            # Results arrive in completion order; only the running count is used.
            outcomes = pool.imap_unordered(process_row, frame.itertuples())
            for done, _ in enumerate(outcomes, 1):
                progress = done / total * 100
                elapsed = time.time() - began
                print(f"{done} / {total} | {progress:.2f}% | {elapsed:.2f} seconds")

        print(f"Done! It took {time.time() - began}s to execute")


    if __name__ == "__main__":
        main()
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search