【Kafka系列 07】Kafka 如何保证消息不丢失

一、Kafka 消息不丢失的边界

一直以来,很多人对于 Kafka 丢失消息这件事情都有着自己的理解,因而也就有着自己的解决之道。在讨论具体的应对方法之前,我觉得我们首先要明确,在 Kafka 的世界里什么才算是消息丢失,或者说 Kafka 在什么情况下能保证消息不丢失。这点非常关键,因为很多时候我们容易混淆责任的边界,如果搞不清楚事情由谁负责,自然也就不知道由谁来出解决方案了。

那 Kafka 到底在什么情况下才能保证消息不丢失呢?

一句话概括,Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。

第一个核心要素是 已提交的消息。什么是已提交的消息?当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。

那为什么是若干个 Broker 呢?这取决于你对“已提交”的定义。你可以选择只要有一个 Broker 成功保存该消息就算是已提交,也可以是令所有 Broker 都成功保存该消息才算是已提交。不论哪种情况,Kafka 只对已提交的消息做持久化保证这件事情是不变的。

第二个核心要素就是 有限度的持久化保证,也就是说 Kafka 不可能保证在任何情况下都做到不丢失消息。举个极端点的例子,如果地球都不存在了,Kafka 还能保存任何消息吗?显然不能!倘若这种情况下你依然还想要 Kafka 不丢消息,那么只能在别的星球部署 Kafka Broker 服务器了。

现在你应该能够稍微体会出这里的“有限度”的含义了吧,其实就是说 Kafka 不丢消息是有前提条件的。假如你的消息保存在 N 个 Kafka Broker 上,那么这个前提条件就是这 N 个 Broker 中至少有 1 个存活。只要这个条件成立,Kafka 就能保证你的这条消息永远不会丢失。

二、消息丢失案例

案例1:生产者程序丢失数据

Producer 程序丢失消息,这应该算是被抱怨最多的数据丢失场景了。我来描述一个场景:你写了一个 Producer 应用向 Kafka 发送消息,最后发现 Kafka 没有保存,于是大骂:“Kafka 真烂,消息发送居然都能丢失,而且还不告诉我?!”如果你有过这样的经历,那么请先消消气,我们来分析下可能的原因。

目前 Kafka Producer 是异步发送消息的,也就是说如果你调用的是 producer.send(msg) 这个 API,那么它通常会立即返回,但此时你不能认为消息发送已成功完成。

这种发送方式有个有趣的名字,叫“fire and forget”,翻译一下就是“发射后不管”。这个术语原本属于导弹制导领域,后来被借鉴到计算机领域中,它的意思是,执行完一个操作后不去管它的结果是否成功。调用 producer.send(msg) 就属于典型的“fire and forget”,因此如果出现消息丢失,我们是无法知晓的。这个发送方式挺不靠谱吧,不过有些公司真的就是在使用这个 API 发送消息。

如果用这个方式,可能会有哪些因素导致消息没有发送成功呢?其实原因有很多,例如网络抖动,导致消息压根就没有发送到 Broker 端;或者消息本身不合格导致 Broker 拒绝接收(比如消息太大了,超过了 Broker 的承受能力)等。这么来看,让 Kafka“背锅”就有点冤枉它了。就像前面说过的,Kafka 不认为消息是已提交的,因此也就没有 Kafka 丢失消息这一说了。

不过,就算不是 Kafka 的“锅”,我们也要解决这个问题吧。实际上,解决此问题的方法非常简单:Producer 永远要使用带有回调通知的发送 API,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback)

你可能会问,发送失败真的没可能是由 Broker 端的问题造成的吗?当然可能!如果你所有的 Broker 都宕机了,那么无论 Producer 端怎么重试都会失败的,此时你要做的是赶快处理 Broker 端的问题。但之前说的核心论据在这里依然是成立的:Kafka 依然不认为这条消息属于已提交消息,故对它不做任何持久化保证。

案例2:消费者程序丢失数据

Consumer 端丢失数据主要体现在 Consumer 端要消费的消息不见了。Consumer 程序有个“位移”的概念,表示的是这个 Consumer 当前消费到的 Topic 分区的位置。下面这张图来自于官网,它清晰地展示了 Consumer 端的位移数据。

 比如对于 Consumer A 而言,它当前的位移值就是 9;Consumer B 的位移值是 11。

正确使用书签有两个步骤:第一步是读书,第二步是更新书签页。如果这两步的顺序颠倒了,就可能出现这样的场景:当前的书签页是第 90 页,我先将书签放到第 100 页上,之后开始读书。当阅读到第 95 页时,我临时有事中止了阅读。那么问题来了,当我下次直接跳到书签页阅读时,我就丢失了第 96~99 页的内容,即这些消息就丢失了。

同理,Kafka 中 Consumer 端的消息丢失就是这么一回事。要对抗这种消息丢失,办法很简单:维持先消费消息(阅读),再更新位移(书签)的顺序即可。这样就能最大限度地保证消息不丢失。

