防止消息丢失与消息重复——Kafka可靠性分析及优化实践

系列文章目录

上手第一关,手把手教你安装kafka与可视化工具kafka-eagle
Kafka是什么,以及如何使用SpringBoot对接Kafka
架构必备能力——kafka的选型对比及应用场景
Kafka存取原理与实现分析,打破面试难关


在这里插入图片描述
在上一章内容中,我们解析了Kafka在读写层面上的原理,介绍了很多Kafka在读出与写入时的各种设计,初步理解了Kafka大吞吐量的原因,本期我们将带领大家从另一个角度,即从可靠性方面来分析Kafka的机制与原理

📕作者简介:战斧,从事金融IT行业,有着多年一线开发、架构经验;爱好广泛,乐于分享,致力于创作更多高质量内容
📗本文收录于 kafka 专栏,有需要者,可直接订阅专栏实时获取更新
📘高质量专栏 云原生、RabbitMQ、Spring全家桶 等仍在更新,欢迎指导
📙Zookeeper Redis dubbo docker netty等诸多框架,以及架构与分布式专题即将上线,敬请期待

一、可靠性的考量角度

其实我们在《RabbitMQ 能保证消息可靠性吗》 一文中,我们已经阐述了对于MQ类主键,可靠性应该从哪些角度去判断。总结下来其实就是:

  • 消息不会意外丢失
  • 消息不会重复传递

那么本次我们也将从这两方面来看看 Kafka 都做了哪些工作来提升可靠性。

二、分区副本

1. 分区副本的含义

在这里插入图片描述

我们之前了解到:一个Kafka的主题被分为了若干个分区,每个分区都是一个有序的消息队列。如上图,我们就把topicA分成了1\2\3\4 四个分区(实线圆柱),但是我们还看到了更多的“虚线圆柱”,这些其实是 1\2\3\4 分区的副本,在Kafka中,每个分区都有多个副本。副本是指一个分区在其他Broker上的备份副本的作用是提高消息的可靠性和容错性,一旦某个Broker宕机,其他Broker上的副本就可以接替宕机的Broker继续提供服务

我们可以看到不同的Broker之间。互相存储着对方的副本分区,比如Partition1 存在Broker1 上,但Partition1 的副本可以放在Broker2 ,同理 Partition2 的副本也可以放在Broker1 ,如果我们专注于一个分区,那么其情况如下:

在这里插入图片描述

为了区分,我们一般把“实线圆柱”的分区称为 Leader,“虚线圆柱”的分区称为FollowerLeader副本负责读写请求,而Follower副本则只负责复制Leader副本的数据

2. AR 与 ISR机制

上面我们讲了副本分区的Follower,所有副本的合集统称为AR(Assigned Replicas),但是不同副本和Leader到底一致性有多高呢?会不会出现有的副本同步得及时,有的副本因为网络原因同步得很慢呢? 这里又引出了一个重要的概念叫做ISR(In-Sync Replica)机制。

ISR,是一个机制,也代表着一个同步合集,顾名思义,它包含着所有处于同步状态的副本。当一个副本和Leader副本的差距超过一定程度时,这个副本就会被认为是不同步的,不再被加入到ISR中。也因此,Kafka中的 ISR 并不是一直不变的

在这里插入图片描述

那么,既然ISR是动态的,那哪些副本会被包含在ISR中呢?

其实,其主要依据就是 副本需要保证能够及时地接收并复制Leader副本的消息,也就是需要保证与leader副本的消息同步延迟在一定的时间范围内(默认情况下是10秒钟,由参数 replica.lag.time.max.ms 控制)。

在这里插入图片描述

换而言之,因为分区与ISR机制,我们的消息一旦被Kafka 接收后,就会复制多份并很快落盘。这意味着,即使某一台Broker节点宕机乃至硬盘损毁,也不会导致数据丢失。

三、ACKS设置

如果说备份机制是保障消息不会在Kafka服务器丢失,那么消息丢失的另一个重要原因就是消息在发送中丢失。

这种场景下,我们就需要利用消息确认机制了,此时我们也会利用到ISR,比如我们在发送消息时,可以通过设置ACK的值,来决定同步的情况:

  • acks=0,如果设置为零,则生产者根本不会等待来自服务器的任何确认。该记录将立即添加到套接字缓冲区,并被视为已发送。在这种情况下,不能保证服务器已经收到记录,重试配置也不会生效(因为客户端通常不会知道任何故障)。为每条记录返回的偏移量将始终设置为-1。

  • acks=1,这意味着领导者会将记录写入其本地日志,但不会等待所有追随者的完全确认。在这种情况下,如果领导者在确认记录后立即失败,但在追随者复制之前,记录将丢失。

  • acks=all,这意味着leader将等待所有ISR来确认记录。这保证了只要至少有一个副本处于ISR内,记录就不会丢失。这是最有力的保证。这相当于acks=-1的设置。

