
I wrote a program that simulates heavy mail processing using Python's multiprocessing Pool and Queue:

from multiprocessing import Pool, Queue
import time
import uuid
import os


NB_WORKERS = 3
NB_MAILS_PER_5_SECONDS = 2
MAIL_MANAGEMENT_DURATION_SECONDS = 1


def list_new_mails_id():
    for i in range(NB_MAILS_PER_5_SECONDS):
        # fake mailbox msg list
        yield str(uuid.uuid1())


def mail_worker(msg_ids):
    pid = os.getpid()
    print(f"Starting worker PID = {pid} and queue={msg_ids} queue size={msg_ids.qsize()} ...")
    while True:
        print(f"[{pid}] Waiting a mail to manage...")
        msg_id = msg_ids.get()
        print(f"[{pid}] managing mail msg_id = {msg_id} ...")
        # here should read mail msg_id and remove it from mailbox when finish
        print(f"[{pid}] --> fake duration of {MAIL_MANAGEMENT_DURATION_SECONDS}s")
        time.sleep(MAIL_MANAGEMENT_DURATION_SECONDS)


if __name__ == "__main__":
    msg_id_queue = Queue()

    with Pool(NB_WORKERS, mail_worker, (msg_id_queue,)) as p:
        while True:            
            for msg_id in list_new_mails_id():
                msg_id_queue.put(msg_id)
            print("\nWaiting for new mails to come...\n")
            time.sleep(5)

The program puts some message ids in a queue which are read by workers.
Workers are started and initialized with the same Queue object.
It works well.

To find out how fault-tolerant Pool() is, and whether task processing can continue after a worker dies (because of a temporary memory issue, for example), I killed a worker.

Pool() automatically re-creates the worker(s) and initializes them with the same Queue object as before, but the new workers can no longer get items from the queue: they are stuck on msg_ids.get(). Why?

I am using Ubuntu 22.04 LTS and Python 3.10.4.

3 Answers


  1. Chosen as BEST ANSWER

    In the end, the solution was to drop the Queue and simply use apply_async() (Pool uses its own SimpleQueue internally). Unfortunately, the only way I found to make sure tasks keep being processed after a worker dies is for the daemon to kill itself and let systemd restart it:

    from multiprocessing import Pool
    import time
    import os
    import signal
    
    NB_WORKERS = 3
    NB_MAILS_PER_5_SECONDS = 2
    MAIL_MANAGEMENT_DURATION_SECONDS = 1
    
    
    def mail_worker_init():
        pid = os.getpid()
        print(f"Starting worker PID = {pid} ...", flush=True)
    
    
    def mail_worker_task(msg_id):
        pid = os.getpid()
        print(f"[{pid}] managing mail msg_id = {msg_id} ...", flush=True)
        # here should read mail msg_id and remove it from mailbox when finish
        time.sleep(MAIL_MANAGEMENT_DURATION_SECONDS)
        print(f"[{pid}] --> msg_id = {msg_id} has been processed", flush=True)
    
    
    if __name__ == "__main__":
        msg_id = 0
        with Pool(NB_WORKERS, mail_worker_init) as pool:
            pool_original_pids = [p.pid for p in pool._pool]
            print(f"Original PIDs = {pool_original_pids}")
            while True:
                pool_actual_pids = [p.pid for p in pool._pool]
                print(f"Actual PIDs = {pool_actual_pids}")
                # if the pool processes changed, it most likely means tasks
                # won't be handled anymore (why? this is the question),
                # so kill the whole daemon and let systemd restart it
                if pool_actual_pids != pool_original_pids:
                    os.kill(os.getpid(), signal.SIGKILL)
                for _ in range(NB_MAILS_PER_5_SECONDS):
                    msg_id += 1
                    pool.apply_async(mail_worker_task, (msg_id,))
                print("\nWaiting for new mails to come...\n", flush=True)
                time.sleep(5)
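
One possible alternative to restarting the whole daemon, sketched under assumptions this thread does not confirm: keep the AsyncResult returned by each apply_async() call and resubmit the tasks that are still not ready after a deadline. The assumption is that a task held by a killed worker never completes, so its result stays "not ready" forever. find_overdue, pending and max_age_seconds are hypothetical names, not part of multiprocessing.

```python
import time


def find_overdue(pending, max_age_seconds, now=None):
    """Return the msg_ids whose result is still not ready after the deadline.

    pending maps msg_id -> (submitted_at, async_result), where async_result
    is the object returned by pool.apply_async() and submitted_at comes
    from time.monotonic(). All names here are hypothetical.
    """
    if now is None:
        now = time.monotonic()
    return [
        msg_id
        for msg_id, (submitted_at, async_result) in pending.items()
        if not async_result.ready() and now - submitted_at > max_age_seconds
    ]
```

In the main loop, one could store pending[msg_id] = (time.monotonic(), pool.apply_async(mail_worker_task, (msg_id,))), drop the entries whose result is ready, and resubmit the ids returned by find_overdue(). A task that is merely slow would then run twice, so this only suits tasks that are safe to repeat.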
    
    

  2. I must say that I find something very unusual in your code. Contrary to what you say, function mail_worker is not a "worker" function, i.e. a function that handles tasks that have been submitted to the pool, since in your code no tasks are being submitted to the pool.

    mail_worker is a pool initializer meant to be executed once to initialize each pool process and from that point on you would submit tasks to the pool using a method like map or apply_async. But your initializer function never actually terminates.

    Given that you are concerned about a process dying (an Exception or explicitly being killed — but why?), you should also be concerned about losing a submitted "task" the process was working on. For this using an explicit task queue with multiprocessing.Process instances can offer a solution. Your process worker function would attempt to handle any exceptions and if it was in the middle of processing a task and had not completed it, it could re-queue the task before ending. You could even handle a SIGTERM signal sent to it (at least on Linux).
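
The re-queue idea described above can be sketched in isolation as follows. This is a minimal illustration, not the code of the demo: process_one, handler and demo are hypothetical names, and queue.Queue stands in for multiprocessing.Queue only because both offer the same get()/put() methods.

```python
import queue


def process_one(task_queue, handler):
    """Take one task, run handler on it, and put it back if handler fails."""
    task = task_queue.get()
    try:
        handler(task)
    except BaseException:
        # The task was not completed (an error, or a SystemExit raised by a
        # signal handler): return it to the queue so that another worker
        # can pick it up, then let the exception propagate.
        task_queue.put(task)
        raise
    return task


def demo():
    """Show that a failing task ends up back on the queue."""
    tasks = queue.Queue()  # stand-in for multiprocessing.Queue
    tasks.put("msg-1")

    def failing_handler(task):
        raise RuntimeError(f"cannot process {task}")

    try:
        process_one(tasks, failing_handler)
    except RuntimeError:
        pass
    return tasks.qsize()
```

Catching BaseException rather than Exception is a deliberate choice here, so that a task is also returned when the worker is asked to exit through SystemExit or KeyboardInterrupt.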

    In the following demo a couple of processes are killed by sending a SIGTERM signal. The program only places 10 messages on the task queue so that it will eventually complete and all 10 messages do get processed.

    from multiprocessing import Process, Queue
    import time
    import uuid
    import os
    import signal
    
    NB_WORKERS = 3
    NB_MAILS_PER_5_SECONDS = 2
    MAIL_MANAGEMENT_DURATION_SECONDS = 1
    
    class StopSentinel:
        pass
    
    def list_new_mails_id():
        for i in range(NB_MAILS_PER_5_SECONDS):
            # fake mailbox msg list
            yield str(uuid.uuid1())
    
    def mail_worker(msg_ids):
    
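        def handle_termination(*args):
            # Signal handlers are called with (signum, frame); the except
            # clause below calls this function with no arguments.
            print(f'pid {pid} is terminating.')
            if msg_id is not None:
                # Put back on queue:
                msg_ids.put(msg_id)
            # Leave the worker loop; otherwise the process would keep running.
            raise SystemExit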
    
        msg_id = None
        signal.signal(signal.SIGTERM, handle_termination)
        # Handle any other signals
    
        pid = os.getpid()
        print(f"Starting worker PID = {pid} and queue={msg_ids} queue size={msg_ids.qsize()} ...")
        try:
            while True:
                # prevent lost messages by preventing the main
                print(f"[{pid}] Waiting a mail to manage...")
                msg_id = msg_ids.get()
                if isinstance(msg_id, StopSentinel):
                    break
                print(f"[{pid}] managing mail msg_id = {msg_id} ...")
                # here should read mail msg_id and remove it from mailbox when finish
                print(f"[{pid}] --> fake duration of {MAIL_MANAGEMENT_DURATION_SECONDS}s")
                time.sleep(MAIL_MANAGEMENT_DURATION_SECONDS)
                msg_id = None  # processed: nothing to re-queue on termination
        except Exception:
            handle_termination()
    
    
    if __name__ == "__main__":
        msg_id_queue = Queue()
        processes = [Process(target=mail_worker, args=(msg_id_queue,)) for i in range(NB_WORKERS)]
        for p in processes:
            p.start()
    
        for iteration_number in range(5): # so we eventually terminate
            for msg_id in list_new_mails_id():
                msg_id_queue.put(msg_id)
            print("\nWaiting for new mails to come...\n")
            time.sleep(5)
            # Kill some processes from time to time
            if iteration_number == 1:
                # Kill the first process
                print('terminating process 0')
                os.kill(processes[0].pid, signal.SIGTERM)
            elif iteration_number == 2:
                # Kill the second process
                print('terminating process 1')
                os.kill(processes[1].pid, signal.SIGTERM)
    
        # Put enough stop sentinels in the queue (extras are okay)
        stop_sentinel = StopSentinel()
        for _ in range(NB_WORKERS):
            msg_id_queue.put(stop_sentinel)
        for p in processes:
            p.join()
        print('Done!')
    

    Prints:

    Starting worker PID = 35 and queue=<multiprocessing.queues.Queue object at 0x7fd318b18fd0> queue size=0 ...
    [35] Waiting a mail to manage...
    Starting worker PID = 36 and queue=<multiprocessing.queues.Queue object at 0x7fd318b18fd0> queue size=0 ...
    [36] Waiting a mail to manage...
    Starting worker PID = 37 and queue=<multiprocessing.queues.Queue object at 0x7fd318b18fd0> queue size=0 ...
    [37] Waiting a mail to manage...
    
    Waiting for new mails to come...
    [35] managing mail msg_id = c9d4b5dd-c105-11ed-9fe7-00155d5aa9bc ...
    [35] --> fake duration of 1s
    
    [36] managing mail msg_id = c9d55dab-c105-11ed-885b-00155d5aa9bc ...
    [36] --> fake duration of 1s
    [35] Waiting a mail to manage...
    [36] Waiting a mail to manage...
    
    Waiting for new mails to come...
    
    [37] managing mail msg_id = ccd12a4c-c105-11ed-ab69-00155d5aa9bc ...
    [37] --> fake duration of 1s
    [36] managing mail msg_id = ccd12e18-c105-11ed-9702-00155d5aa9bc ...
    [36] --> fake duration of 1s
    [36] Waiting a mail to manage...
    [37] Waiting a mail to manage...
    terminating process 0
    pid 35 is terminating.
    
    Waiting for new mails to come...
    
    [37] managing mail msg_id = cfccf70d-c105-11ed-bffe-00155d5aa9bc ...
    [37] --> fake duration of 1s
    [36] managing mail msg_id = cfccfb8e-c105-11ed-b879-00155d5aa9bc ...
    [36] --> fake duration of 1s
    [36] Waiting a mail to manage...
    [37] Waiting a mail to manage...
    terminating process 1
    pid 36 is terminating.
    
    Waiting for new mails to come...
    
    [37] managing mail msg_id = d2c8d578-c105-11ed-8964-00155d5aa9bc ...
    [37] --> fake duration of 1s
    [37] Waiting a mail to manage...
    [37] managing mail msg_id = d2c8e72e-c105-11ed-9181-00155d5aa9bc ...
    [37] --> fake duration of 1s
    [37] Waiting a mail to manage...
    
    Waiting for new mails to come...
    
    [37] managing mail msg_id = d5c4c7ea-c105-11ed-9edc-00155d5aa9bc ...
    [37] --> fake duration of 1s
    [37] Waiting a mail to manage...
    [37] managing mail msg_id = d5c4d781-c105-11ed-9195-00155d5aa9bc ...
    [37] --> fake duration of 1s
    [37] Waiting a mail to manage...
    Done!
    
  3. Finally, the solution to avoid getting stuck when a worker is killed is to install a SIGTERM handler so that the worker exits cleanly. This is also the place to put the interrupted task back on the queue so that another worker handles it:

    from multiprocessing import Pool, Queue
    import time
    import uuid
    import os
    import signal
    
    NB_WORKERS = 3
    NB_MAILS_PER_5_SECONDS = 2
    MAIL_MANAGEMENT_DURATION_SECONDS = 1
    
    
    def list_new_mails_id():
        for i in range(NB_MAILS_PER_5_SECONDS):
            # fake mailbox msg list
            yield str(uuid.uuid1())
    
    
    def mail_worker(msg_ids):
        pid = os.getpid()
        msg_id = None
    
        def handle_termination(*args):
            print(f'pid {pid} is terminating.')
            if msg_id is not None:
                # Put back msg_id on queue before exiting:
                msg_ids.put(msg_id)
            raise SystemExit  # same effect as exit(), without relying on the site module
        signal.signal(signal.SIGTERM, handle_termination)
    
        print(f"Starting worker PID = {pid} and queue={msg_ids} queue size={msg_ids.qsize()} ...")
        while True:
            print(f"[{pid}] Waiting a mail to manage...")
            msg_id = msg_ids.get()
            print(f"[{pid}] managing mail msg_id = {msg_id} ...")
            # here should read mail msg_id and remove it from mailbox when finish
            time.sleep(MAIL_MANAGEMENT_DURATION_SECONDS)
            print(f"[{pid}] --> mail msg_id = {msg_id} processed")
            msg_id = None  # processed: nothing to re-queue if killed while idle
    
    
    if __name__ == "__main__":
        msg_id_queue = Queue()
    
        with Pool(NB_WORKERS, mail_worker, (msg_id_queue,)) as p:
            while True:            
                for msg_id in list_new_mails_id():
                    msg_id_queue.put(msg_id)
                print("\nWaiting for new mails to come...\n")
                time.sleep(5)
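
A likely explanation for why the handler helps, offered as an assumption rather than something this thread confirms: in CPython, a blocking Queue.get() waits while holding an internal lock shared by all readers, and a process that dies without unwinding never releases it. The sketch below reproduces that effect with a plain multiprocessing Lock instead of a Queue. It assumes a Unix system with the "fork" start method; hold_lock and lock_is_free_after are hypothetical names.

```python
import multiprocessing as mp
import os
import signal
import time


def hold_lock(lock, ready, install_handler):
    """Child process: take the lock, report readiness, then block."""
    if install_handler:
        def on_sigterm(signum, frame):
            # Raising unwinds the `with` block below, which releases the lock.
            raise SystemExit(0)
        signal.signal(signal.SIGTERM, on_sigterm)
    else:
        # Default action: the process dies immediately, without unwinding.
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
    with lock:
        ready.set()
        time.sleep(60)


def lock_is_free_after(sig, install_handler):
    """Send `sig` to a child holding the lock; report whether the lock is free."""
    ctx = mp.get_context("fork")  # assumption: Unix only
    lock = ctx.Lock()
    ready = ctx.Event()
    child = ctx.Process(target=hold_lock, args=(lock, ready, install_handler))
    child.start()
    ready.wait(timeout=10)  # wait until the child holds the lock
    os.kill(child.pid, sig)
    child.join()
    free = lock.acquire(timeout=1)
    if free:
        lock.release()
    return free
```

Under these assumptions, lock_is_free_after(signal.SIGKILL, install_handler=False) should return False, because the dead child never released the lock, while lock_is_free_after(signal.SIGTERM, install_handler=True) should return True. SIGKILL cannot be caught, so a handler only protects against signals such as SIGTERM.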
    