文章目录
- 数据库设计
- SQLite
- 配置数据库
- 实现 数据库
- 关于哈希表等复杂类的存储
- 启动数据库
- 文件设计
- 消息持久化
- 消息属性格式
- 核心方法
- 消息序列化
- 消息文件回收
- 统一硬盘存储管理
- 内存存储管理
- 线程安全
- 数据结构实现
数据库设计
数据库主要存储交换机、队列、绑定
SQLite
此处考虑的是更轻量的数据库SQLite, 因为⼀个完整的 SQLite 数据库,只有⼀个单独的不到1M的可执⾏⽂件,在Java中使用SQLite,不需要额外安装,只需要引入依赖即可,同时采用 mybatis 来管理数据库,完成我们数据存储方面的需求
SQLite,只是一个本地的数据库,这个数据库相当于直接操作本地硬盘文件,
因此需要在配置文件中配置好数据库文件的路径
配置数据库
- 直接在pom.xml⽂件中引⼊
<!-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc -->
<dependency>
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
<version>3.42.0.0</version>
</dependency>
- 然后在 application.yml 配置⽂件中
spring:
datasource:
url: jdbc:sqlite:./data/meta.db
username:
password:
driver-class-name: org.sqlite.JDBC
实现 数据库
1.此处我们根据之前的需求分析,对 application.yml 添加如下配置:
mybatis:
mapper-locations: classpath:mapper/**Mapper.xml
2.创建一个对应的 interface 实现包括但不限于建表的方法来操作数据库,同时注入Spring 容器中
3.创建 mapper⽬录和⽂件 MetaMapper.xml 并在 MetaMapper.xml 中利用 MyBits 实现 后续会用到的数据库 CRUD 功能
关于哈希表等复杂类的存储
- 说明:转成 json 格式的字符串来表示,在数据库中直接利用 varchar 类型即可
- 转换思想:
- 比如 MyBatis 往数据库中写数据, 就会调用对象的 getter 方法,拿到属性的值,再往数据库中写.如果这个过程中,让 getArguments 得到的结果是 String 类型的,此时,就可以直接把这个数据写到数据库了
- 比如 MyBatis 从数据库读数据的时候,就会调用对象的 setter 方法,把数据库中读到的结果设置到对象的属性中,如果这个过程中,让 setArguments,参数是一个 String,并且在setArquments 内部针对字符串解析,解析成一个 Map 对象
- 具体实现
public String getArguments(){
// Map 类型转换为 String(json)
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.writeValueAsString(arguments);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return "{}";
}
public void setArguments(String arguments){
ObjectMapper objectMapper = new ObjectMapper();
try {
this.arguments = objectMapper.readValue(arguments, new TypeReference<HashMap<String, Object>>() {
});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
启动数据库
- 在服务器(BrokerServer)启动的时候,能够做出以下逻辑判定:
- 如果数据库存在,表也都有了,不做任何操作
- 如果数据库不存在,则创建库,创建表,构造默认数据
- 依据下列框图构造⼀个类 DataBaseManager 来管理数据库
我们需要用 依赖查找 得到这个类.故而需要给项目启动类 增添 一个 静态属性:容器上下文
后续,我们就可以通过
MqApplication.context = SpringApplication.run(MqApplication.class);
来直接直接拿到 Spring 对象
文件设计
文件这一块主要存储的是消息
消息持久化
消息是依托于队列的,因此存储的时候,就要把 消息 按照 队列 维度展开
在 data 中创建⼀些⼦⽬录,每个队列对应⼀个⼦⽬录,⼦⽬录名就是队列名
消息属性格式
使用两个文件:
- queue_data.txt 保存消息的内容
- queue.stat.txt 保存消息的统计内容(总消息 \t 有效消息
核心方法
- 垃圾回收
- 统计文件读写
- 创建消息目录和文件
- 删除消息目录和文件
- 消息序列化
- 把消息写入文件中
- 从文件中删除消息(逻辑删除)
- 从硬盘中恢复数据到内存
消息序列化
我们知道在存储时,我们需要保存到文件,而文件只能存储字符串/二进制数据,无法直接存储消息对象,同时通过socket套接字在网络中传输时,也需要转为二进制,因此消息的序列化与反序列化尤为重要
tip:此处不使⽤ json 进⾏序列化,由于 Message,⾥⾯存储是⼆进制数据。⽽jason序列化得到的结果是⽂本数据,JSON格式中有很多特殊符号,:"{}这些符号会影响 json
格式的解析如果存文本,你的键值对中不会包含上述特殊符号,如果存二进制,那就不好说.万一某个二进制的字节正好就和 上述特殊符号 的ascii样了,此时就可能会引起 json 解析的格式错误~~
实现如下:
// 把一个对象序列化成一个字节数组
public static byte[] toBytes(Object object) throws IOException {
// 这个流对象相当于一个变长的字节数组
// 可以把 object 序列化的数据逐渐写入该流对象,再转为 byte[]
try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
try(ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){
// 把该对象序列化, 写入objectOutputStream中,因为其关联byteArrayOutputStream
// 所以相当于写入了 byteArrayOutputStream 中
objectOutputStream.writeObject(object);
}
return byteArrayOutputStream.toByteArray();
}
}
// 把一个字节数组,反序列化成一个对象
public static Object fromByte(byte[] data) throws IOException, ClassNotFoundException {
Object object = null;
try(ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)){
try(ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)){
object = objectInputStream.readObject();
}
}
return object;
}
消息文件回收
由于当前会不停的往消息⽂件中写⼊消息,并且删除消息只是逻辑删除,这就可能导致消息⽂件越来越⼤,并且包含⼤量⽆⽤的消息。我们需要实现垃圾文件的回收
- 此处使⽤的是复制算法。如下:
- 此处就要⽤到我们每个队列⽬录中,所对应的另⼀个⽂件 queue_stat.txt了,使⽤这个⽂件来保存消息的统计信息
- 只存⼀⾏数据,⽤ \t 分割, 左边是 queue_data.txt 中消息的总数⽬,右边是 queue_data.txt中有
效的消息数⽬。 形如 2000\t1500, 代表该队列总共有2000条消息,其中有效消息为1500条
所以此处我们就约定,当消息总数超过2000条,并且有效消息数⽬低于总消息数的50%,就处理⼀次垃圾回收GC
具体实现代码:
// 检查是否需要进行GC
public boolean checkGC(String queueName){
// 判断
Stat stat = readStat(queueName);
if (stat.totalCount > 2000 && (double)stat.validCount / (double) stat.totalCount < 0.5){
return true;
}
return false;
}
// GC操作使用复制算法,会创建一个新的文件出来,这里约定新文件的位置
public String getQueueDataNewPath(String queueName){
return getQueueDir(queueName) + "/queue_data_new.txt";
}
// 垃圾回收机制
public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {
synchronized (queue){
// 统计花费的时间
long gcBeg = System.currentTimeMillis();
// 1.创建新文件
File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));
if (queueDataNewFile.exists()){
throw new MqException("[MessageFileManager] gc 时发现该队列queue_data_new 已经存在");
}
boolean ok = queueDataNewFile.createNewFile();
if (!ok){
throw new MqException("[MessageFileManager] 创建文件失败 queueName=" + queueDataNewFile.getAbsolutePath());
}
// 2.读取有效消息
LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());
// 3.将有效消息写入文件
try(OutputStream outputStream = new FileOutputStream(queueDataNewFile)){
try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
for(Message message : messages){
byte[] buffer = BinaryTool.toBytes(message);
dataOutputStream.writeInt(buffer.length);
dataOutputStream.write(buffer);
}
}
}
// 4.删除旧的文件
File queueDataOldFile = new File(getQueueDataPath(queue.getName()));
ok = queueDataOldFile.delete();
if (!ok){
throw new MqException("[MessageFileManager] 删除旧的文件内容失败 queueDataOldFile="
+ queueDataOldFile.getAbsolutePath());
}
// 5.重命名
ok = queueDataNewFile.renameTo(queueDataOldFile);
if (!ok){
throw new MqException("[MessageFileManager] 文件重命名失败 queueDataNewFile=" + queueDataNewFile.getAbsolutePath()
+ ", queueDataOldFile=" + queueDataOldFile.getAbsolutePath());
}
//6.更新统计文件
Stat stat = readStat(queue.getName());
stat.totalCount = messages.size();
stat.validCount = messages.size();
writeStat(queue.getName(),stat);
long gcEnd = System.currentTimeMillis();
System.out.println("[MessageFileManager] gc 执行完毕! queueName=" + queue.getName()
+ "time=" + (gcEnd - gcBeg) + "ms");
}
}
统一硬盘存储管理
上述我们存储在硬盘中的数据,分为了两个,⼀个是存放数据库中,⼀个是存放在⽂件中。
我们需要统⼀封装⼀个类对上⾯硬盘数据进⾏管理
package com.example.demo.mqsever.datacenter;
import com.example.demo.common.MqException;
import com.example.demo.mqsever.core.Binding;
import com.example.demo.mqsever.core.Exchange;
import com.example.demo.mqsever.core.MSGQueue;
import com.example.demo.mqsever.core.Message;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
// 整合 数据库:交换机、绑定、队列 + 数据文件:消息
public class DiskDataCenter {
// 管理数据库的实例
private DataBaseManager dataBaseManager = new DataBaseManager();
// 管理数据文件中的实例
private MessageFileManager messageFileManager = new MessageFileManager();
public void init(){
dataBaseManager.init();
messageFileManager.init();
}
// 交换机操作
public void insertExchange(Exchange exchange){
dataBaseManager.insertExchange(exchange);
}
public void deleteExchange(String exchangeName){
dataBaseManager.deleteExchange(exchangeName);
}
public List<Exchange> selectAllExchanges(){
return dataBaseManager.selectAllExchanges();
}
// 队列操作
public void insertQueue(MSGQueue queue) throws IOException {
dataBaseManager.insertQueue(queue);
// 创建目录的同时,也要创建文件和目录
messageFileManager.createQueueFiles(queue.getName());
}
public void deleteQueue(String queueName) throws IOException {
dataBaseManager.deleteQueue(queueName);
// 删除目录的同时,也要删除文件和目录
messageFileManager.destroyQueueFiles(queueName);
}
public List<MSGQueue> selectAllQueues(){
return dataBaseManager.selectAllQueue();
}
// 绑定操作
public void insertBinding(Binding binding){
dataBaseManager.insertBinding(binding);
}
public void deleteBinding(Binding binding){
dataBaseManager.deleteBinding(binding);
}
public List<Binding> selectAllBindings(){
return dataBaseManager.selectAllBindings();
}
// 消息操作
public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {
messageFileManager.sendMessage(queue,message);
}
public void deleteMessage(MSGQueue queue,Message message) throws IOException, ClassNotFoundException, MqException {
messageFileManager.deleteMessage(queue,message);
// 判断是否需要进行 GC 操作
if (messageFileManager.checkGC(queue.getName())){
messageFileManager.gc(queue);
}
}
public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
return messageFileManager.loadAllMessageFromQueue(queueName);
}
}
内存存储管理
借助内存中的⼀些列数据结构 ,保存 交换机、队列、绑定、消息,⼴泛使⽤了 哈希表、链表、嵌套的数据结构等使
⽤内存管理上述的数据,对于MQ来说,内存存储数据为主;硬盘存储数据为辅(主要是为了持久化,重启之后,数据不丢失)
线程安全
此处为了保证线程安全,统一使用 线程安全的 ConcurrentHashMap.同时再编写相关代码的时候,要考虑:要不要加锁?锁加到哪⾥?
数据结构实现
// key 是 exchangeName, value 是 Exchange 对象
private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();
// key 是 queueName, value 是 MSGQueue 对象
private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();
// 第一个 key 是 exchangeName, 第二个 key 是 queueName
private ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();
// key 是 messageId, value 是 Message 对象
private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();
// key 是 queueName, value 是一个 Message 的链表
private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();
// 第一个 key 是 queueName, 第二个 key 是 messageId
private ConcurrentHashMap<String, ConcurrentHashMap<String, Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();