kafka精准一次、事务、幂等性

Kafka事务

消息中间件的消息保障的3个级别

  1. At most once 至多一次。数据丢失。
  2. At last once 至少一次。数据冗余
  3. Exactly one 精准一次。好!!!

如何区分只要盯准提交位移、消费消息这两个动作的时机就可以了。

:先消费消息、再提交位移。

如果提交位移这一步挂了,就会再消费一遍消息。重复消费====》〉》至少一次

当:先提交位移、再消费消息。

提议位移成功、消费消息失败,那么数据就丢失了====》〉》至多一次

如何精准一次呢?

幂等和事务!

幂等

对接口的多次调用所产生的结果和一次调用的结果是一样的。

即:(第一次调用,中途挂了,再次调用==一次调用) 为true

如何实现?

在v2版本的消息存储格式用有两个字段。produce_id(简称pid) 、first sequence

在这里插入图片描述

每个新的生产者实例在初始化的时候都会被分配一个pid,每个pid,消息发送到每一个分区都有序列号 sequence,序列号会从0开始递增,每发送一条消息,<PID,分区> 对应的序列号的值会➕1。这个序列号值(SN)在broker的内存中维护。只有当SN_new=SN_old+1.

broker才会接收这个消息。

如SN_new < SN_old+1 说明消息重复了,这个消息可以直接丢掉。

如SN_new>SN_old+1 说明消息丢失了,有数据还没有卸写入。抛乱序异常OutOforderSequenceException。

即用序列号来保证消息的顺序消费。

注意 所记录的这个序列号是针对 每一对<PID,分区> 所以这个幂等实现的是单会话、单分区的。

如何保证多个分区之间的幂等性呢?

事务

保证对多个分区写入操作的原子性,要么全部成功、要么全部失败。将应用程序的生产消息、消费消息、提交消费位移当作原子操作来处理。

用户显示指定一个事务id: transactionalId。这个事务id是唯一的

从生产者角度来考虑,事务保证了生产者会话消息的幂等发送跨生产者会话的事务恢复.

  • 生产者会话消息的幂等发送:如有有两个相同事务id的生产者,新的创建了 旧的就会被kill
  • 某个生产者实例宕机了,新的生产者实例可以保证未完成的旧事务要么被提交 要没被中断

实现过程,以consume-transform-produce为例。

package com.hzbank.yjj.transaction;

import com.hzbank.yjj.producer.CustomerPartitioner;
import com.hzbank.yjj.producer.ProducerlnterceptorPrefix;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;

public class TransactionConsumeTransformProduce {

    public static final String brokerList = "localhost:9092";

    public static Properties getConsumerProps(){
        Properties props =new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"groupId");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        return props;
    }

    public static Properties getProducerProps(){
        Properties props =new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactionalId");

        return props;
    }

    public static void main(String[] args) {
        //初始化生产者和消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerProps());
        consumer.subscribe(Collections.singletonList("topic-source"));
        KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerProps());

        //初始化事务
        producer.initTransactions();

        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

            if(!records.isEmpty()){
                HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                //开启事务
                producer.beginTransaction();

                try {
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);

                        for (ConsumerRecord<String, String> record : partitionRecords) {
                            System.out.println("获取到了topic-source发送过来的数据"+record.value());
                            System.out.println("do some ");

                            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic-sink", record.key(), record.value());
                            producer.send(producerRecord);
                        }
                        // 获取最近一次的消费位移
                        long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();

                        offsets.put(partition,new OffsetAndMetadata(lastConsumedOffset+1));
                    }
                    //提交消费位移
                    producer.sendOffsetsToTransaction(offsets,"groupId");
                    //提交事务
                    producer.commitTransaction();
                } catch (ProducerFencedException e) {
                    System.out.println("异常了");
                    producer.abortTransaction();
                }
            }
        }
    }


}

1. 找到TransactionCoordinator。

TransactionCoordinator负责分配和管理事务。
FindCoordinatorRequest 发送请求找到TransactionCoordinator所在的broker节点。返回其对应的node_id、 host、 port 信息

transactionalId 的哈希值计算主题_transaction_state 中的分区编号

根据分区leader副本找到所在的broker节点,极为Transaction Coordinator节点

2. 获取pid

通过InitProducerIdRequest向TransactionCoordinator 获取pid 为当前生产者分配一个pid。

String transactionalId; 事务id
int transactionTimeoutMs; 事务状态更新超时时间

3. 保存pid

TransactionCoordinator 第一次收到事务id会和对应pid保存下来,以消息(事务日志消息)的形式保存到主题_transaction_state中,实现持久化

InitProducerIdRequest还会出发一下任务:

- 增加pid对应的producer_epoch.具有相同 PID 但 producer_epoch 小 于该 producer_叩och 的其他生产者新开启的事务将被拒绝 。
- 恢复( Commit)或中止( Ab。此)之前的生 产 者未完成的 事务

