
I want to connect a client to a server, have it receive a Flux of some entity, and keep the client connected (in a non-blocking, asynchronous fashion) so that it receives updates and new insertions on the same Flux.

In other words, I want the database to inform the server when there is an update or new data, and the server to inform the client, all asynchronously and without blocking.

I tried spring-boot-starter-webflux over plain HTTP, and also spring-boot-starter-rsocket with RSocket.

I tried Redis, Mongo, and now PostgreSQL.

The only approach that worked almost as expected was a MongoDB capped collection with a tailable cursor. But that way the collection is capped, and I can't edit an existing entry.

Are you aware of an approach to accomplish this?

Thank you very much!

2 Answers


  1. Use spring-boot-starter-rsocket: create a @MessageMapping endpoint and have the server listen to a topic with reactive Redis, so that the topic serves as the message data channel.
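
    A minimal sketch of that wiring, assuming spring-boot-starter-rsocket and spring-boot-starter-data-redis-reactive are on the classpath (the route and channel name "updates" are illustrative):

    ```java
    import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
    import org.springframework.messaging.handler.annotation.MessageMapping;
    import org.springframework.stereotype.Controller;
    import reactor.core.publisher.Flux;

    @Controller
    public class UpdatesController {

        private final ReactiveStringRedisTemplate redisTemplate;

        public UpdatesController(ReactiveStringRedisTemplate redisTemplate) {
            this.redisTemplate = redisTemplate;
        }

        // RSocket request-stream route: each connected client receives a
        // live Flux fed by the Redis pub/sub channel "updates".
        @MessageMapping("updates")
        public Flux<String> updates() {
            return redisTemplate
                    .listenToChannel("updates")            // subscribe to the Redis topic
                    .map(message -> message.getMessage()); // unwrap the payload
        }
    }
    ```

    Any component that publishes to the "updates" channel (via `redisTemplate.convertAndSend(...)`) will then reach every connected RSocket client.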

  2. PostgreSQL includes a NOTIFY/LISTEN mechanism to push messages from the database, for example:

    import java.time.Duration;

    import jakarta.annotation.PostConstruct; // javax.annotation on Spring Boot 2.x
    import jakarta.annotation.PreDestroy;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.stereotype.Component;

    import io.r2dbc.postgresql.api.PostgresqlConnection;
    import io.r2dbc.postgresql.api.PostgresqlResult;
    import io.r2dbc.spi.ConnectionFactory;
    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Mono;

    @Component
    @Slf4j
    class Listener {
        @Autowired
        @Qualifier("pgConnectionFactory")
        ConnectionFactory pgConnectionFactory;

        PostgresqlConnection receiver;

        @PostConstruct
        public void initialize() {
            // Open a dedicated connection for notifications; blocking once
            // at startup is acceptable here.
            receiver = Mono.from(pgConnectionFactory.create())
                    .cast(PostgresqlConnection.class)
                    .block();

            // Subscribe this session to the "mymessage" channel.
            receiver.createStatement("LISTEN mymessage")
                    .execute()
                    .flatMap(PostgresqlResult::getRowsUpdated)
                    .log("listen::")
                    .subscribe();

            // Stream every NOTIFY sent to the channel as it arrives.
            receiver.getNotifications()
                    .delayElements(Duration.ofSeconds(1))
                    .log()
                    .subscribe(
                            data -> log.info("notifications: {}", data)
                    );
        }

        @PreDestroy
        public void destroy() {
            receiver.close().subscribe();
        }

    }
    

    The whole example is here.
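
    Nothing reaches the listener until some session issues a NOTIFY. A hedged sketch of the sending side, reusing the same pgConnectionFactory (the channel name and payload are illustrative):

    ```java
    // Fire a notification on the "mymessage" channel; every session that has
    // executed LISTEN mymessage receives it, payload included.
    Mono.from(pgConnectionFactory.create())
            .cast(PostgresqlConnection.class)
            .flatMapMany(conn -> conn
                    .createStatement("NOTIFY mymessage, 'row changed'")
                    .execute()
                    .flatMap(PostgresqlResult::getRowsUpdated))
            .subscribe();
    ```

    In practice the NOTIFY often comes from a database trigger that calls pg_notify() on insert or update, so the application's listener sees every change regardless of who made it.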

    As you mentioned, with Mongo capped collections it is easy to emit items to a reactive Flux sink; check my full-stack (frontend + backend) examples of HTTP/SSE, WebSocket, and RSocket.

    You can emit any data to a connectable Flux with your own logic, such as emitting data on fine-grained domain events; this usage is more generic in real-world projects.

    @Service
    @RequiredArgsConstructor
    @Slf4j
    public class PostService {
        //...
        public Mono<Comment> addComment(CommentInput commentInput) {
            String postId = commentInput.getPostId();
            return this.posts.findById(UUID.fromString(postId))
                    .flatMap(p -> this.comments.create(commentInput.getContent(), UUID.fromString(postId)))
                    .flatMap(id -> this.comments.findById(id).map(COMMENT_MAPPER))
                    .doOnNext(c -> {
                        log.debug("emitting comment: {}", c);
                        sink.emitNext(c, Sinks.EmitFailureHandler.FAIL_FAST);
                    })
                    .switchIfEmpty(Mono.error(new PostNotFoundException(postId)));
        }
    
        // replay().latest() re-delivers the most recent comment to late subscribers
        private final Sinks.Many<Comment> sink = Sinks.many().replay().latest();
    
        public Flux<Comment> commentAddedEvent() {
            return sink.asFlux();
        }
    }
    

    Any of your clients can connect to this commentAddedEvent stream. For example, the following uses SSE.

    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Comment> commentsStream() {
        return this.postService.commentAddedEvent();
    }
    

    Similarly, if you are using WebSocket, adapt it with a WebSocketHandler; for RSocket, use a controller with a message-mapping route instead.
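
    The sink-based fan-out does not depend on Reactor specifically. As a dependency-free illustration, the same idea with the JDK's built-in java.util.concurrent.Flow API (note that, unlike Sinks.many().replay().latest(), a SubmissionPublisher does not replay past items to subscribers that connect later):

    ```java
    import java.util.concurrent.Flow;
    import java.util.concurrent.SubmissionPublisher;

    // Dependency-free analogue of the sink-based fan-out: the service submits
    // each new comment to a SubmissionPublisher, and every currently connected
    // subscriber receives it.
    class CommentEvents {
        private final SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        public void addComment(String comment) {
            publisher.submit(comment); // analogous to sink.emitNext(...)
        }

        public void subscribeTo(Flow.Subscriber<? super String> subscriber) {
            publisher.subscribe(subscriber); // analogous to sink.asFlux().subscribe(...)
        }

        public void close() {
            publisher.close(); // completes all subscribers
        }
    }
    ```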
