springboot kafka 提高拉取数量

文章目录

  • 背景
  • 问题复现
  • 解决问题
  • 原理分析
    • fetch.min.bytes
    • fetch.max.wait.ms
    • 源码分析
      • ReplicaManager#fetchMessages

背景

开发过程中,使用kafka批量消费,发现拉取数量一直为1,如何提高批量拉取数量,记录下踩坑记录。

问题复现

  • kafka maven依赖
		<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.8.11</version>
        </dependency>
  • 配置消费者
@Configuration
public class KafkaBlukConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.max-poll-records:30}")
    private Integer maxPollRecords;
    @Value("${spring.kafka.consumer.groupId:group1}")
    private String group;

    /**
     * 消费者配置信息
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    /**
     * 消费者批量⼯程
     */
    @Bean
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        return factory;
    }
}

  • 消费端代码

@Component
public class KafkaBatchConsumer {
    private static final Logger log = LoggerFactory.getLogger(KafkaBatchConsumer.class);

    @KafkaListener(id = "consumer1", topics = "topic2", containerFactory = "batchFactory")
    public void consume(List<ConsumerRecord<String, String>> record) throws Exception {
        log.info("KafkaBatchConsumer recode size : {} ", record.size());
    }

}

  • 使用yml配置生产者
spring:
  kafka:
    bootstrap-servers: 192.168.56.112:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
  • 使用生产者发送消息
@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaProducer {

    // 自定义的主题名称
    public static final String TOPIC_NAME = "topic2";
    
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * http://localhost:8080/kafka/send?msg=a
     * @param msg
     */
    @RequestMapping("/send")
    public String send(@RequestParam("msg") String msg) {
        log.info("准备发送消息为:{}", msg);
        // 1.发送消息
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, msg);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                // 2.发送失败的处理
                log.error("生产者 发送消息失败:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> stringObjectSendResult) {
                // 3.发送成功的处理
                log.info("生产者 发送消息成功:" + stringObjectSendResult.toString());
            }
        });
        return "接口调用成功";
    }
}

  • 发送消息,观察消费者批量消费情况
http://localhost:9999/kafka/send?msg=a

多次调用发现如下:

在这里插入图片描述
发现拉取消息的大小始终为1

解决问题

  • 添加下面两行代码
@Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        ################ 添加下面两行 ###########
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 1024);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 3000);
        ######################################
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
  • 再次发送消息,观察消费情况

在这里插入图片描述
可以看到批量消费成功。

原理分析

fetch.min.bytes

消费者从服务器获取记录的最小字节数,broker 收到消费者拉取数据的请求的时候,如果可用数据量小于设置的值,那么 broker 将会等待有足够可用的数据的时候才返回给消费者,这样可以降低消费者和 broker 的工作负载。如果消费者的数量比较多,把该属性的值设置得大一点可以降低 broker 的工作负载。

fetch.max.wait.ms

如果 Kafka 仅仅参考 fetch.min.bytes 参数的要求,那么有可能会因为获取不到足够大小的消息而一直阻塞等待,从而无法发送响应给 Consumer,显然这是不合理的。fetch.max.wait.ms 参数用于指定 等待 FetchResponse 的最长时间,服务端根据此时间决定何时进行响应,默认值为 500(ms)。如果 Kafka 中没有足够多的消息而满足不了 fetch.min.bytes 参数的要求,那么最终会等待 500ms 再响应消费者请求。这个参数的设定需要参考 Consumer 与 Kafka 之间的延迟大小,如果业务应用对延迟敏感,那么可以适当调小这个参数。

源码分析

ReplicaManager#fetchMessages

/**
      * 能够立即返回给客户端的4种情况
      * 1. fetch请求没有大于0的wait时间,参考fetch.max.wait.ms设置
      * 2. fetch请求要拉取的分区为空
      * 3. 根据fetch.min.bytes的设置,有足够的数据返回
      * 4. 出现异常
      */
    if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
      // fetchPartitionData是一个TopicPartition -> FetchPartitionData 的map集合
      val fetchPartitionData = logReadResults.map { case (tp, result) =>
        tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
          result.lastStableOffset, result.info.abortedTransactions)
      }
      // 调用响应回调函数
      responseCallback(fetchPartitionData)
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/667078.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【设计模式】结构型-门面模式

