
I have an application that stores data in Redis. The data arrives through events on a queue, which creates a highly concurrent environment. To keep the data consistent I use the Redis MULTI / EXEC commands, so that if anything fails the data is not committed. Before opening the MULTI I need to query Redis for data.

Here is an example case.

Test endpoint that I use to simulate the events:

@GET
@Path("/foo")
public Response foo() {
    String queue = "foo";

    for (int i = 0; i < 10000; i++) {
        activeMqClient.addToQueue(queue,  new Foo(i, "Message " + 1));
    }

    return OkResponse.toResponse("Foo send OK");
}

Event consumer (multiple instances run in threads):

public class FooRedisCmd {

    public void process(Foo foo) {
        // Example
        RedisClient redisClient = null;
        Response multiResponse = null;
        try {
            redisClient = RedisClient.createClient();
            //get info
            redisClient.hget("foos", "foo_field1:" + foo.getId());
            redisClient.hget("foos", "foo_field2:" + foo.getId());
            redisClient.hget("foos", "foo_field3:" + foo.getId());
            redisClient.hget("foos", "foo_field4:" + foo.getId());
            redisClient.hget("foos", "foo_field5:" + foo.getId());
            redisClient.hget("foos", "foo_field6:" + foo.getId());
            redisClient.hget("foos", "foo_field7:" + foo.getId());

            //write info
            multiResponse = redisClient.multi();
            log.info("Get multi: {} for foo: {}", multiResponse, foo);
            redisClient.hincrby("foos", "foo_field1:" + foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field2:" + foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field3:" + foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field4:" + foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field5:" + foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field6:" + foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field7:" + foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field8:" + foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field9:" + foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field10:" + foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field11:" + foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field12:" + foo.getId(), Integer.toString(1));
            Response execResponse = redisClient.exec();
            log.info("Multi result: {} for foo: {}", execResponse, foo);
        } catch (Exception e) {
            if (multiResponse != null) {
                try {
                    redisClient.discard();
                } catch (Exception e2) {
                    log.error("Error on discard for foo: {}", foo, e2);
                }
            }
            log.error("Error on exec for foo: {}", foo, e);
        } finally {
            if (redisClient != null) {
                redisClient.close();
            }
        }
    }
}

Errors:

INFO  [com.foo.FooRedisCmd] (pool-17-thread-3) Get multi: OK for foo: Foo(id=9965, message=Message 1)
INFO  [com.foo.FooRedisCmd] (pool-17-thread-3) Multi result: [3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3] for foo: Foo(id=9965, message=Message 1)
INFO  [com.foo.FooRedisCmd] (pool-17-thread-1) Get multi: OK for foo: Foo(id=9966, message=Message 1)
ERROR [com.foo.FooRedisCmd] (pool-17-thread-1) Error on discard for foo: Foo(id=9966, message=Message 1): java.util.concurrent.CompletionException: ERR DISCARD without MULTI
        at io.smallrye.mutiny.operators.uni.UniBlockingAwait.await(UniBlockingAwait.java:73)
        at io.smallrye.mutiny.groups.UniAwait.atMost(UniAwait.java:61)
        at io.quarkus.redis.client.runtime.RedisClientImpl.await(RedisClientImpl.java:1026)
        at io.quarkus.redis.client.runtime.RedisClientImpl.discard(RedisClientImpl.java:142)
        at com.foo.FooRedisCmd.process(FooRedisCmd.java:48)
        at com.foo.FooRedisCmd_Subclass.process$$superforward1(FooRedisCmd_Subclass.zig:94)
        at com.foo.FooRedisCmd_Subclass$$function$$1.apply(FooRedisCmd_Subclass$$function$$1.zig:33)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:54)
        at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.proceed(InvocationInterceptor.java:62)
        at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.monitor(InvocationInterceptor.java:51)
        at io.quarkus.arc.runtime.devconsole.InvocationInterceptor_Bean.intercept(InvocationInterceptor_Bean.zig:521)
        at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:41)
        at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
        at com.foo.FooRedisCmd_Subclass.process(FooRedisCmd_Subclass.zig:158)
        at com.foo.FooRedisCmd_ClientProxy.process(FooRedisCmd_ClientProxy.zig:128)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: ERR DISCARD without MULTI

INFO  [com.foo.FooRedisCmd] (pool-17-thread-2) Get multi: OK for foo: Foo(id=9967, message=Message 1)
ERROR [com.foo.FooRedisCmd] (pool-17-thread-1) Error on exec for foo: Foo(id=9966, message=Message 1): java.util.concurrent.CompletionException: ERR EXEC without MULTI
        at io.smallrye.mutiny.operators.uni.UniBlockingAwait.await(UniBlockingAwait.java:73)
        at io.smallrye.mutiny.groups.UniAwait.atMost(UniAwait.java:61)
        at io.quarkus.redis.client.runtime.RedisClientImpl.await(RedisClientImpl.java:1026)
        at io.quarkus.redis.client.runtime.RedisClientImpl.exec(RedisClientImpl.java:167)
        at com.foo.FooRedisCmd.process(FooRedisCmd.java:43)
        at com.foo.FooRedisCmd_Subclass.process$$superforward1(FooRedisCmd_Subclass.zig:94)
        at com.foo.FooRedisCmd_Subclass$$function$$1.apply(FooRedisCmd_Subclass$$function$$1.zig:33)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:54)
        at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.proceed(InvocationInterceptor.java:62)
        at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.monitor(InvocationInterceptor.java:51)
        at io.quarkus.arc.runtime.devconsole.InvocationInterceptor_Bean.intercept(InvocationInterceptor_Bean.zig:521)
        at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:41)
        at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
        at com.foo.FooRedisCmd_Subclass.process(FooRedisCmd_Subclass.zig:158)
        at com.foo.FooRedisCmd_ClientProxy.process(FooRedisCmd_ClientProxy.zig:128)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: ERR EXEC without MULTI

INFO  [com.foo.FooRedisCmd] (pool-17-thread-2) Multi result: [3, 3, 3, 3, 3, 3, 3, 3, 3, 3] for foo: Foo(id=9967, message=Message 1)

Errors are raised both by the exec and by the discard that runs after the exec fails. Why is this happening? I am using Quarkus 2 and its own Redis client, quarkus-redis-client.

2 Answers


  1. Since you are running multiple instances in threads, the MULTI and EXEC commands of the different threads end up interleaved, like this:

    127.0.0.1:6379> multi
    OK
    127.0.0.1:6379> set dst jjj
    QUEUED
    127.0.0.1:6379> multi
    (error) ERR MULTI calls can not be nested
    127.0.0.1:6379> exec
    1) OK
    127.0.0.1:6379> exec
    (error) ERR EXEC without MULTI
    127.0.0.1:6379> discard
    (error) ERR DISCARD without MULTI
    

    The solution is pipelining: package your MULTI, HINCRBY and EXEC commands into a single batch, so that Redis executes them in the right order.
    See https://redis.io/topics/pipelining for more information.

  2. You can use a pipeline. If you are using the Jedis client for Redis, see:

    https://www.tabnine.com/code/java/methods/redis.clients.jedis.Jedis/pipelined
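
The redis-cli session in the first answer can be replayed with a small toy model, which may help to see why one worker's EXEC and DISCARD fail. This is only a sketch: `ToyConnection` and `InterleavingDemo` are hypothetical names, the model tracks nothing but the MULTI state of a single connection, and it does not use quarkus-redis-client or talk to a real Redis server.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the transaction state Redis keeps per connection (not a real client). */
class ToyConnection {
    private boolean inMulti = false;
    private final List<String> queued = new ArrayList<>();

    /** Handles one command and returns a reply shaped like the redis-cli output. */
    String send(String command) {
        switch (command) {
            case "MULTI": {
                if (inMulti) return "ERR MULTI calls can not be nested";
                inMulti = true;
                return "OK";
            }
            case "EXEC": {
                if (!inMulti) return "ERR EXEC without MULTI";
                String reply = "executed " + queued.size() + " queued command(s)";
                inMulti = false;
                queued.clear();
                return reply;
            }
            case "DISCARD": {
                if (!inMulti) return "ERR DISCARD without MULTI";
                inMulti = false;
                queued.clear();
                return "OK";
            }
            default: {
                // Any other command is queued inside MULTI, or applied directly outside it.
                if (inMulti) {
                    queued.add(command);
                    return "QUEUED";
                }
                return "OK";
            }
        }
    }
}

class InterleavingDemo {
    /** Replays one fixed interleaving of two workers, A and B, on a shared connection. */
    static List<String> replay() {
        ToyConnection shared = new ToyConnection();
        List<String> replies = new ArrayList<>();
        replies.add("A MULTI -> " + shared.send("MULTI"));
        replies.add("A HINCRBY -> " + shared.send("HINCRBY foos foo_field1:1 1"));
        replies.add("B MULTI -> " + shared.send("MULTI"));     // A's transaction is still open
        replies.add("A EXEC -> " + shared.send("EXEC"));       // closes the only open transaction
        replies.add("B EXEC -> " + shared.send("EXEC"));       // nothing left to execute
        replies.add("B DISCARD -> " + shared.send("DISCARD")); // nothing left to discard
        return replies;
    }

    public static void main(String[] args) {
        replay().forEach(System.out::println);
    }
}
```

In this replay worker B receives the same three errors as in the redis-cli session, because the server only sees one stream of commands and cannot tell which worker sent which one.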

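
The idea of sending MULTI, the HINCRBY commands and EXEC as one unit can be sketched with the same kind of toy model. Everything here is an assumption made for illustration: `BatchingConnection`, `sendBatch` and `BatchDemo` are hypothetical names, and a Java lock stands in for whatever mechanism a real client uses to keep a batch together on one connection. It is not the quarkus-redis-client or Jedis API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy stand-in for one Redis connection; it tracks MULTI state and a single counter. */
class BatchingConnection {
    private boolean inMulti = false;
    private int pendingIncrements = 0;
    private int counter = 0;

    private String send(String command) {
        switch (command) {
            case "MULTI": {
                if (inMulti) return "ERR MULTI calls can not be nested";
                inMulti = true;
                return "OK";
            }
            case "EXEC": {
                if (!inMulti) return "ERR EXEC without MULTI";
                counter += pendingIncrements; // apply everything that was queued
                pendingIncrements = 0;
                inMulti = false;
                return "OK";
            }
            default: {
                // In this model every other command counts as one increment.
                if (inMulti) {
                    pendingIncrements++;
                    return "QUEUED";
                }
                counter++;
                return "OK";
            }
        }
    }

    /** Sends the whole list as one unit; the lock keeps other callers out until it is done. */
    synchronized List<String> sendBatch(List<String> commands) {
        List<String> replies = new ArrayList<>();
        for (String command : commands) {
            replies.add(send(command));
        }
        return replies;
    }

    synchronized int counter() {
        return counter;
    }
}

class BatchDemo {
    /** Runs `tasks` transactions on 4 threads; returns {number of error replies, final counter}. */
    static int[] run(int tasks) {
        BatchingConnection connection = new BatchingConnection();
        AtomicInteger errors = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                List<String> replies = connection.sendBatch(List.of(
                        "MULTI", "HINCRBY foos foo_field1 1", "HINCRBY foos foo_field2 1", "EXEC"));
                for (String reply : replies) {
                    if (reply.startsWith("ERR")) errors.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { errors.get(), connection.counter() };
    }

    public static void main(String[] args) {
        int[] result = run(1000);
        System.out.println("errors=" + result[0] + " counter=" + result[1]);
    }
}
```

Because each batch runs from MULTI to EXEC without another worker's commands in between, the model reports no error replies and every increment is applied. With a real client the equivalent would be whatever batch, pipeline or transaction call it provides for keeping those commands together on one connection.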