skip to Main Content

I am using the following script to do queries on a website.

It works on macOS; however, it does not work on Ubuntu. I have tried requests on its own and it works, and a simple asyncio + requests combination against the same website also works.

I need help converting it to using requests instead of aiohttp. Any help would be appreciated.

This is my original code.

import aiohttp
import asyncio

async def get_data(session, x):
    """Poll ``https://api.abc.com/{x}`` until the JSON body converts to a float.

    Args:
        session: object whose ``get(url=...)`` returns an async context
            manager yielding a response with ``status`` and ``json()``.
        x: path component appended to the API base URL.

    Returns:
        The decoded JSON body converted with ``float()``.

    The coroutine never gives up: a non-200 status, a ``ValueError`` from
    ``float()``, or any other ``Exception`` is reported with ``print`` and
    followed by a one-second pause before the next attempt.
    """
    while True:
        try:
            endpoint = f'https://api.abc.com/{x}'
            async with session.get(url=endpoint) as resp:
                if resp.status != 200:
                    print("Received non-200 status code. Retrying...")
                else:
                    payload = await resp.json()
                    try:
                        return float(payload)
                    except ValueError:
                        print("Data is not a valid float. Retrying...")
            # Reached only when this attempt did not return a value.
            await asyncio.sleep(1)
        except Exception as err:
            print("Unable to get url {} due to {}. Retrying...".format(x, err.__class__))
            # Same one-second back-off as the non-exception path.
            await asyncio.sleep(1)

async def main(datas):
    """Look up every identifier in *datas* concurrently over one shared session.

    Returns a dict mapping each identifier to the float that ``get_data``
    produced for it.
    """
    async with aiohttp.ClientSession() as session:
        pending = [get_data(session, data) for data in datas]
        values = await asyncio.gather(*pending)
        # gather() returns results in argument order, so pairing by position is safe.
        return dict(zip(datas, values))

# Identifiers to query; get_data appends each one to the API base URL.
datas = ['x1', 'x2', 'x3', 'x4']
# Drive the event loop until main() has collected a float for every identifier.
results = asyncio.run(main(datas))

Here is my code


import asyncio
import requests

async def get_data(x):
    """Poll ``https://api.abc.com/{x}`` with requests until the body is a float.

    Args:
        x: path component appended to the API base URL.

    Returns:
        The decoded JSON body converted with ``float()``.

    Only ``ValueError`` is handled around the conversion. ``float()`` raises
    ``TypeError`` when the decoded JSON is an object, array or null, and that
    falls through to the generic handler below, which reports the exception
    class and retries indefinitely.

    NOTE: ``requests.get`` is synchronous, so the event loop is blocked while
    each request is in flight.
    """
    while True:
        try:
            reply = requests.get(url=f'https://api.abc.com/{x}')
            if reply.status_code != 200:
                print("Received non-200 status code. Retrying...")
            else:
                try:
                    return float(reply.json())
                except ValueError:
                    print("Data is not a valid float. Retrying...")
            # Reached only when this attempt did not return a value.
            await asyncio.sleep(1)
        except Exception as err:
            print("Unable to get url {} due to {}. Retrying...".format(x, err.__class__))
            # Same one-second back-off as the non-exception path.
            await asyncio.sleep(1)

async def main(datas):
    """Run ``get_data`` for every identifier in *datas* and collect the results.

    Returns a dict mapping each identifier to the float returned for it.
    """
    values = await asyncio.gather(*(get_data(data) for data in datas))
    # gather() returns results in argument order, so pairing by position is safe.
    return dict(zip(datas, values))

# Identifiers to query; get_data appends each one to the API base URL.
datas = ['x1', 'x2', 'x3', 'x4']
# Drive the event loop until main() has collected a float for every identifier.
results = asyncio.run(main(datas))
# Show the identifier -> float mapping.
print(results)

And this is the output I am getting so far:

