
I'm currently using websockets to pass through data that I receive from a Redis queue (pub/sub). For some reason, the websocket doesn't send any messages while it is subscribed to this Redis queue.

What my code looks like

My code works as follows:

  1. I accept the socket connection.
  2. I connect to the Redis queue.
  3. For each message that I receive from the subscription, I send a message through the socket (only text at the moment, for testing).

@check_route.websocket_route("/check")
async def websocket_endpoint(websocket: WebSocket):

    await websocket.accept()

    redis = Redis(host='::1', port=6379, db=1)
    subscribe = redis.pubsub()
    subscribe.subscribe('websocket_queue')

    try:
        for result in subscribe.listen():
            await websocket.send_text('test')
            print('test send')
    except Exception as e:
        await websocket.close()
        raise e

The issue with the code

With this code, the message is simply never sent through the socket. When I accept the websocket inside the subscribe.listen() loop instead, it does work, but the client reconnects every time (see the code below).

@check_route.websocket_route("/check")
async def websocket_endpoint(websocket: WebSocket):

    redis = Redis(host='::1', port=6379, db=1)
    subscribe = redis.pubsub()
    subscribe.subscribe('websocket_queue')

    try:
        for result in subscribe.listen():
            await websocket.accept()
            await websocket.send_text('test')
            print('test send')
    except Exception as e:
        await websocket.close()
        raise e

I think that subscribe.listen() causes a problem that makes the websocket do nothing when websocket.accept() is called outside the for loop.
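
The suspicion above can be reproduced without Redis or a websocket. The following is a minimal sketch, assuming the cause is that the synchronous `subscribe.listen()` blocks the event loop; `blocking_listen`, `heartbeat` and `run_demo` are hypothetical names used only for illustration. It counts how often a background task gets to run while a blocking generator is being iterated, compared with an equally long `asyncio.sleep`.

```python
import asyncio
import time


def blocking_listen(n, delay=0.05):
    """Stand-in for a synchronous subscribe.listen(): blocks the thread between items."""
    for i in range(n):
        time.sleep(delay)  # blocks the whole event loop when called from a coroutine
        yield i


async def heartbeat(ticks):
    """Background task that can only make progress while the event loop is free."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def run_demo():
    ticks = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat run once

    before = len(ticks)
    for _ in blocking_listen(3):
        pass  # no await in here, so control never returns to the event loop
    during_blocking = len(ticks) - before

    before = len(ticks)
    await asyncio.sleep(0.15)  # same duration, but yielding to the event loop
    during_async = len(ticks) - before

    task.cancel()
    return during_blocking, during_async


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```

The first number is the heartbeat count during the blocking loop and the second is the count during the awaited sleep. If the server completes the websocket handshake and flushes outgoing frames from other tasks on the same loop, which is an assumption about the ASGI server in use, those tasks would be starved in the same way as the heartbeat.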

I hope someone knows what's wrong with this.

2 Answers


  1. Chosen as BEST ANSWER

    After a few more days of research I found a solution for this issue. I solved it by using aioredis, which is asynchronous and therefore does not block the event loop while waiting for messages. This solution is based on the following GitHub Gist.

    import json
    import aioredis
    
    from fastapi import APIRouter, WebSocket
    
    from app.service.config_service import load_config
    
    check_route = APIRouter()
    
    
    @check_route.websocket("/check")
    async def websocket_endpoint(websocket: WebSocket):
    
        await websocket.accept()
    
        # ---------------------------- REDIS REQUIREMENTS ---------------------------- #
        config = load_config()
        redis_uri: str = f"redis://{config.redis.host}:{config.redis.port}"
        redis_channel = config.redis.redis_socket_queue.channel
        # Note: create_redis_pool is the aioredis 1.x API; it was removed in aioredis 2.0
        redis = await aioredis.create_redis_pool(redis_uri)
    
        # ------------------ SEND SUBSCRIBE RESULT THROUGH WEBSOCKET ----------------- #
        (channel,) = await redis.subscribe(redis_channel)
        assert isinstance(channel, aioredis.Channel)
        try:
            while True:
                response_raw = await channel.get()
                response_str = response_raw.decode("utf-8")
                response = json.loads(response_str)
    
                if response:
                    await websocket.send_json({
                        "event": 'NEW_CHECK_RESULT',
                        "data": response
                    })
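        finally:
            # Errors still propagate; release the subscription and the pool on the way out
            await redis.unsubscribe(redis_channel)
            redis.close()
            await redis.wait_closed()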
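
The aioredis project has since been merged into redis-py (version 4.2 and later) as `redis.asyncio`, so the same idea can be sketched with that package. The helper below is an illustration, not the code from the Gist: `relay` is a hypothetical name, and it only assumes that the message stream yields dicts with `type` and `data` keys, which is the shape produced by the `listen()` method of a redis-py pub/sub object. The Redis wiring is left in comments because it needs a running server.

```python
import json


async def relay(messages, send_json, event="NEW_CHECK_RESULT"):
    """Forward published payloads from an async message stream to a websocket sender."""
    async for message in messages:
        # Pub/sub streams also yield subscribe confirmations; forward real messages only
        if message.get("type") != "message":
            continue
        data = message["data"]
        if isinstance(data, bytes):
            data = data.decode("utf-8")
        payload = json.loads(data)
        if payload:
            await send_json({"event": event, "data": payload})


# Hypothetical wiring inside the endpoint (assumes redis-py >= 4.2 and a reachable server):
#
#     import redis.asyncio as aioredis
#
#     client = aioredis.from_url(redis_uri)
#     pubsub = client.pubsub()
#     await pubsub.subscribe(redis_channel)
#     try:
#         await relay(pubsub.listen(), websocket.send_json)
#     finally:
#         await pubsub.unsubscribe(redis_channel)
#         await client.close()  # aclose() on redis-py 5.x
```

Taking the stream and the sender as parameters keeps the forwarding logic independent of Redis, so it can be exercised with a fake async generator in a unit test.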
    

  2. I’m not sure if this will work, but you could try this:

    async def websocket_endpoint(websocket: WebSocket):
    
        await websocket.accept()
    
        redis = Redis(host='::1', port=6379, db=1)
        subscribe = redis.pubsub()
        subscribe.subscribe('websocket_queue')
    
        try:
            results = await subscribe.listen()
            for result in results:
                await websocket.send_text('test')
                print('test send')
        except Exception as e:
            await websocket.close()
            raise e
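
With the synchronous redis client, `subscribe.listen()` returns a plain generator, so it cannot be awaited directly. One possible workaround that keeps the synchronous client is to pull each item from the generator in a worker thread. The following is a minimal sketch under that assumption; `iterate_in_thread` is a hypothetical helper, not part of redis-py or FastAPI.

```python
import asyncio

_DONE = object()  # sentinel marking the end of the blocking iterator


async def iterate_in_thread(blocking_iterable):
    """Yield items from a blocking iterable without stalling the event loop."""
    iterator = iter(blocking_iterable)
    loop = asyncio.get_running_loop()
    while True:
        # next() runs in the default thread pool, so the event loop stays free meanwhile
        item = await loop.run_in_executor(None, next, iterator, _DONE)
        if item is _DONE:
            break
        yield item


# Hypothetical use in the endpoint from the question:
#
#     async for result in iterate_in_thread(subscribe.listen()):
#         await websocket.send_text('test')
```

One caveat with this approach: if the client disconnects, the worker thread stays blocked inside `listen()` until the next message arrives, so an asynchronous client is probably the cleaner choice.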
    