当我们设置acks=all 或者 -1 的时候, broker 的 min.insync.replicas 参数起作用(一个典型的场景是,topic 有 3个 副本,客户端设置 acks = -1,服务端设置 topic level 的 min.insync.replicas = 2,这样至少有 2 个副本写入后,broker 才会返回;但是如果 topic 只有 1 个副本,而 acks = all,min.insync.replicas = 2,就会报 NOT_ENOUGH_REPLICAS 错误);

在这里插入图片描述

四、重试机制

发消息不可能万无一失,当Kafka在发送或接收消息时发生错误时,可以通过重试来解决这些问题,提高系统的可靠性。这点不用多说,那么在生产端我们可以配置哪些重试的参数呢?

  • retries:重试次数,默认为0,表示不启用重试机制,但建议开启

  • retry.backoff.ms:每次重试的时间间隔,默认为100ms。

  • retry.buffer.records:每个分区的缓冲区中可以存储的最大重试消息数。

在这里插入图片描述

在实际应用中,一些小故障是可以通过重试来解决的,但是重试次数过多也会增加网络通信的负担,甚至会导致消息堵塞。所以建议将其设置为1或2,这样可以在第一次发送失败后进行重试,从而提高消息的可靠性。但是,如果网络状况很差,或者需要处理重要的消息,可以适当增加retries的值。

五、幂等性设计

如果你仔细观察上面的ACKS的设置,相信你会发现,这并不完美:如果你将ACKs设置为-1(all),可以保证Producer到Server之间不会丢失数据,即At Least Once(最少一次),但不能保证数据不重复。而如果你将ACKs级别设置为0(不需要等写log),则可以保证生产者每条消息只会被发送一次,即At Most Once(最多一次),但不能保证不会发生数据丢失。

难道就没有一种既不会丢失,也不会重复的方案吗?其实是有的,这个时候我们可以使用 ‘ack = all’ + 幂等性来解决,而开启幂等性 ,即设置 enable.idempotence = true

请注意,启用幂等性要求
max.in.flight.requests.per.connection小于或等于5(为任何允许的值保留消息顺序),
retries重试次数大于0,
acks必须为“all”。
PS:我发现不少文章把开启幂等性参数写成 enable.idompotence,不知是笔误还是什么原因

开启幂等性的意义在于生产者将确保在流中每条消息正好只会发送一次,那么它是怎么实现的呢?Kafka生产者的幂等性算法主要包括以下三个方面的实现:

  1. 生产者编号: Producer在初始化的时候(只有初始化的时候会随机生成PID)会被分配一个PID(producerid),
  2. 分区编号:不同的分区有自己的paritionid(即分区号),
  3. 序列号:发往同一Partition的消息会附带Sequence Number(即发送数据的编号,代表着向分区发送的第几条消息),这样<PID, PartitionID, SeqNumber>就相当于构成了一个主键。Broker端会对<PID, PartitionID, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条。

六、消费偏移量

前面说了很多,都是生产者与Kafka的,其实对于消费者,Kakfa也有同步提交偏移量的设计。在 Kafka 的消费者 API 中,同步提交偏移量可以通过设置 enable.auto.commit 参数为 false ,然后在消费者应用中手动控制提交偏移量的时机来实现。

具体来说,可以使用 commitSync() 方法提交偏移量,该方法会一直阻塞直到提交偏移量成功,或抛出异常。示例代码如下:

Properties props = new Properties();
// 设置 Kafka 集群地址等配置信息
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false"); // 禁止自动提交偏移量
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // 处理消息
        }
        consumer.commitSync(); // 手动提交偏移量
    }
} finally {
    consumer.close();
}

通过这种方式,可以确保消息在被消费者处理后再提交偏移量,从而避免了消息丢失或者重复处理的问题。同时,手动提交偏移量也提高了消费者的可控性,可以根据自身情况自由地设置提交偏移量的时机。

七、可靠性不足分析

尽管我们在上面讲了一些,Kafka为了实现可靠性而做的设计,一般情况下,这种程度的可靠性足以应付了。但在实际应用过程中,Kafka仍然可能会面临以下几个可靠性问题:

  1. 生产者重复发送:尽管开启了幂等性,但不要忘记幂等性设置仅表示生产者对同一个分区的消息的写入是有序的、幂等的,如果producer挂了,重启之后,producer会重新生成producerid,此时幂等性校验就不准了。
  2. 消费者重复消费:如果消费者在提交偏移量前宕机了,将导致Kafka认为该消息没有被消费,在消费者重启后,又会消费该消息,导致重复消费。

