Kafka之Producer原理

1. 生产者发送消息源码分析

public class  SimpleProducer {
    public static void main(String[] args) {
        Properties pros=new Properties();
        pros.put("bootstrap.servers","192.168.8.144:9092,192.168.8.145:9092,192.168.8.146:9092");
//        pros.put("bootstrap.servers","192.168.8.147:9092");
        pros.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        pros.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        // 0 发出去就确认 | 1 leader 落盘就确认| all(-1) 所有Follower同步完才确认
        pros.put("acks","1");
        // 异常自动重试次数
        pros.put("retries",3);
        // 多少条数据发送一次,默认16K
        pros.put("batch.size",16384);
        // 批量发送的等待时间
        pros.put("linger.ms",5);
        // 客户端缓冲区大小,默认32M,满了也会触发消息发送
        pros.put("buffer.memory",33554432);
        // 获取元数据时生产者的阻塞时间,超时后抛出异常
        pros.put("max.block.ms",3000);

        // 创建Sender线程
        Producer<String,String> producer = new KafkaProducer<String,String>(pros);

        for (int i =0 ;i<1000000;i++) {
            producer.send(new ProducerRecord<String,String>("mytopic",Integer.toString(i),Integer.toString(i)));
            // System.out.println("发送:"+i);
        }

        //producer.send(new ProducerRecord<String,String>("mytopic","1","1"));
        //producer.send(new ProducerRecord<String,String>("mytopic","2","2"));

        producer.close();
    }

a. 首先我们是创建一些kafka的连接配置以及参数配置,然后先new出来一个生产者,创建一个sender线程,由下图源码可以看出,我们在new生产者的时候,kafak会帮我们船舰一个sender线程,并进行了命名和启动

 b.随后我们的main线程中,进行批量send发送,那么接下来我们看下send方法

可以看到,在send方法中,还有一个interceptors做了一个拦截器的处理, 那么拦截器应该怎么使用的呢?

我们只需要实现ProducerInterceptor中的onsend方法,并且在kafka send消息前进行配置就可以了

public class ChargingInterceptor implements ProducerInterceptor<String, String> {
    // 发送消息的时候触发
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {

        System.out.println("1分钱1条消息,不管那么多反正先扣钱");
        return record;
}
    

带有拦截器的kafka demo

 public static void main(String[] args) {

        Properties props=new Properties();
        props.put("bootstrap.servers","192.168.8.144:9092,192.168.8.145:9092,192.168.8.146:9092");
//        props.put("bootstrap.servers","192.168.8.147:9092");
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        // 0 发出去就确认 | 1 leader 落盘就确认| all 所有Follower同步完才确认
        props.put("acks","1");
        // 异常自动重试次数
        props.put("retries",3);
        // 多少条数据发送一次,默认16K
        props.put("batch.size",16384);
        // 批量发送的等待时间
        props.put("linger.ms",5);
        // 客户端缓冲区大小,默认32M,满了也会触发消息发送
        props.put("buffer.memory",33554432);
        // 获取元数据时生产者的阻塞时间,超时后抛出异常
        props.put("max.block.ms",3000);

        // 添加拦截器
        List<String> interceptors = new ArrayList<>();
        interceptors.add("com.zsc.mq.kafka.javaapi.interceptor.ChargingInterceptor");
        // 这个键就是拦截器的配置,因为拦截器是个list,因此可以实现多个拦截器
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        Producer<String,String> producer = new KafkaProducer<String,String>(props);

        producer.send(new ProducerRecord<String,String>("mytopic","1","1"));
        producer.send(new ProducerRecord<String,String>("mytopic","2","2"));

        producer.close();

    }

}

 c.  send方法走完拦截器后,我们进入到dosend方法中,接着看

 

可以看到,kafka对我们的消息进行了一个序列化,那么序列化方式就是在我们初始配置参数的时候进行配置的,可以指定不同的序列化方式,并且也可以自定义序列化方式,实现序列化接口,增加到配置类中即可

d. 看完序列化,我们的消息发送接着往下面走, 进入到分区器流程

 

 

