
I am attempting to run Celery in its own container, separate from my Flask app. Right now I am just setting up a simple email app. The container CMD is

"["celery", "worker", "–loglevel=info"]"

The message gets sent to the Redis broker and Celery picks it up, but Celery gives me this error:

"Received unregistered task of type
‘flask_project.views.send_async_email’. The message has been ignored
and discarded."

I am setting the include option in the Celery config in my Flask app. I have restarted and rebuilt my containers, but the issue persists.

from flask import Blueprint, current_app
from flask_mail import Mail
from os import getenv
from celery import Celery

from .support_func import decorator_require_api

views = Blueprint('views', __name__)

celery = Celery(views.name,
                broker='redis://redis:6379/0',
                include=["views.tasks"])

@celery.task
def send_async_email(email_data):
    mail = Mail()
    mail.send(email_data)

@views.route('/')
def home():
    with current_app.app_context():

        email_data = {'sender': getenv('MAIL_USERNAME'), 'recipients': ['[email protected]'],
                      'message': "This is a test email"}

        send_async_email.delay(email_data)
    return "Message sent!"

Compose:

---
version: "3.9"
services:
  flask:
    build:
      context: ./Docker/flask
    container_name: flask
    volumes:
      - ./app/:/app
    restart: unless-stopped
    stdin_open: true
    #entrypoint: /bin/bash
    networks:
      - api
      
  nginx:
    image: nginx:latest
    container_name: nginx
    depends_on:
      - flask
    #entrypoint: /bin/bash
    volumes:
      - ./nginx_config:/etc/nginx/conf.d
      - ./app/:/app
    ports:
      - "5000:443"
    networks:
      - api

  celery:
    build:
      context: ./Docker/celery
    container_name: celery
    depends_on:
      - redis
    restart: unless-stopped
    stdin_open: true
    networks:
      - api

  redis:
    image: redis:latest
    container_name: redis
    depends_on:
      - flask
    #entrypoint: /bin/bash
    networks:
      - api

networks:
  api:
    driver: bridge

  -----------------
Dockerfile (Docker/celery):

FROM python:3.9.7-slim-buster

WORKDIR /app

RUN apt-get update && apt-get install -y \
    build-essential # python-dev libssl-dev openssl

COPY ./ .

RUN pip3 install -r requirements.txt

ENV CELERY_BROKER_URL=redis://redis:6379/0

CMD ["celery", "worker", "--loglevel=info"]

2 Answers


  1. Chosen as BEST ANSWER

    I finally figured it out. I used https://blog.miguelgrinberg.com/post/celery-and-the-flask-application-factory-pattern as a reference. The key changes are mounting ./app/ into the celery container and adding a celery_app.py module for the worker to load; with the application factory pattern I can now register new blueprints without touching the Celery config. It is a work in progress, but the containers are all up and running.

    .
    ├── Docker
    │   ├── celery
    │   │   ├── Dockerfile
    │   │   └── requirements.txt
    │   └── flask
    │       ├── Dockerfile
    │       └── requirements.txt
    ├── app
    │   ├── flask_project
    │   │   ├── __init__.py
    │   │   ├── celery_app.py
    │   │   └── views.py
    ├── docker-compose.yml
    
    
    
    Compose:
    --------------------------------------------------------------------------------
    ---
    version: "3.9"
    services:
      flask:
        build:
          context: ./Docker/flask
        container_name: flask
        volumes:
          - ./app/:/app
        restart: unless-stopped
        stdin_open: true
        networks:
          - api
    
      nginx:
        image: nginx:latest
        container_name: nginx
        depends_on:
          - flask
        #entrypoint: /bin/bash
        volumes:
          - ./nginx_config:/etc/nginx/conf.d
          - ./app/:/app
        ports:
          - "5000:443"
        networks:
          - api
    
      celery:
        build:
          context: ./Docker/celery
        container_name: celery
        depends_on:
          - redis
        volumes:
          - ./app/:/app
        restart: unless-stopped
        stdin_open: true
        networks:
          - api
    
      redis:
        image: redis:latest
        container_name: redis
        depends_on:
          - flask
        #entrypoint: /bin/bash
        networks:
          - api
    
    networks:
      api:
        driver: bridge
    
    celery_app.py:
    --------------------------------------------------------------------------------
    from . import celery, create_app

    # Worker entry point: building the app imports the blueprints (and with them
    # the @celery.task definitions); pushing an app context lets the tasks use
    # current_app.
    app = create_app()
    app.app_context().push()
    
    
    __init__.py:
    --------------------------------------------------------------------------------
    from os import getenv

    from celery import Celery
    from flask import Flask

    celery = Celery(__name__, broker=getenv('CELERY_BROKER_URL'))
    
    
    def create_app():
        app = Flask(__name__)
    
        # Celery stuff
        celery.conf.update(app.config)
    
    
        # Register Blueprints
        from .views import views
    
        app.register_blueprint(views, url_prefix='/')
    
        return app
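
    Not shown in the snippet above: for the Mail() call inside the task to find a configured extension, create_app() would typically also load the Flask-Mail settings and initialize the extension. A minimal sketch, assuming environment-based configuration (the init_mail helper and the MAIL_* environment variables are my own illustration, not part of the original post; the MAIL_* config keys themselves are standard Flask-Mail settings):

    from os import getenv
    from flask_mail import Mail

    def init_mail(app):
        # Hypothetical helper, called from create_app() before returning the app.
        app.config.update(
            MAIL_SERVER=getenv('MAIL_SERVER'),
            MAIL_PORT=int(getenv('MAIL_PORT', '587')),
            MAIL_USE_TLS=True,
            MAIL_USERNAME=getenv('MAIL_USERNAME'),
            MAIL_PASSWORD=getenv('MAIL_PASSWORD'),
        )
        Mail(app)  # registers the extension so Mail().send() works under this app's context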
    
    views.py:
    --------------------------------------------------------------------------------
    
    from flask import Blueprint, current_app
    from flask_mail import Message, Mail
    from os import getenv
    from . import celery
    
    views = Blueprint('views', __name__)
    
    
    @celery.task
    def send_async_email(email_data):
        msg = Message(email_data['subject'],
                      sender=email_data['sender'],
                      recipients=email_data['recipients'],
                      )
        msg.body = email_data['message']
        mail = Mail()
        mail.send(msg)
    
    @views.route('/')
    def home():
    
        with current_app.app_context():
            email_data = {'sender': getenv('MAIL_USERNAME'),
                          'recipients': ['[email protected]'],
                          'subject': 'testing123',
                          'message': "testing123"
                          }
    
            msg = Message(email_data['subject'],
                          sender=email_data['sender'],
                          recipients=email_data['recipients'],
                          )
            msg.body = email_data['message']
    
            send_async_email.delay(email_data)
        return "Message sent!"
    

  2. You need to pass the Celery app to the worker with the --app or -A flag (see my answer/example here).

    I would recommend refactoring a bit and extracting this snippet:

    celery = Celery(views.name,
                    broker='redis://redis:6379/0',
                    include=["views.tasks"])
    

    into an external file, such as celery_app.py, and then import it in your Flask app and point the worker at it:

    ["celery", "--app", "your_module.celery_app:celery", "worker", "--loglevel=info"]
    

    You should see the registered tasks listed in the worker's startup logs (just below the big Celery logo banner).
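
    For reference, the extracted celery_app.py might look roughly like this (a sketch only: the flask_project.views module path is taken from the error message in the question, and the broker URL from the original snippet):

    # celery_app.py (hypothetical sketch)
    from celery import Celery

    celery = Celery('flask_project',
                    broker='redis://redis:6379/0',
                    include=['flask_project.views'])

    The worker would then be started with something like the CMD shown above, e.g. ["celery", "--app", "flask_project.celery_app:celery", "worker", "--loglevel=info"].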
