Kafka核心参数与使用02

一、从基础的客户端说起

Kafka 提供了非常简单的生产者(Producer)和消费者(Consumer)API。通过引入相应依赖后,可以快速上手编写生产者和消费者的示例。

1. 消息发送者主流程

一个最基础的 Producer 发送消息的步骤如下:

  1. 设置 Producer 核心属性

    • 例如:bootstrap.servers(集群地址)、key.serializervalue.serializer 等。
    • 大多数核心配置在 ProducerConfig 中都有对应的注释说明。
  2. 构建消息

    • Kafka 消息是一个 Key-Value 结构,Key 常用于分区路由,Value 则是业务真正要传递的内容。
  3. 发送消息

    • 单向发送producer.send(record); 仅发出消息,不关心服务端响应。
    • 同步发送producer.send(record).get(); 获取服务端响应前会阻塞。
    • 异步发送producer.send(record, callback); 服务端响应时会回调。

示例核心代码示意:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

Producer<String,String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record =
        new ProducerRecord<>(TOPIC, key, value);
// 单向发送
producer.send(record);
// 同步发送
producer.send(record).get();
// 异步发送
producer.send(record, new Callback() { ... });

2. 消息消费者主流程

Consumer 侧,同样有三步:

  1. 设置 Consumer 核心属性

    • 例如:bootstrap.serversgroup.idkey.deserializervalue.deserializer 等。
  2. 拉取消息

    • Kafka 采用 Pull 模式:消费者主动调用 poll() 拉取消息。
  3. 处理消息,提交位点

    • 手动提交:consumer.commitSync()consumer.commitAsync()
    • 自动提交:设置 enable.auto.commit = true 及相应提交周期参数。

示例核心代码示意:

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // 业务处理
    }
    // 手动提交 offset
    consumer.commitSync();
}

二、从客户端属性来梳理客户端工作机制

Kafka 的核心特色在于高并发、高吞吐以及在网络不稳定、服务随时会崩溃等复杂场景下仍能保证消息安全性。以下属性与机制能帮助我们从客户端视角理解 Kafka。

1. 消费者分组消费机制

  • Group 机制
    每个 Consumer 都会指定一个 group.id。同一个 Consumer Group 内,Topic 的每个 Partition 只会被同组里的一个 Consumer 消费。不同组之间则是互不影响、各自消费。
  • offset 提交
    • offset 保存在 Broker 端,但由 Consumer “主导”提交。
    • 提交方式有:
      • 同步:commitSync(),安全但速度慢;
      • 异步:commitAsync(),快但可能丢失或重复消费。
    • auto.offset.reset
      • 当 Broker 端没有找到该 Group 相应的 offset 时,可以根据配置(earliest, latest, none)决定从何处开始消费。

提示:Offset 提交与消息处理之间并非完全同步,一旦无法保证强一致性,可能出现消息重复与消息丢失。可根据业务需求与场景选择手动提交或自动提交,也可将 offset 存入外部存储(如 Redis)自行管理。

2. 生产者拦截器机制

  • 通过配置 interceptor.classes 可以指定一个或多个实现了 ProducerInterceptor 接口的拦截器。
  • 典型功能:在发送前统一添加/修改消息内容,或者在发送后做监控/统计等操作。

3. 消息序列化机制

  • Producer 端:
    • key.serializer / value.serializer:将对象序列化为 byte[]
    • 内置如 StringSerializerIntegerSerializer 等;可自定义自定义序列化类。
  • Consumer 端:
    • key.deserializer / value.deserializer:将 byte[] 反序列化为业务对象。
  • 如果使用自定义类型(POJO)进行传输,则需要编写自定义 Serializer/Deserializer。
    • 核心思想:定长字段不定长字段的序列化与反序列化。

4. 消息分区路由机制

  • Producer 侧:
    • 通过 partitioner.class 指定自定义的分区器(Partitioner 接口)。Kafka 内置默认逻辑:
      • 若无 key,则采用黏性分区策略(Sticky Partition);
      • 若指定 key,则对 key 进行哈希得到分区;
      • 也可改为轮询策略(RoundRobinPartitioner)。
  • Consumer 侧:
    • 通过 partition.assignment.strategy 指定分区分配策略,内置 RangeAssignor, RoundRobinAssignor, StickyAssignor, CooperativeStickyAssignor 等。
      • Range:按顺序将分区切块分配。
      • RoundRobin:轮询分配。
      • Sticky:尽可能保持现有分配不变,同时保证分配均匀。

