skip to Main Content

I’ve revamped an app I wrote last year. Originally it consisted of two separate Python applications: the first computed statistics, and the second was a gunicorn + Flask web server that answered GET requests (both ran as services on CentOS).
The statistics application does the counting and stores everything in Postgres. The web server connects to that same Postgres database and answers GET requests from it.

In the rewritten version I compute all the statistics with the pandas framework, and now I want to merge those two apps into one.
I use asyncio to fetch the data and compute the statistics. Everything works perfectly, and now I want to add a web server that responds to GET requests.

Part of code:

import asyncio
from contextlib import closing
import db_cl, tks_db
from formdf_cl import FormatDF

# Module-level objects shared by the coroutines defined in this script.
getinfofromtks = tks_db.TKS() # Class object used to connect to the third-party database
formatdf = FormatDF() # Counting class object; it keeps some data between calls
dbb = db_cl.MyDatabase('mydb.ini') # Class object used to connect to my own database


async def get_some_data():
    """Wait five seconds, then fetch a fresh pair of dataframes.

    The pair comes from ``getinfofromtks.getdf()`` (the third-party
    database client). That call is made synchronously, i.e. without
    ``await``, so the event loop does not run other tasks while it executes.

    Returns:
        A ``(incoming, outgoing)`` tuple of two large pandas dataframes.
    """
    polling_interval = 5  # seconds between two polls of the third-party database
    await asyncio.sleep(polling_interval)
    incoming, outgoing = getinfofromtks.getdf()
    return incoming, outgoing


async def process(ans_inc, ans_out):
    """Join the two dataframes, compute the statistics and store them.

    Args:
        ans_inc: dataframe passed as first argument to ``formatdf.leftjoin``.
        ans_out: dataframe passed as second argument to ``formatdf.leftjoin``.
    """
    # Yield to the event loop once before starting the synchronous work.
    await asyncio.sleep(0)

    # CPU-side computation on the shared counting object.
    formatdf.leftjoin(ans_inc, ans_out)
    stats_row = formatdf.make_count()

    # Persist the freshly computed statistics in my own database.
    insert_sql = 'INSERT INTO statistic (timestamp, outgoing, incoming, stats) values (%s, %s,%s, %s)'
    dbb.query_database(insert_sql, stats_row)
    dbb.commit_query()


async def main():
    """Poll for new data forever and schedule one processing task per batch.

    Each iteration awaits ``get_some_data()`` and then schedules
    ``process(...)`` as a background task, so the next poll can start
    without waiting for the processing to finish.
    """
    # The event loop keeps only weak references to tasks, so a task nobody
    # else references may be garbage-collected before it completes. Keep a
    # strong reference until the task is done.
    background_tasks = set()
    while True:
        ans_inc, ans_out = await get_some_data()  # get data from third party database
        task = asyncio.ensure_future(process(ans_inc, ans_out))  # computing
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)



if __name__ == "__main__":
    # Run main() on the event loop and always close the loop afterwards,
    # even when main() raises.
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(main())
    finally:
        event_loop.close()

Now I want to add an HTTP server as a threaded application (with Flask or aiohttp) that answers GET requests using parameters from the “formatdf” class object.
What’s the best way to add this feature?

2

