文章目录
- 一、消费消息概览
- 1.1、基本代码
- 1.2、消费过程
- 二、消费者组
- 2.1、push & pull
- 2.2、消费者组
- 三、调度器Coordinator
- 四、消费者分配策略
- 五、偏移量offset
- 5.1、起始偏移量
- 5.2、指定偏移量消费
- 5.3、偏移量提交
- 5.3.1、自动提交
- 5.3.2、手动提交
- 5.4、偏移量的保存
- 六、消费者事务
如果想了解kafka基础架构、生产者架构和kafka存储消息可以参考 kafka基础、 Kafka进阶_1.生产消息和 kafka进阶_2.存储消息。
一、消费消息概览
1.1、基本代码
public class KafkaConsumerTest {
public static void main(String[] args) {
// TODO 创建消费者配置参数集合
Map<String, Object> paramMap = new HashMap<>();
paramMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
paramMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
paramMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
paramMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// TODO 通过配置,创建消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(paramMap);
// TODO 订阅主题
consumer.subscribe(Collections.singletonList("test"));
// TODO 消费数据
final ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
// TODO 遍历数据
for (ConsumerRecord<String, String> record : poll) {
System.out.println( record.value() );
}
// TODO 关闭消费者
consumer.close();
}
}
主要的属性配置:
参数名 | 参数作用 | 类型 | 推荐值 |
---|---|---|---|
bootstrap.servers | 集群地址 | 必须 | |
key.deserializer | 对数据Key进行反序列化的类完整名称 | 必须 | Kafka提供的字符串反序列化类:StringSerializer |
value.deserializer | 对数据Value进行反序列化的类完整名称 | 必须 | Kafka提供的字符串反序列化类:ValueSerializer |
group.id | 消费者组ID,用于标识完整的消费场景,一个组中可以包含多个不同的消费者对象 |
1.2、消费过程
消费者消费数据时,一般情况下,只是设定了订阅的主题名称,那是如何消费到数据的呢。我们这里说一下服务端拉取数据的基本流程:
- 服务端获取到用户拉取数据的请求:Kafka消费客户端会向Broker发送拉取数据的请求FetchRequest,服务端Broker获取到请求后根据请求标记FETCH交给应用处理接口KafkaApis进行处理。
- 通过副本管理器拉取数据:副本管理器需要确定当前拉取数据的分区,然后进行数据的读取操作。
- 判定首选副本:2.4版本前,数据读写的分区都是Leader分区,从2.4版本后,kafka支持Follower副本进行读取。主要原因就是跨机房或者说跨数据中心的场景,为了节约流量资源,可以从当前机房或数据中心的副本中获取数据。这个副本称之为首选副本。
- 拉取分区数据:Kafka的底层读取数据是采用日志段LogSegment对象进行操作的。
- 零拷贝:为了提高数据读取效率,Kafka的底层采用nio提供的FileChannel零拷贝技术,直接从操作系统内核中进行数据传输,提高数据拉取的效率。
二、消费者组
从数据处理的角度来讲,消费者和生产者的处理逻辑都相对比较简单。Producer生产者的基本数据处理逻辑就是向Kafka发送数据,并获取Kafka的数据接收确认响应。
而消费者的基本数据处理逻辑就是向Kafka请求数据,并获取Kafka返回的数据。
逻辑确实很简单,但是Kafka为了能够构建高吞吐,高可靠性,高并发的分布式消息传输系统,所以在很多细节上进行了扩展和改善:比如生产者可以指定分区,可以异步和同步发送数据,可以进行幂等性操作和事务处理。对应的,消费者功能和处理细节也进行了扩展和改善。
2.1、push & pull
Kafka的主题如果就一个分区的话,那么在硬件配置相同的情况下,消费者Consumer消费主题数据的方式没有什么太大的差别。
不过,Kafka为了能够构建高吞吐,高可靠性,高并发的分布式消息传输系统,它的主题是允许多个分区的,那么就会发现不同的消费数据的方式区别还是很大的。
1、如果数据由Kafka进行推送(push),那么多个分区的数据同时推送给消费者进行处理,明显一个消费者的消费能力是有限的,那么消费者无法快速处理数据,就会导致数据的积压,从而导致网络、存储等资源造成极大的压力,影响吞吐量和数据传输效率。
2.如果kafka的分区数据在内部可以存储的时间更长一些,再由消费者根据自己的消费能力向kafka申请(拉取)数据,那么整个数据处理的通道就会更顺畅一些。Kafka的Consumer就采用的这种拉取数据的方式。
2.2、消费者组
消费者可以根据自身的消费能力主动拉取Kafka的数据,但是毕竟自身的消费能力有限,如果主题分区的数据过多,那么消费的时间就会很长。对于kafka来讲,数据就需要长时间的进行存储,那么对Kafka集群资源的压力就非常大。如果希望提高消费者的消费能力,并且减少kafka集群的存储资源压力。所以有必要对消费者进行横向伸缩,从而提高消息消费速率。
不过这么做有一个问题,就是每一个消费者是独立,那么一个消费者就不能消费主题中的全部数据,简单来讲,就是对于某一个消费者个体来讲,主题中的部分数据是没有消费到的,也就会认为数据丢了,这个该如何解决呢?那如果我们将这多个消费者当成一个整体,是不是就可以了呢?这就是所谓的消费者组 Consumer Group。在kafka中,每个消费者都对应一个消费组,消费者可以是一个线程,一个进程,一个服务实例,如果kafka想要消费消息,那么需要指定消费哪个topic的消息以及自己的消费组id(groupId)。
三、调度器Coordinator
消费者想要拉取数据,首先必须要加入到一个组中,成为消费组中的一员,同样道理,如果消费者出现了问题,也应该从消费者组中剥离。而这种加入组和退出组的处理,都应该由专门的管理组件进行处理,这个组件在kafka中,我们称之为消费者组调度器(Group Coordinator)。Group Coordinator是Broker上的一个组件,用于管理和调度消费者组的成员、状态、分区分配、偏移量等信息。每个Broker都有一个Group Coordinator对象,负责管理多个消费者组,但每个消费者组只有一个Group Coordinator。
四、消费者分配策略
五、偏移量offset
偏移量offset是消费者消费数据的一个非常重要的属性。默认情况下,消费者如果不指定消费主题数据的偏移量,那么消费者启动消费时,无论当前主题之前存储了多少历史数据,消费者只能从连接成功后当前主题最新的数据偏移位置读取,而无法读取之前的任何数据,如果想要获取之前的数据,就需要设定配置参数或指定数据偏移量。
5.1、起始偏移量
在消费者的配置中,我们可以增加偏移量相关参数auto.offset.reset
,用于从最开始获取主题数据:
public class KafkaConsumerTest {
public static void main(String[] args) {
// TODO 创建消费者配置参数集合
Map<String, Object> paramMap = new HashMap<>();
paramMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
paramMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
paramMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
paramMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
paramMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// TODO 通过配置,创建消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(paramMap);
// TODO 订阅主题
consumer.subscribe(Arrays.asList("test"));
while ( true ) {
// TODO 消费数据
final ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
// TODO 遍历数据
for (ConsumerRecord<String, String> record : poll) {
System.out.println( record );
}
}
}
}
auto.offset.reset有三种取值:
1、earliest
对于同一个消费者组,从头开始消费。就是说如果这个topic有历史消息存在,现在新启动了一个消费者组,且auto.offset.reset=earliest,那将会从头开始消费(未提交偏移量的场合)。
2、latest
对于同一个消费者组,消费者只能消费到连接topic后,新产生的数据(未提交偏移量的场合)。
3、none:生产环境不使用。
5.2、指定偏移量消费
除了上一节所讲的从最开始的偏移量或最后的偏移量读取数据以外,Kafka还支持从指定偏移量的位置开始消费数据:
public class KafkaConsumerOffsetTest {
public static void main(String[] args) {
Map<String, Object> paramMap = new HashMap<>();
paramMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
paramMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
paramMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
paramMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
KafkaConsumer<String, String> c = new KafkaConsumer<String, String>(paramMap);
// TODO 订阅主题
c.subscribe(Collections.singletonList("test"));
// TODO 拉取数据,获取基本集群信息
c.poll(Duration.ofMillis(100));
// TODO 根据集群的基本信息配置需要消费的主题及偏移量
final Set<TopicPartition> assignment = c.assignment();
for (TopicPartition topicPartition : assignment) {
if ( topicPartition.topic().equals("test") ) {
c.seek(topicPartition, 0);
}
}
// TODO 拉取数据
while (true) {
final ConsumerRecords<String, String> poll = c.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : poll) {
System.out.println( record.value() );
}
}
}
}
5.3、偏移量提交
生产环境中,消费者可能因为某些原因或故障重新启动消费,那么如果不知道之前消费数据的位置,重启后再消费,就可能重复消费(earliest)或漏消费(latest)。所以Kafka提供了保存消费者偏移量的功能,而这个功能需要由消费者进行提交操作。这样消费者重启后就可以根据之前提交的偏移量进行消费了。
注意:一旦消费者提交了偏移量,那么kafka会优先使用提交的偏移量进行消费。此时,auto.offset.reset参数是不起作用的。
5.3.1、自动提交
所谓的自动提交就是消费者消费完数据后,无需告知kafka当前消费数据的偏移量,而是由消费者客户端API周期性地将消费的偏移量提交到Kafka中。这个周期默认为5000ms
,可以通过配置进行修改。
public class KafkaConsumerCommitAutoTest {
public static void main(String[] args) {
// TODO 创建消费者配置参数集合
Map<String, Object> paramMap = new HashMap<>();
paramMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// TODO 启用自动提交消费偏移量,默认取值为true
paramMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// TODO 设置自动提交offset的时间周期为1000ms,默认5000ms
paramMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
paramMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
paramMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
paramMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// TODO 通过配置,创建消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(paramMap);
// TODO 订阅主题
consumer.subscribe(Arrays.asList("test"));
while ( true ) {
// TODO 消费数据
final ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
// TODO 遍历数据
for (ConsumerRecord<String, String> record : poll) {
System.out.println( record );
}
}
}
}
5.3.2、手动提交
基于时间周期的偏移量提交,是我们无法控制的,一旦参数设置的不合理,或单位时间内数据量消费的很多,却没有来及的自动提交,那么数据就会重复消费。所以Kafka也支持消费偏移量的手动提交,也就是说当消费者消费完数据后,自行通过API进行提交。不过为了考虑效率和安全,kafka同时提供了异步提交和同步提交两种方式供我们选择。注意:需要禁用自动提交ENABLE_AUTO_COMMIT_CONFIG=false,才能开启手动提交:
1、异步提交
向Kafka发送偏移量offset提交请求后,就可以直接消费下一批数据,因为无需等待kafka的提交确认,所以无法知道当前的偏移量一定提交成功,安全性比较低,但相对,消费性能会提高
public class KafkaConsumerCommitASyncTest {
public static void main(String[] args) {
// TODO 创建消费者配置参数集合
Map<String, Object> paramMap = new HashMap<>();
paramMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// TODO 禁用自动提交消费偏移量,默认取值为true
paramMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
paramMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
paramMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
paramMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// TODO 通过配置,创建消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(paramMap);
// TODO 订阅主题
consumer.subscribe(Arrays.asList("test"));
while ( true ) {
// TODO 消费数据
final ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
// TODO 遍历处理数据
for (ConsumerRecord<String, String> record : poll) {
System.out.println( record );
}
// TODO 异步提交偏移量
// 此处需要注意,需要在拉取数据完成处理后再提交
// 否则提前提交了,但数据处理失败,下一次消费数据就拉取不到了
consumer.commitAsync();
}
}
}
2、同步提交
必须等待Kafka完成offset提交请求的响应后,才可以消费下一批数据,一旦提交失败,会进行重试处理,尽可能保证偏移量提交成功,但是依然可能因为意外情况导致提交请求失败。此种方式消费效率比较低,但是安全性高。
5.4、偏移量的保存
由于消费者在消费消息的时候可能会由于各种原因而断开消费,当重新启动消费者时我们需要让它接着上次消费的位置offset继续消费,因此消费者需要实时的记录自己以及消费的位置。0.90版本之前,这个信息是记录在zookeeper内的,在0.90之后的版本,offset保存在_consumer_offsets
这个topic内。
每个consumer会定期将自己消费分区的offset提交给_consumer_offsets,提交过去的时候,key是consumerGroupId+topic+分区号
: