skip to Main Content

In my web-application I start backgrounds jobs with celery without storing their id. Some of the task are periodic and some are triggered by user-interaction. The celery-tasks just do their thing and eventually the user will see the updated data in their browser. When a task has recently failed, I want to notify all logged-in admin-users about it (since they are usually the ones who triggered the recent failure). So they at least know something’s up.

The relevant celery-methods I found, either require a valid task-id (e.g. celery.result.AsyncResult) or they only have infos about active tasks, but not about finished/failed tasks (e.g.

I am using a flask-frontend, a redis-backend for celery and also a regular DB for persistent data.

How would I collect information about recently finished or failed celery tasks in this scenario?

What I have tried:

# I setup celery with 
my_celery_project = Celery(__name__,

# later in the view I want to collect status information:

i = my_celery_project.control.inspect() # this one exists, but I don't care about it
i.failed() # this is what I want, but it doesn't exist
i.results() # this also doesn't exist

# getting the result directly also doesn't work, since they require an id, which i don't have
res = AsyncResult(id_i_don_have,app=app)

It should be possible, since I can get the results manually from redis with redis-cli --scan and then do my_task.AsyncResult('id_from_redis').status to check the result. Something similar to flower could also work, but that would’t work so well with the state-less nature of a web-application, I think.

this is not a duplicate of these questions, since they don’t assume a redis-backend. Also they are 4+ years out-of-date:

this is not a duplicate of these questions, since my redis-backend is in fact working:

this is not a duplicate of this questions, since it is exactly the opposite to my questions. They care about old results, while I care explicitly only about recent results: How to read celery results from redis result backend



  1. Chosen as BEST ANSWER

    in the end my solution was to fetch the IDs directly form the backend and then convert them to Object via my celery-instance:

      task_results: List[AsyncResult] = []
      for key in my_celery_project.backend.client.scan_iter("celery-task-meta-*"):
        task_id = str(key).split("celery-task-meta-", 1)[1].replace("'", "")
      return task_results

    then I used async_result.ready() to filter out the ones I'm interested on.

    on a side note: Now I also call async_result.forget() to cleanup old tasks, which I didn't do before.

  2. you should use signal ,like this:

    from celery import signals
    def exception_handle(sender, task_id, exception, **kwargs):
        if isinstance(exception, redis.exceptions.LockError):
            loggert.warning(f"{sender.__qualname__}[{task_id}] can't get lock")
        loggert.exception(f"{sender.__qualname__}[{task_id}] args={kwargs['args']} kwargs={kwargs['kwargs']} Exception:n")
    def celery_log(logger, **kwargs):
        check_console(logger, **kwargs)
    def task_log(logger, **kwargs):
        # todo: add your loggre handle herre...
        check_console(logger, **kwargs)
    def clean_lock(**kwargs):'worker_ready')
    def hook_prefork(sender, **kwargs):
    def check_console(logger, format, **kwargs):
        if not list(filter(lambda x: type(x) is logging.StreamHandler, logger.handlers)):
            console = logging.StreamHandler()
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top