skip to Main Content

I am new to reactive programming. I need to connect to Redis to save and retrieve some data. The Redis instance is hosted in the cloud.
I am using the Lettuce connection factory to establish the connection.

When establishing the connection to Redis, the request fails.
Here is my Redis configuration class:

package com.sap.slh.tax.attributes.determination.springwebfluxdemo.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.annotation.EnableAsync;

import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxDetails;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxLine;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.util.JsonUtil;

/**
 * Spring configuration that wires a reactive Lettuce connection to a standalone Redis
 * instance and exposes a {@link ReactiveRedisOperations} keyed by {@link TaxDetails} with
 * {@link TaxLine} values.
 *
 * <p>Host, port and password are resolved from the {@code vcap.services.redis.credentials.*}
 * properties; the value after the colon in each placeholder is the fallback default.
 */
@Configuration
@EnableAsync
public class RedisConfig {
    private static final Logger log = LoggerFactory.getLogger(RedisConfig.class);

    @Value("${vcap.services.redis.credentials.hostname:10.11.241.101}")
    private String host;

    @Value("${vcap.services.redis.credentials.port:36516}")
    private int port;

    // The placeholder must be wrapped in ${...}; without the braces Spring injects the
    // literal text as the password instead of resolving the property or its default.
    @Value("${vcap.services.redis.credentials.password:123456788}")
    private String password;

    /**
     * Builds the Lettuce-backed reactive connection factory for database 0 of the
     * configured standalone Redis instance.
     *
     * @return the connection factory; Spring invokes its {@code afterPropertiesSet()}
     *         lifecycle callback because it is returned from a {@code @Bean} method
     */
    @Bean
    public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {
        RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(host, port);
        redisStandaloneConfiguration.setPassword(RedisPassword.of(password));
        redisStandaloneConfiguration.setDatabase(0);
        // Log only non-sensitive connection details, at INFO level: this is not an error
        // condition and the password must never reach the logs.
        log.info("Redis standalone configuration host={} port={} database={}", host, port,
                redisStandaloneConfiguration.getDatabase());
        LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder().build();
        // No explicit afterPropertiesSet() call: the container initializes the bean itself.
        return new LettuceConnectionFactory(redisStandaloneConfiguration, clientConfig);
    }

    /**
     * Creates the reactive template used to read and write tax lines.
     *
     * @param reactiveRedisConnectionFactory the connection factory to run commands against
     * @return reactive operations that serialize both keys ({@link TaxDetails}) and values
     *         ({@link TaxLine}) as JSON via Jackson
     */
    @Bean
    ReactiveRedisOperations<TaxDetails, TaxLine> redisOperations(
            ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
        Jackson2JsonRedisSerializer<TaxDetails> keySerializer = new Jackson2JsonRedisSerializer<>(TaxDetails.class);
        Jackson2JsonRedisSerializer<TaxLine> valueSerializer = new Jackson2JsonRedisSerializer<>(TaxLine.class);
        // The String serializer is the default for everything not overridden below
        // (key and value are replaced by the JSON serializers).
        RedisSerializationContext.RedisSerializationContextBuilder<TaxDetails, TaxLine> builder = RedisSerializationContext
                .newSerializationContext(new StringRedisSerializer());
        RedisSerializationContext<TaxDetails, TaxLine> context = builder.key(keySerializer).value(valueSerializer)
                .build();
        return new ReactiveRedisTemplate<>(reactiveRedisConnectionFactory, context);
    }
}

And here is my lookup service class, which actually communicates with Redis during the request:


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.stereotype.Service;

import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.RedisRepo;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxDetails;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxLine;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.util.JsonUtil;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Service that reads and writes {@link TaxLine} entries in Redis through the injected
 * reactive operations, using {@link TaxDetails} as the key.
 */
@Service
public class RedisTaxLineLookUpService {
    private static final Logger log = LoggerFactory.getLogger(RedisTaxLineLookUpService.class);

    @Autowired
    private ReactiveRedisOperations<TaxDetails, TaxLine> redisOperations;

    /**
     * Looks up the keys matching the given tax details and fetches the value stored under
     * each of them.
     *
     * @param taxDetails the key pattern handed to {@code keys(...)}
     * @return the tax lines stored under the matching keys
     */
    public Flux<TaxLine> get(TaxDetails taxDetails) {
        log.info("going to call redis to fetch tax lines{}", JsonUtil.toJsonString(taxDetails));
        Flux<TaxDetails> matchingKeys = redisOperations.keys(taxDetails);
        return matchingKeys.flatMap(key -> redisOperations.opsForValue().get(key));
    }

