10. Kafka 关键原理加强

10. Kafka 关键原理加强

10.1 日志分段切分条件

日志分段文件切分包含以下4个条件,满足其一即可:

(1) 当前日志分段文件的大小超过了broker端参数log.segment.bytes配置的值。
log.segment.bytes参数的默认值为1073741824,即1GB

(2) 当前日志分段中消息的最小时间戳与当前系统的时间戳的差值大于log.roll.ms或++log.roll.hours++参数配置的值。如果同时配置了log.roll.ms和log.roll.hours参数,那么log.roll.ms的优先级高。默认情况下,只配置了log.roll.hours参数,其值为168,即7天。
(3) 偏移量索引文件或时间戳索引文件的大小达到broker端参数log.index.size.max.bytes配置的值。log.index.size.max.bytes的默认值为10485760,即10MB
(4) 追加的消息的偏移量与当前日志分段的起始偏移量之间的差值大于Integer.MAX_VALUE,即要追加的消息的偏移量不能转变为相对偏移量(offset - baseOffset > Integer.MAX_VALUE)。

10.2 controller控制器

Controller简单来说,就是kafka集群的状态管理者 
在kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),
它负责维护整个集群中所有分区和副本的状态及分区leader的选举。当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。当使用kafka-topic.sh脚本为某个topic增加分区数量时,同样还是由控制器负责分区的重新分配。

Kafka中的控制器选举的工作依赖于Zookeeper,成功竞选为控制器的broker会在Zookeeper中创建/controller这个临时(EPHEMERAL)节点,此临时节点的内容参考如下: 
{“version”:1,”broker”:0,”timestamp”:”1529210278988”}
其中version在目前版本中固定为1,brokerid表示成为控制器的broker的id编号,timestamp表示竞选成为控制器时的时间戳。 
在任意时刻,集群中有且仅有一个控制器。每个broker启动的时候会去尝试读取zookeeper上的/controller节点的brokerid的值,如果读取到brokerid的值不为-1,则表示已经有其它broker节点成功竞选为控制器,所有当前就会放弃竞选;如果zookeeper不存在/controller这个节点,或者这个节点中的数据异常,那么就会尝试去创建/controller这个节点,当前broker去创建节点的时候,也有可能其它broker同时去尝试创建这个节点,只有创建成功的那个broker才会成为控制器,而创建失败的broker则表示竞选失败。每个broker都会在内存中保存当前控制器的brokerid值,这个值可以标识为activeControllerId。

controller竞选机制,简单说,先来先上

具备控制器身份的broker需要比其它普通的broker多一些职责,具体细节如下:

  • 监听partition相关变化
    对Zookeeper中的/admin/reassign_partitions节点注册PartitionReassignmentListener,用来处理分区重分配的动作。
    对Zookeeper中的/isr_change_notification节点注册IsrChangeNotificationListener,用来处理ISR集合变更的动作。
    对Zookeeper中的/admin/preferred-replica-election节点添加PreferredReplicaElectionListener,用来处理优先副本选举。

  • 监听topic增减变化
    对Zookeeper中的/brokers/topics节点添加TopicChangeListener,用来处理topic增减的变化。
    对Zookeeper中的/admin/delete_topics节点添加TopicDeletionListener,用来处理删除topic的动作。

  • 监听broker相关的变化
    对Zookeeper中的/brokers/ids/节点添加BrokerChangeListener,用来处理broker增减的变化。

  • 更新集群的元数据信息
    从Zookeeper中读取获取当前所有与topic、partition以及broker有关的信息并进行相应的管理,对各topic所对应的Zookeeper中的/brokers/topics/[topic]节点添加PartitionModificationsListener,用来监听topic中的分区分配变化,并将最新信息同步给其它所有broker。

  • 启动并管理分区状态机和副本状态机

  • 如果参数auto.leader.rebalance.enable设置为true,则还会开启一个名为“auto-leader-rebalance-task”的定时任务来维护分区的leader副本的均衡

10.3 分区的负载分布

