I’m trying to submit around 150 million jobs to celery using the following code:
from celery import chain
from .task_receiver import do_work,handle_results,get_url
urls = '/home/ubuntu/celery_main/urls'
if __name__ == '__main__':
fh = open(urls,'r')
alldat = fh.readlines()
fh.close()
for line in alldat:
try:
result = chain(get_url.s(line[:-1]),do_work.s(line[:-1])).apply_async()
except:
print ("failed to submit job")
print('task submitted ' + str(line[:-1]))
Would it be faster to split the file into chunks and run multiple instances of this code? Or what can I do? I’m using memcached as the backend, rabbitmq as the broker.
2
Answers
If I understand your problem correctly:
so you have two issues:
Regarding the main for loop in your code, yes you could do that faster if you use multithreading, especially if you are using multicore cpu. Your master thread could read the file and pass chunks of it to sub-threads that will be creating the celery tasks.
Check the guide and the documentation:
https://realpython.com/intro-to-python-threading/
https://docs.python.org/3/library/threading.html
And now let’s imagine you have 1 worker that is receiving these tasks. The code will generate 150M new tasks that will be pushed to the queue. Each chain will be a chain of get_url(), and do_work(), the next chain will run only when do_work() finishes.
If get_url() takes a short time and do_work() takes a long time, it will be a series of quick-task, slow-task, and the total time:
If you have n workers
Now if get_url() is time critical while do_work() is not, then, if you can, you should run all 150M get_url() first and when that is done run all 150M do_work(), but that may require changes to your process design.
That is what I would do. Maybe others have better ideas!?