
I would like to stream the messages received from redis.subscribe as Server-Sent Events (SSE) in Quarkus.

I have a sample from the quarkus-quickstart for a simple SSE endpoint:

  @GET
  @Produces(MediaType.SERVER_SENT_EVENTS)
  @SseElementType(MediaType.TEXT_PLAIN)
  @Path("{name}/streaming")
  public Multi<String> greeting(@org.jboss.resteasy.annotations.jaxrs.PathParam String name) {
    return Multi.createFrom().publisher(vertx.periodicStream(2000).toMulti())
        .map(l -> String.format("Hello %s! (%s)%n", name, new Date()));
  }

This works well: every 2 seconds I receive the "Hello …" message in my web browser.
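For context, what the browser receives from such an endpoint is a stream of text/event-stream frames: each payload line is prefixed with `data: ` and the event ends with a blank line. The sketch below only illustrates that framing; `SseFrame` is a hypothetical helper, it assumes payload lines are separated by `\n`, and in the endpoint above the framework does this encoding for you.

```java
// Hypothetical helper that shows how one SSE event looks on the wire.
// Assumption: payload lines are separated by '\n' only.
final class SseFrame {

    /** Encodes a payload as a single SSE event made of "data:" lines. */
    static String encode(String payload) {
        StringBuilder sb = new StringBuilder();
        for (String line : payload.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        // An empty line terminates the event.
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        System.out.print(encode("Hello Bob!"));
    }
}
```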

Now I want to subscribe to a Redis channel, so that the endpoint streams the messages published in Redis.

Redis sample:

(cmd window 1)
SUBSCRIBE message-channel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "message-channel"
3) (integer) 1

(cmd window 2)
PUBLISH  message-channel HelloWorld
(integer) 1

(cmd window 1)
1) "message"
2) "message-channel"
3) "HelloWorld"
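The transcript shows two kinds of three-element replies on a subscribed connection: a `subscribe` confirmation (kind, channel, subscription count) and a `message` (kind, channel, payload). A minimal sketch of telling them apart is given here; `PubSubReply` is a hypothetical helper, not part of any Redis client, and it assumes the reply has already been decoded into a list of strings.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical helper, not part of any Redis client API.
final class PubSubReply {

    /** Returns the payload if the reply is a published message on the given channel. */
    static Optional<String> payloadOf(List<String> reply, String channel) {
        if (reply.size() == 3
                && "message".equals(reply.get(0))
                && channel.equals(reply.get(1))) {
            return Optional.of(reply.get(2));
        }
        // Anything else, e.g. the "subscribe" confirmation, carries no payload.
        return Optional.empty();
    }

    public static void main(String[] args) {
        String channel = "message-channel";
        // prints "Optional.empty"
        System.out.println(payloadOf(List.of("subscribe", channel, "1"), channel));
        // prints "Optional[HelloWorld]"
        System.out.println(payloadOf(List.of("message", channel, "HelloWorld"), channel));
    }
}
```

Only replies of the second kind are worth forwarding to an SSE client; the confirmation arrives once, right after subscribing.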

Now I try the same with Quarkus SSE:

  @Inject
  ReactiveRedisClient reactiveRedisClient;

  @GET
  @Produces(MediaType.SERVER_SENT_EVENTS)
  @SseElementType(MediaType.TEXT_PLAIN)
  @Path("sse/redissse")
  public Multi<String> redissse() {
    List<String> subscriberList = new ArrayList<>();
    subscriberList.add("message-channel");

    return reactiveRedisClient.subscribe(subscriberList)
        .onItem().transformToMulti(keys -> Multi.createFrom().iterable(keys))
        .onItem().castTo(String.class);
  }

What I got instead was this warning in the log:

WARNING [io.ver.red.cli.imp.RedisConnectionImpl] (vert.x-eventloop-thread-0) No handler waiting for message: [subscribe, message-channel, 1]

Could anyone help me?
Is there a simple example?
I have no clue how to receive the messages that are published to a Redis channel I subscribed to.

Any suggestions?

2 Answers


  1. Chosen as BEST ANSWER

    Now I do the following:

      @Inject
      @RedisClientName("second")
      RedisClient redisClient2;

      void onStart(@Observes StartupEvent ev) throws IOException {
        this.redisClient2.subscribe(List.of("message-channel"));
      }

      @GET
      @Produces(MediaType.SERVER_SENT_EVENTS)
      @SseElementType(MediaType.TEXT_PLAIN)
      @Path("/redis/subscribe")
      public Publisher<String> subscribechannel() {
        return eventBus.<String>consumer("io.vertx.redis.message-channel").toPublisherBuilder()
            .map(Message::body)
            .buildRs();
      }
    

    Now it works, but if I open the SSE endpoint in more than one browser, the browsers share the events: each event is delivered to only one consumer (browser), one after the other.
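The behaviour described above is one-of-N delivery, whereas SSE clients usually need every event delivered to all of them. A minimal plain-Java sketch of that fan-out idea follows; `BroadcastHub` is a hypothetical class, not a Quarkus or Vert.x API. In a Quarkus application a reactive broadcast construct (for example Mutiny's `BroadcastProcessor`, named here as an untested assumption) could take this role, fed by a single Redis subscription.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical fan-out hub: every registered listener sees every message.
final class BroadcastHub {
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    /** Registers a listener and returns a handle that unregisters it again. */
    Runnable register(Consumer<String> listener) {
        listeners.add(listener);
        return () -> listeners.remove(listener);
    }

    /** Delivers the message to all currently registered listeners. */
    void publish(String message) {
        for (Consumer<String> listener : listeners) {
            listener.accept(message);
        }
    }

    public static void main(String[] args) {
        BroadcastHub hub = new BroadcastHub();
        List<String> browserA = new ArrayList<>();
        List<String> browserB = new ArrayList<>();
        Runnable disconnectA = hub.register(browserA::add);
        hub.register(browserB::add);

        hub.publish("one");   // reaches both listeners
        disconnectA.run();    // browser A goes away
        hub.publish("two");   // reaches only browser B

        System.out.println(browserA); // prints "[one]"
        System.out.println(browserB); // prints "[one, two]"
    }
}
```

The handle returned by `register` matters for SSE: a listener should be removed when its browser disconnects, otherwise the hub keeps delivering to dead connections.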


  2. I haven’t used Redis pub-sub, but I did use Redis streams and what I had to do was something like this:

      return Multi.createBy().repeating()
          .supplier(() -> this.reactiveRedisClient.subscribe(subscriberList)
              .onItem().transformToMulti(keys -> Multi.createFrom().iterable(keys))
              .onItem().castTo(String.class))
          .indefinitely()
          .onItem().disjoint();

    I guess that, since pub-sub is non-blocking, the subscribe call runs once and then does not wait until another message arrives. You have to implement your own while(true) loop in a reactive way.
