I am trying to test an API that sends long-running jobs to a queue processed by Celery workers. I am using RabbitMQ running in a Docker container as the message queue. However, when sending a message to the queue I get the following error: Error: [Errno 111] Connection refused

Steps to reproduce:

  • Start RabbitMQ container: docker run -d -p 5672:5672 rabbitmq
  • Start Celery server: celery -A celery worker --loglevel=INFO
  • Build docker image: docker build -t fastapi .
  • Run container: docker run -it -p 8000:8000 fastapi

Dockerfile:

FROM python:3.9

WORKDIR /

COPY . .

RUN pip install --no-cache-dir --upgrade -r ./requirements.txt

EXPOSE 8000

CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]

requirements.txt:

anyio==3.6.1
asgiref==3.5.2
celery==5.2.7
click==8.1.3
colorama==0.4.4
fastapi==0.78.0
h11==0.13.0
httptools==0.4.0
idna==3.3
pydantic==1.9.1
python-dotenv==0.20.0
PyYAML==6.0
sniffio==1.2.0
starlette==0.19.1
typing_extensions==4.2.0
uvicorn==0.17.6
watchgod==0.8.2
websockets==10.3

app.py:

from fastapi import FastAPI
import tasks

app = FastAPI()

@app.get("/{num}")
async def root(num):
  tasks.update_db.delay(num)
  return {"success": True}

tasks.py:

from celery import Celery
import time

celery = Celery('tasks', broker='amqp://')

@celery.task(name='update_db')
def update_db(num: int) -> None:
  time.sleep(30)
  return

2 Answers


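  1. Update tasks.py so that the broker URL spells out the credentials, host, and port instead of relying on a bare amqp://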

    
    from celery import Celery
    import time
    
    celery = Celery('tasks',  broker='amqp://user:pass@host:port//')
    
    @celery.task(name='update_db')
    def update_db(num: int) -> None:
      time.sleep(30)
      return
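
To see what each part of that URL controls, here is a small sketch that splits a broker URL with the standard library and fills in the values Celery's AMQP transport falls back to when a part is missing (guest/guest on localhost:5672, as far as I know). The helper name describe_broker and the DEFAULTS table are made up for illustration; they are not part of Celery.

```python
from urllib.parse import urlsplit

# Assumed fallbacks for a bare amqp:// URL (illustrative table, not a Celery API).
DEFAULTS = {"user": "guest", "password": "guest", "host": "localhost", "port": 5672}

def describe_broker(url):
    """Return the user, password, host and port a broker URL points at."""
    parts = urlsplit(url)
    return {
        "user": parts.username or DEFAULTS["user"],
        "password": parts.password or DEFAULTS["password"],
        "host": parts.hostname or DEFAULTS["host"],
        "port": parts.port or DEFAULTS["port"],
    }

# A bare URL leaves every part to the defaults, including the host.
print(describe_broker("amqp://"))
# A full URL names each part explicitly.
print(describe_broker("amqp://user:pass@rabbitmq:5672//"))
```

The first call shows why the original tasks.py ends up dialing localhost, which inside the app container is the container itself.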
    
    
  2. You can’t connect to RabbitMQ on localhost, which is what a bare amqp:// URL defaults to: inside the app container, localhost is the container itself, and RabbitMQ isn’t running there. Since you’ve published the RabbitMQ port on your host, you can connect to it using the address of your host. One way of doing that is starting the app container like this:

    docker run -it -p 8000:8000 --add-host host.docker.internal:host-gateway fastapi
    

    And then modify your code like this:

    celery = Celery('tasks',  broker='amqp://host.docker.internal')
    

    With that code in place, let’s re-run your example:

    $ docker run -d -p 5672:5672 rabbitmq
    $ docker run -d -p 8000:8000 --add-host host.docker.internal:host-gateway fastapi
    $ curl http://localhost:8000/1
    {"success":true}
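
If the connection is still refused, it can help to probe the broker address from inside the app container before involving Celery at all. The following is a minimal sketch using only the standard library; broker_reachable is a hypothetical helper name, and 5672 is assumed to be the port RabbitMQ listens on, as in the docker run command above.

```python
import socket

def broker_reachable(host, port=5672, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts and name-resolution failures.
        return False

# Example, run inside the app container:
#   broker_reachable("localhost")             # expected to fail in this setup
#   broker_reachable("host.docker.internal")  # should succeed with --add-host
```

This only checks that the TCP port accepts connections; it does not verify AMQP credentials.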
    

    There’s no reason to publish the rabbitmq ports on your host if you only need to access it from within a container. When building an application with multiple containers, using something like docker-compose can make your life easier.

    If you used the following docker-compose.yaml:

    version: "3"
    
    services:
      rabbitmq:
        image: rabbitmq
    
      app:
        build:
          context: .
        ports:
          - "8000:8000"
    

    And modified your code to connect to rabbitmq:

    celery = Celery('tasks',  broker='amqp://rabbitmq')
    

    You could then run docker-compose up to bring up both containers. Your app would be exposed on host port 8000, but rabbitmq would only be available to your app container.

    Incidentally, rather than hardcoding the broker URI in your code, you might want to read it from an environment variable instead:

    import os
    celery = Celery('tasks',  broker=os.getenv('APP_BROKER_URI'))
    

    That allows you to use different connection strings without needing to rebuild your image every time. We’d need to modify the docker-compose.yaml to include the appropriate variable:

    version: "3"
    
    services:
      rabbitmq:
        image: rabbitmq
    
      app:
        build:
          context: .
        environment:
          APP_BROKER_URI: "amqp://rabbitmq"
        ports:
          - "8000:8000"
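
One caveat with a plain os.getenv call is that it returns None when the variable is unset, for example when running outside of docker-compose. A possible sketch with a fallback is shown below; get_broker_uri and DEFAULT_BROKER_URI are hypothetical names, and the default assumes the compose service is called rabbitmq as in the file above.

```python
import os

# Assumption: falls back to the compose service name used in this answer.
DEFAULT_BROKER_URI = "amqp://rabbitmq"

def get_broker_uri(environ=None):
    """Read APP_BROKER_URI, falling back to a default if unset or blank."""
    environ = os.environ if environ is None else environ
    uri = environ.get("APP_BROKER_URI", "").strip()
    return uri or DEFAULT_BROKER_URI

# In tasks.py this could be used as:
#   celery = Celery('tasks', broker=get_broker_uri())
```

Passing the environment in as a parameter keeps the function easy to exercise without touching the real process environment.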
    