skip to Main Content

I am developing a telegram bot using python-telegram-bot. It is like a stock screener, it analyzes the market every given interval and sends the results to users that are subscribed to the indicator. The problem is I do not want the bot to be blocked (users continuously interact with it) when it is analyzing the market (it fetches data & has a lot of computations). So I thought I needed to do that in a different thread, but I can’t make it work with the asyncio

Here is the example:

async def run_screener(bot):
    while True:
    
        async def heavy_computations():
            for i in range(5):
                await asyncio.sleep(2)
                print("Doing computations")

        compute = threading.Thread(target=lambda: asyncio.run(heavy_computations()))
        compute.start()
        compute.join()  #   <--- This is blocking the bot
        
        # Computations are done, now send the results with the bot
        async with bot: #   <--- For some reason this line blocks the bot forever, even without .join()
            for user_id in users:
                await bot.send_message(text=results, chat_id=user_id)

        await asyncio.sleep(compute_next_time())
        

async def main():
    application = buildBot()
    
    async with application:
        await application.start()
        await application.updater.start_polling()
        
        await run_screener(application.bot)
        
        await application.updater.stop()
        await application.stop()

asyncio.run(main())

2

Answers


  1. t = threading.Thread(target=foo, args=(...))
    t.start()
    t.join()  #   <--- This is ALWAYS a mistake.
    

    There is never any reason reason to start a thread and then immediately join it. You are complaining that compute.join() "blocks" the caller, but blocking the caller is the only thing that join() is ever supposed to do.

    compute.join() does nothing to the compute thread. It does nothing at all to any thread or, to anything else. compute.join() does nothing until the compute thread has finished, and then it returns. That’s it. That’s all it does.

    I don’t know anything about telegram. I don’t know what you actually are trying to do, but it sounds like you’ll need to re-structure your code so that run_screener does not wait until "Computations are done." Instead, you’ll either need to make the new thread itself "send the results with the bot," or else you’ll need to make the new thread trigger some event that will cause some other thread to "send the results with the bot."


    P.S., The reason why threads exist is, so you can do this:

    t = threading.Thread(target=foo, args=(...))
    t.start()
    doSomethingElse_concurrently_withFoo()
    

    And, the reason why join exists is, that foo might produce a result that you want to use later:

    t = threading.Thread(target=foo, args=(...))
    t.start()
    doSomethingElse_concurrently_withFoo()
    t.join()
    doSomethingWithResultFromFoo()
    
    Login or Signup to reply.
  2. To run a non-asyncio task and await its completion without blocking other asyncio tasks tou use method asyncio.loop.run_in_executor. You can either execute your non-asyncio worker function in a multiprocessing pool or a multithreading pool. Since you are using multithreading we can either run the task in a default multithreading pool provided by the asyncio loop or create a custom pool, as we are doing here (in this case we only need a pool size of 1):

    import asyncio
    import concurrent.futures
    import time
    
    async def task1():
        # Emulate doing some processing:
        await asyncio.sleep(2)
        return 1
    
    async def task2():
        # Use a ProcessPoolExecutor if the processing is CPU-intensive:
        with concurrent.futures.ThreadPoolExecutor(1) as executor:
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(executor, io_task)
        return result
    
    def io_task():
        # Emulate doing some processing:
        time.sleep(2)
        return 2
    
    async def main():
        results = await asyncio.gather(task1(), task2())
        print(results)
    
    asyncio.run(main())
    

    Prints:

    [1, 2]
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search