
In my Django application I am using Celery. In a post_save signal, I update the index in Elasticsearch. But for some reason the task hangs and never actually executes the code:

What I use to run celery:

celery -A collegeapp worker -l info

The Signal:

@receiver(post_save, sender=University)
def university_saved(sender, instance, created, **kwargs):
    """
    University save signal
    """
    print('calling celery task')
    update_university_index.delay(instance.id)
    print('finished')

The task:

@task(name="update_university_index", bind=True, default_retry_delay=5, max_retries=1, acks_late=True)
def update_university_index(instance_id):
    print('updating university index')

The only output I get is "calling celery task". After waiting over 30 minutes, it never reaches any of the other print statements, and the view continues to wait. Nothing ever shows in the Celery terminal.

Versions:
Django 3.0,
Celery 4.3,
Redis 5.0.9,
Ubuntu 18

UPDATE:
After doing some testing, I found that using the debug_task defined inside the celery.py file in place of update_university_index does not lead to hanging. It behaves as expected. I thought it might have been the app.task vs. task decorator, but it seems that's not it.

@app.task(bind=True)
def debug_task(text, second_value):
    print('printing  debug_task {} {}'.format(text, second_value))

2 Answers


  1. Chosen as BEST ANSWER

    I'm still not sure why it doesn't work, but I found a solution: replacing task with app.task.

    Importing app from my celery.py seems to have resolved the issue.

    from collegeapp.celery import app
    
    @app.task(name="update_university_index", bind=True, default_retry_delay=5, max_retries=1, acks_late=True)
    def update_university_index(self, instance_id):
        print('updating university index')
    

  2. This happened to me once, and I had made the dumbest error: with the Django integration, Celery tasks are expected to live in each app's tasks.py file, and that file is what task discovery looks for. After I fixed that, it worked. Could you provide more insight into the directory structure using the tree command?

    This tutorial is for Flask, but the same can be achieved in Django. Where this particular tutorial shines is that after you tell Celery to execute a task, it also gives you a UUID, and you can poll a URL built from it to monitor the progress of the task you triggered.

    Verify that the tasks have been registered by Celery (make sure a Celery worker is running first):

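    # Celery 4.x import path; the celery.task module was removed in Celery 5
    from celery.task.control import inspect
    i = inspect()
    i.registered_tasks()  # maps each worker name to its registered task names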
    

    Or from the shell:

    $ celery inspect registered
    $ celery -A collegeapp inspect registered
    

    From https://docs.celeryproject.org/en/latest/faq.html#the-worker-isn-t-doing-anything-just-hanging

    Why is Task.delay/apply*/the worker just hanging?

    Answer: There’s a bug in some AMQP clients that’ll make it hang if it’s not able to authenticate the current user, the password doesn’t match or the user doesn’t have access to the virtual host specified. Be sure to check your broker logs (for RabbitMQ that’s /var/log/rabbitmq/rabbit.log on most systems), it usually contains a message describing the reason.

    Change this line

    @task(name="update_university_index", bind=True, default_retry_delay=5, max_retries=1, acks_late=True)
    def update_university_index(instance_id):
        print('updating university index')
    

    To

    @task(name="update_university_index", bind=True, default_retry_delay=5, max_retries=1, acks_late=True)
    def update_university_index(self, instance_id):
        print('updating university index')
    

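    In other words, add self as the first parameter of the task function: with bind=True, the task instance is passed as the first argument.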
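
To make the bind=True point concrete, here is a minimal pure-Python sketch of how a bound task receives its own task object as the first argument. ToyTask and toy_task are hypothetical stand-ins written for illustration, not Celery's classes, and the sketch only shows the argument passing; it does not reproduce or explain the hang in delay().

```python
class ToyTask:
    """Hypothetical stand-in for a task object; this is NOT Celery's Task class."""

    def __init__(self, func, name, bind):
        self.func = func
        self.name = name
        self.bind = bind

    def run(self, *args, **kwargs):
        # With bind=True the task object itself is passed first, which is why
        # the decorated function needs a leading `self` parameter.
        if self.bind:
            return self.func(self, *args, **kwargs)
        return self.func(*args, **kwargs)


def toy_task(name, bind=False):
    """Hypothetical decorator that mimics the shape of a task decorator."""
    def decorator(func):
        return ToyTask(func, name, bind)
    return decorator


@toy_task(name="broken", bind=True)
def broken(instance_id):
    # No `self` parameter, so the bound call supplies one argument too many.
    return instance_id


@toy_task(name="fixed", bind=True)
def fixed(self, instance_id):
    # `self` is the ToyTask object, so task attributes such as name are available.
    return (self.name, instance_id)


if __name__ == "__main__":
    print(fixed.run(42))
    try:
        broken.run(42)
    except TypeError as exc:
        print("broken task failed:", exc)
```

In this toy model the version without self fails with a TypeError as soon as it is called, while the version with self can read attributes of its own task object. If bind=True is not actually needed, dropping it from the decorator is the other way to make the signature and the call agree.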