当然,这种处理方式可能带来的问题是消息的重复处理,类似于同一页书被读了很多遍,但这不属于消息丢失的情形。在专栏后面的内容中,我会跟你分享如何应对重复消费的问题。

除了上面所说的场景,其实还存在一种比较隐蔽的消息丢失场景。

我们依然以看书为例。假设你花钱从网上租借了一本共有 10 章内容的电子书,该电子书的有效阅读时间是 1 天,过期后该电子书就无法打开,但如果在 1 天之内你完成阅读就退还租金。

为了加快阅读速度,你把书中的 10 个章节分别委托给你的 10 个朋友,请他们帮你阅读,并拜托他们告诉你主旨大意。当电子书临近过期时,这 10 个人告诉你说他们读完了自己所负责的那个章节的内容,于是你放心地把该书还了回去。不料,在这 10 个人向你描述主旨大意时,你突然发现有一个人对你撒了谎,他并没有看完他负责的那个章节。那么很显然,你无法知道那一章的内容了。

对于 Kafka 而言,这就好比 Consumer 程序从 Kafka 获取到消息后开启了多个线程异步处理消息,而 Consumer 程序自动地向前更新位移。假如其中某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于 Consumer 而言实际上是丢失了。

这里的关键在于 Consumer 自动提交位移,与你没有确认书籍内容被全部读完就将书归还类似,你没有真正地确认消息是否真的被消费就“盲目”地更新了位移。

这个问题的解决方案也很简单:如果是多线程异步处理消费消息,Consumer 程序不要开启自动提交位移,而是要应用程序手动提交位移

三、最佳实践

看完这两个案例之后,总结一下 Kafka 无消息丢失的最佳实践配置,每一个其实都能对应上面提到的问题。

  1. 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
  2. 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
  3. 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
  4. 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
  5. 设置 replication.factor >= 3。这也是 Broker 端的参数,用来设置分区的副本数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
  6. 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1
  7. 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1
  8. 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。

四、小结

今天,我们讨论了 Kafka 无消息丢失的方方面面。我们先从什么是消息丢失开始说起,明确了 Kafka 持久化保证的责任边界,随后以这个规则为标尺衡量了一些常见的数据丢失场景,最后通过分析这些场景,我给出了 Kafka 无消息丢失的“最佳实践”。总结起来,我希望你今天能有两个收获:

  • 明确 Kafka 持久化保证的含义和限定条件。
  • 熟练配置 Kafka 无消息丢失参数。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/440529.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

IQmath库移植至ST系列单片机实战教程1

使用注意事项: 1.注意IQmath库使用的数据范围,如果使用IQ24格式,其范围不能超过-128~128; 如果输入的时候不注意其使用范围,会导致数据溢出,出现一直为0的情况。 如定义 _iq24 a 0; a _IQ24(380)其结果…

史上最细,接口自动化测试用例设计编写总结,一篇带你打通...

目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 说到自动化测试&a…

[MYSQL]当数据库被攻破如何重新恢复

前情提要:mysql数据库默认密码、默认端口没有改,也没做安全防护,导致被攻破被索要比特币。 那我们自然是不能给他们的,下面罗列我的补救方法。 密码修改相关 第一步大家自然都会想到先去修改密码: mysqladmin -u roo…

【C语言】操作符相关知识点

移位操作符 << 左移操作符 >>右移操作符 左移操作符 移位规则&#xff1a; 左边抛弃、右边补0 右移操作符 移位规则&#xff1a; 首先右移运算分两种&#xff1a; 1.逻辑移位 左边用0填充&#xff0c;右边丢弃 2.算术移位 左边用原该值的符号位填充&#xff0c;…

ES分布式搜索-IK分词器

ES分词器-IK 1、为什么使用分词器&#xff1f; es在创建倒排索引时需要对文档分词&#xff1b;在搜索时&#xff0c;需要对用户输入内容分词。但默认的分词规则对中文处理并不友好。 我们在kibana的DevTools中测试&#xff1a; GET /_analyze {"analyzer": "…

最简k8s部署(AWS Load Balancer Controller使用)

问题 我需要在k8s集群里面部署springboot服务&#xff0c;通过k8s ingress访问集群内部的springboot服务&#xff0c;应该怎么做&#xff1f; 这里假设已经准备好k8s集群&#xff0c;而且也准备好springboot服务的运行镜像了。这里我们将精力放在k8s服务编排上面。 一图胜千言…

Supplementary Influence Maximization Problem in Social Networks

本论文发表于 IEEE TRANSACTIONS ON COMPUTATIONAL SOCIAL SYSTEMS, VOL. 11, NO. 1, FEBRUARY 2024 Abstract 由于在病毒式营销中的重要应用&#xff0c;影响力最大化&#xff08;IM&#xff09;已成为一个经过充分研究的问题。它的目的是找到一小部分初始用户&#xff0c;以…

