Kafka 幂等性与事务

文章目录

  • 幂等性
    • 实现机制
    • 配置使用
    • 局限性
  • 事务
    • 使用场景
    • 配置使用
    • 实现机制
    • 事务过程
      • 事务初始化
      • 事务开始
      • 事务提交
      • 事务取消
      • 事务消费

幂等性

Producer 无论向 Broker 发送多少次重复的数据,Broker 端只会持久化一条,保证数据不丢失且不重复。

实现机制

通过引入ProducerID和SequenceNumber来实现Broker对于每条接收的消息都会验证PID,同时会检查SeqNumber是否比Broker维护的SeqNumber值严格+1,只有符合要求的才是合法的,其他情况都会丢弃。

  • ProducerID:Producer初始化时由Broker分配,作为每个Producer会话的唯一标识
  • SequenceNumber:Producer发送的每条消息的标识(更准确地说是每一个消息批次,即ProducerBatch),从0开始单调递增。Broker根据它来判断写入的消息是否可接受。

配置使用

Producer设置

  • enable.idempotence=true:表示使用幂等性生产者。当enable.idempotence配置为true时,acks必须配置为all。并且建议max.in.flight.requests.per.connection的值小于5。
  • acks=all

局限性

  • 只能保证 Producer 在单个会话内不丟不重 ,如果 Producer 出现意外挂掉再重启是无法保证的(幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重);
  • 幂等性不能跨多个 Topic-Partition,只能保证单个 partition 内的幂等性 ,当涉及多个 Topic-Partition 时,这中间的状态无法同步。

事务

Kafka 事务基于幂等性实现,通过事务机制,Kafka 可以实现对多个 Topic 、多个 Partition 的原子性的写入,即处于同一个事务内的所有消息,最终结果是要么全部写成功,要么全部写失败。

使用场景

  • 对多个 Topic 、多个 Partition 的原子性的写入
  • Consumer-Transform-Producer模式下,将消费者提交偏移量操作和生成者一系列生成消息的操作封装成一个原子操作。避免重复消费

配置使用

Producer设置

  • transactional.id:事务id,类型为string,客户端自定义

Consumer设置

  • isolation.level:read_committed。事务隔离级别,默认为空。

实现机制

引入以下组件:

  • Transactional Coordinator‌:负责管理和协调事务。每个Kafka broker上都会运行一个Transactional Coordinator实例。
  • Transaction Log‌:这是一个内部Topic(__transaction_state),用于存储事务的元数据信息,包括事务的状态、参与的分区等。
  • Control Messages:由Transactional Coordinator‌写入topic的一种特殊消息,但对于Consumer来说不可见。是用来让Broker告知consumer拉取的消息是否已被原子性提交。
  • TransactionId:事务ID,类型为String字符串,由Producer客户端自定义。提供稳定不变的ID意义在于可以在异常后重启从断点进行恢复。
  • Epoch:单调递增的事务Id标识,可以保证具有相同TransactionId的Producer,旧的无法写入。
  • ProducerID、SequenceNumber:标记生产者、消息的唯一标识

事务过程

事务初始化

所有的事务操作都需要Transactional Coordinator‌管理和协调
1.获取Transactional Coordinator‌地址
Producer发送携带Transactionid的请求到任意一个Broker,Broker对获取到Transactionid做hashcode后对topic(__transaction_state)默认分区(50)取模,所得分区主副本所在的Broker作为TransactionalCoordinator‌
2.获取ProducerID和Epoch
Producer对TransactionalCoordinator‌发送请求,此时会分配ProducerId及Epoch,并将信息持久化。最后向Producer返回ProducerId+Epoch。之后的每次请求都会携带ProducerId和Epoch。
(__transaction_state中信息格式为key-value,key为Transactionid,value包含ProducerID、Epoch、事务和分区信息等)

事务开始

3.消息写入
Producer开始事务写入,先将本地事务状态更改为IN_TRANSACTION,然后发送消息之前,Producer会将topic-partition相关的信息发送给TransactionalCoordinator‌,由它完成持久化(更新__transaction_state)。之后Producer开始对相关topic-partition发送消息

事务提交

4.Producer触发事务提交
Producer首先发送请求给TransactionalCoordinator‌,由它更新__transaction_state将事务状态更改为PrepareCommit,之后返回成功响应给Producer。TransactionalCoordinator‌发送Control Messages(会持续重试,直到成功)给涉及此次事务的topic-partition,写入成功之后,再次更新__transaction_state,将事务状态更新为CompleteCommit。

