
I have a FastAPI server and I want to implement background tasks using Redis and RQ.
I’ve created FASTAPI_SERVER, REDIS_SERVER and WORKER_SERVER in docker-compose.

Here is the FastAPI code that creates a task in the Redis queue:

from redis import Redis
from rq import Queue

@app.get("/api/greet")
async def greet():
    try:
        # host, port, password and report_queue are defined elsewhere
        redis_connection = Redis(host=host, port=port, password=password)
        queue = Queue(name=report_queue, connection=redis_connection)
        logger.debug(f"queue created successfully! {queue.connection}")

        # The task is referenced by its dotted path; the worker imports it at run time
        job = queue.enqueue("app.tasks.generate_report_task", {"name": "test"})
        logger.debug(f"job created successfully! {job.id}")
    except Exception as e:
        logger.error(f"queue creation failed: {e}")
        raise Exception(f"queue creation failed: {e}") from e
    return {"message": "Hello World"}

This is the folder structure of WORKER_SERVER:

──> ls           
app  Dockerfile  requirements.txt  scripts  supervisord.conf  venv
──> cd app && tree
├── config.py
├── database.py
├── __init__.py
├── model.py
├── reports
│   ├── generation.py
│   ├── __init__.py
│   └── utils.py
├── tasks.py
└── templates
    └── ppt_template.pptx

FastAPI is able to create the task in the Redis queue:

fastapi-4          | 2024-01-26 08:57:53.463 | DEBUG    | app.main:greet:176 - queue created successfully! Redis<ConnectionPool<Connection<host=redis,port=6379,db=0>>>
fastapi-4          | 2024-01-26 08:58:00.584 | DEBUG    | app.main:greet:179 - job created successfully! 71dc6cb5-29b6-4d30-b078-5b315bd8ea44

But when the RQ worker picks up the task, it raises an AttributeError:

worker_server      | 2024-01-26 08:58:01,153 DEBG 'worker-0' stderr output:
worker_server      | 08:58:01 [Job 71dc6cb5-29b6-4d30-b078-5b315bd8ea44]: exception raised while executing (app.tasks.generate_report_task)
worker_server      | Traceback (most recent call last):
worker_server      |   File "/home/appuser/venv/lib/python3.10/site-packages/rq/utils.py", line 118, in import_attribute
worker_server      |     attribute_owner = getattr(module, attribute_owner_name)
worker_server      | AttributeError: module 'app' has no attribute 'tasks'
worker_server      | 
worker_server      | During handling of the above exception, another exception occurred:
worker_server      | 
worker_server      | Traceback (most recent call last):
worker_server      |   File "/home/appuser/venv/lib/python3.10/site-packages/rq/worker.py", line 1428, in perform_job
worker_server      |     rv = job.perform()
worker_server      |   File "/home/appuser/venv/lib/python3.10/site-packages/rq/job.py", line 1278, in perform
worker_server      |     self._result = self._execute()
worker_server      |   File "/home/appuser/venv/lib/python3.10/site-packages/rq/job.py", line 1315, in _execute
worker_server      |     result = self.func(*self.args, **self.kwargs)
worker_server      |   File "/home/appuser/venv/lib/python3.10/site-packages/rq/job.py", line 425, in func
worker_server      |     return import_attribute(self.func_name)
worker_server      |   File "/home/appuser/venv/lib/python3.10/site-packages/rq/utils.py", line 120, in import_attribute
worker_server      |     raise ValueError('Invalid attribute name: %s' % attribute_name)
worker_server      | ValueError: Invalid attribute name: generate_report_task

I tried to reproduce a simplified version of this setup without Docker, and it worked:

# enqueue_tasks.py

from rq import Queue
from redis import Redis

if __name__ == "__main__":
    redis_conn = Redis()
    queue = Queue(name="report-generation", connection=redis_conn)

    job = queue.enqueue("worker.tasks.example_task", kwargs={"x": 1, "y": 2})
    print(f"Task enqueued with job ID: {job.id}")

# worker/tasks.py

import asyncio

async def example_task(**kwargs):
    print("started")
    await asyncio.sleep(5)
    return kwargs.get("x", 1) + kwargs.get("y", 2)

I run the worker with the rq worker command:
rq worker report-generation


Answers


  1. Chosen as BEST ANSWER

    Answering my own question:

    There are several things to check:

    1. Set the Python path as follows (PYTHONPATH tells the RQ worker where to look for your task modules):

      $ export PATH=[your_python_path]:$PATH
      $ export PYTHONPATH=[your_project_path]:$PYTHONPATH
      $ rq worker
      
    2. After examining the library code, I found that it uses getattr to retrieve the task function from the module. If something goes wrong while loading the task file, the error you see is "module xyz has no attribute abc" rather than the underlying problem. So check that the task module itself imports cleanly.

      In my codebase, I resolved this by adding a test script that runs when the RQ container starts. It tries to import the tasks; if the imports succeed, the entrypoint script continues, and if they fail, the container stops.

    3. When enqueuing tasks, check the task path string for typos. This has happened to me many times, so rule out the simple mistakes before digging deeper.

    4. Ensure that your task actually exists in the folder where you expect it to be.
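
The import check mentioned in point 2 could look roughly like the sketch below. It is not the original script from this answer: `check_task_imports` and `TASK_PATHS` are hypothetical names, and the dotted path is the one from the question. It imports the task module directly, so the underlying exception (for example a missing dependency inside `tasks.py`) is reported as-is.

```python
import importlib

# Hypothetical list of dotted task paths the worker is expected to run.
TASK_PATHS = ["app.tasks.generate_report_task"]


def check_task_imports(paths):
    """Return a list of (path, error) pairs for tasks that cannot be resolved."""
    failures = []
    for path in paths:
        # Split "pkg.module.func" into "pkg.module" and "func".
        module_name, _, attr_name = path.rpartition(".")
        try:
            module = importlib.import_module(module_name)
            getattr(module, attr_name)
        except Exception as exc:
            # Catch everything, including errors raised while the module loads.
            failures.append((path, f"{type(exc).__name__}: {exc}"))
    return failures
```

One way to use it, assuming the entrypoint script runs it before `rq worker`: print the returned failures and end the script with `sys.exit(1 if failures else 0)`, so that a non-zero exit code stops the container.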

    These steps should be sufficient to address the issues in your project. Thanks!


  2. According to this Python RQ tutorial, set the paths if you are using a virtual environment:

    $ export PATH=[your_python_path]:$PATH
    $ export PYTHONPATH=[your_project_path]:$PYTHONPATH
    $ rq worker
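
To confirm that the PYTHONPATH setting took effect, it may help to print where Python would load the task module from, using the same interpreter and environment as the worker. A minimal sketch: `locate_module` is a hypothetical helper name, and `app` / `app.tasks` are the module names from the question.

```python
import importlib.util


def locate_module(name):
    """Return the file a module would be loaded from, or None if it cannot be found."""
    try:
        spec = importlib.util.find_spec(name)
    except ModuleNotFoundError:
        # Raised when a parent package (e.g. "app" for "app.tasks") is missing.
        return None
    return spec.origin if spec is not None else None


if __name__ == "__main__":
    for name in ("app", "app.tasks"):
        print(name, "->", locate_module(name))
```

If `app` resolves to an unexpected directory, or `app.tasks` comes back as `None`, the worker is probably not seeing the project path you intended.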
    