skip to Main Content

I am working on a task to clear the Memorystore cache if the input file to be processed by Dataflow has data. That is, if the input file has no records, the Memorystore won't be flushed; but if the input file has even one record, the Memorystore should be flushed and then the input file should be processed.

My Dataflow application is a multi-pipeline application which reads, processes and then stores the data in the Memorystore. The pipeline executes successfully and the Memorystore is flushed, but the insertion that should follow the flush does not happen.

I have written a function that flushes the memorystore after checking if the input file has a record.

FlushingMemorystore.java

package com.click.example.functions;

import afu.org.checkerframework.checker.nullness.qual.Nullable;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.io.redis.RedisConnectionConfiguration;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

public class FlushingMemorystore {


    private static final Logger LOGGER = LoggerFactory.getLogger(FlushingMemorystore.class);

    public static FlushingMemorystore.Read read() {
        return (new AutoValue_FlushingMemorystore_Read.Builder())
                .setConnectionConfiguration(RedisConnectionConfiguration.create()).build();
    }

    @AutoValue
    public abstract static class Read extends PTransform<PCollection<Long>, PDone> {

        public Read() {
        }

        @Nullable
        abstract RedisConnectionConfiguration connectionConfiguration();

        @Nullable
        abstract Long expireTime();
        abstract FlushingMemorystore.Read.Builder toBuilder();

        public FlushingMemorystore.Read withEndpoint(String host, int port) {
            Preconditions.checkArgument(host != null, "host cannot be null");
            Preconditions.checkArgument(port > 0, "port cannot be negative or 0");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withHost(host).withPort(port)).build();
        }

        public FlushingMemorystore.Read withAuth(String auth) {
            Preconditions.checkArgument(auth != null, "auth cannot be null");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withAuth(auth)).build();
        }

        public FlushingMemorystore.Read withTimeout(int timeout) {
            Preconditions.checkArgument(timeout >= 0, "timeout cannot be negative");
            return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withTimeout(timeout)).build();
        }

        public FlushingMemorystore.Read withConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration) {
            Preconditions.checkArgument(connectionConfiguration != null, "connection cannot be null");
            return this.toBuilder().setConnectionConfiguration(connectionConfiguration).build();
        }

        public FlushingMemorystore.Read withExpireTime(Long expireTimeMillis) {
            Preconditions.checkArgument(expireTimeMillis != null, "expireTimeMillis cannot be null");
            Preconditions.checkArgument(expireTimeMillis > 0L, "expireTimeMillis cannot be negative or 0");
            return this.toBuilder().setExpireTime(expireTimeMillis).build();
        }

        public PDone expand(PCollection<Long> input) {
            Preconditions.checkArgument(this.connectionConfiguration() != null, "withConnectionConfiguration() is required");
            input.apply(ParDo.of(new FlushingMemorystore.Read.ReadFn(this)));
            return PDone.in(input.getPipeline());
        }

        private static class ReadFn extends DoFn<Long, String> {
            private static final int DEFAULT_BATCH_SIZE = 1000;
            private final FlushingMemorystore.Read spec;
            private transient Jedis jedis;
            private transient Pipeline pipeline;
            private int batchCount;

            public ReadFn(FlushingMemorystore.Read spec) {
                this.spec = spec;
            }

            @Setup
            public void setup() {
                this.jedis = this.spec.connectionConfiguration().connect();
            }

            @StartBundle
            public void startBundle() {
                this.pipeline = this.jedis.pipelined();
                this.pipeline.multi();
                this.batchCount = 0;
            }

            @ProcessElement
            public void processElement(DoFn<Long, String>.ProcessContext c) {
                Long count = c.element();
                batchCount++;

                if(count==null && count < 0) {
                    LOGGER.info("No Records are there in the input file");
                } else {
                    if (pipeline.isInMulti()) {
                        pipeline.exec();
                        pipeline.sync();
                        jedis.flushDB();
                    }
                    LOGGER.info("*****The memorystore is flushed*****");
                }
            }

            @FinishBundle
            public void finishBundle() {
                if (this.pipeline.isInMulti()) {
                    this.pipeline.exec();
                    this.pipeline.sync();
                }
                this.batchCount=0;
            }

            @Teardown
            public void teardown() {
                this.jedis.close();
            }

        }

        @AutoValue.Builder
        abstract static class Builder {

            Builder() {
            }

            abstract FlushingMemorystore.Read.Builder setExpireTime(Long expireTimeMillis);

            abstract FlushingMemorystore.Read build();

            abstract FlushingMemorystore.Read.Builder setConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration);

        }

    }

}

