Kafka之消费者客户端

1、历史上的二个版本

与生产者客户端一样,在Kafka的发展过程当中,消费者客户端主要有两个大的版本:

  • 旧消费者客户端(Old Consumer):基于Scala语言开发的版本,又称为Scala消费者客户端
  • 新消费者客户端(New Consumer):从Kafka 0.9.0版本之后基于Java语言开发的版本,又称为Java消费者客户端

2、必要的参数配置

  • bootstrap.servers

    用来指定连接Kafka集群所需的broker地址清单,形式为:host1:port1,host2:port2,…,多个broker之间以“,”隔开。

    不用将所有broker列出来,消费者可以根据一个broker查询到其他broker。

    建议至少配置2个或2个以上的broker,防止只有一个broker的话,宕机的时候就无法连接到Kafka集群了。

  • group.id

    消费者隶属消费组的名称。

  • key.deserializer 和 value.deserializer

    与生产者客户端 KafkaProducer中的key.serializer和value.serializer参数对应。

    用来将字节数组中的key和value反序列化还原为原来的对象格式。

3、订阅主题与分区

一个消费者可以订阅一个或多个主题。

Kafka消费者客户端提供了三种订阅方式:集合订阅subscribe(Collection)、正则表达式订阅subscribe(Pattern)、指定分区订阅assign(Collection)。

这三种订阅方式分别代表了三种不同的订阅状态,依次为AUTO_TOPICS、 AUTO_PATTERN、USER_ASSIGNED。如果没有订阅,订阅状态为NONE。

其中的集合订阅subscribe(Collection)和正则表达式订阅subscribe(Pattern)这两种订阅方式有消费者自动再均衡的功能,可以根据分区分配策略自动的为消费者分配对应的分区。而指定分区订阅assign(Collection)方式则不具备消费者自动再均衡的功能。

综上所述梳理了一张关于订阅方式、订阅状态和再均衡功能的关系表:
在这里插入图片描述

4、消费消息

消息消费一般有两种方式:

  • 推模式:服务器主动将消息推送给消费者。
  • 拉模式:消费者主动向服务器发起请求来来取信息。

Kafka采用的消息消费模式是拉模式。

在拉取消息的时候有一个超时时间参数(timeout),如果消费者的缓存区中无可用数据(即没有要消费消息),我们可以通过这个timeout参数来设置等待的时长。如果timeout=0,则不管有无数据立刻返回结果。

5、位移提交

在Kafka的分区当中,每一个消息都有一个唯一的标识offset,我们可以用它来表示消息在分区中的位置。

对于消费者而言,也有一个offset的概念,我们可以用它来表示消费到分区中某消息的位置。

对于offset这个单词,我们既可以翻译为偏移量,也可以翻译为位移,并没有什么严格的区分。但是为了更好的区分不同的使用场景,我们可以将用来表示消息在分区中位置的offset称为偏移量。对于用来表示消费者消费到的消息所处位置的offset称为位移,更明确的话称为“消费位移”

通过下图希望能够帮助大家更清晰的理解:偏移量、消费位移、位移提交。
在这里插入图片描述
通过上图我们可以了解到如下信息:

  1. 正在消费的消息下标为3。
  2. 所以对于分区来说,它的偏移量为3;对于消费者来说,它的消费位移也为3。
  3. 对于分区来说,下标4则作为下一个消息要写入的位置。
  4. 对于消费者来说,将要提交的消费位移(即位移提交)是下标4。

Kafka默认情况下,消费位移的提交方式为自动提交,提交间隔时间默认为5秒。

根据位移提交的具体情况,可能会出现重复消费和消息丢失的现象。我们通过下面一个例子更详细介绍下重复消费和消息丢失是如何出现的。让我们先来看一张图:
在这里插入图片描述
根据上图,我们假设本次拉取的消息为x+2 ~ x+7,x+2为上一次的提交的消费位移,x+8为下一次要提交的消费位移,目前正在处理x+5。

  • 消息丢失

    假设我们在处理x+5之前(即在处理x+0或x+1或x+2…)就提交了本次的消费位移(即x+8),当到处理x+5的时候出现了异常,恢复后,就要从x+8开始拉取了,此时x+5、x+6、x+7实际上并没有被消费,这样便发生了消息丢失的现象。(在消费消息出现异常之前就执行了位移提交)。

  • 重复消费

    假设我们在处理x+5的时候出现了异常,此时还没有提交本次的消费位移(即x+8),恢复后,就还需要从x+2开始拉取消息,这样x+2 ~ x+4就又得再消费一次,这种现象就是重新消费。(在消费消息出现异常之前没有执行位移提交)。

通过以上的描述我们还可以发现:拉取线程和消息处理线程完全是两个独立的线程。

6、指定位移消息

首先提出一个问题:当消费者遇到无法获取所记录的消费位移的时候该怎么办?

