While connecting to Memorystore through Dataflow, I am getting the exception below in the Dataflow worker logs.
Error message from worker: redis.clients.jedis.exceptions.JedisConnectionException: Unknown reply:
    redis.clients.jedis.Protocol.process(Protocol.java:140)
    redis.clients.jedis.Protocol.read(Protocol.java:192)
    redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:316)
    redis.clients.jedis.Connection.getStatusCodeReply(Connection.java:243)
    redis.clients.jedis.Jedis.ping(Jedis.java:356)
    org.redis.memorystore.redisconn1.processElement(memorystore.java:39)
Please find the code which I am using.
class redisconn1 extends DoFn<String, String> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(redisconn1.class);

    @ProcessElement
    public void processElement(ProcessContext c) throws ParseException, IOException, InterruptedException {
        String line = c.element();
        Jedis jedis = new Jedis("12.08.64.4", 6378);
        LOG.info("Connected to the server successfully");
        LOG.info("Server is running: " + jedis.ping());
    }
}
public class redisconn {
    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setProject("heg-dev-rtstdo-0"); // project_id
        options.setRunner(DataflowRunner.class); // runner type
        options.setRegion("us-east4"); // region
        options.setTemplateLocation("gs://heg-dev-rtstdo-0-devlopment_dataflow/DataflowSinkJobs/redis_template/TemplateNag");
        options.setGcpTempLocation("gs://heg-dev-rtstdo-0-devlopment_dataflow/DataflowSinkJobs/TempFolder/redis_temp");
        options.setAutoscalingAlgorithm(DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED);

        Pipeline p = Pipeline.create(options);
        p.apply(Create.of("Hello world")).apply(ParDo.of(new redisconn1()));
        p.run();
    }
}
2 Answers
I have tried using beam.sdk.io to connect to Memorystore, but I am still getting the same exception.
Please find the code which I used.
Please find the exception which I am getting.
Please let me know how we can resolve this.
For external connections from Dataflow it is best to use the built-in beam.sdk.io connectors.
For Redis:
org.apache.beam.sdk.io.redis.RedisIO
Also, the connection must not be opened inside the DoFn's processElement; it should be configured up front on the beam.sdk.io transform, which then distributes it to the workers properly.
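As a rough sketch of that approach (not the poster's exact code): the snippet below writes a key/value pair to Memorystore with RedisIO instead of creating a Jedis client inside processElement. The host, key name, and write method are illustrative assumptions; note that Memorystore for Redis listens on port 6379 by default, and the transform requires the beam-sdks-java-io-redis artifact on the classpath.

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.redis.RedisIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;

public class RedisSinkSketch {
    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        // Set project, runner, region, template and temp locations as in the original pipeline.

        Pipeline p = Pipeline.create(options);

        p.apply(Create.of(KV.of("greeting", "Hello world"))) // hypothetical key/value pair
         // RedisIO opens the Redis connections on the workers itself; only the endpoint is
         // configured up front. Replace the host with your Memorystore instance IP.
         .apply(RedisIO.write()
             .withEndpoint("12.08.64.4", 6379)
             .withMethod(RedisIO.Write.Method.SET));

        p.run();
    }
}

RedisIO.write() expects a PCollection<KV<String, String>>, which is why the element is created as a KV rather than the plain String used in the original pipeline.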