
I’m trying to get the average number of words and characters in each RDD I pull from the Twitter API using Spark in Java 8. However, I’m having trouble achieving this with streams. My code is as follows:

//Create the stream.
JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(jssc);
//Outputs the text of tweets to a JavaDStream.
JavaDStream<String> statuses = twitterStream.map(Status::getText);
//Get the average number of words & characters in each RDD pulled during streaming.
statuses.foreachRDD(rdd -> {
            long c = rdd.count();
            long wc = rdd.map(s -> s.split(" ").length).reduce(Integer::sum);
            long cc = rdd.map(s -> s.split("").length).reduce(Integer::sum);
            long avgWc = wc / c;
            long avgCc = cc / c;
            System.out.println(wc / c);
            System.out.println(cc / c);
        return avgWc, avgCc;});

The error I’m getting says that foreachRDD expects a void return type, but I’m returning long values.

How can I get around this? Is there another way I need to approach this?
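For reference, foreachRDD takes a VoidFunction, whose single abstract method returns void. A simplified sketch of that shape (not the exact Spark source, which also declares throws Exception on call) is:

```java
// Simplified sketch of the functional interface that foreachRDD accepts.
// The real one is org.apache.spark.api.java.function.VoidFunction.
@FunctionalInterface
interface VoidFunction<T> {
    void call(T t); // returns void, so the lambda body cannot return a value
}

public class ForeachShape {
    public static void main(String[] args) {
        // A lambda for this interface can only have side effects:
        VoidFunction<String> printLength = s -> System.out.println(s.length());
        printLength.call("hello"); // prints 5
    }
}
```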

2 Answers


  1. You can't return data from foreachRDD because its return type is void.
    Instead, you can create a list outside the foreachRDD call and add the computed values to it, as shown below. (This works because the function passed to foreachRDD runs on the driver, so the driver-side list is updated after each batch.)

    List<Data> listData = new ArrayList<>();
    statuses.foreachRDD(rdd -> {
                long c = rdd.count();
                // Skip empty batches to avoid dividing by zero.
                if (c == 0) return;
                long wc = rdd.map(s -> s.split(" ").length).reduce(Integer::sum);
                long cc = rdd.map(s -> s.split("").length).reduce(Integer::sum);
                long avgWc = wc / c;
                long avgCc = cc / c;
                System.out.println(avgWc);
                System.out.println(avgCc);
                Data data = new Data();
                data.setAvgCc(avgCc);
                data.setAvgWc(avgWc);
                listData.add(data);
            });
    

    Data is a class with two fields, avgWc and avgCc, as shown below:

    public class Data {
        long avgWc;
        long avgCc;
        public long getAvgWc() {
            return avgWc;
        }
        public void setAvgWc(long avgWc) {
            this.avgWc = avgWc;
        }
        public long getAvgCc() {
            return avgCc;
        }
        public void setAvgCc(long avgCc) {
            this.avgCc = avgCc;
        }
        public Data(long avgWc, long avgCc) {
            super();
            this.avgWc = avgWc;
            this.avgCc = avgCc;
        }
        public Data() {
            super();
        }
    }
    

    Please let me know if this helps, or if you need more clarification.
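    As a quick sanity check of the arithmetic outside Spark, the same per-batch averages can be reproduced with plain Java 8 streams over a List<String>. The class name and sample tweet texts below are made up for illustration:

    ```java
    import java.util.Arrays;
    import java.util.List;

    public class AvgCheck {
        // Same arithmetic as the foreachRDD body, but on a plain list.
        static long[] averages(List<String> tweets) {
            long c = tweets.size();
            // Total words: split on single spaces, matching the RDD version.
            long wc = tweets.stream().mapToLong(s -> s.split(" ").length).sum();
            // Total characters: in Java 8, split("") yields one element per character.
            long cc = tweets.stream().mapToLong(s -> s.split("").length).sum();
            return new long[] { wc / c, cc / c };
        }

        public static void main(String[] args) {
            List<String> tweets = Arrays.asList("hello spark world", "java streams");
            long[] avg = averages(tweets);
            System.out.println(avg[0]); // average words per tweet: 2
            System.out.println(avg[1]); // average characters per tweet: 14
        }
    }
    ```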

  2. A possible solution would be to use JavaDStream.transform. This function lets you stay within the Spark Streaming API:

    JavaDStream<String> statuses = ...
    JavaDStream<Tuple2<Long, Long>> avgs = statuses.transform(rdd -> {
                long c = rdd.count();
                if (c == 0) {
                    // Return an empty RDD for empty batches to avoid dividing by zero.
                    return jssc.sparkContext().<Tuple2<Long, Long>>emptyRDD();
                }
                long wc = rdd.map(s -> s.split(" ").length).reduce(Integer::sum);
                long cc = rdd.map(s -> s.split("").length).reduce(Integer::sum);
                long avgWc = wc / c;
                long avgCc = cc / c;
                return jssc.sparkContext().parallelize(Collections.singletonList(Tuple2.apply(avgWc, avgCc)));
            }
    );
    avgs.print();
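    Note that in both answers wc / c and cc / c are long divisions, so any fractional part of the average is truncated. If fractional averages are wanted, cast to double before dividing. The totals below are made-up example values, not output from the streaming code:

    ```java
    public class DoubleAvg {
        public static void main(String[] args) {
            // Totals from a hypothetical batch: 5 words, 29 characters, 2 tweets.
            long wc = 5, cc = 29, c = 2;
            System.out.println(wc / c);          // 2   (long division truncates)
            System.out.println((double) wc / c); // 2.5
            System.out.println((double) cc / c); // 14.5
        }
    }
    ```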
    