There are about 1M images that I need to read and insert as bytes into Redis with Python. I have two choices: the first is to use a thread pool, and the second is to use asyncio, since this is an IO-only task. However, I found that the thread pool method is much faster than the asyncio method. A piece of example code looks like this:
import pickle
import os
import os.path as osp
import re
import redis
import asyncio
from multiprocessing.dummy import Pool

r = redis.StrictRedis(host='localhost', port=6379, db=1)
data_root = './datasets/images'

print('obtain name and paths')
paths_names = []
for root, dis, fls in os.walk(data_root):
    for fl in fls:
        if re.search('JPEG$', fl) is None: continue
        pth = osp.join(root, fl)
        name = re.sub('.+/', '', pth)
        name = re.sub('/', '-', name)
        name = 'redis-' + name
        paths_names.append((pth, name))
print('num samples in total: ', len(paths_names))
### this is slower
print('insert into redis')

async def insert_one(path_name):
    pth, name = path_name
    if r.get(name): return
    with open(pth, 'rb') as fr:
        binary = fr.read()
        r.set(name, binary)

async def func(cid, n_co):
    num = len(paths_names)
    for i in range(cid, num, n_co):
        await insert_one(paths_names[i])

n_co = 256
loop = asyncio.get_event_loop()
tasks = [loop.create_task(func(cid, n_co)) for cid in range(n_co)]
fut = asyncio.gather(*tasks)
loop.run_until_complete(fut)
loop.close()
### this is more than 10x faster
def insert_one(path_name):
    pth, name = path_name
    if r.get(name): return
    with open(pth, 'rb') as fr:
        binary = fr.read()
        r.set(name, binary)

def func(cid, n_co):
    num = len(paths_names)
    for i in range(cid, num, n_co):
        insert_one(paths_names[i])

n_co = 128
with Pool(n_co) as pool:
    pool.map(lambda cid: func(cid, n_co), range(n_co))
Here I have two questions that puzzle me a lot:
- What is the problem with the asyncio method that makes it slower than the thread method?
- Is it encouraged to add millions of tasks to the gather function? Like this:
num_parallel = 1000000000
tasks = [loop.create_task(fetch_func(cid, num_parallel)) for cid in range(num_parallel)]
await asyncio.gather(*tasks)
2 Answers
You’re not actually using async for the I/O operations; you’re still doing synchronous, blocking calls for both the disk reads and the Redis reads/writes.
Try changing your async approach to use async Redis methods. In particular, use aioredis (it used to be a separate library; it is now part of the main redis lib as redis.asyncio).
See how much this change affects your speed difference. If you want to make the file I/O async too, try aiofile.
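A rough sketch of what that could look like, assuming redis.asyncio (the merged-in aioredis API) and aiofile's async_open; the helper names and batch size here are only illustrative:

import asyncio
from aiofile import async_open       # async file reads
import redis.asyncio as aioredis     # the old aioredis, now shipped inside redis-py

r = aioredis.Redis(host='localhost', port=6379, db=1)

async def insert_one(path_name):
    pth, name = path_name
    # the existence check, the file read and the set are all awaited,
    # so the event loop can run other coroutines while this one waits
    if await r.get(name):
        return
    async with async_open(pth, 'rb') as fr:
        binary = await fr.read()
    await r.set(name, binary)

async def main(paths_names, n_co=256):
    # insert in batches of n_co concurrent coroutines
    for i in range(0, len(paths_names), n_co):
        await asyncio.gather(*(insert_one(pn) for pn in paths_names[i:i + n_co]))

# asyncio.run(main(paths_names))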
lxop is spot on regarding your first question.
Regarding your second question: it is okay to add 1 million tasks to gather. However, it becomes a problem if they all run at the same time and each opens a connection. This usually leads to more congestion than is useful, as servers and file systems only allow a limited number of connections, and it can lead to connections timing out or being dropped completely.
One solution for this is to use a semaphore inside the coroutine.
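A minimal sketch of that idea, assuming the async insert_one from the previous answer (the wrapper and its names are only illustrative):

async def main(paths_names, limit=10):
    sem = asyncio.Semaphore(limit)   # at most `limit` inserts in flight at once

    async def insert_one_limited(path_name):
        # each coroutine waits here until one of the `limit` slots is free
        async with sem:
            await insert_one(path_name)

    await asyncio.gather(*(insert_one_limited(pn) for pn in paths_names))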
This semaphore will ensure that at most 10 inserts are running at the same time. You can play with this number and see what works best; anything between 10 and 1000 could work, depending on your server setup and connection.