    /**
     * Stores the tax line of the given entry under its tax details.
     *
     * @param redisRepo the entry carrying both the key (tax details) and the value (tax line)
     * @return the same {@code redisRepo}, emitted once the set operation has emitted its result
     */
    public Mono<RedisRepo> set(RedisRepo redisRepo) {
        log.info("going to call redis to save tax lines{}", JsonUtil.toJsonString(redisRepo.getTaxDetails()));
        Mono<Boolean> stored = redisOperations.opsForValue()
                .set(redisRepo.getTaxDetails(), redisRepo.getTaxLine());
        // The Boolean outcome is discarded; callers receive the entry they passed in.
        return stored.map(ignored -> redisRepo);
    }
}

Stack trace :

2020-03-26T16:27:54.513+0000 [APP/PROC/WEB/0] OUT org.springframework.data.redis.RedisConnectionFailureException: Unable to connect to Redis; nested exception is io.lettuce.core.RedisConnectionException: Unable to connect to 10.11.241.101:36516 | at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$SharedConnection.getNativeConnection(LettuceConnectionFactory.java:1199) | Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: | Error has been observed at the following site(s): | |_ checkpoint ? Handler com.sap.slh.tax.attributes.determination.springwebfluxdemo.controller.TaxLinesDeterminationController#saveTaxLines(RedisRepo) [DispatcherHandler] | |_ checkpoint ? HTTP POST "/tax/lines/save/" [ExceptionHandlingWebHandler] | Stack trace: | at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$SharedConnection.getNativeConnection(LettuceConnectionFactory.java:1199) | at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$SharedConnection.getConnection(LettuceConnectionFactory.java:1178) | at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getSharedReactiveConnection(LettuceConnectionFactory.java:952) | at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getReactiveConnection(LettuceConnectionFactory.java:429) | at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getReactiveConnection(LettuceConnectionFactory.java:94) | at org.springframework.data.redis.core.ReactiveRedisTemplate.lambda$doInConnection$0(ReactiveRedisTemplate.java:198) | at reactor.core.publisher.MonoSupplier.call(MonoSupplier.java:85) | at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:80) | at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55) | at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) | at 
reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) | at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241) | at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) | at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203) | at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203) | at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) | at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onNext(MonoIgnoreThen.java:296) | at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) | at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144) | at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) | at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:247) | at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:329) | at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:173) | at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:92) | at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) | at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) | at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) | at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144) | at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103) | at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287) | at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:330) | at 
reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) | at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:160) | at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) | at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) | at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) | at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:419) | at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:209) | at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:367) | at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:363) | at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:489) | at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90) | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)

Any suggestions or answers would be highly helpful ! Thanks in Advance !

3

