I have this multithreading script which operates on a data set. Each thread gets a chunk of the data set, iterates over its data frame, and calls an API (MS Graph user create) for every row.
What I have seen is that my script tends to get stuck almost at finish time. I am running this on a Linux Ubuntu server with 8 vCPUs, but this happens only when the total dataset size is in the millions (around 9-10 hours for 2 million records).
I am writing a long-running script for the first time and would like an opinion on whether I am doing things correctly. Specifically:
- I would like to know if my code is the reason why my script hangs.
- Have I done the multithreading correctly? Have I created and waited for the threads to end correctly?
UPDATE
Using the answers below, the threads still seem to get stuck at the end.
```
import pandas as pd
import requests
import sys
import os
import logging
import string
import secrets
import random
import threading
import time
##### ----- Logging Setup -------
# message-only format: each log line is a ^-delimited record consumed later by retry.py
logging.basicConfig(filename="pylogs.log", format='%(message)s')
logger = logging.getLogger()
# record only ERROR and above
logger.setLevel(logging.ERROR)
#####------ Function Definitions -------
# generates random password
def generateRandomPassword(lengthOfPassword):
    # placeholder implementation: a cryptographically secure mix of letters,
    # digits and punctuation, using the string and secrets modules imported above
    alphabet = string.ascii_letters + string.digits + string.punctuation
    return ''.join(secrets.choice(alphabet) for _ in range(lengthOfPassword))
# the most important function
#
def createAccounts(splitData, threadID):
batchProgress = 0
batch_size = splitData.shape[0]
for row in splitData.itertuples():
try:
headers = {"Content-Type": "application/json", "Authorization":"Bearer "+access_token}
randomLength = [8,9,12,13,16]
passwordLength = random.choice(randomLength)
password = generateRandomPassword(passwordLength) # will be generated randomly - for debugging purpose
batchProgress+=1
post_request_body = {
"accountEnabled": True,
"displayName": row[5],
"givenName": row[3],
"surname": row[4],
"mobilePhone": row[1],
"mail": row[2],
"passwordProfile" : {
"password": password,
"forceChangePasswordNextSignIn": False
},
"state":"",
"identities": [
{
"signInType": "emailAddress",
"issuer": tenantName,
"issuerAssignedId": row[2]
}
]
}
            # add the phone-number identity only when a number exists - issuerAssignedId
            # must be 1 to 64 characters, so it cannot be left empty
            if(pd.notna(row[1]) and len(str(row[1]))):
post_request_body["identities"].append({"signInType": "phoneNumber","issuer": tenantName,"issuerAssignedId": row[1]})
responseFromApi = requests.post(graph_api_create, headers=headers, json=post_request_body)
status = responseFromApi.status_code
if(status == 201): #success
id = responseFromApi.json().get("id")
print(f" {status} | {batchProgress} / {batch_size} | Success {id}")
                errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Success'
elif(status == 429): #throttling issues
print(f" Thread {threadID} | Throttled by server ! Sleeping for 150 seconds")
errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Throttled'
time.sleep(150)
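                # NOTE: the throttled row is not retried after the sleep; it is only
                # logged as 'Throttled' and picked up later by retry.py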
elif(status == 401): #token expiry
print(f" Thread {threadID} | Token Expired. Getting it back !")
errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Token Expired'
getRefreshToken()
else: #any other error
msg = ""
try:
msg = responseFromApi.json().get("error").get("message")
except Exception as e:
msg = f"Error {e}"
errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^{msg}'
print(f" {status} | {batchProgress} / {batch_size} | {msg} {row[2]}")
logger.error(errorDict)
except Exception as e:
# check for refresh token errors
errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Exception_{e}'
logger.error(errorDict)
            msg = " Error "
            # use a literal label here: `status` may be unbound if the POST itself raised
            print(f" Exception | {batchProgress} / {batch_size} | {msg} {row[2]}")
print(f"Thread {threadID} completed ! {batchProgress} / {batch_size}")
batchProgress = 0
###### ------ Main Script ------
if __name__ == "__main__":
# get file name and appid from command line arguments
storageFileName = sys.argv[1]
appId = sys.argv[2]
# setup credentials
bigFilePath = f"./{storageFileName}"
    graph_api_create = "https://graph.microsoft.com/v1.0/users"
B2C_Tenant_Name = "tenantName"
tenantName = B2C_Tenant_Name + ".onmicrosoft.com"
applicationID = appId
accessSecret = "" # will be taken from command line in future revisions
token_api_body = {
"grant_type": "client_credentials",
"scope": "https://graph.microsoft.com/.default",
"client_Id" : applicationID,
"client_secret": accessSecret
}
# Get initial access token from MS
print("Connecting to MS Graph API")
token_api = "https://login.microsoftonline.com/"+tenantName+"/oauth2/v2.0/token"
    try:
        responseFromApi = requests.post(token_api, data=token_api_body)
        responseJson = responseFromApi.json()
        print(f"Token API Success ! Expires in {responseJson.get('expires_in')} seconds")
    except Exception as e:
        print(f"ERROR | Token auth failed : {e}")
        sys.exit(1)  # abort: responseFromApi and responseJson do not exist past this point
# if we get the token proceed else abort
if(responseFromApi.status_code == 200):
migrationData = pd.read_csv(bigFilePath)
print(" We got the data from Storage !", migrationData.shape[0])
        # module-level variable read by every worker thread (refreshed via getRefreshToken)
        access_token = responseJson.get('access_token')
dataSetSize = migrationData.shape[0]
partitions = 50 # No of partitions # will be taken from command line in future revisions
size = int(dataSetSize/partitions) # No of rows per file
remainder = dataSetSize%partitions
print(f"Data Set Size : {dataSetSize} | Per file size = {size} | Total Files = {partitions} | Remainder: {remainder} | Start...... n")
##### ------- Dataset partioning.
datasets = []
range_val = partitions + 1 if remainder !=0 else partitions
for partition in range(range_val):
if(partition == partitions):
df = migrationData[size*partition:dataSetSize]
else:
df = migrationData[size*partition:size*(partition+1)]
datasets.append(df)
number_of_threads = len(datasets)
start_time = time.time()
spawned_threads = []
######## ---- Threads are spawned ! here --------
for i in range(number_of_threads): # spawn threads
t = threading.Thread(target=createAccounts, args=(datasets[i], i))
t.start()
spawned_threads.append(t)
number_spawned = len(spawned_threads)
print(f"Started {number_spawned} threads !")
        ###### ---- Wait here for all threads to finish ---------
        for thread in spawned_threads:
            thread.join()
print(f"Done! It took {time.time() - start_time}s to execute") # time check
#### ------ Retry Mechanism -----
print("RETRYING....... !")
os.system(f'python3 retry.py pylogs.log {appId}')
else:
print(f"Token Missing ! API response {responseJson}")```
2 Answers
Unfair use of MS Graph
Due to possible throttling by the server, usage of the MS Graph resource might be unfair between threads. I use "fair" in the resource-starvation sense. One thread making a million calls can receive a disproportionate number of 429 responses, each followed by a penalty of 150 seconds. The sleep doesn't stop the other threads from making calls and achieving forward progress, so the penalized thread ends up lagging far behind the others, giving the appearance of being stuck.
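For example, instead of the fixed 150-second sleep, each request could honor the Retry-After header that Graph returns when it throttles. The helper below is only a sketch: the name postWithBackoff, the 5-attempt cap, and the 30-second fallback delay are illustrative choices, not part of the original script.

```
import time
import requests

def postWithBackoff(url, headers, body, max_attempts=5):
    """POST to Graph, sleeping for the server-suggested delay on 429 responses."""
    for attempt in range(max_attempts):
        response = requests.post(url, headers=headers, json=body)
        if response.status_code != 429:
            return response
        # Graph's throttling responses carry Retry-After (in seconds);
        # fall back to a fixed delay if the header is ever missing
        time.sleep(int(response.headers.get("Retry-After", 30)))
    return response  # still throttled after max_attempts; let the caller log it
```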
Here's a refactoring of your code to use the standard library's multiprocessing.pool.ThreadPool for simplicity. Naturally I couldn't have tested it since I don't have your data, but the basic idea should work. I removed the logging and retry stuff, since I really couldn't understand why you'd need it (but feel free to add it back); this will attempt to retry each row if the problem appears to be transient.
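The refactored code itself did not survive with the answer, so the sketch below reconstructs the idea. It reuses the question's globals (access_token, tenantName, graph_api_create, bigFilePath, getRefreshToken, generateRandomPassword); the per-row worker, the 5-attempt retry cap, and the trimmed request body are assumptions of this reconstruction, not the answer's original code.

```
from multiprocessing.pool import ThreadPool
import pandas as pd
import requests
import time

def createAccount(row):
    """Create one account; retry transient failures (429 throttle, 401 expiry)."""
    for attempt in range(5):              # cap retries so one row can't spin forever
        headers = {"Content-Type": "application/json",
                   "Authorization": "Bearer " + access_token}
        post_request_body = {             # trimmed; build the full body as in the question
            "accountEnabled": True,
            "displayName": row[5],
            "mail": row[2],
            "passwordProfile": {"password": generateRandomPassword(12),
                                "forceChangePasswordNextSignIn": False},
            "identities": [{"signInType": "emailAddress", "issuer": tenantName,
                            "issuerAssignedId": row[2]}],
        }
        response = requests.post(graph_api_create, headers=headers, json=post_request_body)
        if response.status_code == 201:   # created
            return row[2], "Success"
        if response.status_code == 429:   # throttled: honor Retry-After, then retry
            time.sleep(int(response.headers.get("Retry-After", 150)))
        elif response.status_code == 401: # token expired: refresh and retry
            getRefreshToken()
        else:                             # permanent error: report and move on
            return row[2], response.json().get("error", {}).get("message", "Error")
    return row[2], "Gave up after repeated throttling"

if __name__ == "__main__":
    migrationData = pd.read_csv(bigFilePath)
    with ThreadPool(50) as pool:          # 50 workers, mirroring the 50 partitions
        # imap_unordered hands each row to whichever worker is free, so no single
        # thread is left finishing a chunk that drew most of the 429 responses
        for mail, outcome in pool.imap_unordered(createAccount, migrationData.itertuples()):
            print(mail, outcome)
```

Because the pool pulls rows on demand rather than pre-slicing the data frame, a run of throttled rows no longer pins an entire chunk to one thread, which is exactly the stuck-at-the-end behavior described above.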