客户端请求创建一个topic时,每一个分期副本在broker上的分配,是由集群controller来决定;
其分布策略源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
replicationFactor: Int,
brokerList: Seq[Int],
fixedStartIndex: Int,
startPartitionId: Int): Map[Int, Seq[Int]] = {
val ret = mutable.Map[Int, Seq[Int]]()
val brokerArray = brokerList.toArray
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
var currentPartitionId = math.max(0, startPartitionId)
var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
for (_ <- 0 until nPartitions) {
if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
nextReplicaShift += 1
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1)
replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
ret.put(currentPartitionId, replicaBuffer)
currentPartitionId += 1
}
ret
}
1
2
3
4
private def replicaIndex(firstReplicaIndex : Int, secondReplicaShift : Int, replicaIndex : Int, nBrokers : Int) : Int = {
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
(firstReplicaIndex + shift) % nBrokers
}
  • 副本因子不能大于Broker的个数;
  • partition_0的第1个副本(leader副本)放置位置是随机从brokerList选择的;
  • 其它分区的第1个副本放置位置相对与partition_0分区依次往后移(也就是说如果我们有5个Broker,5个分区,假设partition0分区放在broker4上,那么partition1将会放在broker5上;partition2将会放在broker1上,partition3在broker2,依此类推)
  • 各分区剩余的副本相对于分区前一个副本偏移随机数nextReplicaShift

10.4 分区Leader的选举机制

分区leader副本的选举由控制器controller负责具体实施。
当创建分区(创建主题或增加分区都有创建分区的动作)或Leader下线(此时分区需要选举一个新的leader上线来对外提供服务)的时候都需要执行leader的选举动作。

选举策略:按照AR集合中副本的顺序查找第一个存活的副本,并且这个副本在ISR集合中;
一个分区的AR集合在partition分配的时候就被指定,并且只要不发生重分配的情况,集合内部副本的顺序是保持不变的,而分区的ISR集合中副本的顺序可能会改变;

10.5 分区数与吞吐量

Kafka本身提供用于生产者性能测试的 kafka-producer-perf-test.sh 和用于消费者性能测试的 kafka-consumer-perf-test.sh,主要参数如下:

  • topic用来指定生产者发送消息的目标主题;
  • num-records用来指定发送消息的总条数
  • record-size用来设置每条消息的字节数;
  • producer-props参数用来指定生产者的配置,可同时指定多组配置,各组配置之间以空格分隔与producer-props参数对应的还有一个producer-config参数,它用来指定生产者的配置文件
  • throughput用来进行限流控制,当设定的值小于0时不限流,当设定的值大于0时,当发送的吞吐量大于该值时就会被阻塞一段时间。

10.6 生产者原理解析

Kafka图9

一个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程。
在主线程中由kafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存
到消息累加器(RecordAccumulator,也称为消息收集器)中。

Sender线程负责从RecordAccumulator获取消息并将其发送到Kafka中;

RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator缓存的大小可以通过生产者客户端参数++buffer.memory++配置,默认值为33554432B,即32M。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer.send()方法调用要么被阻塞,要么抛出异常,这个取决于参数++max.block.ms++的配置,此参数的默认值为60000,即60秒。

主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,++RecordAccumulator内部为每个分区都维护了一个双端队列++,即Deque<ProducerBatch>。
消息写入缓存时,追加到双端队列的尾部;

Sender读取消息时,从双端队列的头部读取。注意:ProducerBatch是指一个消息批次;
与此同时,会将较小的ProducerBatch凑成一个较大ProducerBatch,也可以减少网络请求的次数以
提升整体的吞吐量。

ProducerBatch大小和++batch.size++参数也有着密切的关系。当一条消息(ProducerRecord)流入RecordAccumulator时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端
队列的尾部获取一个ProducerBatch(如果没有则新建),查看ProducerBatch中是否还可以写入这个ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的Producer Batch。在新建ProducerBatch时评估这条消息的大小是否超过batch.size参数大小,如果不超过,那么就以batch.size参数的大小来创建ProducerBatch。