智能问数,让数据对话变得如此简单

——用自然语言点亮数据智慧&#xff0c;让深度分析触手可及&#xff0c;让每个人都拥有私人数据分析师。 想象一下&#xff0c;曾经的数据查询&#xff0c;意味着面对着密密麻麻的电子表格&#xff0c;手动筛选、匹配与解读&#xff0c;耗费大量的时间与精力&#xff0c;或者…

【今日面经】24/3/8 又是Java后端面经啊啊啊啊啊啊啊

目录 1.osi七层模型&#xff1f;数据链路层是干什么的&#xff1f;2.tcp三次握手过程&#xff0c;tcp报文头部的结构&#xff1f;里面都有什么&#xff1f;3.讲讲超时重传和快重传&#xff0c;怎么等待的超时重传&#xff08;Timeout Retransmission&#xff09;快速重传&#…

高清数学公式视频素材、科学公式和方程式视频素材下载

适用于科普、解说的自媒体视频剪辑素材&#xff0c;黑色背景数学、科学公式和方程式视频素材下载。 视频编码&#xff1a;H.264 | 分辨率&#xff1a;3840x2160 (4K) | 无需插件 | 文件大小&#xff1a;16.12MB 来自PR视频素材&#xff0c;下载地址&#xff1a;https://prmuban…

Redis持久化机制之RDB内存快照

1、引言 我们经常在数据库层上加一层缓存&#xff08;如Redis&#xff09;&#xff0c;来保证数据的访问效率。 这样性能确实也有了大幅度的提升&#xff0c;因为从内存中取数远比从磁盘中快的多&#xff0c;但是本身Redis也是一层服务&#xff0c;也存在宕机、故障的可能性。…

蓝色经典免费wordpress模板主题

蓝色经典配色的免费wordpress建站主题&#xff0c;万能的wordpress建站主题。 https://www.wpniu.com/themes/24.html

【好书推荐-第十期】《AI绘画教程:Midjourney使用方法与技巧从入门到精通》

&#x1f60e; 作者介绍&#xff1a;我是程序员洲洲&#xff0c;一个热爱写作的非著名程序员。CSDN全栈优质领域创作者、华为云博客社区云享专家、阿里云博客社区专家博主、前后端开发、人工智能研究生。公众号&#xff1a;洲与AI。 &#x1f388; 本文专栏&#xff1a;本文收录…

O2OA(翱途)开发平台如何在流程表单中使用基于Vue的ElementUI组件?

本文主要介绍如何在O2OA中进行审批流程表单或者工作流表单设计&#xff0c;O2OA主要采用拖拽可视化开发的方式完成流程表单的设计和配置&#xff0c;不需要过多的代码编写&#xff0c;业务人员可以直接进行修改操作。 在流程表单设计界面&#xff0c;可以在左边的工具栏找到Ele…

Take-home questions——L3

Match the spatial domain image to the Fourier magnitude image 1—D 2—B 3—A 4—E 5—C

STM32F4串口波特率相关时钟

在main中调用的 Stm32_Clock_Init(336, 8, 2, 7); /* 设置时钟,168Mhz *///8*336/8/2168 时钟源,PLL寄存器配置函数: HAL_StatusTypeDef HAL_RCC_OscConfig(RCC_OscInitTypeDef *RCC_OscInitStruct) 系统时钟,总线寄存器配置,及HCLK时钟计算函数: HAL_StatusTyp…

用于回归的概率模型

机器学习中的回归方法&#xff1a; 机器学习中的概率模型 机器学习&#xff5c;总结了11种非线性回归模型&#xff08;理论代码可视化&#xff09; 高斯过程回归&#xff1a; Gaussian Processes for Machine Learning GPML——Datasets and Code Gaussian Processes 学…

力扣-数组题

1. 两数之和 找出map中是否有target-nums[i]&#xff0c; class Solution { public:vector<int> twoSum(vector<int>& nums, int target) {unordered_map<int, int> hash;for(int i 0 ;i < nums.size(); i){if(hash.find(target - nums[i]) ! hash…

Jmeter事务控制器实战

在性能测试工作中&#xff0c;我们往往只测试业务功能相关主要接口的数据请求和返回。然而实际上用户在使用web应用时&#xff0c;可能会加载诸多资源&#xff1a;htmldom、cssdom、javaScript、ajax请求、图片等。 从打开一个页面到界面渲染完成需要一定的加载时间&#xff0…

每日一题——1636.按照频率将数组升序排序

方法一 个人方法 用数组的键值对形式保存每个数字和他出现的次数&#xff0c;将对象的键值对转为数组&#xff0c;对数组进行自定义sort()排序&#xff0c;优先使用出现频次排序&#xff0c;如果出现频次一样就用大小就行排序。 排序完后按照出现频次拼接成字符串再转为数组 …