I have a FastAPI server and I want to implement background tasks using Redis and RQ. I've created FASTAPI_SERVER, REDIS_SERVER and WORKER_SERVER services in docker-compose.
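For context, a minimal sketch of how the three services fit together in the compose file (the build contexts, image and port here are placeholders; only the service roles come from my setup):

services:
  fastapi:
    build: ./fastapi_server
    ports:
      - "8000:8000"
    depends_on:
      - redis
  redis:
    image: redis:7
  worker_server:
    build: ./worker_server
    depends_on:
      - redis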
Here is what my FastAPI code looks like for creating a task in the Redis queue:
@app.get("/api/greet")
async def greet():
    try:
        redis_connection = Redis(host=host, port=port, password=password)
        queue = Queue(name=report_queue, connection=redis_connection)
        logger.debug(f"queue created successfully! {queue.connection}")
        job = queue.enqueue("app.tasks.generate_report_task", {"name": "test"})
        logger.debug(f"job created successfully! {job.id}")
    except Exception as e:
        logger.error(f"queue creation failed: {e}")
        raise Exception(f"queue creation failed: {e}")
    return {"message": "Hello World"}
This is the folder structure of WORKER_SERVER:
──> ls
app Dockerfile requirements.txt scripts supervisord.conf venv
──> cd app && tree
├── config.py
├── database.py
├── __init__.py
├── model.py
├── reports
│ ├── generation.py
│ ├── __init__.py
│ └── utils.py
├── tasks.py
└── templates
└── ppt_template.pptx
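app/tasks.py holds the function being enqueued; a trimmed-down illustration of its shape (not the real implementation):

# app/tasks.py -- trimmed-down illustration only
def generate_report_task(payload: dict):
    # enqueued from FastAPI as "app.tasks.generate_report_task" with {"name": "test"}
    name = payload.get("name", "unknown")
    return f"generated report for {name}"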
FastAPI is able to create the task in the Redis queue:
fastapi-4 | 2024-01-26 08:57:53.463 | DEBUG | app.main:greet:176 - queue created successfully! Redis<ConnectionPool<Connection<host=redis,port=6379,db=0>>>
fastapi-4 | 2024-01-26 08:58:00.584 | DEBUG | app.main:greet:179 - job created successfully! 71dc6cb5-29b6-4d30-b078-5b315bd8ea44
But when the RQ worker picks up the task, it raises an AttributeError:
worker_server | 2024-01-26 08:58:01,153 DEBG 'worker-0' stderr output:
worker_server | 08:58:01 [Job 71dc6cb5-29b6-4d30-b078-5b315bd8ea44]: exception raised while executing (app.tasks.generate_report_task)
worker_server | Traceback (most recent call last):
worker_server | File "/home/appuser/venv/lib/python3.10/site-packages/rq/utils.py", line 118, in import_attribute
worker_server | attribute_owner = getattr(module, attribute_owner_name)
worker_server | AttributeError: module 'app' has no attribute 'tasks'
worker_server |
worker_server | During handling of the above exception, another exception occurred:
worker_server |
worker_server | Traceback (most recent call last):
worker_server | File "/home/appuser/venv/lib/python3.10/site-packages/rq/worker.py", line 1428, in perform_job
worker_server | rv = job.perform()
worker_server | File "/home/appuser/venv/lib/python3.10/site-packages/rq/job.py", line 1278, in perform
worker_server | self._result = self._execute()
worker_server | File "/home/appuser/venv/lib/python3.10/site-packages/rq/job.py", line 1315, in _execute
worker_server | result = self.func(*self.args, **self.kwargs)
worker_server | File "/home/appuser/venv/lib/python3.10/site-packages/rq/job.py", line 425, in func
worker_server | return import_attribute(self.func_name)
worker_server | File "/home/appuser/venv/lib/python3.10/site-packages/rq/utils.py", line 120, in import_attribute
worker_server | raise ValueError('Invalid attribute name: %s' % attribute_name)
worker_server | ValueError: Invalid attribute name: generate_report_task
I've tried to recreate a simpler version of this setup without Docker, and there it worked:
# enqueue_tasks.py
from rq import Queue
from redis import Redis

if __name__ == "__main__":
    redis_conn = Redis()
    queue = Queue(name="report-generation", connection=redis_conn)
    job = queue.enqueue("worker.tasks.example_task", kwargs={"x": 1, "y": 2})
    print(f"Task enqueued with job ID: {job.id}")
# worker/tasks.py
import asyncio

async def example_task(**kwargs):
    print("started")
    await asyncio.sleep(5)
    return kwargs.get("x", 1) + kwargs.get("y", 2)
Using rq worker, the command to run the worker is:
rq worker report-generation
2 Answers
Answering my own question:
There are multiple cases:
Set the Python path (setting the Python path tells RQ where to look for your tasks).
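One way to do this in the worker container is to export PYTHONPATH before starting the worker, pointing it at the directory that contains the app package (the /home/appuser path below is an assumption based on the traceback paths; adjust it to your layout):

# in the worker's entrypoint script or supervisord command, before starting rq
export PYTHONPATH=/home/appuser:$PYTHONPATH
rq worker <your-queue-name>

The same can be achieved with an ENV PYTHONPATH=... line in the worker's Dockerfile, or an environment: entry for the worker service in docker-compose.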
After examining the library code, I discovered that RQ uses getattr to retrieve the task function from the module. If anything goes wrong while importing that file, RQ surfaces it as "module xyz has no attribute abc", so check the task module itself for common errors (for example a failing import or a syntax error). In my codebase I resolved this by adding a small test that runs when the RQ container starts: it checks that the task imports work and, if they do, tells the entrypoint script to continue; if the check crashes, the container stops.
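A minimal sketch of such a pre-start check (the file name check_imports.py and the invocation are examples, assuming the task lives at app.tasks.generate_report_task as above):

# check_imports.py -- run from the entrypoint before starting the worker,
# e.g. `python check_imports.py && rq worker <your-queue-name>`
import sys

try:
    # import the exact dotted path that the FastAPI side enqueues
    from app.tasks import generate_report_task  # noqa: F401
except Exception as exc:
    print(f"task import check failed: {exc}", file=sys.stderr)
    sys.exit(1)

print("task imports OK, starting worker")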
When enqueuing tasks, check the task path for typos and other common mistakes. This has happened to me many times, so rule out the trivial issues before overthinking the problem.
Ensure that the task module actually exists, inside the container, in the folder where you expect it to be; it's worth double-checking.
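A quick way to verify this from the running worker container (the service name and the python interpreter are assumptions; adjust them to your compose file and virtual environment):

docker compose exec worker_server python -c "from app.tasks import generate_report_task; print('import ok')"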
These steps should be sufficient to address the issues in your project. Thanks!
As described in this Python RQ tutorial, add the relevant paths if you are using a virtual environment.
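For example (the venv and project paths below are placeholders):

# make sure the worker runs inside the virtual environment and can import the project
source /path/to/venv/bin/activate
export PYTHONPATH=/path/to/project:$PYTHONPATH
rq worker <queue-name>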