
I have a question for someone with more experience with Python’s multiprocessing library. I’m pretty much lost at this point.

I’m currently building an image processing app that should run on both Windows 11 and an OrangePi5 with Debian Linux on it. My setup is that alongside the main program, there are two other processes, one for handling uninterrupted button inputs and other IO and another for separating a camera’s functionality from the rest of the app.

Both of the classes that hold these processes have their multiprocessing guts defined the same:

  • There are three queues.
    • Data queue holds one tuple with the main output of the class. Data is put in it periodically and there is one tuple max at any given point. The main program either gets what’s there or uses default values.
    • Config queue is used to send tuples with config method names and corresponding args to the worker process.
    • Status queue is used to get confirmation of config settings from the worker process.
  • No matter the OS, multiprocessing start method is always set to spawn.
  • Both classes have a start_process and stop_process method.
    • The start method initializes multiprocessing-related fields and starts the worker with an init event.
    • The stop method sets a stop event, attempts to join the worker and if that doesn’t work, it terminates it and calls itself recursively to log the details about whether and how the process was stopped.
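As a rough illustration of the queue conventions described above, here is a minimal sketch of the "read the latest value or fall back to a default" and "config message as (method name, args)" ideas. The names `read_latest`, `apply_config` and `FakeCamera` are hypothetical and not taken from my actual classes; `queue.Queue` is used so the snippet runs in one process, but `multiprocessing.Queue` raises the same `queue.Empty` exception.

```python
import queue


def read_latest(data_queue, default):
    """Return the item waiting in the queue, or `default` if nothing is there."""
    try:
        return data_queue.get_nowait()
    except queue.Empty:
        return default


def apply_config(target, message):
    """Dispatch a (method_name, *args) tuple to a method on `target`."""
    method_name, *args = message
    method = getattr(target, method_name)
    return method(*args)


class FakeCamera:
    """Hypothetical stand-in for the object living inside the worker process."""

    def __init__(self):
        self.exposure_ms = 10

    def set_exposure(self, value_ms):
        self.exposure_ms = value_ms
        return value_ms


if __name__ == "__main__":
    q = queue.Queue()
    print(read_latest(q, default=(None, None, None)))  # nothing queued yet
    cam = FakeCamera()
    apply_config(cam, ("set_exposure", 25))
    print(cam.exposure_ms)
```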

The problem is that, even though the button process joins with no issues (when I make the join timeout 0.1 s or greater), the camera process always has to be terminated. No matter how long I make the join timeout, it never joins. I was thinking that, since the responses from the camera can be very time-expensive at times, it could just be taking too long to recognize that the stop event has been set, but I’ve repeatedly managed to catch the process being stopped with the join still failing.

This happens the same on both operating systems, both when the classes are used on their own and with each other. I’m starting to think that I’m unknowingly blocking some resources, making the camera worker unjoinable, but I have no idea how I could troubleshoot that. The button is sending different kinds of immutable values, the camera sends one or two np.array images (it gets them as image.copy()) and either a None or an Exception.

Any nudge in the right direction, either towards what exactly could be causing this or how I could pinpoint what exactly is stopping the process from joining, would be much appreciated. Thanks in advance!

The button process (joins without problems)

def _worker_process(self) -> None:
    try:
        while not self._stop_event.is_set():
            if not self._is_parent_alive():
                break

            if not self._config_queue.empty():
                config_message: tuple = self._config_queue.get()
                self._apply_config(config_message)

            is_pressed: bool = self._button.check_if_pressed()
            button_state_from_info, time_until_long_press = self._button.get_button_info(is_pressed)

            # Determine if the state should be updated
            state_idx: int = self._button_state_map.get(button_state_from_info, -1)
            update_state: bool = False

            # Button state pattern matching and state change record update here
            ...

            self._clear_queue(self._data_queue)
            self._data_queue_send(self._worker_but_st, time_until_long_press, st_ch_rec_tuple)

            time.sleep(0.05)

    except Exception:
        full_traceback = traceback.format_exc()
        self._clear_queue(self._data_queue)
        self._data_queue_send(exception=full_traceback)

The camera process (can’t seem to join, no matter how long I wait)

def _worker_process(self) -> None:
    try:
        while not self._stop_event.is_set():

            if not self._is_parent_alive():
                break

            if not self._config_queue.empty():
                config_message: tuple = self._config_queue.get()
                self._apply_config(config_message)

            # Capture image(s)
            self.camera.capture()

            # Get the latest captured images based on exposure mode
            if self.camera.exp_mode == ExposureMode.SINGLE_EXP:
                latest_img_loexp = self.camera.get_last_img().copy()
                latest_img_hiexp = None
            elif self.camera.exp_mode == ExposureMode.DOUBLE_EXP:
                latest_img_loexp = self.camera.get_last_img_loexp().copy()
                latest_img_hiexp = self.camera.get_last_img_hiexp().copy()
            else:
                raise ValueError(f"Exposure mode ({self.camera.exp_mode}) either invalid or not recognised.")

            self._clear_queue(self._data_queue)
            self._data_queue_send(latest_img_loexp, latest_img_hiexp, None)

            time.sleep(0.05)

    except Exception:
        full_traceback = traceback.format_exc()
        self._clear_queue(self._data_queue)
        self._data_queue_send(exception=full_traceback)

The stop_process method (same for both classes)

def stop_process(self, is_recursive_call=False) -> None:
    
    self._stop_event.set()

    if self._process is not None and self._process.is_alive():
        try:
            self._process.join(timeout=0.2)

            # If the process is still alive and this is not a recursive call, terminate and retry
            if self._process.is_alive() and not is_recursive_call:
                print(f"{self.CLASS_NAME}: Failed to join worker process, terminating.")
                self._process.terminate()
                self.stop_process(is_recursive_call=True)
            else:
                self._report_on_stop_attempt()
        
        except KeyboardInterrupt:
            print(f"{self.CLASS_NAME}: KeyboardInterrupt in process joining, joining failed.")
            raise

    elif self._process and not self._process.is_alive():
        print(f"{self.CLASS_NAME}: Unexpected stop behavior - process was not alive "
            f"on 'stop_process' ({self._process=}).")
    else:
        print(f"{self.CLASS_NAME}: Unexpected stop behavior - process was None.")

If it would help in diagnosing this, I can provide the test files, test logs or relevant parts of the class that defines ‘self.camera’.

2 Answers


  1. Chosen as BEST ANSWER

    Ok, so it seems that the heart of the problem wasn't in the process hanging on join (although that was a strongly valid suggestion). My issues disappeared after simply calling this in the finally block of the worker process:

    def _worker_process_cleanup(self) -> None:
        """
        Clears all three queues, closes them and joins their threads 
        to ensure smooth joining of the worker. 
        Call this inside the 'finally' block at the end of '_worker_process'.
        """
        queues: tuple = (self._data_queue, self._config_queue, self._status_queue)
        with self._lock():
            for idx, queue_to_clear in enumerate(queues):
                self._clear_queue(queue_to_clear)
                queue_to_clear.close()
                queue_to_clear.join_thread()
    

    I've added locks and other fancy stuff to the whole class to be a bit more confident that I don't run into unexpected issues further down the line, but I don't think that any of that was strictly necessary for this to work right.

    For anyone wondering, the _clear_queue method basically just calls get_nowait() repeatedly until queue.empty() returns True.
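A queue-clearing helper along those lines might look like the sketch below. This is not the original `_clear_queue`; the name `drain_queue` is made up, and it stops on the `queue.Empty` exception rather than polling an emptiness check. One caveat: with a `multiprocessing.Queue`, an item that was just put may still be in transit, so `get_nowait()` can report empty slightly early.

```python
import queue


def drain_queue(q):
    """Remove everything currently retrievable from `q`; return how many items were dropped."""
    dropped = 0
    while True:
        try:
            q.get_nowait()
        except queue.Empty:
            # Nothing left to fetch right now.
            return dropped
        dropped += 1


if __name__ == "__main__":
    demo = queue.Queue()
    for item in ("a", "b", "c"):
        demo.put(item)
    print(drain_queue(demo), demo.empty())
```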


  2. If you get a timeout in join(), then the code hangs somewhere. That might be an endless loop in Python or a system call. The code above looks correct, so it’s probably somewhere else.
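One place such a hang can hide when `multiprocessing.Queue` is involved: a process that has put items on a queue waits at exit until its buffered items have been flushed to the underlying pipe, so a worker holding large unread items (such as images) may not finish until the parent consumes them. The Python docs describe this under "Joining processes that use queues". A possible parent-side workaround, sketched here with a hypothetical helper `stop_and_drain`, is to keep reading the data queue while waiting for the worker. The function only relies on `is_alive()`, `join()`, `set()` and `get()`, so it is written against those methods rather than any specific class.

```python
import queue
import time


def stop_and_drain(process, stop_event, data_queue, timeout=5.0):
    """Signal the worker to stop, then keep emptying its output queue until it exits.

    Returns True if the worker exited within `timeout` seconds, False otherwise.
    """
    stop_event.set()
    deadline = time.monotonic() + timeout
    while process.is_alive() and time.monotonic() < deadline:
        try:
            # Consuming pending items lets the worker finish flushing its output.
            data_queue.get(timeout=0.05)
        except queue.Empty:
            pass
    # Final join with whatever time is left (may be zero).
    process.join(timeout=max(0.0, deadline - time.monotonic()))
    return not process.is_alive()
```

If this returns False, falling back to `terminate()` as in the question's `stop_process` is still an option.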

    I would start debugging by adding lots of print() statements that show which code gets executed and what important variables like _stop_event look like.
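As a complement to print() statements, the standard library's `faulthandler` module can dump the traceback of every thread after a delay, which shows where a process is stuck. The sketch below is only one way to wire it up; the helper names `arm_hang_watchdog` and `disarm_hang_watchdog` are made up for illustration. The idea is to arm it in the worker right after the stop event is seen and disarm it just before a clean exit.

```python
import faulthandler
import sys


def arm_hang_watchdog(seconds, file=sys.stderr):
    """Dump every thread's traceback to `file` if still running after `seconds`."""
    # exit=False: only report, do not kill the process.
    faulthandler.dump_traceback_later(seconds, exit=False, file=file)


def disarm_hang_watchdog():
    """Cancel a pending dump, e.g. just before the worker returns normally."""
    faulthandler.cancel_dump_traceback_later()
```

Note that `file` has to be a real file with a file descriptor, because the dump is written at a low level.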

    Also check you don’t overwrite values like _process. This can happen, for example, when the init code is run twice.