事务取消

5.Producer或Coordinator触发事务取消
事物取消可以由Producer发起取消或者TransactionalCoordinator‌检测到事务超时而取消,此时均会更新__transaction_state更改为PrepareAbort,之后返回成功响应给Producer。TransactionalCoordinator‌发送Control Messages给涉及此次事务的topic-partition,写入成功之后,再次更新__transaction_state,将事务状态更新为CompleteAbort。

取消的事务会记录在.txnindex文件中,主要包含以下信息:currentVersion、producerId、firstOffset(当前事务的开始offset)、lastOffset(当前事务的结束offset)、lastStableOffset(存储时的LSO)

事务消费

正常消费时
读隔离级别为 read-committed, 在内部会使用存储在topic-partition中的Control messgae,来过滤掉没有提交的消息。(回滚的消息也没有删除,只是在读数据时过滤该数据)

对于Consumer-Transform-Producer下,会通过groupId算出__consumer_offsets topic中对应的partition,然后加该partition的信息也加入到Transaction Log‌中,最终在统一取消或提交。同样也会将Control message写入__consumer_offsets对应的分区。

  • 需要将enable.auto.commit设置为false
  • 使用producer.sendOffsetsToTransaction()来提交offset

在这里插入图片描述

参考
https://z.itpub.net/article/detail/F86DD78AECAC4DEC92468DEFFEB4ED0D
https://www.cnblogs.com/hongdada/p/16945086.html
学习笔记之Kafka幂等和事务_transaction.state.log.replication.factor-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/945613.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

LVS 负载均衡原理 | 配置示例

注:本文为 “ LVS 负载均衡原理 | 配置” 相关文章合辑。 部分内容已过时,可以看看原理实现。 未整理去重。 使用 LVS 实现负载均衡原理及安装配置详解 posted on 2017-02-12 14:35 肖邦 linux 负载均衡集群是 load balance 集群的简写,翻…

CannotRetrieveUpdates alert in disconnected OCP 4 cluster解决

环境: Red Hat OpenShift Container Platform (RHOCP) 4 问题: Cluster Version Operator 不断发送警报,表示在受限网络/断开连接的 OCP 4 集群中无法接收更新。 在隔离的 OpenShift 4 集群中看到 CannotRetrieveUpdates 警报: …

详解从输入url到页面渲染

当你在浏览器中输入一个 URL 并按下回车键,浏览器会经历一系列步骤来加载并渲染页面。这些步骤包括 DNS 解析、缓存处理、建立连接、发送请求、接收响应、解析 HTML、构建 DOM 树和 CSSOM 树、执行 JavaScript、布局和绘制等。以下是这些步骤的详细解释,…

Linux(Centos 7.6)目录结构详解

Linux(Centos 7.6)是一个操作系统,其核心设计理念是将一切资源抽象为文件,即一切皆文件。比如系统中的硬件设备硬盘、网络接口等都被视为文件。Windows系统一般是分为C、D、E盘。而Linux(Centos 7.6)是以斜线"/"作为文件系统的开始目录&#x…

【蓝桥杯选拔赛真题85】python摆放箱子 第十五届青少年组蓝桥杯python选拔赛真题 算法思维真题解析

目录 python摆放箱子 一、题目要求 1、编程实现 2、输入输出 二、算法分析 三、程序编写 四、程序说明 五、运行结果 六、考点分析 七、 推荐资料 1、蓝桥杯比赛 2、考级资料 3、其它资料 python摆放箱子 第十五届蓝桥杯青少年组python比赛选拔赛真题详细解析 一…

数据分析思维(六):分析方法——相关分析方法

数据分析并非只是简单的数据分析工具三板斧——Excel、SQL、Python,更重要的是数据分析思维。没有数据分析思维和业务知识,就算拿到一堆数据,也不知道如何下手。 推荐书本《数据分析思维——分析方法和业务知识》,本文内容就是提取…

小程序中引入echarts(保姆级教程)

hello hello~ ,这里是 code袁~💖💖 ,欢迎大家点赞🥳🥳关注💥💥收藏🌹🌹🌹 🦁作者简介:一名喜欢分享和记录学习的在校大学生…