Unable to get data for x1 due to <class 'TypeError'>. Retrying...
Unable to get data for x2 due to <class 'TypeError'>. Retrying...
Unable to get data for x3 due to <class 'TypeError'>. Retrying...
Unable to get data for x4 due to <class 'TypeError'>. Retrying...
Unable to get data for x1 due to <class 'TypeError'>. Retrying...
...

2

Answers


  1. By catching the ValueError and printing a message, the function should not raise any more errors related to parsing JSON data

    async def get_data(x):
        """Poll ``https://api.abc.com/{x}`` until the JSON body converts to a float.

        Args:
            x: path component appended to the API base URL.

        Returns:
            The decoded JSON body converted with ``float()``.

        The coroutine retries forever, pausing one second between attempts,
        on a non-200 status, an unconvertible body, or any other ``Exception``.
        """
        loop = asyncio.get_running_loop()
        url = f'https://api.abc.com/{x}'
        while True:
            try:
                # requests is synchronous: run it in the default thread pool so
                # the other coroutines started by gather() keep making progress.
                response = await loop.run_in_executor(None, requests.get, url)
                if response.status_code == 200:
                    try:
                        return float(response.json())
                    except (TypeError, ValueError):
                        # float() raises TypeError for a JSON object, array or
                        # null, and ValueError for a non-numeric string.
                        print("Data is not a valid float. Retrying...")
                else:
                    print(f"Received non-200 status code ({response.status_code}). Retrying...")
            except Exception as e:
                print(f"Unable to get data for {x} due to {type(e).__name__}. Retrying...")
            await asyncio.sleep(1)  # Wait for 1 second before retrying
    
    Login or Signup to reply.
  2. Most likely Ubuntu doesn’t work because of certificate problems.

    The certifi package provides an up-to-date bundle of trusted CA root certificates (please remember to keep certifi updated to the latest version in your setup).

    Tuning aiohttp to use certifi instead of system storage is relatively simple:

    import aiohttp
    import asyncio
    import certifi
    import ssl
    
    async def get_data(session, x):
        """Poll ``https://api.abc.com/{x}`` until the JSON body converts to a float.

        TLS certificates are verified against certifi's CA bundle rather than
        the system certificate store.

        Args:
            session: an ``aiohttp.ClientSession`` used for every attempt.
            x: path component appended to the API base URL.

        Returns:
            The decoded JSON body converted with ``float()``.

        The coroutine retries forever, pausing one second between attempts,
        on a non-200 status, an unconvertible body, or any other ``Exception``.
        """
        # certifi.where() is the path of a single PEM bundle file, so it must be
        # passed as cafile (capath expects a directory of hashed certificates).
        # Built once, outside the retry loop.
        ssl_context = ssl.create_default_context(cafile=certifi.where())
        while True:
            try:
                # The per-request ``ssl`` argument carries the custom context;
                # a connector can only be supplied when the session is created.
                async with session.get(
                    url=f'https://api.abc.com/{x}',
                    ssl=ssl_context,
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        try:
                            return float(data)
                        except (TypeError, ValueError):
                            # float() raises TypeError for a JSON object, array
                            # or null, and ValueError for a non-numeric string.
                            print("Data is not a valid float. Retrying...")
                    else:
                        print("Received non-200 status code. Retrying...")
            except Exception as e:
                print("Unable to get url {} due to {}. Retrying...".format(x, e.__class__))
            await asyncio.sleep(1)  # Wait for 1 second before retrying
    
    async def main(datas):
        """Look up every identifier in *datas* concurrently over one shared session.

        Returns a dict mapping each identifier to the float that ``get_data``
        produced for it.
        """
        async with aiohttp.ClientSession() as session:
            pending = [get_data(session, data) for data in datas]
            values = await asyncio.gather(*pending)
            # gather() returns results in argument order, so pairing by position is safe.
            return dict(zip(datas, values))
    
    # Identifiers to query; get_data appends each one to the API base URL.
    datas = ['x1', 'x2', 'x3', 'x4']
    # Drive the event loop until main() has collected a float for every identifier.
    results = asyncio.run(main(datas))
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search