5. 生产者消息缓存机制

生产者为了提高吞吐量,会将消息先写入一个本地缓存(RecordAccumulator),然后 sender 线程批量发送到 Broker:

  • buffer.memory:缓存总大小,默认 32MB。
  • batch.size:每个分区发送批次大小,默认 16KB。
  • linger.ms:即便 batch.size 未填满,等待 linger.ms 毫秒后也会将消息批量发送。
  • max.in.flight.requests.per.connection:同一连接上未收到响应的请求数上限。

6. 发送应答机制

  • acks 用于控制生产者发送完消息后何时认为消息“成功”:
    • acks=0:不等待 Broker 确认,吞吐量高,安全性低。
    • acks=1:只等待 Leader 分区写入,常见设置。
    • acks=all-1:等待所有副本写入,安全性最高,吞吐量相对低。
  • 还需配合 Broker 端 min.insync.replicas 参数,控制副本个数不足时直接返回错误。

7. 生产者消息幂等性

  • 为保证 Exactly-once 语义,需要开启 enable.idempotence(幂等性)。
  • 幂等性主要依赖 PID + SequenceNumber 机制:
    • Producer 向同一分区发送消息时,每条消息都有一个单调递增的序列号。
    • Broker 针对 <PID, Partition> 维护序列号,只接收递增消息,防止消息重复写入。
  • 幂等性要求:
    • acks=all
    • retries>0
    • max.in.flight.requests.per.connection<=5

8. 生产者消息事务

  • 幂等性只能保证单个分区的 Exactly-once,如果涉及 多个分区/Topic 则需要“事务”来保证一批消息的一致性。
  • 主要 API:
    • initTransactions()
    • beginTransaction()
    • commitTransaction()
    • abortTransaction()
  • 事务依赖于 transaction.id 来区分不同的 Producer 实例,以便在崩溃重启后继续补偿或回滚先前未完成的事务,保证多分区的一致写入。

三、客户端流程总结

  1. Producer:

    • 属性配置(序列化、分区器、拦截器、幂等性/事务等) → 将消息提交到 RecordAccumulatorSender 线程批量发送到 Broker → 按 acks 等待 Broker 响应 → 提交或重试。
  2. Consumer:

    • 属性配置(反序列化、消费组、分区分配策略等) → poll() 拉取消息 → 业务处理 → 提交 offset(手动或自动),与 Broker 同步消费进度。
  3. 重点:

    • 消息在 Producer 端的缓存发送机制消息在 Consumer 端的主动拉取、分组消费、offset 提交 是理解 Kafka 高并发、高吞吐、高可用的关键。
    • 其他如 幂等性(保证单分区 Exactly-once)和 事务(保证多分区一致性)是针对数据安全性和业务需求的更深入扩展。

四、Spring Boot 集成 Kafka

Spring Boot 中集成 Kafka 本质也是对上述 Producer/Consumer 的封装。

  1. 引入依赖

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    
  2. application.properties 中配置 Kafka 相关参数

    • 和原生 Kafka 参数名称基本一致,如 spring.kafka.producer.*spring.kafka.consumer.* 等。
    • 典型参数:bootstrap-servers, acks, batch-size, enable-auto-commit, auto-offset-reset 等。
  3. 使用 KafkaTemplate 发送消息

    @RestController
    public class KafkaProducerController {
        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;
    
        @GetMapping("/kafka/send/{message}")
        public void sendMessage(@PathVariable("message") String msg) {
            kafkaTemplate.send("topic1", msg);
        }
    }
    
  4. 使用 @KafkaListener 声明消息消费者

    @Component
    public class KafkaConsumerListener {
        @KafkaListener(topics = {"topic1"})
        public void onMessage(ConsumerRecord<?, ?> record) {
            System.out.println("消费内容:" + record.value());
        }
    }
    

 


结语

  • 想要真正掌握 Kafka,重点在于建立整体的数据流转模型
    • Producer 端如何将消息分区、缓存、发送、应答、重试、保证幂等与事务;
    • Consumer 端如何分组消费、订阅分区、拉取消息、提交 offset。
  • 熟悉这些机制后,再去看各种客户端配置就会轻松许多,能够结合实际业务场景做灵活配置与调优。
  • Spring Boot 也只是对原生 Kafka 客户端的进一步封装,一旦理解 Kafka 底层机制与各项参数原理,使用 Spring Boot 时只需“对号入座”地进行配置即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/951477.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

