消息中间件之RocketMQ源码分析(十三)

Broker消息存储机制

RocketMQ首先将消息数据写入操作系统PageCache,然后定时将数据刷入磁盘。接下来主要分析RocketMQ是如何接收发送消息请求并将消息写入PageCache的,整个过程如图
在这里插入图片描述

Commit目录下有多个CommitLog文件,其实CommitLog只有一个文件,
为了方便保存和读写,被切分为多个子文件,所有的子文件通过其保存的
第一个和最后一个消息的物理位点进行连接。Broker按照时间和物理的offset顺序写CommitLog文件,每次写的时候需要加锁
在这里插入图片描述

1.Broker接收客户端发送消息的请求并做预处理

SendMessageProcessor.processRequest()方法会自动被调用接收、解析客户端请求为消息实例。
该方法执行分为四个过程:解析请求参数、执行发送处理前的Hook、调用保存方法存储消息、执行发送处理后的Hook
随着RocketMQ版本的迭代更新,通信层的协议也出现了不兼容的变化,比如解析请求需要根据不同的客户端请求协议版本做不同处理
在这里插入图片描述
在这里插入图片描述

2.Broker存储前预处理消息

预处理方法为SendMessageProcessor.sendMessage()
Netty是异步执行的,也就是说,请求发送到Broker被处理后,返回结果时,在客户端的处理线程已经不再时发送亲贵的线程,那么客户端如何确定返回结果对应哪个请求呢?很简单,我们可以通过返回标志来判断。
其次,做一系列存储前发送请求的数据检查,比如死信消息处理、Broker是否拒绝事务消息处理、消息基本检查等。消息基本检查方法为AbstractSendMessageProcessor.msgCheck():该方法的主要功能如下:
a.校验Broker是否配置可写
b.校验Topic名字是否为默认值
c.校验Topic配置是否存在
d.校验queueId与读写队列数是否匹配
e.校验Broker是否支持事务消息(msgCheck之后进行的校验)
在这里插入图片描述
在这里插入图片描述

3.执行DefaultMessageStore.putMessage()方法进行消息校验和存储模块检查

在真正保存消息前,会对消息数据做基本检查、对存储服务做可用性检查、对Broker做是否Slave的检查等
总结如下:
a.校验存储模块是否已经关闭
b.校验Broker是否是Slave
c.校验存储模块运行标记
d.校验Topic长度
e.校验扩展信息的长度
f.校验操作系统Page Cache是否繁忙
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
begin:CommitLog加锁开始时间,写CommitLog成功后,该值为0
diff:当前时间和CommitLog持有锁时间的差值
如果isOSPageCacheBusy()方法返回true,则表示当前有消息正在写入CommitLog,并且持有锁的时间超过设置的阈值

4.执行CommitLog.putMessage()方法,后面版本中将默认异步保存

存储消息的核心处理过程如下:
a.设置消息保存时间为当前时间戳,设置消息完整性校验码CRC(循环冗余码)
b.延迟消息处理.如果发送的消息是延迟消息,这里会单独设置延迟消息的
数据字段,比如修改Topic为延迟消息特有的Topic–SCHEDULE_TOPIC_XXX,并且备份原来的Topic和queueId,以便延迟消息在投递后被消费者消费
c.获取最后一个CommitLog文件实例MappedFile。锁住该MappedFile.默认为自旋锁,也可以通过useReetrantLockWhenPutMessage进行配置、修改和使用ReentrantLock
d:校验最后一个MappedFile,如果结果为空或已写满,则新创建一个MappedFile返回
e:调用MappedFile.appendMEssage()方法,将消息写入MappedFile
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
根据消息是单个消息还是批量消息来调用AppendMessageCallback.doAppend()方法,
并将消息写入PageCache,该方法的功能包含以下几点:
1.查找即将写入的消息物理Offset
2.事务消息单独处理。这里主要处理Prepared类型和Rollback类型的消息,设置消息queueOffset为0
3.序列化消息,并将序列化结果保存到ByteBuffer中(文件内存映射的PageCache或Direct Memory,简称DM).特别地,如果将输盘设置为异步刷盘,那么当transientStorePoolEnable=true时,会先写入DM,
DM中地数据再异步写入文件内存映射地PageCache中,因为消费者始终时从PageCache中读取消息消费的,所以这个机制也称为"读写分离"
4.更新消息所在Queue的位点
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
以上代码中,CommitLog.this.TopicQueueTable类型是HashMap<String/* topic-queueid /, Long/ offset */>,
CommitLog.this.TopicQueueTable的key是Topic名字与消息所在的Queue的QueueId的构成,value是消息位点值
在这里插入图片描述
在消息存储完成后,会处理刷盘逻辑和主从同步逻辑,分别调用(有些版本是handleDiskFlush()方法和handleHA()方法)
CommitLog.submitFlushRequest()和submitReplicaRequest()
在Broker处理发送消息时,由于处理器SendMessageProcessor本身是一个线程池服务,所以涉及了快速失败逻辑,方便在高峰时自我保护。实现代码在BrokerFastFailure.cleanExpiredRequest()方法中在BrokerController启动BrokerFailure服务时,会启动一个定时任务处理快速失败的的异常
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
从以上代码可以看到,每间隔10ms会执行一次cleanExpiredRequest()方法,清理一些非法过期的请求。
第一种,系统繁忙时发送消息请求快速失败处理。
当操作系统PageCache繁忙时,会将发送消息请求从发送消息请求线程池工作队列中取出来,直接返回SYSTEM_BUSY。如果此种情况持续发生说明系统已经不堪重负,需要增加系统资源或者扩容来减轻当前Broker的压力
第二种,发送请求超时处理
第三种,拉取消息请求超时处理
第二种和第三种的代码逻辑与第一种代码逻辑的处理类似,如果出现了,说明请求在线程池的工作队列中排队时间超过预期配置的时间,那么增加排队等待时间即可。如果请求持续超时,说明系统可能达到瓶颈,那么需要增加系统资源或者扩容
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/402475.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

