💻目录
- 前言
- 一、依赖
- 二、原生使用kafka
- 1、发送消息
- 1.1、生产者同步发送消息
- 1.2、生产者异步发送消息
- 1.3、常用配置:
- 2、接收消息
- 2.1、关于消费者的自动提交和手动提交
- 2.2、长轮训poll消息
- 2.3、消费者的健康状态检查
- 2.4、指定分区和偏移量,时间消费
- 2.5、新消费组的消费offset规则
- 三、Spring boot配置连接kafka
- 1、配置yml配置文件
- 2、配置生产者
- 3、配置消费者
前言
本文主要是介绍通过使用原生代码方式和结合springboot分别如何更好的去使用理解kafka
如果需要看理论或者安装kafka可以看我前面两篇内容
🍅kafka使用和安装
一、依赖
主要分为springboot和原生代码的依赖,还有hutool工具包
<dependencies>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.20</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- springbootkafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- springmvc原生kafka-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
二、原生使用kafka
构建项目就不做过多说了,普通maven项目就行,重点在于方法API的使用。
1、发送消息
发送消息流程
- 首先是先创建一个Properties对象用于传递配置参数
- 然后通过props.put()方法添加需要添加的配置
- 添加连接kafka地址和设置序列化是必须的,后面的有默认的,可以根据情况设置
- 创建一个KafkaProducer连接客户端
- 创建ProducerRecord发送记录类,发送消息就是通过这个类进行发送
- 发送时可以选择同步或者异步进行发送
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* @projectName: kafka-mode
* @package: com.zheng.kafkamode.kafka
* @className: MyProducer
* 消息的发送者---简单发送
* @version: 1.0
*/
@Slf4j
public class MyProducer {
private final static String TOPIC_NAME = "my-replicated-topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
// 设置参数
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.211.55.6:9092");
// 设置序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
配置ACK
// props.put(ProducerConfig.ACKS_CONFIG,"1");
失败重试次数,3次
// props.put(ProducerConfig.RETRIES_CONFIG,3);
//
失败重试时间间隔,300毫秒后重试
// props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG,300);
// kafka消息缓冲区大小,用来存放要发送到消息,缓冲区是32m,
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
// 本地线程,一次性从缓冲区拉取的数据大小,16k
props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
// 如果线程拉取不到16k,间隔10ms也会将缓冲区的数据发送到kafka
props.put(ProducerConfig.LINGER_MS_CONFIG,10);
// 连接客户端
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 发送的消息记录器(topic,partition(指定发到哪个),key(用于计算发到哪个partition),value)
// 默认partition数量和Broker创建的数量一致
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, 0,"my-keyValue3", "hello");
// 同步
send(producer,producerRecord);
// 异步
asyncSend(producer,producerRecord);
}
/**
* @param producer: 客户端对象
* @return void
* 同步发送
* @date 2024/3/22 17:09
*/
private static void send(KafkaProducer<String, String> producer,ProducerRecord<String, String> producerRecord) throws InterruptedException, ExecutionException {
// 等待发送成功的阻塞方法
RecordMetadata metadata = producer.send(producerRecord).get();
log.info("同步发送消息"+ "topic-"+metadata.topic()+"====partition:"+metadata.partition()
+"=====offset:"+metadata.offset());
}
/**
* @param producer: 客户端对象
* @return void
* 异步发送
* @date 2024/3/22 17:09
*/
private static void asyncSend(KafkaProducer<String, String> producer,ProducerRecord<String, String> producerRecord) throws InterruptedException {
int sum = 5;
CountDownLatch countDownLatch = new CountDownLatch(sum);
for (int i = 0; i < sum; i++) {
ProducerRecord<String, String> producerRecord1 = new ProducerRecord<>(TOPIC_NAME, "my-keyValue"+i, "zhangsan"+i);
// 异步发送消息
producer.send(producerRecord1,(metadata, exception) -> {
log.info("异步发送消息"+ "topic-"+metadata.topic()+"====partition:"+metadata.partition()
+"=====offset:"+metadata.offset());
countDownLatch.countDown();
});
}
countDownLatch.await(5, TimeUnit.SECONDS);
}
}
1.1、生产者同步发送消息
同步发送收到消息后会回复一个ack
ack有三个参数配置:(默认是1)
- cak = 0:kafka收到消息后,不需要关心消息是否成功写入到分区中,马上就返回ack,
- 会容易丢失消息,但效率最高
- ack = 1 :多副本之间的leader分区已经收到消息,并把消息写入到本地log中,才会返回ack给生产者
- 性能和安全是最均衡的
- ack = -1/all :里面有默认配置min.insync.replicas=2(默认为1,推荐配置大于等于2),
- min.insync.replicas:代表同步副本的个数(如果是1,则是只需要leader收到就可以)
- 最安全但性能最差
生产者如果3秒没有收到回复(ack),则会重试,如果重试3次还没成功,则抛出异常。
- 消息丢失概率较小
1.2、生产者异步发送消息
异步发送不需要等待客户端回复ack
生产者发送消息后就可以做之后的业务,不需要等待broker在收到消息后异步调用生产者提供的callback。
-
会出现消息丢失问题
1.3、常用配置:
- 基础配置
// 设置参数
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.8.62:3392");
// 设置序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
- 发送消息配置
// 配置ACK
props.put(ProducerConfig.ACKS_CONFIG,"1");
// 失败重试次数,3次
props.put(ProducerConfig.RETRIES_CONFIG,3);
// 失败重试时间间隔,300毫秒后重试
props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG,300);
-
缓冲区配置
kafka发送消息的流程是先创建一个缓冲区,把消息先发送到缓冲区,然后再有一个本地线程,来这个缓冲区拉取数据,通过本地线程把数据从缓冲区拉取发送到kafka客户端
- kafka默认会创建一个消息缓冲区,用来存放要发送到消息,缓冲区是32m,
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
- 本地线程,一次性从缓冲区拉取的数据大小,16k
props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
- 如果线程拉取不到16k,间隔10ms也会将缓冲区的数据发送到kafka
props.put(ProducerConfig.LINGER_MS_CONFIG,10);
2、接收消息
消费消息流程
- 也是首先是先创建一个Properties对象用于传递配置参数
- 然后在props.put()方法传递参数
- 在创建一个连接客户端携带上Properties。
- 然后通过消费者客户端去poll()消息,默认可以一次可以poll到五百条消息下来,
- Duration.ofMillis(1000):表示如果没poll到五百条,1000ms后也结束这次poll。然后处理poll下来的消息,在继续循环poll下一次
@Slf4j
public class MyConsumer {
private final static String TOPIC_NAME = "my-replicated-topic";
private final static String CONSUMER_GROUP_NAME = "testGroup";
public static void main(String[] args) {
Properties props = new Properties();
// 设置参数
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.8.62:3392");
// 设置消费组名
props.put(ConsumerConfig.GROUP_ID_CONFIG,"CONSUMER_GROUP_NAME");
// 设置序列化
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 创建一个消费者的客户端
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 消费者订阅主题列表
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true){
// poll() API 是拉取消息的长轮训
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
// 取出单个offset
for (ConsumerRecord record : records){
log.info("收到的消息:partition = {};offset = {};key = {};value = {}"
,record.partition()
,record.offset()
,record.key()
,record.value());
}
}
}
}
2.1、关于消费者的自动提交和手动提交
消费者无论是自动提交还是手动提交,都需要把所属的消费组+消费的某个主题+消费的某个分区及消费的偏移量,这样的信息交给集群的_consumer_offsets主题里面。
-
自动提交
消费者poll消息下来以后就会自动提交offset
// 设置自动提交offset:true=自动提交、false=手动提交。 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true"); // 自动提交offset的时间间隔 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
注意:自动提交可能会丢失消息
-
手动提交
需要把自动提交的配置改为false
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
手动提交分为两种:
-
手动同步提交
在消费完消息后调用同步方法,会阻塞等待提交成功返回ack
// 取出单个offset for (ConsumerRecord record : records){ log.info("收到的消息:partition = {};offset = {};key = {};value = {}" ,record.partition() ,record.offset() ,record.key() ,record.value()); } // 所有的消息已消费完 if (records.count()>0){ //有消息 // 手动提交offset,当前线程会阻塞直到offset提交成功 // 一般使用同步提交,因为提交之后页没什么业务逻辑 consumer.commitSync(); }
-
手动异步提交
在消息消费完后提交,不需要等到集群ack,直接执行之后的逻辑,可以设置一个回调方法,供集群调用
// 取出单个offset for (ConsumerRecord record : records){ log.info("收到的消息:partition = {};offset = {};key = {};value = {}" ,record.partition() ,record.offset() ,record.key() ,record.value()); } // 所有的消息已消费完 if (records.count()>0){ //有消息 // 手动异步提交,异步回调处理 consumer.commitAsync((offsetAndMetadataMap,exception)->{ if (exception != null){ System.out.println("提交失败!失败原因:"+exception.getStackTrace()); } System.out.println("提交成功!=="+offsetAndMetadataMap); }); }
-
2.2、长轮训poll消息
- 默认情况下,消费者一次性会poll500条消息。
// 一下poll消费的消息数,可以根据消费消息的快慢来决定,一次性消费多少消息
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,500);
- 代码中设置了长轮训的时间是1000毫秒
while (true){
// poll() API 是拉取消息的长轮训,1000代表本次拉取1秒钟就结束
// 拉取消息到records中,
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
// 取出单个offset进行消费
for (ConsumerRecord record : records){
log.info("收到的消息"}
}
}
意味着:
- 结束单次消费原因可能是:
- 如果一次poll500条,就直接结束这次poll,去进入到for循环去消费这次poll到的消息。
- 如果一次没有poll到500条,且时间在1秒内,那么长轮训继续poll;要么拉取到500条,要么达到1秒,才会结束拉取
- 如果多次poll都没达到500条,且1秒时间到了,那么直接进入for循环。
- 如果两次poll的时间间隔超过30s,集群会认为该消费者的消费能力过弱,该消费者被踢出消费组,触发rebalance机制,rebalance机制会造成性能开销。可以通过设置这个参数,让一次poll的消息条数少一点
// 如果两次poll的时间如果超过30s的时间间隔,kafka会认为其消费能力过弱,将其踢出消费组,将分区分配给其他消费者。
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,30*1000);
2.3、消费者的健康状态检查
消费者每隔1s向kafka集群发送心跳,集群发现如果超过10s没有续约的消费者,将被踢出消费组,触发rebalance机制,将该分区交给消费组里的其他消费者进行消费。
// consumer给broker发送心跳的间隔时间
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,1000);
// 如果超过10s没收到消费者的心跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,10 * 1000);
2.4、指定分区和偏移量,时间消费
-
指定分区消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));
-
从头消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0))); consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));
-
指定offset
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0))); consumer.seek(new TopicPartition(TOPIC_NAME,0),10);
-
指定时间节点开始消费
List<PartitionInfo> topicPartition = consumer.partitionsFor(TOPIC_NAME); // key=分区:value=偏移量 HashMap<TopicPartition, Long> map = new HashMap<>(); // 从1小时前开始消费 long fetchDateTime = new Date().getTime() - 1000 * 60 * 60; for (PartitionInfo par : topicPartition){ map.put(new TopicPartition(TOPIC_NAME, par.partition()),fetchDateTime); } // 根据时间查找指定分区的偏移量 Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map); for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry:parMap.entrySet()){ TopicPartition key = entry.getKey(); OffsetAndTimestamp value = entry.getValue(); if (key == null || value == null) continue; long offset = value.offset(); System.out.println("partition-"+key.partition()+"===offset-"+offset); System.out.println(); // 再通过指定offset消费 if (value != null){ consumer.assign(Arrays.asList(key)); consumer.seek(key,offset); } }
2.5、新消费组的消费offset规则
新消费组的消费者在启动以后,默认会从连接后的分区的offset开始消费(消费新消息)。可以通过下面设置,让新消费者第一次从头开始消费,之后则会只消费新的消费者(最后消费的位置的偏移量+1)
- Latest:默认的,消费新消息
- earliest:第一次从头开始消费。之后开始消费新消息
// 新连接的消费组是否需要从头开始消费
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
三、Spring boot配置连接kafka
1、配置yml配置文件
和原生的区别是把客户端交给spring来管理,通过yml进行配置。配置的参数名字也基本上都是一样的,参数也是一样的。
spring:
kafka:
bootstrap-servers: 10.211.55.6:9092
# 生产者
producer:
retries: 3 #设置大于0的值,则客户端会将发送失败的记录重新发送
batch-size: 16384 #一次从缓冲区拉取的大小16k
buffer-memory: 33554432 #本地缓冲区大小32m
acks: 1
# 编解码规则(默认)
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 消费者
consumer:
group-id: default-group #消费组
enable-auto-commit: false #手动提交
auto-offset-reset: earliest #新消费组从头消费
#编解码规则
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 500 #一次拉取五百条
listener:
ack-mode: MANUAL_IMMEDIATE
# 手动调用acknowledge()后立即提交,一般使用这个
# MANUAL_IMMEDIATE
# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
# RECORD
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
# BATCH
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于time时提交
# TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于count时提交
# COUNT
# TIME |COUNT 有一个条件满足时提交
# COUNT_TIME
# poll()拉取一批消息,处理完业务后,手动调用acknowledge()后立即提交
# MANUAL
2、配置生产者
简单创建一个Controller用来生产消息
生产消息主要是通过kafkaTemplate模版类,
@RestController
@RequestMapping("/kafka")
public class KafkaController {
private final static String TOPIC_NAME = "my-replicated-topic";
@Resource
private KafkaTemplate<String ,Object> kafkaTemplate;
@PostMapping("/test")
public String test(@RequestBody User user){
JSONObject jsonObject = new JSONObject(user);
ListenableFuture<SendResult<String, Object>> send = kafkaTemplate.send(TOPIC_NAME,0,"key", jsonObject.toString());
return "成功!";
}
}
3、配置消费者
可以通过ConsumerRecord一条一条的接收处理或者通过ConsumerRecords批量接收处理,但我们还是得for一条一条的处理,所以一般选择第一种就好
通过KafkaListener注解,配置接收的topics,以及消费者组id,以及一些其他的消费者信息
如:
listenGroupPro
方法的使用
@Slf4j
@Component
public class MySpringBootConsumer {
/**
* @param record:
* @param ack:
* @return void
* 一次性读取一条,实际上也是一次性接收500条,然后一条消息回调一次,和下面的一样
* @date 2024/3/26 21:12
*/
@KafkaListener(topics = "my-replicated-topic",groupId = "testGroup")
public void listenGroup(ConsumerRecord<String,String> record, Acknowledgment ack){
log.info("testGroup收到的消息:partition = {};offset = {};key = {};value = {}"
,record.partition()
,record.offset()
,record.key()
,record.value());
// 手动提交ack,每处理完一条消息提交一次
ack.acknowledge();
}
/**
* @param records:
* @param ack:
* @return void
* 一次性全部接收,指定分组和topic,和上面的区别是提交ack时机会不一样,
* @date 2024/3/26 21:09
*/
@KafkaListener(topics = "my-replicated-topic",groupId = "testGroup1")
public void listenGroupS(ConsumerRecords<String,Object> records, Acknowledgment ack){
for (ConsumerRecord<String,Object> record: records){
log.info("testGroup1收到的消息:partition = {};offset = {};key = {};value = {}"
,record.partition()
,record.offset()
,record.key()
,record.value());
}
// 手动提交ack,处理完records一批消息提交一次
ack.acknowledge();
}
@KafkaListener(groupId = "testGroup2",topicPartitions = {
@TopicPartition(topic = "my-replicated-topic2",partitions = {"0","1"}), //指定多个分区
@TopicPartition(topic = "my-replicated-topic",partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1",initialOffset = "5")) //从5号offset开始消费
},concurrency = "3") //concurrency就是同区消费组下的消费者个数,建议小于分区总数
public void listenGroupPro(ConsumerRecord<String,String> record, Acknowledgment ack){
log.info("testGroup收到的消息:partition = {};offset = {};key = {};value = {};topic={}"
,record.partition()
,record.offset()
,record.key()
,record.value()
,record.topic());
// 手动提交ack,每处理完一条消息提交一次
ack.acknowledge();
}
}