Answers


  1. Chosen as BEST ANSWER

    I updated my RedisConfig class as follows:

    
    import java.time.Duration;
    import java.util.List;
    import java.util.stream.Collectors;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;
    import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
    import org.springframework.data.redis.connection.RedisConfiguration;
    import org.springframework.data.redis.connection.RedisNode;
    import org.springframework.data.redis.connection.RedisPassword;
    import org.springframework.data.redis.connection.RedisSentinelConfiguration;
    import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
    import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
    import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
    import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
    import org.springframework.data.redis.core.ReactiveRedisOperations;
    import org.springframework.data.redis.core.ReactiveRedisTemplate;
    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
    import org.springframework.data.redis.serializer.RedisSerializationContext;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    
    import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxDetails;
    import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxLine;
    
    import io.lettuce.core.RedisURI;
    import io.pivotal.cfenv.core.CfEnv;
    
    /**
     * Redis configuration that takes its connection details from the {@link CfEnv}
     * credentials of the service tagged {@code "redis"}.
     *
     * <p>When those credentials expose a host, a standalone configuration is built from
     * host, port and password; otherwise the credentials' URI is parsed and a sentinel
     * configuration is built from it. Both variants select database 2.
     */
    @Configuration
    public class RedisConfig {

        CfEnv cfEnv = new CfEnv();
        String tag = "redis";
        String redisHost = cfEnv.findCredentialsByTag(tag).getHost();

        /**
         * Creates the primary reactive connection factory with a pooling Lettuce client and
         * a 60000 ms command timeout.
         *
         * @param defaultRedisConfig the standalone or sentinel configuration to connect with
         * @return the Lettuce connection factory
         */
        @Bean
        @Primary
        public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory(RedisConfiguration defaultRedisConfig) {
            Duration commandTimeout = Duration.ofMillis(60000);
            LettuceClientConfiguration pooledClient = LettucePoolingClientConfiguration.builder()
                    .commandTimeout(commandTimeout)
                    .build();
            return new LettuceConnectionFactory(defaultRedisConfig, pooledClient);
        }

        /**
         * Chooses between the standalone and the sentinel configuration.
         *
         * @return a standalone configuration when a host is present in the credentials,
         *         a sentinel configuration otherwise
         */
        @Bean
        public RedisConfiguration defaultRedisConfig() {
            if (redisHost == null) {
                return buildSentinelConfig();
            }
            return buildStandaloneConfig();
        }

        /**
         * Creates the reactive operations for {@link TaxDetails} keys and {@link TaxLine}
         * values, both serialized as JSON via Jackson.
         *
         * @param factory the connection factory to run commands against
         * @return the reactive template
         */
        @Bean
        public ReactiveRedisOperations<TaxDetails, TaxLine> reactiveRedisTemplate(
            ReactiveRedisConnectionFactory factory) {
            StringRedisSerializer defaultSerializer = new StringRedisSerializer();
            Jackson2JsonRedisSerializer<TaxLine> taxLineSerializer =
                    new Jackson2JsonRedisSerializer<>(TaxLine.class);
            Jackson2JsonRedisSerializer<TaxDetails> taxDetailsSerializer =
                    new Jackson2JsonRedisSerializer<>(TaxDetails.class);
            RedisSerializationContext.RedisSerializationContextBuilder<TaxDetails, TaxLine> contextBuilder =
                    RedisSerializationContext.newSerializationContext(defaultSerializer);
            // Keys and values replace the String default with their JSON serializers.
            RedisSerializationContext<TaxDetails, TaxLine> serializationContext = contextBuilder
                    .key(taxDetailsSerializer)
                    .value(taxLineSerializer)
                    .build();
            return new ReactiveRedisTemplate<>(factory, serializationContext);
        }

        /** Builds the standalone configuration from the credentials' host, port and password. */
        private RedisStandaloneConfiguration buildStandaloneConfig() {
            RedisStandaloneConfiguration standalone = new RedisStandaloneConfiguration();
            String portText = cfEnv.findCredentialsByTag(tag).getPort();
            String secret = cfEnv.findCredentialsByTag(tag).getPassword();
            standalone.setHostName(redisHost);
            standalone.setPassword(RedisPassword.of(secret));
            standalone.setPort(Integer.parseInt(portText));
            standalone.setDatabase(2);
            return standalone;
        }

        /** Builds the sentinel configuration by parsing the credentials' URI. */
        private RedisSentinelConfiguration buildSentinelConfig() {
            RedisSentinelConfiguration sentinel = new RedisSentinelConfiguration();
            String rawUri = cfEnv.findCredentialsByTag(tag).getUri();
            RedisURI parsedUri = RedisURI.create(rawUri);
            sentinel.master(parsedUri.getSentinelMasterId());
            // Register every sentinel listed in the URI as a node of this configuration.
            for (RedisURI sentinelUri : parsedUri.getSentinels()) {
                sentinel.addSentinel(populateNode(sentinelUri.getHost(), sentinelUri.getPort()));
            }
            sentinel.setPassword(RedisPassword.of(parsedUri.getPassword()));
            sentinel.setDatabase(2);
            return sentinel;
        }

        /** Wraps a host/port pair in a {@link RedisNode}. */
        private RedisNode populateNode(String host, Integer port) {
            return new RedisNode(host, port);
        }

    }
    

    dependencies for cfEnv:

            <dependency>
                <groupId>io.pivotal.cfenv</groupId>
                <artifactId>java-cfenv-boot</artifactId>
                <version>2.1.1.RELEASE</version>
            </dependency>
    

  2. I use this RedisConfig.java and it works for me.

    /**
     * Redis configuration whose {@code host} and {@code password} fields are bound from
     * properties under the {@code spring.redis} prefix (setters come from {@code @Setter}).
     */
    @Configuration
    @ConfigurationProperties(prefix = "spring.redis")
    @Setter
    public class RedisConfig {

        private String host;
        private String password;

        /**
         * Creates the primary reactive connection factory using a Lettuce client with SSL
         * enabled.
         *
         * @param defaultRedisConfig the Redis configuration to connect with
         * @return the Lettuce connection factory
         */
        @Bean
        @Primary
        public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory(RedisConfiguration defaultRedisConfig) {
            LettuceClientConfiguration sslClient = LettuceClientConfiguration.builder()
                    .useSsl()
                    .build();
            LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(defaultRedisConfig, sslClient);
            return connectionFactory;
        }

        /**
         * Builds a standalone configuration from the bound host and password; the port is
         * left at the configuration's default.
         *
         * @return the standalone Redis configuration
         */
        @Bean
        public RedisConfiguration defaultRedisConfig() {
            RedisStandaloneConfiguration standalone = new RedisStandaloneConfiguration();
            standalone.setHostName(host);
            RedisPassword redisPassword = RedisPassword.of(password);
            standalone.setPassword(redisPassword);
            return standalone;
        }
    }
    
    Login or Signup to reply.
  3. I had a similar problem with Redis running on AWS (an EC2 instance). It worked after the following changes:

    sudo vi /etc/redis/redis.conf

    1. Comment line: bind 127.0.0.1 ::1
    2. Set the line protected-mode no
    3. Set the line supervised systemd
    4. sudo systemctl restart redis.service
    5. Check the AWS security groups just in case.
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search