
I have a Flux stream that reads objects from the database.
For each of these objects, I have a processing function to be run.
I want the processing function to run only after acquiring a Redis lock on the object's ID, and I want the lock released once processing finishes, including when the processing function throws an error.

What’s the easiest way in Flux to create such a stream?
Here is some code of my failed attempt at doing this with transform.
I could probably make withLock take a function which would be attached as afterLock.flatMap(func), but I am looking for a solution that can avoid that.

I would like this to be as transparent to the rest of the stream as possible, and not require separate attachment of lock and unlock functions, just one attachment that does "lock-process-unlock".


        private <T> Function<Flux<T>, Publisher<T>> withLock(Function<T, String> keyExtractor) {
          
            return flux -> {
                    Flux<T> afterLock = flux.flatMap(ev -> redis.getLock(keyExtractor.apply(ev)).lock(1000L, TimeUnit.MILLISECONDS).map(ret -> ev));
    
    // processing logic should be attached somewhere here
    
                afterLock
                        .flatMap(ret -> redis.getLock(keyExtractor.apply(ret)).unlock()
                                .thenReturn(ret)
                                .onErrorResume(e -> redis.getLock(keyExtractor.apply(ret)).unlock().thenReturn(ret)));
    
                return afterLock;
    
            };
        }


        Flux.just(someObjectFromDatabase)
                .transform(withLock(t -> t.id()))
                .flatMap(this::objectProcessor)

2 Answers


  1. Chosen as BEST ANSWER

    Thanks for your answer @Alex. In the meantime I came up with something like this, which is very flexible in terms of organizing the stream and resilient to failures (it took me a while to cover the edge cases...).
    It can be used as stream.flatMap(withLock(..., processor)).

        public static <T> Function<T, Flux<T>> withLock(
                long tid, String lockPrefix, int lockTimeMs, Function<T, String> keyExtractor, Function<Mono<T>, Flux<T>> processor, RedissonReactiveClient redis) {
    
            // If Redis lock or unlock operations fail, that will on purpose propagate the error.
            // If processor throws an error, lock will be unlocked first before error propagation.
    
            // tid has to be unique for each local task, it's a virtual "thread id" so if it's used concurrently locks will not protect the code
    
            // Note: the lambda argument is a single stream element, not a Flux.
            return item -> {
                Function<T, RLockReactive> getLock = ev -> redis.getLock(lockPrefix + keyExtractor.apply(ev));
                RLockReactive lock = getLock.apply(item);
    
                Supplier<Mono<T>> unlock = () -> lock.unlock(tid).then(Mono.<T>empty());
                Supplier<Mono<T>> doLock = () -> lock.lock(lockTimeMs, TimeUnit.MILLISECONDS, tid).then(Mono.<T>empty());
    
                // Careful not to call map/flatMap on redis.lock/redis.unlock: they return Mono<Void>, which completes empty, so map/flatMap would never trigger...lol!
                return Flux.concat(
                        Mono.defer(doLock),
                        Flux.defer(() -> processor.apply(Mono.just(item))
                                .onErrorResume(err -> unlock.get()
                                        .onErrorResume(unlockError -> {
                                            err.addSuppressed(unlockError);
                                            // Propagate original processor error, but note the unlock error as suppressed
                                            return Mono.error(err);
                                        })
                                        .then(Mono.error(err)))),
                        Mono.defer(unlock)
                );
            };
        }

  2. One solution is to use Mono.usingWhen, which allows asynchronous operations for the resource supplier, the resource closure, and the cleanup.

    Mono.usingWhen(
       lockService.acquire(key, expireAfter),
       lock -> process(),
       lock -> lockService.release(lock)
    );
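
To show how this fits the "one attachment" requirement from the question, here is a minimal sketch that wraps a processor in `Mono.usingWhen` for each element. The `acquire`, `release`, `withLock` and `process` names are hypothetical, and an in-memory set stands in for Redis so the example runs on its own with only Reactor on the classpath. One thing to watch for: `usingWhen` only invokes the closure if the resource publisher emits a value, so a lock call that returns `Mono<Void>` (as Redisson's `lock(...)` does) would presumably need something like `.thenReturn(lock)` first.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class LockProcessUnlockSketch {

    // Hypothetical stand-in for Redis: the set of keys that are currently locked.
    static final Set<String> held = ConcurrentHashMap.newKeySet();
    // Records the order of lock / process / unlock steps for inspection.
    public static final List<String> log = new CopyOnWriteArrayList<>();

    // Emits the key once the lock is taken; errors if the key is already locked.
    static Mono<String> acquire(String key) {
        return Mono.fromCallable(() -> {
            if (!held.add(key)) {
                throw new IllegalStateException("already locked: " + key);
            }
            log.add("lock " + key);
            return key;
        });
    }

    static Mono<Void> release(String key) {
        return Mono.fromRunnable(() -> {
            held.remove(key);
            log.add("unlock " + key);
        });
    }

    // Single attachment point: stream.concatMap(withLock(keyExtractor, processor)).
    static <T, R> Function<T, Mono<R>> withLock(Function<T, String> keyExtractor,
                                                Function<T, Mono<R>> processor) {
        return item -> Mono.usingWhen(
                acquire(keyExtractor.apply(item)),   // resource supplier
                lockKey -> processor.apply(item),    // work done while the lock is held
                LockProcessUnlockSketch::release);   // cleanup on complete, error or cancel
    }

    // Hypothetical processor that fails for the element "b".
    static Mono<String> process(String id) {
        if (id.equals("b")) {
            return Mono.error(new IllegalArgumentException("boom " + id));
        }
        return Mono.fromSupplier(() -> {
            log.add("process " + id);
            return id.toUpperCase();
        });
    }

    public static List<String> run() {
        held.clear();
        log.clear();
        Function<String, Mono<String>> locked = withLock(s -> s, LockProcessUnlockSketch::process);
        return Flux.just("a", "b", "c")
                // Recover per element so one failure does not stop the stream.
                .concatMap(id -> locked.apply(id).onErrorResume(e -> Mono.just("failed " + id)))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
        System.out.println(log);
    }
}
```

The sketch uses `concatMap` to keep the log order deterministic; `flatMap` should work the same way when elements may be processed concurrently.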
    

    In our case, we wrapped the Redis lock in a ReactiveLockService that looks like this:

    public interface ReactiveLockService {
    
        Mono<LockEntry> acquire(String key, Duration expireAfter);
    
        Mono<Void> release(LockEntry lock);
    
        interface LockEntry {
            String getKey();
    
            String getValue();
        }
    }
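
For trying the interface out without a Redis instance, an in-memory implementation along the following lines might be enough. This is only a sketch under assumptions: `InMemoryLockService` and `Entry` are hypothetical names, a failed acquisition is signalled as an error (the interface above does not say what should happen), and the random token in `getValue()` is used to make sure only the current owner can release the lock.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Mono;

public class InMemoryLockServiceSketch {

    // Interface copied from the answer above.
    public interface ReactiveLockService {
        Mono<LockEntry> acquire(String key, Duration expireAfter);

        Mono<Void> release(LockEntry lock);

        interface LockEntry {
            String getKey();

            String getValue();
        }
    }

    // Hypothetical lock entry: key, owner token and lease expiry.
    public static final class Entry implements ReactiveLockService.LockEntry {
        final String key;
        final String value;
        final Instant expiresAt;

        Entry(String key, String value, Instant expiresAt) {
            this.key = key;
            this.value = value;
            this.expiresAt = expiresAt;
        }

        @Override public String getKey() { return key; }

        @Override public String getValue() { return value; }
    }

    // Hypothetical in-memory implementation, intended for tests only.
    public static final class InMemoryLockService implements ReactiveLockService {
        private final Map<String, Entry> locks = new ConcurrentHashMap<>();

        @Override
        public Mono<LockEntry> acquire(String key, Duration expireAfter) {
            return Mono.defer(() -> {
                Instant now = Instant.now();
                Entry candidate = new Entry(key, UUID.randomUUID().toString(), now.plus(expireAfter));
                // Take the lock if it is free or the previous lease has expired.
                Entry winner = locks.compute(key, (k, current) ->
                        current == null || current.expiresAt.isBefore(now) ? candidate : current);
                if (winner == candidate) {
                    return Mono.<LockEntry>just(candidate);
                }
                // Assumption: a held lock is reported as an error rather than an empty Mono.
                return Mono.<LockEntry>error(new IllegalStateException("lock is held: " + key));
            });
        }

        @Override
        public Mono<Void> release(LockEntry lock) {
            // Remove the entry only if the token matches the current owner.
            return Mono.fromRunnable(() ->
                    locks.computeIfPresent(lock.getKey(), (k, current) ->
                            current.value.equals(lock.getValue()) ? null : current));
        }
    }

    // Usage with Mono.usingWhen, mirroring the snippet in the answer.
    public static String demo() {
        ReactiveLockService lockService = new InMemoryLockService();
        return Mono.usingWhen(
                        lockService.acquire("order-42", Duration.ofSeconds(5)),
                        lock -> Mono.just("processed " + lock.getKey()),
                        lockService::release)
                .block();
    }
}
```

A Redis-backed version would need the same token check on release to be done atomically on the server side, which this sketch does not attempt to show.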
    