I’m trying to kick off an asynchronous task from a Flask route, then use JS to poll the task status URL and retrieve the result when it is complete. The task shows as "PENDING" and never succeeds or fails.
When I run celery -A app worker --loglevel=debug, I get this error: [ERROR/MainProcess] Received unregistered task of type 'app.celery_tasks.test_task'.
I’ve done quite a bit of searching but haven’t found the cause of this. Below is a stripped-down example.
- routes.py
...
from app.celery_tasks import *

# This route initiates a test
@main_bp.route('/test_celery')
def test_celery_route():
    run_test_url = url_for('main_bp.run_test_task', _external=True)
    response = requests.post(run_test_url)
    test_task_url = response.headers.get('Location')
    return render_template('main/celery_test_page.html', test_task_url=test_task_url)
- Test url endpoint and task status
# This endpoint receives the request and kicks off a celery task
@main_bp.route('/run_test_task', methods=['POST'])
def run_test_task():
    task = test_task.apply_async()
    return jsonify({}), 202, {'Location': url_for('main_bp.test_task_status',
                                                  task_id=task.id)}

# The task result lives here
@main_bp.route('/test_task_status/<task_id>')
def test_task_status(task_id):
    task = test_task.AsyncResult(task_id)
    response = {
        'state': task.state
    }
    return jsonify(response)
- test_celery.html: HTML + JS
{% extends 'base.html' %}
{% block app_content %}
<h1>Test Page</h1>
<div id="test-div">
</div>
{% endblock app_content %}
{% block scripts %}
{{ super() }}
<script>
    var test_task_url = "{{ test_task_url }}";
    console.log(test_task_url);
    update_test_results(test_task_url);

    function update_test_results(status_url) {
        // send GET request to status URL
        $.getJSON(status_url, function(data) {
            console.log(data);
            console.log('Status Update function triggered')
            console.log(data['state'])
            if (data['state'] == 'SUCCESS'){
                console.log('Task succeeded')
                $("#test-div").append('<p>Task <b>Succeeded</b></p>')
            }
            else if (data['state'] == 'FAILURE') {
                console.log('Task failed')
                $("#test-div").append('<p>Task <b>Failed</b></p>')
            }
            else if (data['state'] == 'PENDING') {
                console.log('Task is pending')
                $("#test-div").append('<p>Task is <b>Pending</b></p>')
            }
            else {
                console.log('Other state found')
                // rerun in 2 seconds
                setTimeout(function() {
                    var status_url = "{{ test_task_url }}";
                    update_test_results(status_url);
                }, 2000);
            }
        });
    };
</script>
{% endblock scripts %}
I think the above code is OK, but I’m posting it anyway for additional context.
app/celery_tasks.py
...
from . import make_celery

@celery.task(bind=True)
def test_task(self):
    time.sleep(5)
    return 1
app/__init__.py
...
from celery import Celery

def make_celery(app=None):
    app = app or create_app()
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_BACKEND_URL'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    celery.app = app
    return celery

celery = Celery(__name__, broker=Config.CELERY_BROKER_URL)

def create_app(config_class=Config):
    app = Flask(__name__)
    app.config.from_object(Config)
    celery.conf.update(app.config)
    ...
config.py
Note that this lives at the same level as the app directory, not within the app directory.
class Config(object):
    ...
    CELERY_BROKER_URL = 'redis://localhost:6379/0'
    CELERY_BACKEND_URL = 'redis://localhost:6379/0'
So, with this setup, when I start the worker, I see:
[2021-05-15 07:43:00,384: DEBUG/MainProcess] | Worker: Starting Hub
[2021-05-15 07:43:00,384: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:00,384: DEBUG/MainProcess] | Worker: Starting Pool
[2021-05-15 07:43:00,899: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:00,900: DEBUG/MainProcess] | Worker: Starting Consumer
[2021-05-15 07:43:00,901: DEBUG/MainProcess] | Consumer: Starting Connection
[2021-05-15 07:43:00,920: INFO/MainProcess] Connected to redis://localhost:6379/0
[2021-05-15 07:43:00,920: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:00,921: DEBUG/MainProcess] | Consumer: Starting Events
[2021-05-15 07:43:00,931: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:00,931: DEBUG/MainProcess] | Consumer: Starting Heart
[2021-05-15 07:43:00,934: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:00,934: DEBUG/MainProcess] | Consumer: Starting Mingle
[2021-05-15 07:43:00,934: INFO/MainProcess] mingle: searching for neighbors
[2021-05-15 07:43:01,961: INFO/MainProcess] mingle: all alone
[2021-05-15 07:43:01,961: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:01,961: DEBUG/MainProcess] | Consumer: Starting Tasks
[2021-05-15 07:43:01,965: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:01,965: DEBUG/MainProcess] | Consumer: Starting Control
[2021-05-15 07:43:01,968: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:01,968: DEBUG/MainProcess] | Consumer: Starting Gossip
[2021-05-15 07:43:01,971: DEBUG/MainProcess] ^-- substep ok
[2021-05-15 07:43:01,971: DEBUG/MainProcess] | Consumer: Starting event loop
[2021-05-15 07:43:01,971: DEBUG/MainProcess] | Worker: Hub.register Pool...
[2021-05-15 07:43:01,972: INFO/MainProcess] [email protected] ready.
[2021-05-15 07:43:01,972: DEBUG/MainProcess] basic.qos: prefetch_count->32
Then, when I load the route (/test_celery), I get this error:
[2021-05-15 07:43:06,717: ERROR/MainProcess] Received unregistered task of type 'app.celery_tasks.test_task'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
Traceback (most recent call last):
File "/Users/me/soul/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 555, in on_task_received
strategy = strategies[type_]
KeyError: 'app.celery_tasks.test_task'
I checked the docs referenced in this error but still couldn’t figure out this issue.
2 Answers
I resolved this by changing celery -A app worker to celery -A app.celery_tasks worker. After this change I could see the tasks from celery_tasks.py as registered.

In app/__init__.py you’re making one instance of Celery unconditionally, and a second when invoking make_celery(). That’s incidental to the problem you’re seeing, but does suggest confused structuring.

Here, the problem you’re seeing starts with celery -A app worker and what happens (or more importantly, what doesn’t happen) when app is imported. The "Received unregistered task" error is suggesting that importing app isn’t sufficient to cause app.celery_tasks to get imported. I suspect that it’s getting imported indirectly from within create_app() if you’re importing blueprints there, but create_app() isn’t called when celery imports app, so your tasks aren’t visible to celery.

Consider a wrapper around your app. Let’s call it myapp.py, then move the bulk of make_celery() into create_app().

When starting up the app, FLASK_APP=myapp flask run will use myapp.app, which is the Flask instance, and celery -A myapp.celery will pick up the celery instance, after having run create_app(), which imports the blueprint, which imports app.celery_tasks as a side-effect.

The benefit of this approach is that the setup is identical between the webapp and the celery worker instances, which avoids a set of hard-to-diagnose issues.
I have a working example here that you can borrow from.
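To make the import side-effect concrete, here is a minimal sketch that uses only the standard library. It is a toy stand-in, not Celery's API: ToyCelery, import_celery_tasks and TASK_NAME are hypothetical names invented for illustration. It only shows why a task stays unregistered until something actually imports the module that defines it, and why calling create_app() at import time (as a wrapper module would) fixes that.

```python
# Toy stand-in for a task registry. This is NOT Celery's API; every name
# here (ToyCelery, import_celery_tasks, TASK_NAME) is hypothetical.
TASK_NAME = "app.celery_tasks.test_task"


class ToyCelery:
    def __init__(self):
        self.tasks = {}

    def task(self, func):
        # Registration is a side effect of the decorator running, which
        # only happens when the defining module is actually imported.
        self.tasks[f"app.celery_tasks.{func.__name__}"] = func
        return func

    def run(self, name):
        if name not in self.tasks:
            raise KeyError(f"Received unregistered task of type {name!r}")
        return self.tasks[name]()


celery = ToyCelery()


def import_celery_tasks():
    # Simulates `import app.celery_tasks`.
    @celery.task
    def test_task():
        return 1


def create_app():
    # Simulates the app factory: registering the blueprint imports the
    # routes, which in turn import the tasks.
    import_celery_tasks()
    return "flask app placeholder"


# Simulates `celery -A app worker`: the package is imported, but
# create_app() has not been called, so nothing is registered yet.
registered_before = TASK_NAME in celery.tasks

# Simulates the wrapper module calling create_app() at import time.
app = create_app()
registered_after = TASK_NAME in celery.tasks

print(registered_before, registered_after)
```

Before create_app() runs, the lookup for the task name fails. Afterwards it succeeds, which mirrors the difference between pointing the worker at app and pointing it at a module that has already built the application.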