I am trying to listen on a Redis stream and process messages as they arrive. I am using the async commands and I expect messages to be pushed rather than pulled, so I don't think a while loop is required. But the following code does not seem to work.
    public static void main(String[] args) throws InterruptedException {
        RedisClient redisClient = RedisClient
                .create("redis://localhost:6379/");
        StatefulRedisConnection<String, String> connection
                = redisClient.connect();
        RedisAsyncCommands<String, String> commands = connection.async();
        commands.xgroupCreate(StreamOffset.latest("my-stream"), "G1", new XGroupCreateArgs());
        commands
                .xreadgroup(Consumer.from("G1", "c1"), StreamOffset.lastConsumed("my-stream"))
                .thenAccept(System.out::println);
        Thread.currentThread().join();
    }
It just prints whatever the stream has when the program starts and does not print the messages that are added when the program is running. Isn’t the callback supposed to be called for every message that is newly added into the stream?
4 Answers
I think you should use the xgroupCreate method to create the link between the consumer and the group; otherwise you will get an error.
The example code is as follows:
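One possible shape, as a hedged sketch rather than a definitive implementation: create the group once, then issue the read again from its own completion callback, because the future returned by a single xreadgroup call completes only once. The helper below is plain JDK code so it can be tried without Redis; the names RearmingReader, readLoop, and running are made up for illustration, and the Lettuce wiring in the trailing comment assumes lettuce-core on the classpath and a server on localhost.

```java
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

class RearmingReader {

    /**
     * Calls read.get(), passes every item of the result to handler, and then
     * calls read.get() again from the completion callback. Stops when running
     * is false or when a read fails.
     */
    static <T> void readLoop(Supplier<? extends CompletionStage<List<T>>> read,
                             Consumer<? super T> handler,
                             AtomicBoolean running) {
        if (!running.get()) {
            return;
        }
        read.get().whenComplete((batch, error) -> {
            if (error != null) {
                error.printStackTrace(); // stop on failure; a real consumer might retry
                return;
            }
            if (batch != null) {
                batch.forEach(handler);
            }
            readLoop(read, handler, running); // re-arm: issue the next read
        });
    }
}

// Hypothetical wiring with Lettuce (assumption: group G1 on stream my-stream):
//
//   RedisAsyncCommands<String, String> commands = connection.async();
//   // Once only; Redis answers with a BUSYGROUP error if G1 already exists.
//   commands.xgroupCreate(XReadArgs.StreamOffset.latest("my-stream"), "G1",
//                         XGroupCreateArgs.Builder.mkstream());
//   AtomicBoolean running = new AtomicBoolean(true);
//   RearmingReader.readLoop(
//       () -> commands.xreadgroup(io.lettuce.core.Consumer.from("G1", "c1"),
//                                 XReadArgs.Builder.block(5000), // wait up to 5 s per read
//                                 XReadArgs.StreamOffset.lastConsumed("my-stream")),
//       System.out::println,
//       running);
```

The block argument makes each read wait on the server for new entries instead of returning immediately, so the loop does not spin. A blocking read occupies its connection while it waits, so a dedicated connection for the consumer is probably a good idea.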
You could use the Redis reactive commands to achieve this:
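A minimal sketch of what that might look like, assuming Lettuce's reactive API with Project Reactor, an existing group G1 on my-stream, and a server on localhost (the class name ReactiveStreamListener is illustrative). Each subscription to the Flux sends one XREADGROUP, and repeat() resubscribes when that read completes:

```java
import io.lettuce.core.Consumer;
import io.lettuce.core.RedisClient;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.XReadArgs.StreamOffset;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.reactive.RedisReactiveCommands;
import java.time.Duration;

class ReactiveStreamListener {
    public static void main(String[] args) throws InterruptedException {
        RedisClient client = RedisClient.create("redis://localhost:6379/");
        StatefulRedisConnection<String, String> connection = client.connect();
        RedisReactiveCommands<String, String> commands = connection.reactive();

        commands.xreadgroup(Consumer.from("G1", "c1"),
                        XReadArgs.Builder.block(Duration.ofSeconds(5)), // wait up to 5 s per read
                        StreamOffset.lastConsumed("my-stream"))
                .repeat() // send XREADGROUP again each time a read completes
                .subscribe(
                        message -> System.out.println(message.getId() + " " + message.getBody()),
                        Throwable::printStackTrace);

        Thread.currentThread().join(); // keep the JVM alive, as in the question
    }
}
```

Messages read this way stay in the group's pending list until they are acknowledged with XACK, so a real consumer would normally acknowledge each message after processing it.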
I think Lettuce is only responsible for communicating with Redis, whether in a sync, async, or streaming way. It is a low-level library, so if you want such a high-level feature, use Spring Data, something like this:
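A hedged sketch using Spring Data Redis's StreamMessageListenerContainer, which polls the stream in the background and invokes a listener for every record. It assumes spring-data-redis and lettuce-core on the classpath, an existing group G1 on my-stream, and no Spring context (hence the manual afterPropertiesSet call); the class name SpringStreamListener is illustrative.

```java
import java.time.Duration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer.StreamMessageListenerContainerOptions;

class SpringStreamListener {
    public static void main(String[] args) throws InterruptedException {
        LettuceConnectionFactory factory = new LettuceConnectionFactory("localhost", 6379);
        factory.afterPropertiesSet(); // normally done by the Spring container

        StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainerOptions.builder()
                        .pollTimeout(Duration.ofSeconds(1)) // how long each poll blocks
                        .build();

        StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
                StreamMessageListenerContainer.create(factory, options);

        // Register the listener; the container does the polling loop for us.
        container.receive(Consumer.from("G1", "c1"),
                StreamOffset.create("my-stream", ReadOffset.lastConsumed()),
                record -> System.out.println(record.getId() + " " + record.getValue()));

        container.start();
        Thread.currentThread().join(); // keep the JVM alive
    }
}
```

With receive the records are not acknowledged automatically; receiveAutoAck is the variant that acknowledges each record once it has been received.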
I know this question is a bit old, but the answer could be helpful for someone else. You could repeatedly subscribe to the same Flux like below, and it worked for me with xread. I think the same should work for xreadgroup as well.
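A minimal sketch of the idea with xread, assuming Lettuce's reactive API and a server on localhost (the class name RepeatingXread and the lastId variable are illustrative). One deliberate difference from resubscribing to the very same Flux: a plain repeat() would re-send the same offset every time, so this version wraps the call in Flux.defer and remembers the last ID seen, which lets each new read continue where the previous one stopped.

```java
import io.lettuce.core.RedisClient;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.XReadArgs.StreamOffset;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.reactive.RedisReactiveCommands;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;

class RepeatingXread {
    public static void main(String[] args) throws InterruptedException {
        RedisClient client = RedisClient.create("redis://localhost:6379/");
        StatefulRedisConnection<String, String> connection = client.connect();
        RedisReactiveCommands<String, String> commands = connection.reactive();

        // "$" means "only entries added after this read starts".
        AtomicReference<String> lastId = new AtomicReference<>("$");

        Flux.defer(() -> commands.xread(
                        XReadArgs.Builder.block(Duration.ofSeconds(5)), // wait up to 5 s per read
                        StreamOffset.from("my-stream", lastId.get())))
                .doOnNext(message -> lastId.set(message.getId())) // remember where we are
                .repeat() // build and send the next XREAD when this one completes
                .subscribe(
                        message -> System.out.println(message.getId() + " " + message.getBody()),
                        Throwable::printStackTrace);

        Thread.currentThread().join(); // keep the JVM alive
    }
}
```

Until the first message arrives the offset is still "$", so an entry added in the short gap between two reads could be missed. With xreadgroup and StreamOffset.lastConsumed the consumer group tracks the position on the server, so that bookkeeping is not needed.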