skip to Main Content

I have a Flask Application with an MVC structure:

my_app
├── server.py
├── requirements.txt
├── models
│   ├── __init__.py
│   └── model.py
├── controllers
│   ├── __init__.py
│   ├── client_controllers
│   │   └── controller.py
│   └── another_controller.py
└── templates

I use blueprints to split the server code into “controllers”, so I have something like this:

server.py:

from flask import Flask
from celery import Celery

from controllers.client_controllers.controller import controller

# Create the Flask application object for this module.
app = Flask(__name__)
# NOTE(review): hard-coded placeholder secret key; presumably replaced by a
# real secret outside this example — confirm.
app.secret_key = 'SECRET'

# Attach the routes defined on the imported `controller` blueprint.
app.register_blueprint(controller)

# Celery Configuration
def make_celery(app):
    """Build a Celery instance wired to the given Flask application.

    The broker and result backend URLs are read from ``app.config`` (keys
    ``CELERY_BROKER_URL`` and ``CELERY_RESULT_BACKEND``), the whole Flask
    config is copied into the Celery configuration, and the instance's
    ``Task`` class is replaced by a subclass whose ``__call__`` runs inside
    ``app.app_context()``.

    :param app: the Flask application providing the name and configuration.
    :returns: the configured Celery instance.
    :raises KeyError: if either Celery URL is missing from ``app.config``.
    """
    settings = app.config
    celery = Celery(
        app.import_name,
        backend=settings['CELERY_RESULT_BACKEND'],
        broker=settings['CELERY_BROKER_URL'],
    )
    celery.conf.update(settings)

    # Keep a handle on the stock task class before swapping it out below.
    base_task = celery.Task

    class ContextTask(base_task):
        abstract = True

        def __call__(self, *args, **kwargs):
            # Enter the Flask application context for the duration of the call.
            with app.app_context():
                return base_task.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery

# Broker and result-backend URLs that make_celery() reads from app.config.
app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379'
)
# Module-level Celery instance built from the configured Flask app.
celery = make_celery(app)


# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    # NOTE(review): debug=True combined with host='0.0.0.0' is presumably
    # meant for local development only — confirm before deploying.
    app.run(host='0.0.0.0', debug=True)

controller.py:

from flask import Blueprint, render_template, json, request, redirect, url_for, abort, session



# Blueprint holding this module's routes; server.py imports and registers it.
controller = Blueprint('controller', __name__,
                                     template_folder='templates/')

# NOTE(review): `celery` is neither imported nor defined in this module as
# shown, so this decorator line cannot resolve the name.
@celery.task()
def add_together(a, b):
    """Return ``a + b``; the decorator registers this function as a task."""
    return a + b

@controller.route('/add', methods=['GET'])
def add():
    """Handle GET requests on the blueprint's '/add' route.

    Queues ``add_together(23, 42)`` and returns the literal string
    'Processing'; the task's result value is not used in the response.
    """
    # Submit the task through its .delay() shortcut.
    result = add_together.delay(23, 42)
    # NOTE(review): presumably blocks this request until the task has
    # finished — confirm against the Celery result API.
    result.wait()
    return 'Processing'

As you may notice, celery is not imported into the controller: I don’t know how to import the celery instance from server.py into my controller.py without getting an error. I’ve been trying with:

from ...server import celery
from ..server import celery
...etc

but still failing with errors.

2

