
I created a small class to perform basic operations with Redis, using aioredis.

class RedisService:
    def __init__(self, r_url) -> str:
        self.redis = r_url

    async def create_connection(self):
        return await aioredis.create_redis(self.redis)

    async def _get(self, key) -> str:
        try:
            return await self.create_connection().get(key, encoding='utf-8')
        finally:
            await self._close()

    async def _set(self, key, value) -> None:
        await self.create_connection().set(key, value)
        await self._close()

    async def _close(self) -> None:
        self.create_connection().close()
        await self._redis.wait_closed() 

And a test handler that calls the write/read operations for Redis:

@router.post('/perform')
async def index():
    key = 'test'
    value = 'test'
    value = await RedisService(r_url)._set(key, value)
    return {'result': value}

But I get this error:

    await self.create_connection.set(key, value)
AttributeError: 'coroutine' object has no attribute 'set'

I guess the problem could be that the asynchronous code has to be run through an event loop

asyncio.run(some coroutine)

But I can’t understand how I can build this logic into my code

2 Answers


  1. The event loop that uvicorn provides when a FastAPI app is launched takes care of running the Redis get/set calls asynchronously, so you do not need to call asyncio.run yourself.
    See https://fastapi.tiangolo.com/tutorial/first-steps/

    The code snippet below, available at https://github.com/tiangolo/fastapi/issues/1694, is adapted to fit the question.

    Storing the result of create_connection in the object's state means the network call is awaited once, at startup, and every later operation reuses the resolved connection.

    When the path "/" is queried, the connection held in the object's state is used to
    set the value in Redis asynchronously.

    # main.py
    from fastapi import FastAPI
    from connections import redis_cache
    
    app = FastAPI()
    
    @app.on_event('startup')
    async def startup_event():
        await redis_cache.create_connection(r_url="redis://localhost")
    
    @app.on_event('shutdown')
    async def shutdown_event():
        await redis_cache._close()
    
    @app.get("/")
    async def root():
        key = 'key'
        value = 'data'
        await redis_cache._set(key, value)
    
    @app.get("/value")
    async def get_value():
        key = 'key'
        return await redis_cache._get(key)
    
    
    # connections.py
    from typing import Optional
    from aioredis import Redis, create_redis
    
    
    class RedisCache:
        def __init__(self) -> None:
            # Set by create_connection() during application startup.
            self.redis_cache: Optional[Redis] = None
    
        async def create_connection(self,r_url):
            self.redis_cache = await create_redis(r_url)
    
        async def _get(self, key) -> str:
            # Decode the reply so the declared str return type holds.
            return await self.redis_cache.get(key, encoding='utf-8')
    
        async def _set(self, key, value) -> None:
            await self.redis_cache.set(key, value)
    
        async def _close(self) -> None:
            self.redis_cache.close()
            await self.redis_cache.wait_closed()
    
    redis_cache = RedisCache()
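
The lifecycle this answer relies on (open the connection once at startup, reuse it for every request, close it at shutdown) can be tried without a Redis server. The following is only a sketch: `FakeRedis` and `fake_create_redis` are hypothetical in-memory stand-ins for the aioredis connection and for `create_redis`, and `main` imitates the order in which the startup handler, the two routes, and the shutdown handler would run.

```python
import asyncio


class FakeRedis:
    """Hypothetical in-memory stand-in for an aioredis connection."""

    def __init__(self):
        self.data = {}
        self.closed = False

    async def set(self, key, value):
        self.data[key] = value

    async def get(self, key):
        return self.data.get(key)

    def close(self):
        self.closed = True

    async def wait_closed(self):
        pass


opened = []  # records every URL a connection was opened for


async def fake_create_redis(r_url):
    # Stand-in for aioredis.create_redis (an assumption for this sketch).
    opened.append(r_url)
    return FakeRedis()


class RedisCache:
    def __init__(self) -> None:
        self.redis_cache = None

    async def create_connection(self, r_url):
        self.redis_cache = await fake_create_redis(r_url)

    async def _get(self, key):
        return await self.redis_cache.get(key)

    async def _set(self, key, value) -> None:
        await self.redis_cache.set(key, value)

    async def _close(self) -> None:
        self.redis_cache.close()
        await self.redis_cache.wait_closed()


async def main():
    cache = RedisCache()
    await cache.create_connection("redis://localhost")  # startup event
    await cache._set("key", "data")                     # GET /
    value = await cache._get("key")                     # GET /value
    conn = cache.redis_cache
    await cache._close()                                # shutdown event
    return value, len(opened), conn.closed


result = asyncio.run(main())
```

Both operations go through the single connection created at startup, which is the point of keeping it in the object's state.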
    
  2. Your problem is how you use create_connection. It is a coroutine function, so you have to call it and await what it returns.

    await self.create_connection()
    

    You’ll then need to await set and get as well. As a one-liner this would get messy.

    await (await self.create_connection()).set(key, value)
    

    To help clean this up, you should split the awaits into separate statements.

    conn = await self.create_connection()
    await conn.set(key, value)
    

    Creating a new connection each time you need to perform an operation can be expensive. I'd recommend changing create_connection in one of two ways.

    Either have it attach the connection to your instance

    async def create_connection(self):
        self.conn = await aioredis.create_redis(self.redis)
    

    You can call this after you instantiate an instance of RedisService and then use

    await self.conn.set(key, value)
    

    Or you can switch to using a connection pool.

    async def create_connection(self):
        return await aioredis.create_redis_pool(self.redis)
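
To see why the original error appears and why two awaits fix it, here is a minimal sketch that runs without Redis. `FakeRedis` and this module-level `create_connection` are hypothetical stand-ins for the aioredis connection and for the method in the question; only the awaiting pattern is the point.

```python
import asyncio


class FakeRedis:
    """Hypothetical in-memory stand-in for an aioredis connection."""

    def __init__(self):
        self._data = {}

    async def set(self, key, value):
        self._data[key] = value

    async def get(self, key):
        return self._data.get(key)


async def create_connection():
    # Stands in for `await aioredis.create_redis(url)`.
    return FakeRedis()


async def broken():
    # Calling an async function only builds a coroutine object.
    # That object has no `set` attribute, so this raises AttributeError.
    coro = create_connection()
    try:
        await coro.set("test", "test")
    except AttributeError as exc:
        return str(exc)
    finally:
        coro.close()  # avoids a "coroutine was never awaited" warning


async def fixed():
    conn = await create_connection()  # first await: resolve the connection
    await conn.set("test", "test")    # second await: run the command
    return await conn.get("test")


error_message = asyncio.run(broken())
stored_value = asyncio.run(fixed())
```

`error_message` carries the same complaint as the traceback in the question, while `fixed` stores and reads back the value.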
    