如果生产者客户端需要向很多分区发送消息,则可以将buffer.memory参数适当调大以增加整体的吞吐量。

Sender从RecordAccumulator获取缓存的消息之后,会进一步将<分区,Deque<Producer Batch>>的形式转变成<Node,List<ProducerBatch>>的形式,其中Node表示Kafka集群broker节点。对于网络连接来说,生产者客户端是与具体broker节点建立的连接,也就是向具体的broker节点发送消息,而并不关心消息属于哪一个分区;而对于KafkaProducer的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。
在转换成<Node,List<ProducerBatch>>的形式之后,Sender会进一步封装成<Node,Request>的形式,这样就可以将Request请求发往各个Node了,这里的Request是Kafka各种协议请求;

请求在从sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为Map<Nodeld,Deque<request>>,++它的主要作用是缓存了已经发出去但还没有收到服务端响应的请求(Nodeld是一个String类型,表示节点的id编号)++。与此同时,InFlightRequests还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与Node之间的连接)最多缓存的请求数。这个配置参数为++max.in.flight.request.per.connection++,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。++通过比较Deque<Request>的size与这个参数的大小来判断对应的Node中是否己经堆积了很多未响应的消息,如果真是如此,那么说明这个Node节点负载较大或网络连接有问题++,再继续发送请求会增大请求超时的可能。

10.7 重要的生产者参数

10.7.1 acks

acks是控制生产者在发送出消息后如何得到确认;
生产者根据得到的确认信息,来判断消息发送是否成功;

acks含义
0Producer往集群发送数据不需要等到集群的确认信息,不确保消息发送成功。安全性最低但是效率最高。
1Producer往集群发送数据只要leader成功写入消息就可以发送下一条,只确保Leader接收成功。
-1或allProducer往集群发送数据需要所有的ISR Follower都完成从Leader的同步才会发送下一条,确保Leader发送成功和所有的副本都成功接收。安全性最高,但是效率最低。

生产者将acks设置为all,是否就一定不会丢数据呢?
否!如果在某个时刻ISR列表只剩leader自己了,那么就算acks=all,收到这条数据还是只有一个节点;

可以配合另外一个参数缓解此情况:最小同步副本数 >= 2
Broker端参数:min.insync.replicas(默认1)

10.7.2 max.request.size

这个参数用来限制生产者客户端能发送的消息的最大值,默认值为1048576B,即1MB
一般情况下,这个默认值就可以满足大多数的应用场景了。
这个参数还涉及一些其它参数的联动,比如broker端(topic级别参数)的++message.max.bytes++参数
(默认1000012),如果配置错误可能会引起一些不必要的异常:比如将broker端的
message.max.bytes参数配置为10,而max.request.size参数配置为20,那么当发送一条大小为15B
的消息时,生产者客户端就会报出异常;

10.7.3 retries 和 retry.backoff.ms

retries参数用来配置生产者重试的次数,默认值为0,即在发生异常的时候不进行任何重试动作
消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常,比如网络抖动、leader副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置retries大于0的值,以此通过内部重试来恢复而不是一味地将异常抛给生产者的应用程序。如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。

重试还和另一个参数retry.backoff.ms有关,这个参数的默认值为100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。

Kafka可以保证同一个分区中的消息是有序的。如果生产者按照一定的顺序发送消息,那么这些消息也会顺序地写入分区,进而消费者也可以按照同样的顺序消费它们。对于某些应用来说,顺序性非常重要,比如MySQL binlog的传输,如果出现错误就会造成非常严重的后果;

如果将retries参数配置为非零值,并且max.in.flight.requests.per.connection参数配置为大于1的值,那可能会出现错序的现象:如果批次1消息写入失败,而批次2消息写入成功,那么生产者会重试发送批次1的消息,此时如果批次1的消息写入成功,那么这两个批次的消息就出现了错序。

一般而言,在需要保证消息顺序的场合建议把参数max.in.flight.requests.per.connection配置为1,而不是把retries配置为0,不过这样也会影响整体的吞吐。

10.7.4 compression.type