 由上面可知,我们的分区器如果指定了分区就会走我们指定的分区;消息没有指定分区但是自定义了消息分区器,就会走到消息分区器中,自定义消息器代码如下(实现partitioer接口即可):

public class SimplePartitioner implements Partitioner {
    public SimplePartitioner() {

    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String k = (String) key;
        System.out.println(k);
        if (Integer.parseInt(k) % 2 == 0){
            return 0;
        }else{
            return 1;
        }
        // return Integer.parseInt(k)%2;

    }

    @Override
    public void close() {
    }

 还有第三个情况,既没有指定也没有自定义分区器。那么key不为空,那就是走hash取模算法;key也会空的话,就是采用粘连策略(根据topic来确定在哪里存储)

e. 当我们消息的分区器走完之后,就进入到我们的累加器,在上篇博客MQ之初识kafka-CSDN博客我们介绍组件的时候就提到过,kafka为了提升高吞吐,查询效率快,消息并不是堆积在一起的,而是一批一批去放的,因此经过一个累加器。

可以看到 按批次添加到累加器中,那么添加到累加器之后,是怎么触发流程的呢?

f . 顺着源码再往下看,可以看到一个判断,当累加器的批次满了的话或者是刚创建的批次,就会去唤醒sender线程,向Broker中发送消息。

生产者发送消息的整个流程图如下所示:

 

2. ACK应答机制与ISR机制

2.1 服务器端响应策略的必要性

如图所示,我们正常的执行流程是生产者producer向leader中发送消息,然后leader同步到两个follower副本中,那么当发送消息的过程中服务异常的话,我们的leader就接收不到消息了,因此需要一个应答机制来保证我们能够接收到消息,如果leader没有接收到消息,就触发重发机制,让producer重新发送消息给leader

 2.2 ACK应答机制

kafka中提供了三种可靠性级别,可以根据对可靠性和延迟性的要求进行选择

1.acks = 0  producer 不等待 broker动作、直接返回ack
2.acks = 1(默认) producer等待broker 动作、 leader 落盘成功、返回 ack
3.acks=-1(all)  producer 等待 broker 动作、 leader&follower 全部落盘成功、返回 ack

 props.put("ack","0");  不等待ACK

这种情况是说我们的producer发送消息给leader,leader异步返回ack给peoducer告诉消息已经发送成功了,这种正常存储的情况下肯定是没有问题的。但是如果还没有同步副本的情况下,我们的leader此时挂掉了,而producer已经收到了应答,因此不会再重发消息。当再次重启leader所在的服务器时,数据就丢失了

props.put("ack","1");  Leader落盘、返回ACK(默认)

这种下,我们peoducer确定了leader已经落盘了,但是如果极端情况下,leader还没有同步副本给follower,那么此时leader服务器挂了,数据是不是也就丢失了,因为也还没有进行备份

 props.put("ack","-1");  Leader和全部Follower落盘、返回ACK

这种情况是我们的producer等待leader和follower全部落盘成功后,进行ack响应,这种策略的可靠性最高,但是吞吐量是最低的,因此要根据具体业务具体配置。那么这种策略是不是就没有什么问题了呢?当然也有,比如当leader和follower都落盘后,再返回应答信号时,leader挂了,那么peoducer没有收到消息,就会任务leader没有接收到消息,还是会对消息进行重发,那这个问题怎么解决呢? 可以用消息幂等性(在第三章进行赘述说明)

 应答异常

如上图,当一个flower挂了的情况下,是不是我们的leader就没法同步了,没法同步,就会造成整个链路的阻塞,peoducer没收到应答信息还啥也不知道,又往leader发消息,如果这样持续下去,服务是不是就该崩了,因此引入了一个ISR机制。

2.3 ISR机制

 ISR是一组动态维护的同步副本集合,它的作用就是把leader和follower同时放到一个ISR队列中,比如上面的P0_R0挂掉了,同步不积极,那么就把它移除ISR队列,默认为30s,可以经过replica.lag.time.max.ms进行配置,当ISR中的队列都同步完了的话,就返回ACK应答信号

AR = ISR+OSR

3. 消息幂等性

发送消息情形-1: 正常发送

发送消息情形-2:消息发送失败,触发消息重发,造成消息重发写入

 

 发送消息情形-3:消息发送失败,触发消息重发,消息不重复写入

如上图所示,是怎么保证消息不被重复写入的呢?利用幂等性,在发送消息的时候新增两个参数PID与Sequence Number分别代表生产者ID和消息的编码,那么Broker存储的时候也会多加一点空间存储这两个值,当ack应答异常时,再次重发消息到队列中时,就会进行一次判断a.如果PID和sequence Number都相等,则消息写入队列失败,b.如果Sequence Number为1 则顺序写入 c.如果Sequence Number为2,则抛出异常,表示数据有丢失

幂等性生产者发送消息流程总结:

1 Producer 端发送消息(消息本身、 PID Sequence Number
2 Broker 端接收到消息(将消息和 PID Sequence Number 一起保存)
3 、若 ACK 响应失败,生产者重试,再次发送消息

 kafka是在Broker端完成的去重处理

4. Kafka生产者事务

生产者的幂等性只能保证在单分区单会话的场景下有效,因此对于多分区来说,kafka事务就提供了对多个分区写入操作的原子性。但是kafka事务的前提是开启幂等性。

kafka事务API的相关方法

initTransactions() 初始化事务
beginTransaction() 开启事务
commitTransaction() 提交事务
abortTransaction() 中止事务
sendOffsetsToTransaction()

事务的一个demo

    public static void main(String[] args) {
        Properties props=new Properties();
        //props.put("bootstrap.servers","192.168.8.144:9092,192.168.8.145:9092,192.168.8.146:9092");
        props.put("bootstrap.servers","192.168.8.147:9092");
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        // 0 发出去就确认 | 1 leader 落盘就确认| all或-1 所有Follower同步完才确认
        props.put("acks","all");
        // 异常自动重试次数
        props.put("retries",3);
        // 多少条数据发送一次,默认16K
        props.put("batch.size",16384);
        // 批量发送的等待时间
        props.put("linger.ms",5);
        // 客户端缓冲区大小,默认32M,满了也会触发消息发送
        props.put("buffer.memory",33554432);
        // 获取元数据时生产者的阻塞时间,超时后抛出异常
        props.put("max.block.ms",3000);

        props.put("enable.idempotence",true);
        // 事务ID,唯一
        props.put("transactional.id", UUID.randomUUID().toString());

        Producer<String,String> producer = new KafkaProducer<String,String>(props);

        // 初始化事务
        producer.initTransactions();

        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<String,String>("transaction-test","1","1"));
            producer.send(new ProducerRecord<String,String>("transaction-test","2","2"));
//            Integer i = 1/0;
            producer.send(new ProducerRecord<String,String>("transaction-test","3","3"));
            // 提交事务
            producer.commitTransaction();
        } catch (KafkaException e) {
            // 中止事务
            producer.abortTransaction();
        }


        producer.close();
    }
}

kafka事务操作的基本流程

最后标记消费状态后,就可以进行消费了

 kafka的事务细节流程:

5. 总结

        本文主要是介绍了kafka生产者端的一些原理,先是从源码出发,介绍了生产者发送消息到Broker经历的一系列过程:先是创建了一个sender线程,然后在发送消息的过程中一次经过拦截器、累加器、分区器最后根据分区的批量消息是否新建或者满了来触发sender线程发送到Broker服务器中。随后我们介绍了,peoducer跟broker服务器之间的交互采用的是应答机制,在这里有3种配置,可根据业务需要来具体配置,当配置-1的时候,我们分析了为什么会出现重发消息的问题,通过幂等性来保证,follower从节点挂了的情况下,应答异常,采用ISR队列机制进行避免。但是幂等性只能保证单分区单会话的场景,而针对多分区的情况下,kafka主要是采用分布式事务来解决,利用分布式ID,事务coordinattor和事务日志分二PC提交,并且对事务的状态进行存储标记,当事务的状态更改为可消费的时候,才会进行消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/687577.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【必会面试题】ThreadLocal的底层原理及其使用场景

