skip to Main Content

I’m using PostgreSQL & asyncpg.

class PgDb:
    # noinspection SpellCheckingInspection
    def __init__(self, conn: asyncpg.connection.Connection):
        self.conn = conn
    async def select(self, sql: str, args: Union[list, Dict[str, Any]] = []) -> List[Dict[str, Any]]:
        sql, _args = self.__convert_placeholders(sql, args)
        return [dict(row) for row in await self.conn.fetch(sql, *_args)]

class DbPoolSingleton:
    db_pool: Optional[asyncpg.pool.Pool] = None

    @staticmethod
    async def create_pool():
        config = get_postgres_config()
        pool: asyncpg.Pool = await asyncpg.create_pool(
            ...,
            min_size=30,
            max_size=40
        )
        print("Pool created")
        return pool

    @staticmethod
    async def get_pool() -> asyncpg.pool.Pool:
        if not DbPoolSingleton.db_pool:
            DbPoolSingleton.db_pool = await DbPoolSingleton.create_pool()
        return DbPoolSingleton.db_pool

    @staticmethod
    async def terminate_pool():
        (await DbPoolSingleton.get_pool()).terminate()
        DbPoolSingleton.db_pool = None
        print("Pool terminated")
import asyncio
from helpers.pg_rdb_helper import DbPoolSingleton, PgDb
async def test_synchronous():
    conn = await (await DbPoolSingleton.get_pool()).acquire()
    db = PgDb(conn)
    sql = """samplesql"""
    total_start = start = datetime.datetime.now()
    
    for i in range(20):
        start = datetime.datetime.now()
        rows = await db.select(sql)
        end  = datetime.datetime.now()
        print(f"{i}st query took: ", (end-start).total_seconds())
    total_end = datetime.datetime.now()
    print(f"total query took: ", (total_end-total_start).total_seconds())

=> total query took: 2.131297

async def test_asynchronous():
    db_pool = await DbPoolSingleton.get_pool()
    sql = """samplesql"""
    total_start = datetime.datetime.now()
    tasks = []
    for i in range(20):
        db = PgDb(await db_pool.acquire())
        task = asyncio.create_task(db.select(sql))
        tasks.append(task)
    await asyncio.gather(*tasks)
    total_end = datetime.datetime.now()
    print(f"total query took: ", (total_end-total_start).total_seconds())

===> total query took: 2.721282


Here, I have a function which is simple multiple queries call, the first version is synchronous version which await every single query without using asyncio, the second one is using asyncio.gather to run these query in background (at least this is my assumption).

Then turn out, as you saw the result asynchronous version was completely slower than synchronous version. Basically I know in asynchronous version we have some overhead for getting connection from pool for every single query which caused it a bit slower.

So how could we fix asynchronous version to take advandtage of asyncpg and asyncio.


After I investigate, I have some fix for this asynchronous version but bot of them got some error.

Asynchronous fix 1

async def test_asynchronous():
    db_pool = await DbPoolSingleton.get_pool()
    sql = """samplesql"""
    total_start = datetime.datetime.now()
    tasks = []
    async with db_pool.acquire() as conn:
        db = PgDb(conn)
        for i in range(20):
            task = asyncio.create_task(db.select(sql))
            tasks.append(task)
        await asyncio.gather(*tasks)
    total_end = datetime.datetime.now()
    print(f"total query took: ", (total_end-total_start).total_seconds())

I got this error ===>

asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

Basically, this fix make multiple coroutine using the same connection to db, so that I got this error..

Now, I gave up with this problem, please help me to resolve it??

My question: So how could we fix asynchronous version to take advandtage of asyncpg and asyncio.

2

Answers


  1. As you said, the error is because there are multiple operations being performed simultaneously. You can fix with this error by adding "await db.select(sql)" in your code.

    What it does is that it waits for previous query to finish before starting with the new one. The problem with this is that the overall execution time will increase.

    Code:

     async with db_pool.acquire() as conn:
         for i in range(20):
             db = PgDb(conn)
             await db.select(sql)
    
    Login or Signup to reply.
  2. You are attempting to use the same connection from the pool in all of the tasks you create in your async function.

    Try this working example where the tasks each acquire its own connection from the pool:

    from typing import Optional
    import asyncio, datetime
    import asyncpg
    
    TRIALS = 200
    SQL = "select count(*) from users"
    
    class DbPoolSingleton:
        db_pool: Optional[asyncpg.pool.Pool] = None
    
        @staticmethod
        async def create_pool():
            pool: asyncpg.Pool = await asyncpg.create_pool(
                min_size=1,
                max_size=10
            )
            print("Pool created")
            return pool
    
        @staticmethod
        async def get_pool() -> asyncpg.pool.Pool:
            if not DbPoolSingleton.db_pool:
                DbPoolSingleton.db_pool = await DbPoolSingleton.create_pool()
            return DbPoolSingleton.db_pool
    
        @staticmethod
        async def terminate_pool():
            (await DbPoolSingleton.get_pool()).terminate()
            DbPoolSingleton.db_pool = None
            print("Pool terminated")
    
    
    async def test_synchronous() -> None:
        conn = await(await DbPoolSingleton.get_pool()).acquire()
        total_start = start = datetime.datetime.now()
        runtime_total = 0.0
        for i in range(TRIALS):
            start = datetime.datetime.now()
            result = await conn.fetchval(SQL)
            end  = datetime.datetime.now()
            qruntime = (end - start).total_seconds()
            runtime_total += qruntime
            #print(f"{i}st query took:  {qruntime} to get {result}")
        total_end = datetime.datetime.now()
        print(f"sync  total query took:  {(total_end-total_start).total_seconds()} wallclock seconds and {runtime_total} run seconds")
    
    async def run_query(pool, sql) -> tuple:
        async with pool.acquire() as conn:
            qstart = datetime.datetime.now()
            result = await conn.fetchval(sql)
            qend = datetime.datetime.now()
            return (result, (qend - qstart).total_seconds())
    
    
    async def test_asynchronous() -> None:
        pool = await DbPoolSingleton.get_pool()
        total_start = datetime.datetime.now()
        #runtime_total = 0.0
        tasks = []
        for i in range(TRIALS):
            tasks.append(run_query(pool, SQL))
        results = await asyncio.gather(*tasks)
        runtime_total = sum([t[1] for t in results])
        total_end = datetime.datetime.now()
        print(f"async total query took:  {(total_end-total_start).total_seconds()} wallclock seconds and {runtime_total} run seconds")
        # print(f"total query took:  {(total_end-total_start).total_seconds()} wallclock seconds")
    
    
    if __name__ == "__main__":
        loop = asyncio.new_event_loop()
        loop.run_until_complete(test_synchronous())
        loop.run_until_complete(test_asynchronous())    
        loop.run_until_complete(test_synchronous())
        loop.run_until_complete(test_asynchronous())
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search