
I tested it on Windows and it worked, but now I want to run it with Docker. The problem is that when I try to execute the task that sends an email to the user, I get the error [Errno 111] Connection refused, even though Celery starts successfully and connects to RabbitMQ. Why can't Celery send tasks to RabbitMQ?

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/kombu/utils/functional.py", line 32, in __call__
    return self.__value__
           ^^^^^^^^^^^^^^

During handling of the above exception ('ChannelPromise' object has no attribute '__value__'), another exception occurred:
  File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 472, in _reraise_as_library_errors
    yield
    ^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 459, in _ensure_connection
    return retry_over_time(
           
  File "/usr/local/lib/python3.11/dist-packages/kombu/utils/functional.py", line 318, in retry_over_time
    return fun(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 941, in _connection_factory
    self._connection = self._establish_connection()
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 867, in _establish_connection
    conn = self.transport.establish_connection()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/transport/pyamqp.py", line 203, in establish_connection
    conn.connect()
    ^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/amqp/connection.py", line 323, in connect
    self.transport.connect()
    ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/amqp/transport.py", line 129, in connect
    self._connect(self.host, self.port, self.connect_timeout)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/amqp/transport.py", line 184, in _connect
    self.sock.connect(sa)
    ^^^^^^^^^^^^^^^^^^^^^

The above exception ([Errno 111] Connection refused) was the direct cause of the following exception:
  File "/usr/local/lib/python3.11/dist-packages/django/core/handlers/exception.py", line 55, in inner
    response = get_response(request)
               ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/django/core/handlers/base.py", line 197, in _get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/website/journal_website/views.py", line 281, in register_new_user
    send_email_message_to_user_with_activation_link.delay(new_user.pk, new_user_additional_data.code)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/celery/app/task.py", line 444, in delay
    return self.apply_async(args, kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/celery/app/task.py", line 594, in apply_async
    return app.send_task(
           
  File "/usr/local/lib/python3.11/dist-packages/celery/app/base.py", line 798, in send_task
    amqp.send_task_message(P, name, message, **options)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/celery/app/amqp.py", line 517, in send_task_message
    ret = producer.publish(
          
  File "/usr/local/lib/python3.11/dist-packages/kombu/messaging.py", line 186, in publish
    return _publish(
           
  File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 563, in _ensured
    return fun(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/messaging.py", line 195, in _publish
    channel = self.channel
              ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/messaging.py", line 218, in _get_channel
    channel = self._channel = channel()
                              ^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/utils/functional.py", line 34, in __call__
    value = self.__value__ = self.__contract__()
                             ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/messaging.py", line 234, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)
                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 960, in default_channel
    self._ensure_connection(**conn_opts)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 458, in _ensure_connection
    with ctx():
    ^^^^^
  File "/usr/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 476, in _reraise_as_library_errors
    raise ConnectionError(str(exc)) from exc
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

docker-compose.yml:

version: "3.0"

services:
  # WEB
  django:
    build: .
    command: python3.11 manage.py runserver 0.0.0.0:8000
    container_name: django-server
    volumes:
      - media_volume:/website/journal_website/media
      - static_volume:/website/journal_website/static
      - database_volume:/website/journal_website/database
    ports:
      - "8000:8000"
    depends_on:
      - rabbit

  # Celery
  celery:
    build: .
    command: celery -A website worker -l info
    container_name: celery
    depends_on:
      - rabbit
  
  # RabbitMQ
  rabbit:
    hostname: rabbit
    container_name: rabbitmq
    image: rabbitmq:3.12-rc-management
    ports:
      # AMQP protocol port
      - "5672:5672"
      # HTTP management UI
      - "15672:15672"
    restart: always
      

volumes:
  media_volume:
  static_volume:
  database_volume:

celery.py:

from __future__ import absolute_import, unicode_literals
import os

from celery import Celery


os.environ.setdefault("DJANGO_SETTINGS_MODULE", "website.settings")
celery_application = Celery("website")
celery_application.config_from_object("django.conf:settings", namespace="CELERY")
celery_application.conf.broker_url = "amqp://rabbit:5672"
celery_application.autodiscover_tasks()

tasks.py:

from __future__ import absolute_import, unicode_literals
from celery import shared_task

# Some imports...
from uuid import UUID

from django.contrib.auth.models import User  # or the project's custom user model
from django.core.mail import BadHeaderError, send_mail
from django.http import HttpResponse
from django.template.loader import render_to_string

@shared_task
def send_email_message_to_user_with_activation_link(target_user_id: int, code: UUID) -> HttpResponse | None:
    target_user = User.objects.get(pk=target_user_id)
    content = {
        "email": target_user.email,
        "domain": "127.0.0.1:8000",
        "site_name": "Website",
        "user": target_user,
        "protocol": "http",
        "code": code,
    }

    message = render_to_string("user/account_activation/account_activation_email.txt", content)
    try:
        send_mail("Account activation", message, "[email protected]" , [target_user.email], fail_silently=False)
    except BadHeaderError:
        return HttpResponse("Invalid header found.")

2 Answers


  1. Chosen as BEST ANSWER

    Solved; it works on Linux with WSL 2. First of all, I needed a few lines of code in the __init__.py file inside the project directory, so that the Celery app is loaded when Django starts and @shared_task tasks bind to it:

    from .celery import celery_application as celery_app
    __all__ = ["celery_app"]
    

    Then I used a common database volume for both the django and celery containers in the docker-compose file:

      # Celery worker
      celery_worker:
        command: celery -A website worker -l info
        container_name: celery_worker
        volumes:
      - database_volume:/website/journal_website/database # just add this volume to the celery worker service
    image: django-image # use the same image for the django and celery containers, built from the Dockerfile
        depends_on:
          - rabbitmq
          - django
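
    For completeness, here is a minimal sketch (my own assumptions, not part of the original file) of how the django service can build and tag that shared image so the celery worker reuses it instead of rebuilding; the django-image tag and the rabbitmq service name are taken from the snippet above:

  # Django (sketch: builds the Dockerfile once and tags the result as django-image,
  # which the celery_worker service reuses via its image: line)
  django:
    build: .
    image: django-image
    command: python3.11 manage.py runserver 0.0.0.0:8000
    volumes:
      - database_volume:/website/journal_website/database
    ports:
      - "8000:8000"
    depends_on:
      - rabbitmq

    With both build: and image: on the django service, Compose builds the Dockerfile once, tags it as django-image, and the celery_worker service can run from that tag without a second build.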
    

  2. I tried your docker-compose and it works fine for me. The only issue is that RabbitMQ takes around 20 seconds to start and accept connections, so you might see connection timeout errors until then. Once it has started, the Celery worker establishes the connection and works fine.

    For development, it's better not to stop the RabbitMQ container; just restart the Celery container as you make changes. Otherwise you may keep hitting connection timeouts while RabbitMQ starts up.
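
    One way to avoid those startup timeouts is a healthcheck on the RabbitMQ service so the worker only starts once the broker is ready. This is a sketch of my own (not from the original answer), assuming a Docker Compose version that supports depends_on conditions:

  rabbit:
    image: rabbitmq:3.12-rc-management
    healthcheck:
      # rabbitmq-diagnostics ping succeeds once the broker accepts connections
      test: ["CMD", "rabbitmq-diagnostics", "-q", "ping"]
      interval: 10s
      timeout: 5s
      retries: 10

  celery:
    build: .
    command: celery -A website worker -l info
    depends_on:
      rabbit:
        condition: service_healthy  # start the worker only after RabbitMQ is ready

    Note that the condition form of depends_on requires a Compose implementation based on the Compose Specification (docker compose v2); with older docker-compose v1 tooling and a version: "3.0" file, depends_on only controls start order, not readiness.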
