I’m getting this error when trying to run a Celery task in my Flask app:
celery.exceptions.TimeoutError: The operation timed out.
Here’s my config file:
from celery import Celery
import os
from flask import Flask
def create_celery_app():
    celery = Celery(
        'celery-app',
        broker=os.environ.get('REDIS_URL', 'redis://localhost:6379/0'),
        backend=os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
    )
    celery.conf.update(
        task_serializer='json',
        accept_content=['json'],
        result_serializer='json',
        timezone='America/Los_Angeles',
        enable_utc=True,
        task_track_started=True,
        imports=['tasks'],
        task_routes={
            'tasks.run_action_task': {'queue': 'default'},
            'tasks.add': {'queue': 'default'},
        }
    )
    return celery

celery_app = create_celery_app()
def init_celery(app: Flask):
    class ContextTask(celery_app.Task):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app.Task = ContextTask
__all__ = ['celery_app', 'init_celery']
And the task definition:
from celery_config import celery_app
from flask import current_app
@celery_app.task
def add(x, y):
    try:
        current_app.logger.info(f"Starting add task with {x} and {y}")
        return 'Celery add task executed successfully! ' + str(x + y)
    except Exception as e:
        current_app.logger.error(f"Error in task add: {str(e)}")
        raise
The Celery app is initialized in app.py:
# Initialize Celery with Flask app context
init_celery(app)
The other odd thing is that in the log, the task appears but the code doesn’t execute.
2024-11-14 22:30:35,600 - flower.command - INFO - Broker: redis://localhost:6379/0
2024-11-14 22:30:35,602 - flower.command - INFO - Registered tasks:
['celery.accumulate',
'celery.backend_cleanup',
'celery.chain',
'celery.chord',
'celery.chord_unlock',
'celery.chunks',
'celery.group',
'celery.map',
'celery.starmap',
'tasks.add',
'tasks.run_action_task',
'tasks.simple_test_task']
2024-11-14 22:30:35,610 - kombu.mixins - INFO - Connected to redis://localhost:6379/0
2024-11-14 22:30:47,063 - celery.utils.functional - DEBUG -
def add(x, y):
    return 1
I’m starting Celery with this command:
celery -A app:celery worker --loglevel=info
I have Redis and Flower running. Flower shows the worker as running, but the tasks aren’t being assigned to the worker. I’m running everything in a Python venv on Windows.
2 Answers
The combination of the command from u/BcK’s answer (naming the queue) and this workaround for Windows solved the issue:
Celery is not sending task messages to Docker Redis, tasks stuck on "received"
The default value of the --pool option for a Celery worker is prefork. Unfortunately, that pool is not supported on Windows; the gevent pool does work.
Just install gevent with pip install gevent, then add --pool=gevent to your celery worker command.
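Putting that together, the Windows workaround looks like this (a sketch assuming the same app path, app:celery, that the question uses to start the worker):

```shell
# install the gevent pool implementation
pip install gevent

# prefork (the default pool) is not supported on Windows; use gevent instead
celery -A app:celery worker --loglevel=info --pool=gevent
```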
You’re instructing Celery to send the add task to the default queue. In that case, when you run a worker you need to tell it which queue(s) to subscribe to; otherwise it only consumes from the queue named "celery":

celery --app flask_app.celery_app worker --loglevel=info -Q default,celery
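As a quick sanity check, you can ask a running worker which queues it is actually consuming (shown here with the question’s app path, app:celery; adjust to your module layout):

```shell
# lists the queues each worker is subscribed to; "default" should appear
celery -A app:celery inspect active_queues
```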
Also you’re having a circular dependency issue. Here’s the file structure I used for your case: