【Kafka】Kafka生产者数据重复、数据有序、数据乱序-07

【Kafka】Kafka生产者数据重复、数据有序、数据乱序-07

  • 1. 数据重复
    • 1.1 数据传递语义
    • 1.2 幂等性
      • 1.2.1 如何开启幂等性
      • 1.2.2 同一个消息,多个分区都会存在吗?
    • 1.3 事务
      • 1.3.1 Kafka 事务原理
      • 1.3.2 Kafka事务的作用和意义
        • 作用
        • 具体应用场景
  • 2. 数据有序
  • 3. 数据乱序

1. 数据重复

1.1 数据传递语义

  • 至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

  • 最多一次(At Most Once)= ACK级别设置为0

  • 总结:
    At Least Once可以保证数据不丢失,但是不能保证数据不重复;
    At Most Once可以保证数据不重复,但是不能保证数据不丢失。

  • 精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。

Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务。

1.2 幂等性

幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。

精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2) 。

重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID(Producer Id)是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的。

所以幂等性只能保证的是在单分区单会话内不重复。

当幂等性Producer开启时,Kafka通过以下机制来保证消息的幂等性:

  1. Producer ID(PID)和Sequence Number:
    每个幂等性Producer在初始化时都会分配一个唯一的Producer ID(PID)。
    每条消息在发送时会被分配一个递增的Sequence Number(序列号)。
    Kafka Broker通过PID和Sequence Number来判断消息是否重复。
  2. 去重机制:
    当Broker收到一条消息时,会检查消息的PID和Sequence Number。如果消息的PID和Sequence Number已经存在,Broker会认为这是一个重复的消息,并且不会再次写入。
    这种机制只在单个分区内有效。如果消息发送到不同的分区,Kafka无法保证幂等性。

在这里插入图片描述

1.2.1 如何开启幂等性

开启方法:

  1. 二次开发代码中添加 “props.put(“enable.idempotence”,true)”。
  2. 客户端配置文件中添加 “enable.idempotence = true”。
// 初始化配置,开启事务特性
Properties props = new Properties();
props.put("enable.idempotence", true);
props.put("transactional.id", "transaction1");
...

KafkaProducer producer = new KafkaProducer<String, String>(props);

1.2.2 同一个消息,多个分区都会存在吗?

在Kafka中,同一个消息在多个分区中一般不会存在。Kafka的设计原则之一是消息在分区间是分布的,而不是复制的。以下是一些关键点:

Kafka消息分区

  1. 分区(Partition):
    每个Kafka主题(Topic)可以有多个分区(Partitions),消息在这些分区之间分布。每个消息会被发送到一个特定的分区,而不是所有分区。
    分区可以提高并行处理能力和扩展性,因为不同的分区可以由不同的消费者并行处理。

  2. 消息键(Message Key):
    当你向Kafka发送消息时,可以指定一个键(Key)。Kafka使用这个键来决定消息应该被写入哪个分区。相同键的消息会被写入同一个分区,从而保证了消息的顺序性。
    如果没有指定键,Kafka会使用轮询(Round-Robin)或者其他算法来将消息分配到不同的分区。

  3. 副本(Replica):
    虽然同一个消息不会被写入多个分区,但Kafka有一个副本机制(Replication),用于提高数据的可靠性和容错性。每个分区有一个主副本(Leader)和多个从副本(Follower),这些副本会在不同的Broker上保存相同的数据。
    当Producer发送消息到一个分区的主副本时,主副本会将消息复制到从副本中,以保证数据的高可用性。

1.3 事务

1.3.1 Kafka 事务原理

在这里插入图片描述

Kafka 的事务一共有如下5个API

// 1 初始化事务
void initTransactions();

// 2 开启事务
void beginTransaction() throws ProducerFencedException;

// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;

// 4 提交事务
void commitTransaction() throws ProducerFencedException;

// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerTranactions {

    public static void main(String[] args) {

        // 0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");

        // 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 指定事务id
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranactional_id_01");

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
		
		// 初始化事务
        kafkaProducer.initTransactions();
		// 开启事务
        kafkaProducer.beginTransaction();

        try {
            // 2 发送数据
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i));
            }
			
			// 模拟失败
            int i = 1 / 0;
			
			// 提交事务
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
        	// 放弃事务
            kafkaProducer.abortTransaction();
        } finally {
            // 3 关闭资源
            kafkaProducer.close();
        }
    }
}

1.3.2 Kafka事务的作用和意义