为了要解决这个问题,消费者客户端提供了auto.offset.reset参数,用来在遇到这种情况的时候告诉消费者客户端从哪里开始拉取消息消费,该参数的值有几种选择:

  • latest:默认值,意为从分区末尾开始消费消息(即分区中下一条消息要写入的位置)。
  • earliest:意为消费者会从起始处也就是0开始消费。
  • none:直接抛出NoOffsetForPartitionException异常。

7、再均衡

所谓再均衡就是将一个分区的所属权从一个消费者转移到另外一个消费者。

再均衡的过程中,消费组内的消费者无法读取消息。

再均衡后,可能会出现重复消费的情况。因为再均衡的时候,消费者会丢掉当前的状态。如果在上一个消费者(即具有分区所属权的消费者)正在消费消息(已消费了一部分消息了)还没有来得及提交消费位移的时候就发生了再均衡,那么新的消费者(分区所属权转移后的消费者)会重新拉取曾经消费过的消息再消费一遍。

8、消费者拦截器

我们可以通过消费者拦截器在poll返回消息之前消费位移提交之后进行一些特定的处理。

9、多线程实现

为了提高整体的消费能力,我们对消费者客户端采取多线程来实现。

有三种多线程的实现方式:

  1. 线程封闭,即为每一个线程实现一个KafkaConsumer对象,如下图: 在这里插入图片描述
  2. 多个消费线程同时消费一个分区,通过assign()、seek()等方法实现,打破了原有的消费线程的个数不能超过分区个数的限制。但是这种实现方式会使位移提交和顺序控制变得非常负责,实际场景中很少会用到。
  3. 将处理消息的逻辑改为多线程实现,也就是在一个KafkaConsumer对象中有多个处理消息的handler线程,如下图: 在这里插入图片描述
    在这种实现方式中,为了能够正确的完成位移提交,引入了一个共享变量offsets来参与提交,如下图:
    在这里插入图片描述
    基于这种实现方式提供以下两种实现方案:
    • 通过消费者拉取一个批次的消息,然后再将这些消息交给多线程去处理。
    • 基于滑动窗口来实现,将拉取的消息以批次为单位暂存起来,多个消费线程拉取暂存的消息消费,如下图: 在这里插入图片描述
      窗口滑动过程描述:上一次滑动窗口的范围是2 ~ 5,startOffset为2,当2中的消息都被消费完成后,提交2中的消费位移,窗口向前滑动一格,范围变为3 ~ 6,startOffset变为3。

上一篇:Kafka之消费组与消费者

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/903003.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Python 中的 @ 符号是如何工作的!

写在前面 Python 中的 符号是一个非常强大而又灵活的功能,它代表一个叫做"装饰器"的"语法糖"。在本文中,我们将一步步地了解它的工作原理,并通过示例代码加深理解。 基本概念 在 Python 中, 符号通常用于…

2024年9月电子学会青少年软件编程Python等级考试(三级)真题试卷

2024年9月青少年软件编程Python等级考试(三级)真题试卷 选择题 第 1 题 单选题 以下python表达式的值为True的是?( ) A.all( ,1,2,3) B.any([]) C.bool(abc) D.divmod(6,0) 第 2 题 单选题 下列python代码的…

python项目实战——多协程下载美女图片

协程 文章目录 协程协程的优劣势什么是IO密集型任务特点示例与 CPU 密集型任务的对比处理 I/O 密集型任务的方式总结 创建并使用协程asyncio模块 创建协程函数运行协程函数asyncio.run(main())aiohttp模块调用aiohttp模块步骤 aiofiles————协程异步函数遇到的问题一 await …

【Linux系统编程】——探索Shell:工作原理与运行机制以及Linux的权限管理

文章目录 1. 什么是 Shell?2. Shell 的工作原理3. Shell 的运行机制4. Shell 的应用场景5. Shell 脚本的优缺点Linux权限的概念Linux权限管理文件权限值的表示方法文件访问权限的相关设置方法 目录的权限粘滞位关于权限的总结 1. 什么是 Shell? Shell 是…

Linux下的文件系统(进程与文件)

windows下的文件构成 .内容 .属性 所以, 文件的构成为内容和属性。 文件 内容 属性 推此即彼, linux下的文件构成也是如此。 liunx下,文件 文件的内核数据结构(属性)内容 深入理解c语言中的文件操作 在c语言中如…

【笔记】LLM位置编码之标准位置编码

标准位置编码 起源原理证明:对于任何固定的偏移量 k k k, P E p o s k PE_{posk} PEposk​可以表示为 P E p o s PE_{pos} PEpos​的线性函数。计算 P E p o s k 与 P E p o s PE_{posk} 与PE_{pos} PEposk​与PEpos​的内积结论 通俗理解缺点 起源 由…

论文笔记:LaDe: The First Comprehensive Last-mile Delivery Dataset from Industry

2023 KDD 1 intro 1.1 背景 随着城市化进程的加快和电子商务的发展,最后一公里配送已成为一个关键的研究领域 最后一公里配送,如图1所示,是指连接配送中心和客户的包裹运输过程,包括包裹的取件和配送除了对客户满意度至关重要外…

诺基亚的裁员风暴

