skip to Main Content

I have the following project directory:

azima:
    __init__.py
    main.py
    tasks.py
    monitor.py

tasks.py

from .main import app

@app.task
def add(x, y):
    """Celery task: return ``x + y``."""
    total = x + y
    return total

@app.task
def mul(x, y):
    """Celery task: return ``x * y``."""
    product = x * y
    return product

@app.task
def xsum(numbers):
    """Celery task: return the sum of the items in ``numbers``."""
    total = sum(numbers)
    return total

main.py

from celery import Celery

# Celery application for the ``azima`` package. Redis serves as both the
# message broker and the result backend; ``azima.tasks`` is imported by the
# worker on start-up so its tasks get registered.
app = Celery(
    'azima',
    backend='redis://localhost:6379/0',
    broker='redis://localhost:6379/0',
    include=['azima.tasks'],
)

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
    # Workers do not emit task events (task-succeeded, task-failed, ...) by
    # default, so an event monitor would otherwise only see worker-level
    # events such as worker-online. Equivalent to starting the worker with
    # the -E / --task-events flag.
    worker_send_task_events=True,
)

if __name__ == '__main__':
    app.start()

monitor.py

from .main import app

def my_monitor(app):
    """Capture events from ``app``'s connection and print a line per event.

    Creates an ``app.events.State()`` to track events, registers handlers for
    ``task-failed``, ``task-succeeded`` and ``worker-online`` (every other
    event type goes straight to ``state.event``), then captures events with
    no limit and no timeout.
    """
    state = app.events.State()

    def _record_task(event):
        """Feed ``event`` into ``state`` and return the task it refers to."""
        state.event(event)
        return state.tasks.get(event['uuid'])

    def announce_failed_tasks(event):
        failed = _record_task(event)
        print(f'TASK FAILED: {failed.name}[{failed.uuid}]')

    def announce_succeeded_tasks(event):
        print('task succeeded')
        finished = _record_task(event)
        print(f'TASK SUCCEEDED: {finished.name}[{finished.uuid}]')

    def worker_online_handler(event):
        state.event(event)
        print("New worker gets online")
        fields = ('hostname', 'timestamp', 'freq', 'sw_ver')
        print(*[event[key] for key in fields])

    with app.connection() as connection:
        handlers = {
            'task-failed': announce_failed_tasks,
            'task-succeeded': announce_succeeded_tasks,
            'worker-online': worker_online_handler,
            '*': state.event,
        }
        receiver = app.events.Receiver(connection, handlers=handlers)
        receiver.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    # Monitor the Celery app imported from .main rather than a fresh instance.
    my_monitor(app)

Started celery worker with

celery -A azima.main worker -l INFO

And started monitor.py with

python -m azima.monitor

But only the worker-online event is being triggered, while other events such as task-succeeded are not triggered or handled.

enter image description here

What am I missing here?

3

Answers


  1. Comparing your code to Flower's code:

    # Retry delay in seconds; doubled before every attempt and reset to 1
    # once a receiver has been created on a live connection.
    try_interval = 1
    while True:
        try:
            try_interval *= 2
    
            with self.capp.connection() as conn:
                # Route every event type to self.on_event via the "*" handler.
                recv = EventReceiver(conn,
                                        handlers={"*": self.on_event},
                                        app=self.capp)
                try_interval = 1
                logger.debug("Capturing events...")
                # Capture with no event limit and no timeout.
                recv.capture(limit=None, timeout=None, wakeup=True)
        except (KeyboardInterrupt, SystemExit):
            # Python 3 names the module _thread; fall back to the Python 2 name.
            try:
                import _thread as thread
            except ImportError:
                import thread
            thread.interrupt_main()
        except Exception as e:
            # Any other failure: log it, wait try_interval seconds, then loop
            # around and try again.
            logger.error("Failed to capture events: '%s', "
                            "trying again in %s seconds.",
                            e, try_interval)
            logger.debug(e, exc_info=True)
            time.sleep(try_interval)
    

    There are 2 differences:

    1. The Celery app is not passed to your EventReceiver.
    2. An infinite loop (while True), although I assume the capture method blocks while waiting for events and the loop is only there to retry after an error.
    Login or Signup to reply.
  2. Enable the worker's task-group events with the CLI option -E or --task-events, and try to capture all events:

    def my_monitor(app):
        """Print the type of every event captured on ``app``'s connection."""
        def on_event(event):
            event_type = event.get('type')
            print("Event.type", event_type)

        with app.connection() as connection:
            # A single catch-all handler receives every event type.
            catch_all = {'*': on_event}
            receiver = app.events.Receiver(connection, handlers=catch_all)
            receiver.capture(limit=None, timeout=None, wakeup=True)
    
    Login or Signup to reply.
  3. By default, Celery workers do not send task events. However, like most of its useful features, this can be configured: enable the worker_send_task_events setting in your config, or run the Celery worker(s) with the -E flag.

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search