4. 开启事务

通过 KafkaProduc町的 beginTransaction()方法。调用该方法后,生产者本 地会标记己经开启了 一个新的事务 ,只有在生产者发送第一条消息之后 TransactionCoordinator 才会认为该事务 己经开启 。

5. Consume-Transform-Produce

整个事务处理数据。

  • AddPartitionsToTxnRequest:让 TransactionCoordinator 将<transactionld, TopicPartition>的对应关系存储在主题

    transaction state 中

  • ProduceRequest:生产者通过 ProduceRequest 请求发送消息( ProducerBatch)到用户 自定义主题中

  • AddOffsetsToTxnRequest:TransactionCoordinator 收到这个AddOffsetsToTxnRequest请求,通过 groupId 来推导出在一consumer_offsets 中的分区

  • TxnOffsetCommitRequest:发送 TxnOffsetCommitRequest 请求给 GroupCoordinator,从而将本次事务中 包含的消费位移信息 offsets 存储到主题 consumer offsets 中

6. 提交或者终止事务

KafkaProducer 的 commitTransaction()方法或 abortTransaction()方法。

写不下去了,暂时就先理解这么多了,后面再多结合源码去看看。

参考:书籍《深入理解 Kafka:核心设计与实践原理》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/188348.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

计算机中由于找不到vcruntime140.dll无法继续执行代码无法打开软件怎么解决分享

关于如何解决vcruntime140.dll无法继续执行代码的6个教程。在这个科技日新月异的时代&#xff0c;电脑已经是我们日常和工作中必不可少的电子产品&#xff0c;然后我们在使用过程中经常会遇到不一样的问题&#xff0c;比如vcruntime140.dll文件丢失&#xff0c;那么vcruntime14…

selenium的基础语法

&#x1f4d1;打牌 &#xff1a; da pai ge的个人主页 &#x1f324;️个人专栏 &#xff1a; da pai ge的博客专栏 ☁️山水速疾来去易&#xff0c;襄樊镇固永难开 ☁️定位页面的元素 参数:抽象类By里…

实验题【网关设置+VRRP+OSPF】(H3C模拟器)

嘿&#xff0c;这里是目录&#xff01; ⭐ H3C模拟器资源链接1. 实验示意图2. 要求和考核目标3. 当前配置3.1 PC1、PC2、PC3、PC4和PC5配置3.2 SW配置3.2.1 SW2配置3.2.2 SW3配置3.2.3 SW4配置3.2.4 SW1配置 3.2. R配置3.2.1 R1配置3.2.2 R2配置 ⭐ H3C模拟器资源链接 H3C网络…

Cesium-terrain-builder编译入坑详解

本以为编译cesium-terrian-tools编译应该没那么难&#xff0c;不想问题重重&#xff0c;不想后人重蹈覆辙&#xff0c;也记录下点点滴滴。 目前网上存在的cesium代码版本主要有两个分支&#xff1a; 原始网站【不能生成layer文件&#xff0c;且经久不更新&#xff0c;使用gdal…

计算机应用基础_错题集_PPT演示文稿_操作题_计算机多媒体技术操作题_文字处理操作题---网络教育统考工作笔记007

PPT演示文稿操作题 提示:PPT部分操作题 将第2~第4张幻灯片背景效果设为渐变预置的“雨后初晴”效果(2)设置幻灯片放映方式

【小沐学写作】免费在线AI辅助写作汇总

文章目录 1、简介2、文涌Effidit&#xff08;腾讯&#xff09;2.1 工具简介2.2 工具功能2.3 工具体验 3、PPT小助手&#xff08;officeplus&#xff09;3.1 工具简介3.2 使用费用3.3 工具体验 4、DeepL Write&#xff08;仅英文&#xff09;4.1 工具简介4.2 工具体验 5、天工AI…

【负载均衡】这些内容你需要知道下

&#x1f604;作者简介&#xff1a; 小曾同学.com,一个致力于测试开发的博主⛽️&#xff0c;主要职责&#xff1a;测试开发、CI/CD 如果文章知识点有错误的地方&#xff0c;还请大家指正&#xff0c;让我们一起学习&#xff0c;一起进步。 &#x1f60a; 座右铭&#xff1a;不…

Golang并发模型:Goroutine 与 Channel 初探

文章目录 goroutinegoexit() channel缓冲closerangeselect goroutine goroutine 是 Go 语言中的一种轻量级线程&#xff08;lightweight thread&#xff09;&#xff0c;由 Go 运行时环境管理。与传统的线程相比&#xff0c;goroutine 的创建和销毁的开销很小&#xff0c;可以…

Python基于jieba+wordcloud实现文本分词、词频统计、条形图绘制及不同主题的词云图绘制

