I made a program to simulate heavy mail management using Python's multiprocessing Pool and Queue:
from multiprocessing import Pool, Queue
import time
import uuid
import os

NB_WORKERS = 3
NB_MAILS_PER_5_SECONDS = 2
MAIL_MANAGEMENT_DURATION_SECONDS = 1

def list_new_mails_id():
    for i in range(NB_MAILS_PER_5_SECONDS):
        # fake mailbox msg list
        yield str(uuid.uuid1())

def mail_worker(msg_ids):
    pid = os.getpid()
    print(f"Starting worker PID = {pid} and queue={msg_ids} queue size={msg_ids.qsize()} ...")
    while True:
        print(f"[{pid}] Waiting a mail to manage...")
        msg_id = msg_ids.get()
        print(f"[{pid}] managing mail msg_id = {msg_id} ...")
        # here should read mail msg_id and remove it from mailbox when finished
        print(f"[{pid}] --> fake duration of {MAIL_MANAGEMENT_DURATION_SECONDS}s")
        time.sleep(MAIL_MANAGEMENT_DURATION_SECONDS)

if __name__ == "__main__":
    msg_id_queue = Queue()
    with Pool(NB_WORKERS, mail_worker, (msg_id_queue,)) as p:
        while True:
            for msg_id in list_new_mails_id():
                msg_id_queue.put(msg_id)
            print("\nWaiting for new mails to come...\n")
            time.sleep(5)
The program puts message IDs in a queue, and the workers read them from it.
All the workers are started and initialized with the same Queue object.
It works well.
To find out how fault-tolerant the Python Pool() is, and whether task processing can continue despite a worker dying (because of a temporary memory issue, for example), I killed a worker.
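For example, with a plain kill (SIGTERM) aimed at one of the worker PIDs printed at startup; one possible way, shown here as a small Python script (the PID argument is whatever mail_worker printed):

# hypothetical helper: send SIGTERM (what a plain "kill" does) to the
# worker whose PID is given on the command line
import os
import signal
import sys

os.kill(int(sys.argv[1]), signal.SIGTERM)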
The Pool() automatically re-creates the worker(s) and initializes them with the same Queue object as before, but the workers are no longer able to get items from the queue: they stay stuck on msg_ids.get(). Why?
I am using Ubuntu 22.04 LTS and Python 3.10.4.
3 Answers
Finally the solution was to not use Queue but simply apply_async() (Pool uses its own SimpleQueue internally). Unfortunately, the only way to ensure tasks get processed if a worker dies is ... to commit hara-kiri and let systemd restart the daemon.
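A minimal sketch of that approach, reusing the fake mail source from the question (manage_mail is a hypothetical stand-in for the real handling):

from multiprocessing import Pool
import time
import uuid
import os

NB_WORKERS = 3
NB_MAILS_PER_5_SECONDS = 2
MAIL_MANAGEMENT_DURATION_SECONDS = 1

def list_new_mails_id():
    for i in range(NB_MAILS_PER_5_SECONDS):
        # fake mailbox msg list
        yield str(uuid.uuid1())

def manage_mail(msg_id):
    # runs in a pool worker; Pool feeds tasks through its own
    # internal SimpleQueue, so no explicit Queue is needed
    print(f"[{os.getpid()}] managing mail msg_id = {msg_id} ...")
    time.sleep(MAIL_MANAGEMENT_DURATION_SECONDS)

if __name__ == "__main__":
    with Pool(NB_WORKERS) as p:
        while True:
            for msg_id in list_new_mails_id():
                p.apply_async(manage_mail, (msg_id,))
            print("\nWaiting for new mails to come...\n")
            time.sleep(5)

Note that this does not make a killed worker's in-flight task survive: Pool respawns the worker, but the task it was processing is simply lost (its AsyncResult never completes), which is what the hara-kiri remark above is about.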
I must say that I find something very unusual in your code. Contrary to what you say, the function mail_worker is not a "worker" function, i.e. a function that handles tasks that have been submitted to the pool, since in your code no tasks are ever submitted to the pool. mail_worker is a pool initializer, meant to be executed once to initialize each pool process; from that point on you would submit tasks to the pool using a method like map or apply_async. But your initializer function never actually terminates.

Given that you are concerned about a process dying (an exception, or an explicit kill, but why?), you should also be concerned about losing the submitted "task" the process was working on. For this, an explicit task queue with multiprocessing.Process instances can offer a solution. Your process worker function would attempt to handle any exceptions, and if it was in the middle of processing a task it had not completed, it could re-queue the task before ending. You could even handle a SIGTERM signal sent to it (at least on Linux).

In the following demo a couple of processes are killed by sending a SIGTERM signal. The program only places 10 messages on the task queue so that it will eventually complete, and all 10 messages do get processed.
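A minimal sketch of such a demo (Terminated, handle_sigterm and the exact timings here are illustrative, not necessarily the original code):

from multiprocessing import Process, Queue
import os
import signal
import time
import uuid

N_WORKERS = 3
N_MESSAGES = 10
SENTINEL = None  # tells a worker to exit cleanly

class Terminated(Exception):
    pass

def handle_sigterm(signum, frame):
    raise Terminated

def worker(task_queue, done_queue):
    # convert SIGTERM into an exception so the worker can clean up
    signal.signal(signal.SIGTERM, handle_sigterm)
    pid = os.getpid()
    msg_id = SENTINEL
    try:
        while True:
            msg_id = task_queue.get()
            if msg_id is SENTINEL:
                return
            print(f"[{pid}] processing {msg_id}", flush=True)
            time.sleep(1)  # simulated mail handling
            done_queue.put(msg_id)
            msg_id = SENTINEL  # done, nothing to re-queue
    except Terminated:
        if msg_id is not SENTINEL:
            # killed mid-task: put the task back for another worker
            print(f"[{pid}] SIGTERM, re-queueing {msg_id}", flush=True)
            task_queue.put(msg_id)

if __name__ == "__main__":
    task_queue = Queue()
    done_queue = Queue()
    for _ in range(N_MESSAGES):
        task_queue.put(str(uuid.uuid1()))

    workers = [Process(target=worker, args=(task_queue, done_queue))
               for _ in range(N_WORKERS)]
    for w in workers:
        w.start()

    time.sleep(1.5)            # let the workers get busy...
    for w in workers[:2]:
        w.terminate()          # ...then SIGTERM two of them mid-task

    for w in workers[:2]:      # replace the killed workers
        w.join()
        workers.append(Process(target=worker, args=(task_queue, done_queue)))
        workers[-1].start()
    workers = workers[2:]

    for _ in range(N_MESSAGES):      # wait until all 10 messages are done
        print("done:", done_queue.get())
    for _ in workers:
        task_queue.put(SENTINEL)     # then shut the workers down
    for w in workers:
        w.join()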
Prints:
Finally, the solution to not get stuck when a worker is killed is to declare a SIGTERM handler so the worker exits properly. The handler is also the place to put the killed task back into the queue, so that it will be managed by another worker:
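For instance, as a drop-in replacement for mail_worker in the program above (the handler and its bookkeeping are a sketch, not necessarily the exact original fix):

import signal
import sys

def mail_worker(msg_ids):
    pid = os.getpid()
    current = {"msg_id": None}  # the task currently being processed, if any

    def on_sigterm(signum, frame):
        # re-queue the in-flight task so another worker picks it up,
        # then exit cleanly instead of dying inside the Queue internals
        if current["msg_id"] is not None:
            msg_ids.put(current["msg_id"])
        sys.exit(0)

    signal.signal(signal.SIGTERM, on_sigterm)
    while True:
        msg_id = msg_ids.get()
        current["msg_id"] = msg_id
        print(f"[{pid}] managing mail msg_id = {msg_id} ...")
        time.sleep(MAIL_MANAGEMENT_DURATION_SECONDS)
        current["msg_id"] = None

Exiting via sys.exit() lets a get() that was interrupted mid-call release the queue's internal reader lock normally, whereas a hard kill can leave that lock held forever, which would explain why the respawned workers in the original test stayed blocked on msg_ids.get().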