
I’m calculating a simple mean over some records, using different window sizes. With 1-hour and 1-week windows there are no problems, and the results are computed correctly.

var keyed = src
        .filter(event -> event.getSensor_id() < 10000)
        .keyBy(Event::getSensor_id);

var hourResult = keyed
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .aggregate(new AvgQ1(Config.HOUR))
        .setParallelism(5);

var weekResult = keyed
        .window(TumblingEventTimeWindows.of(Time.days(7)))
        .aggregate(new AvgQ1(Config.WEEK))
        .setParallelism(5);

With a window of 1 month (31 days), however, the window is split in two, and Flink outputs two results: one for records from 05-01 to 05-14 and one for records from 05-15 to 05-31.

SingleOutputStreamOperator<OutputQuery> monthResult = keyed
    .window(TumblingEventTimeWindows.of(Time.days(31)))
    .aggregate(new AvgQ1(Config.MONTH))
    .setParallelism(5);

With a 30-day window, the result is instead split into (05-01; 05-27) and (05-28; 05-31).

SingleOutputStreamOperator<OutputQuery> monthResult = keyed
    .window(TumblingEventTimeWindows.of(Time.days(30)))
    .aggregate(new AvgQ1(Config.MONTH))
    .setParallelism(5);

This is the AggregateFunction.

public class AvgQ1 implements AggregateFunction<Event, AccumulatorQ1, OutputQuery> {
    String windowType;
    public AvgQ1(String windowType) {
        this.windowType = windowType;
    }

    public AccumulatorQ1 createAccumulator() {
        return new AccumulatorQ1();
    }

    @Override
    public AccumulatorQ1 add(Event values, AccumulatorQ1 acc) {
        acc.sum += values.getTemperature();
        acc.sensor_id = values.getSensor_id();
        acc.last_timestamp = values.getTimestamp();
        acc.count++;
        return acc;
    }

    @Override
    public AccumulatorQ1 merge(AccumulatorQ1 a, AccumulatorQ1 b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    @Override
    public OutQ1 getResult(AccumulatorQ1 acc) {
        double mean = acc.sum / (double) acc.count;
        OutQ1 result = new OutQ1(windowType);
        result.setSensor_id(acc.sensor_id);
        result.setTemperature(mean);
        result.setOccurrences(acc.count);
        if (windowType.equals(Config.HOUR)) {
            result.setTimestamp(Tools.getHourSlot(acc.last_timestamp));
        }
        if (windowType.equals(Config.WEEK)) {
            result.setTimestamp(Tools.getWeekSlot(acc.last_timestamp));
        }
        if (windowType.equals(Config.MONTH)) {
            result.setTimestamp(Tools.getMonthSlot(acc.last_timestamp));
        }
        return result;
    }
}

I suspect the problem is somehow related to memory usage, as if the accumulator or the window couldn’t hold that much data. I monitored the JVM heap usage in the Web UI, but it never crosses the limit. I also tried changing the state backend from hashmap to RocksDB.

I’m running Flink on Docker and reading the DataStream from a Kafka topic. Any ideas?

3 Answers


  1. Chosen as BEST ANSWER

    Solution

    Thanks to @david-anderson's suggestion, I solved the problem. With a 31-day window on a dataset containing values for May 2022, the Flink window ran from 14-04-2022 to 15-05-2022 instead of from 01-05-2022 to 31-05-2022. This is because (as @david-anderson said):

    Flink's time-based window divides the time since the Unix epoch (01-01-1970).

    Month Window

    So my solution is to delay the start of the window by applying an offset of 17 days (from 14-04 to 01-05) in the window() operator.

    var monthResult = keyed
            .window(TumblingEventTimeWindows.of(Time.days(31),Time.days(17)))
            .aggregate(new AvgQ1(Config.MONTH))
            .name("Monthly Window Mean AggregateFunction");
    

    The offset depends on the specific month, so I used a ProcessWindowFunction just to print the start and end of each window, and then worked out the difference in days between the window used by Flink and the desired window.

    public class DebugProcess extends ProcessWindowFunction<Event, Tuple2<Long,Integer>, Long, TimeWindow> {
        @Override
        public void process(Long sensor_id, ProcessWindowFunction<Event, Tuple2<Long,Integer>, Long, TimeWindow>.Context context, Iterable<Event> iterable, Collector<Tuple2<Long,Integer>> collector) throws Exception {
            Timestamp end = new Timestamp(context.window().getEnd());
            Timestamp start = new Timestamp(context.window().getStart());
            System.out.printf("WINDOW: (%s,%s)\n", start, end);
                                               ...
    }
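
    The offset can also be derived arithmetically instead of being read off the printed boundaries. Below is a minimal sketch, assuming the desired window start is a UTC calendar date and the window size is a whole number of days. The class and method names (WindowOffset, offsetDays) are hypothetical and not part of Flink:

    ```java
    import java.time.LocalDate;

    public class WindowOffset {
        // Hypothetical helper (not a Flink API): number of days by which an
        // epoch-aligned tumbling window of sizeDays must be shifted so that
        // one window starts exactly on desiredStart (interpreted as a UTC date).
        static long offsetDays(LocalDate desiredStart, long sizeDays) {
            // toEpochDay() counts days since 1970-01-01; the remainder is how far
            // the desired start lies past the previous epoch-aligned boundary.
            return Math.floorMod(desiredStart.toEpochDay(), sizeDays);
        }

        public static void main(String[] args) {
            LocalDate may1 = LocalDate.of(2022, 5, 1);
            System.out.println(offsetDays(may1, 31)); // 17
            System.out.println(offsetDays(may1, 7));  // 3
        }
    }
    ```

    The returned value is what would be passed as the second argument of TumblingEventTimeWindows.of, for example Time.days(17) for May 2022 with a 31-day window.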
    

    Week Window

    Looking at my results more carefully, I noticed that the one-week query had the same problem: the window ran from 28-04-2022 to 05-05-2022 instead of from 01-05-2022 to 08-05-2022. So I applied an offset there as well, this time of 3 days:

    var weekResult = keyed
            .window(TumblingEventTimeWindows.of(Time.days(7),Time.days(3)))
            .aggregate(new AvgQ1(Config.WEEK))
            .name("Weekly Window Mean AggregateFunction");
    

  2. Try setting the buffer timeout to -1 via .setBufferTimeout(-1) on the StreamExecutionEnvironment.

  3. The issue has to do with how Flink’s time-based window assigner works. It divides the time since the Unix epoch (01-01-1970) into equal-sized chunks (windows) of the specified duration, and then assigns incoming events into those chunks (windows).

    Thus with windows that are 30 days long, these windows cover these ranges:

    01-01-1970 thru 30-01-1970
    31-01-1970 thru 01-03-1970
    ...
    28-04-2022 thru 27-05-2022
    28-05-2022 thru 26-06-2022
    ...
    

    This works okay for windows that are one second, one minute, one hour, one day, or even one week long, but for month-long windows it’s not very convenient.
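
    The alignment described above can be reproduced with a few lines of plain Java. This is only a sketch of the arithmetic, assuming UTC timestamps and no offset. The class and method names (EpochAlignedWindows, windowStart, windowStartDate) are invented for illustration; Flink's own version of this calculation lives in TimeWindow.getWindowStartWithOffset:

    ```java
    import java.time.Duration;
    import java.time.Instant;
    import java.time.LocalDate;
    import java.time.ZoneOffset;

    public class EpochAlignedWindows {
        // Start (epoch millis) of the tumbling window that contains timestampMillis,
        // when windows of sizeMillis are laid end to end starting at epoch + offsetMillis.
        static long windowStart(long timestampMillis, long offsetMillis, long sizeMillis) {
            return timestampMillis - Math.floorMod(timestampMillis - offsetMillis, sizeMillis);
        }

        // Hypothetical convenience wrapper: UTC date on which the window containing `day` starts.
        static LocalDate windowStartDate(LocalDate day, int sizeDays) {
            long ts = day.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
            long start = windowStart(ts, 0L, Duration.ofDays(sizeDays).toMillis());
            return Instant.ofEpochMilli(start).atZone(ZoneOffset.UTC).toLocalDate();
        }

        public static void main(String[] args) {
            LocalDate may1 = LocalDate.of(2022, 5, 1);
            for (int sizeDays : new int[] {7, 30, 31}) {
                LocalDate start = windowStartDate(may1, sizeDays);
                // The end date is exclusive, as with Flink's TimeWindow.getEnd().
                System.out.printf("%2d-day window containing %s: [%s, %s)%n",
                        sizeDays, may1, start, start.plusDays(sizeDays));
            }
        }
    }
    ```

    For 1 May 2022 this gives window starts of 28-04-2022 (7 days), 28-04-2022 (30 days) and 14-04-2022 (31 days), which matches the splits reported in the question.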