目录 序言&#xff1a;第三方库及所需材料函数模块介绍分词词频统计条形图绘制词云绘制主函数 效果预览全部代码 序言&#xff1a;第三方库及所需材料 编程语言&#xff1a;Python3.9。 编程环境&#xff1a;Anaconda3&#xff0c;Spyder5。 使用到的主要第三方库&#xff1a;…

【Leetcode】【实现循环队列】【数据结构】

代码实现&#xff1a; typedef struct {int front;int back;int k;int* a;} MyCircularQueue;bool myCircularQueueIsEmpty(MyCircularQueue* obj) {return obj->frontobj->back; }bool myCircularQueueIsFull(MyCircularQueue* obj) {return (obj->back1)%(obj->…

位图和布隆过滤器

目录 一. 位图 1.题目&#xff1a; 给40亿个不重复的无符号整数&#xff0c;没排过序。给一个无符号整数&#xff0c;如何快速判断一个数是否在这40亿个数中&#xff1f; 2.解析题目&#xff1a; 3.位图 4.代码以及测试 5.其他题目 二.布隆过滤器 1.介绍 2.实现 …

Vue服务端渲染——同构渲染

Vue.js 可以用于构建客户端应用程序&#xff0c;组件的代码在浏览器中运行&#xff0c;并输出 DOM 元素。同时&#xff0c;Vue.js 还可以在 Node.js 环境中运行&#xff0c;它可以将同样的组件渲染为字符串并发送给浏览器。这实际上描述了 Vue.js 的两种渲染方式&#xff0c;即…

【云原生】什么是 Kubernetes ?

什么是 Kubernetes &#xff1f; Kubernetes 是一个开源容器编排平台&#xff0c;管理着一系列的 主机 或者 服务器&#xff0c;它们被称作是 节点&#xff08;Node&#xff09;。 每一个节点运行了若干个相互独立的 Pod。 Pod 是 Kubernetes 中可以部署的 最小执行单元&#x…

电脑技巧:U盘运用小技巧,提升U盘运用寿命

目录 1、注意清洁&#xff0c;防止污染 2、别随意插拔 3、文件多时分段写入 4、U盘传输数据中切记拔掉U盘 5、建议不要长期将U盘插在电脑上 6、杜绝别频繁将U盘格式化 7、U盘中毒怎么办 U盘是大家日常办公经常用得到的便携式文件储存工具&#xff0c;因为其小巧便携、方…

5.3每日一题(不确定正负号的级数敛散性:和一个正项级数比较判定)

比较判别法和比较判别法的极限形式是对正项级数而言的&#xff0c;若一个级数和p级数比较&#xff0c;结果>0&#xff0c;则同敛散&#xff1b;若结果<0&#xff0c;则结果乘以-1 结果又同敛散了&#xff1b;所以只要比值不等于0&#xff0c;则同敛散&#xff1b; 所以当…

鸿蒙(HarmonyOS)应用开发——生命周期、渲染控制、状态管理装饰器

生命周期 任何程序都是有一定的生命周期的。生命周期是记录从产生到销毁的过程&#xff1b;如果熟悉前端vue.js的话&#xff0c;就可以很好的理解生命周期。 自定义组件生命周期 ArkTS中&#xff0c;自定义组件提供了两个生命周期函数&#xff1a;aboutToAppear() 和aboutTo…

代码随想录算法训练营第四十七天|198. 打家劫舍、213. 打家劫舍II、337. 打家劫舍III

LeetCode 198. 打家劫舍 题目链接&#xff1a;198. 打家劫舍 - 力扣&#xff08;LeetCode&#xff09; 第一次打家劫舍&#xff0c;来个简单一些的&#xff0c;无非就是偷了当前这家偷不了下一家&#xff0c;因此dp[n]代表&#xff0c;偷前n家的时候所能偷到的最高金额&#x…

记一次RocketMQ线上broker内存持续升高问题排查

RocketMQ 版本 5.1.0 jdk版本 1.8 JVM启动参数 -Xms46g -Xmx46g -XX:MetaspaceSize1259m -XX:MaxMetaspaceSize2517m -XX:UseG1GC -XX:G1HeapRegionSize16m -XX:G1ReservePercent25 -XX:InitiatingHeapOccupancyPercent30 -XX:SoftRefLRUPolicyMSPerMB0 -verbose:gc -Xlog…

【Docker】Docker与Kubernetes:区别与优势对比

前言 Docker 是一个开源的应用容器引擎&#xff0c;让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux或Windows操作系统的机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。   kubernetes&#xff0c;简称K8s&a…

Nacos身份绕过漏洞复现(QVD-2023-6271)

Nacos身份绕过漏洞复现&#xff08;QVD-2023-6271&#xff09; 环境配置 该漏洞主要用了win10_JAVA的环境&#xff0c;参考网上已有的复现文章&#xff0c;使用jdk-11.0.2_windows-x64_bin.exe 由于2.2.0之后的nacos已将本漏洞修复&#xff0c;所以本次复现使用2.2.0的包 下…