
I'm trying to write a SIGTERM handler that will make my run_forever() loop:

  • Stop accepting new tasks.
  • Complete running tasks.
  • Shut down.

Here is a learning-demo I wrote:

import asyncio
import signal
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(name)s]: %(message)s', datefmt='%H:%M:%S')
_log = logging.getLogger(__name__)


class Looper:
    def __init__(self, loop):
        self._loop = loop
        self._shutdown = False
        signal.signal(signal.SIGINT, self._exit)
        signal.signal(signal.SIGTERM, self._exit)

    def _exit(self, sig, frame):
        name = signal.Signals(sig).name
        _log.info(f"Received shutdown-signal: {sig} ({name})")
        self._shutdown = True
        self._loop.stop() # << Stopping the event loop here.
        _log.info(f"Loop stop initiated.")
        pending = asyncio.all_tasks(loop=self._loop)
        _log.info(f"Collected {len(pending)} tasks that have been stopped.")
        if pending:
            _log.info("Attempting to gather pending tasks: " + str(pending))
            gatherer_set = asyncio.gather(*pending, loop=self._loop)
            # self._loop.run_until_complete(gatherer_set) # << "RuntimeError: This event loop is already running"
        _log.info("Shutting down for good.")

    async def thumper(self, id, t):
        print(f"{id}: Winding up...")
        while not self._shutdown:
            await asyncio.sleep(t)
            print(f'{id}: Thump!')
        print(f'{id}: Thud.')


loop = asyncio.get_event_loop()
lp = Looper(loop)
loop.create_task(lp.thumper('North Hall', 2))
loop.create_task(lp.thumper('South Hall', 3))
loop.run_forever()
_log.info("Done.")

On both Windows 10 and Debian 10, the above script reacts to SIGINT and produces the output:

North Hall: Winding up...
South Hall: Winding up...
North Hall: Thump!
South Hall: Thump!
North Hall: Thump!
South Hall: Thump!
North Hall: Thump!
09:55:53 INFO [__main__]: Received shutdown-signal: 2 (SIGINT)
09:55:53 INFO [__main__]: Loop stop initiated.
09:55:53 INFO [__main__]: Collected 2 tasks that have been stopped.
09:55:53 INFO [__main__]: Attempting to gather pending tasks: {<Task pending coro=<Looper.thumper() running at amazing_grace.py:42> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x02F91BF0>()]>>, <Task pending coro=<Looper.thumper() running at amazing_grace.py:42> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x02F91C10>()]>>}
09:55:53 INFO [__main__]: Shutting down for good.
09:55:53 INFO [__main__]: Done.

Sadly, the “Thud.” lines, which would show that the thumper(..) demo calls have
actually finished, never appear. I guess this is because “gather” just gives me a
set of unfulfilled futures. However, if I dare to activate the run_until_complete()
line, even though it comes after the self._loop.stop() call, the output
ends as follows:

[...]
10:24:25 INFO [__main__]: Collected 2 tasks that have been stopped.
10:24:25 INFO [__main__]: Attempting to gather pending tasks: {<Task pending coro=<Looper.thumper() running at amazing_grace.py:41> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x03E417D0>()]>>, <Task pending coro=<Looper.thumper() running at amazing_grace.py:41> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x03E41BF0>()]>>}
Traceback (most recent call last):
  File "amazing_grace.py", line 50, in <module>
    loop.run_forever()
  File "C:\Python37\lib\asyncio\base_events.py", line 539, in run_forever
    self._run_once()
  File "C:\Python37\lib\asyncio\base_events.py", line 1739, in _run_once
    event_list = self._selector.select(timeout)
  File "C:\Python37\lib\selectors.py", line 323, in select
    r, w, _ = self._select(self._readers, self._writers, [], timeout)
  File "C:\Python37\lib\selectors.py", line 314, in _select
    r, w, x = select.select(r, w, w, timeout)
  File "amazing_grace.py", line 35, in _exit
    self._loop.run_until_complete(gatherer_set) # << "This event loop is already running"
  File "C:\Python37\lib\asyncio\base_events.py", line 571, in run_until_complete
    self.run_forever()
  File "C:\Python37\lib\asyncio\base_events.py", line 526, in run_forever
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running

The question boils down to

  • how to call or substitute run_until_complete(..) in this scenario, and
  • why I see this “Loop is running”-error after stopping the loop.

The program should run on Python 3.7, both under Windows 10 and Linux.

Edit a couple of days later

As zaquest states in his/her answer, one asks for trouble when just assigning a signal handler with signal.signal() and calling create_task inside it; as I observed, the scheduled routine may or may not run (even if there are no other tasks). So I have now added a sys.platform check to see whether the script runs under UNIX. If it does, I use the much more reliable loop.add_signal_handler to register the callback function, which is what I really need. Luckily, UNIX is my main use case. Main line:

self._loop.add_signal_handler(signal.SIGINT, self._exit, signal.SIGINT, None)

Why the platform check? According to the documentation, https://docs.python.org/3/library/asyncio-eventloop.html#unix-signals , loop.add_signal_handler() is not available on Windows, which is no real surprise given that the signals in question are UNIX lingo.
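The platform check described above could be sketched roughly as follows. The helper name install_shutdown_handlers is hypothetical, and the Windows branch (a plain signal.signal() handler that hands over to the loop via call_soon_threadsafe) is an assumption of this sketch, not something taken from the post:

```python
import asyncio
import signal
import sys


def install_shutdown_handlers(loop, callback):
    """Arrange for callback(signum) to run on SIGINT/SIGTERM (hypothetical helper)."""
    for signum in (signal.SIGINT, signal.SIGTERM):
        if sys.platform != "win32":
            # UNIX: the callback runs as an ordinary loop callback,
            # so it is allowed to interact with the event loop.
            loop.add_signal_handler(signum, callback, signum)
        else:
            # Assumed fallback for Windows: keep the raw handler tiny and
            # only hand the real work over to the loop.
            signal.signal(
                signum,
                lambda s, frame: loop.call_soon_threadsafe(callback, s),
            )
```

On UNIX the callback can then safely call loop.create_task(...) or loop.stop(), because it is executed by the loop itself rather than in the middle of whatever the interpreter was doing when the signal arrived.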

2 Answers


  1. Chosen as BEST ANSWER

    I found a solution that calls self._loop.stop() from an async function that first waits for all other tasks. Note that it does not wait for itself! If it tried, the program would deadlock.

    In addition, the asyncio.wait_for(..) co-routines allow for timeouts.

    import asyncio
    import signal
    import logging
    
    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(name)s]: %(message)s', datefmt='%H:%M:%S')
    _log = logging.getLogger(__name__)
    
    
    class Looper:
        def __init__(self, loop):
            self._loop = loop
            self._shutdown = False
            signal.signal(signal.SIGINT, self._exit)
            signal.signal(signal.SIGTERM, self._exit)
    
        async def _a_exit(self):
            self._shutdown = True
            my_task = asyncio.current_task()
            pending = list(filter(lambda x: x is not my_task, asyncio.all_tasks(loop=self._loop)))
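            # No loop= argument here: it was deprecated in Python 3.8 and removed in 3.10.
            # Inside a coroutine these calls use the running loop anyway.
            waiters = [asyncio.wait_for(p, timeout=1.5) for p in pending]
            results = await asyncio.gather(*waiters, return_exceptions=True)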
            n_failure = len(list(filter(lambda x: isinstance(x, Exception), results)))
            _log.info(f"{n_failure} failed processes when quick-gathering the remaining {len(results)} tasks. Stopping loop now.")
            self._loop.stop()
    
        def _exit(self, sig, frame):
            name = signal.Signals(sig).name
            _log.info(f"Received shutdown-signal: {sig} ({name})")
            self._loop.create_task(self._a_exit())
    
        async def thumper(self, id, t):
            print(f"{id}: Winding up...")
            while not self._shutdown:
                await asyncio.sleep(t)
                print(f'{id}: Thump!')
            print(f'{id}: Thud.')
    
    
    loop = asyncio.get_event_loop()
    lp = Looper(loop)
    loop.create_task(lp.thumper('North Hall', 1))
    loop.create_task(lp.thumper('South Hall', 2))
    loop.create_task(lp.thumper(' West Hall', 3))
    loop.create_task(lp.thumper(' East Hall', 4))
    loop.run_forever()
    _log.info("Done.")
    

    On Windows 10 this may lead to the output

    North Hall: Winding up...
    South Hall: Winding up...
     West Hall: Winding up...
     East Hall: Winding up...
    North Hall: Thump!
    South Hall: Thump!
    [..]
    South Hall: Thump!
    North Hall: Thump!
    14:20:59 INFO [__main__]: Received shutdown-signal: 2 (SIGINT)
     West Hall: Thump!
     West Hall: Thud.
    North Hall: Thump!
    North Hall: Thud.
    South Hall: Thump!
    South Hall: Thud.
    14:21:01 INFO [__main__]: 1 failed processes when quick-gathering the remaining 4 tasks. Stopping loop now.
    14:21:01 INFO [__main__]: Done.
    

    The failed process fell prey to the timeout.
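To illustrate what "fell prey to the timeout" means, here is a minimal sketch with shortened delays. The names slow and main are made up for this example. It shows that asyncio.wait_for cancels the task it wraps when the timeout expires, and that gather(..., return_exceptions=True) then reports a TimeoutError in that task's slot instead of raising:

```python
import asyncio


async def slow(name, delay):
    # Stand-in for a thumper that needs `delay` seconds to notice the shutdown.
    await asyncio.sleep(delay)
    return name


async def main():
    tasks = [
        asyncio.ensure_future(slow("fast", 0.01)),
        asyncio.ensure_future(slow("slow", 5)),
    ]
    # Each task gets the same deadline; a task that misses it is cancelled.
    waiters = [asyncio.wait_for(t, timeout=0.2) for t in tasks]
    results = await asyncio.gather(*waiters, return_exceptions=True)
    return results, [t.cancelled() for t in tasks]


results, cancelled = asyncio.run(main())
print(results, cancelled)
```

This matches the log above: the hall whose sleep is longer than the timeout never prints "Thud." because its task is cancelled while it is still sleeping.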

    Note that this solves my issue. However, the question as to why loop.run_until_complete(..) fails after loop.stop() has been called remains open.


  2. Python signal handlers are executed in the main thread, the same thread in which your loop is running. The BaseEventLoop.stop() method does not stop the loop immediately; it just sets a flag, so that on its next iteration the loop executes only the callbacks that have already been scheduled and then exits (see run_forever). However, the loop can’t continue until your signal handler returns. This means you can’t wait inside the signal handler for the loop to stop, and because the loop still counts as running at that point, run_until_complete() raises “This event loop is already running”. Instead, you could schedule another task that waits for your long-running tasks to react to the change in self._shutdown and then stops the loop.

    class Looper:
        ...
    
        def _exit(self, sig, frame):
            name = signal.Signals(sig).name
            _log.info("Received shutdown-signal: %s (%s)", sig, name)
            self._shutdown = True
    
            pending = asyncio.all_tasks(loop=self._loop)
            _log.info("Attempting to gather pending tasks: " + str(pending))
            if pending:
                self._loop.create_task(self._wait_for_stop(pending))
    
        async def _wait_for_stop(self, tasks):
            await asyncio.gather(*tasks)
            self._loop.stop()  # << Stopping the event loop here.
            _log.info("Loop stop initiated.")
    
        ...
    

    One more thing to mention is that the documentation says that signal.signal() handlers are not allowed to interact with the loop, without stating the reason (see the documentation for loop.add_signal_handler()).
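The "already running" error can be reproduced without any signals. The following is a minimal sketch in which an ordinary loop callback stands in for the signal handler; the names try_nested_run and report are made up for this example:

```python
import asyncio


def try_nested_run(loop, report):
    # stop() only sets a flag; the loop is still running this very callback.
    loop.stop()
    report["running_after_stop"] = loop.is_running()
    fut = loop.create_future()
    try:
        # Same situation as calling run_until_complete() in the signal handler.
        loop.run_until_complete(fut)
    except RuntimeError as exc:
        report["error"] = str(exc)
    fut.cancel()


loop = asyncio.new_event_loop()
report = {}
loop.call_soon(try_nested_run, loop, report)
loop.run_forever()  # returns once the callback above has finished
loop.close()
print(report)
```

The loop only gets a chance to act on the stop flag after the callback (or signal handler) has returned, which is why waiting has to be delegated to a task that the loop runs itself.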
