11. Flink checkpoint机制

11. Flink checkpoint机制

11.1 分布式checkpoint的难题

由于Flink是一个分布式的系统,数据在流经系统中各个算子时,是有先后顺序的,换个角度来说就是:整个系统对一条数据的处理过程,并不是一个原子性的过程;

这样一来,对系统中各算子的状态进行持久化(快照),就成了一件棘手的事情;

来看如下数据处理场景:

  • 算子1:从kafka中读取数据,并在状态中记录消费位移
  • 算子2:对流入的整数进行累加,并输出累加结果
  • 算子3:对流入是整数进行累加,并输出累加结果
先注意观察正常情况下,整个系统的各算子状态变化及最终输出结果
Flink图37
出于容错考虑,需要在某个时机对整个系统各个算子的状态数据进行快照持久化,如下
Flink图38
系统重启后加载快照数据,恢复各算子崩溃前的状态,但是会发现,处理结果相对正常时完全错误
Flink图39

从最终结果来看,整个计算

  • 丢失了数据2的结果
  • 数据3则因为内部状态的紊乱而产生了错误的结果

发生错误的根本原因是:简单粗暴的快照所得到的快照状态在各个算子间不统一(不是经过了相同数据的影响)

11.2 Checkpoint的核心要点

checkpoint是Flink内部对状态数据的快照机制; 
Flink的checkpoint机制是源于Chandy-Lamport算法: 
底层逻辑:通过插入序号单调递增的barrier,把无界数据流划分成逻辑上的数据批(段),并提供段落标记(barrier)来为这段数据的处理,加持“事务(transaction)”特性;

  • 每一段数据流要么被完整成功处理
  • 要么回滚一切不完整的影响(状态变化)
Flink图40

11.3 Checkpoint的整体流程

  1. JobMaster即CheckpointCoordinator会定期向每个source task发送命令start checkpoint(trigger checkpoint);
  2. 当source task收到trigger checkpoint指令后,产生barrier并通过广播的方式发送到下游,source task及其它所有task,收到barrier-n,会执行本地checkpoint-n,当checkpoint-n完成后,向JobMaster发送ack;
  3. 当流图的所有节点都完成checkpoint n,JobMaster会收到所有节点的ack,那么就表示完成checkpoint-n;
Flink图41

说明:checkpoint机制的调用流程实质是2PC。JobMaster是协调者,所有operator task是执行者。start checkpoint是pre-commit的开始信号,而每个operator task的checkpoint是pre-commit过程,ack是执行者operator task反馈给协调者JobMaster,最后callback是commit。

barrier是JobManager定期指派各个source算子插入, 
每个算子做完了checkpoint-n,就会向JobManager应答, 
JobManager收到所有算子对checkpoint-n的应答后,才认为这次checkpoint是成功的(完整完成的 global)
然后,JobManager确认checkpoint-n全局完成后,会向各个算子通报一次checkpoint-n完成。

1. barrier会在数据流源头被注入并行数据流中

barrier-n所在的位置就是恢复时数据重新处理的起始位置。例如在kafka中,这个位置就是最后一个记录在分区内的消费位移(offset),作业恢复时,会根据这个位置从这个偏移量向kafka请求数据,这个偏移量就是State中保存的内容之一。

2. barrier接着向下游传递

当一个非数据源算子从所有的输入流中收到barrier-n时,该算子就会对自己的State保存快照,并向自己的下游广播发送barrier-n;

3. 一旦Sink算子接收到barrier,有两种情况:

(1) 如果是引擎内严格一次处理保证,当Sink算子已经收到了所有上游的barrier-n时,Sink算子对自己的State进行快照,然后通知检查点协调器(CheckpointCoordinator)。当所有的算子都向检查点协调器汇报成功之后,检查点协调器向所有的算子确认本次快照完成。

(2) 如果是端到端严格一次处理保证,当Sink算子已经收到了所有上游的barrier-n时,Sink算子对自己的State进行快照,并预提交事务(两阶段提交的第一阶段),再通知检查点协调器(CheckpointCoordinator),检查点协调器向所有的算子确认本次快照完成,Sink算子提交事务(两阶段提交的第二阶段),本次事务完成。

snapshotting operator算子
Flink图42
  • For each parallelism stream data source,the offset/position in the stream when the snapshot was started
  • For each operator,a pointer to the state that was stored as part of the snapshot

11.4 对齐和非对齐checkpoint

对齐的checkpoint
Flink图43
  • 算子收到数字流barrier,字母流对应barrier尚未到达
  • 算子收到数字流barrier,会继续从数字流中接收数据,但这些流只能被搁置,记录不能被处理,而是放入缓存中,等待字母流barrier到达,在字母流到达前,1、2、3数据已经被缓存
  • 字母流到达,算子开始对齐State进行异步快照,并将barrier向下游广播,并不等待快照完毕
  • 算子做异步快照,首先处理缓存中积压数据,然后再从输入通道中获取数据

注意背压!

非对齐的checkpoint
Flink图44

barrier不对齐:就是指当还有其它流的barrier还没到达时,为了不影响性能,也不用理会,直接处理barrier之后的数据,等到所有流的barrier都到达后,就可以对该Operator做checkpoint了;
如果不对齐,那么在chk-100快照之前,已经处理了一些chk-100对应的offset之后的数据,当程序从chk-100恢复任务时,chk-100对应的offset之后的数据还会被处理一次,所以就出现了重复消费。

非对齐的checkpoint不能保证精确一次

11.5 Checkpoint相关参数和API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 要让Flink对托管状态数据进行容错,还需要开启快照机制
// 开启状态数据的checkpoint机制(快照的周期,快照的模式)
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
// 开启快照后,就需要指定快照数据的持久化存储位置
/*env.getCheckpointConfig().setCheckpointStorage(new URI("hdfs://114.116.0.219:9000/checkpoint"));*/
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointStorage(new URI("file:///d:/checkpoint/"));
checkpointConfig.setAlignedCheckpointTimeout(Duration.ofMinutes(1000)); //设置ck对齐的超时时长
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //设置ck算法模式
checkpointConfig.setCheckpointInterval(2000); //设置ck的间隔时长
//checkpointConfig.setCheckpointIdOfIgnoredInFlightData(5); //用于非对齐算法模式下,在job恢复时让各个算子自动抛弃掉ck-5的飞行数据
// job cancel调时
checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
checkpointConfig.setForceUnalignedCheckpoints(false); //是否强制使用非对齐的ck模式
checkpointConfig.setMaxConcurrentCheckpoints(5); //允许在系统中同时存在的飞行中(未完成的)的ck数
checkpointConfig.setMinPauseBetweenCheckpoints(2000); //设置两次ck之间的最小时间间隔,用于防止checkpoint过多的占用算子的处理时间
checkpointConfig.setCheckpointTimeout(3000); //一个算子在一次checkpoint执行过程中的总耗时时长超时上限
checkpointConfig.setTolerableCheckpointFailureNumber(10); //允许的checkpoint失败最大次数