7. Flink 事件时间语义中的watermark

7. Flink 事件时间语义中的watermark

7.1 事件时间推进的尴尬

由于在事件时间语义的世界观中,时间是由流入系统的数据(事件)而推进的;
而事件时间,并不能像处理时间那样,由宇宙客观规律以恒定速度,不可停滞地推进;
从而,在事件时间语义的世界观中,时间的推进,并不是一件显而易见的事情;

  • 场景1:
Flink图15

数据时间存在乱序的可能性,但时光不能倒流啊!

  • 场景2:
Flink图16

下游分区接受上游多个分区的数据,数据时间错落有致,那以谁为准?

7.2 watermark推进时间

  • 所谓watermark,就是在事件时间语义世界观中,用于单调递增向前推进时间的一种机制;
  • 它的核心机制是在数据流中周期性的插入一种时间戳单调递增的特殊数据元素(watermark),来不可逆转的在整个数据流中进行时间的推进;
  • watermark中的时间戳到了哪里,算子的时间就推进到了哪里;

Watermark是从某一个算子实例(源头)开始,根据数据中的事件时间,来周期性的产生,并插入到数据流中,持续不断的往下游传递,以推进整个计算链条上各个算子实例的时间!

1
2
//watermark的生成周期(默认值即为200ms)
env.getConfig().setAutoWatermarkInterval(200);
Watermark源码片段
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public final class Watermark implements Serializable {

private final long timestamp;

public Watermark(long timestamp) {
this.timestamp = timestamp;
}

public long getTimestamp() {
return this.timestamp;
}

public String getFormattedTimestamp() {
return ((SimpleDateFormat)TS_FORMATTER.get()).format(new Date(this.timestamp));
}
}
  • watermark,本质上也是Flink中各算子间流转的一种数据,只不过与用户的数据不同,它是Flink内部自动产生并插入到数据流的;
  • 它本身所携带的信息很简单,就是一个时间戳!
watermark产生源头示意图
初始状态
Flink图17
收到一条新数据后
Flink图18

简单说,就是在watermark产生的源头算子实例中,实例程序会用一个定时器,去周期性的检查截止到此刻所收到过的数据的事件时间最大值,如果超过了之前的最大值,则将这个最大值更新为最新的watermark,并向下游传递;

watermark往下游推进的示意图
初始状态
Flink图19
新的上游watermark即将到达
Flink图20
上游新的watermark最终产生的效果
Flink图21

一个下游算子实例,如果消费着多个上游算子实例:
则选择”Min(上游各实例的最新watermark)”作为自己当前的watermark;
并将自己最新的watermark往下游传播;

watermark从源头往下游推进完整示意图
Flink图22 Flink图23 Flink图24
watermark推进事件时间与窗口计算的结合示例
Flink图25

Flink产生watermark时,下游算子的watermark是以上游watermark的最小值为准,这样就会产生延迟。延迟最大的程度是最慢的那个分区。
如果最慢的watermark数据丢失了,下游的watermark永远是之前的最小值,不再更新,那么这样的话,未来数据是不是永远不会输出?
为了解决这个问题,Flink对窗口算子有一个机制:watermark-idle-timeout,如果有一条分支,迟迟没有数据过来,超过指定时间,那么Flink会主动推进时间。

Flink图26

从这里可以看出,如果某一个上游实例watermark一直停滞,则会导致下游实例的watermark也一直停滞,从而延迟窗口计算的触发,并造成大量数据的积压。
对此,Flink提供了一个机制:设置时间watermark的idle超时(在源头设置):如果某个分区超过idle时长没有收到数据,则会自主往前推进时间。

7.3 内置watermark生成策略

在Flink1.12以后,watermark默认是按固定频率周期性的产生;
此前有两种生成策略:

  • AssignerWithPeriodicWatermarks 周期性生成watermark
  • AssignerWithPunctuatedWatermarks[已过期] 按指定标记性事件生成watermark

在Flink1.12后,watermark默认是按固定频率周期性的产生,这个产生wartermark的源点不一定是source,也可以选择计算逻辑中任何一个环节产生。

新版本API内置的Watermark策略
  • 紧跟最大事件时间的watermark生成策略(完全不容忍乱序,只要迟到,就丢弃)
