RocketMQ系统性学习-RocketMQ原理分析之Broker接收消息的处理流程

Broker接收消息的处理流程?

既然要分析 Broker 接收消息,那么如何找到 Broker 接收消息并进行处理的程序入口呢?

那么消息既然是从生产者开始发送,消息是有单条消息和批量消息之分的,那么消息肯定是有一个标识,当 Broker 接收到消息之后,肯定是需要通过判断消息的标识来区分单条消息和批量消息,那么只需要找到发送消息的标识,再全局搜索,就可以找到这个标识在哪里被处理,被处理的地方一定就是 Broker 接收消息处理的位置了!

那么还是先找到发送消息的位置:DefaultMQProducer # send(Message msg) ,通过层层调用(这里在生产者发送消息流程中讲了)到达了 DefaultMQProducerImpl # this.sendKernelImpl()

在这个方法中就调用到了 MQ 客户端的发送消息的方法 this.mQClientFactory.getMQClientAPIImpl().sendMessage()

在这里真正的通过 Netty 去发送消息到 Broker 中去:

  1. 通过判断消息的类型构造一个 RemotiongCommand 类型的 request 参数

    这里有 4 个构造 request 参数的方法,如下图会走到第三个方法中,那么这里的请求标识为 RequestCode.SEND_MESSAGE_V2

    在这里插入图片描述

  2. this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request) 方法中通过 Netty 将消息发送出去,那么这个方法需要传入一个 request 参数

在上边构造了 request 并且通过 Netty 发送出去,request 的标识为 RequestCode.SEND_MESSAGE_V2 ,那么我们只需要找到处理该标识的 request 的位置,那就是 Broker 处理消息的位置,在 IDEA 中通过 Ctrl+Shift+F 全局搜索这个标识即可:

在这里插入图片描述

可以发现有三个进行 case 判断的地方:

  • 第一个在 PlainAccessResource 类中
  • 第二个在 SendMessageActivity 类中
  • 第三个在 SendMessageRequestHeader 类中

这里第三个 case 判断的地方就是 Broker 处理消息的位置(可以在三个 case 中都 debug,看断点走到哪里就知道了)

那么我们就在第三个 case 判断的位置打上断点

在这里插入图片描述

接下来启动 NameServer,再以 Debug 的方式启动 Broker,再启动生产者,根据调用堆栈信息来找到 Broker 处理消息的整个调用链:

在这里插入图片描述

根据这个堆栈信息,可以发现,调用链是从 NettyServerHandler 的 channelRead0 转移过来的,那么也就是再 NettyServerHandler 这个 Netty 的服务端接收到消息并进行处理,那么我们就在这个堆栈信息中找 Broker 是在哪里对消息进行处理了呢?

就是在 SendMessageProcessor # processRequest 方法中(也就是堆栈顶第3个方法),在这个方法中:

  1. 通过 parseRequestHeader(request) 先对请求头进行解码,也就是根据请求头 RequestCode.SEND_MESSAGE_V2 的类型做一些相应的处理
  2. 接下来通过 buildMsgContext(ctx, requestHeader, request) 创建消息的上下文对象
  3. this.executeSendMessageHookBefore(sendMessageContext) 执行一些消息发送前的钩子(扩展点)
  4. 核心:this.sendMessage() 真正去发送消息

那么在 this.sendMessage() 中就是真正发送消息的逻辑了:

  1. 首先是 preSend(ctx, request, requestHeader) 进行预发送,这里其实就是对发送的消息进行一些检查(Topic 是否合法?Topic 是否与系统默认 Topic 冲突?Topic 的一些配置是否存在?等等信息)

  2. 如果 queueIdInt < 0 是 true 的话,表明生产者没有指定要发送到哪个队列,那么就通过 99999999 % 队列个数 来选择一个队列发送

  3. 将超过最大重试次数的消息发送到 DLQ 死信队列中去

    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig, oriProps)) {
        return response;
    }
    
  4. 接下来判断 Broker 是否开启了 异步模式,如果开启的话,通过 asyncPutMessage() 处理

    如果没有开启 异步模式,通过 putMessage() 处理,这里其实还是调用了 asyncPutMessage(),只不过通过 get() 阻塞等待结果(复用代码)

