文章目录
- 背景
- 问题复现
- 解决问题
- 原理分析
- fetch.min.bytes
- fetch.max.wait.ms
- 源码分析
- ReplicaManager#fetchMessages
背景
开发过程中,使用kafka批量消费,发现拉取数量一直为1,如何提高批量拉取数量,记录下踩坑记录。
问题复现
- kafka maven依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.11</version>
</dependency>
- 配置消费者
@Configuration
public class KafkaBlukConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.max-poll-records:30}")
private Integer maxPollRecords;
@Value("${spring.kafka.consumer.groupId:group1}")
private String group;
/**
* 消费者配置信息
*/
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
/**
* 消费者批量⼯程
*/
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
//设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.setBatchListener(true);
return factory;
}
}
- 消费端代码
@Component
public class KafkaBatchConsumer {
private static final Logger log = LoggerFactory.getLogger(KafkaBatchConsumer.class);
@KafkaListener(id = "consumer1", topics = "topic2", containerFactory = "batchFactory")
public void consume(List<ConsumerRecord<String, String>> record) throws Exception {
log.info("KafkaBatchConsumer recode size : {} ", record.size());
}
}
- 使用yml配置生产者
spring:
kafka:
bootstrap-servers: 192.168.56.112:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
- 使用生产者发送消息
@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaProducer {
// 自定义的主题名称
public static final String TOPIC_NAME = "topic2";
private KafkaTemplate<String, String> kafkaTemplate;
/**
* http://localhost:8080/kafka/send?msg=a
* @param msg
*/
@RequestMapping("/send")
public String send(@RequestParam("msg") String msg) {
log.info("准备发送消息为:{}", msg);
// 1.发送消息
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, msg);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
// 2.发送失败的处理
log.error("生产者 发送消息失败:" + throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> stringObjectSendResult) {
// 3.发送成功的处理
log.info("生产者 发送消息成功:" + stringObjectSendResult.toString());
}
});
return "接口调用成功";
}
}
- 发送消息,观察消费者批量消费情况
http://localhost:9999/kafka/send?msg=a
多次调用发现如下:
发现拉取消息的大小始终为1
解决问题
- 添加下面两行代码
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
################ 添加下面两行 ###########
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 1024);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 3000);
######################################
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
- 再次发送消息,观察消费情况
可以看到批量消费成功。
原理分析
fetch.min.bytes
消费者从服务器获取记录的最小字节数,broker 收到消费者拉取数据的请求的时候,如果可用数据量小于设置的值,那么 broker 将会等待有足够可用的数据的时候才返回给消费者,这样可以降低消费者和 broker 的工作负载。如果消费者的数量比较多,把该属性的值设置得大一点可以降低 broker 的工作负载。
fetch.max.wait.ms
如果 Kafka 仅仅参考 fetch.min.bytes 参数的要求,那么有可能会因为获取不到足够大小的消息而一直阻塞等待,从而无法发送响应给 Consumer,显然这是不合理的。fetch.max.wait.ms 参数用于指定 等待 FetchResponse 的最长时间,服务端根据此时间决定何时进行响应,默认值为 500(ms)。如果 Kafka 中没有足够多的消息而满足不了 fetch.min.bytes 参数的要求,那么最终会等待 500ms 再响应消费者请求。这个参数的设定需要参考 Consumer 与 Kafka 之间的延迟大小,如果业务应用对延迟敏感,那么可以适当调小这个参数。
源码分析
ReplicaManager#fetchMessages
/**
* 能够立即返回给客户端的4种情况
* 1. fetch请求没有大于0的wait时间,参考fetch.max.wait.ms设置
* 2. fetch请求要拉取的分区为空
* 3. 根据fetch.min.bytes的设置,有足够的数据返回
* 4. 出现异常
*/
if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
// fetchPartitionData是一个TopicPartition -> FetchPartitionData 的map集合
val fetchPartitionData = logReadResults.map { case (tp, result) =>
tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
result.lastStableOffset, result.info.abortedTransactions)
}
// 调用响应回调函数
responseCallback(fetchPartitionData)
}