I need to filter a stream of values from the database (the filtering must be done in a reactive, non-blocking style). For each incoming element, I take the email field and validate it. After the first valid result, processing should stop.
However, I may have to process several thousand elements before I get the first suitable value.
public class User {
    @Id
    Long id;
    String email;
    String phoneHome;
}
There is a search by phone number:
private Mono<Response> findUserByHomePhone(Response response) {
    return Mono.just(response)
            .flatMap(this::retrieveUserList);
}
private Mono<Response> retrieveUserList(Response response) {
    String phone = retrievePhoneFromResponse(response);
    return Mono.from(userService.getByPhone(phone)
                    .groupBy(User::getId)
                    .flatMap(this::processGroupedObjects)
                    .switchOnFirst((signal, flux) -> getFirsFoundElement(signal, response)))
            .as(Log.of(log,
                    "Search by phone {}",
                    phone (response))::info);
}
- userService.getByPhone(phone) – the service class (annotated with @Transactional) calls a method defined in a ReactiveCrudRepository. This method issues a query to the database of the form:
select * from users where phone = $1
- .groupBy(User::getId) – we iterate over the stream of elements; each element of the stream is a User. We group them by id.
- .flatMap(this::processGroupedObjects) – processes the grouped objects. This is where the freeze may occur. In debug mode I couldn’t check all the elements, since the first few thousand elements have a null email.
- .switchOnFirst((signal, flux) -> getFirsFoundElement(signal, response)) – as soon as there is a user whose email field is filled in, we return the result.
private Mono<Response> getFirsFoundElement(Signal<? extends User> signal, Response response) {
    boolean isFoundElement = signal.hasValue();
    if (isFoundElement) {
        return Mono.just(response);
    }
    return Mono.error(new Exception());
}
private Flux<User> processGroupedObjects(GroupedFlux<Long, User> group) {
    return group
            .mapNotNull(this::checkEmailOnNull);
}
private User checkEmailOnNull(User user) {
    String email = user.getEmail();
    if (Strings.isEmpty(email)) {
        return null;
    }
    return user;
}
However, at approximately this point (processGroupedObjects(GroupedFlux<Long, User> group)) a freeze occurs. I do not see any errors in the console.
The number of source elements in the database ranges from 4,000 to 25,000 (the algorithm worked for 3,000).
I found this:
GroupedFlux
Grouping is best suited for when you have a medium to low number of
groups. The groups must also imperatively be consumed (such as by a
flatMap) so that groupBy continues fetching data from upstream and
feeding more groups. Sometimes, these two constraints multiply and
lead to hangs, such as when you have a high cardinality and the
concurrency of the flatMap consuming the groups is too low.
However, it is not clear to me what the documentation means. I think it describes my case, but I am not sure.
Does anyone have any idea why the freeze occurs and how it could be fixed?
2 Answers
I organized the filter like this:
As you found out, grouping doesn’t work well for a stream with many groups. So I suggest avoiding groups altogether.
As your goal is just to find any user with an email, it’s simple:
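A minimal sketch of this idea, not necessarily the exact code the answer had in mind: drop groupBy, filter out users without an email, and take the first match with next(), which cancels the upstream once a value is found. The class name FirstUserWithEmail, the simplified User class, the sample data, and the choice of NoSuchElementException for the "nothing found" case are assumptions made for illustration.

```java
import java.util.NoSuchElementException;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FirstUserWithEmail {

    // Simplified stand-in for the User entity from the question (assumption).
    static class User {
        private final Long id;
        private final String email;

        User(Long id, String email) {
            this.id = id;
            this.email = email;
        }

        Long getId() { return id; }
        String getEmail() { return email; }
    }

    // Emits the first user with a non-empty email, or an error if there is none.
    static Mono<User> firstWithEmail(Flux<User> users) {
        return users
                .filter(u -> u.getEmail() != null && !u.getEmail().isEmpty())
                .next() // take the first match and cancel the upstream
                .switchIfEmpty(Mono.error(
                        new NoSuchElementException("no user with an email")));
    }

    public static void main(String[] args) {
        // Hypothetical data: 10,000 users, only user 5,000 has an email.
        Flux<User> users = Flux.range(1, 10_000)
                .map(i -> new User(i.longValue(),
                        i == 5_000 ? "user5000@example.com" : null));

        User found = firstWithEmail(users).block();
        System.out.println(found.getId());
    }
}
```

In the question's code, the source would presumably be userService.getByPhone(phone), and a final .map(user -> response) would turn the found user into the Response that retrieveUserList returns.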
If you want to get emails for all users, you don’t need groupBy either; just use distinct:
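One way this could look, as a sketch under assumptions rather than the answer's original code: filter out users without an email, then use distinct with the user id as the key so each user is emitted at most once. The class name DistinctUserEmails, the simplified User class, and the sample data are hypothetical.

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class DistinctUserEmails {

    // Simplified stand-in for the User entity from the question (assumption).
    static class User {
        private final Long id;
        private final String email;

        User(Long id, String email) {
            this.id = id;
            this.email = email;
        }

        Long getId() { return id; }
        String getEmail() { return email; }
    }

    // Emits one email per user id, skipping users whose email is null or empty.
    static Flux<String> emailsOfDistinctUsers(Flux<User> users) {
        return users
                .filter(u -> u.getEmail() != null && !u.getEmail().isEmpty())
                .distinct(User::getId) // keep only the first user seen for each id
                .map(User::getEmail);
    }

    public static void main(String[] args) {
        // Hypothetical data: user 1 appears twice, user 2 has no email.
        Flux<User> users = Flux.just(
                new User(1L, "a@example.com"),
                new User(1L, "a@example.com"),
                new User(2L, null),
                new User(3L, "c@example.com"));

        List<String> emails = emailsOfDistinctUsers(users).collectList().block();
        System.out.println(emails); // prints [a@example.com, c@example.com]
    }
}
```

Unlike groupBy, distinct does not open a separate inner stream per key, so nothing has to be consumed concurrently. It does remember the keys it has already seen, so memory use grows with the number of distinct ids.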