kafka学习笔记--如何保证生产者数据可靠、不重复、有序

本文内容来自尚硅谷B站公开教学视频,仅做个人总结、学习、复习使用,任何对此文章的引用,应当说明源出处为尚硅谷,不得用于商业用途。
如有侵权、联系速删
视频教程链接:【尚硅谷】Kafka3.x教程(从入门到调优,深入全面)


PS:本节内容尚硅谷的视频讲的不太友好,又查了很多资料才搞明白

文章目录

  • 数据可靠性
  • 数据不重复
  • 数据有序性

数据可靠性

首先数据的可靠性指的是:

  • 消息不会意外丢失
  • 消息不会重复传递

那回顾我们的数据发送流程,在确认数据发送成功的这一步,也就是ack应答这里,不同的参数对应着不同的策略,如果选择了0和1,则存在丢数的问题,如图:
0: 如果数据发送到某个主题的leader时,leader所在节点挂了,那么这条消息就丢失了
在这里插入图片描述
1: 同理,leader收到了,还没应答时挂了,也会丢数据
在这里插入图片描述
-1(all): 使用-1能保证数据落配盘后才回答,保证数据不丢失
在这里插入图片描述
但是,如果Leader收到数据,所有Follower都开始同步数据,但有一个Follower,因为某种故障,迟迟不能与Leader进行同步,那这个问题怎么解决呢?

这就引出了ISR队列的概念了

ISR,是一个机制,也代表着一个同步合集,是由Leader维护的一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。它包含着所有处于同步状态的副本。当一个副本和Leader副本的差距超过一定程度时,这个副本就会被认为是不同步的,不再被加入到ISR中。也因此,Kafka中的 ISR 并不是一直不变的


那么,既然ISR是动态的,那哪些副本会被包含在ISR中呢?


主要依据就是 副本需要保证能够及时地接收并复制Leader副本的消息,也就是需要保证与leader副本的消息同步延迟在一定的时间范围内(默认情况下是10秒钟,由参数 replica.lag.time.max.ms 控制)。

换而言之,因为分区与ISR机制,我们的消息一旦被Kafka 接收后,就会复制多份并很快落盘。这意味着,即使某一台Broker节点宕机乃至硬盘损毁,也不会导致数据丢失。

我们将ISR与ACK应答结合起来使用,就形成了数据可靠条件

  • 数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

数据不重复

上面讲解的,只能保证数据可靠,但是这又引出了一个新的问题
如果,leader在同步完成之后,向生产者回答时,挂掉了,这时候剩下的备份分区会自动选举出一个新leader出来,但是生产者并不知道它挂掉了,只会以为是消息发送失败了,触发重试,又将数据发送了一遍,然后新的leader就又接受了一遍消息,然后在备份分区上再存一遍。这就导致了这条消息存在两份,产生数据重复问题。
在这里插入图片描述
那么kafka是怎么保证数据不重复的呢?
其实这就是数据的幂等性问题了,幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。

kafka默认启用数据幂等性,即设置 enable.idempotence = true

在生产者发消息时,这条消息是有它自己的属性的,其中有三个数据被拿来作为数据的主键,kafka会以此来判断这条消息是否重复,若重复,则只保留一条

PID:又叫生产者编号(producerid), Producer在初始化的时候(只有初始化的时候会随机生成PID,也就是重启就会再次生成)会被分配一个PID

Partition:又叫分区编号,即这条消息要发往的分区的paritionid

SeqNumber:又叫序列号,发往同一Partition的消息会附带Sequence Number(即发送数据的编号,代表着向分区发送的第几条消息)

这样<PID, PartitionID, SeqNumber>就相当于构成了一个主键。Broker端会对<PID, PartitionID, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条,这样就保证了数据的唯一,不重复。

但是幂等性只能保证的是在单分区单会话内不重复,如果发消息时生产者挂掉了,重启后它不知道是否发送成功了,又将这个消息再发送一遍,此时它的PID发生变化,那么这条消息就被认为是一条新的消息,导致重复存储,这种情况怎么解决呢?

这就要引入kafka的事务机制了,事务这个东西大家都知道啥意思,不再重复解释

我们通过事务,让客户端挂掉后继续处理,而不是重新从头来过,保证消息的仅一次发送

注意:开启事务,必须开启幂等性。

请添加图片描述

kafka使用事务,有5个API

// 初始化事务
void initializeTransactions ();

// 开启事务
void beginTransaction () throws ProducerFencedException;

// 在事务中提交已消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction (Map < TopicPartition, OffsetAndMetadata > offsets, String consumerGroupId) throws ProducerFencedException;