标准应用 | 2025年网络安全服务成本度量实施参考

01 网络安全服务成本度量依据相关新变化 为了解决我国网络安全服务产业发展中面临的服务供需两方对于服务成本组成认知偏差较大、网络安全服务成本度量缺乏依据的问题&#xff0c;中国网络安全产业联盟&#xff08;CCIA&#xff09;组织北京赛西科技发展有限责任公司、北京安…

微信小程序map组件所有markers展示在视野范围内

注意&#xff1a;使用include-points属性不生效&#xff0c;要通过createMapContext实现 <template><view class"map-box"><map id"map" class"map" :markers"markers" :enable-traffic"true" :enable-poi&…

PLC实现HTTP协议JSON格式数据上报对接的参数配置说明

IGT-SER系列PLC通讯智能网关支持HTTP协议GET和POST、PUT请求模式。支持JSON格式的文件&#xff0c;也可以实现WebService的调用。 通常智能网关是HTTP协议的客户端&#xff0c;也可以同时作为HTTP的服务端。相关案例 作为客户端时支持触发、周期、混合等多种工…

微信小程序——创建滑动颜色条

在微信小程序中&#xff0c;你可以使用 slider 组件来创建一个颜色滑动条。以下是一个简单的示例&#xff0c;展示了如何实现一个颜色滑动条&#xff0c;该滑动条会根据滑动位置改变背景颜色。 步骤一&#xff1a;创建小程序项目 首先&#xff0c;使用微信开发者工具创建一个新…

Improving Language Understanding by Generative Pre-Training GPT-1详细讲解

Improving Language Understanding by Generative Pre-Training 2018.06 GPT-1 0.有监督、半监督、无监督 CV&#xff1a;ImageNet pre-trained model NLP&#xff1a;pre-trained model? 在计算机视觉中任务包含分类、检测、分割&#xff0c;任务类别数少&#xff0c;对应…

sql server cdc漏扫数据

SQL Server的CDC指的是“变更数据捕获”&#xff08;Change Data Capture&#xff09;。这是SQL Server数据库提供的一项功能&#xff0c;能够跟踪并记录对数据库表中数据所做的更改。这些更改包括插入、更新和删除操作。CDC可以捕获这些变更的详细信息&#xff0c;并使这些信息…

如何在 Ubuntu 22.04 上安装 Caddy Web 服务器教程

简介 Caddy 是一个开源的 Web 服务器&#xff0c;它支持静态和现代 Web 应用程序&#xff0c;使用预定义的配置规则&#xff0c;并为所有链接的域名自动启用 HTTPS。Caddy 使用 GO 语言编写&#xff0c;提供了用户友好的配置指令&#xff0c;使你既可以将其用作 Web 服务器&am…

《机器学习》——贝叶斯算法

贝叶斯简介 贝叶斯公式&#xff0c;又称贝叶斯定理、贝叶斯法则&#xff0c;最初是用来描述两个事件的条件概率间的关系的公式&#xff0c;后来被人们发现具有很深刻的实际意义和应用价值。该公式的实际内涵是&#xff0c;支持某项属性的事件发生得愈多&#xff0c;则该属性成…

边缘计算网关在机床设备数据采集中的应用

边缘计算网关是连接边缘设备和云端的一个中间节点&#xff0c;负责在边缘设备和云服务器之间进行数据传输和处理。它具备数据采集、数据处理、协议转换、数据存储、安全功能及远程管理等多种能力&#xff0c;是边缘计算系统中不可或缺的关键设备。 一、功能与优势 数据采集&a…

腾讯二面:MySQL的半同步是什么?不是MySQL的两阶段提交,那是什么?

前言 年后在进行腾讯二面的时候&#xff0c;写完算法的后问的第一个问题就是&#xff0c;MySQL的半同步是什么&#xff1f;我当时直接懵了&#xff0c;我以为是问的MySQL的两阶段提交的问题呢&#xff1f;结果确认了一下后不是两阶段提交&#xff0c;然后面试官看我连问的是啥都…

云计算基础,虚拟化原理

