
I’m rather new to Celery development and I have an issue implementing signals.
I have an application that consists of many different workers.
Currently it uses RabbitMQ as the broker and Redis as the backend.

Each worker has its own queue. This is the way we have it configured at the moment:

from celery import Celery
from celery.utils.log import get_task_logger
from kombu import Exchange, Queue

celery = Celery(queueDict['test'], broker=config.REDIS_SERVER, backend=config.REDIS_SERVER)
default_exchange = Exchange('default', type='direct')
test_queue = Queue(queueDict['test'], default_exchange, routing_key=queueDict['test'])

logger = get_task_logger(__name__)

celery.conf.task_queues = (test_queue,)


@celery.task(name='signal2', bind=True)
def signal2(self, param):
    print("dog" + param)

I would like to use signals so that I can catch failed tasks from any worker in the application. When I connect a task_failure handler inside the same worker, it works.
But I would like another worker (or even my Flask app) to catch these events, and I seem to be missing something…
Here is my current attempt at making it work.

from celery import Celery
from celery.signals import task_failure
from celery.utils.log import get_task_logger
from kombu import Exchange, Queue

celery = Celery('consumer', broker=config.REDIS_SERVER, backend=config.REDIS_SERVER)
default_exchange = Exchange('default', type='direct')
default_queue = Queue(queueDict['default'], default_exchange, routing_key=queueDict['default'])

logger = get_task_logger(__name__)

celery.conf.task_queues = (default_queue,)


@task_failure.connect
def process_failure_signal(sender=None, task_id=None, exception=None,
                           args=None, kwargs=None, traceback=None, einfo=None, **other):
    msg = 'Signal exception: %s (%s)' % (
        exception.__class__.__name__, exception)
    exc_info = (type(exception), exception, traceback)
    extra = {
        'data': {
            'task_id': str(task_id),
            'sender': str(sender),
            'args': str(args),
            'kwargs': str(kwargs),
        }
    }

    logger.error(msg, exc_info=exc_info, extra=extra)

But it never receives any signals…
Thanks for the help.

2 Answers


  1. Chosen as BEST ANSWER

    DejanLekic was correct and the page he shared had exactly what I wanted.

    For those interested: https://docs.celeryproject.org/en/stable/userguide/monitoring.html#real-time-processing

    This can be easily used to capture events and monitor tasks.
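
    For reference, a minimal (untested) sketch of that event receiver adapted to the setup from the question: it reuses config.REDIS_SERVER as the broker URL, and it assumes the workers are started with the -E flag (or the worker_send_task_events setting) so that they actually publish task events.

    from celery import Celery

    import config  # module that holds REDIS_SERVER, as in the question


    def monitor_failures(app):
        # In-memory view of tasks/workers, updated from incoming events.
        state = app.events.State()

        def on_task_failed(event):
            state.event(event)
            task = state.tasks.get(event['uuid'])
            # task.name is only known if the task-received event was also
            # seen; the catch-all handler below keeps state up to date.
            print('TASK FAILED: %s[%s] %s' % (task.name, task.uuid, task.info()))

        with app.connection() as connection:
            recv = app.events.Receiver(connection, handlers={
                'task-failed': on_task_failed,
                '*': state.event,
            })
            recv.capture(limit=None, timeout=None, wakeup=True)


    if __name__ == '__main__':
        # Workers must be running with task events enabled, e.g.:
        #   celery -A myapp worker -E
        app = Celery(broker=config.REDIS_SERVER, backend=config.REDIS_SERVER)
        monitor_failures(app)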


  2. Real-time processing

    To process events in real-time you need the following

    • An event consumer (this is the Receiver)

    • A set of handlers called when events come in.

      You can have different handlers for each event type, or a catch-all handler can be used (‘*’)

    • State (optional)

      app.events.State is a convenient in-memory representation of tasks and workers in the cluster that’s updated as events come in.

      It encapsulates solutions for many common things, like checking if a worker is still alive (by verifying heartbeats), merging event fields together as events come in, making sure time-stamps are in sync, and so on.

    Combining these you can easily process events in real-time:

    from celery import Celery
    
    def my_monitor(app):
        state = app.events.State()
    
        def announce_failed_tasks(event):
            state.event(event)
            # task name is sent only with -received event, and state
            # will keep track of this for us.
            task = state.tasks.get(event['uuid'])
    
            print('TASK FAILED: %s[%s] %s' % (
                task.name, task.uuid, task.info(),))
    
        with app.connection() as connection:
            recv = app.events.Receiver(connection, handlers={
                    'task-failed': announce_failed_tasks,
                    '*': state.event,
            })
            recv.capture(limit=None, timeout=None, wakeup=True)
    
    if __name__ == '__main__':
        app = Celery(broker='amqp://guest@localhost//')
        my_monitor(app)
    

    Note: The wakeup argument to capture sends a signal to all workers to force them to send a heartbeat. This way you can immediately see workers when the monitor starts.
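
    Not part of the quoted guide, but as a rough sketch of the heartbeat tracking that State does: a catch-all handler can report which workers are currently considered alive (the Worker objects in state.workers expose an alive flag based on their latest heartbeat).

    from celery import Celery


    def worker_monitor(app):
        state = app.events.State()

        def on_event(event):
            state.event(event)
            if event['type'].startswith('worker-'):
                # Worker.alive is derived from the most recent heartbeat.
                alive = sorted(w.hostname for w in state.workers.values() if w.alive)
                print('alive workers: %s' % ', '.join(alive))

        with app.connection() as connection:
            recv = app.events.Receiver(connection, handlers={'*': on_event})
            recv.capture(limit=None, timeout=None, wakeup=True)


    if __name__ == '__main__':
        app = Celery(broker='amqp://guest@localhost//')
        worker_monitor(app)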

    You can listen to specific events by specifying the handlers:

    from celery import Celery
    
    def my_monitor(app):
        state = app.events.State()
    
        def announce_failed_tasks(event):
            state.event(event)
            # task name is sent only with -received event, and state
            # will keep track of this for us.
            task = state.tasks.get(event['uuid'])
    
            print('TASK FAILED: %s[%s] %s' % (
                task.name, task.uuid, task.info(),))
    
        with app.connection() as connection:
            recv = app.events.Receiver(connection, handlers={
                    'task-failed': announce_failed_tasks,
            })
            recv.capture(limit=None, timeout=None, wakeup=True)  
    
    if __name__ == '__main__':
        app = Celery(broker='amqp://guest@localhost//')
        my_monitor(app)
    

    Monitoring and Management Guide — Celery 4.4.2 documentation
