消息队列-kafka-消息发送流程(源码跟踪) 与消息可靠性

官方网址

源码:https://kafka.apache.org/downloads
快速开始:https://kafka.apache.org/documentation/#gettingStarted
springcloud整合

发送消息流程

在这里插入图片描述
主线程:主线程只负责组织消息,如果是同步发送会阻塞,如果是异步发送需要传入一个回调函数。
Map集合:存储了主线程的消息。
Sender线程:真正的发送其实是sender去发送到broker中。

源码阅读

1 首先打开Producer.send()可以看到里面的内容

// 返回值是一个 Future 参数为ProducerRecord
Future<RecordMetadata> send(ProducerRecord<K, V> record);
// ProducerRecord定义了这些信息
// 主题
private final String topic;
// 分区
private final Integer partition;
// header
private final Headers headers;
private final K key;
private final V value;
// 时间戳
private final Long timestamp;

2 发送之前的前置处理

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
     // intercept the record, which can be potentially modified; this method does not throw exceptions
     // 这里给开发者提供了前置处理的勾子
     ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
     // 我们最终发送的是经过处理后的消息 并且如果是异步发送会有callback 这个是用户定义的
     return doSend(interceptedRecord, callback);
 }

3 进入真正的发送逻辑Future doSend()

  • 由于是网络通信,所以我们要序列化,在这个函数里面就做了序列化的内容。
try {
     serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
 } catch (ClassCastException cce) {
     throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
             " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
             " specified in key.serializer", cce);
 }
 byte[] serializedValue;
 try {
     serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
 } catch (ClassCastException cce) {
     throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
             " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
             " specified in value.serializer", cce);
 }
  • 然后我们获取分区
// 然后这里又是一个策略者模式 也是由用户可以配置的  DefaultPartitioner UniformStickyPartitioner RoundRobinPartitioner 提供了这样三个分区器
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
   Integer partition = record.partition();
   return partition != null ?
           partition :
           partitioner.partition(
                   record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

4 到了我们的RecordAccumulator,也就是先由主线程发送到了RecordAccumulator

// 也就是对图中的Map集合
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                 serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

我们发现里面是用一个MAP存储的一个分区和ProducerBatch 是讲这个消息写到内存里面MemoryRecordsBuilder 通过这个进行写入

// 可以看到是一个链表实现的双向队列,也就是消息会按append的顺序写到 内存记录中去
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

5 接着我们看,我们append了以后,会有一个判断去唤醒sender线程,见下面的注释

// 如果说哦我们当前的 这个batch满了或者 我们创建了一个新的batch 这个时候唤醒 sender线程去发送数据
if (result.batchIsFull || result.newBatchCreated) {
      log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
      // 唤醒sender 去发送数据
      this.sender.wakeup();
  }
// 实现了Runnable 所以我们去看一下RUN方法的逻辑
public class Sender implements Runnable 

好上来就是一个循环

while (running) {
    try {
        runOnce();
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    }
}

接着进入runOnece方法,直接看核心逻辑

// 从RecordAccumulator 拿数据 然后发送
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
      addToInflightBatches(batches);
// 中间省去了非核心逻辑
sendProduceRequests(batches, now);

如果继续跟踪的话最终是走到了selector.send()里面:

Send send = request.toSend(destination, header);
 InFlightRequest inFlightRequest = new InFlightRequest(
         clientRequest,
         header,
         isInternalRequest,
         request,
         send,
         now);
 this.inFlightRequests.add(inFlightRequest);
 selector.send(send);

6 接着我们就要看返回逻辑了,可以看到在sendRequest里面sendProduceRequest方法是通过传入了一个回调函数处理返回的。

RequestCompletionHandler callback = new RequestCompletionHandler() {
          public void onComplete(ClientResponse response) {
              handleProduceResponse(response, recordsByPartition, time.milliseconds());
          }
      };
// 如果有返回
if (response.hasResponse()) {
          ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
          for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
              TopicPartition tp = entry.getKey();
              ProduceResponse.PartitionResponse partResp = entry.getValue();
              ProducerBatch batch = batches.get(tp);
              completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());
          }
          this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
      } 

追踪到ProducerBatch

if (this.finalState.compareAndSet(null, tryFinalState)) {
        completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
        return true;
    }
private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
       // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
       produceFuture.set(baseOffset, logAppendTime, exception);

       // execute callbacks
       for (Thunk thunk : thunks) {
           try {
               if (exception == null) {
                   RecordMetadata metadata = thunk.future.value();
                   if (thunk.callback != null)
                       thunk.callback.onCompletion(metadata, null);
               } else {
                   if (thunk.callback != null)
                       thunk.callback.onCompletion(null, exception);
               }
           } catch (Exception e) {
               log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
           }
       }

       produceFuture.done();
   }

