kafka消费数据太慢了,给优化下

原代码

public class KafkaConsumerDemo {
    public static void main(String[] args) {
        int numConsumers = 5; // 增加消费者的数量
        for (int i = 0; i < numConsumers; i++) {
            new Thread(new KafkaConsumerThread()).start();
        }
    }

    static class KafkaConsumerThread implements Runnable {
        @Override
        public void run() {
            // 配置消费者属性
            Properties props = new Properties();
            props.put("bootstrap.servers", "114.15.78.14:9092");
            props.put("group.id", "test-group");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // 调整消费者配置
            props.put("fetch.min.bytes", "1024"); // 最小获取1KB的数据
            props.put("fetch.max.wait.ms", "500"); // 最大等待500ms

            // 创建消费者实例
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

            // 订阅主题
            consumer.subscribe(Collections.singletonList("test-topic"));

            // 消费消息; 如果消息处理逻辑允许,可以批量处理消息,而不是逐条处理
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (!records.isEmpty()) {
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    }
                    // 批量提交偏移量
                    consumer.commitSync();
                }
            }
        }
    }
}  

在 Kafka 消费者代码中,有几个地方可以进行优化以提高消费速度:

增加消费者线程:你已经增加了消费者的数量,这是一个很好的开始。确保每个消费者线程都在不同的分区上消费,以实现并行处理。

调整 fetch.min.bytes 和 fetch.max.wait.ms:这两个参数控制了消费者从 Kafka
服务器获取数据的最小字节数和最大等待时间。如果你的消息较小,增加 fetch.min.bytes
可能会导致消费者等待更长时间以收集足够的数据,这可能会降低消费速度。如果你的消息较大,可以适当增加这个值。

异步提交偏移量:你当前使用的是 consumer.commitSync() 进行同步提交偏移量,这会阻塞当前线程直到提交完成。可以考虑使用
consumer.commitAsync() 异步提交,这样可以不阻塞消费线程。

增加 max.poll.records:poll
方法有一个参数可以设置一次拉取的最大记录数。如果设置得太低,可能会导致频繁的网络请求,增加延迟。

合理设置 auto.commit.interval.ms:自动提交偏移量的间隔设置得太短可能会导致不必要的性能开销,可以适当增加这个值。

使用多线程处理消息:如果你的消息处理逻辑是 CPU 密集型的,可以考虑使用线程池来并行处理消息。

监控和调优:使用 Kafka 的监控工具来观察消费者的行为,找出瓶颈并进行调优。

下面是根据上述建议优化后的代码示例:

public class KafkaConsumerDemo {
    public static void main(String[] args) {
        int numConsumers = 5; // 增加消费者的数量
        for (int i = 0; i < numConsumers; i++) {
            new Thread(new KafkaConsumerThread()).start();
        }
    }

    static class KafkaConsumerThread implements Runnable {
        @Override
        public void run() {
            // 配置消费者属性
            Properties props = new Properties();
            props.put("bootstrap.servers", "114.15.78.14:9092");
            props.put("group.id", "test-group");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "5000"); // 增加自动提交偏移量的间隔
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // 调整消费者配置
            props.put("fetch.min.bytes", "1"); // 减少最小获取字节数
            props.put("fetch.max.wait.ms", "100"); // 减少最大等待时间
            props.put("max.poll.records", "500"); // 增加一次拉取的最大记录数

            // 创建消费者实例
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

            // 订阅主题
            consumer.subscribe(Collections.singletonList("test-topic"));