这些情况,Kafka自身已经无法解决。我们的解决策略只能契合在我们的业务处理上,目前一个通用的方案是全局性ID+生产/消费两端校验:
在这里插入图片描述

我们可以先根据业务对消息进行去重,然后使用诸如雪花算法等方案为每一条去重后的消息生产全局性的唯一ID,并在发送和消费之前在redis或其他较快的存储件中进行标记,这样当发生重复发送/消费时,就能及时发现了。此时你可以选择放弃本次发送/消费,也可以将该异常情况上报,由人工来进行检查与处理

总结

本次我们对Kafka的可靠性进行了分析和优化实践。一般来说,我们可以通过设置acks、开启幂等性,消费端手动提交偏移量等方式来保证可靠性,也足以应付大部分场景。而且实际应用过程中,还可以配合全局ID等手段完善可靠场景。当然,架构服务于业务需求,所以最终还是需要结合具体的业务需求和场景来选择合适的部署方式和配置参数,在后面我们还会继续进行Kafka的深入解析,如果你对此有兴趣,可以直接订阅本 kafka 专栏

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/104831.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Linux】进程优先级|进程并发概念|在vim中批量化注释

文章目录 前言tips——如何在vim中批量化注释进程更深度理解一、什么是进程优先级二、 为什么要有优先级三、Linux怎么设置优先级查看进程优先级的命令PRI and NI用top命令更改已存在进程的nice&#xff1a; 如何根据优先级开展调度呢&#xff1f;五、其他概念并发&#xff08;…

css四种导入方式

1 行内样式 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title> </head> <body> <h1 style"color: blue">我是标题</h1> </body> </htm…

【Objective-C】浅析Block及其捕获机制

目录 Block的基本使用Block的声明Block的实现Block的调用 Block作为形参使用Block作为属性使用给Block起别名Block的copy Block的捕获机制auto类型的局部变量__block浅析static类型的局部变量全局变量 其他问题 Block的基本使用 什么是Block&#xff1f; Block &#xff08;块…

超市商品管理系统 毕业设计 JAVA+Vue+SpringBoot+MySQL

项目下载地址 目录 项目下载地址 一、摘要1.1 简介1.2 项目录屏 二、研究内容2.1 数据中心模块2.2 超市区域模块2.3 超市货架模块2.4 商品类型模块2.5 商品档案模块 三、系统设计3.1 用例图3.2 时序图3.3 类图3.4 E-R图 四、系统实现4.1 登录4.2 注册4.3 主页4.4 超市区域管理4…

【嵌入式开发学习】__单片机中容易造成内存泄露的几个痛点

目录 前言 一、程序运行 二、什么是内存泄露&#xff1f; 三、内存泄露的严重后果&#xff01; 四、如何定位到泄露的要点&#xff1f; 五、三大痛点 1. 访问越界 2. 栈 3. 堆 六、泄露常见的场景 1. 重新赋值 2. 首先释放父块 3. 返回值的不正确处理 七、常见的…

Azure - 机器学习:创建机器学习所需资源,配置工作区

目录 一、Azure机器学习工作区与计算实例简要介绍工作区计算实例 二、创建工作区1. 登录到 Azure 机器学习工作室2. 选择“创建工作区”3. 提供以下信息来配置新工作区&#xff1a;4. 选择“创建”以创建工作区 三、创建计算实例四、工作室实战4.1 工作室快速导览4.2 从示例笔记…

常用linux命令 linux_cmd_sheet

查看文件大小 ls -al 显示每个文件的kb大小 查看系统日志 dmesg -T | tail 在 top 命令中&#xff0c;RES 和 VIRT&#xff08;或者 total-vm&#xff09;是用来表示进程内存使用的两个不同指标&#xff0c;它们之间有以下区别&#xff1a; RES&#xff08;Resident Set Size…

TWAS模型数据*.wgt.RDat查看及导入R

TWAS模型数据*.wgt.RDat查看及导入R 1 数据导入R load(“./SLC7A8.wgt.RDat”) as.data.frame(snps) -> snp snp %>% head()V1 V2 V3 V4 V5 V6 1 12 rs10747759 0 55683634 C T 2 12 rs2293409 0 55684180 A C 3 12 rs1048103 0 55684405 C …

Pytorch代码入门学习之分类任务(二):定义数据集

一、导包 import torch import torchvision import torchvision.transforms as transforms 二、下载数据集 2.1 代码展示 # 定义数据加载进来后的初始化操作&#xff1a; transform transforms.Compose([# 张量转换&#xff1a;transforms.ToTensor(),# 归一化操作&#x…

