RocketMQ源码学习笔记:Broker接受消息和发送消息

这是本人学习的总结,主要学习资料如下

  • 马士兵教育
  • rocketMq官方文档

目录

  • 1、Overview
  • 2、技术亮点
    • 2.1、消息写入时的自旋锁和可重入锁
    • 2.2、堆外内存机制
      • 2.2.1、Overview
      • 2.2.2、源码
        • 2.2.2.1、开启堆外内存的条件
        • 2.2.2.2、堆外内存的初始化
        • 2.2.2.3、写消息到堆外内存
        • 2.2.2.4、堆外内存同步数据到磁盘

1、Overview

这是Broker中类的架构图。

在这里插入图片描述

发送和接收消息的代码流程是从上到下的,比如接受消息的流程就是SendMessageProcessor#processRequest-> DefaultMessageStore#asyncPutMessage -> CommitLog#asyncPutMessage -> MappedFile#appendMessage

2、技术亮点

2.1、消息写入时的自旋锁和可重入锁

CommitLog的构造方法中,初始化了这么一个锁。在写入消息时会调用这个锁的lock()unlock()方法。

this.putMessageLock = defaultMessageStore.getMessageStoreConfig()
.isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() 
: new PutMessageSpinLock();

默认情况下是自旋锁,我们也可以配置成可重入锁。

我们看看PutMessageSpinLock怎么实现的。

public class PutMessageSpinLock implements PutMessageLock {
    //true: Can lock, false : in lock.
    private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
    @Override
    public void lock() {
        boolean flag;
        do {
            flag = this.putMessageSpinLock.compareAndSet(true, false);
        }
        while (!flag);
    }
    @Override
    public void unlock() {
        this.putMessageSpinLock.compareAndSet(false, true);
    }
}

这个自旋实现的很简单,就是不断地循环然后通过CAS加锁解锁,所以这个锁不会阻塞线程,不涉及操作系统上下文切换,只是CPU空转。

PutMessageReentrantLock则更简单,它直接使用ReentrantLock来加锁解锁。所以可能会导致线程阻塞或者挂起。

官方文档建议,异步刷盘时使用自旋锁,同步刷盘使用可重入锁。

因为异步刷盘速度快,消息到Borker内存就可以返回发送成功,占有锁的时间较少,自旋锁能有最大的效率。

同步刷盘需要等到消息写入磁盘后才能返回发送成功,占有所得时间较长,用自旋锁会导致大量线程空转占用CPU。所以需要用可重入锁将获取锁失败的线程挂起。

2.2、堆外内存机制

2.2.1、Overview

堆外内存机制用于高并发的场景。

因为高并发会在JVM中产生大量的对象,很可能会频繁地触发GC导致STW暂停业务线程。

堆外内存是指从内存中开辟一个新的空间,这个空间的回收不受GC的控制,完全交给开发者。

这片堆外内存会被当成一个缓存,Broker接受到的消息对象会存放到堆外内存中,然后定时从把消息从堆外内存中刷到磁盘。

因为堆外内存的垃圾回收不受GC控制,而是交给开发者,所以就能保证垃圾回收的频率够低,保证业务线程尽可能少地暂停。

这是消息写入时,普通模式和开启堆外内存时的流程图。
在这里插入图片描述


2.2.2、源码

2.2.2.1、开启堆外内存的条件

MessageStoreConfig中可以看到什么情况才会被RocketMQ认为当前开启了堆外内存。

public boolean isTransientStorePoolEnable() {
    return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType()
        && BrokerRole.SLAVE != getBrokerRole();
}

