skip to Main Content

I wrote a bot using Python Telegram Bot. Then, i wanted to write some automatic tests for it. And that’s the problem: i needed a way to send messages, wait for my bot to reply, and then react to those messages (either sending more text or clicking on buttons).

Well, i didn’t found a good answer for it online.

2

Answers


  1. Chosen as BEST ANSWER

    So i had to crack my head and think on something on my own. Maybe it is not the better, or the most performatic, but solved ALL my problems, and in a clean way.

    So my idea was to use a Producer-Consumer schema where i set somehwere to store all my updates and the tests were the consumer. Further, i decided to use an Asyncio Queue. It has methods to hang up and wait for items to be stored on queue. Exactly what i needed.

    Now, some code. First of all, i created a CustomQueue class, extending from Asyncio.Queue where i could implement some functions like skip_n_messages or discard_n_messages, to keep my code DRY.

    CustomQueue:

    class CustomQueue(Queue):
        """
            I made this custom class so i could handle different situations 'elegantly'(the functions below)
        """
        async def skip_n_messages(self, n: int):
            """
                Skips N messages on queue and returns message N + 1
            """
            await self.discard_n_messages(n)
            return await self.get()
    
        async def discard_n_messages(self, n: int):
            """
                Discards N messages from Queue
            """
            for _ in range(n):
                await self.get()
    
    

    Now, i need to set somewhere that i want all the incoming messages to be put on CustomQueue. Because i writing tests, i wrote it on a fixture:

    @pytest.fixture(scope="session")
    def message_queue(pyro_client: Client) -> CustomQueue:
        message_queue = CustomQueue()
    
        @pyro_client.on_message(filters.text & filters.private)
        @pyro_client.on_edited_message(filters.text & filters.private)
        async def queuer(client: Client = None, message: Message = None):
            debug(f"Message received: {message.text}")
            debug(f"Queue size before: {message_queue.qsize()}")
            await message_queue.put(message)
            debug(f"Message queued. New queue size: {message_queue.qsize()}")
    
        return message_queue
    

    Now, some use cases:

    async def test_command_echo(pyrogram_client: Client, message_queue: CustomQueue):
        await pyro_client.send_message(TEST_TARGET_USERNAME, f"/echo Is someone there?")
        # The function will hold in this line until the message is received
        incoming_message: Message = await message_queue.get_message()
        assert incoming_message.text == "Is someone there?"
    

    Now, to the downside: I really really don't know if this schema supports concurrent updates. An workaround would be setting a message_queue for each command, by setting up some update filters, but i never tested this (since my bot is written with another library)


  2. If I understand your question correctly, you want to test a bot from the user-side?

    There it tgintegration out there, which seems to be doing exactly what you want:
    https://github.com/JosXa/tgintegration

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search