PS: Running on Ubuntu with Python.
I am trying to solve a bunch of routing problems (Travelling Salesman). The set of customers can change every day, so I take random samples of the network and solve each of those samples. With sufficient samples, I will be able to get an estimate of the expected average cost. So I would like to keep generating and solving new samples until the sample average converges and doesn’t fluctuate much (represented below by CONVERGED(), which returns True once converged).
I wanted to run these samples in parallel to speed up computation, where a new sample is generated and solved if and only if the cost has not converged. My code (v1) looked like this:
    import multiprocessing

    manager = multiprocessing.Manager()
    q = manager.Queue()
    pool = multiprocessing.Pool(multiprocessing.cpu_count() + 2)
    while not <CONVERGED()>:
        job = pool.apply_async(<FUNCTION TO CALCULATE OUTPUT>, <ARGUMENTS>)
        job.get()
I eventually realized that job.get() blocks until that job has finished, so the loop never keeps all CPUs busy: it waits for the single outstanding job before continuing with the while loop. So I tweaked the above to run a batch of 100 each time, as shown below in code (v2).
    import multiprocessing

    manager = multiprocessing.Manager()
    q = manager.Queue()
    pool = multiprocessing.Pool(multiprocessing.cpu_count() + 2)
    while not <CONVERGED()>:
        jobs = []
        for i in range(100):
            jobs.append(pool.apply_async(<FUNCTION TO CALCULATE OUTPUT>, <ARGUMENTS>))
        for job in jobs:
            job.get()
This worked decently, but due to the nature of the problem, the time to solve each sample can vary drastically. So if one of the 100 took EXTREMELY long to run (not uncommon), the other 99 runs would finish and the pool would wait for the last slow run before starting another batch of 100.
Ask
Is it possible to modify it so that I don’t need to set a batch size, i.e. so that it keeps adding runs/samples whenever a previous run/sample has ended and the average has not converged?
Edit 1
There are no outputs for each run, and the input/argument to each of these is just a simple integer which acts as a random seed. They just write to a file/generate a new file each run. <CONVERGED()> loops through those files/written results to determine if convergence has been achieved.
Edit 2 – Follow-up: my implementation issues with BooBoo’s solution
Some of the details probably don’t matter, but in case that’s the cause I am erring on the side of caution. I had tried to use a global variable (like converged) to track convergence before, but that never worked. When the variable is updated, the outer loop does not register the change and does not stop. If I use the magic function within a callback instead, it works FLAWLESSLY. So there are 2 things I am trying to understand: 1) Is there a benefit to the callback function as opposed to having it inside the function as I have it? 2) Why does my version not update the global variable?
    import multiprocessing


    def MC(args1, args2, args3):
        manager = multiprocessing.Manager()
        q = manager.Queue()
        pool = BoundedQueueProcessPool(multiprocessing.cpu_count() + 2)

        # Activating Listener
        pool.apply_async(listener, (q, csv_name, message, data, csv_col))  # this is just a process that I am using to read and write to a file as needed

        count = 0
        global _CONVERGED
        while not _CONVERGED and count <= iterlimit:
            print(count, '-', _CONVERGED)
            count += 1
            pool.apply_async(mc_call, (data, target, q, count, csv_name))

        pool.close()
        q.put([-2])  # this kills the listener
        pool.join()
        return None


    def mc_call(data: MDVRPI, target: str, q: int, seed: float, csv_name: str):
        global _CONVERGED
        # TODO : Below could be put as a callback. Any benefit?!
        if magic_function_to_calc_if_converged():
            _CONVERGED = True
            return None
        gen_data = generate_data(...)
        run_output = run_generated_data(...)
        q.put([run_output])  # gives output to listener to write to file
        return None


    if __name__ == '__main__':
        _CONVERGED = False  # Flag for convergence
        MC(Args1, Args2, Args3)
2 Answers
You can set a callback in apply_async so that each time a job is done, another job is submitted, and when converged you just terminate the pool.

Note that done_event is used as "just wait until I am terminated from the callback". Personally, I consider "sleep till woken up" very dangerous in production, so I’d put a timeout on it (of a few hours, maybe?), and after the timeout just terminate the pool and raise an error, so that you don’t end up with a machine that’s hanging forever.
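A minimal sketch of this callback-driven approach, under stated assumptions: solve_sample and has_converged are hypothetical stand-ins for the real worker and the real convergence test, run_until_converged and its parameters are names made up for illustration, and the one-hour timeout is arbitrary. Unlike the description above, the callback here only sets done_event and the main thread terminates the pool, which keeps the shutdown logic in one place.

```python
import itertools
import multiprocessing
import random
import statistics
import threading
from multiprocessing.pool import ThreadPool  # thread-backed pool, same interface


def solve_sample(seed):
    """Hypothetical worker: stands in for solving one sample, returns its cost."""
    return random.Random(seed).uniform(90.0, 110.0)


def has_converged(costs, window=20, tol=1.0):
    """Hypothetical rule: the last `window` samples moved the mean by less than `tol`."""
    if len(costs) < 2 * window:
        return False
    return abs(statistics.fmean(costs) - statistics.fmean(costs[:-window])) < tol


def run_until_converged(n_workers, pool_factory=multiprocessing.Pool, timeout=3600.0):
    costs, errors = [], []
    seeds = itertools.count()
    seed_lock = threading.Lock()
    done_event = threading.Event()
    pool = pool_factory(n_workers)

    def submit():
        with seed_lock:  # called from the main thread and from the callback thread
            seed = next(seeds)
        pool.apply_async(solve_sample, (seed,), callback=callback_func,
                         error_callback=fail)

    def fail(exc):
        errors.append(exc)
        done_event.set()  # wake the main thread so the error is reported

    def callback_func(cost):
        # Runs in the parent process, in the pool's result-handling thread.
        try:
            if done_event.is_set():
                return  # late result: the decision has already been made
            costs.append(cost)
            if has_converged(costs):
                done_event.set()
            else:
                submit()  # one job finished, so submit one replacement
        except Exception as exc:
            fail(exc)

    try:
        for _ in range(n_workers):  # prime the pool with one job per worker
            submit()
        finished = done_event.wait(timeout)
    finally:
        pool.terminate()
        pool.join()
    if not finished:
        raise TimeoutError("no convergence within the timeout")
    if errors:
        raise errors[0]
    return costs


# Quick trial without spawning processes:
#     costs = run_until_converged(4, pool_factory=ThreadPool, timeout=60)
```

With the default pool_factory the call should sit under an `if __name__ == "__main__":` guard, as with any multiprocessing program.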
You also might want to wrap your callback_func body in a try/except, because if it throws then your PC will hang forever too… but at least you will see the error on your screen.

Please see my comment posted on your question. It’s not entirely clear what your "listener" is attempting to do because you have posted incomplete code. It’s also not clear, for the same reason, how the arguments for any given job submission are generated. But I will attempt to give you some ideas that may be useful to you.
Let’s suppose that you had a generator function that can generate arguments for your worker function, which we will arbitrarily name calculate.

Now we can use a special subclass of multiprocessing.pool.Pool, which I called BoundedQueuePool, to which you can repeatedly submit jobs with apply_async but which will throttle job submission so that the total number of jobs sitting on the pool’s input job queue does not by default exceed twice the number of pool processes. We continue to generate arguments and submit new jobs until told to stop by a callback function that gets invoked whenever a new result has been generated. This function detects whether the results have converged. Once they have, (1) it sets the global flag converged to True so that no more jobs are submitted and (2) terminate is called on the pool to kill any submitted jobs that have not yet completed. The latter happens implicitly when the with BoundedQueueProcessPool(8) as pool: block exits.

Putting it all together:
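A minimal sketch of how these pieces could fit together, written from the description in this answer rather than taken from its listing. The bodies of calculate, generate_args and has_converged are hypothetical placeholders, max_outstanding is an assumed parameter name, and a threading.Event stands in for the global flag converged. The thread-backed class is included only so that the sketch can be tried without spawning processes.

```python
import itertools
import multiprocessing.pool
import random
import statistics
import threading


class BoundedQueuePool(multiprocessing.pool.Pool):
    """Pool whose apply_async blocks once too many tasks are outstanding."""

    def __init__(self, processes=None, max_outstanding=None, **kwargs):
        super().__init__(processes, **kwargs)
        # Default limit: twice the pool size, as described in the text.
        self.max_outstanding = max_outstanding or 2 * self._processes
        self._slots = threading.BoundedSemaphore(self.max_outstanding)

    def apply_async(self, func, args=(), kwds=None, callback=None,
                    error_callback=None):
        self._slots.acquire()  # blocks while max_outstanding tasks are pending

        def on_success(result):
            self._slots.release()  # task finished: free its slot first
            if callback is not None:
                callback(result)

        def on_error(exc):
            self._slots.release()
            if error_callback is not None:
                error_callback(exc)

        try:
            return super().apply_async(func, args, kwds or {}, on_success, on_error)
        except BaseException:
            self._slots.release()  # submission failed, give the slot back
            raise


class BoundedQueueProcessPool(BoundedQueuePool):
    """Process-backed variant."""


class BoundedQueueThreadPool(BoundedQueuePool, multiprocessing.pool.ThreadPool):
    """Thread-backed variant (assumed design, handy for quick experiments)."""


def calculate(seed):
    """Hypothetical worker: stands in for solving one sample."""
    return random.Random(seed).uniform(90.0, 110.0)


def generate_args():
    """Endless stream of arguments (here: integer seeds)."""
    return itertools.count()


def has_converged(costs, window=20, tol=1.0):
    """Hypothetical rule: the last `window` samples moved the mean by less than `tol`."""
    if len(costs) < 2 * window:
        return False
    return abs(statistics.fmean(costs) - statistics.fmean(costs[:-window])) < tol


def run(pool_cls=BoundedQueueProcessPool, n_workers=8):
    costs = []
    converged = threading.Event()  # plays the role of the global flag

    def on_result(cost):
        # Callbacks run in the parent process, so the loop below sees the flag.
        if converged.is_set():
            return  # ignore results that arrive after convergence
        costs.append(cost)
        if has_converged(costs):
            converged.set()

    with pool_cls(n_workers) as pool:
        for seed in generate_args():
            if converged.is_set():
                break
            pool.apply_async(calculate, (seed,), callback=on_result)
    # Leaving the with block calls pool.terminate().
    return costs
```

Because the callback is executed in the parent process, a flag it sets is visible to the submission loop, whereas a global assigned inside a worker function only changes that worker process’s own copy. With the process-backed class, run() should be called under an `if __name__ == "__main__":` guard.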
Explanation
Suppose you have a large number of tasks to submit to a processing pool via method multiprocessing.pool.Pool.apply_async and that you can generate these tasks much faster than the pool can process each task and generate a result. You would have the situation where the input task queue of the pool, which holds tasks waiting to be processed, will continue to grow and consume memory. This is not an issue if the number of tasks that will eventually be put on the queue is not that great, the amount of data (the arguments to the worker function(s)) is not so large, and you have adequate memory. But take the example where you have a multi-gigabyte text file in which each line represents another task to be submitted. If the main process goes into a loop reading the file line by line and submitting each line to the pool, you will soon exhaust all of memory. Or take the case where you have a potentially unlimited number of tasks to submit. In your example you might be in a loop generating tasks for as long as it takes until a certain result is produced.

In these cases you would like a method to "throttle" the submission of tasks. Let’s assume the pool has N processes. Then there is really no need for the input queue to hold more than N * 2 tasks at any point in time. I specify N * 2 instead of N just to ensure that when a pool process becomes idle there is a task on the queue ready to be fetched and executed rather than having to wait for the main process to generate another task. But whether it is N or N * 2 tasks or a larger number really isn’t anything to worry about for the purposes of this discussion.
So class BoundedQueueProcessPool, a specialization of multiprocessing.pool.Pool, supports a modified version of method apply_async. "Under the covers" it creates a semaphore initialized to size N * 2 (by default) where N is the pool size. When you call apply_async the code first does an acquire method call on the semaphore. You can do this N * 2 times before the semaphore count goes to 0 and a subsequent call will block. However, whenever a task completes, thus making a pool process idle and ready to take a new task from the input queue, a call to release is made on the semaphore, thus allowing a blocked apply_async call to acquire the semaphore and complete the submission of a new task.

To be able to support both "bounded queue-size" multiprocessing pools and bounded queue-size multithreading pools, much of the aforementioned logic resides in a parent class BoundedQueuePool, which is the actual direct child of Pool, and BoundedQueueProcessPool is then a child of BoundedQueuePool. This makes it easy to create another class BoundedQueueThreadPool (not shown) that is a child of BoundedQueuePool to handle bounded queue-size multithreading pools.

The important point of the code is that the main process is in a loop calling apply_async with an inexhaustible collection of tasks it will submit until a certain result is detected. So as to not allow the queue size to grow unwieldy, we throttle task submission so that the queue size never exceeds a certain amount.

It appears that the solution posted by Ahmed AEK is also trying to keep the pool size limited in a different way, by first submitting a fixed number of tasks and then submitting another task only when a task completes. The difference with my solution is that the logic is encapsulated in a way that makes it (in my opinion) more readily reusable in other solutions you might have. All you have to do is change the class you are using for the pool and no other code changes are required.
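The semaphore accounting described in this explanation (N * 2 acquisitions succeed, the next one would block, and each completed task releases one slot) can be checked in isolation. This is only an illustration with an assumed N of 2. It uses non-blocking acquire calls so that the would-be block is reported rather than actually waited on.

```python
import threading

N = 2                               # pool size (illustrative value)
slots = threading.Semaphore(2 * N)  # one slot per task allowed to be pending

# The first N * 2 submissions each take a slot without waiting.
taken = [slots.acquire(blocking=False) for _ in range(2 * N)]

# The next submission would block; blocking=False reports that instead of waiting.
would_block = not slots.acquire(blocking=False)

# A task completes: its slot is released, so one more submission can proceed.
slots.release()
resumed = slots.acquire(blocking=False)

print(taken, would_block, resumed)  # [True, True, True, True] True True
```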