【kafka系列】消费者

目录

获取消息

1. 消费者获取消息的流程逻辑分析

阶段一:消费者初始化

阶段二:分区分配与重平衡(Rebalance)

阶段三:消息拉取与处理

阶段四:偏移量提交

核心设计思想

2. 流程

关键点总结

常见参数

一、核心必填参数

二、消费者组与重平衡参数

三、消息拉取与处理参数

四、偏移量(Offset)提交参数

五、错误处理与容错参数

六、高级配置


获取消息

1. 消费者获取消息的流程逻辑分析

Kafka 消费者通过 消费者组(Consumer Group) 协作消费消息,核心流程分为 初始化、分区分配、消息拉取、偏移量提交 四个阶段:


阶段一:消费者初始化
  1. 订阅 Topic
    • 消费者通过 consumer.subscribe() 订阅一个或多个 Topic。
    • 若消费者属于同一消费者组,组内消费者会均分 Topic 的分区
  1. 加入消费者组
    • 消费者启动时向 Broker 发送 JoinGroup 请求,加入消费者组。
    • 若消费者是组内第一个成员,会被选举为 Leader 消费者,负责分区分配。

阶段二:分区分配与重平衡(Rebalance)
  1. 分区分配策略
    • Leader 消费者根据策略(如 RangeAssignorRoundRobinAssignor)分配分区。
    • 分配结果通过 SyncGroup 请求同步给所有消费者。
  1. 重平衡触发条件
    • 消费者加入或离开组。
    • Topic 的分区数量变化。
    • 消费者心跳超时(默认 session.timeout.ms=45s)。

阶段三:消息拉取与处理
  1. 拉取消息
    • 消费者向分区的 Leader Broker 发送 FetchRequest,从当前偏移量(Offset)拉取消息。
    • 关键配置:
      • max.poll.records:单次拉取最大消息数(默认 500)。
      • fetch.min.bytes:最小拉取数据量(默认 1B,优先吞吐量时可调大)。
  1. 处理消息
    • 用户通过 ConsumerRecords 处理消息,需在 max.poll.interval.ms(默认 5分钟)内完成,否则触发重平衡。

阶段四:偏移量提交
  1. 提交 Offset
    • 自动提交:由消费者线程周期性提交(enable.auto.commit=true,默认 5秒)。
    • 手动提交:用户调用 commitSync()commitAsync() 精确控制。
    • Offset 存储在 Kafka 内部 Topic __consumer_offsets 中。

核心设计思想
  • 负载均衡:通过消费者组实现分区并行消费。
  • 容错性:心跳机制检测消费者存活,重平衡保障分区重新分配。
  • 至少一次语义:Offset 提交后移,确保消息至少被消费一次。

2. 流程


关键点总结

  1. 重平衡机制:保障消费者组动态扩展和容错。
  2. Offset 管理:通过提交 Offset 实现消费进度持久化。
  3. 消息拉取优化:通过 fetch.min.bytesmax.poll.records 平衡吞吐与延迟。
  4. 超时控制session.timeout.msmax.poll.interval.ms 防止消费者僵死

常见参数

一、核心必填参数

参数名

默认值

说明

bootstrap.servers

Kafka 集群地址列表(逗号分隔,如 host1:9092,host2:9092

)。

group.id

消费者组 ID(同一组内的消费者共享分区负载)。

key.deserializer

Key 的反序列化类(如 org.apache.kafka.common.serialization.StringDeserializer

)。

value.deserializer

Value 的反序列化类(同上)。


二、消费者组与重平衡参数

参数名

默认值

说明

session.timeout.ms

45000

(45秒)

消费者与 Broker 的心跳超时时间,超时触发重平衡。

heartbeat.interval.ms

3000

(3秒)

消费者发送心跳的间隔时间(需小于 session.timeout.ms

的 1/3)。

max.poll.interval.ms

300000

(5分钟)

两次 poll()

调用的最大间隔时间,超时触发重平衡。

partition.assignment.strategy

RangeAssignor

分区分配策略(如 RoundRobinAssignor

CooperativeStickyAssignor

)。


三、消息拉取与处理参数

参数名

默认值

说明

fetch.min.bytes

1

(1字节)

单次拉取的最小数据量(Broker 等待足够数据后返回,提升吞吐量)。

fetch.max.bytes

52428800

(50MB)

单次拉取的最大数据量(需小于 Broker 的 message.max.bytes

)。

max.poll.records

500

单次 poll()

返回的最大消息数(避免内存溢出)。

