trying to build a scalable chat app with FastApi and Redis pub/sub.
Suppose we have 10 processes running FastApi app. Each process will create 1 connection pool to Redis at startup. Redis instance allows max 10 connections. Each user has its own redis channel where all notifications (chat messages, app notifications etc) are coming. When the user connects to a websocket 2 tasks are launched, 1 that listens to websocket, and 1 that listens to redis user channel. Below is a simplified thing we have now.
resources.py
redis = None
async def startup_event():
global redis
redis = aioredis.from_url(url=REDIS_URL, password=REDIS_PASSWORD, encoding='utf-8', decode_responses=True)
async def get_redis() -> Redis:
return redis
views.py
import orjson as json
channel = 'user:channel'
async def listen_socket(
websocket: WebSocket,
redis: Redis, ):
while True:
try:
data = await websocket.receive_bytes()
except:
await redis.publish(channel, json.dumps({'type': 'disconnect_user'}))
return None
async def listen_redis(
websocket: WebSocket,
redis: Redis, ):
ps = redis.pubsub()
await ps.psubscribe(channel)
async for data in ps.listen():
if data['type'] == 'pmessage':
data = json.loads(data['data'])
event_type = data.get('type')
if event_type == 'disconnect_user':
return None
elif event_type == 'echo':
await websocket.send_bytes(json.dumps(data))
@router.websocket('/', name='ws', )
async def process_ws(
websocket: WebSocket,
redis: Redis = Depends(get_redis), ):
await websocket.accept()
await asyncio.gather(
listen_redis(
websocket=websocket,
redis=redis, ),
listen_socket(
websocket=websocket,
redis=redis, ),
)
- This line
async for data in ps.listen():
blocks the connection and this particular connection cannot serve clients on different threads of the same process, not even the current client. Is this true? If yes then this approach is absolutely not scalable, because we cannot afford 1 Redis connection per user. - What would solve the above issue? 2 Redis connections per process? 1 connection pool and 1 connection dedicated to consume redis pub/sub channel? In this case publishing will be done to a process channel not to a specific user channel. We would need a thread that consumes the pub/sub channel and routes to the user websocket connected to that process. Is this correct?
Am I overthinking?
Are there better approaches?
Thank you so much for help!
2
Answers
I recommend that you don’t try to implement the functionality you describe manually just by using a fastapi and redis. Is a path of pain and suffering that is unjustified and highly ineffective.
Just use centrifugo and you’ll be happy.
I recommend using queues to scale your real time application.
e.g. RabbitMQ or even rpush und lpop with redis lists – if you want stay with redis. this approch is much easier to implement as pub/sub and scales great.
Handling and sharing events bidirectional with Pub/Sub & WebSockets is a pain in most languages.