前端构建效率优化之路

项目背景 我们的系统&#xff08;一个 ToB 的 Web 单页应用&#xff09;前端单页应用经过多年的迭代&#xff0c;目前已经累积有大几十万行的业务代码&#xff0c;30 路由模块&#xff0c;整体的代码量和复杂度还是比较高的。 项目整体是基于 Vue TypeScirpt&#xff0c;而构…

PostgreSQL与MySQL,谁更胜一筹

前言 PostgreSQL与MySQL都是优秀的开源数据库。在日常学习中&#xff0c;新手可能接触最多的是MySql,但是实际工作中&#xff0c;两者的应用场景其实都很广。我之前的做过上网流量销售业务&#xff0c;用的是MySQL,现在接触广告业务&#xff0c;用的是pg数据库&#xff0c;每天…

C语言:指针(一)

目录 1.内存和地址2. 指针变量和地址2.1 取地址操作符&#xff08;&&#xff09;2.2 指针变量和解引用操作符&#xff08;*&#xff09;2.2.1 指针变量2.2.2 解引用操作符&#xff08;*&#xff09; 2.3 指针变量的大小 3.指针变量的类型和意义3.1 指针的解引用3.2 指针 -指…

SQL注入漏洞解析--less-3

1.首先我们打开第三关看一下 2.这个和之前1.2关提示都是一样&#xff0c;所以我们先输入id数字看一下显示什么 3.这里正常回显&#xff0c;当我们后边加上时可以看到页面报错信息。可推断sql语句是单引号字符型且有括号&#xff0c;所以我们需要闭合单引号且也要考虑括号。 4…

FISCO BCOS(十七)利用脚本进行区块链系统监控

要利用脚本进行区块链系统监控&#xff0c;你可以使用各种编程语言编写脚本&#xff0c;如Python、Shell等 利用脚本进行区块链系统监控可以提高系统的稳定性、可靠性&#xff0c;并帮助及时发现和解决潜在问题&#xff0c;从而确保区块链网络的正常运行。本文可以利用脚本来解…

java使用File创建空文件和创建单级文件、多级文件、删除、获得文件夹下的文件和文件夹

1、使用createNewFile创建文件 package com.controller;import org.springframework.web.bind.annotation.*;import java.io.File; import java.io.IOException; import java.util.LinkedList;RestController CrossOrigin RequestMapping("/admin") public class Ad…

IO进程线程作业day5

1> 将互斥机制的代码实现重新敲一遍 #include <myhead.h> int num520;//定义一个全局变量 pthread_mutex_t mutex;//创建锁 //线程1任务 void *task1(void *arg) {puts("任务1");pthread_mutex_lock(&mutex);//上锁num1314;sleep(1);printf("tas…

Liunx使用nginx和http搭建yum-server仓库

文章目录 1. yum-server的搭建方式2. nginx搭建yum-server仓库2.1. 安装配置nginx2.2 配置yum-server的rpm2.3. 同步yum源相关包2.3.1 rsync同步源3.3.1 reposync同步源 2.4. 配置客户端访问yum配置2.5. 验证测试 3. http服务搭建yum-server仓库3.1. 安装配置http3.2 配置yum-s…

代码随想录算法训练营第一天

