skip to Main Content

everyone!
I got some idea to use gRPC protobuff code generation implementation as a data layer API to use it instead of POJO in RSocket protocol.

Here is the implementation:

syntax = "proto3";

import "google/protobuf/wrappers.proto";

option java_package = "me.some.protoapi";
option java_multiple_files = true;

message ValidationTaskRequest {
    int64 id = 1;
    string name = 2;
}

message ValidationTaskResponse {
    int64 id = 1;
    ValidationStatus status = 2;
    ValidationError error = 3;
}

message ValidationError {
    string reason = 1;
}

enum ValidationStatus {
    PASSED = 0;
    DECLINED = 1;
}

RSocket configuration

@Configuration
public class RSocketConfiguration {

    @Bean
    public RSocket rSocket(@Value("${rsocket.client.port}") int port) {
        return RSocketFactory
            .connect()
            .mimeType(MimeTypeUtils.ALL_VALUE, MimeTypeUtils.ALL_VALUE)
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            .transport(TcpClientTransport.create(port))
            .start()
            .retry()
            .block();
    }

    @Bean
    public RSocketRequester rSocketRequester(RSocket rSocket, RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(
            rSocket,
            MimeTypeUtils.ALL,
            MimeTypeUtils.ALL,
            rSocketStrategies
        );
    }

}

and the service itself

@Service
public class ValidationServiceImpl implements ValidationService {

    private final Logger logger = LoggerFactory.getLogger(ValidationServiceImpl.class);

    private final TaskService taskService;
    private final ReactiveRedisTemplate<String, Task> redis;
    private final RSocketRequester rSocketRequester;
    private final RedisTopicHelper redisHelper;

    @Value("${rsocket.routes.validation}")
    private String rSocketValidationRoute;

    @Value("${validation.interval}")
    private Optional<Integer> validationInterval;

    public ValidationServiceImpl(TaskService taskService, ReactiveRedisTemplate<String, Task> redis, RSocketRequester rSocketRequester, RedisTopicHelper redisHelper) {
        this.taskService = taskService;
        this.redis = redis;
        this.rSocketRequester = rSocketRequester;
        this.redisHelper = redisHelper;
    }

    @Override
    public void startValidationProcess() {
        logger.info("validation listener started");

        Flux.interval(Duration.ofMillis(validationInterval.orElse(1000)))
            .flatMap(i -> redis.keys(redisHelper.topicAllKeys()))
            .flatMap(redis.opsForValue()::get)
            .filter(it -> !it.isVerified())
            .flatMap(this::requestValidation)
            .log()
            .metrics()
            .subscribe(result -> {
                if ( result.getError() != null ) {
                    logger.error(result.getError().getReason());
                } else {
                    taskService.markTaskAsVerified(result.getId(), result.getStatus());
                    redis.opsForValue().delete(redisHelper.specifiedTopicWithId(result.getId()));
                }
            });
    }

    private Mono<ValidationTaskResponse> requestValidation(Task task) {
        return rSocketRequester
            .route(rSocketValidationRoute)
            .data(
                ValidationTaskRequest
                    .newBuilder()
                    .setId(task.getId())
                    .setName(task.getName())
            )
            .retrieveMono(ValidationTaskResponse.class);
    }

}

But, when the spring boot service is starting I caught exception

java.lang.IllegalArgumentException: No decoder for me.some.protoapi.ValidationTaskResponse
    at org.springframework.messaging.rsocket.RSocketStrategies.decoder(RSocketStrategies.java:92) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.messaging.rsocket.DefaultRSocketRequester$DefaultRequestSpec.retrieveMono(DefaultRSocketRequester.java:274) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.messaging.rsocket.DefaultRSocketRequester$DefaultRequestSpec.retrieveMono(DefaultRSocketRequester.java:258) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at me.some.test.api.services.validation.ValidationServiceImpl.requestValidation(ValidationServiceImpl.java:73) ~[main/:na]
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:378) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:107) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:530) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:972) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:355) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:107) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:274) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:851) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:92) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at io.lettuce.core.RedisPublisher$ImmediateSubscriber.onNext(RedisPublisher.java:888) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at io.lettuce.core.RedisPublisher$RedisSubscription.onNext(RedisPublisher.java:281) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at io.lettuce.core.RedisPublisher$SubscriptionCommand.complete(RedisPublisher.java:756) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:59) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:654) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:614) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:565) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

As you can see, the problem is in decoding proto-models in RSocket transport layer. I know, that gRPC its a binary serializing and it has some magic with binary description below. Has anyone tried to bing this two technologies? Any ideas would be very helpful. Thanks.

2

Answers


  1. I have never faced this problem, but I’ve configurated my receiver and requester for an JSON payload.
    Could you try to set an octet stream mime type for example?

        @Bean
        RSocket rSocket() {
            return RSocketFactory
                    .connect()
                    .metadataMimeType("message/x.rsocket.composite-metadata.v0")
                    .frameDecoder(PayloadDecoder.ZERO_COPY)
                    .dataMimeType(MimeTypeUtils.APPLICATION_JSON_VALUE)
                    .transport(TcpClientTransport.create(ztlServerHostname, port))
                    .start()
                    .block();
        }
    
    
    
        @Bean
        RSocketRequester rSocketRequester(RSocket rSocket, RSocketStrategies rSocketStrategies) {
            return RSocketRequester.wrap(rSocket, MimeTypeUtils.APPLICATION_JSON,
                    MimeTypeUtils.parseMimeType("message/x.rsocket.composite-metadata.v0"),
                    rSocketStrategies);
        }
    

    The full code is here

    Login or Signup to reply.
  2. May be a ProtobufDecoder() can help you when registered in rsocketStrategies

    rsocketRequesterBuilder
      .rsocketStrategies(b -> b.decoder(new ProtobufDecoder()))
         .connectTcp("localhost",7000)
         .block();
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search