那么在发送消息的时候,无论是否异步,都会进入到 DefaultMessageStore # asyncPutMessage() 方法中,我们就点进去看看进行了哪些处理:

  1. 执行一些钩子函数,作为扩展点:putMessageHook.executeBeforePutMessage(msg)

  2. 提交文件的写请求:CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg)

    在这个写文件的方法中,主要做一些文件的写操作,以及将文件写入到磁盘中

    1. 获取文件对象:this.mappedFileQueue.getLastMappedFile()
    2. 追加写文件的操作: mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext)
    3. 最后进行刷盘以及高可用的一些处理:handleDiskFlushAndHA(putMessageResult, msg, needAckNums, needHandleHA)
  3. 打印写文件消耗的时间 this.getSystemClock().now() - beginTime

那么 Broker 总体的接收消息的处理流程就是上边将的这么多了,当然还有一些边边角角的内容没有细说,先了解整体的处理流程,不要提前去学习太多的细节!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/259628.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

go语言函数二、init函数定义与作用

go语言init函数定义与作用 在go语言中&#xff0c;每一个源文件都可以包含一个init函数&#xff0c;这个函数会在main函数执行前&#xff0c;被go运行框架调用&#xff0c;注意是在main函数执行前。 package main import ("fmt" )func init() {fmt.Println("i…

石器时代H5小游戏架设教程

本文讲解石器时代 H5 之恐龙宝贝架设教程&#xff0c;想研究 H5 游戏如何实现&#xff0c;那请跟着此次教程学习在拥有小游戏源码的情况下该如何搭建起来 开始架设 1. 架设条件 石器时代架设需要准备&#xff1a; 一台linux 服务器&#xff0c;建议 CentOs 7.6 版本&#xf…

数据安全扫描仪荣膺网络安全优秀创新成果大赛优胜奖 - 凸显多重优势

近日&#xff0c;由中国网络安全产业联盟&#xff08;CCIA&#xff09;主办、CCI数据安全工作委员会中国电子技术标准化研究院等单位承办的“2023年网络安全优秀创新成果大赛”获奖名单公布。天空卫士数据安全扫描仪&#xff08;DSS&#xff09;产品获得创新成果大赛优胜奖。 本…

C++中的RAII机制及其智能指针的应用

一、引言 C是一种高效且功能强大的编程语言&#xff0c;但内存管理一直是其一大挑战。为了简化资源管理&#xff0c;C引入了RAII&#xff08;Resource Acquisition Is Initialization&#xff09;机制。本文将深入探讨RAII的原理&#xff0c;并通过智能指针这一具体实现来展示…

云原生之深入解析Thanos在EKS多集群架构上存储多个集群Prometheus

一、前言 随着 HiredScore 的产品和客户群越来越大&#xff0c;已经开始向 Kubernetes 过渡并迅速采用它&#xff0c;它是我们重要的障碍之一&#xff0c;也可能是最大的监控基础设施。我们在使用 Prometheus / Grafana 堆栈进行监控方面有一些经验&#xff0c;了解到希望创建…

如何用 CleanMyMac 来保护 Mac 隐私

大家早上好&#xff0c;中午好&#xff0c;下午好&#xff0c;晚上好。 在我们使用MacBook上的自带浏览器-Safari&#xff08;或者一些其他浏览器&#xff09;进行网页浏览的时候&#xff0c;往往会留下一些痕迹。如果这些痕迹涉及一些敏感数据信息的话&#xff0c;那么我们肯…

1_js基本简介数据类型变量的使用

1. 编程语言简介 1.1 计算机编程语言 计算机编程语言是程序设计的最重要的工具&#xff0c;它是指计算机能够接受和处理的、具有一定语法规则的语言。从计算机诞生&#xff0c;计算机语言经历了机器语言、汇编语言和高级语言几个阶段。 高级语言&#xff1a;JavaScript&#x…

Flink Table API 与 SQL 编程整理

Flink API总共分为4层这里主要整理Table API的使用 Table API是流处理和批处理通用的关系型API&#xff0c;Table API可以基于流输入或者批输入来运行而不需要进行任何修改。Table API是SQL语言的超集并专门为Apache Flink设计的&#xff0c;Table API是Scala和Java语言集成式…

Android Studio: 解决Gradle sync failed 错误

文章目录 1. 前言2. 错误情况3. 解决办法3.1 获取gradle下载地址3.2 获取gradle存放目录3.3 替换并删除临时文件3.4 触发Try Again 4. 执行成功 1. 前言 今天调试项目&#xff0c;发现新装的AS&#xff0c;在下载gradle的过程中&#xff0c;一直显示连接失败&#xff0c;Gradl…

【MyBatis学习笔记】MyBatis基础学习

