6. Flink 时间语义

6. Flink 时间语义

6.1 三种时间概念

在实时流式计算中,”时间”是一个能影响计算结果的非常重要因素!
试想场景:每隔1分钟计算一次最近10分钟的活跃用户量;

假设此刻的时间是13:10,要计算的活跃用户量时间段为:[13:00, 13:10)

有一条行为日志中记录的用户行为时间是12:59,但到达Flink的计算程序已经是13:02,那么。这个用户是否要纳入本次计算的结果中呢?看如何定义:

  1. 如果时段[13:00, 13:10)定义的是用户行为的发生时间(数据中的业务时间),则不应纳入;
  2. 如果时段[13:00, 13:10)定义的是计算时的时间,则应该纳入;
Flink图14

Flink内部为了直观地统一计算时所用的时间标准,特制定了两种时间语义:

  • processing time 处理时间
  • event time 事件时间
  • Ingestion time 注入时间

时间语义注意影响”窗口计算”;

6.1 两种时间语义

时间语义,是Flink中用于时间推进和时间判断的机制;
时间推进和时间判断,以什么为标准,就产出了两种不同的时间语义;

  • 以processing time为依据,则叫做处理时间语义

  • 以event time为依据,则叫做事件时间语义

时间语义的设计意义
1
2
3
4
5
process(EventLog eventLog){
Long eventTime = eventLog.getTimestamp();
Long processTime = System.currentMillimise();
//用户完全可以自己根据需求中的时间定义来进行相应的计算
}

Flink为什么还要搞出一个”事件时间语义”:时间按数据中的业务时间戳来推进! 
主要是,实时流计算中,有大量跟时间相关的统计需求,比如:时间窗口计算,定时器等,而这些需求,如果都让用户像上面的代码那样自己去进行判断、处理,那么它觉得自己的API不够强大!
所以,Flink想在API的层面,将两类时间定义的计算需求进行API层面的统一,它才搞出这么一种”事件时间语义”,有了这种语义,那么,处理时间和事件时间都可以看成”时间” 
用户在不同时间定义下,要进行一个定时动作,就不需要再像上面的代码那样去进行各种判断,而是一个统一的动作,到XXX时间,给我做个什么事!

1
2
3
process(EventLog eventlog, TimeStamp timestamp){
//不管需求是需要用到哪种时间来计算,用户代码只需要看到一个timestamp了
}

代码中的timestamp到底是事件时间,还是处理时间,取决于环境中设置的”时间语义”

处理时间(processing time)语义

Processing Time是指数据被Operator处理时所在机器的系统时间。
处理时间遵循客观世界中时间的特性:单调递增,恒定速度,永不停滞,永不回退;

事件时间(event time)语义

Event Time是指在数据本身的业务时间(如用户行为日志中的用户行为时间戳);
Event Time语义中,时间的推进完全由流入Flink系统的数据来驱动;
++数据中的业务时间推进到哪,Flink就认为自己的时间推进到了哪++
它可能停滞,也可能速度不恒定,但也一定是单调递增不可回退!

6.3 时间语义的设置

1.12以前,Flink默认以processing time作为默认的时间语义。
1.12及以后,Flink默认以event time作为默认的时间语义。
在需要指定时间语义的相关操作(如时间窗口)时,可以通过显式的API来使用特定的时间语义;

新版API中指定时间语义
1
2
3
4
keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1));
keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1));
keyedStream.window(TumblingSlidingEventTimeWindows.of(Time.seconds(5));
keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1));
新版API中禁用时间语义

如果需要禁用event time机制,则可以通过设置watermark生成频率间隔来实现:

1
2
//如果设置为0,则禁用了watermark的生成,从而失去了event time语义
ExecutionConfig.setAutoWatermarkInterval(long);

提示:如果需要使用已经过期的ingestion time,可以通过设置恰当的watermark来实现。