目录 原理应用场景优势1. 避免线程安全问题2. 提高性能3. 简化代码 注意事项权衡决策 ThreadLocal是Java中用于创建线程局部变量的一个类&#xff0c;它提供了一种将变量绑定到当前线程的技术&#xff0c;使得每个线程都拥有该变量的独立副本&#xff0c;即使是在多线程环境下也…

GitHub生成SSH密钥,使用SSH进行连接

目录 一、生成新的SSH密钥 二、添加新的SSH密钥 三、测试SSH连接 四、SSH密钥密码 五、创建新仓库并推送到github 说明 使用 SSH URL 将 git clone、git fetch、git pull 或 git push 执行到远程存储库时&#xff0c; 须在计算机上生成 SSH 密钥对&#xff0c;并将公钥添加到…

keil program algorithm 出错

前段时间 在 调试下载算法时&#xff0c;遇到一个奇怪的问题 就是 加载下载算法后&#xff0c; 下载算法的RAM空间 大小不能修改为 单片机的最大RAM&#xff0c;只能改到最大4KB的空间大小, 再大就报错 刚开始报错 一直不知道原因&#xff0c;走了很多弯路&#xff0c; 到最…

SharePoint:智能内容管理,释放数据价值

在Microsoft 365的庞大生态系统中&#xff0c;SharePoint常常被忽视&#xff0c;但它却是整个平台的核心。SharePoint不仅承载着OneDrive、Teams、Power Platform等所有Microsoft 365产品的内容存储&#xff0c;更是企业协作和内容管理的基石。助AI技术的加持&#xff0c;Share…

Microbiome | binning+转录组→首个草鱼肠道基因集目录发布啦

草鱼便宜又好吃 但是你了解草鱼吗&#xff1f; 草鱼的肠道里定殖着成千上万的共生微生物&#xff0c;它们与草鱼共同生存&#xff0c;相互影响。这些微生物在草鱼的新陈代谢、免疫调节等方面发挥着重要作用。 虽然同为经济作物&#xff0c;鱼类的微生态相关研究远远…

U盘未安全退出后提示格式化:原因分析与数据恢复策略

在日常工作和生活中&#xff0c;U盘作为便携式存储设备的代表&#xff0c;因其小巧、方便携带和存储容量大等特点而广受欢迎。然而&#xff0c;不少用户在使用U盘的过程中都遇到过一个令人头疼的问题&#xff1a;U盘在没有安全退出的情况下被直接拔出&#xff0c;再次插入时系统…

[职场] 研究生面试自我介绍_1 #经验分享#知识分享

研究生面试自我介绍 想要进入职场&#xff0c;面试是必不可少的。然而想要面试成功&#xff0c;就需要一个让人印象深刻的自我介绍&#xff0c;好的自我介绍可以让面试官&#xff0c;快速了解自己&#xff0c;快速记住自己。 一、范文1 我是一名硕士研究生&#xff0c;即将毕业…

SOLIDWORKS认证考试的目的

在当今日益发展的工程设计和制造领域&#xff0c;SOLIDWORKS作为一款功能强大的三维CAD设计软件&#xff0c;已经得到了广泛的认可和应用。为了评估和提升用户在使用SOLIDWORKS软件时的专业技能和能力&#xff0c;SOLIDWORKS公司推出了认证考试项目。本文将深入探讨SOLIDWORKS认…

.net 下的身份认证与授权的实现

背景 任何一个系统&#xff0c;都需要对于底层访问的页面和接口进行安全的处理&#xff0c;其中核心就是认证和授权。 另外一个问题就是在实际编程过程中&#xff0c;我们的代码有不同的模式&#xff0c;不同的分层或者在不同的项目之中&#xff0c;如何在不同的地方取得用户…

ACDSee Photo Studio Ultimate v17 解锁版安装教程 (图片编辑器)

前言 ACDSee Photo Studio Ultimate 2024&#xff0c;一款适合各类摄影师和创意人士的综合解决方案&#xff0c;具备了经过省时的本地人工智能 (AI) 强化的全新特性和改进功能&#xff0c;使您能够以最小的投入获得最大的控制&#xff0c;从而更轻松地管理、检索和编辑您的照片…

免费分享一套SpringBoot+Vue校园论坛(微博)系统【论文+源码+SQL脚本】,帅呆了~~

