I am trying to count the number of rows in an input file, and I am using the Cloud Dataflow runner for creating the template. In the code below, I am reading the file from a GCS bucket, processing it, and then storing the output in a Redis instance.

But I am unable to count the number of lines of the input file.

Main Class

 public static void main(String[] args) {
    /**
     * Construct a StorageToRedisOptions object using PipelineOptionsFactory.fromArgs
     * to read options from the command line.
     */
    StorageToRedisOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(StorageToRedisOptions.class);

    Pipeline p = Pipeline.create(options);
    p.apply("Reading Lines...", TextIO.read().from(options.getInputFile()))
            .apply("Transforming data...",
                    ParDo.of(new DoFn<String, String[]>() {
                        @ProcessElement
                        public void transformData(@Element String line, OutputReceiver<String[]> out) {
                            // split() takes a regex, so the pipe delimiter must be escaped as "\\|"
                            String[] fields = line.split("\\|");
                            out.output(fields);
                        }
                    }))
            .apply("Processing data...",
                    ParDo.of(new DoFn<String[], KV<String, String>>() {
                        @ProcessElement
                        public void processData(@Element String[] fields, OutputReceiver<KV<String, String>> out) {
                            if (fields[RedisIndex.GUID.getValue()] != null) {

                                out.output(KV.of("firstname:"
                                        .concat(fields[RedisIndex.FIRSTNAME.getValue()]), fields[RedisIndex.GUID.getValue()]));

                                out.output(KV.of("lastname:"
                                        .concat(fields[RedisIndex.LASTNAME.getValue()]), fields[RedisIndex.GUID.getValue()]));

                                out.output(KV.of("dob:"
                                        .concat(fields[RedisIndex.DOB.getValue()]), fields[RedisIndex.GUID.getValue()]));

                                out.output(KV.of("postalcode:"
                                        .concat(fields[RedisIndex.POSTAL_CODE.getValue()]), fields[RedisIndex.GUID.getValue()]));

                            }
                        }
                    }))
            .apply("Writing field indexes into redis",
            RedisIO.write().withMethod(RedisIO.Write.Method.SADD)
                    .withEndpoint(options.getRedisHost(), options.getRedisPort()));
    p.run();

}
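
For reference, the code above also uses a StorageToRedisOptions interface that is not shown here. A minimal sketch consistent with the options used in the pipeline (the @Description texts and defaults are assumptions, not from the original post):

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

// Hypothetical reconstruction -- the question does not show this interface.
public interface StorageToRedisOptions extends PipelineOptions {
    @Description("Path of the input file(s) to read from")
    String getInputFile();
    void setInputFile(String value);

    @Description("Redis host")
    @Default.String("127.0.0.1")
    String getRedisHost();
    void setRedisHost(String value);

    @Description("Redis port")
    @Default.Integer(6379)
    int getRedisPort();
    void setRedisPort(int value);
}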

Sample Input File

xxxxxxxxxxxxxxxx|bruce|wayne|31051989|444444444444
yyyyyyyyyyyyyyyy|selina|thomas|01051989|222222222222
aaaaaaaaaaaaaaaa|clark|kent|31051990|666666666666
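
The code also references a RedisIndex enum that is not shown. A minimal sketch consistent with the field order of the sample lines above (this mapping is inferred, not confirmed by the original post):

// Hypothetical reconstruction -- field positions inferred from the sample line:
// GUID|firstname|lastname|dob|postalcode
public enum RedisIndex {
    GUID(0), FIRSTNAME(1), LASTNAME(2), DOB(3), POSTAL_CODE(4);

    private final int value;

    RedisIndex(int value) {
        this.value = value;
    }

    public int getValue() {
        return value;
    }
}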

Command to execute the pipeline

mvn compile exec:java \
  -Dexec.mainClass=com.viveknaskar.DataFlowPipelineForMemStore \
  -Dexec.args="--project=my-project-id \
  --jobName=dataflow-job \
  --inputFile=gs://my-input-bucket/*.txt \
  --redisHost=127.0.0.1 \
  --stagingLocation=gs://pipeline-bucket/stage/ \
  --dataflowJobFile=gs://pipeline-bucket/templates/dataflow-template \
  --runner=DataflowRunner"

I have tried the code below from a Stack Overflow answer, but it doesn't work for me (it uses the old pre-Beam Dataflow SDK 1.x API, such as DirectPipelineRunner, which no longer exists in the Beam 2.x SDK).

PipelineOptions options = ...;
DirectPipelineRunner runner = DirectPipelineRunner.fromOptions(options);
Pipeline p = Pipeline.create(options);
PCollection<Long> countPC =
    p.apply(TextIO.Read.from("gs://..."))
     .apply(Count.<String>globally());
DirectPipelineRunner.EvaluationResults results = runner.run(p);
long count = results.getPCollection(countPC).get(0);

I have gone through the Apache Beam documentation as well but didn’t find anything helpful. Any help on this will be really appreciated.

2 Answers


  1. Chosen as BEST ANSWER

    I resolved this issue by applying Count.globally() to the PCollection<String> produced when the pipeline reads the file.

    I have added the below code:

    PCollection<String> lines = p.apply("Reading Lines...", TextIO.read().from(options.getInputFile()));

    lines.apply(Count.globally()).apply("Count the total records", ParDo.of(new RecordCount()));
    

    where I have created a new class, RecordCount.java, which extends DoFn<Long, Void> and just logs the count. (When the job runs on Dataflow, the log output appears in the worker logs in Cloud Logging, not in the console that launched the job.)

    RecordCount.java

    import org.apache.beam.sdk.transforms.DoFn;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class RecordCount extends DoFn<Long, Void> {

        private static final Logger LOGGER = LoggerFactory.getLogger(RecordCount.class);

        @ProcessElement
        public void processElement(@Element Long count) {
            // SLF4J needs a {} placeholder; without it, the count argument is never printed
            LOGGER.info("The total number of records in the input file is: {}", count);
        }
    }
    

  2. The proper way to do this is to write the count to a storage system using a Beam connector (or a Beam ParDo). The pipeline result is not directly available to the main program, since the Beam runner may parallelize the computation and execution may not happen on the same machine.

    For example (pseudocode):

        p.apply(TextIO.Read.from("gs://..."))
         .apply(Count.<String>globally())
         .apply(ParDo(MyLongToStringParDo()))
         .apply(TextIO.Write.to("gs://..."));
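
    A runnable Beam 2.x version of that sketch (the paths stay as placeholders; MapElements stands in for the hypothetical MyLongToStringParDo):

        p.apply(TextIO.read().from("gs://..."))
         .apply(Count.<String>globally())
         // Convert the single Long count to a String so TextIO can write it
         .apply(MapElements.into(TypeDescriptors.strings())
                 .via((Long count) -> String.valueOf(count)))
         .apply(TextIO.write().to("gs://..."));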
    

    If you need to handle the output directly in the main program, you can read it from GCS using a client library after the Beam pipeline finishes (make sure to call p.run().waitUntilFinish() in this case). Alternatively, you can move the computation that needs the count into a Beam PTransform and make that part of your pipeline. A sketch of the read-back approach follows.
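
    This sketch assumes the count was written to a single object whose name you know (the bucket and object names are placeholders; TextIO's default shard naming is assumed):

        import com.google.cloud.storage.BlobId;
        import com.google.cloud.storage.Storage;
        import com.google.cloud.storage.StorageOptions;
        import java.nio.charset.StandardCharsets;

        // Block until the pipeline finishes before reading its output.
        p.run().waitUntilFinish();

        // Read the count back with the GCS client library.
        Storage storage = StorageOptions.getDefaultInstance().getService();
        byte[] content = storage.readAllBytes(
                BlobId.of("my-output-bucket", "counts/count-00000-of-00001"));
        long count = Long.parseLong(new String(content, StandardCharsets.UTF_8).trim());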