Thunk 这个其实就是我们在Append的时候的回调:
在这里插入图片描述
至此整个流程就完成了,从发送消息,到响应后回调我们的函数。

消息可靠性

// 所有消费者的配置都在ProducerConfig 里面
public static final String ACKS_CONFIG = "acks";

acks = 0:异步形式,单向发送,不会等待 broker 的响应
acks = 1:主分区保存成功,然后就响应了客户端,并不保证所有的副本分区保存成功
acks = all 或 -1:等待 broker 的响应,然后 broker 等待副本分区的响应,总之数据落地到所有的分区后,才能给到producer 一个响应

在可靠性的保证下,假设一些故障:

  • Broker 收到消息后,同步 ISR 异常:只要在 -1 的情况下,其实不会造成消息的丢失,因为有重试机制
  • Broker 收到消息,并同步 ISR 成功,但是响应超时:只要在 -1 的情况下,其实不会造成消息的丢失,因为有重试机制

可靠性能保证哪些,不能保障哪些?

  • 保证了消息不会丢失
  • 不保证消息一定不会重复(消息有重复的概率,需要消费者有幂等性控制机制)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/440816.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【CSP试题回顾】202104-2-邻域均值

CSP-202104-2-邻域均值 关键点&#xff1a;二维差分数组 详见&#xff1a;【CSP考点回顾】差分数组 解题思路 初始化矩阵和参数&#xff1a;首先&#xff0c;代码接收矩阵的大小&#xff08;n x n&#xff09;&#xff0c;每个元素的亮度值&#xff08;位于[0, L]区间&…

基于Vue的体育汇App设计与实现

目 录 摘 要 I Abstract II 引 言 1 1 核心技术的理论与分析 3 1.1 客户端技术 3 1.1.1 Vue.js框架 3 1.1.2 Vue.js路由管理 3 1.1.3 Vuex状态管理 3 1.1.4 MVVM开发模式 4 1.1.5 Vant组件库 5 1.2 服务端技术 5 1.2.1 Node.js 5 1.2.2 Egg.js框架 5 1.3 数据库技术 6 1.4 本章…

webUI自动化测试框架

&#x1f525; 交流讨论&#xff1a;欢迎加入我们一起学习&#xff01; &#x1f525; 资源分享&#xff1a;耗时200小时精选的「软件测试」资料包 &#x1f525; 教程推荐&#xff1a;火遍全网的《软件测试》教程 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1…

【LeetCode】升级打怪之路 Day 16:二叉树题型 —— 二叉树的构造

今日题目&#xff1a; 654. 最大二叉树105. 从前序与中序遍历序列构造二叉树106. 从中序与后序遍历序列构造二叉树889. 根据前序和后序遍历构造二叉树 目录 LC 654. 最大二叉树 【easy】 Problem&#xff1a;根据遍历序列来还原二叉树 【classic】 ⭐⭐⭐⭐⭐LC 105. 从前序与中…

数据库原理实验课(1)

目录 实验内容 安装头歌中的相关内容 具体过程 完结撒花~ 我也是第一次接触oracle的相关软件和操作&#xff0c;所以是一次傻瓜式教学记录 实验内容 安装头歌中的相关内容 具体过程 这是我在百度网盘中下载解压出来的oracle文件夹内的全部内容&#xff08;可能有因为安装完…

使用Portainer让测试环境搭建飞起来

Docker的用处不多加赘述&#xff0c;Docker目前有以下应用场景&#xff1a; 测试&#xff1a;Docker很适合用于测试发布&#xff0c;将 Docker 封装后可以直接提供给测试人员进行运行&#xff0c;不再需要测试人员与运维、开发进行配合&#xff0c;进行环境搭建与部署。 测试…

【技术】基于Github Pages搭建个人博客静态网页

写在前面&#xff1a; 如果文章对你有帮助&#xff0c;记得点赞关注加收藏一波&#xff0c;利于以后需要的时候复习&#xff0c;多谢支持&#xff01; 文章目录 一、技术基础二、新建特殊仓库三、上传网页文件四、Github Pages设置 个人网页作为仅服务个人的网页&#xff0c;一…

Grafana变量默认全选

注&#xff1a;本文基于Grafana v9.2.8编写 1 问题 EKS集群里的node按照不同label被分为几类&#xff0c;我需要对这几类的node做一些统计。我希望当我使用lable选择时&#xff0c;node的值自动设置为该lable的所有node集合&#xff0c;而不需要再手动全选。 2 解决方案 变…

微信小程序(五十四)腾讯位置服务示范(2024/3/8更新)