Answers


  1. Chosen as BEST ANSWER

    I managed to add an HTTP server as a coroutine. First I tried to use aiohttp, but eventually I found Quart (the same as Flask, but built on asyncio). Sample code to run an HTTP server on Quart:

    import quart
    from quart import request
    import json
    import time
    
    # Quart application object; the route and error-handler decorators in this
    # sample register their functions on it.
    app = quart.Quart(__name__)
    
    def resp(code, data):
        """Build a ``quart.Response`` with a JSON body.

        Args:
            code: HTTP status code of the response.
            data: payload passed to ``to_json`` to produce the body.
        """
        body = to_json(data)
        return quart.Response(response=body, status=code, mimetype="application/json")
    
    def to_json(data):
        """Serialise *data* to a JSON string terminated by a newline.

        Raises:
            TypeError: if *data* contains a value ``json.dumps`` cannot encode.
        """
        # The terminator must be the newline escape "\n"; a bare "n" would
        # append a literal letter and make the body invalid JSON.
        return json.dumps(data) + "\n"
    
    @app.route('/')
    def root():
        """Redirect requests for the site root to the status endpoint."""
        status_url = '/api/status2'
        return quart.redirect(status_url)
    
    
    @app.errorhandler(400)
    def page_not_found(e):
        """Answer HTTP 400 errors with an empty JSON object and status 400."""
        empty_payload = {}
        return resp(400, empty_payload)
    
    
    @app.errorhandler(404)
    def page_not_found(e):
        """Answer HTTP 404 errors with an empty JSON object.

        The response status is 404 so that it matches the error being
        handled (it used to be reported as 400).
        """
        return resp(404, {})
    
    
    @app.errorhandler(405)
    def page_not_found(e):
        """Answer HTTP 405 errors with an empty JSON object and status 405."""
        empty_payload = {}
        return resp(405, empty_payload)
    
    
    @app.route('/api/status2', methods=['GET'])
    def get_status():
        """Handle ``GET /api/status2``.

        Query parameters (checked in this order):
            timestamp: when present and non-empty, reply with the current
                ``time.time()`` value under the key ``"time"``.
            delay: when present and non-empty, reply with a fixed test payload.

        Without either parameter a short hint message is returned.
        Every branch answers with status 200.
        """
        timestamp = request.args.get("timestamp")
        delay = request.args.get("delay")
        if timestamp:
            return resp(200, {"time": time.time()})
        if delay:
            return resp(200, {"test is:": '1'})
        # This payload must be a dict: the previous ``{"", "..."}`` was a set
        # literal, which json.dumps cannot serialise (TypeError).
        return resp(200, {"": "ask me about time"})
    
    
    # Script entry point: start the Quart server when this file is run directly.
    # NOTE(review): debug=True combined with host='0.0.0.0' makes the debug
    # server reachable on all interfaces — confirm this is for local testing only.
    if __name__ == "__main__":
        app.run(debug=True, host='0.0.0.0', port=5000)
    

    To add this code as a coroutine I used await asyncio.gather() and app.run_task instead of app.run. I changed the code from my question like this:

    async def launcher_main():
        """Poll for new data forever and schedule one processing task per batch.

        Each iteration awaits ``get_some_data()`` and then schedules
        ``process(...)`` as a background task.
        """
        # The event loop keeps only weak references to tasks, so a task nobody
        # else references may be garbage-collected before it completes. Keep a
        # strong reference until the task is done.
        background_tasks = set()
        while True:
            ans_inc, ans_out = await get_some_data()
            task = asyncio.ensure_future(process(ans_inc, ans_out))
            background_tasks.add(task)
            task.add_done_callback(background_tasks.discard)
    
    
    async def main():
        """Run the polling loop and the Quart HTTP server concurrently."""
        poller = launcher_main()
        http_server = restapi_quart.app.run_task(debug=True, host='0.0.0.0', port=5000)
        await asyncio.gather(poller, http_server)
    

    The last remaining question is how to make the parameters of the "formatdf" class object available to my HTTP server. I implemented that by adding the line Tests.restapi_quart.app.config["formatdf"] = formatdf to the process(...) function. To read it from Quart:

    elif button:
        return resp(200, {"ok": app.config["formatdf"].attr})
    

  2. I just had to figure this out for my app. Here is how you run an aiohttp server inside an existing asyncio app.

    https://docs.aiohttp.org/en/stable/web_lowlevel.html

    import asyncio
    from aiohttp import web
    
    async def web_server():
        """Start an aiohttp site serving ``GET /hello`` and keep it running.

        The coroutine only finishes when it is cancelled (or when start-up
        fails); in both cases the runner is cleaned up before leaving.
        """
        print('Configuring Web Server')
        app = web.Application()
        app.add_routes([web.get('/hello', web_hello)])
        runner = web.AppRunner(app)
        await runner.setup()
        try:
            site = web.TCPSite(runner)
            print('Starting Web Server')
            await site.start()
            print('Web Server Started')

            # Really run forever: wait on an event that is never set, instead
            # of a finite sleep that would stop the server after 100 hours.
            await asyncio.Event().wait()
        finally:
            # Release the listening sockets and application resources.
            await runner.cleanup()
    
    async def web_hello(request):
        """Reply to any request with the plain-text body ``Hello World``."""
        greeting = "Hello World"
        return web.Response(text=greeting)
    
    async def main():
        """Schedule the web server as a task and wait for it to finish."""
        # Web Server
        server_task = asyncio.create_task(web_server())

        await asyncio.gather(server_task)
    
    # Script entry point: asyncio.run() creates an event loop, runs main() on it
    # and closes the loop when main() returns.
    if __name__ == '__main__':
        asyncio.run(main())
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search