I have an application that stores data in Redis. The data arrives through events on a queue, which creates a highly concurrent environment. To keep the data consistent I use Redis MULTI / EXEC, so that on failure the changes are not committed. Before opening the MULTI I need to query Redis for data.
Here is an example case.
Test endpoint I use to simulate the events:
@GET
@Path("/foo")
public Response foo() {
    String queue = "foo";
    for (int i = 0; i < 10000; i++) {
        activeMqClient.addToQueue(queue, new Foo(i, "Message " + 1));
    }
    return OkResponse.toResponse("Foo send OK");
}
Event consumer (multiple instances running in threads):
public class FooRedisCmd {
    public void process(Foo foo) {
        // Example
        RedisClient redisClient = null;
        Response multiResponse = null;
        try {
            redisClient = RedisClient.createClient();
            //get info
            redisClient.hget("foos", "foo_field1:" + foo.getId());
            redisClient.hget("foos", "foo_field2:" + foo.getId());
            redisClient.hget("foos", "foo_field3:" + foo.getId());
            redisClient.hget("foos", "foo_field4:" + foo.getId());
            redisClient.hget("foos", "foo_field5:" + foo.getId());
            redisClient.hget("foos", "foo_field6:" + foo.getId());
            redisClient.hget("foos", "foo_field7:" + foo.getId());
            //write info
            multiResponse = redisClient.multi();
            log.info("Get multi: {} for foo: {}", multiResponse, foo);
            redisClient.hincrby("foos", "foo_field1:" + foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field2:" + foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field3:" + foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field4:" + foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field5:" + foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field6:" + foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field7:" + foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field8:" + foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field9:" + foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field10:" + foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field11:" + foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field12:" + foo.getId(), Integer.toString(1));
            Response execResponse = redisClient.exec();
            log.info("Multi result: {} for foo: {}", execResponse, foo);
        } catch (Exception e) {
            if (multiResponse != null) {
                try {
                    redisClient.discard();
                } catch (Exception e2) {
                    log.error("Error on discard for foo: {}", foo, e2);
                }
            }
            log.error("Error on exec for foo: {}", foo, e);
        } finally {
            if (redisClient != null) {
                redisClient.close();
            }
        }
    }
}
Errors:
INFO [com.foo.FooRedisCmd] (pool-17-thread-3) Get multi: OK for foo: Foo(id=9965, message=Message 1)
INFO [com.foo.FooRedisCmd] (pool-17-thread-3) Multi result: [3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3] for foo: Foo(id=9965, message=Message 1)
INFO [com.foo.FooRedisCmd] (pool-17-thread-1) Get multi: OK for foo: Foo(id=9966, message=Message 1)
ERROR [com.foo.FooRedisCmd] (pool-17-thread-1) Error on discard for foo: Foo(id=9966, message=Message 1): java.util.concurrent.CompletionException: ERR DISCARD without MULTI
at io.smallrye.mutiny.operators.uni.UniBlockingAwait.await(UniBlockingAwait.java:73)
at io.smallrye.mutiny.groups.UniAwait.atMost(UniAwait.java:61)
at io.quarkus.redis.client.runtime.RedisClientImpl.await(RedisClientImpl.java:1026)
at io.quarkus.redis.client.runtime.RedisClientImpl.discard(RedisClientImpl.java:142)
at com.foo.FooRedisCmd.process(FooRedisCmd.java:48)
at com.foo.FooRedisCmd_Subclass.process$$superforward1(FooRedisCmd_Subclass.zig:94)
at com.foo.FooRedisCmd_Subclass$$function$$1.apply(FooRedisCmd_Subclass$$function$$1.zig:33)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:54)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.proceed(InvocationInterceptor.java:62)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.monitor(InvocationInterceptor.java:51)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor_Bean.intercept(InvocationInterceptor_Bean.zig:521)
at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:41)
at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
at com.foo.FooRedisCmd_Subclass.process(FooRedisCmd_Subclass.zig:158)
at com.foo.FooRedisCmd_ClientProxy.process(FooRedisCmd_ClientProxy.zig:128)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: ERR DISCARD without MULTI
INFO [com.foo.FooRedisCmd] (pool-17-thread-2) Get multi: OK for foo: Foo(id=9967, message=Message 1)
ERROR [com.foo.FooRedisCmd] (pool-17-thread-1) Error on exec for foo: Foo(id=9966, message=Message 1): java.util.concurrent.CompletionException: ERR EXEC without MULTI
at io.smallrye.mutiny.operators.uni.UniBlockingAwait.await(UniBlockingAwait.java:73)
at io.smallrye.mutiny.groups.UniAwait.atMost(UniAwait.java:61)
at io.quarkus.redis.client.runtime.RedisClientImpl.await(RedisClientImpl.java:1026)
at io.quarkus.redis.client.runtime.RedisClientImpl.exec(RedisClientImpl.java:167)
at com.foo.FooRedisCmd.process(FooRedisCmd.java:43)
at com.foo.FooRedisCmd_Subclass.process$$superforward1(FooRedisCmd_Subclass.zig:94)
at com.foo.FooRedisCmd_Subclass$$function$$1.apply(FooRedisCmd_Subclass$$function$$1.zig:33)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:54)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.proceed(InvocationInterceptor.java:62)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.monitor(InvocationInterceptor.java:51)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor_Bean.intercept(InvocationInterceptor_Bean.zig:521)
at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:41)
at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
at com.foo.FooRedisCmd_Subclass.process(FooRedisCmd_Subclass.zig:158)
at com.foo.FooRedisCmd_ClientProxy.process(FooRedisCmd_ClientProxy.zig:128)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: ERR EXEC without MULTI
INFO [com.foo.FooRedisCmd] (pool-17-thread-2) Multi result: [3, 3, 3, 3, 3, 3, 3, 3, 3, 3] for foo: Foo(id=9967, message=Message 1)
Both the exec and, once it has failed, the discard produce errors. Why is this happening? I am using Quarkus 2 and its own Redis client, quarkus-redis-client.
2 Answers
Since you are running multiple instances in threads, the multi and exec commands from the different threads were executed interleaved.
The solution is to use a pipeline, which packages your multi, hincrby and exec commands into a single batch, so Redis executes them in the right order. See https://redis.io/topics/pipelining for more information.
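To make the effect easier to picture, here is a small toy model in plain Java. It is only a sketch and does not use the Quarkus client or a real Redis server: FakeConnection, sendOneByOne and sendAsBatch are made-up names, and the round-robin dispatch over two connections is an assumption chosen purely to show commands of one transaction getting separated. The one thing it relies on is that MULTI state is tracked per connection, so an EXEC that arrives where no MULTI was seen is rejected.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: NOT the Quarkus Redis client and NOT a real Redis server. */
class TransactionStateDemo {

    /** One simulated connection. The MULTI state lives here, per connection. */
    static final class FakeConnection {
        private boolean inMulti = false;
        private final List<String> queued = new ArrayList<>();

        String send(String command) {
            if (command.equals("MULTI")) {
                if (inMulti) {
                    return "ERR MULTI calls can not be nested";
                }
                inMulti = true;
                return "OK";
            }
            if (command.equals("EXEC")) {
                if (!inMulti) {
                    return "ERR EXEC without MULTI";
                }
                String reply = "EXEC ran " + queued.size() + " queued command(s)";
                inMulti = false;
                queued.clear();
                return reply;
            }
            if (command.equals("DISCARD")) {
                if (!inMulti) {
                    return "ERR DISCARD without MULTI";
                }
                inMulti = false;
                queued.clear();
                return "OK";
            }
            // Any other command is queued inside MULTI, otherwise it runs at once.
            if (inMulti) {
                queued.add(command);
                return "QUEUED";
            }
            return "executed immediately";
        }
    }

    /** Hypothetical dispatch: every command goes to the next connection in turn. */
    static List<String> sendOneByOne(FakeConnection[] pool, String... commands) {
        List<String> replies = new ArrayList<>();
        int next = 0;
        for (String command : commands) {
            replies.add(pool[next].send(command));
            next = (next + 1) % pool.length;
        }
        return replies;
    }

    /** The whole transaction is sent together on a single connection. */
    static List<String> sendAsBatch(FakeConnection connection, String... commands) {
        List<String> replies = new ArrayList<>();
        for (String command : commands) {
            replies.add(connection.send(command));
        }
        return replies;
    }

    public static void main(String[] args) {
        String[] tx = {"MULTI", "HINCRBY foos foo_field1:1 1", "HINCRBY foos foo_field2:1 1", "EXEC"};

        FakeConnection[] pool = {new FakeConnection(), new FakeConnection()};
        System.out.println("one by one: " + sendOneByOne(pool, tx));
        System.out.println("as a batch: " + sendAsBatch(new FakeConnection(), tx));
    }
}
```

Running main prints the replies for both cases. In the one-by-one case one HINCRBY runs outside the transaction and the EXEC is rejected, which resembles the shortened result list and the "EXEC without MULTI" error in the question's log; in the batched case both increments are queued and the EXEC succeeds.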
You can use a pipeline. If you are using the Jedis client for Redis, see:
https://www.tabnine.com/code/java/methods/redis.clients.jedis.Jedis/pipelined