Answers


  1. One option is to assign celery instance to the app instance and then access it through flask’s current_app.

    In your server.py, just add:

    celery = make_celery(app)
    app.celery = celery
    

    Then you can access this in your controller.py:

    from flask import current_app
    
    # NOTE(review): this decorator is evaluated when the module is imported;
    # current_app presumably needs an active application context at that
    # point — confirm.
    @current_app.celery.task()
    def add_together(a, b):
        """Return ``a + b``."""
        return a + b
    
    Login or Signup to reply.
  2. The Flask error “RuntimeError: Working outside of application context.” happens because the code is not running inside a Flask application context (app.app_context()).

    You should use Celery’s shared_task, which is what you need given your MVC structure.

    celery_flask/
    ├── celery_tasks
    │   ├── app_tasks.py
    │   └── __init__.py
    ├── celery_worker.py
    ├── controllers
    │   ├── __init__.py
    │   └── some_controller.py
    ├── __init__.py
    └── server.py
    
    

    Script app_tasks.py

    #=====================
    #   app_tasks.py
    #=====================
    from __future__ import absolute_import, unicode_literals
    from celery import shared_task
    
    # Registered under the explicit task name 'celery_tasks.add_together'.
    @shared_task(name='celery_tasks.add_together')
    def add_together(x, y):
        """Return ``x + y``."""
        return x + y
    

    The @shared_task decorator returns a proxy that always points to the currently active Celery instance:

    >>> from celery import Celery, shared_task
    >>> @shared_task
    ... def add_together(x, y):
    ...     return x + y
    ...
    >>> app1 = Celery(broker='amqp://')
    >>> add_together.app is app1
    True
    >>> app2 = Celery(broker='redis://')
    >>> add_together.app is app2
    True
    

    After you define your tasks, you can call them using a reference to a Celery app. This Celery app can be attached to the Flask application and reached through the application context. Example:

    Script server.py

    from __future__ import absolute_import
    from flask import Flask
    from celery import Celery
    
    from controllers.some_controller import controller
    
    # Create the Flask application object for this module.
    flask_app = Flask(__name__)
    # NOTE(review): hard-coded placeholder secret key; presumably replaced by
    # a real secret outside this example — confirm.
    flask_app.secret_key = 'SECRET'
    
    # Attach the routes defined on the imported `controller` blueprint.
    flask_app.register_blueprint(controller)
    
    # Celery Configuration
    def make_celery(app):
        """Build the 'flask-celery-app' Celery instance for a Flask app.

        The broker and result backend URLs come from ``app.config`` (keys
        ``CELERY_BROKER_URL`` and ``CELERY_RESULT_BACKEND``), the module
        ``celery_tasks.app_tasks`` is listed in ``include``, and the
        instance's ``Task`` class is replaced by a subclass whose
        ``__call__`` runs inside ``app.app_context()``.

        :param app: the Flask application providing the configuration.
        :returns: the configured Celery instance.
        :raises KeyError: if either Celery URL is missing from ``app.config``.
        """
        settings = app.config
        celery = Celery(
            'flask-celery-app',
            backend=settings['CELERY_RESULT_BACKEND'],
            broker=settings['CELERY_BROKER_URL'],
            include=['celery_tasks.app_tasks'],
        )

        # Keep a handle on the stock task class before swapping it out below.
        base_task = celery.Task

        class ContextTask(base_task):
            abstract = True

            def __call__(self, *args, **kwargs):
                # Enter the Flask application context for the call.
                with app.app_context():
                    return base_task.__call__(self, *args, **kwargs)

        celery.Task = ContextTask
        return celery
    
    def list_celery_task():
        """Print the set of task names registered on the responding workers.

        Sends one inspect request through the current Celery app's control
        interface, flattens the per-worker task lists into a single set, and
        prints it as ``registered_tasks=<set>``. When no worker replies, an
        empty set is printed instead of raising.
        """
        from itertools import chain

        # celery.task.control was only a shim over the current app's control
        # interface; going through the app works without that shim module.
        from celery import current_app as celery_app

        # Ask the workers once: the original issued an extra request whose
        # reply was discarded.
        replies = celery_app.control.inspect().registered_tasks()
        # No reply (e.g. no worker is running) comes back as None, not a dict.
        t = set(chain.from_iterable((replies or {}).values()))
        # Parenthesized single-argument print is valid on Python 2 and 3.
        print("registered_tasks={}".format(t))
    #======================================
    #                MAIN
    #======================================
    # Broker and result-backend URLs that make_celery() reads from the config.
    flask_app.config.update(
        CELERY_BROKER_URL='redis://localhost:6379',
        CELERY_RESULT_BACKEND='redis://localhost:6379'
    )
    # Module-level Celery instance built from the configured Flask app.
    celery = make_celery(flask_app)
    # Expose the Celery instance as an attribute of the Flask app object.
    flask_app.celery = celery
    # Executed whenever this module is imported or run, not only under __main__.
    list_celery_task( )
    
    
    # Start the server only when this file is executed as a script.
    if __name__ == "__main__":
        # NOTE(review): debug=True combined with host='0.0.0.0' is presumably
        # meant for local development only — confirm before deploying.
        flask_app.run(host='0.0.0.0', debug=True)
    

    Script some_controller.py

    #============================
    #   some_controller.py
    #============================
    from __future__ import absolute_import
    from flask import Blueprint
    from flask import current_app
    
    # Blueprint holding this module's routes; server.py imports and registers it.
    controller = Blueprint('controller', __name__,
                                         template_folder='templates/')
    
    @controller.route('/add', methods=['GET'])
    def add():
        """Handle GET requests on the blueprint's '/add' route.

        Sends the task named 'celery_tasks.add_together' with arguments
        ``[12, 6]`` through the Celery instance stored on the current Flask
        app, fetches its result, and returns ``'Processing is <result>'``.
        The same message is also printed.
        """
        # Parenthesized single-argument print is valid on Python 2 and 3;
        # the original print statements are a SyntaxError on Python 3.
        print("calling add")
        # The task is addressed by name, so its module is not imported here.
        result = current_app.celery.send_task('celery_tasks.add_together', args=[12, 6])
        r = result.get()
        # Format once and reuse for both the log line and the response body.
        message = 'Processing is {}'.format(r)
        print(message)
        return message
    

    Finally, start the worker to consume the tasks:

    celery -A celery_worker worker --loglevel=DEBUG
    

    Script celery_worker.py

    #============================
    #   celery_worker.py
    #============================
    from __future__ import absolute_import
    from celery import Celery
    
    # Celery Configuration
    def make_celery():
        """Return the 'flask-celery-app' Celery instance used by the worker.

        Broker and result backend both point at ``redis://localhost:6379``,
        and the module ``celery_tasks.app_tasks`` is listed in ``include``.
        """
        return Celery(
            'flask-celery-app',
            backend='redis://localhost:6379',
            broker='redis://localhost:6379',
            include=['celery_tasks.app_tasks'],
        )
    
    # Module-level instance; the worker is started against this module with
    # `celery -A celery_worker worker`.
    celery = make_celery()
    # Parenthesized single-argument print is valid on Python 2 and 3; list()
    # keeps the output a plain list of task names on both.
    print("tasks={}".format(list(celery.tasks.keys())))
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search