I am using the function in my Starter Pipeline code.

Code snippet of starter pipeline where the function is being used:

 StorageToRedisOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(StorageToRedisOptions.class);

        Pipeline p = Pipeline.create(options);

        PCollection<String> lines = p.apply(
                "ReadLines", TextIO.read().from(options.getInputFile()));

        /**
         * Flushing the Memorystore if there are records in the input file
         */
        lines.apply("Checking Data in input file", Count.globally())
                .apply("Flushing the data store", FlushingMemorystore.read()
                        .withConnectionConfiguration(RedisConnectionConfiguration
                        .create(options.getRedisHost(), options.getRedisPort())));

Code snippet for the processed data to be inserted after clearing the cache:

 dataset.apply(SOME_DATASET_TRANSFORMATION, RedisIO.write()
                .withMethod(RedisIO.Write.Method.SADD)
                .withConnectionConfiguration(RedisConnectionConfiguration
                        .create(options.getRedisHost(), options.getRedisPort())));

The dataflow executes fine and it flushes the memorystore as well but the insertion is not working after that. Could you please point out where I am going wrong?
Any solution for resolving the issue is truly appreciated. Thanks in advance!

Edit:

Providing additional information as requested in the comments

The runtime used is Java 11, and it uses Apache Beam SDK 2.24.0.

If the input file has records, it will process the data with some logic. For example, if the input file has data like:

abcabc|Bruce|Wayne|2000
abbabb|Tony|Stark|3423

The Dataflow job will count the number of records, which is 2 in this case, process the id, first name, etc. according to the logic, and then store the result in the Memorystore. This input file will arrive every day; hence, the Memorystore should be cleared (or flushed) if the input file has records.

The pipeline is not breaking, but I think I am missing something.

2

