
I have a "locker" decorator:

def lock_task(func):
    def wrapper(*args, **kwargs):
        if redis.set(func.__name__, 'lock', nx=True):
            try:
                result = func(*args, **kwargs)
            finally:
                redis.delete(func.__name__)
            return result or True
        else:
            return 'Skipped'
    return wrapper

I also have a Celery task that uses my decorator:

@celery_app.task
@lock_task
def test():
    call_command('test')

And these are my celery beat settings:

celery_app.conf.beat_schedule = {
    'test': {
        'task': 'project.celery.test',
        'schedule': crontab(minute='*/1')
    }
}

After startup I get a KeyError: Received unregistered task of type 'project.celery.test'.

How do I set this up correctly?

2 Answers


  1. Chosen as BEST ANSWER

    The problem was solved by passing an explicit name to the task decorator:

    @celery_app.task(name='TASKNAME')
    @lock_task
    def test():
        call_command('test')
    

    and then using that name in the beat schedule:

    celery_app.conf.beat_schedule = {
        'test': {
            'task': 'TASKNAME',
            'schedule': crontab(minute='*/1')
        }
    }
    

    It works for me.
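
A rough way to see why the explicit name helps: by default Celery derives a task's name from the module path plus the function's `__name__`, and an explicit `name=` overrides that. The sketch below is a toy model of that rule, not Celery's actual code; `resolve_task_name` is a hypothetical helper, and the Redis calls are left out of the decorator.

```python
def resolve_task_name(func, module, name=None):
    """Toy model (assumption): an explicit name wins, else '<module>.<function name>'."""
    return name or f"{module}.{func.__name__}"


def lock_task(func):
    # Same shape as the decorator in the question, minus the Redis calls.
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper


@lock_task
def test():
    return None


# The decorated object is the inner function, so the derived name ends in "wrapper".
derived = resolve_task_name(test, "project.celery")
# An explicit name bypasses the derivation entirely.
explicit = resolve_task_name(test, "project.celery", name="TASKNAME")

print(derived)   # project.celery.wrapper
print(explicit)  # TASKNAME
```

With an explicit name, the beat schedule and the worker agree on the task name regardless of what the decorator returns.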


  2. It seems that the wrapper function is the one being registered with Celery instead of the actual test function. You can verify this by checking the log output when you start the Celery worker:

    $ celery --app=tasks worker --loglevel=INFO
    ...
    [tasks]
      . project.celery.wrapper
    

    To register the task under the name of the actual function, use functools.wraps(). As its documentation notes:

    Without the use of this decorator factory, the name of the example function would have been ‘wrapper’

    from functools import wraps
    
    def lock_task(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            ...
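
For reference, here is a self-contained sketch of the whole decorator with `@wraps` applied. It makes a few assumptions: `InMemoryLock` is a hypothetical stand-in for the Redis client (it only imitates `set(..., nx=True)` and `delete`), and `nightly_report` is an illustrative task name, so the example runs without Redis or Celery.

```python
from functools import wraps


class InMemoryLock:
    """Hypothetical stand-in for the Redis client used in the question."""

    def __init__(self):
        self._keys = {}

    def set(self, key, value, nx=False):
        # With nx=True, write only if the key is absent (None means "not set").
        if nx and key in self._keys:
            return None
        self._keys[key] = value
        return True

    def delete(self, key):
        return 1 if self._keys.pop(key, None) is not None else 0


redis = InMemoryLock()


def lock_task(func):
    @wraps(func)  # copies __name__, __doc__, etc. from func onto wrapper
    def wrapper(*args, **kwargs):
        if redis.set(func.__name__, 'lock', nx=True):
            try:
                result = func(*args, **kwargs)
            finally:
                redis.delete(func.__name__)
            return result or True
        return 'Skipped'
    return wrapper


@lock_task
def nightly_report():
    return 'done'


print(nightly_report.__name__)  # nightly_report
print(nightly_report())         # done

redis.set('nightly_report', 'lock', nx=True)  # simulate another run holding the lock
print(nightly_report())         # Skipped
redis.delete('nightly_report')  # release the simulated lock
```

Because the wrapper now reports the original function's name, a task registered from it gets a name derived from the function rather than from `wrapper`.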
    

    If the error persists, make sure you have correctly configured one of the following:

    • Celery imports, e.g. celery_app.conf.update(imports=['project.celery']) or celery_app.conf.imports = ['project.celery']
    • Celery include, e.g. celery_app = Celery(..., include=['project.celery'])

    To verify, you should see your task named project.celery.test when you start the celery worker (to emphasize, the worker, not the scheduler):

    $ celery --app=tasks worker --loglevel=INFO
    ...
    [tasks]
      . project.celery.test
    
    • See the last line of the output. The task list is shown when the worker is started with the flag --loglevel=INFO. If the task test is missing from the list, or wrapper appears in its place, the steps above might help.