大家好,我是鸭鸭! 不知道 80、90 后还记得童年神机诺基亚吗? 虽然诺基亚早就把自家手机业务出售,但依然是一代通信巨头。 鸭鸭最近看到新闻,诺基亚已经在大中华区裁减了近 2000 名员工 。 根据 2023 年底&#xff0…

YOLOv8实战野生动物识别

本文采用YOLOv8作为核心算法框架,结合PyQt5构建用户界面,使用Python3进行开发。YOLOv8以其高效的实时检测能力,在多个目标检测任务中展现出卓越性能。本研究针对野生动物数据集进行训练和优化,该数据集包含丰富的野生动物图像样本…

9.Linux按键驱动-工作队列

1.思路 1.1在gpio结构体中定义工作队列 1.2 在probe函数中初始化工作队列 1.3.在中断服务程序中调度工作队列 1.4工作队列处理函数&#xff1a; 2.编程 程序&#xff1a; #include <linux/module.h> #include <linux/fs.h> #include <linux/errno.h> #…

C语言程序设计:现代设计方法习题笔记《chapter6》下篇

第七题 square3.c代码 #include<stdio.h>int main() { int i, n, odd, square;printf("This program prints a table of squares.\n");printf("Enter number of entries in table: ");scanf_s("%d", &n);i 1;odd 3;for (square 1;…

数据库课程 第一周

1.数据库的安装与卸载 1.1数据库的卸载&#xff1a; &#xff08;1&#xff09;第一种卸载方式&#xff1a;删除文件目录 &#xff08;2&#xff09;第二种卸载方式&#xff1a;在控制面版中卸载&#xff0c;然后在c盘里找到mysql文件删除 1. 2.在隐藏目录programdata里 1.2…

新王Claude 3.5的6大应用场景

Anthropic AI深夜发布了备受期待的Claude 3.5系列更新&#xff0c;包括了全新升级的Claude 3.5 Sonnet和首发的Claude 3.5 Haiku。 Claude 3.5 Sonnet能够理解细微的指令和上下文&#xff0c;识别并纠正自身错误&#xff0c;还能从复杂数据中生成深入的分析和洞察。 结合最先进…

从零入门扣子Bot开发

从零入门扣子Bot开发 工作流简单介绍问题思考工作流实例 图像流简单介绍瘦脸图像流的设计创建图像流设计流程 总结参考链接 工作流简单介绍 工作流起源于生产组织和办公自动化领域&#xff0c;是指在计算机应用环境下&#xff0c;对业务过程的部分或整体进行自动化处理。它通过…

文理学院数据库应用技术实验报告0

文理学院数据库应用技术实验报告0 实验内容 打开cmd,利用MySQL命令连接MySQL服务器。 mysql -u root -p查看当前MySQL服务实例使用的字符集(character)。 SHOW VARIABLES LIKE character_set_server;查看当前MySQL服务实例支持的字符序(collation)。 SHOW VARIABLES LIKE c…

Unity编辑器 连接不到SteamVR问题记录

问题表现&#xff1a;之前正常的工程&#xff0c;某天打开后运行&#xff0c;在SteamVR未打开时&#xff0c;Unity工程运行后无法调用起来Steam VR&#xff0c;无任何反应&#xff0c;但用其他软件则可以调用起来SteamVR&#xff0c;并且运行正常&#xff0c;在重装了XR的一些插…

【网络面试篇】从输⼊ URL 到⻚⾯展示到底发⽣了什么?

目录 一、大致流程 1. 流程概述 2. 全流程描述 二、流程解析 1. URL 解析 2. DNS 查询 3. TCP 连接 4. 渲染页面 一、大致流程 1. 流程概述 &#xff08;1&#xff09;URL 解析 &#xff08;2&#xff09;DNS 查询 &#xff08;3&#xff09;TCP 连接 &#xff08…

「C/C++」C/C++标准库之#include<cstdlib>通用工具库

✨博客主页何曾参静谧的博客&#x1f4cc;文章专栏「C/C」C/C程序设计&#x1f4da;全部专栏「VS」Visual Studio「C/C」C/C程序设计「UG/NX」BlockUI集合「Win」Windows程序设计「DSA」数据结构与算法「UG/NX」NX二次开发「QT」QT5程序设计「File」数据文件格式「PK」Parasoli…

消息会话—发送消息自动滚动到最底部

背景 在项目开发中&#xff0c;实现用户友好的输入交互是提升用户体验的关键之一。例如&#xff0c;在消息会话页面中&#xff0c;为了确保用户在发送新消息后页面能自动滚动到最底部&#xff0c;从而始终保持最新消息的可见性&#xff0c;需要实现自动滚动功能。这不仅提升了…

【教程】如何查看IEEE会员证书Membership Card

转载请注明出处&#xff1a;小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你&#xff0c;欢迎[点赞、收藏、关注]哦~ 1、先打开以下网站&#xff0c;并登录你的账号&#xff1a; https://www.ieee.org/profile/myprofile/myprofile.html 2、如果你没有缴费注册会员&…