I have a "locker" decorator:
def lock_task(func):
    def wrapper(*args, **kwargs):
        if redis.set(func.__name__, 'lock', nx=True):
            try:
                result = func(*args, **kwargs)
            finally:
                redis.delete(func.__name__)
            return result or True
        else:
            return 'Skipped'
    return wrapper
I also have a Celery task that uses this decorator:
@celery_app.task
@lock_task
def test():
    call_command('test')
And these are my Celery beat settings:
celery_app.conf.beat_schedule = {
    'test': {
        'task': 'project.celery.test',
        'schedule': crontab(minute='*/1')
    }
}
After starting, I get a KeyError: Received unregistered task of type 'project.celery.test'.
How do I set this up correctly?
2 Answers
I solved the problem by passing an explicit name to the task decorator and then using that same name for the task in the scheduler entry.
It works for me.
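A minimal sketch of that fix, since the answer's own snippets are not shown. The task name 'lock_test' and the app name 'project' are assumptions for illustration (the answer does not say which name was used), and lock_task here is a simplified stand-in for the decorator from the question with the Redis locking left out:

```python
from celery import Celery
from celery.schedules import crontab

celery_app = Celery('project')  # app name assumed for illustration


def lock_task(func):
    # Stand-in for the question's decorator (locking omitted for brevity).
    # Like the original, it does not use functools.wraps.
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper


# With an explicit name, Celery does not derive the task name from wrapper.
@celery_app.task(name='lock_test')  # 'lock_test' is a hypothetical name
@lock_task
def test():
    return 'ran'  # placeholder for call_command('test')


celery_app.conf.beat_schedule = {
    'test': {
        'task': 'lock_test',  # must match the name passed to the decorator
        'schedule': crontab(minute='*/1'),
    }
}
```

The only requirement is that the string under 'task' in the schedule and the name given to the decorator are identical.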
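It seems that the wrapper function is the one registered with Celery instead of the actual test function. Celery derives the default task name from the module and name of the function it receives, and here that function is wrapper. You can verify this in the list of tasks the Celery worker logs when it starts.
To register the task under the name of the actual function, use functools.wraps(), which copies the wrapped function's metadata (such as __name__ and __module__) onto the wrapper.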
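The following sketch applies functools.wraps() to the decorator from the question. To keep it runnable without a Redis server, it uses FakeRedis, a hypothetical in-memory stand-in that mimics only the set(..., nx=True) and delete calls used here; in a real project the redis object would be an actual client, and the task body would be call_command('test').

```python
import functools


class FakeRedis:
    """In-memory stand-in for a Redis client (hypothetical, illustration only)."""

    def __init__(self):
        self._data = {}

    def set(self, key, value, nx=False):
        # Mimic SET ... NX: refuse to overwrite an existing key.
        if nx and key in self._data:
            return None
        self._data[key] = value
        return True

    def delete(self, key):
        # Return the number of keys removed, as a real client would.
        return 1 if self._data.pop(key, None) is not None else 0


redis = FakeRedis()  # replace with a real Redis client in practice


def lock_task(func):
    @functools.wraps(func)  # copy __name__, __module__, __doc__ onto wrapper
    def wrapper(*args, **kwargs):
        if redis.set(func.__name__, 'lock', nx=True):
            try:
                result = func(*args, **kwargs)
            finally:
                redis.delete(func.__name__)
            return result or True
        return 'Skipped'
    return wrapper


@lock_task
def test():
    return None  # placeholder for call_command('test')


print(test.__name__)  # the decorated function keeps its original name
```

Because the wrapper now reports the name and module of test, stacking @celery_app.task on top of @lock_task should let Celery derive the expected task name.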
If the error persists, make sure the module that defines the task is imported by the worker, using either the imports setting:
celery_app.conf.update(imports=['project.celery'])
or
celery_app.conf.imports = ['project.celery']
or the include argument:
celery_app = Celery(..., include=['project.celery'])
To verify, you should see your task listed as project.celery.test when you start the Celery worker (to emphasize: the worker, not the scheduler) with --loglevel=INFO. If you don't see the task test there, or you see wrapper instead, then the steps above might help.