// 提交事务
void commitTransaction () throws ProducerFencedException;

// 放弃事务(类似于回滚事务的操作)
void abortTransaction () throws ProducerFencedException;

举个例子:

package com.atguigu.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Test {
    public static void main(String[] args) {
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();

        // 2. 给 kafka 配置对象添加配置信息
        properties.put("bootstrap.servers", "hadoop102:9092");
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
        properties.put("transactional.id", "transaction_id_0");

        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 初始化事务
        kafkaProducer.initTransactions();

        // 开启事务
        kafkaProducer.beginTransaction();

        try {
            // 4. 调用 send 方法,发送消息
            // 发送消息
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));
            }

            // int i = 1 / 0;

            // 提交事务
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            // 终止事务
            kafkaProducer.abortTransaction();
        } finally {
            // 5. 关闭资源
            kafkaProducer.close();
        }
    }
}

数据有序性

如果某主题TOPIC只有一个分区,那么它天生有序,因为分区其实就是一个有序队列

如果是多分区的,kafka是通过滑动窗口的思想解决这个问题的

我们知道kafka发送请求时,最多缓存5个,其实在发送时,每个请求都有自己的单调递增编号,kafka broker在接收数据时,会自动按照编号将数据排序,并且如果其中一个编号的请求失败时,后续再次成功,数据过来后,会自动的根据编号插入到应该在的位置上
请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/233882.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

改进YOLOv8注意力系列一:结合ACmix、Biformer、BAM注意力机制

🗝️改进YOLOv8注意力系列一:结合ACmix、Biformer、BAM注意力机制 代码ACmixBiFormerBAMBlock加入方法各种yaml加入结构本文提供了改进 YOLOv8注意力系列包含不同的注意力机制以及多种加入方式,在本文中具有完整的代码和包含多种更有效加入YOLOv8中的yaml结构,读者可以获…

【flink番外篇】1、flink的23种常用算子介绍及详细示例(完整版)

Flink 系列文章 一、Flink 专栏 Flink 专栏系统介绍某一知识点&#xff0c;并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分&#xff0c;比如术语、架构、编程模型、编程指南、基本的…

Linux系统编程:进程间通信总结

管道 在Linux中&#xff0c;管道是一种进程间通信方式&#xff0c;它允许一个进程&#xff08;写入端&#xff09;将其输出直接连接到另一个进程&#xff08;读取端&#xff09;的输入。从本质上说&#xff0c;管道也是一种文件&#xff0c;但它又和一般的文件有所不同。 具体…

SpringMvc入坑系列(一)----maven插件启动tomcat

springboot傻瓜式教程用久了&#xff0c;回过来研究下SSM的工作流程&#xff0c;当然从Spring MVC开始&#xff0c;从傻瓜式入门处理请求和页面交互&#xff0c;再到后面深入源码分析。 本人写了一年多的后端和半年多的前端了。用的都是springbioot和vue&#xff0c;源码一直来…

【视频笔记】古人智慧与修行

古人的智慧 相由心生、老子悟道、佛祖成佛 多一些思考&#xff0c;多一些精神修炼。 除非我们今天能够产生与人类科技发展相并行的精神变革&#xff0c;否则永远可能也无法跳脱出历史的轮回。 视频来源

爬虫学习-基础库的使用(urllib库)

目录 一、urllib库介绍 二、request模块使用 &#xff08;1&#xff09;urlopen ①data参数 ②timeout参数 &#xff08;2&#xff09;request &#xff08;3&#xff09;高级用法 ①验证 ②代理 ③Cookie 三、处理异常 ①URLError ②HTTPError 四、解析链接 ①urlparse ②…

【C语言】结构体内存对齐

目录 引入结构体 结构的声明 创建和初始化 内部元素的使用&#xff1b; 特殊声明&#xff1a; 结构体在内存中的对齐 练习&#xff1a; 引入结构体 C语言有各种数据类型&#xff0c;我们已经对一些数据类型很熟悉&#xff1a; 整型&#xff08;int&#xff09;- 存储整…

108. 将有序数组转换为二叉搜索树

给你一个整数数组 nums &#xff0c;其中元素已经按 升序 排列&#xff0c;请你将其转换为一棵 高度平衡 二叉搜索树。 高度平衡 二叉树是一棵满足「每个节点的左右两个子树的高度差的绝对值不超过 1 」的二叉树。 示例 1&#xff1a; 输入&#xff1a;nums [-10,-3,0,5,9] 输…

06-React组件 Redux React-Redux