前言 在软件开发中&#xff0c;设计模式是解决特定问题的经验总结&#xff0c;为开发者提供了一种可复用的解决方案。其中&#xff0c;门面模式&#xff08;Facade Pattern&#xff09;是一种结构型模式&#xff0c;旨在为复杂系统提供简化的接口&#xff0c;使客户端与系统之…

GPT-4o(OpenAI最新推出的大模型)

简介&#xff1a;最近&#xff0c;GPT-4o横空出世。对GPT-4o这一人工智能技术进行评价&#xff0c;包括版本间的对比分析、GPT-4o的技术能力以及个人感受等。 方向一&#xff1a;对比分析 GPT-4o&#xff08;OpenAI最新推出的大模型&#xff09;与GPT-4之间的主要区别体现在响应…

微服务学习Day8

文章目录 Sentinel雪崩问题服务保护框架Sentinel配置 限流规则快速入门流控模式流控效果热点参数限流 隔离和降级FeignClient整合Sentinel线程隔离&#xff08;舱壁模式&#xff09;熔断降级 授权规则及规则持久化授权规则自定义异常结果持久化 Sentinel 雪崩问题 服务保护框架…

SpringBoot打war包并配置外部Tomcat运行

简介 由于其他原因&#xff0c;我们需要使用SpringBoot打成war包放在外部的Tomcat中运行,本文就以一个案例来说明从SpringBoot打war包到Tomcat配置并运行的全流程经过 环境 SpringBoot 2.6.15 Tomcat 8.5.100 JDK 1.8.0_281 Windows 正文 一、SpringBoot配置打war包 第一步&a…

怎么通过互联网远程控制电脑?

远程访问又称为网络远程控制&#xff0c;它使用户能够通过互联网连接两台设备以解决问题。进行控制的电脑称为控制端&#xff0c;被控制的电脑则称为被控端。在远程访问过程中&#xff0c;控制端电脑掌握整个连接的操作。远程控制软件会捕获被控端电脑的操作&#xff0c;并在主…

JS数组怎么去重?| JavaScript中数组去重的14种方法

目录 一、利用for循环嵌套去重 二、利用splice() for循环嵌套&#xff08;ES5中最常用&#xff09; 三、利用indexOf() for循环去重 四、利用sort() for循环去重 五、利用includes() for循环去重&#xff08;ES7&#xff09; 六、利用Object键值对及其hasOwnProperty…

521源码网-免费网络教程-Cloudflare使用加速解析-优化大陆访问速度

Cloudfalre 加速解析是由 心有网络 向中国大陆用户提供的公共优化服务 接入服务节点: cf.13d7s.sit 接入使用方式类似于其它CDN的CNAME接入&#xff0c;可以为中国大陆用户访问Cloudflare网络节点大幅度加速&#xff0c;累计节点130 如何接入使用 Cloudflare 加速解析&#…

LabVIEW中进行步进电机的位置控制

在LabVIEW中进行步进电机的位置控制&#xff0c;通常涉及以下几个关键步骤&#xff1a;设置硬件、配置通信、编写控制算法和实施反馈控制。以下是一个详细的介绍。 硬件设置 步进电机&#xff1a;选择合适的步进电机&#xff0c;根据负载和应用需求选择适当的步数和转矩。 驱…

Kafka broker的新增和剔除(服役与退役)

说明&#xff1a;集群现有broker:node1,node2,node3三个,broker.id分别为0&#xff0c;1&#xff0c;2 已有两个topic&#xff1a;products、cities 1、退役&#xff08;Kafka集群中减少一个服务器broker2&#xff09; 退役后要保证剩下的服务器数量大于等于备份数&#xff0c…

SpringBoot+layui实现Excel导入操作