这个参数用来指定消息的压缩方式,默认值为“none”,即默认情况下,消息不会被压缩。
该参数还可以配置为”gzip”,”snappy”和”lz4”。
对消息进行压缩可以极大地减少网络传输、降低网络I/O,从而提高整体的性能。
消息压缩是一种以时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩;

10.7.5 batch.size

每个Batch要存放batch.size大小的数据后,才可以发送出去。比如说++batch.size默认值是16KB++,那么里面凑够16KB的数据才会发送。
理论上来说,提升batch.size的大小,可以允许更多的数据缓冲在recordAccumulator里面,那么一次Request发送出去的数据量就更多了,这样吞吐量可能会有所提升。
但是batch.size也不能过大,要是数据老是缓冲在Batch里迟迟不发送出去,那么发送消息的延迟就
会很高。
一般可以尝试把这个参数调节大些,利用生产环境发消息负载测试一下。

10.7.6 linger.ms

这个参数用来指定生产者发送ProducerBatch之前等待更多消息(ProducerRecord)加入
ProducerBatch时间,默认值为0。
生产者客户端会在ProducerBatch填满或等待时间超过linger.ms值时发送出去。
增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。

10.7.7 enable.idempotence

是否开启幂等性功能,详见后续原理加强;
幂等性,就是一个操作重复做,也不会影响最终的结果!
int a = 1;
a++; //非幂等操作
val map = new HashMap()
map.put(a,l); //幂等操作

在kafka中,同一条消息,生产者如果多次重试发送,在服务器中的结果如果还是只有一条,这就是
具备幂等性;否则,就不具备幂等性!

10.7.8 partitioner.class

用来指定分区器,默认:org.apache.kafka.internals.DefaultPartitioner
默认分区器的分区规则:

  • 如果数据中有key,则按key的murmur hash值 % topie分区总数得到目标分区
  • 如果数据只有value,则在各个分区间轮询

自定义partitioner需要实现org.apache.kafka.clients.producer.Partitioner接口

10.8 消费者组再均衡分区分配策略

消费者组的意义何在?为了提高数据处理的并行度!

当以下事件发生时,kafka将会进行一次分区分配:

  • 同一个consumer group内新增或减少了消费者
  • 订阅的主题新增分区
  • 订阅的主题增加
    将分区的消费权从一个消费者移到另一个消费者称为再均衡(rebalance),如何rebalance也涉及到分区分配策略。
    kafka内部存在两种的分区分配策略:range(默认)和round robin。
    (消费者组的分区分配策略/消费者组的负载均衡策略/消费者组的再均衡策略)

10.8.1 Range Strategy

先将消费者按照client.id字典排序,然后按topic逐个处理;
针对一个topic,将其partition总数/消费者数 得到商n和余数m,则每个consumer至少分到n
个分区,且前m个consumer每人多分一个分区;

举例说明1:假设有TOPIC_A有5个分区,由3个consumer(C1,C2,C3)来消费;
5/3得到商1,余2,则每个消费者至少分1个分区,前两个消费者各多1个分区
C1:2个分区,C2:2个分区,C3:1个分区
接下来,就按照“区间”进行分配:
C1:TOPIC_A-0 TOPIC_A-1
C2:TOPIC_A-2 TOPIC_A_3
C3:TOPIC_A-4

举例说明2:假设TOPIC_A有5个分区,TOPIC_B有3个分区,由2个consumer(C1,C2)来消费

  • 先分配TOPIC_A
    5/2得到商2,余1,则C1有3个分区,C2有2个分区,得到结果
    C1:TOPIC_A-0 TOPIC_A-1 TOPIC_A-2
    C2:TOPIC_A-3 TOPIC_A-4
  • 再分配TOPIC_B
    3/2得到商1,余1,则C1有2个分区,C2有1个分区,得到结果
    C1:TOPIC_B-0 TOPIC_B-1
    C2:TOPIC_B-2
  • 最终分配结果:
    C1:TOPIC_A-0 TOPIC_A-1 TOPIC_A-2 TOPIC_B-0 TOPIC_B-1
    C2:TOPIC_A-3 TOPIC_A-4 TOPIC_B-2