可以看到,三个条件同时满足才能开启堆外内存。

  1. Broker.conf中设置transientStorePoolEnable=true
  2. 刷盘方式是异步刷盘:第二个刷盘方式必须是异步刷盘。这是因为同步刷盘要求数据写到磁盘后才返回ACK给生产者,这需要较长的时间。但堆外内存的意义就是为了满足高并发,同步刷盘与之相违背,所以只能是异步刷盘。
  3. 当前的Broker必须是Master:因为主从架构中,从节点只能只能被消费者读消息不能被生产者写消息,而堆外内存只是一个写数据时的缓存,读数据还是得从磁盘中读。所以从节点开启堆外内存没意义,反而会占用内存影响性能。

2.2.2.2、堆外内存的初始化

初始化的内容比较简单,靠外部配置就足够的话,一般是在BrokerStartup#createBrokerController中。比较复杂的则是在BrokerController#createBrokerController中。

堆外内存和DefaultMessageStore有关,初始化在DefaultMessageStore的构造方法。下面是相关代码。

if (messageStoreConfig.isTransientStorePoolEnable()) {
    this.transientStorePool.init();
}
public void init() {
    // poolSize = 5 by default
    for (int i = 0; i < poolSize; i++) {
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);

        final long address = ((DirectBuffer) byteBuffer).address();
        Pointer pointer = new Pointer(address);
        LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));

        availableBuffers.offer(byteBuffer);
    }
}

从代码中可以看到,初始化具体做的事是新建默认5个ByteBuffer对象,然后存放在availableBuffers中,availableBuffers是一个队列。

这5个ByteBuffer是映射到堆外内存的缓存,后续将通过这几个缓存向堆外内存中放数据。

之后在初始化BrokerController#intialize()中,会通过线程AllocateMappedFileService调用MappedFile#init()方法,将上面初始化好的availableBuffers通过transientStorePool#borrowBuffer()传给MappedFilewriteBuffer。这样写消息时,MappedFile就可以通过writeBuffer向堆外内存写数据。

// init with off-heap
public void init(final String fileName, final int fileSize,
    final TransientStorePool transientStorePool) throws IOException {
    init(fileName, fileSize);
    this.writeBuffer = transientStorePool.borrowBuffer();
    this.transientStorePool = transientStorePool;
}

2.2.2.3、写消息到堆外内存

写消息的流程是SendMessageProcessor#processRequest-> DefaultMessageStore#asyncPutMessage -> CommitLog#asyncPutMessage -> MappedFile#appendMessage -> MappdFile#appendMessagInner,即使开启堆外内存也是一样。

所以我们可以来看看MappedFile#appendMessagInner怎么实现写消息到堆外内存。这里截取关键片段。

 // 如果writeBuffer不为空,则开启了堆外内存,否则用正常的mappedByteBuffer
 ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
 byteBuffer.position(currentPos);
 AppendMessageResult result;
 if (messageExt instanceof MessageExtBrokerInner) {
     result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
             (MessageExtBrokerInner) messageExt, putMessageContext);
 } else if (messageExt instanceof MessageExtBatch) {
     result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
             (MessageExtBatch) messageExt, putMessageContext);
 } else {
     return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
 }

代码中可以看到,关键点是writeBuffer。之前提到开启了堆外内存,那初始化时会将堆外内存的映射缓存传给MappedFile中的workBuffer;如果没开启堆外内存则writeBuffer为null。

所以开启堆外内存就向writeBuffer写数据到堆外内存;没有开启就向mappedByteBuffer写数据到磁盘。


2.2.2.4、堆外内存同步数据到磁盘

堆外内存只是一个缓存,最终数据应该同步到磁盘。RocketMQ设置了一个定时线程做这个工作,叫CommitRealTimeService

默认200ms同步一次。因为MQ就是用于异步的场景,所以写完数据至少200ms后才能读到也是可以忍耐的。

具体的代码不展示,没什么值得说的地方。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/756959.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

医院消防设施设备管理系统

医院为人员密集场所&#xff0c;且多为各类病患及其陪护人员&#xff0c;一旦发生火灾&#xff0c;人员疏散逃生困难&#xff0c;容易造成较严重的生命与财产损失。为规范医院的消防设施设备管理&#xff0c;通过凡尔码系统对医院消防设施设备进行信息化管理&#xff0c;提高医…

