
I’m attempting to kick off an asynchronous task from a Flask route, then use JS to poll the task’s status URL and retrieve the result when it completes. The task shows as "PENDING" and never succeeds or fails.

When I run celery -A app worker --loglevel=debug, I get this error: [ERROR/MainProcess] Received unregistered task of type 'app.celery_tasks.test_task'. I’ve done quite a bit of searching but haven’t found the cause. Below is a stripped-down example.

Here’s the basic flow:

  1. routes.py
...
from app.celery_tasks import *

# This route initiates a test
@main_bp.route('/test_celery')
def test_celery_route():
    run_test_url = url_for('main_bp.run_test_task', _external=True)
    response = requests.post(run_test_url)
    test_task_url = response.headers.get('Location')

    return render_template('main/celery_test_page.html', test_task_url=test_task_url)
  2. Test URL endpoint and task status

# This endpoint receives the request and kicks off a celery task
@main_bp.route('/run_test_task', methods=['POST'])
def run_test_task():

    task = test_task.apply_async()

    return jsonify({}), 202, {'Location': url_for('main_bp.test_task_status',
                                                  task_id=task.id)}

# The task result lives here
@main_bp.route('/test_task_status/<task_id>')
def test_task_status(task_id):
    task = test_task.AsyncResult(task_id)
    response = {
        'state': task.state
        }

    return jsonify(response)
  3. test_celery.html: HTML + JS
{% extends 'base.html' %}

{% block app_content %}
<h1>Test Page</h1>

<div id="test-div">
    </div>
{% endblock app_content %}


{% block scripts %}

{{ super() }}

<script>
    var test_task_url = "{{ test_task_url }}";
    console.log(test_task_url);

    update_test_results(test_task_url);

    function update_test_results(status_url) {
        // send GET request to status URL
        $.getJSON(status_url, function(data) {
            console.log(data);
            console.log('Status Update function triggered')
            console.log(data['state'])
            if (data['state'] == 'SUCCESS'){
                console.log('Task succeeded')
                $("#test-div").append('<p>Task <b>Succeeded</b></p>')
                }

            else if (data['state'] == 'FAILURE') {
                console.log('Task failed')
                $("#test-div").append('<p>Task <b>Failed</b></p>')
            }

            else if (data['state'] == 'PENDING') {
                console.log('Task is pending')
                $("#test-div").append('<p>Task is <b>Pending</b></p>')
                // poll again in 2 seconds
                setTimeout(function() {
                    update_test_results(status_url);
                }, 2000);
            }

            else {
                console.log('Other state found')
                // poll again in 2 seconds
                setTimeout(function() {
                    update_test_results(status_url);
                }, 2000);
            }
        });
    };

</script>

{% endblock scripts %}
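For clarity, the polling flow in the script above can be sketched as a blocking Python loop. Here `poll` is a hypothetical stand-in for the `$.getJSON` call, and the simulated status responses are made up for illustration:

```python
import time

def wait_for_task(poll, interval=2.0, timeout=30.0):
    """Poll a status callable until it reports a terminal Celery state."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        state = poll()["state"]
        if state in ("SUCCESS", "FAILURE"):
            return state
        # Not terminal yet (e.g. PENDING) -- wait and poll again.
        time.sleep(interval)
    raise TimeoutError("task never reached a terminal state")

# Simulated status endpoint: PENDING twice, then SUCCESS.
states = iter(["PENDING", "PENDING", "SUCCESS"])
result = wait_for_task(lambda: {"state": next(states)}, interval=0.0)
print(result)  # prints SUCCESS
```

The key point, mirrored in the JS, is that non-terminal states must schedule another poll; only SUCCESS and FAILURE stop the loop.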

I think the remaining code is OK, but I’m posting it anyway for additional context.

app/celery_tasks.py

...
from . import make_celery

celery = make_celery()

@celery.task(bind=True)
def test_task(self):
    time.sleep(5)

    return 1

app/__init__.py

...
from celery import Celery

def make_celery(app=None):
    app = app or create_app()
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_BACKEND_URL'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    celery.app = app

    return celery

celery = Celery(__name__, broker=Config.CELERY_BROKER_URL)

def create_app(config_class=Config):
    app = Flask(__name__)
    app.config.from_object(Config)

    celery.conf.update(app.config)
...

config.py
Note that this lives at the same level as the app directory, not within the app directory

class Config(object):
    ...
    CELERY_BROKER_URL = 'redis://localhost:6379/0'
    CELERY_BACKEND_URL = 'redis://localhost:6379/0'

So, with this setup, when I start the worker, I see:

[2021-05-15 07:43:00,384: DEBUG/MainProcess] | Worker: Starting Hub
[2021-05-15 07:43:00,384: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:00,384: DEBUG/MainProcess] | Worker: Starting Pool
[2021-05-15 07:43:00,899: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:00,900: DEBUG/MainProcess] | Worker: Starting Consumer
[2021-05-15 07:43:00,901: DEBUG/MainProcess] | Consumer: Starting Connection
[2021-05-15 07:43:00,920: INFO/MainProcess] Connected to redis://localhost:6379/0
[2021-05-15 07:43:00,920: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:00,921: DEBUG/MainProcess] | Consumer: Starting Events
[2021-05-15 07:43:00,931: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:00,931: DEBUG/MainProcess] | Consumer: Starting Heart
[2021-05-15 07:43:00,934: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:00,934: DEBUG/MainProcess] | Consumer: Starting Mingle
[2021-05-15 07:43:00,934: INFO/MainProcess] mingle: searching for neighbors
[2021-05-15 07:43:01,961: INFO/MainProcess] mingle: all alone
[2021-05-15 07:43:01,961: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:01,961: DEBUG/MainProcess] | Consumer: Starting Tasks
[2021-05-15 07:43:01,965: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:01,965: DEBUG/MainProcess] | Consumer: Starting Control
[2021-05-15 07:43:01,968: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:01,968: DEBUG/MainProcess] | Consumer: Starting Gossip
[2021-05-15 07:43:01,971: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:01,971: DEBUG/MainProcess] | Consumer: Starting event loop
[2021-05-15 07:43:01,971: DEBUG/MainProcess] | Worker: Hub.register Pool...
[2021-05-15 07:43:01,972: INFO/MainProcess] celery@<hostname> ready.
[2021-05-15 07:43:01,972: DEBUG/MainProcess] basic.qos: prefetch_count->32

Then, when I load the route (/test_celery), I get this error:

[2021-05-15 07:43:06,717: ERROR/MainProcess] Received unregistered task of type 'app.celery_tasks.test_task'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
Traceback (most recent call last):
  File "/Users/me/soul/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 555, in on_task_received
    strategy = strategies[type_]
KeyError: 'app.celery_tasks.test_task'

I checked the docs referenced in this error but still couldn’t figure out this issue.

2 Answers


  1. Chosen as BEST ANSWER

    I resolved this by changing celery -A app worker to celery -A app.celery_tasks worker. After this change I could see the tasks from celery_tasks.py as registered.


  2. In app/__init__.py you’re making one instance of Celery unconditionally, and a second when invoking make_celery(). That’s incidental to the problem you’re seeing, but does suggest confused structuring.

    Here, the problem you’re seeing starts with

    celery -A app worker
    

    and what happens (or more importantly, what doesn’t happen) when app is imported.

    [ERROR/MainProcess] Received unregistered task of type 'app.celery_tasks.test_task'
    

    is suggesting that importing app isn’t sufficient to cause app.celery_tasks to get imported. I suspect that it’s getting imported indirectly from within create_app() if you’re importing blueprints there, but create_app() isn’t called when celery imports app, so your tasks aren’t visible to celery.
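The import-time mechanism behind that error can be sketched in plain Python. This is a toy model with hypothetical names, not Celery's real internals: the decorator records each task in a name-to-function mapping when its module is imported, and the worker later resolves incoming message names against that mapping, so an unimported module surfaces as exactly the KeyError in the traceback above.

```python
# Toy model of a worker's task registry (illustration only).
registry = {}

def task(fn):
    # Runs when the defining module is imported -- that import is what
    # records the task under its fully qualified name.
    registry["app.celery_tasks." + fn.__name__] = fn
    return fn

@task
def test_task():
    return 1

# A worker that imported the module can resolve the name...
print("app.celery_tasks.test_task" in registry)  # prints True

# ...but a task whose module was never imported is simply absent,
# and looking it up raises KeyError.
print("app.other_module.some_task" in registry)  # prints False
```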

    Consider a wrapper around your app. Let’s call it myapp.py

    from app import celery, create_app
    
    app = create_app()
    

    then move the bulk of make_celery() into create_app().
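A sketch of what that consolidation could look like, with the caveat that the blueprint import path (`app.main`) is an assumption inferred from the `main_bp` name above, not verified project structure:

```python
# app/__init__.py (sketch)
from celery import Celery
from flask import Flask

from config import Config

# Single module-level instance; create_app() finishes configuring it.
celery = Celery(__name__, broker=Config.CELERY_BROKER_URL,
                backend=Config.CELERY_BACKEND_URL)

def create_app(config_class=Config):
    app = Flask(__name__)
    app.config.from_object(config_class)

    # Former body of make_celery(): bind the app's config and give
    # every task an application context to run in.
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask

    # Registering the blueprint imports app.celery_tasks as a side
    # effect, so the worker sees the tasks.
    from app.main import main_bp
    app.register_blueprint(main_bp)

    return app
```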

    When starting up the app, FLASK_APP=myapp flask run will use myapp.app, which is the Flask instance, and celery -A myapp.celery will pick up the Celery instance after having run create_app(), which imports the blueprint, which imports app.celery_tasks as a side effect.

    The benefit of this approach is that the setup is identical between the webapp and the celery worker instances, which avoids a set of hard-to-diagnose issues.

    I have a working example here that you can borrow from.
