I have a Flux stream that reads objects from the database.
For each of these objects, I have a processing function to be run.
I want the processing function to run only after a Redis lock has been acquired on the object's ID, and I want the lock released once processing finishes, even if the processing function throws an error.
What’s the easiest way in Flux to create such a stream?
Here is some code of my failed attempt at doing this with transform.
I could probably make withLock take a function that would be attached as afterLock.flatMap(func), but I am looking for a solution that avoids that.
I would like this to be as transparent to the rest of the stream as possible, without requiring separate attachment of lock and unlock functions: just one attachment that does "lock-process-unlock".
private <T> Function<Flux<T>, Publisher<T>> withLock(Function<T, String> keyExtractor) {
    return flux -> {
        Flux<T> afterLock = flux.flatMap(ev ->
            redis.getLock(keyExtractor.apply(ev))
                .lock(1000L, TimeUnit.MILLISECONDS)
                .map(ret -> ev));
        // processing logic should be attached somewhere here
        afterLock
            .flatMap(ret -> redis.getLock(keyExtractor.apply(ret)).unlock()
                .thenReturn(ret)
                .onErrorResume(e -> redis.getLock(keyExtractor.apply(ret)).unlock().thenReturn(ret)));
        return afterLock;
    };
}
Flux.just(someObjectFromDatabase)
.transform(withLock(t -> t.id()))
.flatMap(this::objectProcessor)
2 Answers
Thanks for your answer @Alex. In the meantime I was able to come up with something like this, which is very flexible in terms of organizing the stream and resilient to failures (it took me a while to cover the edge cases...)
It can be used as a call to stream.flatMap(withLock(..., processor))
One solution is to use Mono.usingWhen, which allows asynchronous operations for the resource supplier, the resource closure, and the cleanup. In our case we wrapped the Redis lock in a LockService that looks like the following:
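The snippet below is not the answerer's LockService. It is only a minimal sketch of the lock-process-unlock idea built on Mono.usingWhen, under stated assumptions: the LockService interface, its acquire/release methods, the InMemoryLockService stand-in (used so the example runs without Redis), and the processAll helper are all hypothetical names introduced for illustration. A real implementation would delegate acquire and release to the reactive Redis lock.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class LockedProcessing {

    // Hypothetical abstraction over a distributed lock (assumption, not a real library API).
    interface LockService {
        Mono<String> acquire(String key); // emits the key once the lock is held
        Mono<Void> release(String key);   // completes once the lock is released
    }

    // In-memory stand-in so the sketch runs without Redis.
    static class InMemoryLockService implements LockService {
        final Set<String> held = ConcurrentHashMap.newKeySet();

        @Override
        public Mono<String> acquire(String key) {
            return Mono.fromCallable(() -> {
                if (!held.add(key)) {
                    throw new IllegalStateException("already locked: " + key);
                }
                return key;
            });
        }

        @Override
        public Mono<Void> release(String key) {
            return Mono.fromRunnable(() -> held.remove(key));
        }
    }

    // One attachment that does lock-process-unlock for a single element.
    // usingWhen runs the cleanup when the processor completes, fails, or is cancelled.
    static <T, R> Function<T, Mono<R>> withLock(LockService locks,
                                                 Function<T, String> keyExtractor,
                                                 Function<T, Mono<R>> processor) {
        return item -> Mono.usingWhen(
                locks.acquire(keyExtractor.apply(item)), // resource supplier: take the lock
                key -> processor.apply(item),            // resource closure: do the work
                locks::release);                         // async cleanup: give the lock back
    }

    // Example processor: fails for the id "b" to exercise the error path.
    static Mono<String> process(String id) {
        if (id.equals("b")) {
            return Mono.error(new RuntimeException("boom"));
        }
        return Mono.just("processed " + id);
    }

    // Hypothetical helper: runs every id through the locked processor,
    // turning a per-element failure into a marker value instead of ending the stream.
    static List<String> processAll(LockService locks, String... ids) {
        Function<String, Mono<String>> locked =
                withLock(locks, id -> id, LockedProcessing::process);
        return Flux.just(ids)
                .flatMap(id -> locked.apply(id)
                        .onErrorResume(e -> Mono.just("failed " + id)))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        InMemoryLockService locks = new InMemoryLockService();
        System.out.println(processAll(locks, "a", "b", "c"));
        System.out.println("locks still held: " + locks.held);
    }
}
```

Because withLock returns a function from an element to a Mono, it plugs into the stream with a single flatMap, and the rest of the pipeline does not need to know about the lock. Handling a failed lock acquisition (timeouts, retries) is left out of this sketch and would need its own policy.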