大家好&#xff0c;我是java1234_小锋老师&#xff0c;看到一个不错的SpringBootVue校园论坛(微博)系统&#xff0c;分享下哈。 项目视频演示 【免费】SpringBootVue校园论坛(微博)系统 Java毕业设计_哔哩哔哩_bilibili【免费】SpringBootVue校园论坛(微博)系统 Java毕业设计…

Django学习二:配置mysql,创建model实例,自动创建数据库表,对mysql数据库表已经创建好的进行直接操作和实验。

文章目录 前言一、项目初始化搭建1、创建项目&#xff1a;test_models_django2、创建应用app01 二、配置mysql三、创建model实例&#xff0c;自动创建数据库表1、创建对象User类2、执行命令 四、思考问题&#xff08;****&#xff09;1、是否会生成新表呢&#xff08;答案报错&…

数据结构——哈希表、哈希桶

哈希概念 顺序结构以及平衡树中&#xff0c;元素关键码与其存储位置之间没有对应的关系&#xff0c;因此在查找一个元素时&#xff0c;必须要经过关键码的多次比较&#xff0c;顺序查找时间复杂度为O(N),平衡树中为树的高度,即O(logN),搜索的效率取决于搜索过程种元素的比较次…

Java递归删除文件夹

Java可以直接删除文件或者空文件夹&#xff0c;但是当文件夹不为空时&#xff0c;就不能直接删除了&#xff0c;这时候可以使用递归将文件夹直接删除 首先我们假设在D盘创建a文件夹&#xff0c;a中有一个b文件夹&#xff0c;b中有一个c文件夹&#xff0c;c中有三个文本文件&…

22. 计算机网络 - 物理层

通信方式带通调制 通信方式 根据信息在传输线上的传送方向&#xff0c;分为以下三种通信方式&#xff1a; 单工通信&#xff1a;单向传输半双工通信&#xff1a;双向交替传输全双工通信&#xff1a;双向同时传输 带通调制 模拟信号是连续的信号&#xff0c;数字信号是离散的…

新Docker镜像代理地址!

针对近期国内Docker镜像代理地址不能用,新的替换地址&#xff1a; 除了阿里自己账号申请的镜像加速地址外&#xff0c;下面的也可以用 "https://docker.m.daocloud.io", "https://docker.nju.edu.cn", "https://dockerproxy.com" systemctl d…

搭建自己的DNS服务器

个人名片 &#x1f393;作者简介&#xff1a;java领域优质创作者 &#x1f310;个人主页&#xff1a;码农阿豪 &#x1f4de;工作室&#xff1a;新空间代码工作室&#xff08;提供各种软件服务&#xff09; &#x1f48c;个人邮箱&#xff1a;[2435024119qq.com] &#x1f4f1…

Unity 资源 之 风格化地形纹理(Stylized Terrain Textures)免费领取

风格化地形纹理&#xff1a;Stylized Terrain Textures 前言资源包内容领取兑换码 前言 亲爱的 Unity 游戏开发者们&#xff0c;我们自豪地为大家推荐最新的每周免费资源&#xff1a;风格化地形纹理&#xff01;这些令人惊叹的纹理将为你的游戏世界带来独特而引人入胜的视觉体…

走进高等学府,ATFX再度亮相雅尔穆克大学,共绘市场发展新蓝图

自2022年成功获得约旦证券委员会&#xff08;JSC&#xff09;颁发牌照后&#xff0c;ATFX植根约旦本土并于同年设立约旦办事处&#xff0c;不断深化与当地各界合作伙伴间的沟通协作&#xff0c;矢志为每一位客户打造优质、便捷、高效的投教服务体验。近日&#xff0c;ATFX再度登…

人工智能系统越来越擅长欺骗我们?

人工智能系统越来越擅长欺骗我们&#xff1f; 一波人工智能系统以他们没有被明确训练过的方式“欺骗”人类&#xff0c;通过为他们的行为提供不真实的解释&#xff0c;或者向人类用户隐瞒真相并误导他们以达到战略目的。 发表在《模式》(Patterns)杂志上的一篇综述论文总结了之…