文章目录 一、虚拟化1.1 什么是虚拟化1.2 虚拟化类型 二 、存储虚拟化2.1 存储指标2.2 存储类型2.3 存储协议2.4 RAID 三、内存 i/O虚拟化3.1 内存虚拟化基本概念地址空间转换原理内存共享与隔离原理 3.2 I/O 虚拟化基本概念模拟&#xff08;Emulation&#xff09;方式半虚拟化…

【网络协议】IPv4 地址分配 - 第二部分

前言 在第 1 部分中&#xff0c;我们学习了 IPv4 地址的分配方式&#xff0c;了解了各种类型的 IPv4 地址&#xff0c;并进行了基础的子网划分&#xff08;Subnetting&#xff09;。在第 2 部分中&#xff0c;我们将继续学习子网划分&#xff0c;并引入一些新的概念。 【网络…

JAVA 使用apache poi实现EXCEL文件的输出;apache poi实现标题行的第一个字符为红色;EXCEL设置某几个字符为别的颜色

设置输出文件的列宽&#xff0c;防止文件过于丑陋 Sheet sheet workbook.createSheet(FileConstants.ERROR_FILE_SHEET_NAME); sheet.setColumnWidth(0, 40 * 256); sheet.setColumnWidth(1, 20 * 256); sheet.setColumnWidth(2, 20 * 256); sheet.setColumnWidth(3, 20 * 25…

Cursor 实战技巧:好用的提示词插件Cursor Rules

你好啊&#xff0c;见字如面。感谢阅读&#xff0c;期待我们下一次的相遇。 最近在小红书发现了有人分享这款Cursor提示词的插件&#xff0c;下面给各位分享下使用教程。简单来说Cursor Rules就是可以为每一个我们自己的项目去配置一个系统级别的提示词&#xff0c;这样在我们…

【简博士统计学习方法】第1章:3. 统计学习方法的三要素

3. 统计学习方法的三要素 3.1 监督学习的三要素 3.1.1 模型 假设空间&#xff08;Hypothesis Space&#xff09;&#xff1a;所有可能的条件概率分布或决策函数&#xff0c;用 F \mathcal{F} F表示。 若定义为决策函数的集合&#xff1a; F { f ∣ Y f ( X ) } \mathcal{F…

60.在 Vue 3 中使用 OpenLayers 绘制自由线段、自由多边形

前言 在现代 Web 开发中&#xff0c;地图功能已经成为许多应用的重要组成部分。OpenLayers 是一个强大的开源地图库&#xff0c;支持多种地图源和地图操作。结合 Vue 3 的响应式特性&#xff0c;我们可以轻松实现地图的交互功能。本文将详细介绍如何在 Vue 3 中使用 OpenLayer…

krpano 实现文字热点中的三角形和竖杆

krpano 实现文字热点中的三角形和竖杆 实现文字热点中的三角形和竖杆 一个后端写前端真的是脑阔疼 一个后端写前端真的是脑阔疼 一个后端写前端真的是脑阔疼 实现文字热点中的三角形和竖杆 上图看效果 v&#xff1a;2549789059

【算法】字符串算法技巧系列

阿华代码&#xff0c;不是逆风&#xff0c;就是我疯 你们的点赞收藏是我前进最大的动力&#xff01;&#xff01; 希望本文内容能够帮助到你&#xff01;&#xff01; 目录 引入&#xff1a;字符串相关算法技巧 1&#xff1a;字符串转数组 2&#xff1a;子字符串 3&#xff…

Linux x86_64 程序静态链接之重定位

文章目录 一、简介二、链接器2.1 简介2.2 可重定位目标模块2.3 符号解析2.4 重定位 三、重定位 demo 演示3.1 外部函数重定位3.2 static 函数重定位 四、补充参考链接 一、简介 编程的代码无非是由函数和各种变量&#xff0c;以及对这些变量的读、写所组成&#xff0c;而不管是…

【OJ刷题】同向双指针问题

这里是阿川的博客&#xff0c;祝您变得更强 ✨ 个人主页&#xff1a;在线OJ的阿川 &#x1f496;文章专栏&#xff1a;OJ刷题入门到进阶 &#x1f30f;代码仓库&#xff1a; 写在开头 现在您看到的是我的结论或想法&#xff0c;但在这背后凝结了大量的思考、经验和讨论 目录 1…