Hello everyone!
I had the idea of using the classes generated by protobuf (gRPC) code generation as the data-layer API, instead of POJOs, over the RSocket protocol.
Here is the implementation:
syntax = "proto3";

import "google/protobuf/wrappers.proto";

option java_package = "me.some.protoapi";
option java_multiple_files = true;

message ValidationTaskRequest {
    int64 id = 1;
    string name = 2;
}

message ValidationTaskResponse {
    int64 id = 1;
    ValidationStatus status = 2;
    ValidationError error = 3;
}

message ValidationError {
    string reason = 1;
}

enum ValidationStatus {
    PASSED = 0;
    DECLINED = 1;
}
RSocket configuration
@Configuration
public class RSocketConfiguration {

    @Bean
    public RSocket rSocket(@Value("${rsocket.client.port}") int port) {
        return RSocketFactory
                .connect()
                .mimeType(MimeTypeUtils.ALL_VALUE, MimeTypeUtils.ALL_VALUE)
                .frameDecoder(PayloadDecoder.ZERO_COPY)
                .transport(TcpClientTransport.create(port))
                .start()
                .retry()
                .block();
    }

    @Bean
    public RSocketRequester rSocketRequester(RSocket rSocket, RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(
                rSocket,
                MimeTypeUtils.ALL,
                MimeTypeUtils.ALL,
                rSocketStrategies
        );
    }
}
and the service itself
@Service
public class ValidationServiceImpl implements ValidationService {

    private final Logger logger = LoggerFactory.getLogger(ValidationServiceImpl.class);

    private final TaskService taskService;
    private final ReactiveRedisTemplate<String, Task> redis;
    private final RSocketRequester rSocketRequester;
    private final RedisTopicHelper redisHelper;

    @Value("${rsocket.routes.validation}")
    private String rSocketValidationRoute;

    @Value("${validation.interval}")
    private Optional<Integer> validationInterval;

    public ValidationServiceImpl(TaskService taskService,
                                 ReactiveRedisTemplate<String, Task> redis,
                                 RSocketRequester rSocketRequester,
                                 RedisTopicHelper redisHelper) {
        this.taskService = taskService;
        this.redis = redis;
        this.rSocketRequester = rSocketRequester;
        this.redisHelper = redisHelper;
    }

    @Override
    public void startValidationProcess() {
        logger.info("validation listener started");
        Flux.interval(Duration.ofMillis(validationInterval.orElse(1000)))
                .flatMap(i -> redis.keys(redisHelper.topicAllKeys()))
                .flatMap(redis.opsForValue()::get)
                .filter(it -> !it.isVerified())
                .flatMap(this::requestValidation)
                .log()
                .metrics()
                .subscribe(result -> {
                    if (result.getError() != null) {
                        logger.error(result.getError().getReason());
                    } else {
                        taskService.markTaskAsVerified(result.getId(), result.getStatus());
                        redis.opsForValue().delete(redisHelper.specifiedTopicWithId(result.getId()));
                    }
                });
    }

    private Mono<ValidationTaskResponse> requestValidation(Task task) {
        return rSocketRequester
                .route(rSocketValidationRoute)
                .data(
                        ValidationTaskRequest
                                .newBuilder()
                                .setId(task.getId())
                                .setName(task.getName())
                )
                .retrieveMono(ValidationTaskResponse.class);
    }
}
However, when the Spring Boot service starts, I get the following exception:
java.lang.IllegalArgumentException: No decoder for me.some.protoapi.ValidationTaskResponse
at org.springframework.messaging.rsocket.RSocketStrategies.decoder(RSocketStrategies.java:92) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.messaging.rsocket.DefaultRSocketRequester$DefaultRequestSpec.retrieveMono(DefaultRSocketRequester.java:274) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.messaging.rsocket.DefaultRSocketRequester$DefaultRequestSpec.retrieveMono(DefaultRSocketRequester.java:258) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at me.some.test.api.services.validation.ValidationServiceImpl.requestValidation(ValidationServiceImpl.java:73) ~[main/:na]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:378) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:107) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:530) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:972) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:355) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:107) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:274) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:851) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:92) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at io.lettuce.core.RedisPublisher$ImmediateSubscriber.onNext(RedisPublisher.java:888) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at io.lettuce.core.RedisPublisher$RedisSubscription.onNext(RedisPublisher.java:281) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at io.lettuce.core.RedisPublisher$SubscriptionCommand.complete(RedisPublisher.java:756) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:59) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:654) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:614) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:565) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
As you can see, the problem is in decoding the proto models in the RSocket transport layer. I know that gRPC uses binary serialization and has some magic with the binary description underneath. Has anyone tried to bind these two technologies? Any ideas would be very helpful. Thanks.
2 Answers
I have never faced this problem, but I've configured my receiver and requester for a JSON payload.
Could you try setting an octet-stream MIME type, for example?
The full code is here
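Something like the following might be a starting point. It is only a sketch, assuming Spring Boot's auto-configured RSocketRequester.Builder bean is available and that the responder runs on localhost; the class name is made up, and the bean is meant to replace the rSocketRequester bean from the question. Keep in mind that the MIME type is only half of the lookup: a codec that supports the payload class still has to be registered in RSocketStrategies.

```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.util.MimeTypeUtils;

// Hypothetical replacement for the rSocketRequester bean in the question.
@Configuration
public class OctetStreamRequesterConfiguration {

    @Bean
    public RSocketRequester rSocketRequester(
            RSocketRequester.Builder builder,
            @Value("${rsocket.client.port}") int port) {
        return builder
                // Declare payloads as raw bytes instead of */*.
                .dataMimeType(MimeTypeUtils.APPLICATION_OCTET_STREAM)
                // Assumption: the responder runs on localhost.
                .connectTcp("localhost", port)
                .block();
    }
}
```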
Maybe a ProtobufDecoder() can help you when registered in rsocketStrategies.
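A rough sketch of what that registration could look like, assuming spring-web (which contains ProtobufDecoder and ProtobufEncoder) and protobuf-java are on the classpath. The configuration class name is made up, and defining this bean is assumed to take the place of the default RSocketStrategies that Spring Boot would otherwise provide:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.codec.protobuf.ProtobufDecoder;
import org.springframework.http.codec.protobuf.ProtobufEncoder;
import org.springframework.messaging.rsocket.RSocketStrategies;

// Hypothetical configuration class; only the codec registration matters here.
@Configuration
public class ProtobufRSocketStrategiesConfiguration {

    @Bean
    public RSocketStrategies rSocketStrategies() {
        return RSocketStrategies.builder()
                // Encodes outgoing protobuf messages (ValidationTaskRequest).
                .encoder(new ProtobufEncoder())
                // Decodes incoming protobuf messages (ValidationTaskResponse).
                .decoder(new ProtobufDecoder())
                .build();
    }
}
```

The responder side would presumably need the same codecs registered. Also, ProtobufEncoder works with built messages rather than builders, so the request in requestValidation likely needs a .build() call before it is passed to data(...).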