FLStudio2024汉化破解版在哪可以下载?

水果音乐制作软件FLStudio是一款功能强大的音乐创作软件,全名:Fruity Loops Studio。水果音乐制作软件FLStudio内含教程、软件、素材,是一个完整的软件音乐制作环境或数字音频工作站... FL Studio21简称FL 21&#xff0c;全称 Fruity Loops Studio 21&#xff0c;因此国人习惯叫…

图像数据噪音种类以及Python生成对应噪音

前言 当涉及到图像处理和计算机视觉任务时&#xff0c;噪音是一个不可忽视的因素。噪音可以由多种因素引起&#xff0c;如传感器误差、通信干扰、环境光线变化等。这些噪音会导致图像质量下降&#xff0c;从而影响到后续的图像分析和处理过程。因此&#xff0c;对于从图像中获…

0027Java程序设计-房屋出租管理系统

文章目录 摘 要目 录系统设计开发环境 摘 要 随着我国市场经济的快速发展和人们生活水平的不断提高&#xff0c;简单的房屋出租服务已经不能满足人们的需求。如何利用先进的管理手段&#xff0c;提高房屋出租的管理水平&#xff0c;是当今社会所面临的一个重要课题。 本文采用…

I/O 模型学习笔记【全面理解BIO/NIO/AIO】

文章目录 I/O 模型什么是 I/O 模型Java支持3种I/O模型BIO&#xff08;Blocking I/O&#xff09;NIO&#xff08;Non-blocking I/O&#xff09;AIO&#xff08;Asynchronous I/O&#xff09; BIO、NIO、AIO适用场景分析 java BIOJava BIO 基本介绍Java BIO 编程流程一个栗子实现…

【2024秋招】小米中间件后端开发一面2023-9-13-base武汉

1 自我介绍 2 快手实习 2.1 讲讲你写的curd启动器&#xff0c;做了哪些工作呢 答&#xff1a; 2.2 网上也有一些开源的curd代码生成器&#xff0c;你为什么需要自研呢&#xff08;重要&#xff09; 答&#xff1a; &#xff08;1&#xff09;这个必须得自研&#xff0c;因…

『第三章』雨燕栖息地:Swift 开发环境

在本篇博文中,您将学到如下内容: 1. Swift 开发平台2. Swift 集成开发环境 Xcode&#xff1f;3. 原型试验场&#xff1a;Playground4. 另一种尝试&#xff1a;iPad 上的 Swift Playgrounds5. Swift 交互实验室&#xff1a;Swift REPL总结 咫尺春三月&#xff0c;寻常百姓家。为…

【RTOS学习】软件定时器 | 中断处理

&#x1f431;作者&#xff1a;一只大喵咪1201 &#x1f431;专栏&#xff1a;《RTOS学习》 &#x1f525;格言&#xff1a;你只管努力&#xff0c;剩下的交给时间&#xff01; 软件定时器 | 中断处理 &#x1f3c0;软件定时器⚽守护任务守护任务的调度 ⚽使用软件定时器的函数…

多级缓存入门

文章目录 什么是多级缓存JVM进程缓存环境准备安装MySQL导入Demo工程导入商品查询页面 初识Caffeine Lua语法初识Lua第一个lua程序变量和循环Lua的数据类型声明变量循环 条件控制、函数函数条件控制 多级缓存安装OpenRestyOpenResty快速入门反向代理流程OpenResty监听请求编写it…

【linux】麒麟v10安装Redis哨兵集群(ARM架构)

安装redis单示例的请看&#xff1a;麒麟v10安装Redis&#xff08;ARM架构&#xff09; 安装服务器 ​Hostname​IP addressmaster,sentinel192.168.0.1slave1,sentinel192.168.0.2slave2,sentinel192.168.0.3 下载安装包 &#xff08;三台都操作&#xff09; wget https://re…

施工进度难以把控,项目失控怎么办?

在施工项目中&#xff0c;施工进度的控制是非常重要的&#xff0c;它直接关系到项目是否能够按时完成以及项目成本的控制。然而&#xff0c;有时候由于各种原因&#xff0c;施工进度可能会难以把控&#xff0c;导致项目失控。那么&#xff0c;当项目失控时&#xff0c;我们应该…

Ansible上通过roles简化playbook演示介绍

目录 一.roles介绍 1.作用 2.role的目录结构 3.role和tasks的执行优先级顺序 二.自定义一个httpd的角色 1.完整目录结构展示 2.主要的各个目录配置 &#xff08;1&#xff09;vars目录和templates目录 &#xff08;2&#xff09;tasks目录和handlers目录 &#xff08…