skip to Main Content

I am trying to create a sample Nest.js app to have websocket connections using socket.io which uses Redis Adapter to publish event to multiple microservices.

Seems like in Redis v4 when the client is created it doesn’t connect automatically anymore.

I have the following adapter:

import { IoAdapter } from '@nestjs/platform-socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
import { ServerOptions } from 'socket.io';

const pubClient = createClient({
  url: 'redis://redis:6379',
  password: 'pass',
});

const subClient = pubClient.duplicate();

const redisAdapter = createAdapter(pubClient, subClient);

export class RedisIoAdapter extends IoAdapter {
  override createIOServer(port: number, options?: ServerOptions): any {
    const server = super.createIOServer(port, options);

    server.adapter(redisAdapter);

    return server;
  }
}

When calling this.server.to('room').emit() I am getting the following error:

server  | /app/node_modules/@node-redis/client/dist/lib/client/index.js:407
server  |         return Promise.reject(new errors_1.ClientClosedError());
server  |                               ^
server  | 
server  | ClientClosedError: The client is closed
server  |     at Commander._RedisClient_sendCommand (/app/node_modules/@node-redis/client/dist/lib/client/index.js:407:31)
server  |     at Commander.commandsExecutor (/app/node_modules/@node-redis/client/dist/lib/client/index.js:166:154)
server  |     at Commander.BaseClass.<computed> [as publish] (/app/node_modules/@node-redis/client/dist/lib/commander.js:8:29)
server  |     at RedisAdapter.broadcast (/app/node_modules/@socket.io/redis-adapter/dist/index.js:406:28)
server  |     at BroadcastOperator.emit (/app/node_modules/socket.io/dist/broadcast-operator.js:109:22)
server  |     at AppGateway.handleMessage (/app/dist/app.gateway.js:21:30)
server  |     at /app/node_modules/@nestjs/websockets/context/ws-context-creator.js:43:33
server  |     at processTicksAndRejections (node:internal/process/task_queues:96:5)
server  |     at async AppGateway.<anonymous> (/app/node_modules/@nestjs/websockets/context/ws-proxy.js:11:32)
server  |     at async WebSocketsController.pickResult (/app/node_modules/@nestjs/websockets/web-sockets-controller.js:91:24)
server  | 
server  | Node.js v17.4.0
server exited with code 1

I tried downgrading to "redis": "^3.1.2" and "socket.io-redis": "^6.0.0" and updated the relevant code (for example, to use RedisClient instead of createClient) and everything seems to be working well (I tried this in k8s using three servers and all clients received the message).

I want to use the latest versions however. Considering that RedisClient.connect is an async function, what is the correct way to connect to Redis in this case? The createIOServer is also not an async function, so I cannot call connect in there either.


Just for reference, here is my main.ts file:

async function bootstrap() {
  const app = await NestFactory.create<NestExpressApplication>(AppModule);
  app.useWebSocketAdapter(new RedisIoAdapter(app));
  await app.listen(3000);
}
bootstrap();

and app.gateway.ts:

import { SubscribeMessage, WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';

type Payload = {
  name: String;
  text: String;
};

@WebSocketGateway({
  cors: {
    origin: '*',
  },
})
export class AppGateway {
  @WebSocketServer() server: Server;

  @SubscribeMessage('msgToServer')
  handleMessage(client: Socket, payload: Payload) {
    this.server.to('msgRoom').emit('msgToClient', payload);
  }

  handleConnection(client: Socket, ...args: any[]) {
    client.join('msgRoom');
  }
}

2

Answers


  1. Chosen as BEST ANSWER

    I added a function in redis-io.adapter.ts to connect to clients:

    export async function connectRedis() {
      if (pubClient.isOpen && subClient.isOpen) return;
    
      await Promise.all([pubClient.connect(), subClient.connect()]);
    }
    

    I am calling it in main.ts, right before useWebSocketAdapter

    async function bootstrap() {
      const app = await NestFactory.create<NestExpressApplication>(AppModule);
      await connectRedis();
      app.useWebSocketAdapter(new RedisIoAdapter(app));
      await app.listen(3000);
    }
    bootstrap();
    

    I also moved the createAdapter call inside createIOServer, this way whenever the adapter is created the Redis clients will always be connected. Seems to be working fine now, just like with the older versions.


  2. You should user redis3 npm i --save redis@3. Only microservices docs tell us about it, but is valid to entire Nest packages.

    import { IoAdapter } from '@nestjs/platform-socket.io';
    import { Server, ServerOptions } from 'socket.io';
    import { createAdapter } from '@socket.io/redis-adapter';
    import { createClient } from 'redis';
    
    import { INestApplication } from '@nestjs/common';
    import { ConfigurationService } from 'src/configuration/configuration.service';
    
    export class RedisIoAdapter extends IoAdapter {
      protected redisAdapter;
    
      constructor(app: INestApplication) {
        super(app);
        const configService = app.get(ConfigurationService);
    
        const pubClient = createClient({
          host: configService.get('REDIS_HOST'),
          port: configService.get('REDIS_PORT'),
        });
        const subClient = pubClient.duplicate();
    
        this.redisAdapter = createAdapter(pubClient, subClient);
      }
    
      createIOServer(port: number, options?: ServerOptions) {
        const server = super.createIOServer(port, options) as Server;
    
        server.adapter(this.redisAdapter);
    
        return server;
      }
    }
    

    It works with redis@4 (https://socket.io/docs/v4/redis-adapter/#usage)

    import { IoAdapter } from '@nestjs/platform-socket.io';
    import { Server, ServerOptions } from 'socket.io';
    import { createAdapter } from '@socket.io/redis-adapter';
    import { createClient } from 'redis';
    
    import { INestApplication } from '@nestjs/common';
    import { ConfigurationService } from 'src/configuration/configuration.service';
    
    export class RedisIoAdapter extends IoAdapter {
      protected redisAdapter;
    
      constructor(app: INestApplication) {
        super(app);
        const configService = app.get(ConfigurationService);
    
        const pubClient = createClient({
          socket: {
            host: configService.get('REDIS_HOST'),
            port: configService.get('REDIS_PORT'),
          },
        });
        const subClient = pubClient.duplicate();
    
        pubClient.connect();  // <------
        subClient.connect();  // <------
    
        this.redisAdapter = createAdapter(pubClient, subClient);
      }
    
      createIOServer(port: number, options?: ServerOptions) {
        const server = super.createIOServer(port, options) as Server;
    
        server.adapter(this.redisAdapter);
    
        return server;
      }
    }
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search