kafka-分区分配策略及分配流程

当出现以下几种情况时,kafka会进行一次分区分配操作,也就是kafka consumer的rebalance

  1. 同一个consumer group内新增了消费者

  2. 消费者离开当前所属的consumer group,比如主动停机或者宕机

  3. topic新增了分区(也就是分区数量发生了变化)

kafka consuemr的rebalance机制规定了一个consumer group下的所有consumer如何达成一致来分配订阅的topic的每个分区。

本文涉及到的配置项:

消费者配置: partition.assignment.strategy

1. 分区分配策略

同一个group中的消费者对于一个topic中的多个partition,存在一定的分区分配策略。在kafka中,存在三种分区分配策略,一种是Range(默认)、 另一种是RoundRobin(轮询)、StickyAssignor(粘性)。 在消费端中的ConsumerConfifig中,通过这个属性来指定分区分配策略

1
public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
1.2 RangeAssignor(范围分区)

Range策略是对每个主题而言的,首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。

1
2
3
假设n = 分区数/消费者数量 
m= 分区数%消费者数量
那么前m个消费者每个分配n+l个分区,后面的(消费者数量-m)个消费者每个分配n个分区

设我们有10个分区,3个消费者,那么n=10/3=3m=10%3=1,那么前1个消费者分配4个分区,结果如下:

C1———– 0, 1, 2, 3

C2———– 4, 5, 6

C3———– 7, 8, 9

假如我们有11个分区,n=10/3=3m=10%3=2,那么前2个消费者个分配4个分区,分配的结果看起来是这样的:

C1———– 0, 1, 2, 3

C2———– 4, 5, 6, 7

C3———– 8, 9, 10

假如group消费2个主题(T1和T2),分别有10个分区,那么最后分区分配的结果看起来是这样的:

C1 将消费 T1主题的 0, 1, 2, 3 分区以及 T2主题的 0, 1, 2, 3分区

C2 将消费 T1主题的 4, 5, 6 分区以及 T2主题的 4, 5, 6分区

C3 将消费 T1主题的 7, 8, 9 分区以及 T2主题的 7, 8, 9分区

可以看出来每个主题将独自进行分区分配,C1负载最大。

因此这种方式的弊端很明显:分配不均匀,主题越多,排在前面的消费者负载越大

1.2 RoundRobinAssignor(轮询分区)

轮询分区策略是把所有partition和所有consumer线程都列出来,然后按照hashcode进行排序。最后通过轮询算法分配partition给消费线程。如果所有consumer实例的订阅是相同的,那么partition会均匀分布。

在我们的例子里面,假如按照 hashCode 排序完的topic-partitions组依次为T1-5, T1-3, T1-0, T1-8, T1-

2, T1-1, T1-4, T1-7, T1-6, T1-9,我们的消费者线程排序为C1-0, C1-1, C2-0, C2-1,最后分区分配的结果

为:

C1-0 将消费 T1-5, T1-2, T1-6 分区;

C1-1 将消费 T1-3, T1-1, T1-9 分区;

C2-0 将消费 T1-0, T1-4 分区;

C2-1 将消费 T1-8, T1-7 分区;

使用轮询分区策略必须满足两个条件

  1. 每个主题的消费者实例具有相同数量的流

  2. 每个消费者订阅的主题必须是相同的

1.3 StrickyAssignor (粘性分区)

kafka在0.11.x版本支持了StrickyAssignor, 翻译过来叫粘滞策略,它主要有两个目的假设

  • 分区的分配尽可能的均匀

  • 分区的分配尽可能和上次分配保持相同

当两者发生冲突时, 第 一 个目标优先于第二个目标。 鉴于这两个目标, StickyAssignor分配策略的具体实现要比RangeAssignor和RoundRobinAssi gn or这两种分配策略要复杂得多,假设我们有这样一个场景

假设消费组有3个消费者:C0,C1,C2,它们分别订阅了4个Topic(t0,t1,t2,t3),并且每个主题有两个分

区(p0,p1),也就是说,整个消费组订阅了8个分区:t0p0、 t0p1 、 t1p0 、 t1p1 、 t2p0 、

t2p1、t3p0 、 t3p1

那么最终的分配场景结果为

C0: t3p1 、t1p1 、 t3p0

Cl: t0p1 、t2p0 、 t3p1

C2: t1p0 、t2p1

这种分配方式有点类似于轮询策略,但实际上并不是,因为假设这个时候,C1这个消费者挂了,就势必会造成

重新分区(reblance),如果是轮询,那么结果应该是

C0: t3p1 、t1p0 、t2p0、t3p0

C2: t0p1 、t1p1 、t2p1、t3p1