动手学深度学习(Pytorch版)代码实践 -卷积神经网络-21多输入多输出通道

21多输入多输出通道 import torch from d2l import torch as d2ldef corr2d(X, K):"""计算二维互相关运算"""h, w K.shapeY torch.zeros((X.shape[0] - h 1, X.shape[1] - w 1))for i in range(Y.shape[0]):for j in range(Y.shape[1]):Y[i,…

大创项目推荐 题目:基于机器视觉opencv的手势检测 手势识别 算法 - 深度学习 卷积神经网络 opencv python

文章目录 1 简介2 传统机器视觉的手势检测2.1 轮廓检测法2.2 算法结果2.3 整体代码实现2.3.1 算法流程 3 深度学习方法做手势识别3.1 经典的卷积神经网络3.2 YOLO系列3.3 SSD3.4 实现步骤3.4.1 数据集3.4.2 图像预处理3.4.3 构建卷积神经网络结构3.4.4 实验训练过程及结果 3.5 …

HarmonyOS开发:应用完整性校验

简介 为了确保应用的完整性和来源可靠&#xff0c;OpenHarmony需要对应用进行签名和验签。 应用开发阶段&#xff1a; 开发者完成开发并生成安装包后&#xff0c;需要开发者对安装包进行签名&#xff0c;以证明安装包发布到设备的过程中没有被篡改。OpenHarmony的应用完整性校…

【Docker0】网络更改

目录 1. 停止docker服务 2. 关闭docker默认桥接网络接口 3. 从系统删除docker0接口 4. 创建一个名为bridge0的新接口 5. 添加ip地址和子网掩码 6. 启用bridge0接口 7. &#xff08;如果没起来就执行该句&#xff09; 8. 查看ip 1. 停止docker服务 sudo service docker…

SpringBoot: Eureka入门

1. IP列表 公司发展到一定的规模之后&#xff0c;应用拆分是无可避免的。假设我们有2个服务(服务A、服务B)&#xff0c;如果服务A要调用服务B&#xff0c;我们能怎么做呢&#xff1f;最简单的方法是让服务A配置服务B的所有节点的IP&#xff0c;在服务A内部做负载均衡调用服务B…

Linux之进程控制(上)

目录 进程创建 进程终止 进程退出码 进程终止的方式 进程等待 进程等待的方式 status概述 总结 上期我们学习了Linux中进程地址空间的概念&#xff0c;至此进程的所有基本概念已经全部学习完成&#xff0c;今天我们将开始学习进程相关的操作。 进程创建 进程创建其实…

7 个不容忽视的开源安全工具

专业人士选择的第一个工具通常是开源选项,因为它们得到了广泛社区的保证和支持。此代码是支持安全可靠的互联网的基础的一部分。 最近,XZ Utils 等丑闻让用户犹豫不决。开放性是否是攻击的危险载体?还有其他问题在等着他们吗? 辩护者指出,虽然开放性可以让某些攻击变得更…

刷代码随想录有感(120):贪心算法——买卖股票的最佳时机

题干&#xff1a; 代码&#xff1a; class Solution { public:int maxProfit(vector<int>& prices) {int low INT_MAX;int res INT_MIN;for(int i 0; i < prices.size(); i){low min(low, prices[i]);res max(res, prices[i] - low);}return res;} }; 贪心…

在本地和Linux之间传输文件

1.打开本地的cmd窗口 2. 然后按这个链接的说法在cmd中远程连接Linux&#xff08;技术|如何在 Linux 中使用 sFTP 上传或下载文件与文件夹&#xff09; 3. 看这个链接里面的sftp命令进行本地和Linux之间的文件互传 &#xff08;https://www.cnblogs.com/niuben/p/13324099.htm…

动态规划基础练习

我们需要先从数组较大的开始进行处理&#xff0c;每次考察上下左右的&#xff0c;比较当前存储的最大值和转移来的值&#xff0c;哪一个大一点 #define _CRT_SECURE_NO_WARNINGS #include<bits/stdc.h> using namespace std;int n, m; int a[105][105]; int addx[] { 0,…

远程过程调用协议gRPC及在go环境下的使用

1. 远程过程调用协议 1.1 定义 远程过程调用(Remote Procedure Call&#xff0c;PRC是一种进程间通信技术&#xff0c;它使得程序可以像调用本地函数一样调用远程服务器上的函数。RPC 屏蔽了底层的通信细节&#xff0c;让开发者能够更专注于业务逻辑&#xff0c;而无需关心网络…

C语言力扣刷题4——删除链表的倒数第 N 个结点[双指针],只遍历一遍

力扣刷题4——删除链表的倒数第 N 个结点[双指针] 一、博客声明二、题目描述三、解题思路1、思路说明 四、解题代码&#xff08;附注释&#xff09; 一、博客声明 找工作逃不过刷题&#xff0c;为了更好的督促自己学习以及理解力扣大佬们的解题思路&#xff0c;开辟这个系列来记…

“用友审批,兴业付款”,YonSuite让企业资金调拨更高效

随着企业规模的扩大和经营的不断深入&#xff0c;企业的分支机构分布各地&#xff0c;在多银行多账户的资金管理上面临着诸多挑战。传统的资金管理方式往往效率低下、风险较高&#xff0c;难以满足企业快速发展的需求。 为了解决这些痛点&#xff0c;用友网络科技股份有限公司…

MathType2024最新破解版在哪里可以下载?

在当今科技日益发展的时代&#xff0c;我们每个人都可能遇到需要在电子文档、网页或其他平台中输入复杂数学公式的情况。这时&#xff0c;一个强大且易用的数学公式编辑器就成了我们迫切需要的工具。而MathType&#xff0c;作为一款专业、精准的数学公式编辑器&#xff0c;无疑…

Redis为什么设计多个数据库

​关于Redis的知识前面已经介绍过很多了,但有个点没有讲,那就是一个Redis的实例并不是只有一个数据库,一般情况下,默认是Databases 0。 一 内部结构 设计如下: Redis 的源码中定义了 redisDb 结构体来表示单个数据库。这个结构有若干重要字段,比如: dict:该字段存储了…

人生最有力,最棒的十句话!

人生最有力&#xff0c;最棒的十句话 1、允许一切事发生&#xff0c;所有一切发生的事不是你能阻挡了的&#xff0c;你接受&#xff0c;他也发生&#xff0c;你不接受&#xff0c;他也发生&#xff0c;你还不如坦然面对接受现实。 2、你焦虑的时候千万不要躺着啥也不干&#xf…

机器人控制系列教程之关节空间运动控制器搭建(1)

机器人位置控制类型 机器人位置控制分为两种类型&#xff1a; 关节空间运动控制—在这种情况下&#xff0c;机器人的位置输入被指定为一组关节角度或位置的向量&#xff0c;这被称为机器人的关节配置&#xff0c;记作q。控制器跟踪一个参考配置&#xff0c;记作 q r e f q_{re…

2SK241 LTSpice模型及仿真

2SK241是东芝生产的一款NMOS&#xff0c;早已停产&#xff0c;但是在收音机圈子里还是有很多死忠粉&#xff0c;所以在淘宝上也是一堆打磨改标的假货。 言归正传&#xff0c;在矿坛上找到了2SK241的模型&#xff1a; .model M2SK241bottom NMOS(Level1 Rd1 Rs10 Rg50 Kp8mV…

《数据结构与算法基础 by王卓老师》学习笔记——类C语言有关操作补充

1.元素类型说明 2.数组定义 3.C语言的内存动态分配 4..C中的参数传递 5.传值方式 6.传地址方式 例子