【SQLi_Labs】Basic Challenges

什么是人生?人生就是永不休止的奋斗! Less-1 尝试添加’注入,发现报错 这里我们就可以直接发现报错的地方,直接将后面注释,然后使用 1’ order by 3%23 //得到列数为3 //这里用-1是为了查询一个不存在的id,好让第一…

基于JAVA+SpringBoot+Vue的校园二手书交易平台

基于JAVASpringBootVue的校园二手书交易平台 前言 ✌全网粉丝20W,csdn特邀作者、博客专家、CSDN[新星计划]导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末附源码下载链接🍅 …

快速掌握Elasticsearch检索之二:滚动查询(scrool)获取全量数据(golang)

Elasticsearch8.17.0在mac上的安装 Kibana8.17.0在mac上的安装 Elasticsearch检索方案之一:使用fromsize实现分页 1、滚动查询的使用场景 滚动查询区别于上一篇文章介绍的使用from、size分页检索,最大的特点是,它能够检索超过10000条外的…

【C++】深入理解 break 和 continue 语句

博客主页: [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 💯前言💯break 和 continue 介绍**break** 的作用**continue** 的作用注意事项 💯break 示例代码示例**执行结果****解析过程** 💯continue 示例代码示例&am…

高效使用AI完成编程项目任务的指南:从需求分析到功能实现

随着人工智能工具的普及,即便是零编程基础或基础薄弱的用户,也可以借助AI完成许多技术任务。然而,要高效地使用AI完成编程任务,关键在于如何清晰表达需求,并逐步引导AI实现目标。 在本文中,我们将通过开发…

算法每日双题精讲 —— 滑动窗口(水果成篮,找到字符串中所有字母异位词)

🌟快来参与讨论💬,点赞👍、收藏⭐、分享📤,共创活力社区。 🌟 别再犹豫了!快来订阅我们的算法每日双题精讲专栏,一起踏上算法学习的精彩之旅吧!💪…

基于Qt事件机制中的定时器事件的闹钟设计

目标 代码 pro文件 QT core gui texttospeechgreaterThan(QT_MAJOR_VERSION, 4): QT widgetsCONFIG c11# The following define makes your compiler emit warnings if you use # any Qt feature that has been marked deprecated (the exact warnings # depend on …

后台管理系统DEMO

该项目后端使用SpringBootMyBatisPlusJWT,前端使用Vue3Vite2TSPiniaAxiosElementPlus等简单技术栈,实现了一个简约精致版的后台管理系统,包含非常基础的rbac权限功能,可以增删改查角色、用户、权限,角色添加权限、添加…

数据结构之线性表之链表(附加一个考研题)

链表的定义 链表的结构: 单链表-初始化 代码实现: 单链表-头插法 代码实现: 这里我给大家分析一下 我们每创建一个新的节点都要插在头节点的后面,我们一定要注意顺序 一定要先让新节点指向头节点指向的下一个节点,…

Python爬取城市天气信息,并存储到csv文件中

1.爬取的网址为:天气网 (weather.com.cn) 2.需要建立Weather.txt文件,并在里面加入如下形式的字段: 101120701济宁 101010100北京 3.代码运行后,在命令行输入Weather.txt文件中添加过的城市,如:济宁。 …

工厂+策略模式之最佳实践(疾病报卡维护模块API设计)

目录 💻业务场景 🔧应用技术 ⚙概要流程 ❗开发注意 服务类上标注了 自定义注解 却无法直接利用getDeclaredAnnotation 获取 *Spring代理机制 代理机制的工作原理 代理的工作机制 代理的使用场景 已获取EmrXXXServiceImpl 的Class,…

【智行安全】基于Synaptics SL1680的AI疲劳驾驶检测方案

随著车载技术的快速进步,驾驶安全越来越受到重视,而疲劳驾驶是造成交通事故的重要原因之一。传统的驾驶监控技术因精度不足或反应迟缓,无法满足实时监测需求。因此,结合人工智能技术的疲劳驾驶检测系统成为行业新方向,…

Go-知识 注释

Go-知识 注释 行注释块注释包注释结构体&接口注释函数&方法注释废弃注释文档 在 go 语言中注释有两种,行注释和块注释 行注释 使用双斜线 // 开始,一般后面紧跟一个空格。行注释是Go语言中最常见的注释形式,在标准包中,…