I have a question for someone with more experience with Python’s multiprocessing library; I’m pretty much lost at this point.
I’m currently building an image processing app that should run on both Windows 11 and an OrangePi5 running Debian Linux. Alongside the main program there are two other processes: one handles uninterrupted button input and other IO, and the other isolates the camera’s functionality from the rest of the app.
Both of the classes that hold these processes have their multiprocessing guts defined the same way:
- There are three queues.
  - Data queue holds one tuple with the main output of the class. Data is put in it periodically and there is one tuple max at any given point. The main program either gets what’s there or uses default values.
  - Config queue is used to send tuples with config method names and corresponding args to the worker process.
  - Status queue is used to get confirmation of config settings from the worker process.
- No matter the OS, multiprocessing start method is always set to spawn.
- Both classes have a start_process and stop_process method.
  - The start method initializes multiprocessing-related fields and starts the worker with an init event.
  - The stop method sets a stop event, attempts to join the worker and if that doesn’t work, it terminates it and calls itself recursively to log the details about whether and how the process was stopped.
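The setup in the list above can be sketched roughly as follows. This is only an illustration, not the actual classes: `Channels`, `make_channels`, `publish_latest` and `latest_or_default` are hypothetical names, and the role of the init event (set by the worker once it is ready) is an assumption. The two helpers accept any queue whose `get_nowait()` raises `queue.Empty`, which holds for both `queue.Queue` and `multiprocessing.Queue`.

```python
import multiprocessing as mp
import queue
from dataclasses import dataclass
from typing import Any


@dataclass
class Channels:
    """Hypothetical container for the IPC objects described in the list."""
    data: Any        # holds at most one tuple: the worker's latest output
    config: Any      # (method_name, args) tuples sent to the worker
    status: Any      # confirmations sent back by the worker
    stop_event: Any  # set by the parent to request shutdown
    init_event: Any  # assumed: set by the worker once it is ready


def make_channels() -> Channels:
    # An explicit "spawn" context keeps behaviour the same on Windows and Linux.
    ctx = mp.get_context("spawn")
    return Channels(ctx.Queue(), ctx.Queue(), ctx.Queue(), ctx.Event(), ctx.Event())


def publish_latest(data_queue, item) -> None:
    """Worker side: discard whatever is readable, then queue the new item."""
    try:
        while True:
            data_queue.get_nowait()
    except queue.Empty:
        pass
    data_queue.put(item)


def latest_or_default(data_queue, default):
    """Parent side: return the queued item if one is ready, else the default."""
    try:
        return data_queue.get_nowait()
    except queue.Empty:
        return default
```

With a `multiprocessing.Queue`, an item that was just put may not be readable yet because a background feeder thread transfers it to the pipe, so `latest_or_default` falling back to the default right after a `put()` is expected behaviour rather than a bug.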
The problem is that, even though the button process joins with no issues (when I make the join timeout 0.1 s or greater), the camera process always has to be terminated: it never joins, no matter how long I make the join timeout. Since responses from the camera can be very slow at times, I thought it might just be taking too long to notice that the stop event has been set, but I’ve repeatedly managed to catch the process having already stopped with the join still failing.
This happens identically on both operating systems, whether the classes are used on their own or together. I’m starting to think that I’m unknowingly blocking some resource, making the camera worker unjoinable, but I have no idea how to troubleshoot that. The button sends different kinds of immutable values; the camera sends one or two np.array images (obtained via image.copy()) and either None or an Exception.
Any nudge in the right direction, either towards what exactly could be causing this or how I could pinpoint what exactly is stopping the process from joining, would be much appreciated. Thanks in advance!
The button process (joins without problems)
    def _worker_process(self) -> None:
        try:
            while not self._stop_event.is_set():
                if not self._is_parent_alive():
                    break
                if not self._config_queue.empty():
                    config_message: tuple = self._config_queue.get()
                    self._apply_config(config_message)
                is_pressed: bool = self._button.check_if_pressed()
                button_state_from_info, time_until_long_press = self._button.get_button_info(is_pressed)
                # Determine if the state should be updated
                state_idx: int = self._button_state_map.get(button_state_from_info, -1)
                update_state: bool = False
                # Button state pattern matching and state change record update here
                ...
                self._clear_queue(self._data_queue)
                self._data_queue_send(self._worker_but_st, time_until_long_press, st_ch_rec_tuple)
                time.sleep(0.05)
        except Exception:
            full_traceback = traceback.format_exc()
            self._clear_queue(self._data_queue)
            self._data_queue_send(exception=full_traceback)
The camera process (can’t seem to join, no matter how long I wait)
    def _worker_process(self) -> None:
        try:
            while not self._stop_event.is_set():
                if not self._is_parent_alive():
                    break
                if not self._config_queue.empty():
                    config_message: tuple = self._config_queue.get()
                    self._apply_config(config_message)
                # Capture image(s)
                self.camera.capture()
                # Get the latest captured images based on exposure mode
                if self.camera.exp_mode == ExposureMode.SINGLE_EXP:
                    latest_img_loexp = self.camera.get_last_img().copy()
                    latest_img_hiexp = None
                elif self.camera.exp_mode == ExposureMode.DOUBLE_EXP:
                    latest_img_loexp = self.camera.get_last_img_loexp().copy()
                    latest_img_hiexp = self.camera.get_last_img_hiexp().copy()
                else:
                    raise ValueError(f"Exposure mode ({self.camera.exp_mode}) either invalid or not recognised.")
                self._clear_queue(self._data_queue)
                self._data_queue_send(latest_img_loexp, latest_img_hiexp, None)
                time.sleep(0.05)
        except Exception:
            full_traceback = traceback.format_exc()
            self._clear_queue(self._data_queue)
            self._data_queue_send(exception=full_traceback)
The stop_process method (same for both classes)
    def stop_process(self, is_recursive_call=False) -> None:
        self._stop_event.set()
        if self._process is not None and self._process.is_alive():
            try:
                self._process.join(timeout=0.2)
                # If the process is still alive and this is not a recursive call, terminate and retry
                if self._process.is_alive() and not is_recursive_call:
                    print(f"{self.CLASS_NAME}: Failed to join worker process, terminating.")
                    self._process.terminate()
                    self.stop_process(is_recursive_call=True)
                else:
                    self._report_on_stop_attempt()
            except KeyboardInterrupt as e:
                print(f"{self.CLASS_NAME}: KeyboardInterrupt in process joining, joining failed.")
                raise e
        elif self._process and not self._process.is_alive():
            print(f"{self.CLASS_NAME}: Unexpected stop behavior - process was not alive "
                  f"on 'stop_process' ({self._process=}).")
        else:
            print(f"{self.CLASS_NAME}: Unexpected stop behavior - process was None.")
If it would help in working out what’s wrong, I can provide the test files, test logs or relevant parts of the class that defines ‘self.camera’.
2 Answers
Ok, so it seems that the heart of the problem wasn't the process hanging on join (although that was a very reasonable suggestion). My issues disappeared after simply calling this in the finally block of the worker process:
I've added locks and other fancy stuff to the whole class to be a bit more confident that I don't run into unexpected issues further down the line, but I don't think that any of that was strictly necessary for this to work right.
For anyone wondering, the `_clear_queue` method basically just repeatedly calls `get_nowait()` until `queue.is_empty()` returns True.

If you get a timeout in `join()`, then the code hangs somewhere. That might be an endless loop in Python or a system call. The code above looks correct, so it’s probably somewhere else.

I would start debugging by adding lots of `print()` statements that show which code gets executed and what important variables like `_stop_event` look like.

Also check you don’t overwrite values like `_process`. This can happen, for example, when the init code is run twice.
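One documented reason a `join()` can time out even though the worker loop has already exited: the `multiprocessing` programming guidelines ("Joining processes that use queues") note that a process which has put items on a queue waits at exit until its feeder thread has flushed them to the underlying pipe, so those items have to be removed before the process can be joined. Large payloads such as images are more likely to hit this than small tuples. Whether that is what happened in this thread is not confirmed. The following is a minimal sketch under that assumption, with hypothetical helpers `drain` and `stop_worker` that are not taken from the question. It catches `queue.Empty` rather than polling an emptiness check, since the docs describe `empty()` as unreliable.

```python
import queue
import time


def drain(q) -> int:
    """Remove everything currently readable from q; return how many items."""
    removed = 0
    while True:
        try:
            q.get_nowait()
        except queue.Empty:
            return removed
        removed += 1


def stop_worker(process, stop_event, data_queue, timeout: float = 2.0) -> str:
    """Hypothetical helper: request a stop and drain the data queue while waiting."""
    stop_event.set()
    deadline = time.monotonic() + timeout
    while process.is_alive() and time.monotonic() < deadline:
        # Reading pending items lets the worker's feeder thread finish flushing.
        drain(data_queue)
        process.join(timeout=0.05)
    if process.is_alive():
        # Last resort, mirroring the terminate fallback in stop_process.
        process.terminate()
        process.join()
        return "terminated"
    drain(data_queue)  # discard anything flushed just before the worker exited
    return "joined"
```

The same effect can be approached from the worker side: the documentation mentions `Queue.cancel_join_thread()` as a way for the child to skip waiting on the feeder thread, at the cost of possibly losing data that has not been flushed yet.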