然后,strickyAssignor它是一种粘滞策略,所以它会满足分区的分配尽可能和上次分配保持相同,所以

分配结果应该是

消费者C0: t0p0、t1p1 、 t3p0、t2p0

消费者C2: t1p0、t2p1、t0p1、t3p1

也就是说,C0和C2保留了上一次是的分配结果,并且把原来C1的分区分配给了C0和C2。 这种策略的好处是

使得分区发生变化时,由于分区的“粘性,减少了不必要的分区移动。

2. Rebalance

Kafka提供了一个角色:coordinator来执行对于consumer group的管理,当consumer group的第一个consumer启动的时候,它会去和kafka server确定谁是它们组的coordinator。之后该group内的所有成员都会和该coordinator进行协调通信。

consumer group如何确定自己的coordinator是谁呢,,首先启动的消费者向kafka集群中的任意一个broker发送一个GroupCoordinatorRequest请求,请求参数就一个group_id,服务端会返回一个负载最小的broker节点的id,并将该broker设置为coordinator,之后只要这个borker不宕机,它永远都是这个消费者组的协调者。

在rebalance之前,需要保证coordinator已经确定好了,整个rebalance的过程分为两个步骤,JoinSync

2.1 JoinGroup的过程

join: 表示加入到consumer group中,在这一步中,所有的成员都会向coordinator发送JoinGroupRequest的请求。coordinator会从发送请求的消费者中选择一个担任leader角色,并把组成员信息和组订阅信息发送给消费者leader。

leader选举算法比较简单,如果消费组内没有leader,那么第一个加入消费组的消费者就是消费者leader,如果这个时候leader消费者退出了消费组,那么重新选举一个leader,这个选举很随意,类似于随机算法

protocol_metadata: 序列化后的消费者的订阅信息

leader_id: 消费组中的消费者,coordinator会选择一个作为leader,对应的就是member_id

member_metadata:对应消费者的订阅信息

members:consumer group中全部的消费者的订阅信息

generation_id: 年代信息,类似于之前讲解zookeeper的时候的epoch是一样的,对于每一轮rebalance,generation_id都会递增。主要用来保护consumer group。隔离无效的offset提交。也就是上一轮的consumer成员无法提交offset到新的consumer group中。

每个消费者都可以设置自己的分区分配策略,对于消费组而言,会从各个消费者上报过来的分区分配策略中选举一个彼此都赞同的策略来实现整体的分区分配,这个”赞同”的规则是,消费组内的各个消费者会通过投票来决定

  • 在joingroup阶段,每个consumer都会把自己支持的分区分配策略发送到coordinator
  • coordinator收集到所有消费者的分配策略,组成一个候选集
  • 每个消费者需要从候选集里找出一个自己支持的策略,并且为这个策略投票
  • 最终计算候选集中各个策略的选票数,票数最多的就是当前消费组的分配策略
2.2 Synchronizing Group State阶段

consumer leader完成分区分配之后,就进入了Synchronizing Group State阶段,主要逻辑是所有的消费之向coordinator发送SyncGroupRequest请求,并且处理SyncGroupResponse响应,简单来说,就是leader将消费者对应的partition分配方案同步给consumer group 中的所有consumer

注意是每个消费者都会向coordinator发送syncgroup请求,不过只有leader节点会发送分配方案,此时其他消费者只是打打酱油而已。当leader把方案发给coordinator以后,coordinator会把结果设置到SyncGroupResponse中响应给所有消费者。这样所有成员都知道自己应该消费哪个分区。

consumer group的分区分配方案是在客户端执行的!Kafka将这个权利下放给客户端主要是因为这样做可以有更好的灵活性

3. 总结

  • 消费者可以通过配置“partition.assignment.strategy”来选择分区的分配策略,有三种策略:range、roundRobin、strick
  • 一个消费者组第一个消费者启动的时候,会向集群中任意一个broker发送一个GroupCoordinatorRequest请求,集群返回当前负载最小的broker成为该group的coordinator
  • 三种情况会引起Rebalance操作:有新的消费者加入组;有消费者离开组;topic增加新的分区

  • Rebalance第一步join,所有组内在线的消费者向coordinator发送joinGroup请求,coordinator收到所有的消费者请求后指定一个消费者leader(一般是第一个加入组的消费者),将组成员信息和组订阅信息发送给leader

  • leader根据分配策略进行分配,完成分配之后将分配方案发送给coordinator,使用SyncGroupRequest请求,所有的消费者都会发送该请求,但只有leader的请求会携带分配方案。
  • coordinator收到分配方案之后,降费配方案放在SyncGroupResponse响应中,响应给所有的消费者,所有的消费者都直到自己应该消费哪些分区了。