消费者

stateDiagram-v2  state 主题T1 {    分区0 分区1 分区2 分区3  }  state 消费者群组1 {    消费者1 消费者2 消费者3 消费者4  }  state 消费者群去2 {    消费者5 消费者6  }  分区0 --> 消费者1  分区1 --> 消费者2  分区2 --> 消费者3  分区3 --> 消费者4  分区0 --> 消费者5  分区1 --> 消费者5  分区2 --> 消费者6  分区3 --> 消费者6

分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡

消费者通过向被指派为群组协调器的 broker(不同的群组可以有不同的协调器)发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系

Properties props = new Properties();//kafka 集群,broker-listprops.put("bootstrap.servers", "172.24.211.140:9092");props.put("group.id", "consumer1");props.put("key.deserializer",        "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer",        "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<String,                        String>(props);consumer.subscribe(List.of("test"));while(true){    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));    for (ConsumerRecord<String, String> record : records) {        System.err.println(record);    }}

消费方式:

采用 pull(拉)模式从 broker 中读取数据

相较于服务器 push 模式,pull模式使消费者可以根据自己的情况来决定消费的速率,另一方面,pull模式每次也可以批量拉取数据,提高服务器吞吐量,而pull模式一旦有消息产生,就会立马推送,这意味着网络开销会比pull模式高一些。

但pull模式也有一些缺陷,如果没有消息的话,客户端就会在那里空转等待,所以Kafka为消费者的poll方法指定了一个时间参数,避免空转浪费CPU资源

配置

偏移量

每一个消费者组都会对有进行消费的主题分区的偏移量进行记录,这也就意味着消费者可以手动设置这个偏移量从头消费数据。

更新分区当前偏移量的操作叫作提交

Kafka 0.9 版本之前,consumer 默认将 offset 保存在 Zookeeper 中,从 0.9 版本开始,consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic为__consumer_offsets。

__consumer_offsets的key为(主题、分区号、消费组),value 为(版本号、偏移量、元数据、提交时间戳、leader epoch、过期时间戳),随着不断提交偏移量,这个主题的消息会越来越多,所以会有一个线程定时对这个主题 compact,因为只需要保留一个最新的偏移量即可

__consumer_offsets 位移主题中的每个消费者组只能被一个 Coordinator 所管理,这个 Coordinator 通过使用 grouId hashCode 取余决定由哪台 broker 来进行管理

消费者获取偏移量不是直接消费 __consumer_offsets 位移主题,而是 Coordinator 会异步地去消费这个位移主题的数据,填充到元数据缓存中,对外提供查询的是元数据缓存

自动提交:

手动提交:

把 auto.commit.offset 设为 false ,使用 commitSync()

异步提交 commitAsync() , 但该方法在发生错误时不会进行重试

消费者提交偏移量,都是向 coordinator 所在的 broker 提交,coordinator 专门为 消费组服务,负责执行再平衡以及提供位移管理和组成员管理等

要确定 coordinator,首先确定由位移主题的哪个分区来保存该消费组的数据:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。接下来找出该分区 leader 副本所在的 broker,该 broker 即为对应的 coordinator

再均衡监听:

订阅的时候传入 ConsumerRebalanceListener 实现相关接口

从特定偏移量开始处理:seekToBeginning(..)

读取特定偏移量:seek(..)

Kafka 消费异常没有手动提交导致的问题

假设本次偏移量消费到28 抛出异常 没有手动提交偏移量

该消费者下次迭代也会从29开始消费 为解决这个问题 所以只能新起消费者来替代此消费者

CommitFailedException

重设偏移量

再平衡

如果未能及时发送心跳或者消费时间过长,消费者都会被认为已经死了,此时消费者数量会发生变化

当消费者数量、订阅的主题数或者分区数发生变更时,就会触发消费组的再平衡操作,即重新分配消费者与分区的映射关系,再平衡期间,所有消费都会停止,消费者越多这个过程越慢,停止消费的时间就越长

0.11 之后的版本推出了 StickyAssignor,即有粘性的分区分配策略,使用这个策略会尽可能地保留之前的分配方案,尽量实现分区分配的最小变动

消费者组状态机:

stateDiagram-v2  Empty --> Dead: 组信息过期被删除  Empty --> PreparingRebalance: 准备开启rebalance  PreparingRebalance --> Empty: 组内所有成员离组  PreparingRebalance --> Dead: 位移主题分区leader发生变更  PreparingRebalance --> CompletingRebalance: 有成员入组  CompletingRebalance --> PreparingRebalance: 有成员加入或离开  CompletingRebalance --> Stable: leader完成分配  Stable --> PreparingRebalance: 心跳过期/成员离组/新成员加入  Stable --> Dead: 位移主题分区leader发生变更
sequenceDiagram  消费领导者 ->> coordinator: joingroup(订阅主题)  消费者1 ->> coordinator: joingroup(订阅主题)  消费者2 ->> coordinator: joingroup(订阅主题)  coordinator -->> 消费领导者: 当前组成员(1,2)  coordinator -->> 消费者1: 消费领导者为0  coordinator -->> 消费者2: 消费领导者为0  消费领导者 ->> 消费领导者: 制订消费方案  消费领导者 ->> coordinator: 提交消费方案  coordinator -->> 消费者1: 下发消费方案  coordinator -->> 消费者2: 下发消费方案

退出

其他线程调用consumer.wakeup() 会使consumer在poll抛出异常 然后进行close即可

没有群组的消费者

调用assign为其设置消费的分区

消费进度监控

(start)----lead-----(consume offset)---------lag--------(end)--->