skip to Main Content

I’m trying to use Spring Data Redis to consume a Redis Stream using consumer groups, but keep getting the following exception:

Caused by: io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'event-stream' or consumer group 'my-group' in XREADGROUP with GROUP option

The message seems to hint that I first need to create a consumer group? But the documentation does not provide any reference to this: https://github.com/spring-projects/spring-data-redis/blob/master/src/main/asciidoc/reference/redis-streams.adoc

Framework Versions:

  • Spring Boot 2.2.6
  • Lettuce 5.2.2
  • Redis 5.0.8

This is the code I’m using to consume the stream:

@Bean
@Autowired
public StreamMessageListenerContainer eventStreamPersistenceListenerContainerTwo(RedisConnectionFactory streamRedisConnectionFactory, RedisTemplate streamRedisTemplate) {

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder().pollTimeout(Duration.ofMillis(100)).build();

        StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(streamRedisConnectionFactory,
                        containerOptions);

        container.receive(Consumer.from("my-group", "my-consumer"),
                        StreamOffset.create("event-stream", ReadOffset.latest()),
                        message -> {
                                System.out.println("MessageId: " + message.getId());
                                System.out.println("Stream: " + message.getStream());
                                System.out.println("Body: " + message.getValue());
                                streamRedisTemplate.opsForStream().acknowledge("my-group", message);
                        });

        /*Subscription subscription = container.receive(StreamOffset.fromStart("event-stream"), message -> {

                System.out.println("MessageId: " + message.getId());
                System.out.println("Stream: " + message.getStream());
                System.out.println("Body: " + message.getValue());
        });*/

        container.start();

        return container;
}

Full Stack Trace:

org.springframework.data.redis.RedisSystemException: Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'event-stream' or consumer group 'my-group' in XREADGROUP with GROUP option
    at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:54) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:52) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:41) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.PassThroughExceptionTranslationStrategy.translate(PassThroughExceptionTranslationStrategy.java:44) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.FallbackExceptionTranslationStrategy.translate(FallbackExceptionTranslationStrategy.java:42) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.connection.lettuce.LettuceConnection.convertLettuceAccessException(LettuceConnection.java:270) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.convertLettuceAccessException(LettuceStreamCommands.java:471) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.xReadGroup(LettuceStreamCommands.java:361) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.connection.DefaultedRedisConnection.xReadGroup(DefaultedRedisConnection.java:529) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.core.DefaultStreamOperations$4.inRedis(DefaultStreamOperations.java:239) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.core.DefaultStreamOperations$RecordDeserializingRedisCallback.doInRedis(DefaultStreamOperations.java:305) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.core.DefaultStreamOperations$RecordDeserializingRedisCallback.doInRedis(DefaultStreamOperations.java:300) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:228) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:188) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:96) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.core.DefaultStreamOperations.read(DefaultStreamOperations.java:234) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getReadFunction$3(DefaultStreamMessageListenerContainer.java:236) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:138) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:123) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'event-stream' or consumer group 'my-group' in XREADGROUP with GROUP option
    at io.lettuce.core.ExceptionFactory.createExecutionException(ExceptionFactory.java:135) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at io.lettuce.core.ExceptionFactory.createExecutionException(ExceptionFactory.java:108) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at io.lettuce.core.protocol.AsyncCommand.completeResult(AsyncCommand.java:120) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at io.lettuce.core.protocol.AsyncCommand.complete(AsyncCommand.java:111) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:59) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:654) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:614) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:565) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.48.Final.jar:4.1.48.Final]
    ... 1 common frames omitted

2

Answers


  1. Chosen as BEST ANSWER

    Answering my own question. It seems as though you do need to explicitly create the stream and group first, even though not mentioned anywhere in the docs. Although there should really be a better way to initialize an empty stream other than publishing a message to it.

    private void createConsumerGroup(String key, String group, RedisTemplate redisTemplate) {
            try {
                    //redisTemplate.opsForStream().createGroup("my-stream", ReadOffset.from("0-0"), "my-group");
                    redisTemplate.opsForStream().createGroup("event-stream", "my-group-2");
            } catch (RedisSystemException e) {
                    if (e.getRootCause().getClass().equals(RedisBusyException.class)) {
                            log.info("STREAM - Redis group already exists, skipping Redis group creation: my-group-2");
                    } else if (e.getRootCause().getClass().equals(RedisCommandExecutionException.class)) {
                            log.info("STREAM - Stream does not yet exist, creating empty stream: event-stream");
                            // TODO: There has to be a better way to create a stream than this!?
                            redisTemplate.opsForStream().add("event-stream", Collections.singletonMap("", ""));
                            redisTemplate.opsForStream().createGroup("event-stream", "my-group-2");
                    } else throw e;
            }
    }
    
    

    EDIT: As mentioned by @anstue in the comments below, spring-data-redis 2.3.1+ now automatically creates the stream if it doesn't exist, when calling createGroup. However, it will throw a RedisSystemBusyException if the group does already exist. So I'm updating the answer with the solution I am currently using, making sure to catch this exception.

    public class EventStreamUtils {
    
        public static void createConsumerGroup(String key, String group, RedisTemplate redisTemplate) {
            try {
                // ReadOffset.from("0-0") will start reading stream from the very beginning.  Otherwise,
                // it will pick up at the point in the stream where the new group was created.
                //redisTemplate.opsForStream().createGroup(key, ReadOffset.from("0-0"), group);
                redisTemplate.opsForStream().createGroup(key, group);
            } catch (RedisSystemException e) {
                var cause = e.getRootCause();
                if (cause != null && RedisBusyException.class.equals(cause.getClass())) {
                    log.info("STREAM - Redis group already exists, skipping Redis group creation: {}", group);
                } else throw e;
            }
        }
    }
    

  2. I upgraded to spring-data-redis 2.4.6 (I believe it’s the newest stable release at the time of this writing) and I still get an exception when creating group of empty stream using opsForStream().createGroup().

    The solution I came up with is to directly use RedisStreamCommand as follow (continuing from the try-catch of the OP’s original answer):

    try {
      redisTemplate.getConnectionFactory().getConnection().xGroupCreate(
          "key".getBytes(),
          "group",
          ReadOffset.from("0-0"),
          true // this is important. It's to execute MKSTREAM command from redis
               // only available from 2.3.0.RELEASE and above
      );
    } catch (RedisSystemException e) {
      // your exception handling
      // getConnection() can also throw NullPointerException
    }
    

    I originally used kotlin in my spring boot project, so the code might need some adjustments. I don’t know how safe it is to use that method, but for now that’s the only way I know.

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search