I have the following project directory:
azima:
__init__.py
main.py
tasks.py
monitor.py
tasks.py
from .main import app
@app.task
def add(x, y):
    """Celery task: return ``x + y``."""
    total = x + y
    return total
@app.task
def mul(x, y):
    """Celery task: return ``x * y``."""
    product = x * y
    return product
@app.task
def xsum(numbers):
    """Celery task: return the result of ``sum(numbers)``."""
    total = sum(numbers)
    return total
main.py
from celery import Celery
# The same Redis database is used as both the broker and the result backend;
# the task module is listed in ``include`` so it is loaded with the app.
app = Celery(
    'azima',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0',
    include=['azima.tasks'],
)

# Optional configuration, see the application user guide.
# NOTE: task events are not enabled here; workers only emit task-* events
# when started with -E / --task-events or when worker_send_task_events is set.
app.conf.update(result_expires=3600)

if __name__ == '__main__':
    # Hand control to Celery's own entry point when run as a script.
    app.start()
monitor.py
from .main import app
def my_monitor(app):
    """Receive events from ``app`` and print a line for selected event types.

    An ``app.events.State()`` object is fed every handled event. Dedicated
    handlers print a message for ``task-failed``, ``task-succeeded`` and
    ``worker-online``; every other event type goes straight to
    ``state.event`` through the ``'*'`` entry. Events are captured on a
    connection from ``app.connection()`` with ``limit=None`` and
    ``timeout=None``.

    Args:
        app: The Celery application whose events are monitored.
    """
    state = app.events.State()

    def _record_and_lookup(event):
        # Update the state with the event first, then fetch the task it names.
        state.event(event)
        return state.tasks.get(event['uuid'])

    def announce_failed_tasks(event):
        failed = _record_and_lookup(event)
        print(f'TASK FAILED: {failed.name}[{failed.uuid}]')

    def announce_succeeded_tasks(event):
        # The marker line is printed before the state is touched.
        print('task succeeded')
        finished = _record_and_lookup(event)
        print(f'TASK SUCCEEDED: {finished.name}[{finished.uuid}]')

    def worker_online_handler(event):
        state.event(event)
        print("New worker gets online")
        reported_keys = ('hostname', 'timestamp', 'freq', 'sw_ver')
        print(*[event[key] for key in reported_keys])

    # Event type -> callback; '*' catches every type without its own entry.
    handlers = {
        'task-failed': announce_failed_tasks,
        'task-succeeded': announce_succeeded_tasks,
        'worker-online': worker_online_handler,
        '*': state.event,
    }

    with app.connection() as connection:
        receiver = app.events.Receiver(connection, handlers=handlers)
        receiver.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
    # Monitor the app imported from .main instead of creating a new
    # Celery('azima') instance here.
    my_monitor(app)
Started celery worker with
celery -A azima.main worker -l INFO
and started monitor.py with
python -m azima.monitor
But only the `worker-online` event is being triggered, while other events such as `task-succeeded` are not triggered or handled.
What am I missing here?
3
Answers
By comparing your code to Flower's code, there are 2 differences:

1. `app` is missing in your `EventReceiver`.
2. Flower runs the capture inside an infinite loop (`while True`), although I assume the `capture` method is blocking and waiting for events, and the loop is just in case of an error.

Enable the worker `task-` group events with the CLI option `-E` or `--task-events` (for example, `celery -A azima.main worker -l INFO -E`), and try to capture all events.

By default, Celery workers do not send events. However, like most of the useful features, it can be configured by enabling `worker_send_task_events` in your config, or by running the Celery worker(s) with the `-E` flag.