RabbitMQ 模拟实现【三】:存储设计

文章目录

  • 数据库设计
    • SQLite
    • 配置数据库
    • 实现 数据库
    • 关于哈希表等复杂类的存储
    • 启动数据库
  • 文件设计
    • 消息持久化
    • 消息属性格式
    • 核心方法
    • 消息序列化
    • 消息文件回收
  • 统一硬盘存储管理
  • 内存存储管理
    • 线程安全
    • 数据结构实现

数据库设计

数据库主要存储交换机、队列、绑定

SQLite

此处考虑的是更轻量的数据库SQLite, 因为⼀个完整的 SQLite 数据库,只有⼀个单独的不到1M的可执⾏⽂件,在Java中使用SQLite,不需要额外安装,只需要引入依赖即可,同时采用 mybatis 来管理数据库,完成我们数据存储方面的需求
SQLite,只是一个本地的数据库,这个数据库相当于直接操作本地硬盘文件,
因此需要在配置文件中配置好数据库文件的路径

配置数据库

  1. 直接在pom.xml⽂件中引⼊
<!-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc -->
<dependency>
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
<version>3.42.0.0</version>
</dependency>
  1. 然后在 application.yml 配置⽂件中
spring:
datasource:
url: jdbc:sqlite:./data/meta.db
username:
password:
driver-class-name: org.sqlite.JDBC

实现 数据库

1.此处我们根据之前的需求分析,对 application.yml 添加如下配置:

