
In my web application I start background jobs with Celery without storing their IDs. Some of the tasks are periodic and some are triggered by user interaction. The Celery tasks just do their thing and eventually the user will see the updated data in their browser. When a task has recently failed, I want to notify all logged-in admin users about it (since they are usually the ones who triggered the recent failure), so they at least know something's up.

The relevant Celery methods I found either require a valid task ID (e.g. celery.result.AsyncResult) or only have information about active tasks, not about finished or failed ones (e.g. celery.app.control.Inspect).

I am using a Flask frontend, a Redis backend for Celery, and a regular DB for persistent data.

How would I collect information about recently finished or failed celery tasks in this scenario?

What I have tried:

from celery import Celery
from celery.result import AsyncResult

# I set up celery with
my_celery_project = Celery(__name__,
                backend='redis://localhost:1234/0',
                broker='redis://localhost:1234/0')

# later in the view I want to collect status information:

i = my_celery_project.control.inspect()

i.active() # this one exists, but I don't care about it
i.failed() # this is what I want, but it doesn't exist
i.results() # this also doesn't exist


# getting the result directly also doesn't work, since it requires an id, which I don't have
res = AsyncResult(id_i_dont_have, app=my_celery_project)

It should be possible, since I can get the results manually from Redis with redis-cli --scan and then do my_task.AsyncResult('id_from_redis').status to check the result. Something similar to Flower could also work, but I think that wouldn't fit well with the stateless nature of a web application.


this is not a duplicate of these questions, since they don’t assume a redis-backend. Also they are 4+ years out-of-date:

this is not a duplicate of these questions, since my redis-backend is in fact working:

This is not a duplicate of this question, since it is exactly the opposite of mine. They care about old results, while I explicitly care only about recent results: How to read celery results from redis result backend

2 Answers


  1. Chosen as BEST ANSWER

    In the end my solution was to fetch the IDs directly from the backend and then convert them to AsyncResult objects via my Celery instance:

    
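      from typing import List
      from celery.result import AsyncResult

      def get_task_results() -> List[AsyncResult]:
          task_results: List[AsyncResult] = []
          # The redis backend stores each result under "celery-task-meta-<task_id>".
          for key in my_celery_project.backend.client.scan_iter("celery-task-meta-*"):
              key = key.decode() if isinstance(key, bytes) else key
              task_results.append(my_celery_project.AsyncResult(key.split("celery-task-meta-", 1)[1]))
          return task_results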
    

    Then I used async_result.ready() to filter out the ones I'm interested in.

    On a side note: I now also call async_result.forget() to clean up old tasks, which I didn't do before.
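The filtering step described above could look roughly like the sketch below. It is not the answerer's code: the helper name `recent_failures` and its parameters are made up for illustration, and it assumes each result object exposes `failed()` and a `date_done` attribute holding a UTC datetime, as `AsyncResult` does in current Celery releases (older versions may behave differently).

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, Optional


def recent_failures(results: Iterable, max_age: timedelta,
                    now: Optional[datetime] = None) -> List:
    """Return the results that failed within the last `max_age`.

    `results` is any iterable of AsyncResult-like objects
    (hypothetical helper, not part of Celery).
    """
    now = now or datetime.now(timezone.utc)
    failed = []
    for result in results:
        if not result.failed():
            continue  # still running, or finished successfully
        done = result.date_done
        if done is None:
            continue  # backend has no completion timestamp for this task
        if done.tzinfo is None:
            # Assumption: naive timestamps from the backend are in UTC.
            done = done.replace(tzinfo=timezone.utc)
        if now - done <= max_age:
            failed.append(result)
    return failed
```

A view could then call something like `recent_failures(task_results, timedelta(minutes=30))` on the list built above and show the admin users one notice per entry.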


  2. You should use signals, like this:

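    import logging

    import redis
    from celery import signals

    # Module-level logger used by the handlers below (assumed; the original snippet does not define it).
    loggert = logging.getLogger(__name__)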
    
    @signals.task_failure.connect
    def exception_handle(sender, task_id, exception, **kwargs):
        if isinstance(exception, redis.exceptions.LockError):
            loggert.warning(f"{sender.__qualname__}[{task_id}] can't get lock")
            return
        loggert.exception(f"{sender.__qualname__}[{task_id}] args={kwargs['args']} kwargs={kwargs['kwargs']} Exception:\n")
    
    @signals.after_setup_logger.connect
    def celery_log(logger, **kwargs):
        check_console(logger, **kwargs)
    
    
    @signals.after_setup_task_logger.connect
    def task_log(logger, **kwargs):
        # TODO: add your logger handlers here...
        check_console(logger, **kwargs)
    
    
    @signals.worker_ready.connect
    def clean_lock(**kwargs):
        loggert.info('worker_ready')
    
    
    @signals.worker_init.connect
    def hook_prefork(sender, **kwargs):
        ...
    
    def check_console(logger, format, **kwargs):
        if not list(filter(lambda x: type(x) is logging.StreamHandler, logger.handlers)):
            console = logging.StreamHandler()
            console.setFormatter(logging.Formatter(format))
            console.setLevel(logging.INFO)
            logger.addHandler(console)
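
The handlers above only log the failure. To surface failures in the web application, a `task_failure` handler could instead write a small record to storage that the Flask process can read. The following is a minimal sketch under that assumption; `make_failure_handler`, `FAILURE_KEY`, and the record fields are hypothetical names, and `push` stands for any callable that appends to shared storage, for example the `lpush` method of a redis client.

```python
import json
import time
from typing import Callable

# Hypothetical key under which failure records are collected.
FAILURE_KEY = "recent-task-failures"


def make_failure_handler(push: Callable[[str, str], object]):
    """Build a task_failure handler that stores one JSON record per failure.

    `push(key, value)` appends `value` to shared storage under `key`.
    """
    def on_task_failure(sender=None, task_id=None, exception=None, **kwargs):
        record = {
            # Celery tasks carry a `name`; fall back to repr() for anything else.
            "task": getattr(sender, "name", repr(sender)),
            "task_id": task_id,
            "error": repr(exception),
            "failed_at": time.time(),  # seconds since the epoch
        }
        push(FAILURE_KEY, json.dumps(record))

    return on_task_failure
```

In the worker this could be registered with `signals.task_failure.connect(make_failure_handler(client.lpush), weak=False)`, where `client` is a redis client you create yourself; `weak=False` matters because Celery signals otherwise hold receivers by weak reference and a closure with no other reference would be dropped. The view can then read the list stored under `FAILURE_KEY` and trim it as needed.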
    