
While connecting to Memorystore through Dataflow, I am getting the below exception in the Dataflow worker logs.

    Error message from worker: redis.clients.jedis.exceptions.JedisConnectionException: Unknown reply:
        redis.clients.jedis.Protocol.process(Protocol.java:140)
        redis.clients.jedis.Protocol.read(Protocol.java:192)
        redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:316)
        redis.clients.jedis.Connection.getStatusCodeReply(Connection.java:243)
        redis.clients.jedis.Jedis.ping(Jedis.java:356)
        org.redis.memorystore.redisconn1.processElement(memorystore.java:39)

Please find the code which I am using.

import java.io.IOException;
import java.text.ParseException;

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;

class redisconn1 extends DoFn<String,String>
{
        private static final long serialVersionUID = 1L;
        private static final Logger LOG = LoggerFactory.getLogger(redisconn1.class);
        @ProcessElement
        public void processElement(ProcessContext c) throws ParseException, IOException, InterruptedException
        {
                String line = c.element();
                Jedis jedis = new Jedis("12.08.64.4", 6378);
                LOG.info("Connected to server successfully");
                LOG.info("Server is running: "+jedis.ping());

        }
}
public class redisconn
{
        public static void main(String[] args)
        {
                DataflowPipelineOptions options =PipelineOptionsFactory.as(DataflowPipelineOptions.class);
                options.setProject("heg-dev-rtstdo-0");//project_id
                options.setRunner(DataflowRunner.class);//Runner type
                options.setRegion("us-east4");//region
                options.setTemplateLocation("gs://heg-dev-rtstdo-0-devlopment_dataflow/DataflowSinkJobs/redis_template/TemplateNag");
                options.setGcpTempLocation("gs://heg-dev-rtstdo-0-devlopment_dataflow/DataflowSinkJobs/TempFolder/redis_temp");
                options.setAutoscalingAlgorithm(DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED);
                Pipeline p = Pipeline.create(options);
                
                p.apply(Create.of("Hello world")).apply(ParDo.of(new redisconn1()));

                p.run();

        }
}
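
Incidentally, 6378 is the port Memorystore for Redis serves on when in-transit encryption (TLS) is enabled (the plaintext port is 6379), and Jedis typically raises "Unknown reply" when a plaintext client talks to a TLS endpoint. If that is the case here, the client has to negotiate SSL. A minimal sketch, assuming Jedis 3.x; the host and AUTH string below are placeholders:

```java
import redis.clients.jedis.Jedis;

public class TlsPingCheck {
    public static void main(String[] args) {
        // Third constructor argument enables SSL/TLS -- needed when the
        // Memorystore instance has in-transit encryption turned on.
        try (Jedis jedis = new Jedis("your.redis.host", 6378, true)) {
            jedis.auth("your-auth-string"); // placeholder AUTH token
            System.out.println("Server is running: " + jedis.ping());
        }
    }
}
```

For Memorystore specifically, the instance's server CA certificate also has to be trusted by the worker JVM (e.g. imported into its truststore) for the TLS handshake to succeed.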

2 Answers


  1. Chosen as BEST ANSWER


    I have tried with beam.sdk.io (RedisIO) to connect to the Memorystore instance, but I am still getting the same exception.

    Please find the code which I used.

    import org.apache.beam.runners.dataflow.DataflowRunner;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.redis.RedisConnectionConfiguration;
    import org.apache.beam.sdk.io.redis.RedisIO;
    import org.apache.beam.sdk.io.redis.RedisIO.Write;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    public class memorystore 
    {
    
        public static void main(String[] args) 
        {
            DataflowPipelineOptions options =PipelineOptionsFactory.as(DataflowPipelineOptions.class);
            options.setProject("vz-it-np-gh2v-dev-rtstdo-0");//project_id
            options.setRunner(DataflowRunner.class);//Runner type
            options.setRegion("us-east4");//region
            options.setTemplateLocation("gs://vz-it-np-gh2v-dev-rtstdo-0-devlopment_dataflow/DataflowSinkJobs/redis_template/TemplateNag");
            options.setGcpTempLocation("gs://vz-it-np-gh2v-dev-rtstdo-0-devlopment_dataflow/DataflowSinkJobs/TempFolder/redis_temp");
            options.setAutoscalingAlgorithm(DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED);
            Pipeline pipeline = Pipeline.create(options);
    
            
            RedisConnectionConfiguration redisConfig = RedisConnectionConfiguration.create()
                                                       .withHost("100.68.64.4")
                                                       .withPort(6378)
                                                       .withAuth("e7cce2ef-7fb1-47ff-bc8f-8bfb5d936b0c");
    
            PCollection<KV<String, String>> output1=pipeline.apply(Create.of(KV.of("a", "1"), KV.of("b", "2"), KV.of("a", "3"), KV.of("c", "4")));
            
            
            output1.apply(ParDo.of(new ToKVFunction()))
                    .apply(RedisIO.write().withMethod(Write.Method.SADD).withConnectionConfiguration(redisConfig));
    
            pipeline.run().waitUntilFinish();
        }
    
        // Function to convert your data to KV format (Key-Value)
        public static class ToKVFunction extends DoFn<KV<String, String>, KV<String, String>> 
        {
            @ProcessElement
            public void processElement(ProcessContext c) {
                KV<String, String> input = c.element();
                String key = "GCP";
                String value = "1";
                c.output(KV.of(key, value));
            }
        }
    }
    

    Please find the exception which I am getting.

    Error message from worker: java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: redis.clients.jedis.exceptions.JedisConnectionException: Unknown reply:
        org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:178)
        org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:149)
        org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:67)
        org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:54)
        org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:91)
        org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:109)
        org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:267)
        org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:221)
        org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:147)
        org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:127)
        org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:114)
        java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
        java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: org.apache.beam.sdk.util.UserCodeException: redis.clients.jedis.exceptions.JedisConnectionException: Unknown reply:
        org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
        org.apache.beam.sdk.io.redis.RedisIO$Write$WriteFn$DoFnInvoker.invokeSetup(Unknown Source)
        org.apache.beam.sdk.transforms.reflect.DoFnInvokers.tryInvokeSetupFor(DoFnInvokers.java:53)
        org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:86)
        org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:68)
        org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:100)
        org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75)
        org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:248)
        org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:80)
        org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:167)
        ... 15 more
    Suppressed: org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
        at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
        at org.apache.beam.sdk.io.redis.RedisIO$Write$WriteFn$DoFnInvoker.invokeTeardown(Unknown Source)
        at org.apache.beam.sdk.transforms.reflect.DoFnInvokers.tryInvokeSetupFor(DoFnInvokers.java:56)
        ... 22 more
    Caused by: java.lang.NullPointerException
        at org.apache.beam.sdk.io.redis.RedisIO$Write$WriteFn.teardown(RedisIO.java:811)
    Caused by: redis.clients.jedis.exceptions.JedisConnectionException: Unknown reply:
        redis.clients.jedis.Protocol.process(Protocol.java:140)
        redis.clients.jedis.Protocol.read(Protocol.java:192)
        redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:316)
        redis.clients.jedis.Connection.getStatusCodeReply(Connection.java:243)
        redis.clients.jedis.Jedis.auth(Jedis.java:2625)
        org.apache.beam.sdk.io.redis.RedisConnectionConfiguration.connect(RedisConnectionConfiguration.java:143)
        org.apache.beam.sdk.io.redis.RedisIO$Write$WriteFn.setup(RedisIO.java:684)
    

    Please let me know how we can resolve this.


  2. For external connections on Dataflow it is best to use the Beam I/O connectors (beam.sdk.io).
    For Redis that is org.apache.beam.sdk.io.redis.RedisIO.
    Also, the connection must not be opened inside a DoFn's @ProcessElement; configure it upfront via the connector so it is serialized and passed to the workers properly.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.redis.RedisIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    
    public class RedisDataflowExample {
        public static void main(String[] args) {
            // Create the pipeline options
            PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    
            // Create the pipeline
            Pipeline pipeline = Pipeline.create(options);
    
            // Define the Redis read transform. Note RedisIO.Read is not generic;
            // it produces a PCollection<KV<String, String>>.
            RedisIO.Read readConfig = RedisIO.read().withEndpoint("your.redis.host", 6379);
    
            // Read data from Redis, then chain your transformations onto the result
            pipeline.apply(readConfig); // .apply(...) your actual transformations here
    
            // Run the pipeline
            pipeline.run();
        }
    }
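
    If RedisIO does not cover the exact command you need and you must use Jedis directly, the usual Beam pattern (instead of constructing a new client per element, as redisconn1 in the question does) is to tie the connection to the DoFn lifecycle: open it once in @Setup and close it in @Teardown. A hedged sketch; the host, port, and AUTH string are placeholders, and the SET command stands in for whatever operation you actually need:

    ```java
    import org.apache.beam.sdk.transforms.DoFn;
    import redis.clients.jedis.Jedis;

    public class RedisSetFn extends DoFn<String, String> {
        private transient Jedis jedis; // not serialized; created on the worker

        @Setup
        public void setup() {
            jedis = new Jedis("your.redis.host", 6378, true); // true => TLS
            jedis.auth("your-auth-string"); // placeholder AUTH token
        }

        @ProcessElement
        public void processElement(ProcessContext c) {
            jedis.set(c.element(), "1"); // example write per element
            c.output(c.element());
        }

        @Teardown
        public void teardown() {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
    ```

    This way each DoFn instance reuses one connection across its bundle of elements, which is also what RedisIO's own WriteFn does internally (its setup/teardown frames appear in the stack trace above).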
    
    