mybatis:
mapper-locations: classpath:mapper/**Mapper.xml

2.创建一个对应的 interface 实现包括但不限于建表的方法来操作数据库,同时注入Spring 容器中
在这里插入图片描述3.创建 mapper⽬录和⽂件 MetaMapper.xml 并在 MetaMapper.xml 中利用 MyBits 实现 后续会用到的数据库 CRUD 功能
在这里插入图片描述

关于哈希表等复杂类的存储

  • 说明:转成 json 格式的字符串来表示,在数据库中直接利用 varchar 类型即可
  • 转换思想:
  • 比如 MyBatis 往数据库中写数据, 就会调用对象的 getter 方法,拿到属性的值,再往数据库中写.如果这个过程中,让 getArguments 得到的结果是 String 类型的,此时,就可以直接把这个数据写到数据库了
  • 比如 MyBatis 从数据库读数据的时候,就会调用对象的 setter 方法,把数据库中读到的结果设置到对象的属性中,如果这个过程中,让 setArguments,参数是一个 String,并且在setArquments 内部针对字符串解析,解析成一个 Map 对象
  • 具体实现
public String getArguments(){
    // Map 类型转换为 String(json)
    ObjectMapper objectMapper = new ObjectMapper();
    try {
        return objectMapper.writeValueAsString(arguments);
    } catch (JsonProcessingException e) {
        e.printStackTrace();
    }

    return "{}";
}

public void setArguments(String arguments){
    ObjectMapper objectMapper = new ObjectMapper();
    try {
        this.arguments = objectMapper.readValue(arguments, new TypeReference<HashMap<String, Object>>() {
        });
    } catch (JsonProcessingException e) {
        e.printStackTrace();
    }
}

启动数据库

  • 在服务器(BrokerServer)启动的时候,能够做出以下逻辑判定:
  1. 如果数据库存在,表也都有了,不做任何操作
  2. 如果数据库不存在,则创建库,创建表,构造默认数据
  • 依据下列框图构造⼀个类 DataBaseManager 来管理数据库
    在这里插入图片描述
    我们需要用 依赖查找 得到这个类.故而需要给项目启动类 增添 一个 静态属性:容器上下文
    在这里插入图片描述
    在这里插入图片描述

后续,我们就可以通过
MqApplication.context = SpringApplication.run(MqApplication.class);
来直接直接拿到 Spring 对象

文件设计

文件这一块主要存储的是消息

消息持久化

消息是依托于队列的,因此存储的时候,就要把 消息 按照 队列 维度展开
在 data 中创建⼀些⼦⽬录,每个队列对应⼀个⼦⽬录,⼦⽬录名就是队列名
在这里插入图片描述

消息属性格式

使用两个文件:

  • queue_data.txt 保存消息的内容
  • queue.stat.txt 保存消息的统计内容(总消息 \t 有效消息
    在这里插入图片描述

核心方法

  • 垃圾回收
  • 统计文件读写
  • 创建消息目录和文件
  • 删除消息目录和文件
  • 消息序列化
  • 把消息写入文件中
  • 从文件中删除消息(逻辑删除)
  • 从硬盘中恢复数据到内存

消息序列化

我们知道在存储时,我们需要保存到文件,而文件只能存储字符串/二进制数据,无法直接存储消息对象,同时通过socket套接字在网络中传输时,也需要转为二进制,因此消息的序列化与反序列化尤为重要

tip:此处不使⽤ json 进⾏序列化,由于 Message,⾥⾯存储是⼆进制数据。⽽jason序列化得到的结果是⽂本数据,JSON格式中有很多特殊符号,:"{}这些符号会影响 json
格式的解析如果存文本,你的键值对中不会包含上述特殊符号,如果存二进制,那就不好说.万一某个二进制的字节正好就和 上述特殊符号 的ascii样了,此时就可能会引起 json 解析的格式错误~~

实现如下:

// 把一个对象序列化成一个字节数组
    public static byte[] toBytes(Object object) throws IOException {
        // 这个流对象相当于一个变长的字节数组
        // 可以把 object 序列化的数据逐渐写入该流对象,再转为 byte[]
        try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
            try(ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){
                // 把该对象序列化, 写入objectOutputStream中,因为其关联byteArrayOutputStream
                // 所以相当于写入了 byteArrayOutputStream 中
                objectOutputStream.writeObject(object);
            }
            return byteArrayOutputStream.toByteArray();
        }
    }

    // 把一个字节数组,反序列化成一个对象
    public static Object fromByte(byte[] data) throws IOException, ClassNotFoundException {
        Object object = null;
        try(ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)){
            try(ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)){
                object = objectInputStream.readObject();
            }
        }
        return object;
    }

消息文件回收

由于当前会不停的往消息⽂件中写⼊消息,并且删除消息只是逻辑删除,这就可能导致消息⽂件越来越⼤,并且包含⼤量⽆⽤的消息。我们需要实现垃圾文件的回收

  • 此处使⽤的是复制算法。如下:
    在这里插入图片描述
  • 此处就要⽤到我们每个队列⽬录中,所对应的另⼀个⽂件 queue_stat.txt了,使⽤这个⽂件来保存消息的统计信息
  • 只存⼀⾏数据,⽤ \t 分割, 左边是 queue_data.txt 中消息的总数⽬,右边是 queue_data.txt中有
    效的消息数⽬。 形如 2000\t1500, 代表该队列总共有2000条消息,其中有效消息为1500条
    所以此处我们就约定,当消息总数超过2000条,并且有效消息数⽬低于总消息数的50%,就处理⼀次垃圾回收GC
    具体实现代码:
// 检查是否需要进行GC
    public boolean checkGC(String queueName){
        // 判断
        Stat stat = readStat(queueName);
        if (stat.totalCount > 2000 && (double)stat.validCount / (double) stat.totalCount < 0.5){
            return true;
        }
        return false;
    }

    // GC操作使用复制算法,会创建一个新的文件出来,这里约定新文件的位置
    public String getQueueDataNewPath(String queueName){
        return getQueueDir(queueName) + "/queue_data_new.txt";
    }

    // 垃圾回收机制
    public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {
        synchronized (queue){
            // 统计花费的时间
            long gcBeg = System.currentTimeMillis();

            // 1.创建新文件
            File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));
            if (queueDataNewFile.exists()){
                throw new MqException("[MessageFileManager] gc 时发现该队列queue_data_new 已经存在");
            }
            boolean ok = queueDataNewFile.createNewFile();
            if (!ok){
                throw new MqException("[MessageFileManager] 创建文件失败 queueName=" + queueDataNewFile.getAbsolutePath());
            }
            // 2.读取有效消息
            LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());
            // 3.将有效消息写入文件
            try(OutputStream outputStream = new FileOutputStream(queueDataNewFile)){
                try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
                    for(Message message : messages){
                        byte[] buffer = BinaryTool.toBytes(message);
                        dataOutputStream.writeInt(buffer.length);
                        dataOutputStream.write(buffer);
                    }
                }
            }

            // 4.删除旧的文件
            File queueDataOldFile = new File(getQueueDataPath(queue.getName()));
            ok = queueDataOldFile.delete();
            if (!ok){
                throw new MqException("[MessageFileManager] 删除旧的文件内容失败 queueDataOldFile="
                + queueDataOldFile.getAbsolutePath());
            }

            // 5.重命名
            ok =  queueDataNewFile.renameTo(queueDataOldFile);
            if (!ok){
                throw new MqException("[MessageFileManager] 文件重命名失败 queueDataNewFile=" + queueDataNewFile.getAbsolutePath()
                + ", queueDataOldFile=" + queueDataOldFile.getAbsolutePath());
            }

            //6.更新统计文件
            Stat stat = readStat(queue.getName());
            stat.totalCount = messages.size();
            stat.validCount = messages.size();
            writeStat(queue.getName(),stat);
            long gcEnd = System.currentTimeMillis();
            System.out.println("[MessageFileManager] gc 执行完毕! queueName=" + queue.getName()
                        + "time=" + (gcEnd - gcBeg) + "ms");
        }
    }

统一硬盘存储管理

上述我们存储在硬盘中的数据,分为了两个,⼀个是存放数据库中,⼀个是存放在⽂件中。
我们需要统⼀封装⼀个类对上⾯硬盘数据进⾏管理

package com.example.demo.mqsever.datacenter;

import com.example.demo.common.MqException;
import com.example.demo.mqsever.core.Binding;
import com.example.demo.mqsever.core.Exchange;
import com.example.demo.mqsever.core.MSGQueue;
import com.example.demo.mqsever.core.Message;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

// 整合 数据库:交换机、绑定、队列 + 数据文件:消息
public class DiskDataCenter {
    // 管理数据库的实例
    private DataBaseManager  dataBaseManager = new DataBaseManager();
    // 管理数据文件中的实例
    private MessageFileManager messageFileManager = new MessageFileManager();

    public void init(){
        dataBaseManager.init();
        messageFileManager.init();
    }

    // 交换机操作
    public void insertExchange(Exchange exchange){
        dataBaseManager.insertExchange(exchange);
    }

    public void deleteExchange(String exchangeName){
        dataBaseManager.deleteExchange(exchangeName);
    }

    public List<Exchange> selectAllExchanges(){
        return dataBaseManager.selectAllExchanges();
    }

    // 队列操作
    public void insertQueue(MSGQueue queue) throws IOException {
        dataBaseManager.insertQueue(queue);
        // 创建目录的同时,也要创建文件和目录
        messageFileManager.createQueueFiles(queue.getName());
    }

    public void deleteQueue(String queueName) throws IOException {
        dataBaseManager.deleteQueue(queueName);
        // 删除目录的同时,也要删除文件和目录
        messageFileManager.destroyQueueFiles(queueName);
    }

    public List<MSGQueue> selectAllQueues(){
        return dataBaseManager.selectAllQueue();
    }

    // 绑定操作
    public void insertBinding(Binding binding){
        dataBaseManager.insertBinding(binding);
    }

    public void deleteBinding(Binding binding){
        dataBaseManager.deleteBinding(binding);
    }

    public List<Binding> selectAllBindings(){
        return dataBaseManager.selectAllBindings();
    }

    // 消息操作
    public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {
        messageFileManager.sendMessage(queue,message);
    }

    public void deleteMessage(MSGQueue queue,Message message) throws IOException, ClassNotFoundException, MqException {
        messageFileManager.deleteMessage(queue,message);
        // 判断是否需要进行 GC 操作
        if (messageFileManager.checkGC(queue.getName())){
            messageFileManager.gc(queue);
        }
    }

    public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
        return messageFileManager.loadAllMessageFromQueue(queueName);
    }

}

内存存储管理

借助内存中的⼀些列数据结构 ,保存 交换机、队列、绑定、消息,⼴泛使⽤了 哈希表、链表、嵌套的数据结构等使
⽤内存管理上述的数据,对于MQ来说,内存存储数据为主;硬盘存储数据为辅(主要是为了持久化,重启之后,数据不丢失)

线程安全

此处为了保证线程安全,统一使用 线程安全的 ConcurrentHashMap.同时再编写相关代码的时候,要考虑:要不要加锁?锁加到哪⾥?

数据结构实现

    // key 是 exchangeName, value 是 Exchange 对象
    private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();
    // key 是 queueName, value 是 MSGQueue 对象
    private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();
    // 第一个 key 是 exchangeName, 第二个 key 是 queueName
    private ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();
    // key 是 messageId, value 是 Message 对象
    private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();
    // key 是 queueName, value 是一个 Message 的链表
    private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();
    // 第一个 key 是 queueName, 第二个 key 是 messageId
    private ConcurrentHashMap<String, ConcurrentHashMap<String, Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/455882.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

GPT-4.5 Turbo意外曝光,最快明天发布?OpenAI终于要放大招了!

大家好&#xff0c;我是木易&#xff0c;一个持续关注AI领域的互联网技术产品经理&#xff0c;国内Top2本科&#xff0c;美国Top10 CS研究生&#xff0c;MBA。我坚信AI是普通人变强的“外挂”&#xff0c;所以创建了“AI信息Gap”这个公众号&#xff0c;专注于分享AI全维度知识…

基于springboot实现数据资产管理系统项目【项目源码+论文说明】计算机毕业设计

基于springboot实现数据资产管理系统演示 摘要 固定资产管理系统主要是完成对系统用户管理、资产信息管理、资产变更管理、资产用途管理、资产类别管理和资产增减管理。因为利用本系统管理员可以直接录入信息&#xff0c;修改信息&#xff0c;删除信息&#xff0c;并且若在录入…

使用docker-compose部署MySQL三主六从半同步集群(MMM架构)

文章目录 &#x1f50a;博主介绍&#x1f964;本文内容部署MySQL三主六从半同步集群一主二从同步集群规划需要安装docker和docker-compose命令形式安装安装docker安装docker-compose 宝塔面板形式安装 部署node1节点的master1docker-compose.yaml文件my.cnf文件授权启动 部署no…

ACM中Java输入输出

ACM中Java输入输出 最初写算法时&#xff0c;是用Scanner的。因为当时接触的测试数据基本都是以算法的复杂度为主&#xff0c;但是后面遇到大量的输入数据时。发现Scanner远远不能满足条件。下面列出几种常用的输入输出方式。(输出统一用printwriter&#xff0c;系统的system.o…

海豚调度系列之:认识海豚调度

海豚调度系列之&#xff1a;认识海豚调度 一、海豚调度二、特性三、建议配置四、名次解释 一、海豚调度 Apache DolphinScheduler 是一个分布式易扩展的可视化DAG工作流任务调度开源系统。适用于企业级场景&#xff0c;提供了一个可视化操作任务、工作流和全生命周期数据处理过…

Prompt Engineering(提示工程)

Prompt 工程简介 在近年来&#xff0c;大模型&#xff08;Large Model&#xff09;如GPT、BERT等在自然语言处理领域取得了巨大的成功。这些模型通过海量数据的训练&#xff0c;具备了强大的语言理解和生成能力。然而&#xff0c;要想充分发挥这些大模型的潜力&#xff0c;仅仅…

2024/3/14打卡k倍区间(8届蓝桥杯)——前缀和+优化***

题目 给定一个长度为 N 的数列&#xff0c;A1,A2,…AN&#xff0c;如果其中一段连续的子序列 Ai,Ai1,…Aj 之和是 K 的倍数&#xff0c;我们就称这个区间 [i,j] 是 K 倍区间。 你能求出数列中总共有多少个 K 倍区间吗&#xff1f; 输入格式 第一行包含两个整数 N 和 K。 以下 N…

O2OA(翱途)开发平台系统安全-用户登录IP限制

O2OA(翱途)开发平台[下称O2OA开发平台或者O2OA]支持对指定的用户设置可以连接的客户端计算机的IP地址&#xff0c;以避免用户在不安全的环境下访问系统。本篇主要介绍如何开启O2OA用户登录IP限制。 一、先决条件&#xff1a; 1、O2Server服务器正常运行&#xff0c;系统安装部…

0基础学习VR全景平台篇第145篇:图层控件功能

大家好&#xff0c;欢迎观看蛙色VR官方——后台使用系列课程&#xff01;这期&#xff0c;我们将为大家介绍如何使用图层控件功能。 一.如何使用图层控件功能&#xff1f; 进入作品编辑页面&#xff0c;点击左边的控件后就可以在右边进行相应设置。 二.图层控件有哪些功能&am…

跨境电商SaaS独立站的真面目...(网站建站)

跨境电商独立站自外贸交易开始&#xff0c;就一直存在&#xff0c;接触过电商的朋友应该都听过&#xff0c;但大部分人仅仅只是停留在听过的阶段&#xff0c;并没有真正的去了解它&#xff1b;独立站&#xff0c;顾名思义就是一个独立的网站&#xff0c;不依附任何平台&#xf…

基于Java+SpringBoot+vue+element实现物流管理系统

基于JavaSpringBootvueelement实现物流管理系统 博主介绍&#xff1a;多年java开发经验&#xff0c;专注Java开发、定制、远程、文档编写指导等,csdn特邀作者、专注于Java技术领域 ** 作者主页 央顺技术团队** 欢迎点赞 收藏 ⭐留言 文末获取源码联系方式 文章目录 基于JavaSpr…

OSCP靶场--Exfiltrated

OSCP靶场–Exfiltrated 考点(1.cms 站点地图插入php反弹shell 2. CVE-2021-4034提权 3.root定时任务提权[CVE-2021-22204]) 1.nmap扫描 ┌──(root㉿kali)-[~/Desktop] └─# nmap -sV -sC -p- 192.168.155.163 --min-rate 2500 Starting Nmap 7.92 ( https://nmap.org ) a…

Prometheus 监控告警配置

文章目录 一、告警通知1.邮件通知2.钉钉通知2.1.获取钉钉机器人webhook2.2.prometheus-webhook-dingtalk2.3.配置信息2.4.自定义模板 3.自定义 二、告警规则1.Prometheus2.Linux3.Docker4.Nginx5.Redis6.PostgreSQL7.MySQL8.RabbitMQ9.JVM10.Elasticsearch 开源中间件 # Prome…

元宇宙崛起:区块链与金融科技共绘数字新世界

文章目录 一、引言二、元宇宙与区块链的深度融合三、区块链在元宇宙金融中的应用四、金融科技在元宇宙中的创新应用五、面临的挑战与机遇《区块链与金融科技》亮点内容简介获取方式 一、引言 随着科技的飞速发展&#xff0c;元宇宙概念逐渐走进人们的视野&#xff0c;成为数字…

linux环境下安装运行环境JDK、Docker、Maven、MySQL、RabbitMQ、Redis、nacos、Elasticsearch

安装JDK 1、提前下载好jdk 官网&#xff1a;点击下载 2、将下载的文件放到自己喜欢的目录下 然后使用下面命令进行解压 tar -zxvf jdk-8u161-linux-x64.tar.gz3、配置环境变量 使用命令 vim /etc/profile在文件的最后插入 export JAVA_HOME/source/java/jdk1.8.0_161 #…

EditText不显示系统键盘,可用来显示自定义的键盘

系统键盘 包含普通键盘和现在很多ROM定制的密码安全键盘 调用已下方法即可解决: https://developer.android.google.cn/reference/android/widget/TextView#setShowSoftInputOnFocus(boolean) 但是,此方法是API 21Android 5.0加入的, 所以为了兼容低版本, 建议使用已下方法: p…

【C++ 学习】内存管理

1. new / delete 和 malloc / free 的区别? malloc / free 和 new / delete 的共同点&#xff1a;都是从堆上申请空间&#xff0c;并且需要用户手动释放。不同的地方是&#xff1a; malloc 和 free 是函数&#xff0c;new 和 delete 是操作符&#xff1b; malloc 申请的空间不…

0103n阶行列式-行列式-线性代数

文章目录 一 n阶行列式二 三阶行列式三 特殊行列式结语 一 n阶行列式 ∣ a 11 a 12 ⋯ a 1 n a 21 a 22 ⋯ a 2 n ⋯ ⋯ ⋯ ⋯ a n 1 a n 2 ⋯ a n n ∣ \begin{vmatrix}a_{11}&a_{12}&\cdots&a_{1n}\\a_{21}&a_{22}&\cdots&a_{2n}\\\cdots&\cdots…

如何开发一款高质量的短剧系统,短剧系统的框架设计与实现

项目背景 短剧系统正处于当下的火爆之中。随着社交媒体和短视频平台的兴起&#xff0c;人们对于快节奏、轻松有趣的短剧内容的需求也越来越大。短剧系统不仅可以提供快速、精彩的娱乐体验&#xff0c;还可以在碎片化时间里为用户带来欢乐。 适用人群非常广泛&#xff0c;尤其…

【论文速读】| MOCK:上下文依赖引导的内核模糊测试

本次分享论文为&#xff1a;MOCK: Optimizing Kernel Fuzzing Mutation with Context-aware Dependency 基本信息 原文作者&#xff1a;Jiacheng Xu&#xff0c;Xuhong Zhang&#xff0c;Shouling Ji&#xff0c;Yuan Tian&#xff0c;Binbin Zhao&#xff0c; Qinying Wang&a…