I am developing a private chat that lets two or more users communicate with each other. I have an endpoint for the websocket connection where only authenticated users can complete the handshake between the client and the server.
The problem occurs once the websocket connection is accepted. The consumer handler runs smoothly in an infinite loop, waiting for messages from the client and performing the tasks they request. The producer, on the other hand, hangs in an infinite loop, and that drives CPU usage up to 100%.
I need one listener on a specific Redis channel where I receive all the messages from the users in real time. The while loop does that, but because of the CPU spike it is clearly not a good solution.
# api.py
async def consumer_handler(service):
    """Messages received on the websocket connection Consumer - (Publisher)"""
    try:
        while True:
            received_data = await service.websocket.receive_json()
            if received_data['event_type'] == "online.users":
                await service.get_online_user_status(received_data['role_id'])
            elif received_data['event_type'] == "message.user":
                await service.send_message(received_data['user_id'], received_data['content'])
            elif received_data['event_type'] == "info":
                await service.get_info()
    except WebSocketDisconnect:
        logger.debug("WebSocketDisconnect - consumer handler disconnected")

async def producer_handler(service):
    """Messages generated at the backend to send to the websocket Producer - (Subscriber)"""
    try:
        while True:
            if service.pubsub.subscribed:
                message = await service.pubsub.get_message(ignore_subscribe_messages=True)
                if message:
                    await service.websocket.send_json(message['data'].decode())
    except (ConnectionClosedOK, aioredis.exceptions.ConnectionError) as e:
        # Build one message string; a second positional argument would be
        # treated by logging as a %-format argument.
        logger.debug(f"{e.__class__} producer handler disconnected")

@chat_app.websocket("/")
async def websocket_endpoint(websocket: WebSocket,
                             current_user: User = Depends(is_authenticated_ws)):
    if not current_user:
        return
    async with ConnectionContextManager(user_id=current_user.id, websocket=websocket) as service:
        producer_task = asyncio.ensure_future(producer_handler(service))
        consumer_task = asyncio.ensure_future(consumer_handler(service))
        done, pending = await asyncio.wait(
            [consumer_task, producer_task],
            return_when=asyncio.FIRST_COMPLETED
        )
        for task in pending:
            task.cancel()
This endpoint handles both the consumer and the producer logic, following the pattern described in the websockets documentation.
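For readers unfamiliar with that pattern, here is a minimal sketch of it using only the standard library. The coroutines `consumer`, `producer` and `serve` are hypothetical stand-ins, not the handlers from the question: one finishes as if the client had disconnected, the other would run forever. Unlike the endpoint above, the sketch also awaits the cancelled task, which is an addition made here on the assumption that the task's cleanup code should finish before the connection is torn down.

```python
import asyncio


async def consumer(events: list) -> None:
    # Stand-in for the receive loop: returns when the client "disconnects".
    await asyncio.sleep(0.01)
    events.append("consumer finished")


async def producer(events: list) -> None:
    # Stand-in for the send loop: would wait forever unless cancelled.
    try:
        while True:
            await asyncio.sleep(3600)
    except asyncio.CancelledError:
        events.append("producer cancelled")
        raise


async def serve() -> list:
    events: list = []
    consumer_task = asyncio.create_task(consumer(events))
    producer_task = asyncio.create_task(producer(events))
    # Return as soon as either side stops.
    done, pending = await asyncio.wait(
        {consumer_task, producer_task},
        return_when=asyncio.FIRST_COMPLETED,
    )
    for task in pending:
        task.cancel()
    # Wait for the cancelled tasks so their except/finally blocks run.
    await asyncio.gather(*pending, return_exceptions=True)
    return events


if __name__ == "__main__":
    print(asyncio.run(serve()))
```

Whichever handler stops first ends the connection, and the other one is cancelled rather than left running in the background.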
# websocket_utils.py
class WebsocketService:
    """
    This acts like a service for websocket, is returned within the context manager
    this class is used to not interact with consumer directly, instead interact it with the manager
    """

    def __init__(self, *, user_id: UUID4, websocket: WebSocket, pubsub: PubSub):
        self.user_id = user_id
        self.websocket = websocket
        self.pubsub = pubsub

    async def get_online_user_status(self, role_id):
        await consumer.online_user_status_per_role(role_id, self.websocket)

    async def send_message(self, user_id: UUID4, content: str):
        await consumer.send_message_to_user(user_id=user_id,
                                            message=content,
                                            websocket=self.websocket)

    async def get_info(self):
        await consumer.fetch_info(self.websocket)


class ConnectionContextManager:
    """
    This context manager handles the websocket connection
    on enter, it returns a controller for the websocket events
    """
    websocket_service: WebsocketService

    def __init__(self, *, user_id: UUID4, websocket: WebSocket):
        self.websocket_service = WebsocketService(user_id=user_id,
                                                  websocket=websocket,
                                                  pubsub=websocket.app.redis.pubsub())

    async def __aenter__(self):
        logger.debug("Context manager enter")
        await consumer.connect(
            user_id=self.websocket_service.user_id,
            websocket=self.websocket_service.websocket,
            pubsub=self.websocket_service.pubsub
        )
        return self.websocket_service

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await consumer.disconnect(
            user_id=self.websocket_service.user_id,
            pubsub=self.websocket_service.pubsub,
            websocket=self.websocket_service.websocket,
        )
        logger.debug("Context manager exit")
This context manager ensures that each user has their own pubsub channel. It also creates a controller for the actual consumer, so that I do not have to pass user_id and the other parameters every time I need a specific resource.
class ConnectionConsumer:
    __redis: aioredis.Redis

    def __init__(self):
        self.__redis = aioredis.from_url(settings.ws_redis_url, encoding='utf-8', decode_responses=True)

    async def __send_json(self, obj: dict, websocket: WebSocket):
        await websocket.send_json(obj)

    async def connect(self, *, user_id: UUID4, websocket: WebSocket, pubsub: PubSub):
        # Accept connection if authorization is successful, set the user online and subscribe to its channel layer
        await websocket.accept()
        await self.__redis.set(f"status:{user_id}", "1")  # status:UUID4 (means online)
        await pubsub.subscribe(f"channel:{user_id}")  # subscribe to itself's channel

    async def disconnect(self, *, user_id: UUID4, websocket: WebSocket, pubsub: PubSub):
        # Gracefully disconnect from the websocket and remove the channel layer from pubsub
        await self.__redis.delete(f"status:{user_id}")
        await pubsub.unsubscribe(f"channel:{user_id}")
        await pubsub.close()
        await self.__redis.close()
        await websocket.close()
And here is the actual consumer, which is called from the service that the context manager returns.
CONTAINER ID   NAME   CPU %   MEM USAGE / LIMIT     MEM %   NET I/O           BLOCK I/O   PIDS
4ed80g7fb093   s_be   1.77%   76.09MiB / 15.29GiB   0.49%   37.3kB / 21.1kB   0B / 0B     7
This is the docker stats output for the container when only the consumer is handled.
CONTAINER ID   NAME   CPU %     MEM USAGE / LIMIT     MEM %   NET I/O           BLOCK I/O   PIDS
4ed80g7fb093   s_be   100.36%   76.08MiB / 15.29GiB   0.49%   42.9kB / 25.7kB   0B / 0B     7
And this is the docker stats output for the container when both the producer and consumer handlers are running.
I have tried to split the connections as well but I have the same issue.
2 Answers
SOLUTION
The refactored producer handler uses listen() instead of get_message(). listen() is an async generator: it yields each message as it arrives instead of returning immediately, so the coroutine stays suspended until a message is actually available. The loop no longer has to check on every iteration whether a value is there, which solves the problem, and there is no need for a timeout or an await sleep() call in the code.
I know this is a pretty old question, but I ran into a similar problem.
The issue is that await pubsub.get_message uses timeout=0.0 by default, which causes "infinite polling" and high CPU usage. You can specify a timeout argument (a float, in seconds) so that the call waits before returning. You can also pass timeout=None to make get_message wait indefinitely for the next message.
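To make the difference between the approaches in both answers concrete, here is a small sketch that runs without a Redis server. `FakePubSub` is a hypothetical stand-in written for this illustration, not the real client: it copies only the waiting behaviour described above (an immediate return for timeout=0.0, a bounded wait for a positive timeout, an unbounded wait for timeout=None, and a listen() generator that suspends until a message arrives). The helper names `poll_producer`, `listen_producer`, `measure` and `measure_listen` are likewise assumptions.

```python
import asyncio
from typing import AsyncIterator, Optional


class FakePubSub:
    """Hypothetical stand-in for a Redis PubSub object (not the real client)."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self.calls = 0  # number of get_message() invocations

    def publish(self, data: str) -> None:
        self._queue.put_nowait({"type": "message", "data": data})

    async def get_message(
        self, ignore_subscribe_messages: bool = False, timeout: Optional[float] = 0.0
    ) -> Optional[dict]:
        self.calls += 1
        if timeout is None:
            # Wait indefinitely for the next message.
            return await self._queue.get()
        if timeout <= 0:
            # Non-blocking check: return at once, with or without a message.
            await asyncio.sleep(0)
            try:
                return self._queue.get_nowait()
            except asyncio.QueueEmpty:
                return None
        try:
            # Wait at most `timeout` seconds.
            return await asyncio.wait_for(self._queue.get(), timeout)
        except asyncio.TimeoutError:
            return None

    async def listen(self) -> AsyncIterator[dict]:
        # Async generator: suspended until a message is available.
        while True:
            yield await self._queue.get()


async def poll_producer(pubsub: FakePubSub, out: list, timeout: Optional[float]) -> None:
    # Same shape as the producer loop in the question, with an explicit timeout.
    while True:
        message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=timeout)
        if message:
            out.append(message["data"])


async def listen_producer(pubsub: FakePubSub, out: list) -> None:
    async for message in pubsub.listen():
        out.append(message["data"])


async def _run_briefly(coro, pubsub: FakePubSub, duration: float) -> None:
    task = asyncio.create_task(coro)
    pubsub.publish("hello")
    await asyncio.sleep(duration)
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)


async def measure(timeout: Optional[float], duration: float = 0.1):
    pubsub, out = FakePubSub(), []
    await _run_briefly(poll_producer(pubsub, out, timeout), pubsub, duration)
    return pubsub.calls, out


async def measure_listen(duration: float = 0.1) -> list:
    pubsub, out = FakePubSub(), []
    await _run_briefly(listen_producer(pubsub, out), pubsub, duration)
    return out


if __name__ == "__main__":
    for timeout in (0.0, 1.0, None):
        calls, out = asyncio.run(measure(timeout))
        print(f"timeout={timeout!r}: {calls} get_message() calls, delivered {out}")
    print("listen():", asyncio.run(measure_listen()))
```

In this stand-in every variant delivers the same single message, but the timeout=0.0 loop calls get_message() far more often than the others in the same time window. That repeated, non-waiting call is the polling pattern the second answer describes.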