kafka-副本机制

kafka虽然可以对topic进行分片,但是对于partition来说,它还是单点的,当partition所在的broker宕机了,那么这部分消息就无法被消费。所以kafka为了提高partition的可靠性,提供了副本replica的概念,通过副本机制来实现冗余备份。

每个分区可以有多个副本,并且在副本集合中会存在一个learder副本,所有的读写请求都是由leader来处理。其余的副本称为follower副本,follower会主动从leader同步消息日志。一般情况下,同一个分区的多个副本会被均匀的分配到集群的不同broker上,当leader所在的broker出现故障,可以重新选举新的leader副本继续向外提供服务。

通过下面的命令创建一个带副本的topic

1
sh kafka-topic.sh --create --zookeeper 192.168.2.112:9092 --replication-factor 3 --partition 3 --topic test_topic

如何知道各个分区中对应的leader是谁呢?

在zookeeper服务器上,通过如下命令去获取对应分区的信息,比如下面这个是获取Topic第1个分区的状态信息。

1
get /brokers/topics/topicName/partitions/1/state

{“controller_epoch”:12,”leader”:0,”version”:1,”leader_epoch”:0,”isr”:[0,1]}

或通过这个命令

1
sh kafka-topics.sh --zookeeper 192.168.13.106:2181 --describe --topic test_partition

leader表示当前分区的leader是那个broker-id。

需要注意的是,kafka集群中的一个broker中最多只能持有一个分区的一个副本,leader副本所在的broker节点叫分区的leader节点,follower副本所在的broker节点叫分区的follower节点

副本同步中的重要概念

Kafka提供了数据复制算法保证,如果leader副本所在的broker节点宕机或者出现故障,或者分区的 leader节点发生故障,这个时候怎么处理呢?

kafka必须要保证从follower副本中选择一个新的leader副本。那么kafka是如何实现选举的呢? 要了解leader选举,我们需要了解几个概念

Kafka分区下有可能有很多个副本(replica)用于实现冗余,从而进一步实现高可用。副本根据角色的不同可分为3类:

  • leader副本:响应clients端读写请求的副本

  • follower副本:被动地备份leader副本中的数据,不能响应clients端读写请求。

  • ISR副本:包含了leader副本和所有与leader副本保持同步的follower副本——如何判定是否与leader同步后面会提到。

每个Kafka副本对象都有两个重要的属性:LEOHW。注意是所有的副本,而不只是 leader副本。

  • LEO:日志末端位移(Log End Offset),记录了该副本底层日志(log)中下一条消息的offset。

    注意是下 一条消息!也就是说,如果LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]。另外, leader LEO和follower LEO的更新是有区别的。

  • HW:高水位(High Water),对于同一个副本对象而言,其HW值不会大于LEO值。小于等于HW值的所有消息都被认为是已备份的。

同理,leader副本和follower副本的HW更新是有区别的

从生产者发出的一条消息首先会被写入分区的leader 副本,不过还需要等待ISR集合中的所有 follower副本都同步完之后才能被认为已经提交,之后才会更新分区的HW, 进而消费者可以消费到这条消息。

副本协同机制

刚刚提到了,消息的读写操作都只会由leader节点来接收和处理。follower副本只负责同步数据以及当leader副本所在的broker挂了以后,会从follower副本中选取新的leader。写请求首先由Leader副本处理,之后follower副本会从leader上拉取写入的消息,这个过程会有一定的延迟,导致follower副本中保存的消息略少于leader副本,但是只要没有超出阈值都可以容忍。

但是如果一个follower副本出现异常,比如宕机、网络断开等原因长时间没有同步到消息,那这个时候,leader就会把它踢出去。kafka通过ISR集合来维护一个分区副本信息,leader负责维护和跟踪ISR中所有follower滞后的状态。当 producer发送一条消息到broker后,leader写入消息并复制到所有follower。消息提交之后才被成功复制到所有的同步副本。

ISR集合

ISR表示目前”可用且消息量与leader相差不多的副本集合,这是整个副本集合的一个子集”。怎么去理解可用相差不多这两个词呢?具体来说,ISR集合中的副本必须满足两个条件 :

  1. 副本所在节点必须维持着与zookeeper的连接 ;

  2. 副本最后一条消息的offset与leader副本的最后一条消息的offset(也就是LEO)之间的差值不能超过指定的阈值 。