excel导入步骤 第三方插件引入插件 效果图 &#xff08;方法1&#xff09;代码实现&#xff08;方法1&#xff09;Html代码&#xff08; 公共&#xff09;下载导入模板 js实现 &#xff08;方法1&#xff09;上传文件实现 效果图&#xff08;方法2&#xff09;代码实现&#xf…

智慧医院物联网建设-统一管理物联网终端及应用

近年来&#xff0c;国家卫健委相继出台的政策和评估标准体系中&#xff0c;都涵盖了强化物联网建设的内容。物联网建设已成为智慧医院建设的核心议题之一。 作为医院高质量发展的关键驱动力&#xff0c;物联网的顶层设计与网络架构设计规划&#xff0c;既需要结合现代信息技术的…

AI炒股-批量爬取网易财经的要闻板块

工作任务和目标&#xff1a;批量爬取网易财经的要闻板块 在class"tab_body current"的div标签中&#xff1b; 标题和链接在&#xff1a;<a href"https://www.163.com/dy/article/J2UIO5DD051188EA.html">华为急需找到“松弛感”</a> 第一步&…

linux磁盘满了,如何查找大文件清除?

将整个Linux中文件按照文件大小排序&#xff0c;从大到小排序 只显示前100条数据 命令&#xff1a; find / -type f -exec du -h {} | sort -rh | head -n 100结果&#xff1a;

Llama改进之——分组查询注意力

引言 今天介绍LLAMA2模型引入的关于注意力的改进——分组查询注意力(Grouped-query attention,GQA)1。 Transformer中的多头注意力在解码阶段来说是一个性能瓶颈。多查询注意力2通过共享单个key和value头&#xff0c;同时不减少query头来提升性能。多查询注意力可能导致质量下…

联芸科技偏高的关联交易:业绩波动性明显,海康威视曾拥有一票否决

《港湾商业观察》施子夫 5月31日&#xff0c;上交所上市审核委员会将召开2024年第14次审议会议&#xff0c;届时将审议联芸科技&#xff08;杭州&#xff09;股份有限公司招股书&#xff08;以下简称&#xff0c;联芸科技&#xff09;的首发上会事项。 据悉&#xff0c;此次系…

php反序列化学习(3)

1、session 当session_start()被调用或者php.ini中session.auto_start为1时&#xff0c;php内部调用会话管理器&#xff0c;访问用户session被序列化后&#xff0c;存储到指定目录&#xff08;默认为/tmp&#xff09;。 漏洞产生&#xff1a;写入格式与读取格式不一致 处理器…

C# 代码配置的艺术

文章目录 1、代码配置的定义及其在软件工程中的作用2、C# 代码配置的基本概念和工具3、代码配置的实践步骤4、实现代码配置使用属性&#xff08;Properties&#xff09;使用配置文件&#xff08;Config Files&#xff09;使用依赖注入&#xff08;Dependency Injection&#xf…

模拟建造游戏:城市:天际线Cities: Skylines for Mac/win中文原生版

《城市&#xff1a;天际线》&#xff08;Cities: Skylines&#xff09;是一款由Colossal Order开发&#xff0c;Paradox Interactive发行的城市建设模拟游戏。这款游戏于2015年首次发布&#xff0c;迅速赢得了玩家和评论家的好评&#xff0c;并成为了备受欢迎的城市建设游戏之一…

Centos7.9环境下keepalived结合nginx实现负载均衡的高可用(亲测版)

目录 一、负载均衡高可用解释 二、安装 三、Nginx检查脚本创建 四、修改keepalived配置文件 一、负载均衡高可用解释 nginx 作为负载均衡器&#xff0c;所有请求都到了nginx&#xff0c;如果nginx服务器宕机后端web服务将无法提供服务&#xff0c;影响严重。这样nginx作为负…

使用 Django Model 构建强大的数据库模型

文章目录 创建一个简单的 Django Model迁移数据库使用 Django Shell 操作模型Django Admin结论 在 Django 中&#xff0c;Model 是构建数据库模型的基础。它允许开发人员定义数据的结构&#xff0c;并提供了方便的方式来与数据库进行交互。本文将介绍如何使用 Django Model 来创…