MyBatis基础 MyBatis简介MyBatis特性MyBatis下载和其他持久化层技术对比 核心配置文件详解默认的类型别名 搭建MyBatis开发环境创建maven工程创建MyBatis的核心配置文件创建mapper接口创建MyBatis的映射文件通过junit测试功能加入log4j日志功能 MyBatis获取参数值的两种方式&am…

欧美电商平台Depop如何入驻?

对标美国二手闲鱼平台Mercia,PoshMark、东南亚Etsy&#xff0c;Depop是英国的一个面向创意人群的二手时尚市场&#xff0c;类似于Instagram&#xff0c;但更专注于买卖二手服装、配饰和艺术品。 有研究显示,由于购物预算减少,高达65%的受访者表示乐意在圣诞节购买或收到二手礼…

【⭐️大厂干货】阿里程序员私藏提效IntelliJ IDEA插件大放送️ ~

️ ❗文末有&#x1f308;&#x1f95a;&#xff08;彩蛋&#xff09; 要问对后端程序员最重要的软件是哪个&#xff1f;IntelliJ IDEA说第二&#xff0c;估计没有其他软件可以称第一。在工作过程中我发现对于这么重要的软件&#xff0c;有些开发同学竟然把它“打扮”的甚是简…

AI日报:信用公司转型人工智能的长采访...或许能给你一些启发

欢迎订阅专栏 《AI日报》 获取人工智能邻域最新资讯 文章目录 总览什么是FICOFICO的转型如何转型人工智能平台功能 构建人工智能平台中遇到的问题关于生成式人工智能银行客户的反馈内部情况 一些社会问题 总览 FICO软件执行副总裁Stephanie Covert加入人工智能商业播客&#x…

MongoDB的原子操作findAndModify和findOneAndUpdate

本文主要介绍MongoDB的原子操作findAndModify和findOneAndUpdate。 目录 MongoDB的原子操作一、findAndModify二、findOneAndUpdate MongoDB的原子操作 MongoDB的原子操作指的是在单个操作中对数据库的数据进行读取和修改&#xff0c;并确保操作是原子的&#xff0c;即要么完全…

linux实现shell脚本监控磁盘内存达到阈值时清理catalina.out日志

想在服务器上写一个shell脚本&#xff0c;在磁盘使用率达到80%时&#xff0c;自动清理掉一些没有用的日志文件&#xff0c;根据这个想法&#xff0c;在生产环境上写了一个以下脚本&#xff0c;按照该流程&#xff0c;可实现在linux环境做一个定时任务来执行shell脚本&#xff0…

Pixel Nerf代码阅读

Input&#xff1a; 图像的 分辨率是 300*400&#xff1b; 每个场景里面有 49张 Training 的图像。 SB&#xff1a; scene batch 场景的个数&#xff1b; 4 NV&#xff1a; number input &#xff0c;每个场景的视角&#xff0c;也就是图像的数量&#xff1b; 49 每条光线首先…

JS逆向实战——开发者工具检测

说明&#xff1a;仅供学习使用&#xff0c;请勿用于非法用途&#xff0c;若有侵权&#xff0c;请联系博主删除 作者&#xff1a;zhu6201976 一、背景 在JS逆向领域&#xff0c;Chrome开发者工具是核心&#xff0c;抓包、调试、看调用栈等都离不开它。可以说&#xff0c;逆向人…

blast安装及简单使用

一、安装blast 1.Ubuntu环境 # 下载blast wget https://ftp.ncbi.nlm.nih.gov/blast/executables/blast/2.9.0/ncbi-blast-2.9.0-x64-linux.tar.gz # 解压blast压缩包 tar -xzvf ncbi-blast-2.9.0-x64-linux.tar.gz # 测试 ./bin/blastp -h 这里就代表安装成功了&#xff0c;…

Android 架构 - 组件化

一、概念 组件化是对单个功能进行开发&#xff0c;使得功能可以复用。将多个功能组合起来就是一个业务模块&#xff0c;因此去除了模块间的耦合&#xff0c;使得按业务划分的模块成了可单独运行的业务组件。&#xff08;一定程度上的独立&#xff0c;还是依附于整个项目中&…

如何免费搭建私人电影网站(三)

接上一篇文章&#xff1a; 网站模版上传到空间后就进行安装网站了操作如下图&#xff1a; 打开链接地址&#xff1a; 输入前面设置好的FTP密码 进入安装界面 点同意后下一步 需要填入数据库的账号和密码 返回虚拟主机界面进行设置 如下图点初始化 修改数据库的密码 然后…