作用
  1. 保证消息的原子性:
    事务可以保证一组消息的写入要么全部成功,要么全部失败。对于需要在多个分区或多个主题上写入数据的场景,事务能够确保数据的原子性。

  2. 避免数据丢失和重复:
    通过事务机制,Kafka可以避免消息在网络或系统故障时出现丢失或重复的情况。事务保证了每条消息的唯一性和可靠性。

  3. 支持跨分区和跨主题的操作:
    事务支持跨多个分区和多个主题的原子操作,使得Kafka在处理复杂数据流时更加灵活和可靠。

  4. 简化一致性处理:
    使用事务,开发者可以更简单地实现分布式系统中的数据一致性,而不需要手动处理分布式事务协调和一致性检查。

  5. 支持幂等性:
    事务机制基于幂等性,确保每条消息在分区内唯一,不会因重试操作导致重复消息。

具体应用场景
  1. 金融交易:
    在金融系统中,事务可以确保交易数据的完整性和一致性,避免资金损失和数据错乱。
  2. 订单处理:
    电商平台中的订单处理需要保证多个步骤(如库存检查、支付处理、订单确认)的原子性,事务可以确保订单处理的可靠性。
  3. 日志聚合:
    在日志收集和处理系统中,事务可以保证多条相关日志的完整性,避免丢失或重复。
  4. 数据同步:
    在多数据中心或多系统的数据同步中,事务可以确保数据的同步操作原子性,避免数据不一致。

2. 数据有序

在这里插入图片描述

3. 数据乱序

  1. kafka在1.x版本之前保证数据单分区有序,条件如下:
max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)。
  1. kafka在1.x及以后版本保证数据单分区有序,条件如下:
    a.未开启幂等性 : max.in.flight.requests.per.connection需要设置为1
    b.开启幂等性: max.in.flight.requests.per.connection需要设置小于等于5
    原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,
    故无论如何,都可以保证最近5个request的数据都是有序的。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/727071.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Docker 部署项目,真的太雅了~

大家好&#xff0c;我是南城余&#xff01; 最近在找工作&#xff0c;正好手里有台服务器&#xff0c;之前项目上线用的宝塔部署项目上线&#xff0c;在公司实习了一年后&#xff0c;发现如今项目部署都使用的是容器化部署方案&#xff0c;也就是类似于和 Docker 一样的部署方案…

鸿蒙开发通信与连接:【@ohos.nfc.cardEmulation (标准NFC-cardEmulation)】

标准NFC-cardEmulation 本模块主要用于操作及管理NFC卡模拟。 说明&#xff1a; 本模块首批接口从API version 8开始支持。后续版本的新增接口&#xff0c;采用上角标单独标记接口的起始版本。 导入模块 import cardEmulation from ohos.nfc.cardEmulation;cardEmulation.is…

全网最易懂,开源时序数据库influxDB,实际应用评测

前言&#xff1a; 当今是信息爆炸的时代&#xff0c;在处理高频数据时&#xff0c;关系型数据库oracle/mysql明显表现出乏力&#xff0c;因秒级、毫秒级高频数据&#xff0c;分分钟可以把关系型数据库的表塞爆。在日常生活工作中&#xff0c;我们经常会遇到哪些需要高频分析的场…

NXP实战笔记(十四):32K3xx基于RTD-SDK在S32DS实现HSE的安装。

目录 1、概述 1.1、什么是HSE&#xff1f; 1.2、如何实现HSE的OTA功能 1.3、S32K3放置HSE的地址 2、通过调试器安装HSE 3、通过IVT方式安装HSE 4、坑点慎重踩 4.1、优化等级 4.2、Flash放RAM 4.3、C40_Ip配置更改 4.4、程序烧录 5、测试结果 6、代码链接 1、概述 首…

innovus:如何设置timing报告格式

我正在「拾陆楼」和朋友们讨论有趣的话题&#xff0c;你⼀起来吧&#xff1f; 拾陆楼知识星球入口 在flow中添加如下设置即可设置好timing report的格式。 set report_timing_format [list timing_point arc net cell fanout load slew incr_delay delay arrival total_derate…

0 简单的图像分类

本文主要针对交通标识图片进行分类&#xff0c;包含62类&#xff0c;这个就是当前科大讯飞比赛&#xff0c;目前准确率在0.94左右&#xff0c;难点如下&#xff1a; 1 类别不均衡&#xff0c;有得种类图片2百多&#xff0c;有个只有10个不到&#xff1b; 2 像素大小不同&…

超越Llama3,多模态比肩GPT4V:GLM-4智能体,新一代语言处理利器

在人工智能领域&#xff0c;自然语言处理技术一直备受关注。就在昨日,今年备受关注的国内AI公司北京智谱AI发布了第四代 GLM 系列开源模型&#xff1a;GLM-4-9B。这是一个集成了先进自然语言处理技术的创新平台&#xff0c;它凭借清华大学KEG实验室提出的GLM模型结构&#xff0…

18V-180V降12V500mA恒压模块WT5118

18V-180V降12V500mA恒压模块WT5118 WT5118是一款能够将输入电压范围从18V至180V降低至12V并保持恒定输出电流500mA的恒压模块。 WT5118 是一款专为开关电源设计的集成了 180V 高电压 MOSFET 的 DC-DC 控制器。这个设备具备内置高压启动和自供电功能&#xff0c;能够满足快速启…