10.8.2 Round-Robin Strategy

将所有主题分区组成TopicAndPartition列表,并对TopicAndPartition列表按照其hashCode排序,然后,以轮询的方式分配给各消费者

以上述“例2”来举例:

  • 先对TopicAndPartition的hashCode排序,假如排序结果如下:
    TOPIC_A-0 TOPIC_B-0 TOPIC_A-1 TOPIC_A-2 TOPIC_B-1 TOPIC_A-3 TOPIC_A-4 TOPIC_B-2
  • 然后按轮询方式分配
    C1:TOPIC_A-0 TOPIC_A-1 TOPIC_B-1 TOPIC_A-4
    C2:TOPIC_B-0 TOPIC_A-2 TOPIC_A-3 TOPIC_B-2

我们可以通过++partition.assignment.strategy++参数选择range或roundrobin。
partition.assignment.strategy参数默认的值是range。
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor

这个参数属于“消费者”参数!

10.8.3 Sticky Strategy

对应的类叫做:org.apache.kafka.clients.consumer.StickyAssignor
sticky策略的特点:

  • 要去达成最大化的均衡
  • 尽可能保留各消费者原来分配的分区

再均衡的过程中,还是会让各消费者先取消自身的分区,然后再重新分配(只不过是分配过程中会尽
量让原来属于谁的分区依然分配给谁)

10.8.3 CooperativeSticky Strategy

对应的类叫做:org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
sticky策略的特点:

  • 逻辑与sticky策略一致
  • 支持cooperative再均衡机制(再均衡的过程中,不会让所有消费者取消掉所有分区然后再进行重分配)

10.9 消费者组再均衡流程

消费组在消费数据的时候,有两个角色进行组内的各事务的协调:
角色1:Group Coordinator(组协调器)位于服务端(就是某个broker)
角色2:Group Leader(组长)位于消费端(就是消费组中的某个消费者)

10.9.1 GroupCoordinator介绍

每个消费组在服务端对应一个GroupCoordinator进行管理,GroupCoordinator是Kafka服务端中用
于管理消费组的组件。
消费者客户端中由ConsumerCoordinator组件负责与GroupCoordinator进行交互;
ConsumerCoordinator和GroupCoordinator最重要的职责就是负责执行消费者rebalance操作,包括前面提及的分区分配工作也是在rebalance期间完成的。

会触发rebalance的事件可能是如下任意一种:

  • 有新的消费者加入消费组。
  • 有消费者宕机下线,消费者并不一定需要真正下线,例如遇到长时间的GC、网络延迟导致消
    费者长时间未向GroupCoordinator发送心跳等情况时,GroupCoordinator会认为消费者己下线。
  • 有消费者主动退出消费组(发送LeaveGroupRequest请求):比如客户端调用了unsubscrible()
    方法取消对某些主题的订阅。
  • 消费组所对应的GroupCoorinator节点发生了变更。
  • 消费组内所订阅的任一主题或者主题的分区数量发生变化。

10.9.2 再均衡流程

阶段1:定位Group Coordinator

coordinator在我们组记偏移量的__consumer_offsets分区的leader所在broker上查找Group Coordinator的方式:

  • 先根据消费组groupid的hashcode值计算它应该所在__consumer_offsets中的分区编号;
    Utils.abc(gropId.hashCode)%groupMetadataTopicPartitionCount
    groupMetadataTopicPartitionCount为__consumer_offsets的分区总数,这个可以通过broker端参数
    offset.topic.num.partitions来配置,默认值是50;
  • 找到对应的分区号后,再寻找此分区leader副本所在broker节点,则此节点即为自己的Grouping Coordinator;

阶段2:加入组join the group

此阶段的重要操作之1:选举消费组的leader
private val members = new mutable.HashMap[String, MemberMetadata]
var leaderid = members.keys.head

消费组leader的选举,策略就是:随机!

