Recently I’ve gotten into the “crypto mania” and have started writing my own wrappers around the APIs of some exchanges.
Binance in particular has a streaming websocket endpoint, where you can stream data over a websocket connection.
I thought I’d try this out on my own using sanic.
Here is my websocket route:
@ws_routes.websocket("/hello")
async def hello(request, ws):
    while True:
        await ws.send("hello")
Now I have 2 clients on 2 different machines connecting to it:
async def main():
    async with aiohttp.ClientSession() as session:
        ws = await session.ws_connect("ws://192.168.86.31:8000/hello")
        while True:
            data = await ws.receive()
            print(data)
However, only one of the clients is able to connect and receive the data sent from the server. I’m assuming that the while loop is blocking and preventing the other connection from being accepted because it never yields?
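To make that suspicion concrete, here is a minimal sketch in plain asyncio (not sanic itself). `send_without_suspending` is a hypothetical stand-in for a send call that completes immediately; whether a real `ws.send()` behaves this way depends on the library and its buffering, so treat that as an assumption. The point is only that an `await` which never actually suspends does not hand control back to the event loop, while `await asyncio.sleep(0)` does.

```python
import asyncio


async def send_without_suspending(log, name):
    # Hypothetical stand-in for a send() that returns right away
    # without ever suspending the calling task.
    log.append(name)


async def producer(log, name, rounds, yield_control):
    for _ in range(rounds):
        await send_without_suspending(log, name)
        if yield_control:
            # Explicitly give other tasks a turn on the event loop.
            await asyncio.sleep(0)


async def run(yield_control):
    log = []
    await asyncio.gather(
        producer(log, "a", 3, yield_control),
        producer(log, "b", 3, yield_control),
    )
    return log


if __name__ == "__main__":
    print(asyncio.run(run(False)))  # "a" finishes all rounds before "b" starts
    print(asyncio.run(run(True)))   # "a" and "b" take turns
```

With an endless `while True` in place of the bounded loop, the first task would never let the second one start, which would match what I am seeing.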
How do we make it stream to multiple clients without blocking the other connections?
I looked into adding more workers and it seems to do the trick, but that doesn’t look like a scalable solution: each client would need its own worker, so thousands of clients, or even just 10, would mean one worker per client.
So how does Binance do their websocket streaming? Or hell, how does the Twitter stream endpoint work?
How is it able to serve an infinite stream to multiple concurrent clients?
Because ultimately that’s what I’m trying to do.
2 Answers
The way to solve this would be something like this. I am using the sanic framework.
1. Keep every connected websocket session in a list or set.
2. Add new websocket sessions to, and remove closed ones from, your websocket sessions container.
3. await asyncio.wait([ws_session.send() for ws_session in [list of valid sessions]]), which is basically a broadcast.
4. Profit!
This is basically the pubsub design pattern.
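The steps above could be sketched roughly as follows. This is a framework-free illustration, not sanic code: `Broadcaster` and `FakeSession` are hypothetical names, and the only assumption about a session is that it exposes an awaitable `send()`, as the `ws` object in the question does. It uses `asyncio.gather` instead of `asyncio.wait`, because `asyncio.wait` no longer accepts bare coroutines as of Python 3.11.

```python
import asyncio


class Broadcaster:
    """Hypothetical helper: tracks sessions and fans each message out to all."""

    def __init__(self):
        self.sessions = set()

    def register(self, session):
        self.sessions.add(session)

    def unregister(self, session):
        self.sessions.discard(session)

    async def broadcast(self, message):
        sessions = list(self.sessions)  # snapshot, so we can mutate the set below
        if not sessions:
            return
        # Send to every session concurrently; collect errors instead of raising.
        results = await asyncio.gather(
            *(s.send(message) for s in sessions), return_exceptions=True
        )
        # Drop sessions whose send failed (for example, a closed connection).
        for session, result in zip(sessions, results):
            if isinstance(result, Exception):
                self.unregister(session)


class FakeSession:
    """Stand-in for a websocket connection; only needs an async send()."""

    def __init__(self, fail=False):
        self.received = []
        self.fail = fail

    async def send(self, message):
        if self.fail:
            raise ConnectionError("client went away")
        self.received.append(message)


async def demo():
    hub = Broadcaster()
    alice, bob, gone = FakeSession(), FakeSession(), FakeSession(fail=True)
    for session in (alice, bob, gone):
        hub.register(session)
    for i in range(3):
        await hub.broadcast(f"tick {i}")
        await asyncio.sleep(0)  # let other tasks run between broadcasts
    return hub, alice, bob, gone


if __name__ == "__main__":
    hub, alice, bob, gone = asyncio.run(demo())
    print(alice.received, bob.received, len(hub.sessions))
```

In a real server the websocket handler would presumably register its `ws` on connect, wait until the client disconnects, and unregister in a `finally` block, while a single background task calls `broadcast()`. That way one producer serves any number of clients instead of each handler running its own send loop.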
Something like this maybe?