OpenAI 推出“模型规范”:塑造责任制的人工智能的框架

为了提升人工智能开发的责任性和透明度&#xff0c;OpenAI 最近发布了一份名为“模型规范”的初步草案。这份文件首次明确了其 API 和 ChatGPT 模型行为的指导原则&#xff0c;并通过博客形式对外公布。 OpenAI 在博客中解释说&#xff1a;“我们之所以发布此文档&#xff0c;…

「Python-docx 专栏」docx 设置页面边距、页眉页脚高度

本文目录 前言一、docx 页面边距在哪里二、对 <w:pgMar> 的详细说明1、上边距的说明2、右边距的说明3、下边距的说明4、左边距的说明5、页眉高度的说明6、页脚高度的说明三、设置 docx 页边距、页眉页脚高度1、完整代码2、代码执行效果图四、补充一些内容1、页面边距的两…

图解Sieve of Eratosthenes(埃拉托斯特尼筛法)算法求解素数个数

1.素数的定义 素数又称质数。质数是指在大于1的自然数中&#xff0c;除了1和它本身以外不再有其他因数的自然数。一个大于1的自然数&#xff0c;除了1和它自身外&#xff0c;不能被其他自然数整除的数叫做质数&#xff1b;否则称为合数&#xff08;规定1既不是质数也不是合数&…

【绝对有用】刚刚开通的GPT-4o计算这种数学题目出现问题了

欢迎关注如何解决以上问题的方法&#xff1a;查看个人简介中的链接的具体解决方案

Nvidia Isaac Sim搭建仿真环境 入门教程 2024(4)

Nvidia Isaac Sim 入门教程 2024 版权信息 Copyright 2023-2024 Herman YeAuromix. All rights reserved.This course and all of its associated content, including but not limited to text, images, videos, and any other materials, are protected by copyright law. …

【iOS】#include、#import、@class、@import

文章目录 #include#importclassimport总结 #include #include是c\c中的预处理器指令&#xff0c;用于包含头文件的内容 但是使用#include可能会出现重复包含文件的问题&#xff0c;因此需要使用&#xff08;#ifndef/#define/#endif&#xff09;。 #import //导入系统头文件…

候选键的确定方法-如何判断属性集U的子集K是否为候选键、如何找到关系模式的候选键

一、候选键的定义 在关系模式R(U,F)中&#xff0c;若&#xff0c;且K满足&#xff0c;则K为关系模式R的候选键 关系模式R的候选键必须满足以下两个条件&#xff1a; &#xff08;1&#xff09;必须是属性集U的子集 &#xff08;2&#xff09;完全函数决定属性集U 二、如何…

【网络安全的神秘世界】已解决Failed to start proxy service on 127.0.0.1:8080

&#x1f31d;博客主页&#xff1a;泥菩萨 &#x1f496;专栏&#xff1a;Linux探索之旅 | 网络安全的神秘世界 | 专接本 | 每天学会一个渗透测试工具 解决burpsuite无法在 127.0.0.1&#xff1a;8080 上启动代理服务端口被占用以及抓不到本地包的问题 Burpsuite无法启动proxy…

令人震撼的人类智慧的科学领域-AI技术

AI&#xff0c;全称为人工智能&#xff08;Artificial Intelligence&#xff09;&#xff0c;是一门致力于让机器模仿人类智慧的科学领域。其核心技术涵盖了机器学习、自然语言处理、计算机视觉及专家系统等多个方面。AI旨在开发能够感知环境、进行逻辑推理、自主学习并做出决策…

Python数据可视化:直方图、核密度估计图、箱线图、累积分布函数图

本文使用数据来源自2023年数学建模国赛C题&#xff0c;以附件1、附件2数据为基础&#xff0c;通过excel的数据透视表等功能重新汇总了一份新的数据表&#xff0c;从中截取了一部分数据为例用于绘制图表。绘制的图表包括一维直方图、一维核密度估计图、二维直方图、二维核密度估…

godot所有2D节点介绍

五十个2D节点介绍 2D节点介绍 前言一、Node2D二、sprite2D三、AnimatedSprite2D四、Camera2D五、PhysicsBody2D六、 RigidBody2D七、CharacterBody2D八、StaticBody2D九、joint2D十、DampedSpringJoint2D十一、GrooveJoint2D十二、PinJoint2D十三、Area2D十四、AnimatableBody2…

cloud_enum:一款针对不同平台云环境安全的OSINT工具

关于cloud_enum cloud_enum是一款功能强大的云环境安全OSINT工具&#xff0c;该工具支持AWS、Azure和Google Cloud三种不同的云环境&#xff0c;旨在帮助广大研究人员枚举目标云环境中的公共资源&#xff0c;并尝试寻找其中潜在的安全威胁。 功能介绍 当前版本的cloud_enum支…