skip to Main Content

Recently I’ve gotten into the “crypto mania” and have started writing my own wrappers around the API’s on some exchanges.

Binance in particular has an a streaming websocket endpoint.

where you can stream data but via a websocket endpoint.
I thought I’d try this out on my own using sanic.

here is my websocket route

@ws_routes.websocket("/hello")
async def hello(request, ws):
    while True:
        await ws.send("hello")

now I have 2 clients on 2 different machines connecting to it

async def main():
    async with aiohttp.ClientSession() as session:

        ws  = await session.ws_connect("ws://192.168.86.31:8000/hello")
        while True:
            data = await ws.receive()
            print(data)

however only one of the clients will be able to connect and receive the sent data from the server. I’m assuming that because of the while loop its blocking and preventing the other connection from connecting because it doesn’t yield?

how do we make it stream to multiple clients without blocking the other connections?

I looked into adding more workers and it seems to do the trick but what I don’t understand is thats not a very scalable solution. because each client would be its own worker and if you have thousands or even just 10 clients that would be 10 workers 1 per client.

so how does Binance do their websocket streaming? or hell how does the twitter stream endpoint work?

how is it able to serve an infinite stream to multiple concurrent clients?
because ultimately thats what I’m trying to do

2

Answers


  1. Chosen as BEST ANSWER

    The way to solve this would be something like this.

    I am using the sanic framework

    class Stream:
        def __init__(self):
            self._connected_clients = set()
    
        async def __call__(self, *args, **kwargs):
            await self.stream(*args, **kwargs)
    
        async def stream(self, request, ws):
            self._connected_clients.add(ws)
    
            while True:
                disconnected_clients = []
                for client in self._connected_clients:  # check for disconnected clients
                    if client.state == 3:  # append to a list because error will be raised if removed from set while iterating over it 
                        disconnected_clients.append(client)
                for client in disconnected_clients:  # remove disconnected clients
                    self._connected_clients.remove(client)
    
                await asyncio.wait([client.send("Hello") for client in self._connected_clients]))
    
    
    ws_routes.add_websocket_route(Stream(), "/stream")
    
    1. keep track of each websocket session
    2. append to a list or set
    3. check for invalid websocket sessions and remove from your websocket sessions container
    4. do an await asyncio.wait([ws_session.send() for ws_session [list of valid sessions]]) which is basically a broadcast.

    5.profit!

    this is basically the pubsub design pattern


  2. Something like this maybe?

    import aiohttp
    import asyncio
    loop = asyncio.get_event_loop()
    async def main():
        async with aiohttp.ClientSession() as session:
            ws  = await session.ws_connect("ws://192.168.86.31:8000/hello")
            while True:
                data = await ws.receive()
                print(data)
    
    multiple_coroutines = [main() for _ in range(10)]
    loop.run_until_complete(asyncio.gather(*multiple_coroutines))
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search