max.partition.fetch.bytes

1048576

(1MB)

单分区单次拉取的最大数据量。


四、偏移量(Offset)提交参数

参数名

默认值

说明

enable.auto.commit

true

是否自动提交 Offset(建议设为 false

,手动提交确保精确控制)。

auto.commit.interval.ms

5000

(5秒)

自动提交 Offset 的时间间隔(enable.auto.commit=true

时生效)。

auto.offset.reset

latest

无初始 Offset 时的策略:<br>- earliest

:从最早消息开始。<br>- latest

:从最新消息开始。


五、错误处理与容错参数

参数名

默认值

说明

isolation.level

read_uncommitted

事务消息隔离级别:<br>- read_committed

:仅读取已提交的事务消息。


六、高级配置

参数名

默认值

说明

client.id

客户端标识(用于监控和日志)。

connections.max.idle.ms

540000

(9分钟)

空闲连接超时时间(Broker 主动关闭超时连接)。

request.timeout.ms

30000

(30秒)

消费者等待 Broker 响应的超时时间。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/971714.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

游戏引擎学习第105天

仓库:https://gitee.com/mrxiao_com/2d_game_2 查看当前进度 今天的工作重点是继续进行渲染系统的清理。昨天已经完成了一次渲染清理&#xff0c;现在还有一些内容需要继续处理。首先&#xff0c;已经解决了坐标系统的问题&#xff0c;其中世界坐标基本上是正确的&#xff0c…

重新求职刷题力扣DAY15

