
I am trying to listen on a Redis stream and process messages as they arrive. I am using the async commands and I expect messages to be pushed rather than pulled, so I don't think a while loop is required. But the following code does not work as I expect.

public static void main(String[] args) throws InterruptedException {

    RedisClient redisClient = RedisClient
        .create("redis://localhost:6379/");
    StatefulRedisConnection<String, String> connection
        = redisClient.connect();
    RedisAsyncCommands<String, String> commands = connection.async();
    commands.xgroupCreate(StreamOffset.latest("my-stream"), "G1", new XGroupCreateArgs());
    commands
        .xreadgroup(Consumer.from("G1", "c1"), StreamOffset.lastConsumed("my-stream"))
        .thenAccept(System.out::println);

    Thread.currentThread().join();
}

It just prints whatever the stream contains when the program starts and does not print messages that are added while the program is running. Isn't the callback supposed to be called for every message that is newly added to the stream?

4 Answers


  1. I think you should use the xgroupCreate method to create the consumer group on the stream before reading from it; otherwise you will get the following error:

    exception in thread "main" java.util.concurrent.ExecutionException: io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'my-stream1' or consumer group 'group1' in XREADGROUP with GROUP option
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at com.test.TestList.main(TestList.java:57)
    Caused by: io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'my-stream1' or consumer group 'group1' in XREADGROUP with GROUP option
        at io.lettuce.core.ExceptionFactory.createExecutionException(ExceptionFactory.java:135)
        at io.lettuce.core.ExceptionFactory.createExecutionException(ExceptionFactory.java:108)
        at io.lettuce.core.protocol.AsyncCommand.completeResult(AsyncCommand.java:120)
        at io.lettuce.core.protocol.AsyncCommand.complete(AsyncCommand.java:111)
        at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:654)
        at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:614)
        at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:565)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
        at io.netty.channel.kqueue.AbstractKQueueStreamChannel$KQueueStreamUnsafe.readReady(AbstractKQueueStreamChannel.java:544)
        at io.netty.channel.kqueue.AbstractKQueueChannel$AbstractKQueueUnsafe.readReady(AbstractKQueueChannel.java:381)
        at io.netty.channel.kqueue.KQueueEventLoop.processReady(KQueueEventLoop.java:211)
        at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:289)
        at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
    

    Example code:

    package com.test;
    
    import io.lettuce.core.Consumer;
    import io.lettuce.core.RedisClient;
    import io.lettuce.core.RedisFuture;
    import io.lettuce.core.StreamMessage;
    import io.lettuce.core.XGroupCreateArgs;
    import io.lettuce.core.XReadArgs.StreamOffset;
    import io.lettuce.core.api.StatefulRedisConnection;
    import io.lettuce.core.api.async.RedisAsyncCommands;
    
    import java.util.List;
    public class TestList {
        public static void main(String[] args) throws Exception {
            RedisClient redisClient = RedisClient.create("redis://localhost:6379/");
            StatefulRedisConnection<String, String> connection = redisClient.connect();
            RedisAsyncCommands<String, String> commands = connection.async();
            // Add an entry first so that the stream exists before the group is created
            RedisFuture<String> redisFuture = commands.xadd("my-stream1", "test", "1234");
            String redisFutureGet = redisFuture.get();
            System.out.println(redisFutureGet);
            commands.xgroupCreate(StreamOffset.latest("my-stream1"), "group1", new XGroupCreateArgs()); // add a group pointing to the stream
            // Consumer.from takes (group, consumer name); "consumer1" is an illustrative consumer name
            RedisFuture<List<StreamMessage<String, String>>> messages = commands.xreadgroup(Consumer.from("group1", "consumer1"),
                    StreamOffset.lastConsumed("my-stream1"));
            List<StreamMessage<String, String>> res = messages.get();
            System.out.println(res);
        }
    }
    
  2. You could use the Redis reactive commands to achieve this:

    RedisReactiveCommands<String, String> commands = connection.reactive();
    // Reactive commands are lazy: nothing is sent until subscribe(), so chain the group creation into the read
    commands.xgroupCreate(StreamOffset.latest("my-stream"), "G1", new XGroupCreateArgs())
        .thenMany(commands.xreadgroup(Consumer.from("G1", "c1"), StreamOffset.lastConsumed("my-stream")))
        .subscribe(System.out::println, Throwable::printStackTrace);
    
  3. I think Lettuce is only responsible for communicating with Redis, whether in a sync, async, or streaming way; it is a low-level library. If you want a higher-level feature like this, use Spring Data Redis, something like this:

    StreamListener<String, MapRecord<String, String, String>> streamListener = new ExampleStreamListener();
    
    StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                    .builder().pollTimeout(Duration.ofMillis(100)).build();
    
    StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(connectionFactory,
                    containerOptions);
    Subscription subscription = container.receive(StreamOffset.fromStart("key2"), streamListener);
    container.start();
    //----------------------------------------------------------------
    
    public class ExampleStreamListener implements StreamListener<String, MapRecord<String, String, String>> {
    
        @Override
        public void onMessage(MapRecord<String, String, String> message) {
    
            System.out.println("MessageId: " + message.getId());
            System.out.println("Stream: " + message.getStream());
            System.out.println("Body: " + message.getValue());
        }
    }
  4. I know this question is a bit old, but the answer could be helpful for someone else. You can repeatedly subscribe to the same Flux, as below. It worked for me with xread, and I think the same should work for xreadgroup as well.

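    RedisPubSubReactiveCommands<String, String> commands = connection.reactive();
    // sink is assumed to be a Reactor Sinks.Many<String> created elsewhere
    commands.xread(new XReadArgs().block(Duration.ofSeconds(20)), XReadArgs.StreamOffset.from("some-stream", "$"))
            .doOnNext(msg -> sink.tryEmitNext(msg.getBody().get("key")))
            // re-subscribe (issue a new blocking XREAD) each time the previous read completes
            .repeat()
            .subscribe();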
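
The reason re-subscribing is needed can be illustrated without Redis at all. The sketch below is only an analogy, using nothing but the JDK: `FakeStream`, `readOnce` and `readUntilIdle` are hypothetical names invented for this example and are not part of Lettuce. It assumes that one read request produces exactly one reply, which is how a single XREAD or XREADGROUP call behaves, so a callback attached to that one future runs once and later entries are only seen by issuing another read.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class OneShotVersusLoop {

    // Hypothetical stand-in for a Redis stream; not part of Lettuce.
    static final class FakeStream {
        private final BlockingQueue<String> entries = new LinkedBlockingQueue<>();

        void add(String entry) {
            entries.add(entry);
        }

        // One read request produces exactly one reply: every entry available
        // now, or an empty list if nothing arrives within the timeout.
        CompletableFuture<List<String>> readOnce(long timeoutMillis) {
            return CompletableFuture.supplyAsync(() -> {
                List<String> batch = new ArrayList<>();
                try {
                    String first = entries.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                    if (first != null) {
                        batch.add(first);
                        entries.drainTo(batch);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return batch;
            });
        }
    }

    // Re-issues the read after each reply and stops at the first empty reply.
    static List<String> readUntilIdle(FakeStream stream, long timeoutMillis) {
        List<String> seen = new ArrayList<>();
        List<String> batch;
        while (!(batch = stream.readOnce(timeoutMillis).join()).isEmpty()) {
            seen.addAll(batch);
        }
        return seen;
    }

    public static void main(String[] args) {
        FakeStream stream = new FakeStream();
        stream.add("m1");
        stream.add("m2");

        // A single read: the future completes once with what is there now.
        List<String> first = stream.readOnce(50).join();
        System.out.println(first); // [m1, m2]

        // Entries added later are only seen by issuing further reads.
        stream.add("m3");
        System.out.println(readUntilIdle(stream, 50)); // [m3]
    }
}
```

In the reactive snippet above, `.repeat()` plays the role of the `while` loop here, and the `block(...)` timeout plays the role of `timeoutMillis`. A real consumer would normally keep looping after an empty reply instead of stopping.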
    