
There are about 1M images that I need to read and insert as bytes into Redis with Python. I have two options: the first is to use a thread pool, and the second is to use asyncio, since this is purely an I/O task. However, I found that the thread pool method is much faster than the asyncio method. A piece of example code looks like this:

import pickle
import os
import os.path as osp
import re
import redis
import asyncio

from multiprocessing.dummy import Pool


r = redis.StrictRedis(host='localhost', port=6379, db=1)



data_root = './datasets/images'

print('obtain name and paths')
paths_names = []
for root, dis, fls in os.walk(data_root):
    for fl in fls:
        if re.search('JPEG$', fl) is None: continue
        pth = osp.join(root, fl)
        name = re.sub('.+/', '', pth)
        name = re.sub('/', '-', name)
        name = 'redis-' + name
        paths_names.append((pth, name))
print('num samples in total: ', len(paths_names))


### this is slower
print('insert into redis')
async def insert_one(path_name):
    pth, name = path_name
    if r.get(name): return
    with open(pth, 'rb') as fr:
        binary = fr.read()
    r.set(name, binary)

async def func(cid, n_co):
    num = len(paths_names)
    for i in range(cid, num, n_co):
        await insert_one(paths_names[i])

n_co = 256
loop = asyncio.get_event_loop()
tasks = [loop.create_task(func(cid, n_co)) for cid in range(n_co)]
fut = asyncio.gather(*tasks)
loop.run_until_complete(fut)
loop.close()



### this is more than 10x faster
def insert_one(path_name):
    pth, name = path_name
    if r.get(name): return
    with open(pth, 'rb') as fr:
        binary = fr.read()
    r.set(name, binary)

def func(cid, n_co):
    num = len(paths_names)
    for i in range(cid, num, n_co):
        insert_one(paths_names[i])

n_co = 128
with Pool(n_co) as pool:
    pool.map(lambda cid: func(cid, n_co), range(n_co))

Here I have two questions that puzzled me a lot:

  1. What is the problem with the asyncio method that makes it slower than the thread method?

  2. Is it encouraged to add millions of tasks to the gather function? Like this:

    num_parallel = 1000000000
    tasks = [loop.create_task(fetch_func(cid, num_parallel)) for cid in range(num_parallel)]
    await asyncio.gather(*tasks)

2 Answers


  1. You're not actually using async for the I/O operations; you're still making synchronous, blocking calls for both the disk reads and the Redis reads/writes.

    Try changing your async approach to use async Redis methods. In particular, use aioredis (it used to be a separate library; now it is part of the main redis library).

    import aioredis  # if you're using redis library version < 4.2.0rc1
    # or
    from redis import asyncio as aioredis  # if you're using redis >= 4.2.0rc1

    # note: don't pass decode_responses=True here, since the values are raw image bytes
    aior = aioredis.from_url("redis://localhost")

    # ...
      if await aior.get(name): return
    # ...
      await aior.set(name, binary)
    

    See how much this change affects your speed difference. If you want to make the file I/O async too, try aiofile:

    from aiofile import async_open
    
    # ...
      async with async_open(pth, 'rb') as fr:
        binary = await fr.read()
    # ...
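
    Putting those two pieces together, a minimal sketch of a fully async version might look like the following. It assumes the paths_names list built in the question, redis >= 4.2 and aiofile installed, and keeps the question's cid/n_co chunking scheme; the main() wrapper is just illustrative:

    import asyncio
    from aiofile import async_open
    from redis import asyncio as aioredis  # redis >= 4.2.0rc1

    # db 1 as in the question; no decode_responses, the values are raw image bytes
    aior = aioredis.from_url("redis://localhost:6379/1")

    async def insert_one(path_name):
        pth, name = path_name
        if await aior.get(name):          # skip keys that are already present
            return
        async with async_open(pth, 'rb') as fr:
            binary = await fr.read()      # non-blocking file read
        await aior.set(name, binary)      # non-blocking Redis write

    async def func(cid, n_co):
        for i in range(cid, len(paths_names), n_co):
            await insert_one(paths_names[i])

    async def main(n_co=256):
        await asyncio.gather(*(func(cid, n_co) for cid in range(n_co)))
        await aior.close()                # release the connection pool

    asyncio.run(main())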
    
  2. lxop is spot on regarding your first question.

    Regarding your second question: it is okay to add 1 million tasks to gather. However, it becomes a problem if they all run at the same time and each opens a connection. This usually leads to more congestion than is useful, as servers and file systems only allow a limited number of connections, and it can lead to connections timing out or being dropped completely.

    One solution for this is to use a semaphore inside the coroutine.

    async def insert_one(.., semaphore):
        async with semaphore:
            .. # logic from lxop

    semaphore = asyncio.Semaphore(10)
    tasks = [loop.create_task(insert_one(.., semaphore)) for .. in ..]
    

    This semaphore ensures that at most 10 inserts are running at the same time. You can play with this number and see what works best; anything between 10 and 1000 could work, depending on your server setup and connection.
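
    For completeness, a minimal sketch of how the semaphore fits together with gather, assuming the async insert_one from the first answer and the paths_names list from the question (insert_one_limited and the limit value are just illustrative names):

    import asyncio

    async def insert_one_limited(path_name, semaphore):
        async with semaphore:             # at most `limit` inserts run at once
            await insert_one(path_name)   # async insert_one from the first answer

    async def main(limit=100):
        semaphore = asyncio.Semaphore(limit)
        await asyncio.gather(*(insert_one_limited(pn, semaphore) for pn in paths_names))

    asyncio.run(main())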