            // 消费消息
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (!records.isEmpty()) {
                    processRecords(records); // 异步处理消息
                    consumer.commitAsync(); // 异步提交偏移量
                }
            }
        }

        private void processRecords(ConsumerRecords<String, String> records) {
            // 异步处理消息的逻辑
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                // 这里可以添加消息处理逻辑,例如使用线程池并行处理
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/916011.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Odoo:免费开源的流程制造行业ERP管理系统

概述 聚焦流程制造连续性生产的特性&#xff0c;提供集成PLMERPMESBI的一体化解决方案&#xff0c;涵盖计划、生产、质量、配方、供销、库存、成本、设备、资金管理等业务领域的整体性解决方案 行业的最新洞察&行业典型痛点 一、生产过程需要精细化控制 需要在各种制约…

ERP管理系统(源码+文档+部署+讲解)

本文将深入解析“ERP管理系统”的项目&#xff0c;探究其架构、功能以及技术栈&#xff0c;并分享获取完整源码的途径。 系统概述 ERP管理系统是一款全面的资源规划软件&#xff0c;旨在通过集成各种业务流程和功能模块来提高管理效率和决策质量。该系统覆盖了从基础设置、供…

mysql每日一题(上升的温度,date数据的计算)

日期之间的运算 日期类型的加法运算 data_add(now_data,interval 1 month) select date_add(now(), interval 1 day); -- 加1天 select date_add(now(), interval 1 hour); -- 加1小时 select date_add(now(), interval 1 minute); -- 加1分钟 select date_add(now(), inter…

CTF攻防世界小白刷题自学笔记13

1.fileinclude,难度&#xff1a;1,方向&#xff1a;Web 题目来源:宜兴网信办 题目描述:无 给一下题目链接&#xff1a;攻防世界Web方向新手模式第16题。 打开一看给了很多提示&#xff0c;什么language在index.php的第九行&#xff0c;flag在flag.php中&#xff0c;但事情显…

FFmpeg 4.3 音视频-多路H265监控录放C++开发十三.2:avpacket中包含多个 NALU如何解析头部分析

前提&#xff1a; 注意的是&#xff1a;我们这里是从avframe转换成avpacket 后&#xff0c;从avpacket中查看NALU。 在实际开发中&#xff0c;我们有可能是从摄像头中拿到 RGB 或者 PCM&#xff0c;然后将pcm打包成avframe&#xff0c;然后将avframe转换成avpacket&#xff0…

LabVIEW环境监测系统

随着环境问题的日益严重&#xff0c;环境参数的实时监测成为保障公共健康和生态平衡的重要手段。开发了一款基于LabVIEW开发的环境监测系统&#xff0c;能够对大气中的温度、湿度及二氧化硫浓度进行实时监测&#xff0c;并提供数据存储和超阈值报警功能。 系统组成 本系统由下…

【视觉SLAM】2-三维空间刚体运动的数学表示

读书笔记&#xff1a;学习空间变换的三种数学表达形式。 文章目录 1. 旋转矩阵1.1 向量运算1.2 坐标系空间变换1.3 变换矩阵与齐次坐标 2. 旋转向量和欧拉角2.1 旋转向量2.2 欧拉角 3. 四元数 1. 旋转矩阵 1.1 向量运算 对于三维空间中的两个向量 a , b ∈ R 3 a,b \in \R^3 …

SystemVerilog学习笔记(十):进程/细粒度进程控制

进程 进程或线程是作为独立实体执行的任何代码片段。fork-join块创建并行运行的不同线程。在下面的图-1中&#xff0c;可以看到进程的类型和进程控制。 序号进程描述1.fork-join只有所有子线程执行完毕时&#xff0c;父线程才会执行。2.fork-join_any只有任何一个子线程执行完…

【Visual Studio系列教程】如何在 VS 上编程?

上一篇博客中&#xff0c;我们介绍了《什么是 Visual Studio&#xff1f;》。本文&#xff0c;我们来看第2篇《如何在 VS 上编程&#xff1f;》。阅读本文大约10 分钟。我们会向文件中添加代码&#xff0c;了解 Visual Studio 编写、导航和了解代码的简便方法。 本文假定&…

【3D Slicer】的小白入门使用指南八

3D Slicer DMRI(Diffusion MRI)-扩散磁共振认识和使用 0、简介 大脑解剖 ● 白质约占大脑的 45% ● 有髓神经纤维(大约10微米轴突直径) 白质探索 朱尔斯约瑟夫德杰林(Jules Joseph Dejerine,《神经中心解剖学》(巴黎,1890-1901):基于髓磷脂染色标本的神经解剖图谱)…

GraphPad Prism与鹰谷电子实验记录本强强联合,数据兼容互通

在科研探索的征途上&#xff0c;每一次数据的记录与分析都至关重要。鹰谷很高兴地宣布&#xff0c;鹰谷电子实验记录本InELN&#xff0c;与国际知名生物数据统计分析GraphPad Prism软件&#xff0c;实现数据快速兼容互通&#xff01;使用鹰谷电子实验记录本的用户&#xff0c;将…

HarmonyOS的@State装饰器的底层实现

HarmonyOS的State装饰器的底层实现 序言准备工作实现State装饰器 序言 ArkTS是鸿蒙生态的应用开发语言。它在保持TypeScript&#xff08;简称TS&#xff09;基本语法风格的基础上&#xff0c;进一步通过规范强化静态检查和分析&#xff0c;使得在程序运行之前的开发期能检测更…

实战:深入探讨 MySQL 和 SQL Server 全文索引的使用及其弊端

在数据库中处理大量文本数据时,包含搜索(例如查找包含特定单词的文本)往往是必需的。然而,直接使用 LIKE %text% 的方式在大数据量中进行模糊查询会造成性能瓶颈。为了解决这一问题,MySQL 和 SQL Server 提供了全文索引(Full-Text Indexing)功能,可以显著加速文本数据的…

shell 100例

1、每天写一个文件 (题目要求&#xff09; 请按照这样的日期格式(xxxx-xx-xx每日生成一个文件 例如生成的文件为2017-12-20.log&#xff0c;并且把磁盘的使用情况写到到这个文件中不用考虑cron&#xff0c;仅仅写脚本即可 [核心要点] date命令用法 df命令 知识补充&#xff1…

网络管理之---3种网络模式配置

目标&#xff1a; 了解几个概念&#xff1a; 1.什么是IP&#xff1f;什么是IP地址&#xff1f; 2.什么是桥接、NAT、仅主机模式 3.端口&#xff1f; 4.什么是网络接口命名规则 5.网络管理器 IP&#xff1a;指网络之间互联的协议&#xff0c;是TCP/IP 体系中的网络协议 I…

ubuntu 下mosquitto TLS配置

1、/etc/mosquitto/mosquitto.conf文件配置 persistence true persistence_location /var/lib/mosquitto/ log_dest file /var/log/mosquitto/mosquitto.log include_dir /etc/mosquitto/conf.d listener 1883 listener 8883 0.0.0.0 password_file /etc/mosquitto/pwfile cert…

zabbix搭建钉钉告警流程

目录 &#x1f324;️zabbix实验规划 &#x1f324;️zabbix实验步骤 &#x1f4d1;1 使用钉钉添加一个自定义的机器人 ​ &#x1f4d1;2在zabbix-server上编写钉钉信息发送脚本&#xff0c;设置钉钉报警媒介 ☁️ 设置钉钉报警媒介​编辑​编辑 ☁️在添加消息模板​编辑​…

【JavaWeb】JavaWeb入门之XML详解

目录 1.XML介绍 1.1.XML概述 1.1.1.什么是XML 1.1.2.XML的作用 1.1.3.XML与HTML的比较 1.1.4.XML和properties&#xff08;属性文件&#xff09;比较 1.1.5.W3C组织 1.2.XML语法概述 1.2.1.XML文档展示 1.2.2.XML文档的组成部分 1.3.XML文档声明 1.3.1.什么是XML文…

基于Zynq FPGA对雷龙SD NAND的测试

一、SD NAND特征 1.1 SD卡简介 雷龙的SD NAND有很多型号&#xff0c;在测试中使用的是CSNP4GCR01-AMW与CSNP32GCR01-AOW。芯片是基于NAND FLASH和 SD控制器实现的SD卡。具有强大的坏块管理和纠错功能&#xff0c;并且在意外掉电的情况下同样能保证数据的安全。 其特点如下&…

<Project-23 Navigator Portal> Python flask web 网站导航应用 可编辑界面:添加图片、URL、描述、位置移动

目的&#xff1a; 浏览器的地址簿太厚&#xff0c;如下图&#xff1a; 开始&#xff0c;想给每个 Web 应用加 icon 来提高辨识度&#xff0c;发现很麻烦&#xff1a;create image, resize, 还要挑来挑去&#xff0c;重复性地添加代码。再看着这些密密麻麻的含有重复与有规则的…