Answers


  1. Chosen as BEST ANSWER

    The issue was resolved after I applied the Wait.on transform, as Pablo's answer already explains. However, I had to rewrite my FlushingMemorystore.java a bit so that it returns a PCollection that serves as the flush signal.

    Here's the function:

    package com.click.example.functions;
    
    import afu.org.checkerframework.checker.nullness.qual.Nullable;
    import com.google.auto.value.AutoValue;
    import org.apache.beam.sdk.io.redis.RedisConnectionConfiguration;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.vendor.grpc.v1p26p0.com.google.common.base.Preconditions;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.Pipeline;
    
    public class FlushingMemorystore extends DoFn<Long, String> {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(FlushingMemorystore.class);
    
        public static FlushingMemorystore.Read read() {
            return (new AutoValue_FlushingMemorystore_Read.Builder())
                    .setConnectionConfiguration(RedisConnectionConfiguration.create()).build();
        }
    
        @AutoValue
        public abstract static class Read extends PTransform<PCollection<Long>, PCollection<String>> {
    
            public Read() {
            }
    
            @Nullable
            abstract RedisConnectionConfiguration connectionConfiguration();
    
            @Nullable
            abstract Long expireTime();
            abstract FlushingMemorystore.Read.Builder toBuilder();
    
            public FlushingMemorystore.Read withEndpoint(String host, int port) {
                Preconditions.checkArgument(host != null, "host cannot be null");
                Preconditions.checkArgument(port > 0, "port cannot be negative or 0");
                return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withHost(host).withPort(port)).build();
            }
    
            public FlushingMemorystore.Read withAuth(String auth) {
                Preconditions.checkArgument(auth != null, "auth cannot be null");
                return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withAuth(auth)).build();
            }
    
            public FlushingMemorystore.Read withTimeout(int timeout) {
                Preconditions.checkArgument(timeout >= 0, "timeout cannot be negative");
                return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withTimeout(timeout)).build();
            }
    
            public FlushingMemorystore.Read withConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration) {
                Preconditions.checkArgument(connectionConfiguration != null, "connection cannot be null");
                return this.toBuilder().setConnectionConfiguration(connectionConfiguration).build();
            }
    
            public FlushingMemorystore.Read withExpireTime(Long expireTimeMillis) {
                Preconditions.checkArgument(expireTimeMillis != null, "expireTimeMillis cannot be null");
                Preconditions.checkArgument(expireTimeMillis > 0L, "expireTimeMillis cannot be negative or 0");
                return this.toBuilder().setExpireTime(expireTimeMillis).build();
            }
    
            public PCollection<String> expand(PCollection<Long> input) {
                Preconditions.checkArgument(this.connectionConfiguration() != null, "withConnectionConfiguration() is required");
               return input.apply(ParDo.of(new FlushingMemorystore.Read.ReadFn(this)));
            }
    
            @Setup
            public Jedis setup() {
                return this.connectionConfiguration().connect();
            }
    
            private static class ReadFn extends DoFn<Long, String> {
                private static final int DEFAULT_BATCH_SIZE = 1000;
                private final FlushingMemorystore.Read spec;
                private transient Jedis jedis;
                private transient Pipeline pipeline;
                private int batchCount;
    
                public ReadFn(FlushingMemorystore.Read spec) {
                    this.spec = spec;
                }
    
                @Setup
                public void setup() {
                    this.jedis = this.spec.connectionConfiguration().connect();
                }
    
                @StartBundle
                public void startBundle() {
                    this.pipeline = this.jedis.pipelined();
                    this.pipeline.multi();
                    this.batchCount = 0;
                }
    
                @ProcessElement
                public void processElement(@Element Long count, OutputReceiver<String> out) {
                    batchCount++;
    
                    if(count!=null && count > 0) {
                        if (pipeline.isInMulti()) {
                            pipeline.exec();
                            pipeline.sync();
                            jedis.flushDB();
                            LOGGER.info("*****The memorystore is flushed*****");
                        }
                        out.output("SUCCESS");
                    } else {
                        LOGGER.info("No Records are there in the input file");
                        out.output("FAILURE");
                    }
    
                }
    
                @FinishBundle
                public void finishBundle() {
                    if (this.pipeline.isInMulti()) {
                        this.pipeline.exec();
                        this.pipeline.sync();
                    }
                    this.batchCount=0;
                }
    
                @Teardown
                public void teardown() {
                    this.jedis.close();
                }
    
            }
    
            @AutoValue.Builder
            abstract static class Builder {
    
                Builder() {
                }
    
                abstract FlushingMemorystore.Read.Builder setExpireTime(Long expireTimeMillis);
    
                abstract FlushingMemorystore.Read build();
    
              abstract FlushingMemorystore.Read.Builder setConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration);
    
            }
    
        }
    
    }
    

  2. I suspect the problem here is that you need to ensure the "Flush" step runs (and completes) before the RedisIO.write step happens. Beam has a Wait.on transform that you can use for this.

    To accomplish this, we can use the output from the flushing PTransform as a signal that we’ve flushed the database – and we only write to the database after we are done flushing. The process call for your flushing DoFn would look like this:

    /**
     * Flushes the database when the element is a positive record count, then emits
     * {@code "READY"} so downstream steps have a signal to wait on.
     *
     * @param c context whose element is the number of records in the input
     */
    @ProcessElement
    public void processElement(DoFn<Long, String>.ProcessContext c) {
        Long count = c.element();

        // '||' and '<=' are required here: with '&&' / '<' the guard could never be true for
        // a non-null count and dereferenced a null one, so an empty input (count 0) still
        // wiped the database.
        if (count == null || count <= 0) {
            LOGGER.info("No Records are there in the input file");
        } else if (pipeline.isInMulti()) {
            // Finish the open MULTI block before issuing the flush on the connection.
            pipeline.exec();
            pipeline.sync();
            jedis.flushDB();
            // Logged only when the flush call above has actually been made.
            LOGGER.info("*****The memorystore is flushed*****");
        }
        // Emitted in every case so that a waiting step is released for empty inputs too.
        c.output("READY");
    }
    

    Once we have a signal indicating that the database has been flushed, we can use it to wait before writing the new data to it:

    Pipeline p = Pipeline.create(options);
    
    PCollection<String> lines = p.apply(
            "ReadLines", TextIO.read().from(options.getInputFile()));
    
    /**
     * Flushing the Memorystore if there are records in the input file
     */
    PCollection<String> flushedSignal = lines
         .apply("Checking Data in input file", Count.globally())
         .apply("Flushing the data store", FlushingMemorystore.read()
                         .withConnectionConfiguration(RedisConnectionConfiguration
                         .create(options.getRedisHost(), options.getRedisPort())));
    
    // Then we use the flushing signal to start writing to Redis:
    
    dataset
        .apply(Wait.on(flushedSignal))
        .apply(SOME_DATASET_TRANSFORMATION, RedisIO.write()
                    .withMethod(RedisIO.Write.Method.SADD)
                    .withConnectionConfiguration(RedisConnectionConfiguration
                            .create(options.getRedisHost(), options.getRedisPort())));
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search