kafka-提交offset存储

每个topic可以划分多个分区partition,同一个topic下的不同分区存储的消息是不重复的。在每个消息被分配给一个分区时,会生成一个偏移量offset,它是消息在分区中的唯一编号。kafka通过offset来保证消息在分区内的顺序,分区之间不能保证消息的顺序性。

消费者可以通过以下配置来开启自动提交:

1
2
3
4
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_2");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 开启自动提交
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); //自动提交时间间隔
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); //消费者启动时从哪里开始消费

对于消费者来说,每次消费一个消息并且提交以后,kafka会保存当前消费到的最后的一个offset,那么offset保存在哪里?

在kafka中提供了一个名为__consumer_offsets-*的topic,默认有50个分区。消费者消费数据并提交之后,offset将存储到该topic的某个分区中。

如何确定在哪个分区?通过以下公式

1
Math.abs("group_2".hashCode()%50)

能获取一个0-49的整型值,假如是0,那么意味着当前group的offset信息保存在__consumer_offsets-0这个文分区中。

执行以下命令可以查看指定分区中offset位移提交信息

1
kafka-console-consumer.sh --topic __consumer_offsets --partition 0 --bootstrap server 192.168.2.112:9092 --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetMessageFormatter'

结果如下图所示:

根据结果可以看出,组gropu_2消费的test-topic的0分区,offset是40