● 今日学习的文章链接和视频链接 ● 自己看到题目的第一想法 1. 704二分法&#xff1a; 方法一&#xff1a; 整个数组是 左闭右闭区间 [ ] left指针指向数组开始下标&#xff0c; right 指针指向数组最后下表nums.size()-1, mid为 (leftright) /2循环条件 left<rightnu…

论文精读--Noisy Student

一个 EfficientNet 模型首先作为教师模型在标记图像上进行训练&#xff0c;为 300M 未标记图像生成伪标签。然后将相同或更大的 EfficientNet 作为学生模型并结合标记图像和伪标签图像进行训练。学生网络训练完成后变为教师再次训练下一个学生网络&#xff0c;并迭代重复此过程…

unity学习(34)——角色选取界面(跨场景坑多)

先把SelectMenu中的camera的audio listener去掉。 现在还是平面&#xff0c;直接在camera下面添加两个panel即可&#xff0c;应该是用不到canvas了&#xff0c;都是2D的UI。 加完以后问题来了&#xff0c;角色选择界面的按钮跑到主界面上边了&#xff0c;而且现在账号密码都输…

国外创意品牌案例:英国北方铁路公司发布“Try the train”活动

近期&#xff0c;英国北方铁路公司&#xff08;Northern Trains&#xff09;发起了一项名为“Try the train” 的活动&#xff0c;旨在帮助那些对火车感到恐惧的人在搭乘火车时感到更舒适&#xff0c;以解锁公司业务新的增长领域&#xff0c;吸引更多的人在通勤、上学、出游、参…

【蓝桥杯单片机入门记录】静态数码管

目录 一、数码管概述 &#xff08;1&#xff09;认识数码管 &#xff08;2&#xff09;数码管的工作原理 &#xff08;3&#xff09;LED数码管驱动方式-静态显示 二、数码管电路图 三、静态数码管显示例程 &#xff08;1&#xff09;例程1&#xff1a;数码管显示某一位&a…

发布 rust 源码包 (crates.io)

rust 编程语言的包 (或者 库, library) 叫做 crate, 也就是软件中的一个组件. 一个完整的软件通常由多个 crate 组成, rust 编译器 (rustc) 一次编译一整个 crate, 不同的 crate 可以同时并行编译. rust 官方有一个集中发布开源包的网站 crates.io. 发布在这上面的 crate 可以…

个性化纹身设计,Midjourney带你探索独一无二的艺术之美

hello,大家好&#xff0c;欢迎回来。 在当今社会&#xff0c;纹身已经变得非常常见。 在寻求与众不同的个性化纹身时&#xff0c;你是否曾经为了找不到独特的设计而苦恼&#xff1f; 现在&#xff0c;Midjourney将为你打开一扇全新的艺术之门&#xff0c;引领你探索纹身设计…

LaWGPT—基于中文法律知识的大模型

文章目录 LaWGPT&#xff1a;基于中文法律知识的大语言模型数据构建模型及训练步骤两个阶段二次训练流程指令精调步骤计算资源 项目结构模型部署及推理 LawGPT_zh&#xff1a;中文法律大模型&#xff08;獬豸&#xff09;数据构建知识问答模型推理训练步骤 LaWGPT&#xff1a;基…

vue:find查找函数实际开发的使用

find的作用&#xff1a; find 方法主要是查找数组中的属性&#xff0c;会遍历数组&#xff0c;对每一个元素执行提供的函数&#xff0c;直到找到使该函数返回 true 的元素。然后返回该元素的值。如果没有元素满足测试函数&#xff0c;则返回 undefined。 基础使用&#xff1a…

Java入门-可重入锁

可重入锁 什么是可重入锁? 当线程获取某个锁后&#xff0c;还可以继续获取它&#xff0c;可以递归调用&#xff0c;而不会发生死锁&#xff1b; 可重入锁案例 程序可重入加锁 A.class,没有发生死锁。 sychronized锁 package com.wnhz.lock.reentrant;public class Sychroniz…

Stable Diffusion 模型分享:Indigo Furry mix(人类与野兽的混合)

本文收录于《AI绘画从入门到精通》专栏,专栏总目录:点这里。 文章目录 模型介绍生成案例案例一案例二案例三案例四案例五案例六案例七案例八案例九案例十

HQYJ 2024-2-21 作业

复习课上内容&#xff08;已完成&#xff09;结构体字节对齐&#xff0c;64位没做完的做完&#xff0c;32位重新都做一遍&#xff0c;课上指定2字节对齐的做一遍&#xff0c;自己验证&#xff08;已完成&#xff09;两种验证大小端对齐的代码写一遍复习指针内容&#xff08;已完…