I’m calculating a simple mean over some records, using different window sizes. With 1-hour and 1-week windows there are no problems, and the results are computed correctly.
var keyed = src
        .filter(event -> event.getSensor_id() < 10000)
        .keyBy(Event::getSensor_id);

var hourResult = keyed
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .aggregate(new AvgQ1(Config.HOUR))
        .setParallelism(5);

var weekResult = keyed
        .window(TumblingEventTimeWindows.of(Time.days(7)))
        .aggregate(new AvgQ1(Config.WEEK))
        .setParallelism(5);
Instead, using a window of 1 month (31 days), the window is split in half and Flink outputs two results: one for records from 05-01 to 05-14 and one for records from 05-15 to 05-31.
SingleOutputStreamOperator<OutputQuery> monthResult = keyed
        .window(TumblingEventTimeWindows.of(Time.days(31)))
        .aggregate(new AvgQ1(Config.MONTH))
        .setParallelism(5);
Using a window of size 30 days, the result is instead split into (05-01; 05-27) and (05-28; 05-31).
SingleOutputStreamOperator<OutputQuery> monthResult = keyed
        .window(TumblingEventTimeWindows.of(Time.days(30)))
        .aggregate(new AvgQ1(Config.MONTH))
        .setParallelism(5);
This is the AggregateFunction:
public class AvgQ1 implements AggregateFunction<Event, AccumulatorQ1, OutputQuery> {

    String windowType;

    public AvgQ1(String windowType) {
        this.windowType = windowType;
    }

    @Override
    public AccumulatorQ1 createAccumulator() {
        return new AccumulatorQ1();
    }

    @Override
    public AccumulatorQ1 add(Event values, AccumulatorQ1 acc) {
        // Accumulate the running sum and count, remembering the key and the latest timestamp seen.
        acc.sum += values.getTemperature();
        acc.sensor_id = values.getSensor_id();
        acc.last_timestamp = values.getTimestamp();
        acc.count++;
        return acc;
    }

    @Override
    public AccumulatorQ1 merge(AccumulatorQ1 a, AccumulatorQ1 b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    @Override
    public OutQ1 getResult(AccumulatorQ1 acc) {
        double mean = acc.sum / (double) acc.count;
        OutQ1 result = new OutQ1(windowType);
        result.setSensor_id(acc.sensor_id);
        result.setTemperature(mean);
        result.setOccurrences(acc.count);
        if (windowType.equals(Config.HOUR)) {
            result.setTimestamp(Tools.getHourSlot(acc.last_timestamp));
        }
        if (windowType.equals(Config.WEEK)) {
            result.setTimestamp(Tools.getWeekSlot(acc.last_timestamp));
        }
        if (windowType.equals(Config.MONTH)) {
            result.setTimestamp(Tools.getMonthSlot(acc.last_timestamp));
        }
        return result;
    }
}
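The accumulator class itself is not shown in the question; judging from the fields AvgQ1 reads and writes, it is presumably something along these lines (a hypothetical reconstruction, not the original class):

// Hypothetical AccumulatorQ1, reconstructed from the fields AvgQ1 uses above.
public class AccumulatorQ1 {
    public double sum;            // running sum of temperatures in the window
    public long count;            // number of events aggregated so far
    public long sensor_id;        // key of the events in this window
    public long last_timestamp;   // timestamp of the most recent event seen
}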
I think the problem is somehow related to memory usage, as if the accumulator or the window couldn’t hold that much data. So I tried monitoring JVM heap usage in the WebUI, but it never crosses the limit, and I also changed the state backend from hashmap to RocksDB.
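For reference, switching the state backend to RocksDB programmatically looks roughly like this (a sketch assuming Flink 1.13+ with the flink-statebackend-rocksdb dependency on the classpath):

// Keyed and window state is then kept in embedded RocksDB instead of on the JVM heap.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());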
I’m running Flink on Docker, reading the DataStream from a Kafka topic. Any ideas?
3 Answers
Solution
Thanks to @david-anderson’s suggestion, I solved the problem. Using a 31-day window on a dataset with values for May 2022, the Flink window was running from 14-04-2022 to 15-05-2022 instead of 01-05-2022 to 31-05-2022. This is because (as @david-anderson explains below) Flink aligns tumbling windows to the Unix epoch rather than to the calendar.

Month Window
So my solution is based on delaying the start of the window by applying an offset of 17 days (from 14-04 to 01-05) in the window() operator. The offset depends on the specific month, so I used a ProcessWindowFunction just to print the start and end of each window, and then looked at the difference in days between the window used by Flink and the desired window.
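A minimal sketch of that change, using the two-argument TumblingEventTimeWindows.of(size, offset) overload and the names from the question:

// Shift the 31-day window so it opens on 01-05-2022 instead of the epoch-aligned 14-04-2022.
var monthResult = keyed
        .window(TumblingEventTimeWindows.of(Time.days(31), Time.days(17)))
        .aggregate(new AvgQ1(Config.MONTH))
        .setParallelism(5);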
Week Window

Analyzing my results more carefully, I noticed that there were problems also for the one-week query; in particular, the window was running from 28-04-2022 to 05-05-2022 instead of 01-05-2022 to 08-05-2022. So I applied an offset of 3 days there as well:
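Again a sketch, with the same of(size, offset) overload:

// Shift the 7-day window so it opens on Sunday 01-05-2022 instead of Thursday 28-04-2022.
var weekResult = keyed
        .window(TumblingEventTimeWindows.of(Time.days(7), Time.days(3)))
        .aggregate(new AvgQ1(Config.WEEK))
        .setParallelism(5);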
Try setting the buffer timeout to -1 via setBufferTimeout(-1) on the StreamExecutionEnvironment.
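A minimal sketch of that suggestion (it tunes how often network buffers are flushed, assuming the DataStream API environment):

// -1 disables the buffer timeout: buffers are flushed only when full, favoring throughput over latency.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setBufferTimeout(-1);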
The issue has to do with how Flink’s time-based window assigner works. It divides the time since the Unix epoch (01-01-1970) into equal-sized chunks (windows) of the specified duration, and then assigns incoming events into those chunks (windows).
Thus with windows that are 30 days long, the windows around May 2022 cover 2022-04-28 through 2022-05-27 and 2022-05-28 through 2022-06-26, which is exactly the split observed in the question.
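To make the arithmetic concrete, here is a small illustration of that alignment using plain java.time (not Flink itself); the dates it prints match the split described in the question:

// 30-day tumbling windows are aligned to the Unix epoch, not to month boundaries.
long size = Duration.ofDays(30).toMillis();
long ts = Instant.parse("2022-05-10T00:00:00Z").toEpochMilli();    // an event inside May 2022
long start = ts - (ts % size);                                     // epoch-aligned window start
System.out.println(Instant.ofEpochMilli(start));                   // 2022-04-28T00:00:00Z
System.out.println(Instant.ofEpochMilli(start + size));            // 2022-05-28T00:00:00Z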
This works okay for windows that are one second, one minute, one hour, one day, or even one week long, but for month-long windows it’s not very convenient.