此阶段的重要操作之2:选择分区分配策略
最终选举的分配策略基本上可以看作被各个消费者支持的最多的策略,具体的选举过程如下:
(1) 收集各个消费者支持的所有分配策略,组成候选集candidates
(2) 每个消费者从候选集candidates找出第一个自身支持的策略,为这个策略投上一票。
(3) 计算候选集中各个策略的选票数,选票数最多的策略即为当前消费组的分配策略。

其实,此逻辑并不需要consumer来执行,而是由Group Coordinator来执行

阶段3:组信息同步SYNC group

此阶段,主要是由消费组leader将分区分配方案,通过Group Coordinator来转发给组中各消费者

阶段4:心跳联系 HEART BEAT

进入这个阶段之后,消费组中的所有消费者就会处于正常工作状态。
各消费者在消费数据的同时,保持与Group Coordinator的心跳通信;

消费者的心跳间隔时间由参数++heartbeat.interval.ms++指定,默认值为3000,即这个参数必须比++session.timeout.ms++参数设定的值要小;一般情况下heartbeat.interval.ms的配置值不能超过session.timeout.ms配置值的l/3。这个参数可以调整得更低,以控制正常重新平衡的预期时间;

如果一个消费者发生崩溃,并停止读取消息,那么GroupCoordinator会等待一小段时间确认这个消费者死亡之后才会触发再均衡。在这一小段时间内,死掉的消费者并不会读取分区里的消息。
这个一小段时间由session.timeout.ms参数控制,该参数的配置值必须在broker端参数

10.9.3 再均衡监听器

一个消费组中,一旦有消费者的增减发生,会触发消费者组的rebalance再均衡;
如果想控制消费者在发生再均衡时执行一些特定的工作,可以通过订阅主题时注册“再均衡监听器”来实现;

场景举例:在发生再均衡时,处理消费位移
如果A消费者消费掉的一批消息还没来得及提交offset,而它所负贵的分区在rebalance中转移给了B
消费者,则有可能发生数据的重复消费处理。此情形下,可以通过再均衡监听器做一定程度的补救;

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
consumer.subscribe(Collections.singletonList("tpc_5"), new ConsumerRebalanceListener(){
//被取消旧分区后被调用
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
//store the current offset to db
}

//分配到新的分区后被调用
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
//fetch the current offset from db
I }
});

10.10 Kafka系统的CAP保证

10.10.1 分布式系统的CAP理论

CAP理论作为分布式系统的基础理论,它描述的是一个分布式系统在以下三个特性中:

  • 一致性(Consistency)
  • 可用性(Availability)
  • 分区容错性(Partition tolerance)

最多满足其中的两个特性。也就是下图所描述的。分布式系统要么满足CA,要么CP,要么AP。无法同时满足CAP。

分区容错性:指的是分布式系统中的某个节点或者网络分区出现了故障的时候,整个系统仍然能对外提供满足一致性和可用性的服务。也就是说部分故障不影响整体使用。
事实上我们在设计分布式系统是都会考虑到bug,硬件,网络等各种原因造成的故障,所以即使部分节点或者网络出现故障,我们要求整个系统还是要继续使用的
(不继续使用,相当于只有一个分区,那么也就没有后续的一致性和可用性了)

可用性:一直可以正常的做读写操作。简单而言就是客户端一直可以正常访问并得到系统的正常响应。用户角度来看就是不会出现系统操作失败或者访问超时等问题。

一致性:在分布式系统完成某些操作后任何读操作,都应该获取到该写操作写入的那个最新的值。相当于要求分布式系统中的各节点时时刻刻保持数据的一致性。

Kafka作为一个商业级消息中间件,数据可靠性和可用性是优先考虑的重点,兼顾尽可能保证数据一致性;

10.10.2 分区副本机制

