
I have the following project tree (for testing purposes) and I am trying to understand how Celery is loading tasks.

app
├── __init__.py
├── app.py
├── celery.py
└── my_tasks
    ├── __init__.py
    └── tasks.py

celery.py contains the following code for creating a Celery instance:

from celery import Celery

app = Celery("app", backend="rpc://", broker="redis://localhost:6379/0")
app.autodiscover_tasks()

tasks.py creates a task:

from app.celery import app
from time import sleep


@app.task
def run_tests_for_hash():
    # task code here
    pass

app.py contains a FastAPI application with one endpoint that creates a task:

from fastapi import FastAPI
from app.my_tasks.tasks import run_tests_for_hash

app = FastAPI(title="Testing Studio runners")

@app.post("/create_task")
def create_task():

    results = run_tests_for_hash.delay()

    return {"task_id": results.id, "status": results.status}

But when I try to run the Celery worker from the command line, I get an error:

$ celery -A app worker
Traceback (most recent call last):
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/bin/celery", line 8, in <module>
    sys.exit(main())
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/__main__.py", line 16, in main
    _main()
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/celery.py", line 322, in main
    cmd.execute_from_commandline(argv)
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/celery.py", line 495, in execute_from_commandline
    super(CeleryCommand, self).execute_from_commandline(argv)))
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/base.py", line 305, in execute_from_commandline
    return self.handle_argv(self.prog_name, argv[1:])
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/celery.py", line 487, in handle_argv
    return self.execute(command, argv)
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/celery.py", line 419, in execute
    ).run_from_argv(self.prog_name, argv[1:], command=argv[0])
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/worker.py", line 223, in run_from_argv
    return self(*args, **options)
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/base.py", line 253, in __call__
    ret = self.run(*args, **kwargs)
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/worker.py", line 258, in run
    **kwargs)
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/worker/worker.py", line 96, in __init__
    self.app.loader.init_worker()
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/loaders/base.py", line 114, in init_worker
    self.import_default_modules()
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/loaders/base.py", line 108, in import_default_modules
    raise response
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/utils/dispatch/signal.py", line 288, in send
    response = receiver(signal=self, sender=sender, **named)
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/vine/promises.py", line 170, in __call__
    return self.throw()
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/vine/promises.py", line 167, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/app/base.py", line 695, in _autodiscover_tasks
    return self._autodiscover_tasks_from_fixups(related_name)
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/app/base.py", line 705, in _autodiscover_tasks_from_fixups
    pkg for fixup in self._fixups
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/app/base.py", line 706, in <listcomp>
    for pkg in fixup.autodiscover_tasks()
AttributeError: 'NoneType' object has no attribute 'autodiscover_tasks'

The command is executed at the same level as the app Python package. If I remove the autodiscover_tasks() call, it works as it should and the worker starts normally. How does Celery autodiscover tasks, and how do I load tasks from different modules?

3 Answers


  1. From Celery docs:

    https://docs.celeryproject.org/en/stable/reference/celery.html#celery.Celery.autodiscover_tasks

    You should provide a list of package names to the autodiscover_tasks method. If you don't provide one, discovery is delegated to fix-ups (e.g. Django, as long as you have configured its settings properly).

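    The easiest fix is to pass the package that contains your tasks module in the packages list. The name must be importable from the directory where the worker starts, so with the tree above it is app.my_tasks:

    app.autodiscover_tasks(packages=['app.my_tasks'])

    You should then be able to run the worker from the command line.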

    Hope that helps!
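
To make the role of the packages list concrete, here is a minimal standard-library sketch of the idea behind package-based discovery: for each listed package, import its tasks submodule. It is a simplified illustration, not Celery's actual implementation. The discover helper and the demo_proj package are hypothetical names, and the package tree is created in a temporary directory so the snippet runs on its own.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def discover(packages, related_name="tasks"):
    """Import '<package>.<related_name>' for each package (hypothetical helper)."""
    return [importlib.import_module(f"{pkg}.{related_name}") for pkg in packages]


# Build a throwaway tree that mirrors the question: demo_proj/my_tasks/tasks.py
root = Path(tempfile.mkdtemp())
pkg_dir = root / "demo_proj" / "my_tasks"
pkg_dir.mkdir(parents=True)
(root / "demo_proj" / "__init__.py").write_text("")
(pkg_dir / "__init__.py").write_text("")
(pkg_dir / "tasks.py").write_text("def run_tests_for_hash():\n    return 'done'\n")

# Putting `root` on sys.path stands in for starting the worker from that directory.
sys.path.insert(0, str(root))
importlib.invalidate_caches()

modules = discover(["demo_proj.my_tasks"])
discovered = [m.__name__ for m in modules]
print(discovered)  # ['demo_proj.my_tasks.tasks']
```

The dotted name is resolved against sys.path, so a nested package has to be named with its full path (here demo_proj.my_tasks rather than my_tasks).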

  2. You can combine both kinds of autodiscovery:

    app.autodiscover_tasks()  # Find tasks using celery.fixups (i.e. Django apps via INSTALLED_APPS).
    app.autodiscover_tasks(  # Add other tasks not included in the apps.
        [
            'project.other.tasks',
            'another_project.another.app',
        ]
    )
    

    Celery has only one built-in fixup, BUILTIN_FIXUPS = {'celery.fixups.django:fixup'}. See its code to understand what the argument-less call does:

        def autodiscover_tasks(self):
            from django.apps import apps
            return [config.name for config in apps.get_app_configs()]
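
The traceback in the question is consistent with this: when no packages are given, the package list is built by calling autodiscover_tasks() on every fixup, and outside a Django project the fixup entry is apparently None. The following is a simplified stand-in written in plain Python, not Celery's code. DjangoLikeFixup, make_fixup, and packages_from_fixups are hypothetical names, and the list comprehension mirrors the one shown in the traceback.

```python
class DjangoLikeFixup:
    """Stand-in for a fixup that knows which apps are installed."""

    def autodiscover_tasks(self):
        return ["shop", "blog"]  # made-up app names


def make_fixup(django_configured):
    """Assumed behaviour: no fixup object unless Django is configured."""
    return DjangoLikeFixup() if django_configured else None


def packages_from_fixups(fixups):
    # Same shape as the comprehension in the traceback.
    return [pkg for fixup in fixups for pkg in fixup.autodiscover_tasks()]


with_django = packages_from_fixups([make_fixup(True)])
print(with_django)  # ['shop', 'blog']

try:
    packages_from_fixups([make_fixup(False)])
except AttributeError as exc:
    error_message = str(exc)
    print(error_message)  # 'NoneType' object has no attribute 'autodiscover_tasks'
```

Passing an explicit package list should avoid this code path, since the fixups are only consulted when no packages are supplied.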
    
  3. If your application is bigger, the layout might look like this:

    ├── api
    │   ├── __init__.py
    │   └── v1
    │       ├── __init__.py
    │       └── routers
    │           ├── __init__.py
    │           ├── entry.py
    │           └── feed.py
    ├── core
    │   ├── __init__.py
    │   ├── config.py
    │   └── db.py
    ├── main.py
    ├── models
    │   ├── __init__.py
       ...
    └── workers
        ├── __init__.py
        ├── entries
        │   ├── __init__.py
        │   └── tasks.py
        └── worker.py
    

    In worker.py:

    from celery import Celery
    
    celery_app = Celery(
        "worker",
        backend="redis://localhost:6379/0",
        broker="amqp://localhost:5672//"
    )
    
    celery_app.autodiscover_tasks(['app.workers.entries'])
    