React组件化&#xff08;以Ant-Design为例&#xff09; 组件化编程&#xff0c;只需要去安装好对应的组件&#xff0c;然后通过各式各样的组件引入&#xff0c;实现快速开发 我们这里学习的是 Ant-design &#xff08;应该是这样&#xff09;&#xff0c;它有很多的组件供我们…

kali linux使用Proxmark3

其实kali linux下已经集成了Proxmark3命令&#xff0c;但是由于Proxmark3是开源设备&#xff0c;有时候系统默认安装的版本并不能很好的使用&#xff0c;因此需要手动编译最新的版本。 step 1 准备Proxmark3编译环境&#xff0c;因为kali linux比较激进&#xff0c;很多老旧的…

【EI会议征稿中】第三届信号处理与通信安全国际学术会议(ICSPCS 2024)

第三届信号处理与通信安全国际学术会议&#xff08;ICSPCS 2024&#xff09; 2024 3rd International Conference on Signal Processing and Communication Security 信号处理和通信安全是现代信息技术应用的重要领域&#xff0c;近年来这两个领域的研究相互交叉促进&#xf…

基于YOLOv7算法的高精度实时海上船只目标检测识别系统(PyTorch+Pyside6+YOLOv7)

摘要&#xff1a;基于YOLOv7算法的高精度实时海上船只目标检测系统可用于日常生活中检测与定位海上船只目标&#xff0c;此系统可完成对输入图片、视频、文件夹以及摄像头方式的目标检测与识别&#xff0c;同时本系统还支持检测结果可视化与导出。本系统采用YOLOv7目标检测算法…

配置BFD多跳检测示例

BFD简介 定义 双向转发检测BFD&#xff08;Bidirectional Forwarding Detection&#xff09;是一种全网统一的检测机制&#xff0c;用于快速检测、监控网络中链路或者IP路由的转发连通状况。 目的 为了减小设备故障对业务的影响&#xff0c;提高网络的可靠性&#xff0c;网…

【亲测有效】支持横竖屏 微信小程序video禁止进度条拖动,微信小程序遮罩进度条,

背景&#xff1a;部分课程禁止客户拖动视频进度条直至播放结束 红色是遮罩区域遮罩区域 实际遮罩效果&#xff08;有一个很浅的阴影区域&#xff09; 实现代码 .wxml文件 <video enable-progress-gesture"false" ><cover-view class"cover">…

JAVA全栈开发 day18MySql03

一、复习 为什么要用数据库数据库好处数据库的发展史​ 层次模型​ 网状模型​ 关系模型&#xff08;二维表专门存储数据&#xff0c; 表与表的关联&#xff09;​ 表与表的关系&#xff1a; 1对1 &#xff0c;1对多&#xff0c;多对多​ 非关系模型关系模…

Linux常见压缩指令小结

为什么需要压缩技术 我们都知道文件是以byte作为单位的&#xff0c;如果我们的文件仅仅在低位占一个1 0000 0001这种情况我们完全可以压缩一下&#xff0c;将高位的0全部抹掉即可。 如上所说是一种压缩技术&#xff0c;还有一种就是将1111(此处省略96个)一共100个1&#xff0…

Unity中Shader黑白阀值后处理效果

文章目录 前言一、我们先来PS看一下黑白阀值的效果二、使用step(a,b)函数实现效果三、实现脚本控制黑白阀值1、在Shader属性面板定义控制阀值变量2、把step的a改为_Value3、在后处理脚本设置公共成员变量,并且设置范围为&#xff08;0&#xff0c;1&#xff09;4、在Graphics.B…

vulnhub靶机Prime-2

下载地址&#xff1a;Prime (2021): 2 ~ VulnHub 主机发现 目标145 端口扫描 端口服务扫描 漏洞扫描 先去看一下80 扫目录 发现点不一样的 Wordpress&#xff08;记住还是要用api&#xff09; 好洞出来了看看利用方法 192.168.21.145/wp/wp-content/plugins/gracemedia-media…

【刷题篇】动态规划(六)

文章目录 1、最大子数组和2、环形子数组的最大和3、乘积最大子数组4、乘积为正数的最长子数组长度5、 等差数列划分6、最长湍流子数组 1、最大子数组和 给你一个整数数组 nums &#xff0c;请你找出一个具有最大和的连续子数组&#xff08;子数组最少包含一个元素&#xff09;&…

Proteus仿真--基于12864的自制硬件汉字库的应用

本文介绍基于12864的自制硬件汉字库的应用设计&#xff08;完整仿真源文件及代码见文末链接&#xff09; 其中12864LCD中显示的汉字是存储在2块AT24C1024芯片中 仿真图如下 仿真运行视频 Proteus仿真--基于12864的自制硬件字库的应用 附完整Proteus仿真资料代码资料 链接&am…