前言:
大家好,大家在springboot项目中,经常采用 @KafkaListener 做为消费者。这个是spring为我们封装的。 但是某些情况 注解的方式并不能满足需求。这个时候就需要手动版本了。
介绍:
我们已经集成spring-Kafka 就不需要再额外引入kafka-clients的依赖了。直接亮代码。
给大家解释配置含义。
1.Kafka配置代码
public KafkaConsumer<String, String> getCustomer() {
// 1. 配置属性参数
Properties properties = new Properties();
// 设置Kafka集群的地址和端口,消费者将连接到这个地址和端口
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 设置键(Key)的反序列化器为StringDeserializer,用于将字节数据转换为String类型
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 设置值(Value)的反序列化器为StringDeserializer,用于将字节数据转换为String类型
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 设置消费者所属的消费者组,消费者组内的消费者将共同消费同一个Topic的消息
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
// 设置消费者与Kafka集群之间的会话超时时间(单位:毫秒)
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
// 设置消费者是否自动提交offset,true表示自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 设置自动提交offset的时间间隔(单位:毫秒)
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
// 设置每次poll操作返回的最大记录数
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,1);
// 根据配置属性创建Kafka消费者实例
return new KafkaConsumer<>(properties);
}
2.Kafka消费者代码
@Test
void KafkaConsumerTest() {
// 创建Kafka消费者实例,通过getCustomer()方法获取
KafkaConsumer<String, String> consumer = kafkaCustomer.getCustomer();
// 订阅要消费的主题,这里是 "test-topic"
consumer.subscribe(Collections.singletonList("test-topic"));
// 从Kafka服务器拉取消息,poll等待的最长时间设置为10秒(10000000毫秒)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000000));
for (ConsumerRecord<String, String> record : records) {
// 处理消息的逻辑
// 打印消息的offset、key和value
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
//以下代码是我的场景,本人需要在某些情况跳转,而编写单元测试做试验的。
boolean flag = true;
if (flag){
// 如果flag为true,则不自动提交offset,可以在这里添加业务逻辑处理消息
// 如果需要手动提交offset,可以取消注释下面的代码
// consumer.commitAsync();
// 由于flag为true,这里会跳出循环,不再处理后续的消息
break;
}
}
// 关闭消费者,释放资源
consumer.close();
// 打印结束消费的日志
System.out.println("结束消费");
}