
I’m trying to submit around 150 million jobs to Celery using the following code:

from celery import chain

from .task_receiver import do_work, handle_results, get_url

urls = '/home/ubuntu/celery_main/urls'

if __name__ == '__main__':
    with open(urls, 'r') as fh:
        alldat = fh.readlines()
    for line in alldat:
        url = line.rstrip('\n')
        try:
            # queue a get_url() -> do_work() chain for this url
            result = chain(get_url.s(url), do_work.s(url)).apply_async()
        except Exception:
            print("failed to submit job")
        else:
            print('task submitted ' + url)

Would it be faster to split the file into chunks and run multiple instances of this code, or is there something else I can do? I’m using Memcached as the result backend and RabbitMQ as the broker.

2 Answers


  1. Chosen as BEST ANSWER
    import multiprocessing

    from celery import chain

    from .task_receiver import do_work, handle_results, get_url

    urls = '/home/ubuntu/celery_main/urls'
    num_workers = 200

    def worker(url_chunk, worker_id):
        """Submit one get_url() -> do_work() chain per url in this chunk."""
        for url in url_chunk:
            print("%s - %s" % (worker_id, url))
            result = chain(get_url.s(url), do_work.s(url)).apply_async()

    if __name__ == '__main__':
        with open(urls, 'r') as fh:
            alldat = fh.readlines()

        chunk_size = len(alldat) // num_workers + 1
        jobs = []
        stack = []
        worker_id = 0
        for line in alldat:
            stack.append(line.rstrip('\n'))
            if len(stack) >= chunk_size:
                worker_id += 1
                p = multiprocessing.Process(target=worker, args=(stack, worker_id))
                jobs.append(p)
                p.start()
                stack = []

        # don't drop the last, partially filled chunk
        if stack:
            worker_id += 1
            p = multiprocessing.Process(target=worker, args=(stack, worker_id))
            jobs.append(p)
            p.start()

        for j in jobs:
            j.join()

  2. If I understand your problem correctly:

    1. you have a list of 150M URLs
    2. you want to run get_url() and then do_work() on each of them

    So you have two issues:

    1. iterating over the 150M URLs
    2. queuing the tasks

    Regarding the main for loop in your code, yes, you could make it faster with multithreading, especially on a multicore CPU. Your main thread could read the file and hand chunks of it to sub-threads that create the Celery tasks (see the sketch after the links below).

    Check the guide and the documentation:

    https://realpython.com/intro-to-python-threading/

    https://docs.python.org/3/library/threading.html
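
    As a rough illustration of that idea (not the original code), here is a minimal sketch using the standard concurrent.futures module; the chunk size, the read_chunks()/submit_chunk() helpers, and the thread count are assumptions you would tune for your setup:

        from concurrent.futures import ThreadPoolExecutor

        from celery import chain

        from .task_receiver import do_work, get_url

        urls_path = '/home/ubuntu/celery_main/urls'

        def submit_chunk(chunk):
            """Queue one get_url() -> do_work() chain per url in this chunk."""
            for url in chunk:
                chain(get_url.s(url), do_work.s(url)).apply_async()

        def read_chunks(path, chunk_size=10000):
            """Yield lists of stripped lines so the whole file never sits in memory at once."""
            chunk = []
            with open(path) as fh:
                for line in fh:
                    chunk.append(line.rstrip('\n'))
                    if len(chunk) >= chunk_size:
                        yield chunk
                        chunk = []
            if chunk:
                yield chunk

        if __name__ == '__main__':
            # publishing to RabbitMQ is mostly network I/O, so a handful of threads already helps
            with ThreadPoolExecutor(max_workers=8) as pool:
                for chunk in read_chunks(urls_path):
                    pool.submit(submit_chunk, chunk)

    Threads are usually enough here because queuing tasks is mostly network I/O; the multiprocessing version in the accepted answer is the same idea with processes instead of threads.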

    Now let’s imagine you have 1 Celery worker receiving these tasks. The code will push 150M new chains to the queue. Each chain runs get_url() followed by do_work(), and with a single worker the next chain only starts once the previous do_work() has finished.

    If get_url() takes a short time and do_work() takes a long time, the queue becomes a series of quick task, slow task, and the total time is:

    t_total_per_worker = (t_get_url_average + t_do_work_average) x 150M

    If you have n workers:

    t_total = t_total_per_worker / n

    t_total = (t_get_url_average + t_do_work_average) x 150M / n
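
    As a back-of-the-envelope check (the timings below are invented purely for illustration, not measurements):

        t_get_url_average = 0.1           # assumed seconds per get_url()
        t_do_work_average = 2.0           # assumed seconds per do_work()
        n_tasks = 150_000_000
        n_workers = 1000

        t_total_per_worker = (t_get_url_average + t_do_work_average) * n_tasks
        t_total = t_total_per_worker / n_workers
        print(t_total / 86400)            # ~3.6 days with these made-up numbers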

    Now, if get_url() is time critical while do_work() is not, then, if you can, you should run all 150M get_url() tasks first and only start the do_work() tasks once those are done, but that may require changes to your process design (a rough sketch of this two-phase approach follows).
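
    If your design allows it, a minimal sketch of that two-phase idea could look like the following; it assumes get_url() persists what it downloads somewhere do_work() can find it later (the result backend, a database, files on disk), which is exactly the kind of design change mentioned above:

        from .task_receiver import do_work, get_url

        urls_path = '/home/ubuntu/celery_main/urls'

        # phase 1: queue only the time-critical downloads
        with open(urls_path) as fh:
            for line in fh:
                get_url.apply_async((line.rstrip('\n'),))

        # ...wait until phase 1 has drained from the queue...

        # phase 2: queue the slow processing, reading the same url list again;
        # do_work() now has to load the downloaded data itself instead of
        # receiving it from get_url() through the chain
        with open(urls_path) as fh:
            for line in fh:
                do_work.apply_async((line.rstrip('\n'),))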

    That is what I would do. Maybe others have better ideas!?