ISR数据保存在Zookeeper的 /brokers/topics/<topic>/partitions/<partitionId>/state 节点中。

follower副本把leader副本LEO之前的日志全部同步完成时,则认为follower副本已经追赶上了leader 副本,这个时候会更新这个副本的lastCaughtUpTimeMs标识,顾名思义,最后一次赶上的时间

kafk副本管理器会启动一个副本过期检查的定时任务,这个任务会定期检查当前时间与副本lastCaughtUpTimeMs的差值是否大于参数 replica.lag.time.max.ms 的值,如果大于,则会把这个副本踢出ISR集合


副本同步原理

了解了副本的协同过程以后,还有一个最重要的机制,就是数据的同步过程。它需要解决

  1. 怎么传播消息

  2. 在向消息发送端返回ack之前需要保证多少个Replica已经接收到这个消息

Producer在发布消息到某个Partition时:

  1. 先通过ZooKeeper找到该Partition的Leader ,get /brokers/topics/<topic>/partitions/2/state ,然后无论该Topic的Replication Factor为多 少(也即该Partition有多少个Replica),Producer只将该消息发送到该Partition的Leader。
  2. Leader会将该消息写入其本地Log。每个Follower都从Leader pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入其Log后,向Leader发送ACK。
  3. 一旦Leader收到了ISR中的所有follower的ACK,该消息就被认为已经commit了,Leader将增加 HW值并且向Producer发送ACK。

此外, 假如某工作不积极的follower副本被踢出ISR集合,也会导致这个分区的HW发生变化,一般是增加

1、初始状态

初始状态下,leader和follower的HW和LEO都是0,leader副本会保存remote LEO,表示所有follower LEO,也会被初始化为0,这个时候,producer没有发送消息。follower会不断地个leader发送FETCH请求,但是因为没有数据,这个请求会被leader寄存,当在指定的时间之后会强制完成请求,这个时间配置是:

replica.fetch.wait.max.ms

如果在指定时间内producer有消息发送过来,那么kafka会唤醒 fetch请求,让leader继续处理。

数据的同步处理会分两种情况,这两种情况下处理方式是不一样的

第一种是leader处理完producer请求之后,follower发送一个fetch请求过来

第二种是follower阻塞在leader指定时间之内,leader副本收到producer的请求。

首先要清楚,fetch请求会携带当前follower的LEO,leader响应会携带当前leader的HW

2、第一种情况

生产者发送一条消息

leader处理完producer请求之后,follower发送一个fetch请求过来 。

leader副本收到请求以后,会做几件事情

  1. 把消息追加到log文件,同时更新leader副本的LEO

  2. 尝试更新leader HW值。这个时候由于follower副本还没有发送fetch请求,那么leader的remote LEO仍然是0。leader会比较自己的LEO以及remote LEO的值发现最小值是0,与HW的值相同,所以不会更新HW

follower fetch消息

follower 发送第一次fetch请求,leader副本的处理逻辑是:

  1. 读取log数据、更新remote LEO=0(follower还没有写入这条消息,这个值是根据follower的fetch

请求中的offset来确定的) ;

  1. 尝试更新HW,因为这个时候LEO和remoteLEO还是不一致,所以仍然是HW=0 ;

  2. 把消息内容和当前分区的HW值发送给follower副本 。

follower副本收到response以后:

  1. 将消息写入到本地log,同时更新follower的LEO ;

  2. 更新follower HW,本地的LEO和leader返回的HW进行比较取小的值,所以仍然是0 。

第一次交互结束以后,HW仍然还是0,这个值会在下一次follower发起fetch请求时被更新

follower发第二次fetch请求,leader收到请求以后 :

  1. 读取log数据

  2. 更新remote LEO=1, 因为这次fetch携带的offset是1.

  3. 更新当前分区的HW,这个时候leader LEO和remote LEO都是1,所以HW的值也更新为1

  4. 把数据和当前分区的HW值返回给follower副本,这个时候如果没有数据,则返回为空

follower副本收到response以后 :

  1. 如果有数据则写本地日志,并且更新LEO

  2. 更新follower的HW值为 MIN(本地LEO值,leader响应的HW值)

到目前为止,数据的同步就完成了,意味着消费端能够消费offset=1这条消息。

