I am trying to count the number of rows in an input file, and I am using the Cloud Dataflow runner to create the template. In the code below, I am reading the file from a GCS bucket, processing it, and then storing the output in a Redis instance.
But I am unable to count the number of lines in the input file.
Main Class
public static void main(String[] args) {
    /**
     * Construct a StorageToRedisOptions object using PipelineOptionsFactory.fromArgs
     * to read options from the command line.
     */
StorageToRedisOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(StorageToRedisOptions.class);
Pipeline p = Pipeline.create(options);
p.apply("Reading Lines...", TextIO.read().from(options.getInputFile()))
.apply("Transforming data...",
            ParDo.of(new DoFn<String, String[]>() {
                @ProcessElement
                public void transformData(@Element String line, OutputReceiver<String[]> out) {
                    // split() takes a regex, so the pipe delimiter must be escaped
                    String[] fields = line.split("\\|");
                    out.output(fields);
                }
            }))
.apply("Processing data...",
ParDo.of(new DoFn<String[], KV<String, String>>() {
@ProcessElement
                public void processData(@Element String[] fields, OutputReceiver<KV<String, String>> out) {
if (fields[RedisIndex.GUID.getValue()] != null) {
out.output(KV.of("firstname:"
.concat(fields[RedisIndex.FIRSTNAME.getValue()]), fields[RedisIndex.GUID.getValue()]));
out.output(KV.of("lastname:"
.concat(fields[RedisIndex.LASTNAME.getValue()]), fields[RedisIndex.GUID.getValue()]));
out.output(KV.of("dob:"
.concat(fields[RedisIndex.DOB.getValue()]), fields[RedisIndex.GUID.getValue()]));
out.output(KV.of("postalcode:"
.concat(fields[RedisIndex.POSTAL_CODE.getValue()]), fields[RedisIndex.GUID.getValue()]));
}
}
}))
.apply("Writing field indexes into redis",
RedisIO.write().withMethod(RedisIO.Write.Method.SADD)
.withEndpoint(options.getRedisHost(), options.getRedisPort()));
p.run();
}
Sample Input File
xxxxxxxxxxxxxxxx|bruce|wayne|31051989|444444444444
yyyyyyyyyyyyyyyy|selina|thomas|01051989|222222222222
aaaaaaaaaaaaaaaa|clark|kent|31051990|666666666666
Command to execute the pipeline
mvn compile exec:java
-Dexec.mainClass=com.viveknaskar.DataFlowPipelineForMemStore
-Dexec.args="--project=my-project-id
--jobName=dataflow-job
--inputFile=gs://my-input-bucket/*.txt
--redisHost=127.0.0.1
--stagingLocation=gs://pipeline-bucket/stage/
--dataflowJobFile=gs://pipeline-bucket/templates/dataflow-template
--runner=DataflowRunner"
I have tried to use the below code from a Stack Overflow solution, but it doesn't work for me.
PipelineOptions options = ...;
DirectPipelineRunner runner = DirectPipelineRunner.fromOptions(options);
Pipeline p = Pipeline.create(options);
PCollection<Long> countPC =
p.apply(TextIO.Read.from("gs://..."))
.apply(Count.<String>globally());
DirectPipelineRunner.EvaluationResults results = runner.run(p);
long count = results.getPCollection(countPC).get(0);
I have gone through the Apache Beam documentation as well but didn’t find anything helpful. Any help on this will be really appreciated.
2 Answers
I resolved this issue by applying Count.globally() to the PCollection<String> after the pipeline reads the file, and then passing the resulting count to a new class (RecordCount.java) that extends DoFn<Long, Void> and simply logs the count. I have added the code below.
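A minimal sketch of the counting step (assuming the same read step and options as in the question; Count comes from org.apache.beam.sdk.transforms.Count):

PCollection<String> lines = p.apply("Reading Lines...", TextIO.read().from(options.getInputFile()));

// Count.globally() reduces the PCollection to a single Long, which RecordCount logs
lines.apply(Count.globally())
     .apply("Count the total records", ParDo.of(new RecordCount()));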
RecordCount.java
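A sketch of the class; the use of SLF4J for logging is my assumption:

import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RecordCount extends DoFn<Long, Void> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RecordCount.class);

    // Receives the single global count produced by Count.globally() and logs it
    @ProcessElement
    public void processElement(@Element Long count) {
        LOGGER.info("The total number of records in the input file is: " + count);
    }
}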
The proper way to do this is to write the count to a storage system using a Beam connector (or a Beam ParDo). The pipeline result is not directly available to the main program, since the Beam runner may parallelize the computation and execution may not happen on the same machine.
For example (pseudocode):
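Something along these lines (a sketch; the output path is illustrative, and ToString.elements() converts the Long count into a String so that TextIO can write it):

p.apply(TextIO.read().from("gs://my-input-bucket/*.txt"))
 .apply(Count.globally())
 .apply(ToString.elements())                       // PCollection<Long> -> PCollection<String>
 .apply(TextIO.write().to("gs://pipeline-bucket/record-count")
        .withoutSharding());                       // single output file with a predictable name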
If you need to handle the output directly in the main program, you can read it from GCS using a client library after the Beam program ends (make sure to specify
p.run().waitUntilFinish()
in this case). Alternatively, you can move the computation that needs the count into a Beam PTransform
and make it part of your pipeline.
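For the first option, a minimal sketch using the google-cloud-storage client library (the bucket and object names are assumptions matching the write sketch above):

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

// Run this after p.run().waitUntilFinish() has returned, so the output file exists
Storage storage = StorageOptions.getDefaultInstance().getService();
Blob blob = storage.get("pipeline-bucket", "record-count");
String count = new String(blob.getContent()).trim();
System.out.println("Record count: " + count);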