1
WatermarkStrategy.forMonotonousTimestamps();
  • 允许乱序的watermark生成策略(最大事件数据 - 容错时间)
1
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));//根据实际数据的最大乱序情况来设置
  • 自定义watermark生成策略
1
WatermarkStrategy.forGenerator(new WatermarkGenerator(){...});
Monotonous策略的时间推进示意图
Flink图27
BoundedOutOfOrderness策略的时间推进示意图 (watermark = eventTime - 5)
Flink图28

如果让watermark直接紧跟收到的数据的最大事件时间,那么会有大量迟到数据被认为是过期的,所以应该让时间的推进比收到的数据的最大事件时间慢一点。比如这里的例子,它允许的最大乱序是5。但即使这样,还是不能彻底解决乱序问题,它只是起一个缓冲作用。

7.4 设置watermark策略的模板代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class _19_Watermark_API_Demo {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//1,e01,168673487846,pg01
DataStreamSource<String> stream = env.socketTextStream("localhost", 9999);
//给上面的source算子,添加watermark生成策略

//策略1:WatermarkStrategy.noWatermarks() 不生成watermark,禁用了事件时间的推进机制
//策略2:WatermarkStrategy.forMonotonousTimestamps() 紧跟最大事件时间
//策略3:WatermarkStrategy.forBoundedOutOfOrderness() 允许乱序的watermark生成策略
//策略4:WatermarkStrategy.forGenerator() 自定义watermark生成算法

/**
* 构造一个watermark的生成策略对象(算法策略,及事件时间的抽取方法)
*/
WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
.<String>forBoundedOutOfOrderness(Duration.ofMillis(0)) //允许乱序的算法策略
.withTimestampAssigner(new SerializableTimestampAssigner<String>() { // 时间抽取方法

@Override
public long extractTimestamp(String s, long l) {

return Long.parseLong(s.split(",")[2]);
}
})
.withIdleness(Duration.ofMillis(2000)); // 防止上游某些分区的水位线不推进导致下游的窗口一直不触发(这个分区很久都没数据)

//然后将构造好的watermark策略对象,分配给流(source算子)
stream.assignTimestampsAndWatermarks(watermarkStrategy);
}
}

7.5 内置watermark的源码分析

WatermarkGenerator接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* The {@code WatermarkGenerator} generates watermarks either based on events or periodically (in a
* fixed interval).
*
* <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the {@code
* AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
*/
@Public
public interface WatermarkGenerator<T> {

/**
* Called for every event, allows the watermark generator to examine and remember the event
* timestamps, or to emit a watermark based on the event itself.
*/
void onEvent(T event, long eventTimestamp, WatermarkOutput output);

/**
* Called periodically, and might emit a new watermark, or not.
*
* <p>The interval in which this method is called and Watermarks are generated depends on {@link
* ExecutionConfig#getAutoWatermarkInterval()}.
*/
void onPeriodicEmit(WatermarkOutput output);
}
BoundedOutOfOrdernessWatermarks乱序水印实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* A WatermarkGenerator for situations where records are out of order, but you can place an upper
* bound on how far the events are out of order. An out-of-order bound B means that once an event
* with timestamp T was encountered, no events older than {@code T - B} will follow any more.
*
* <p>The watermarks are generated periodically. The delay introduced by this watermark strategy is
* the periodic interval length, plus the out-of-orderness bound.
*/
@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

/** The maximum timestamp encountered so far. */
private long maxTimestamp;

/** The maximum out-of-orderness that this watermark generator assumes. */
private final long outOfOrdernessMillis;

/**
* Creates a new watermark generator with the given out-of-orderness bound.
*
* @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
*/
public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();

// start so that our lowest watermark would be Long.MIN_VALUE.
this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
}

// ------------------------------------------------------------------------

@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
//视当前的事件时间戳,更新(或不更新)maxTimestamp
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}

@Override
public void onPeriodicEmit(WatermarkOutput output) {
//以当前eventTimeStamp - 乱序延迟数 - 1,作为生成的watermark值
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}
}

详见
https://www.bilibili.com/video/BV1K44y1g7wA?p=64&spm_id_from=pageDriver&vd_source=26668f0ed33317a00612f0d4c98799c9
P64