kafka从0.8.0版本开始引入了分区副本:引入了数据冗余
也就是说每个分区可以人为的配置几个副本(创建主题的时候指定replication-factor,也可以在broker级别进行配置default.replication.factor);
在众多的分区副本里面有一个副本是Leader,其余的副本是follower,所有的读写操作都是经过Leader
进行的,同时follower会定期地去leader上复制数据。当Leader挂了的时候,其中一个follower会
重新成为新的Leader。通过分区副本,引入了数据冗余,同时也提供了kafka的数据可靠性。
++Kaka的分区多副本架构是Kaka可靠性保证的核心,把消息写入多个副本可以使Kaka在发生
崩溃时仍能保证消息的持久性。++

10.10.3 ISR同步副本列表

ISR概念:(同步副本)。每个分区的leader会维护一个ISR列表,ISR列表里面就是follower副本的broker编号,只有跟得上Leader的follower副本才能加入到ISR里面,这个是通过replica.lag.time.max.ms=l0000(默认值)参数配置的,只有ISR里的成员才有被选为leader的可能。

10.10.4 分区副本的数据一致性解决方案

kafka让分区多副本同步的基本手段是:follower副本定期向leader请求数据同步!
既然是定期同步,则leader和follower之间必然存在各种数据不一致的情景!

动态过程中的副本数据不一致,是很难解决的;
kafka先尝试着解决上述“消费者所见不一致”及“分区数据最终不一致”的问题;

解决方案的核心思想

  • 在动态不一致的过程中,维护一条步进式的“临时一致线”(既所谓的High Watermark);
  • 高水位线HW=ISR副本中最小LEO+1;
  • 底层逻辑就是:offset < HW的message,是各副本间一致的且安全的!

Kafka图11

如上图所示:offset < HW:3 的message,是所有副本都已经备份好的数据

解决“消费者所见不一致”(消费者只允许看到HW以下的message)

Kafka图12

解决“分区副本数据最终不一致”(follower数据按HW截断)

Kafka图13

Kafka图14

Kafka图15

10.10.5 HW方案的天生缺陷

如前所述,看似HW解决了“分区数据最终不一致”的问题,以及“消费者所见不一致”的问题,但其实,这里面存在一个巨大的隐患,导致:

  • “分区数据最终不一致”的问题依然存在
  • producer设置acks=al后,依然有可能丢失数据的问题

产生如上结果的根源是:++HW高水位线的更新,与数据同步的进度,存在迟滞!++

Kafka图16

Step 1: leader和follower副本处于初始化值,follower副本发送fetch请求,由于leader副本没有数据,因此不会进行同步操作;

Step2: 生产者发送了消息m1到分区leader副本,写入该条消息后leader更新LEO=1;

Step3: follower发送fetch请求,携带当前最新的offset=0,leader处理fetch请求时,更新remote LEO=0,对比LEO值最小为0,所以HW=0,leader副本响应消息数据及leader HW=0给follower,follower写入消息后,更新LEO值,同时对比leader HW值,取最小的作为新的HW值,此时follower HW=0,这也意味着,follower HW是不会超过leader HW值的。

Step4: follower发送第二轮fetch请求,携带当前最新的offset=l,leader处理fetch请求时,更新remote LEO=l,对比LEO值最小为l,所以HW=l,此时leader没有新的消息数据,所以直接返回leader HW=1给follower,follower对比当前最新的LEO值与leader HW值,取最小的作为新的HW值,此时follower HW=1。

从以上步骤可看出,leader中保存的remote LEO值的更新(也即HW的更新)总是需要额外一轮
fetch RPC请求才能完成,这意味着在leader切换过程中,会存在数据丢失以及数据不一致的问题!

10.10.6 HW会产生数据丢失和副本最终不一致问题

数据丢失的问题(即使produce设置acks=all,依然会发生)

Kafka图17

注意:leader中的HW值是在follower下一轮fetch RPC请求中完成更新的

如上图所示:

  • 状态起始:B为leader,A为follower;最新消息m2已同步,但B的HW比A的HW大1
  • A在此时崩溃(即follower没能通过下一轮请求来更新HW值)
  • A重启时,会自动将LEO值调整到之前的HW值,即会进行日志截断
  • B重启后,会从向A发送fetch请求,收到fetch响应后,拿到HW值,并更新本地HW值,这时B会做日志截断,因此,offsets=1的消息被永久地删除了。

