skip to Main Content

When running my code I realized that after around 30 requests it gets very slow to fetch the rest, taking a long time to complete the code.

My goal with this code is to fetch users from a Telegram’s group and dump their information in a JSON file, with ther name, username and bio.

This is the code I am running:

import configparser
import json
import asyncio

from telethon.tl.functions.users import GetFullUserRequest 
from telethon import TelegramClient
from telethon.errors import SessionPasswordNeededError
from telethon.tl.functions.channels import GetParticipantsRequest
from telethon.tl.types import ChannelParticipantsSearch
from telethon.tl.types import (
    PeerChannel
)

# Reading Configs
config = configparser.ConfigParser()
config.read("config.ini")

# Setting configuration values
api_id = config['Telegram']['api_id']
api_hash = config['Telegram']['api_hash']

api_hash = str(api_hash)

phone = config['Telegram']['phone']
username = config['Telegram']['username']

# Create the client and connect
client = TelegramClient(username, api_id, api_hash)

async def main(phone):
    await client.start()
    print("Client Created")

    # Ensure you're authorized
    if await client.is_user_authorized() == False:
        await client.send_code_request(phone)
        try:
            await client.sign_in(phone, input('Enter the code: '))
        except SessionPasswordNeededError:
            await client.sign_in(password=input('Password: '))

    me = await client.get_me()

    user_input_channel = input("enter entity(telegram URL or entity id):")

    if user_input_channel.isdigit():
        entity = PeerChannel(int(user_input_channel))
    else:
        entity = user_input_channel

    my_channel = await client.get_entity(entity)

    offset = 0
    limit = 100
    all_participants = []
    count = 0

    while True:
        participants = await client(GetParticipantsRequest(my_channel, ChannelParticipantsSearch(''), offset, limit,hash=0))
        if not participants.users or offset >= 100:
            break
        all_participants.extend(participants.users)
        count +=1
        print(len(participants.users))
        offset += len(participants.users)
        print("finished")

    all_user_details = []
    for participant in all_participants:
        user_full = await client(GetFullUserRequest(participant.id))
        all_user_details.append({
            "id": user_full.full_user.id,
            "bio": user_full.full_user.about
        })

    with open('user_data.json', 'w') as outfile:
        json.dump(all_user_details, outfile)

with client:
    client.loop.run_until_complete(main(phone))

With some debug I could realize that the problem is the in the for loop, this is the loop it is taking a long time to complete

for participant in all_participants:
        user_full = await client(GetFullUserRequest(participant.id))
        all_user_details.append({
            "id": user_full.full_user.id,
            "bio": user_full.full_user.about
        })

Why is this happening and how do I optmize this code to run more effiently?

My compiler is python3 version 3.11.6

Do you guys need more information?

Update—————-

What I have tried:

As jsbueno suggested, I’ve adjusted my code to send multiple requests instead of one request by user, using the asyncio. It worked well for a group base with 80 users, but above from that, I got the following errors:

  1. Server response had invalid buffer: Invalid response buffer (HTTP code 429)
  2. Server indicated flood error at transport level: Invalid response buffer (HTTP code 429)

which indicates too many concurrent requests to the API, so I have tried with the semaphore method, and ended up with the code like this:

# ... (other imports and code)
api_semaphore = asyncio.Semaphore(10) #Updated line

async def main(phone):
    await client.start()
    print("Client Created")
    # Ensure you're authorized
    if await client.is_user_authorized() == False:
        await client.send_code_request(phone)
        try:
            await client.sign_in(phone, input('Enter the code: '))
        except SessionPasswordNeededError:
            await client.sign_in(password=input('Password: '))

    me = await client.get_me()

    user_input_channel = input("enter entity(telegram URL or entity id):")

    if user_input_channel.isdigit():
        entity = PeerChannel(int(user_input_channel))
    else:
        entity = user_input_channel

    my_channel = await client.get_entity(entity)

   # Fetch all participants
    offset = 0
    limit = 100
    all_participants = []
    count = 0

    while True:
        participants = await client(GetParticipantsRequest(my_channel, ChannelParticipantsSearch(''), offset, limit,hash=0))
        if not participants.users or offset >= 1000:
            break
        all_participants.extend(participants.users)
        count +=1
        print(len(participants.users))
        offset += len(participants.users)
        print("finished")

    # Process the participants as needed
    all_user_details = []
    tasks = []
    for participant in all_participants:
        async with api_semaphore:
            # This will prepare all requests and let them be ready to go
            tasks.append(asyncio.create_task(client(GetFullUserRequest(participant.id)))) #Updated line 
