I’m using PostgreSQL & asyncpg.
class PgDb:
# noinspection SpellCheckingInspection
def __init__(self, conn: asyncpg.connection.Connection):
self.conn = conn
async def select(self, sql: str, args: Union[list, Dict[str, Any]] = []) -> List[Dict[str, Any]]:
sql, _args = self.__convert_placeholders(sql, args)
return [dict(row) for row in await self.conn.fetch(sql, *_args)]
class DbPoolSingleton:
db_pool: Optional[asyncpg.pool.Pool] = None
@staticmethod
async def create_pool():
config = get_postgres_config()
pool: asyncpg.Pool = await asyncpg.create_pool(
...,
min_size=30,
max_size=40
)
print("Pool created")
return pool
@staticmethod
async def get_pool() -> asyncpg.pool.Pool:
if not DbPoolSingleton.db_pool:
DbPoolSingleton.db_pool = await DbPoolSingleton.create_pool()
return DbPoolSingleton.db_pool
@staticmethod
async def terminate_pool():
(await DbPoolSingleton.get_pool()).terminate()
DbPoolSingleton.db_pool = None
print("Pool terminated")
import asyncio
from helpers.pg_rdb_helper import DbPoolSingleton, PgDb
async def test_synchronous():
conn = await (await DbPoolSingleton.get_pool()).acquire()
db = PgDb(conn)
sql = """samplesql"""
total_start = start = datetime.datetime.now()
for i in range(20):
start = datetime.datetime.now()
rows = await db.select(sql)
end = datetime.datetime.now()
print(f"{i}st query took: ", (end-start).total_seconds())
total_end = datetime.datetime.now()
print(f"total query took: ", (total_end-total_start).total_seconds())
=> total query took: 2.131297
async def test_asynchronous():
db_pool = await DbPoolSingleton.get_pool()
sql = """samplesql"""
total_start = datetime.datetime.now()
tasks = []
for i in range(20):
db = PgDb(await db_pool.acquire())
task = asyncio.create_task(db.select(sql))
tasks.append(task)
await asyncio.gather(*tasks)
total_end = datetime.datetime.now()
print(f"total query took: ", (total_end-total_start).total_seconds())
===> total query took: 2.721282
Here, I have a function which is simple multiple queries call, the first version is synchronous version which await every single query without using asyncio
, the second one is using asyncio.gather
to run these query in background (at least this is my assumption).
Then turn out, as you saw the result asynchronous version
was completely slower than synchronous version
. Basically I know in asynchronous version
we have some overhead for getting connection from pool for every single query which caused it a bit slower.
So how could we fix asynchronous version
to take advandtage of asyncpg
and asyncio
.
After I investigate, I have some fix for this asynchronous version
but bot of them got some error.
Asynchronous fix 1
async def test_asynchronous():
db_pool = await DbPoolSingleton.get_pool()
sql = """samplesql"""
total_start = datetime.datetime.now()
tasks = []
async with db_pool.acquire() as conn:
db = PgDb(conn)
for i in range(20):
task = asyncio.create_task(db.select(sql))
tasks.append(task)
await asyncio.gather(*tasks)
total_end = datetime.datetime.now()
print(f"total query took: ", (total_end-total_start).total_seconds())
I got this error ===>
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
Basically, this fix make multiple coroutine using the same connection to db, so that I got this error..
Now, I gave up with this problem, please help me to resolve it??
My question: So how could we fix asynchronous version
to take advandtage of asyncpg
and asyncio
.
2
Answers
As you said, the error is because there are multiple operations being performed simultaneously. You can fix with this error by adding "await db.select(sql)" in your code.
What it does is that it waits for previous query to finish before starting with the new one. The problem with this is that the overall execution time will increase.
Code:
You are attempting to use the same connection from the pool in all of the tasks you create in your async function.
Try this working example where the tasks each acquire its own connection from the pool: