skip to Main Content

I’m getting this error when trying to run a Celery task in my Flask app:

celery.exceptions.TimeoutError: The operation timed out.

Here’s my config file:

from celery import Celery
import os
from flask import Flask

def create_celery_app():
    """Build and configure the Celery application.

    The broker and the result backend both use the URL found in the
    ``REDIS_URL`` environment variable, falling back to
    ``redis://localhost:6379/0`` when it is not set.

    Returns:
        The configured ``Celery`` instance.
    """
    redis_url = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
    instance = Celery('celery-app', broker=redis_url, backend=redis_url)

    # Both listed tasks are routed explicitly to the queue named "default".
    routes = {
        task_name: {'queue': 'default'}
        for task_name in ('tasks.run_action_task', 'tasks.add')
    }
    settings = {
        'task_serializer': 'json',
        'accept_content': ['json'],
        'result_serializer': 'json',
        'timezone': 'America/Los_Angeles',
        'enable_utc': True,
        'task_track_started': True,
        'imports': ['tasks'],
        'task_routes': routes,
    }
    instance.conf.update(**settings)
    return instance

# Module-level Celery instance; the task module imports it to register tasks.
celery_app = create_celery_app()

def init_celery(app: Flask):
    """Install a Flask-aware base task class on ``celery_app``.

    Replaces ``celery_app.Task`` with a subclass whose ``__call__`` runs the
    task body (``run``) inside ``app.app_context()``.

    Args:
        app: The Flask application whose context tasks should run in.
    """
    base_task = celery_app.Task

    class ContextTask(base_task):
        # Marked abstract so the class itself is not treated as a task.
        abstract = True

        def __call__(self, *args, **kwargs):
            ctx = app.app_context()
            with ctx:
                return self.run(*args, **kwargs)

    celery_app.Task = ContextTask

# Public API of this module.
__all__ = ['celery_app', 'init_celery']

And the task definition:

from celery_config import celery_app
from flask import current_app

@celery_app.task
def add(x, y):
    """Add *x* and *y* and return a success message containing the sum.

    The start of the task is logged through the Flask application logger.
    Any exception is logged as an error and then re-raised unchanged.
    """
    try:
        current_app.logger.info(f"Starting add task with {x} and {y}")
        total = x + y
        return f'Celery add task executed successfully! {total!s}'
    except Exception as exc:
        current_app.logger.error(f"Error in task add: {exc!s}")
        raise

The Celery app is initiated in app.py:

# Initialize Celery with Flask app context
init_celery(app)

The other odd thing is that in the log, the task appears but the code doesn’t execute.

2024-11-14 22:30:35,600 - flower.command - INFO - Broker: redis://localhost:6379/0
2024-11-14 22:30:35,602 - flower.command - INFO - Registered tasks: 
['celery.accumulate',
 'celery.backend_cleanup',
 'celery.chain',
 'celery.chord',
 'celery.chord_unlock',
 'celery.chunks',
 'celery.group',
 'celery.map',
 'celery.starmap',
 'tasks.add',
 'tasks.run_action_task',
 'tasks.simple_test_task']
2024-11-14 22:30:35,610 - kombu.mixins - INFO - Connected to redis://localhost:6379/0
2024-11-14 22:30:47,063 - celery.utils.functional - DEBUG - 
def add(x, y):
    return 1

I’m starting Celery with this command:

celery -A app:celery worker --loglevel=info

I have Redis and Flower running. Flower shows the worker running, but the tasks aren’t being assigned to the worker. I’m running everything in Python venv on Windows.

2

Answers


  1. Chosen as BEST ANSWER

    The combination of using the command from u/BcK (naming the queue) and this workaround for Windows solves the issue:

    Celery is not sending task messages to Docker Redis, tasks stuck on "received"

    The default value of Celery's --pool flag is prefork. Unfortunately, it seems that this pool is not supported on Windows. The gevent pool does work.

    Just install gevent with pip install gevent, then add --pool=gevent to your Celery worker command.


  2. You’re instructing Celery to route the add task to a queue named "default". In that case, when you run a worker you need to give it the queue name(s) it should subscribe to; otherwise it only consumes from the queue named "celery".

    celery --app flask_app.celery_app worker --loglevel=info -Q default,celery

    Also, you have a circular dependency issue. Here’s the file structure I used for your case:

    # flask_app.py
    
    from flask import Flask
    
    from celery_app import celery_init_app
    
    # Create the Flask app first, then build the Celery app bound to it.
    app = Flask(__name__)
    celery_app = celery_init_app(app)
    
    
    @app.route("/")
    def index():
        """Queue ``add(1, 2)`` as a Celery task and return a greeting."""
        # Imported inside the view: tasks.py itself imports this module,
        # so a top-level import here would be circular.
        from tasks import add as add_task

        add_task.delay(1, 2)
        return "Hello, World!"
    
    # tasks.py
    
    from flask import current_app
    
    from flask_app import celery_app
    
    
    @celery_app.task
    def add(x, y):
        """Return a success message containing the sum of *x* and *y*.

        Logs the start of the task via the Flask application logger; on any
        exception, logs the error and re-raises it.
        """
        try:
            current_app.logger.info(f"Starting add task with {x} and {y}")
            message = "Celery add task executed successfully! " + str(x + y)
        except Exception as exc:
            current_app.logger.error(f"Error in task add: {exc!s}")
            raise
        else:
            return message
    
    # celery_app.py
    
    import os
    
    from celery import Celery, Task
    from flask import Flask
    
    
    def create_celery_app(task_cls):
        """Build and configure a Celery application using *task_cls*.

        The broker and result backend both use the ``REDIS_URL`` environment
        variable, falling back to ``redis://localhost:6379/0``.

        Args:
            task_cls: Passed to ``Celery`` as its ``task_cls`` argument.

        Returns:
            The configured ``Celery`` instance.
        """
        redis_url = os.environ.get("REDIS_URL", "redis://localhost:6379/0")
        instance = Celery(
            "celery-app",
            broker=redis_url,
            backend=redis_url,
            task_cls=task_cls,
        )

        # Both listed tasks go to the queue named "default"; a worker must
        # subscribe to that queue to receive them.
        settings = {
            "task_serializer": "json",
            "accept_content": ["json"],
            "result_serializer": "json",
            "timezone": "America/Los_Angeles",
            "enable_utc": True,
            "task_track_started": True,
            "imports": ["tasks"],
            "task_routes": {
                "tasks.run_action_task": {"queue": "default"},
                "tasks.add": {"queue": "default"},
            },
        }
        instance.conf.update(**settings)
        return instance
    
    
    def celery_init_app(app: Flask) -> Celery:
        """Create the Celery app for *app* and register it on the Flask app.

        Tasks use a ``Task`` subclass whose ``__call__`` runs the task body
        inside ``app.app_context()``. The new Celery app is made the default
        one via ``set_default()`` and stored in ``app.extensions["celery"]``.

        Args:
            app: The Flask application to bind tasks to.

        Returns:
            The newly created ``Celery`` instance.
        """

        class FlaskTask(Task):
            # Run every task body inside the Flask application context.
            def __call__(self, *args: object, **kwargs: object) -> object:
                ctx = app.app_context()
                with ctx:
                    return self.run(*args, **kwargs)

        instance = create_celery_app(task_cls=FlaskTask)
        instance.set_default()
        app.extensions["celery"] = instance
        return instance
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search