# ... (rest of the code)

With that code, I still have the same problems mentioned earlier, even though they are limited in concurrent requests now. Why is this happening, and what is taking so much time of processing after the loops are finished?

2

Answers


  1. Although you are usign asynchronous libs, you are making all your requests for participants in series – meaning for 30 uers, you make 30 requests, one just after the previous other was finished.

    One can easily do this concurrently with asyncio code – for example, you can rewrite this part of your code:

    ...
    async def main():
        ...
        all_user_details = []
        for participant in all_participants:
            user_full = await client(GetFullUserRequest(participant.id))
            all_user_details.append({
                "id": user_full.full_user.id,
                "bio": user_full.full_user.about
            })
    ...
    

    more or less like this:

    from asyncio import Semaphore
    
    
    async def main():
        ...
        
        tasks = []
        for participant in all_participants:
            # This will prepare all requests and let then ready to go;
            # but without the "await", they won't be triggered one by one.
            tasks.append(asyncio.create_task(client(GetFullUserRequest(participant.id))))
            
        # this will launch all the requests in parallel and gather the results:
        results = await asyncio.gather(*tasks)
        all_user_details = [{"id": user_full.full_user.id, "bio": user_full.full_user.about} for user_full in results]
    

    Now, depending on the number of users and on your network, sending all requests "at once" (not really "at once" but they are sent as fast as possible), may cause some failures. If that is the case, you will have to create some more code and make use of an asyncio.Semaphore or other strategy to limit the number of concurrent requests to the API.

    Login or Signup to reply.
  2. hello i am making new telegram bot which will login user from phone number and otp code send. then display list of all channels where user can select from channel and after selecting channel it will copy messages from it. My code run on render. But every time when user enter OTP, it does get forward and in logger got message – "User already connected". Here is below code please help me correct it. Thanks in advance

    from telegram import Update
    from telethon.sync import TelegramClient
    from telegram.ext import CommandHandler, Filters, MessageHandler, Updater, ConversationHandler, CallbackContext
    import logging
    import asyncio
    import os
    
    # Define states for the conversation
    PHONE, OTP, END = range(3)
    
    # Telethon client setup
    TELEGRAM_BOT_TOKEN = os.environ.get('TELEGRAM_BOT_TOKEN')
    API_ID = os.environ.get('API_ID')
    API_HASH = os.environ.get('API_HASH')
    APP_URL = os.environ.get("APP_URL")
    PORT = int(os.environ.get('PORT'))
    
    session_file = 'session.session'
    if os.path.exists(session_file):
        os.remove(session_file)
        print("Session file deleted.")
    else:
        print("Session file does not exist.")
    
    # Setup logging
    logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    telethon_client = TelegramClient('session', API_ID, API_HASH)
    
    authenticated_users = set()
    
    def help(update: Update, context: CallbackContext) -> None:
        """Sends a help message when the command /help is issued
    
        Arguments:
            update: update from Telegram
            context: CallbackContext object that stores commonly used objects in handler callbacks
        """
    
        help_message = "This bot is used to automatically enter trades onto your MetaTrader account directly from Telegram. To begin, ensure that you are selected the correct channel or group from your account to use for copying trading.nnThis bot supports all trade order types (Market Execution, Limit, and Stop)nnAfter an extended period away from the bot, please be sure to re-enter the start command to restart the connection to your MetaTrader account."
        commands = "List of commands:n/start : to login in your accountn/help : displays list of commands and example tradesn/trade : takes in user inputted trade for parsing and placementn/calculate : calculates trade information for a user inputted trade"
        trade_example = "Example Trades 💴:nn"
        market_execution_example = "Market Execution:nBUY GBPUSDnEntry NOWnSL 1.14336nTP 1.28930nTP 1.29845nn"
        limit_example = "Limit Execution:nBUY LIMIT GBPUSDnEntry 1.14480nSL 1.14336nTP 1.28930nn"
        note = "You are able to enter up to two take profits. If two are entered, both trades will use half of the position size, and one will use TP1 while the other uses TP2.nnNote: Use 'NOW' as the entry to enter a market execution trade."
    
        # sends messages to user
        update.effective_message.reply_text(help_message)
        update.effective_message.reply_text(commands)
        update.effective_message.reply_text(trade_example + market_execution_example + limit_example + note)
    
        return
    def cancel(update: Update, context: CallbackContext) -> int:
        """Cancels and ends the conversation.   
        
        Arguments:
            update: update from Telegram
            context: CallbackContext object that stores commonly used objects in handler callbacks
        """
    
        update.effective_message.reply_text("Command has been canceled.")
    
        # removes trade from user context data
        context.user_data['trade'] = None
    
        return ConversationHandler.END
    
    def error(update: Update, context: CallbackContext) -> None:
        """Logs Errors caused by updates.
    
        Arguments:
            update: update from Telegram
            context: CallbackContext object that stores commonly used objects in handler callbacks
        """
    
        logger.warning('Update "%s" caused error "%s"', update, context.error)
    
        return
    
    def unknown_command(update: Update, context: CallbackContext) -> None:
        """Checks if the user is authorized to use this bot or shares to use /help command for instructions.
    
        Arguments:
            update: update from Telegram
            context: CallbackContext object that stores commonly used objects in handler callbacks
        """
        if update.effective_message.chat.username not in authenticated_users:
            update.effective_message.reply_text("You are not authorized to use this bot! 🙅🏽‍♂️")
            return
    
        update.effective_message.reply_text("Unknown command. Use /trade to place a trade or /calculate to find information for a trade. You can also use the /help command to view instructions for this bot.")
    
        return
    
    async def send_otp(update, context):
        phone_number = context.user_data.get('phone_number')
        
        # Connect the client if not already connected
        if not telethon_client.is_connected():
            await telethon_client.connect()
            
        # Check if the user is already authorized
        if await telethon_client.is_user_authorized():
            update.effective_message.reply_text("You are already logged in.")
            return ConversationHandler.END      
        else:
            login_token = await telethon_client.sign_in(phone_number)
            context.user_data['login_token'] = login_token
            update.effective_message.reply_text("Please enter the OTP.")
            return OTP  # Indicates that OTP was sent
            
    async def otp(update, context):
        otp_code = update.effective_message.text
        phone_number = context.user_data.get('phone_number')
        login_token = context.user_data.get('login_token')
        try:
            user_or_token = await telethon_client.sign_in(login_token, code=otp_code)
            update.effective_message.reply_text("You have been successfully connected!",user_or_token)
            return ConversationHandler.END
        except Exception as e:
            print(e)
            update.effective_message.reply_text("Invalid OTP. Please try again.")
            return OTP
        
    def phone(update, context):
        phone_number = update.effective_message.text
        context.user_data['phone_number'] = phone_number
        asyncio.run(send_otp(update, context))
    
    def welcome(update: Update, context: CallbackContext) -> str:
        """Sends welcome message to user.
    
        Arguments:
            update: update from Telegram
            context: CallbackContext object that stores commonly used objects in handler callbacks
        """
    
        welcome_message = "Welcome to the FX Signal Copier Telegram Bot! 💻💸nnYou can use this bot to enter trades directly from Telegram and get a detailed look at your risk to reward ratio with profit, loss, and calculated lot size.nnIn order to get started please send your phone number in international format (e.g., +1234567890)nnUse the /help command to view instructions and example trades."
        
        # sends messages to user
        update.effective_message.reply_text(welcome_message)
        
        return PHONE
    
    
    
    def main() -> None:
        """Runs the Telegram bot."""
    
        updater = Updater(TELEGRAM_BOT_TOKEN, use_context=True)
    
        # get the dispatcher to register handlers
        dp = updater.dispatcher
    
        # help command handler
        dp.add_handler(CommandHandler("help", help))
        
        conv_handler = ConversationHandler(
            entry_points=[CommandHandler('start', welcome)],
            states={
                PHONE: [MessageHandler(Filters.text & ~Filters.command, phone)],
                # OTP: [MessageHandler(Filters.text & ~Filters.command, otp)],
                # TRADE: [MessageHandler(Filters.text & ~Filters.command, PlaceTrade)],
                # CALCULATE: [MessageHandler(Filters.text & ~Filters.command, CalculateTrade)],
                # DECISION: [CommandHandler("yes", PlaceTrade), CommandHandler("no", cancel)]
            },
            fallbacks=[CommandHandler('cancel', cancel)],
        )
    
        # conversation handler for entering trade or calculating trade information
        dp.add_handler(conv_handler)
    
        # message handler for all messages that are not included in conversation handler
        dp.add_handler(MessageHandler(Filters.text, unknown_command))
    
        # log all errors
        dp.add_error_handler(error)
        
        # listens for incoming updates from Telegram
        updater.start_webhook(listen="0.0.0.0", port=PORT, url_path=TELEGRAM_BOT_TOKEN, webhook_url=APP_URL + TELEGRAM_BOT_TOKEN)
        updater.idle()
    
        return
    
    if __name__ == '__main__':
        main()
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search