8. Flink 窗口计算API

8. Flink 窗口计算API

8.1 窗口(window)概念

窗口,就是把无界的数据流,依据一定规则划分成一段一段的有界数据流;
既然划分成有界数据段,通常都是为了”聚合”;

Flink图29

Keyed Window重要特性:任何一个窗口,都绑定在自己所属的key上,不同key的数据肯定不会划分到相同窗口中去!

8.2 窗口细分类型

滚动窗口
Flink图30

滚动窗口,是滑动窗口的特例,可以用滑动窗口来表达。(窗口长度 = 滑动步长)

滑动窗口
Flink图31
会话窗口
Flink图32

会话窗口的触发机制,当Flink检查到前后两条数据中间间隔的时长超过了你指定的间隔时长,那么后面的数据就会进入一个新的窗口,而前面的窗口就闭合触发计算了。

8.3 窗口计算API模板

下面展示了 Flink 窗口在 keyed streams 和 non-keyed streams 上使用的基本结构。 我们可以看到,这两者唯一的区别仅在于:keyed streams 要调用 keyBy(…)后再调用 window(…) , 而 non-keyed streams 只用直接调用 windowAll(…)。留意这个区别,它能帮我们更好地理解后面的内容。

Keyed Windows
1
2
3
4
5
6
7
8
9
stream
.keyBy(...) <- 仅 keyed 窗口需要
.window(...) <- 必填项:"assigner"
[.trigger(...)] <- 可选项:"trigger" (省略则使用默认 trigger)
[.evictor(...)] <- 可选项:"evictor" (省略则不使用 evictor)
[.allowedLateness(...)] <- 可选项:"lateness" (省略则为 0)
[.sideOutputLateData(...)] <- 可选项:"output tag" (省略则不对迟到数据使用 side output)
.reduce/aggregate/apply() <- 必填项:"function"
[.getSideOutput(...)] <- 可选项:"output tag"
Non-Keyed Windows
1
2
3
4
5
6
7
8
stream
.windowAll(...) <- 必填项:"assigner"
[.trigger(...)] <- 可选项:"trigger" (else default trigger)
[.evictor(...)] <- 可选项:"evictor" (else no evictor)
[.allowedLateness(...)] <- 可选项:"lateness" (else zero)
[.sideOutputLateData(...)] <- 可选项:"output tag" (else no side output for late data)
.reduce/aggregate/apply() <- 必填项:"function"
[.getSideOutput(...)] <- 可选项:"output tag"

小乱序,利用watermark的容错时间来解决
中等乱序,利用窗口允许迟到机制[.allowedLateness(…)] (即使时间到了,触发计算了,但计算完后依然保留桶,目的是等待可能迟到的数据,重新计算一次)
大乱序,利用窗口中的迟到数据侧流输出机制[.sideOutputLateData(…)] (如果有迟到严重的数据,那用第二种方案也不会触发计算,因为桶不可能无限保留,那么这种迟到严重的数据,Flink也不会至于丢掉,Flink会把它输出到侧流中去,用户可以get侧流中的迟到严重的数据,至于拿到之后怎么办,就由用户自己搞了)

8.4 窗口指派API

详见
https://www.bilibili.com/video/BV1K44y1g7wA?p=72&vd_source=26668f0ed33317a00612f0d4c98799c9
P73

8.5 窗口聚合算子

8.5.1 两类窗口聚合算子的区别

窗口聚合算子,整体上分两类
  • 增量聚合算子,如min、max、minBy、maxBy、sum、reduce、aggregate
  • 全量聚合算子,如apply、process
两类聚合算子的底层区别
  • 增量聚合:一次取一条数据,用聚合函数对中间累加器更新;窗口触发时,取累加器输出结果;
  • 全量聚合:数据”攒”在状态容器中,窗口触发时,把整个窗口的数据交给聚合函数;
Flink图33

8.5.2 各种聚合算子代码示例

详见
https://www.bilibili.com/video/BV1K44y1g7wA?p=72&vd_source=26668f0ed33317a00612f0d4c98799c9
P69 - P72

8.6 数据延迟处理

延迟处理的方案
  • 小乱序,用watermark容错,(减慢时间的推进,让本已经迟到的数据被认为没有迟到)
  • 中等乱序,用allowedLateness(允许一定限度内的迟到,并对迟到数据重新触发窗口计算)
  • 大乱序,用sideOutputLateData (将超出allowedLateness的迟到数据输出到一个侧流中)
代码示例
1
2
3
4
5
6
7
8
SingleOutputStreamOperator<String> stream = watermarkedBeanStream
.windowAll(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(5)))
.allowedLateness(Time.milliseconds(2000)) //允许迟到2秒,默认是0
.sideOutputLateData(lateTag) //超过迟到最大允许时间的数据,收集到侧流
.apply();

//获取侧流,做一些自己的补救
stream.getSideOutput(lateTag).print();

注意正确理解延迟时间!
如: allowedLateness(2s) 表示:
如果watermark(此刻的事件时间)推进到了A窗口结束点后2s,还来A窗口的数据,就算迟到,不会再触发A窗口的计算,而是输出到侧流。

8.7 窗口触发机制

窗口计算的触发,是由Trigger类来决定;
Flink中为各类内置的WindowAssigner都设计了对应的默认Trigger;

Flink图34

一般情况下不需要自己去重写Trigger,除非有特别的需求;
Evictor是窗口触发前,或者触发后,对窗口中的数据移除的机制;