教程如下&#xff1a; 上一篇 1.先在官网注册一下账号&#xff08;该绑定的都绑定一下&#xff09; 腾讯位置服务官网 2.进入控制台 3.创建应用 3. 额度分配 4.下载微信小程序SDK 微信小程序SDK下载渠道 5.解压将俩js文件放在项目合适的地方 6.加入安全域名or设置不验证合…

扩展CArray类,增加Contain函数

CArray不包含查找类的函数&#xff0c;使用不便。考虑扩展CArray类&#xff0c;增加Contain函数&#xff0c;通过回调函数暴露数组元素的比较方法&#xff0c;由外部定义。该方法相对重载数组元素的“”符号更加灵活&#xff0c;可以根据需要配置不同的回调函数进行比较 //类型…

AD20中关于“failed to add class member”的解决方法

目录 问题描述&#xff1a; 解决方法&#xff1a; 1.切换至PCB界面-选择工具栏的设计-类 2.把Component classes中的所有的类全部按照图中删除&#xff0c;保存 3.重新返回原理图界面转换PCB即可成功 问题描述&#xff1a; failed to add class member&#xff1a;未能添加…

解答关于:水牛社软件是做什么的?水牛社软件靠谱么?

很多对我们软件感兴趣但是没有入手的观望者都会有这样的疑问&#xff1a;水牛社软件具体是做什么的&#xff1f;水牛社软件靠谱么&#xff1f; 其实软件的简介已经讲的很清楚了&#xff0c;但是软件不是功能性软件&#xff0c;所以不能给大家免费试用&#xff0c;短期任务版块…

智能驾驶规划控制理论学习08-自动驾驶控制模块(轨迹跟踪)

目录 一、基于几何的轨迹跟踪方法 1、基本思想 2、纯追踪 3、Stanly Method 二、PID控制器 三、LQR&#xff08;Linear Quadratic Regulator&#xff09; 1、基本思想 2、LQR解法 3、案例学习 基于LQR的路径跟踪 基于LQR的速度跟踪 4、MPC&#xff08;Mode…

Python通过SFTP实现网络设备配置备份

一、背景 为了防止网络设备意外损坏&#xff0c;导致配置文件无法恢复&#xff0c;可以通过将网络设备的配置文件备份到本地电脑上。 一般情况下&#xff0c;设备支持通过FTP、TFTP、FTPS、SFTP和SCP备份配置文件。其中使用FTP和TFTP备份配置文件比较简单&#xff0c;但是存在…

JAVA虚拟机实战篇之内存调优[4](内存溢出问题案例)

文章目录 版权声明修复问题内存溢出问题分类 分页查询文章接口的内存溢出问题背景解决思路问题根源解决思路 Mybatis导致的内存溢出问题背景问题根源解决思路 导出大文件内存溢出问题背景问题根源解决思路 ThreadLocal占用大量内存问题背景问题根源解决思路 文章内容审核接口的…

尚硅谷JavaScript高级学习笔记

01 准备 JavaScript中函数是对象。我们后续描述构造函数的内存模型时&#xff0c;会将构造函数称为构造函数对象。 02 数据类型 typeof 运算符来查看值的类型&#xff0c;它返回的是类型的字符串值 会做数据转换 03 相关问题 04数据_变量_内存 05相关问题1 06相关问题2 …

办公电脑换成MacBookPro半年之后……

小白是从2008年开始接触电脑的&#xff0c;当时朋友给我注册的第一个QQ账号是2008年4月。 从此&#xff0c;小白一直认为电脑全部都是Windows系统。直到上大学那年&#xff0c;看到了外教老师的MacBookPro…… 折腾电脑的开始居然是起源于诺基亚手机&#xff0c;给半智能S40的…

Igraph入门指南 3

4、图转换到其他R数据结构 图是对实体关系的表达&#xff0c;在igraph中&#xff0c;图可以转换为三种数据结构。 4-1 图转邻接矩阵&#xff1a;as_adjacency_matrix | as_adj&#xff0c;结果是矩阵 邻接矩阵又分为有向图邻接矩阵和无向图邻接矩阵&#xff0c;但本函数使用…

老司机都懂的!【打赏】完美运营的最新视频打赏系统

完美运营的最新视频打赏系统优于市面上95%的打赏系统&#xff0c;与其他打赏系统相比&#xff0c;功能更加强大&#xff0c;完美运营且无bug。支付会调、短链接生成、代理后台、价格设置和试看功能等均没有问题。 以上为原简介&#xff0c;经测试验证。成功搭建并可以正常进入…

Linux学习之线程

目录 线程概念 1.什么是线程&#xff1f; 2.线程的优缺点 3.线程异常 4.线程用途 线程操作 1.如何给线程传参 2.线程终止 3.获取返回值 4.分离状态 5.退出线程 线程的用户级地址空间&#xff1a; 线程的局部存储 线程的同步与互斥 互斥量mutex 数据不一致的主要过…