skip to Main Content

It is necessary to filter the flow of values from the database (filtering must be performed in a reactive, non-blocking style). You need to get the incoming element, take the email field from it and validate this value. After the first correct result. Stop processing.

However. to get a valid value, you may have to process several thousand elements before you get the first suitable value.

public class User {

    @Id
    Long id;

    String email;
    
    String phoneHome;
}

there is a search by phone number

private Mono<Response> findUserByHomePhone(Response response) {

    return Mono.just(response)
            .flatMap(this::retrieveUserList);
}
private Mono<Response> retrieveUserList (Response response) {

    String phone = retrievePhoneFromResponse (response);
  
    return Mono.from(userService.getByPhone (phone)
                    .groupBy(Customer::getId)
                    .flatMap(this::processGroupedObjects)
                    .switchOnFirst((signal, flux) -> getFirsFoundElement(signal, response)))
            .as(Log.of(log,
                    "Search by phone {}",
                    phone (response))::info);
}
  • userService.getByPhone (phone) – through the service class (@Transactional is installed above it), the method defined in ReactiveCrudRepository is accessed, This method initiates a request to the database. Request of the type:
select * from users by phone = $1
  • .groupBy(Customer::getId) – we iterate through the flow of elements, each element of the flow is a User. We perform grouping by id.

  • .flatMap(this::processGroupedObjects)

    • processing grouped objects. this is where freezes are possible. in Debug mode, I couldn’t check all the elements. since the first few thousand elements have as email -> null.
  • .switchOnFirst((signal, flux) -> getFirsFoundElement(signal, response))) – as soon as there is a user whose email field is filled in, we return the result.

private Mono< Response > getFirsFoundElement(Signal<? extends User> signal, Response response) {

    boolean isFoundElement = signal.hasValue();

    if (isFoundElement) {
        return Mono.just(response);
    }

    return Mono.error(new Exception());
}


private Flux<User> processGroupedObjects(GroupedFlux<Long, User> group) {

    return group
            .mapNotNull(this::checkEmailOnNull);
}

private User checkEmailOnNull (User user) {

    String email = user.getEmail();

    if (Strings.isEmpty(email)){
        return null;
    }

    return user;
}

However, approximately at this point (process Group Objects(GroupedFlux<Lang, User> group)) – a freezes occurs, I do not observe errors in the console.

The number of source elements in the database is from 4 000 to 25 000 (the algorithm worked for 3000).

I found this:
GroupedFlux

Grouping is best suited for when you have a medium to low number of
groups. The groups must also imperatively be consumed (such as by a
flatMap) so that groupBy continues fetching data from upstream and
feeding more groups. Sometimes, these two constraints multiply and
lead to hangs, such as when you have a high cardinality and the
concurrency of the flatMap consuming the groups is too low.

however, it is not clear to me what was meant in the documentation, although I think my case is described here, but I am not sure about it.

Мaybe anyone has any ideas why freezes occur, how it could be fixed?

2

Answers


  1. Chosen as BEST ANSWER

    I organized the filter like this :

    
     private Mono<Response> filterUsersByEmail(Response response) {
    
            String phone = retrieveMobilePhoneFromResponse(response);
    
           return userService
                    .findByMobilePhone(phone)
                    .filter(user -> isaBoolean(user))
                    .next()
                    .switchIfEmpty(
                            Mono.defer(() -> Mono.error(
                            new Exception())
                    )
                    )
                   .flatMap(user -> Mono.just(response));
    
    }
    
     private static boolean isaBoolean(User user) {
    
            String email = user.getEmail();
            boolean b = !Strings.isEmpty(email);
            return b;
        }
    
    

  2. As you found out, grouping doesn’t work for a flow with many groups. So I suggest avoiding the groups at all.

    As your goal is just to find any user with an email it’s simple:

    userService.getByPhone (phone)
      .filter((user) -> user.getEmail() != null)
      .next()
    

    If you want to get emails for all users you don’t need a groupBy too, just use distinct:

    userService.getByPhone (phone)
      .filter((user) -> user.getEmail() != null)
      .distinct((user) -> user.getId()) 
      
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search