1.[226. 翻转二叉树](https://leetcode.cn/problems/symmetric-tree/) 给你一棵二叉树的根节点 root &#xff0c;翻转这棵二叉树&#xff0c;并返回其根节点。 示例 1&#xff1a; 输入&#xff1a;root [4,2,7,1,3,6,9] 输出&#xff1a;[4,7,2,9,6,3,1]示例 2&#xff1a…

OpenGL ES -> 投影变换矩阵完美解决绘制GLSurfaceView绘制图形拉伸问题

GLSurfaceView绘制图形拉伸问题 假如在XML文件中声明GLSurfaceView的宽高为 android:layout_width"match_parent"android:layout_height"match_parent GLSurfaceView绘制的图形在Open GL ES坐标系中&#xff0c;而Open GL ES坐标系会根据GLSurfaceView的宽高将…

Java并发编程8--线程

1.什么是线程&#xff1f; 现代操作系统调度的最小单元是线程&#xff0c;也叫轻量级进程&#xff08;Light Weight Process&#xff09;&#xff0c;在一个进程里可以创建多个线程&#xff0c;这些线程都拥有各自的计数器、堆栈和局部变量等属性&#xff0c;并且能够访问共享的…

java八股文-mysql

1. 索引 1.1 什么是索引 索引(index)是帮助Mysql高效获取数据的数据结构(有序).提高数据的检索效率,降低数据库的IO成本(不需要全表扫描).通过索引列对数据进行排序,降低数据排序成本,降低了CPU的消耗. 1.2 mysql索引使用的B树? 1. 没有使用二叉树&#xff0c;最坏情况o&…

开启蓝耘之旅:DeepSeek R1 模型在智算平台的起步教程

----------------------------------------------------------我的个人主页-------------------- 动动你的手指----------------------------------------点赞&#x1f44d; 收藏❤--------------------------------------------------------------- 引言 在深度学习的广袤领…

【设计模式】【行为型模式】访问者模式(Visitor)

&#x1f44b;hi&#xff0c;我不是一名外包公司的员工&#xff0c;也不会偷吃茶水间的零食&#xff0c;我的梦想是能写高端CRUD &#x1f525; 2025本人正在沉淀中… 博客更新速度 &#x1f44d; 欢迎点赞、收藏、关注&#xff0c;跟上我的更新节奏 &#x1f3b5; 当你的天空突…

将pyspark中的UDF提升6倍

本文亮点 调用jar中的UDF&#xff0c;减少python与JVM的交互&#xff0c;简单banchmark下对于54亿条数据集进行udf计算&#xff0c;从3小时的执行时间缩短至16分钟。 牺牲UDF部分的开发时间&#xff0c;尽量提高性能。 以接近纯python的开发成本&#xff0c;获得逼近纯scala的性…

告别第三方云存储!用File Browser在Windows上自建云盘随时随地访问

文章目录 前言1.下载安装File Browser2.启动访问File Browser3.安装cpolar内网穿透3.1 注册账号3.2 下载cpolar客户端3.3 登录cpolar web ui管理界面3.4 创建公网地址 4.固定公网地址访问 前言 无论是个人用户还是企业团队&#xff0c;都希望能够有一个高效、安全的解决方案来…

vue2老版本 npm install 安装失败_安装卡主

vue2老版本 npm install 安装失败_安装卡主 特别说明&#xff1a;vue2老版本安装慢、运行慢&#xff0c;建议升级vue3element plus vite 解决方案1&#xff1a; 第一步、修改npm 镜像为国内镜像 使用淘宝镜像&#xff1a; npm config set registry https://registry.npmmir…

Qwen2-VL 的重大省级,Qwen 发布新旗舰视觉语言模型 Qwen2.5-VL

Qwen2.5-VL 是 Qwen 的新旗舰视觉语言模型&#xff0c;也是上一代 Qwen2-VL 的重大飞跃。 Qwen2.5-VL主要特点 视觉理解事物&#xff1a;Qwen2.5-VL不仅能够熟练识别花、鸟、鱼、昆虫等常见物体&#xff0c;而且还能够分析图像中的文本、图表、图标、图形和布局。 代理性&…

[matlab优化算法-18期】基于遗传算法的模糊PID控制优化

遗传算法优化模糊PID控制器&#xff1a;原理与实践 第一节&#xff1a;背景介绍 在现代控制系统中&#xff0c;PID控制器因其结构简单、参数调整方便而被广泛应用。然而&#xff0c;传统PID控制器的参数整定依赖于经验或试错法&#xff0c;难以适应复杂系统的动态变化。模糊控…

Kotlin Lambda

Kotlin Lambda 在探索Kotlin Lambda之前&#xff0c;我们先回顾下Java中的Lambda表达式&#xff0c;Java 的 Lambda 表达式是 Java 8 引入的一项强大的功能&#xff0c;它使得函数式编程风格的代码更加简洁和易于理解。Lambda 表达式允许你以一种更简洁的方式表示实现接口&…

实现pytorch注意力机制-one demo

主要组成部分&#xff1a; 1. 定义注意力层&#xff1a; 定义一个Attention_Layer类&#xff0c;接受两个参数&#xff1a;hidden_dim&#xff08;隐藏层维度&#xff09;和is_bi_rnn&#xff08;是否是双向RNN&#xff09;。 2. 定义前向传播&#xff1a; 定义了注意力层的…

【Prometheus】prometheus结合domain_exporter实现域名监控

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全…

基于Java+Springboot+MySQL企业公司网站系统设计与实现

博主介绍&#xff1a;黄菊华老师《Vue.js入门与商城开发实战》《微信小程序商城开发》图书作者&#xff0c;CSDN博客专家&#xff0c;在线教育专家&#xff0c;CSDN钻石讲师&#xff1b;专注大学生毕业设计教育、辅导。 所有项目都配有从入门到精通的基础知识视频课程&#xff…

SQL复习

SQL复习 MySQL SQL介绍 SQL SQL的全拼是什么&#xff1f; SQL全拼&#xff1a;Structured Query Language&#xff0c;也叫结构化查询语言。 SQL92和SQL99有什么区别呢&#xff1f; SQL92和SQL99分别代表了92年和99年颁布的SQL标准。 在 SQL92 中采用&#xff08;&#xff…

从入门到精通:Postman 实用指南

Postman 是一款超棒的 API 开发工具&#xff0c;能用来测试、调试和管理 API&#xff0c;大大提升开发效率。下面就给大家详细讲讲它的安装、使用方法&#xff0c;再分享些实用技巧。 一、安装 Postman 你能在 Postman 官网&#xff08;https://www.postman.com &#xff09;下…

零基础学QT、C++(一)安装QT

目录 如何快速学习QT、C呢&#xff1f; 一、编译器、项目构建工具 1、编译器&#xff08;介绍2款&#xff09; 2、项目构建工具 二、安装QT 1、下载QT安装包 2、运行安装包 3、运行QT creator 4、导入开源项目 总结 闲谈 如何快速学习QT、C呢&#xff1f; 那就是项目驱动法&…

【Zookeeper如何实现分布式锁?】

Zookeeper如何实现分布式锁? 一、ZooKeeper分布式锁的实现原理二、ZooKeeper分布式锁的实现流程三、示例代码四、总结一、ZooKeeper分布式锁的实现原理 ZooKeeper是一个开源的分布式协调服务,它提供了一个分布式文件系统的接口,可以用来存储和管理分布式系统的配置信息。 …