副本间数据最终不一致的问题(即使produce设置acks=all,依然会发生)

Kafka图18

如上图所示:

  • 状态起始:A为leader,B为follower;最新消息m2已同步,但B的HW比A的HW大1
  • A在此时崩溃(即follower没能通过下一轮请求来更新HW值)
  • B先重启,会自动将LEO值调整到之前的W值,即会进行日志截断,并在此刻接收了新的消息m3,HW随之上升为2
  • 然后,A重启上线,会从向B发送fetch请求,收到fetch响应后,拿到HW值,并更新本
    地HW值,发现不需要截断,从而己经产生了“副本间数据最终不一致”!

只要新一届leader在老leader重启上线前,接收了新的数据,就可能发生上图中的场景,根源也在于HW的更新落后于数据同步进度

10.10.7 Leader-Epoch机制的引入

为了解决HW更新时机是异步延迟的,而HW又是决定日志是否备份成功的标志,从而造成数据丢失和数据不一致的现象,Kafka引入了leader epoch机制;
在每个副本日志目录下都创建一个leader-epoch-checkpoint文件,用于保存leader的epoch信息;

Kafka图19

它的格式为(epoch offset),epoch指的是leader版本,它是一个单调递增的一个正整数值,每次leader变更,epoch版本都会+l,offset是每一代leader写入的第一条消息的位移值,比如:
(0,0)
(1,300)
以上第2个版本是从位移300开始写入消息,意味着第一个版本写入了0-299的消息。

leader epoch具体的工作机制

  • 当副本成为leader时:
    这时,如果此时生产者有新消息发送过来,会首先更新leader epoch以及LEO,并添加到
    leader–epoch-checkpoint文件中;

  • 当副本变成follower时:
    发送LeaderEpochRequest请求给leader副本,该请求包括了follower中最新的epoch版本;
    leader返回给follower的响应中包含了一个LastOffset,如果follower last epoch=leader last epoch(纪元相同),则LastOffset=leader LEO,否则取follower last epoch中最小的leader epoch的start offset值;

举个例子:假设follower last epoch=1,此时leader有(1,20)(2,80)(3,120),则LastOffset=80;follower拿到LastOffset之后,会对比当前LEO值是否大于LastOffset,如果当前LEO大于LastOffset,则从LastOffset截断日志;
follower开始发送fetch请求给leader保持消息同步。

解决数据丢失:

Kafka图20

如上图所示:
A重启之后,发送LeaderEpochRequest请求给B,由于B还没追加消息,此时epoch=request epoch
=0,因此返LastOffset=leader LEO=2给A
A拿到LastOffset之后,发现等于当前LEO值,故不用进行日志截断。就在这时B宕机了,A成为leader,在B启动回来后,会重复A的动作,同样不需要进行日志截断,数据没有丢失。

解决数据最终不一致问题:

Kafka图21

如上图所示:

  • A和B同时宕机后,B先重启回来成为分区leader,这时候生产者发送了一条消息过来,leader
    epoch更新到1
  • 此时A启动回来后,发送LeaderEpochRequest(follower epoch=0)给B,B判断follower epoch
    不等于最新的epoch,于是找到大于follower epoch最小的epoch=l,即LastOffset=epoch start offset=1
  • A拿到LastOffset后,判断小于当前LEO值,于是从LastOffset位置进行日志截断,接着开
    始发送fetch请求给B开始同步消息,避免了消息不一致/离散的问题。

10.10.8 LEO/HW/LSO等相关术语速查

LEO:(last end offset) 就是该副本中消息的最大偏移量的值+1;
HW:(high watermark) 各副本中LEO的最小值。这个值规定了消费者仅能消费HW之前的数据;
LW:(low watermark) 一个副本的log中,最小的消息偏移量;     
LS0:(last stable offset) 最后一个稳定的offset;
对未完成的事务而言,LSO的值等于事务
中第一条消息的位置(firstUnstableOffset),对已完成的事务而言,它的值同HW相同;