3、第二种情况

前面说过,由于leader副本暂时没有数据过来,所以follower的fetch会被阻塞,直到等待超时或者leader接收到新的数据。当leader收到请求以后会唤醒处于阻塞的fetch请求。处理过程基本上和前面说的一致

  1. leader将消息写入本地日志,更新Leader的LEO

  2. 唤醒follower的fetch请求

  3. 更新HW

kafka使用HW和LEO的方式来实现副本数据的同步,本身是一个好的设计,但是在这个地方会存在一个数据丢失的问题,当然这个丢失只出现在特定的背景下。我们回想一下,leader的HW值是在下一轮FETCH 中才会被更新。接下来分析下这个过程为什么会出现数据丢失。

数据丢失

ack=-1(所有ISR集合中的副本一起才能确认提交)且min.insync.replicas=1(设定ISR能允许的最小副本数)

这时候可能会发生数据丢失。

这种情况下,消息一旦被写入leader就会被认为是“已提交”,而延迟一轮FETCH 更新HW值的设计使得follower HW值是异步延迟更新的,倘若在这个过程中leader发生变更,那么成为新leader的 follower的HW值就有可能是过期的,使得clients端认为是成功提交的消息被删除。

日志截断:副本重启之后,会将副本的HW值设为当前副本的LEO值。

数据丢失的解决方案

在kafka0.11.0.0版本之后,引入了一个leader epoch来解决这个问题,所谓的leader epoch实际上是 一对值(epoch,offset),epoch代表leader的版本号,从0开始递增,当leader发生过变更,epoch 就+1,而offset则是对应这个epoch版本的leader写入第一条消息的offset,比如

(0,0), (1,50) ,表示第一个leader从offset=0开始写消息,一共写了50条。第二个leader版本号是1,从 offset=50开始写,这个信息会持久化在对应的分区的本地磁盘上,文件名是:

/tmp/kafka-log/topic/leader-epoch-checkpoint

leader broker中会保存这样一个缓存,并且定期写入到checkpoint文件中

当leader写log时它会尝试更新整个缓存: 如果这个leader首次写消息,则会在缓存中增加一个条目;否则就不做更新。而每次副本重新成为leader时会查询这部分缓存,获取出对应leader版本的offset

我们基于同样的情况来分析,follower宕机并且恢复之后,有两种情况,如果这个时候leader副本没有挂,也就是意味着没有发生leader选举,那么follower恢复之后并不会去截断自己的日志,而是先发送 一个OffsetsForLeaderEpochRequest请求给到leader副本,leader副本收到请求之后返回当前的 LEO。

如果follower副本的leaderEpoch和leader副本的epoch相同, leader的LEO只可能大于或者等于 follower副本的LEO值,所以这个时候不会发生截断。

如果follower副本和leader副本的epoch值不同,那么leader副本会查找follower副本传过来的 epoch+1在本地文件中存储的StartOffset返回给follower副本,也就是新leader副本在成为leader之前写的最后一条数据的offset。这样也可以避免了数据丢失的问题。

如果leader副本宕机了重新选举新的leader,那么原本的follower副本就会变成leader,意味着epoch 从0变成1,使得原本follower副本中LEO的值的得到了保留。

Leader副本的选举过程

  1. KafkaController会监听ZooKeeper的/brokers/ids节点路径,一旦发现有broker挂了,执行下面 的逻辑。这里暂时先不考虑KafkaController所在broker挂了的情况,KafkaController挂了,各个 broker会重新leader选举出新的KafkaController

  2. leader副本在该broker上的分区就要重新进行leader选举,目前的选举策略是

    1. 优先从ISR列表中选出第一个作为leader副本,这个叫优先副本,理想情况下有限副本就是该分区的leader副本

    2. 如果ISR列表为空,则查看该topic的unclean.leader.election.enable配置。

      unclean.leader.election.enable:为true则代表允许选用非ISR列表中的副本作为leader,那么此时就意味着数据可能丢失;为 false的话,则表示不允许,直接抛出NoReplicaOnlineException异常,造成leader副本选举失败。

    3. 如果上述配置为true,则从其他副本中选出一个作为leader副本,并且isr列表只包含该leader 副本。一旦选举成功,则将选举后的leader和ISR和其他副本信息写入到该分区的对应的zk路径上。