七. 消息存储设计
上一篇博客已经将消息统计文件的读写代码实现了,下一步我们将实现创建队列文件和目录。
实现创建队列文件和目录
初始化 0\t0 这样的初始值.
//创建队列对应的文件和目录:
public void createQueueFile(String queueName) throws IOException, MqException {
//先创建对应的目录:
File file = new File(getQueueDir(queueName));
if(!file.exists()){
boolean ok = file.mkdirs();
if(!ok) throw new IOException("创建队列目录失败。baseDir:"+file.getAbsolutePath());
}else{
throw new MqException("[createQueueFile] 队列对应的目录已经被创建过了,创建失败");
}
//下面开始创建 数据文件:
File dataFile = new File(getQueueDataDir(queueName));
if(!dataFile.exists()){
boolean ok = dataFile.createNewFile();
if(!ok) throw new IOException("创建数据文件失败。queuedataDir:"+dataFile.getAbsolutePath());
}else{
throw new MqException("[createQueueFile] 队列对应的数据文件已经被创建过了,创建失败");
}
//创建 统计文件:
File statFile = new File(getQueueStatDir(queueName));
if(!statFile.exists()){
boolean ok = statFile.createNewFile();
if(!ok) throw new IOException("创建统计文件失败。queuestatDir:"+statFile.getAbsolutePath());
}else{
throw new MqException("[createQueueFile] 队列对应的统计文件已经被创建过了,创建失败");
}
//给消息统计文件设定初始值 0\t0, (消息数量:0,有效消息数量:0)
// 目的:不用在今后使用的时候对空文件做一些特殊的判定
Stat stat = new Stat();
stat.totalCount = 0;
stat.validCount = 0;
//再写入:
writeStat(queueName,stat);
}
实现删除文件或目录
注意:File 类的 delete ⽅法只能删除空⽬录. 因此需要先把内部的⽂件先删除掉,如果还存在多余文件,就会删除失败。
//删除队列的文件或目录:
//队列也是可以删除的,当队列删除后,对应的消息文件啥的,也要随之删除。
public void deleteQueueFile(String queueName) throws IOException{
//先删除 数据文件:
File queueDataFile = new File(getQueueDataDir(queueName));
boolean ok1 = queueDataFile.delete();
//再删除 统计文件:
File queueStatFile = new File(getQueueStatDir(queueName));
boolean ok2 = queueStatFile.delete();
//再删除目录;
File file = new File(getQueueDir(queueName));
boolean ok3 = file.delete();
if(!(ok1 && ok2 && ok3)){
//任意一个删除失败,就失败,抛出异常:
throw new IOException("删除队列文件或目录失败");
}
}
检查队列⽂件是否存在
判定该队列的消息⽂件和统计⽂件是否存在. ⼀旦出现缺失, 则不能进⾏后续⼯作.
//检查队列的 文件或目录 是否存在: 目的:判断是否队列之前被 别人用过。
//用处1:如果后续有生产者给 broker server 生产消息了,这个消息可能需要记录到文件上,此时需要判断文件是否存在(持久化的应用)。
public boolean checkFilesExists(String queueName){
//如果队列的 数据文件和统计文件都存在,才存在:
File queueDataFiles = new File(getQueueDataDir(queueName));
if(!queueDataFiles.exists()) return false;
File queueStatFiles = new File(getQueueStatDir(queueName));
if(!queueStatFiles.exists()) return false;
//都存在,则返回true;
return true;
}
实现消息对象序列化/反序列化
先创建工具类BinaryTool用与序列化/反序列化。
- 使⽤ ByteArrayInputStream / ByteArrayOutputStream 针对 byte[] 进⾏封装, ⽅便后续操作. (这两个流对象是纯内存的, 不需要进⾏ close).
- 使⽤ ObjectInputStream / ObjectOutputStream 进⾏序列化 / 反序列化操作. 通过内部的readObject / writeObject 即可完成对应操作.
/**
* 这个类用来序列化 与反序列化
* 此处我们采用的是java标注库里的 ObjectOutputStream 和ObjectInputStream 两个流对象,但是序列化的对象必须要实现Serializable接口、
*
* 由于将序列化,反序列化当做一个工具,很多数据都可能用到,所以我们将它的方法搞成静态的
*
*/
public class BinaryTool {
//序列化:
public static byte[] toBytes(Object object) throws IOException {
//由于在try里面写流对象能自动关闭省去我们不少事,所以,直接写在try()里
//这里 使用ByteArrayOutputStream是因为 未知的byte数组的长度,这个类能自动记录。
try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()){
//将byteArrayOutputStream 传入ObjectOutputStream 就相当于将他们相互关联了,当objectOutputStream调用writeObject方法
//时会将这个对象写入关联的byteArrayOutputStream里,然后直接调用byteArrayOutputStream里的方法,将序列化的数据转换成直接数组就行了
//其实这个 ObjectOutputStream 不仅可以关联数组,还可以是文件,网络。关联了文件就将对象序列化到文件里,关联了网络,就是网络数据的传输socket。
try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){
objectOutputStream.writeObject(object);
}
//这个操作就是把byteArrayOutputStream中持有的二进制数据取出来,转成byte[]
return byteArrayOutputStream.toByteArray();
}
}
//反序列化:
public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)){
try(ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)){
return objectInputStream.readObject();
}
}
}
}
实现写入消息文件
- 考虑线程安全, 按照队列维度进⾏加锁.
- 使⽤ DataOutputStream 进⾏⼆进制写操作. ⽐原⽣ OutputStream 要⽅便.
- 需要记录 Message 对象在⽂件中的偏移量. 后续的删除操作依赖这个偏移量定位到消息,这也是message里的偏移量初始化的时候,就是发送消息的时候。offsetBeg是原有⽂件⼤⼩的基础上, 再 + 4. 4 个字节是存放消息大小的空间.
- 写完消息, 要同时更新统计信息.
//该方法将传来的一个新的消息放到对应的文件当中:新增消息
public void sendMessage(MESGQueue queue, Message message) throws IOException, MqException {
//先检查文件是否存在:如果不存在怎抛出异常,这个异常可以自定义。
if(!checkFilesExists(queue.getName())){
throw new MqException("[MessageFileManager对应的文件不存在]!queueName"+queue.getName());
}
//先进行序列化:
byte[] binaryMessage = BinaryTool.toBytes(message);
//为了解决线程安全问题,我们引入锁,如果此时的锁对象 是同一个队列,那就阻塞等待。
synchronized (queue){
//先将数据文件new出来,看看此时文件里已经写入的数据长度,方便我们后续计算offsetbegin和offsetend
File file = new File(getQueueDataDir(queue.getName()));
//在写入消息的时候才对message里的offsetbegin和offsetend 进行赋值:
message.setOffsetBeg(file.length()+4);
message.setOffsetEnd(file.length()+4+binaryMessage.length);
//由于我们的写入是追加写入,所以不要忘记 true
try (OutputStream outputStream = new FileOutputStream(file,true)){
try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
//先写入4个字节的消息长度:
dataOutputStream.writeInt(binaryMessage.length);
//再写入offsetbegin 和 offsetend
dataOutputStream.write(binaryMessage);
}
}
//此时已经将消息数据文件写完了,不要忘记消息统计文件:
Stat stat = readStat(queue.getName());
stat.validCount++;
stat.totalCount++;
writeStat(queue.getName(),stat);
}
}
创建异常类MqException
作为⾃定义异常类. 后续业务上出现问题, 都统⼀抛出这个异常
public class MqException extends Exception{
public MqException(String reason){
super(reason);
}
}
实现删除消息
此处的删除只是 “逻辑删除”, 即把 Message 类中的 isValid 字段设置为 0.
- 使⽤ RandomAccessFile 来随机访问到⽂件的内容.(随机访问其实没什么玄乎的,就像数组一样,能通过下标快速访问某个元素,这就是随机访问的原理。内存是支持随机访问的)。
- 根据 Message 中的 offsetBeg 和 offsetEnd 定位到消息在⽂件中的位置. 通过randomAccessFile.seek 操作⽂件指针偏移过去. 再读取.
- 读出的结果解析成 Message 对象, 修改 isValid 字段, 再重新写回⽂件. 注意写的时候要重新设定文件指针的位置. ⽂件指针会随着上述的读操作产⽣改变,所以要重新seek,将光标移动到开始。
- 最后, 要记得更新统计⽂件, 把合法消息 - 1.
//删除消息:主要的操作歩奏:
// 1,将消息读出来
//2,将消息里的isVail 改成0x0
//3,将消息放回文件中
public void deleteMessage(MESGQueue queue,Message message) throws IOException, ClassNotFoundException {
//由于删除消息的时候也可能收到线程安全问题,所以我们要加锁:
synchronized (queue){
//先将消息读出来:
//由于我们正常使用的FileInputStream,只能从头开始读。而此时的场景更倾向于 随机读取,所以我们使用到了RandomAccessFile进行随机读取
//注意这个RandomAccessFile类的第二个参数:rw可读可写。
try(RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataDir(queue.getName()),"rw")){
//先准备一个byte数组用来放 读出来的二进制数据:
byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];
//先将光标刷新到 offsetbegin
randomAccessFile.seek(message.getOffsetBeg());
//将message二进制数据读出来
randomAccessFile.read(bufferSrc);
//转换成message对象:
Message message1 = (Message) BinaryTool.fromBytes(bufferSrc);
//将message里的isVail 改成0x0
message1.setIsVail((byte) 0x0);
//将新的message1 转成二进制:
byte[] bufferDest = BinaryTool.toBytes(message1);
//由于上一次读文件光标已经发生了变化,所以此时还要调整光标到offsetbegin
randomAccessFile.seek(message.getOffsetBeg());
//将数据写入文件:
randomAccessFile.write(bufferDest);
//此时已经将数据文件里的vail改成无效,那我们需不需要将这个内存中的 message对象里的vail也改成无效呢?
//可以是可以,但是没有必要:想象一下我们将一个文件标记成无效的场景是不是我们此时要删掉这个文件的时候,
//此时我们都要删掉这个文件了,当然要连同文件里的数据和内存中的数据都删了呀,文件里的数据可能需要一些歩奏,
//但是在内存中删一个对象实在太容易了,今后会有内存中的删除消息操作。这就相当于让一个将死之人多活几秒,但他终究逃不过死亡
//这就是message里的vail 其实不需要改动的原因。
}
//不要忘了统计文件也要更新, 由于我们此时已经将数据文件里的一个消息改成无效的,所以此时统计文件里的有效消息就要--了
Stat stat = readStat(queue.getName());
if(stat.validCount >0){
stat.validCount --;
}
//再将更新后的统计信息 写入文件
writeStat(queue.getName(),stat);
}
}
实现消息加载
这个功能在服务器重启, 和垃圾回收的时候都很关键
- 使⽤ DataInputStream 读取数据. 先读 4 个字节为消息的⻓度, 然后再按照这个⻓度来读取实际消息内容.
- 读取完毕之后, 转换成 Message 对象.
- 同时计算出该对象的 offsetBeg 和 offsetEnd.
- 最终把结果整理成链表, 返回出去.
- 注意, 对于 DataInputStream 来说, 如果读取到 EOF, 会抛出⼀个 EOFException , ⽽不是返回特定值. 因此需要注意上述循环的结束条件.
//从消息数据文件当中读出所有消息:
//由于是服务器刚启动的时候才会调用这个方法,此时的队列还不能处理各种请求,所以不需要考虑线程安全问题。
public LinkedList<Message> loadAllMessagesFromQueueDataFile(String queueName) throws IOException, ClassNotFoundException, MqException {
//先new出来一个linkedList来放所有消息:使用链表是因为要进行头删和尾删等操作:
LinkedList<Message> messages = new LinkedList<>();
//创建流对象:
try(InputStream inputStream = new FileInputStream(getQueueDataDir(queueName))){
//与上面的DataOutputStream对应,此时用的是DataInputStream
try(DataInputStream dataInputStream = new DataInputStream(inputStream)){
//由于要读的消息可能不止一条,所以用一个while循环:
//但是如果我们直接这样写会一直重复的读一个消息,而DataInputStream不能控制光标的移动,所以要定义一个量来
//记录我们读到哪里了,另外,这个量也为后续message对象的offsetbegin和offsetend的初始化提供便利
long currentOffset = 0;
//写完大概得逻辑以后不知道不会不会有疑问,这个while的条件可是true啊,这不死循环了嘛,
//其实这也是无奈之举,主要原因是dataInputStream.readInt()读到文件的末尾并不会返回-1,EOF啥的,而是
//直接抛出 EOFException异常,直接结束循环,因此我们只用在外层catch住这个异常就行了,这是一个很特别的预料之内的循环结束方式
while(true){
//先读4个字节,求出数据的长度:
int messageLen = dataInputStream.readInt();
//创建一个刚好能装messageLen长度的字节数组:
byte[] messageBinary = new byte[messageLen];
//读出消息数据:并且用变量接收,判断读出的数据是否和预期的数据长度一致,若不一致,说明格式不正确,错乱了则抛出异常
int realMessageLen = dataInputStream.read(messageBinary);
if(realMessageLen != messageLen){
throw new MqException("[MessageFileManager] 文件格式错误!!!queueName:"+queueName);
}
//将数组反序列化成message对象
Message message = (Message) BinaryTool.fromBytes(messageBinary);
//如果读到的消息是无效的,则跳过这个无效消息,更新currentoffset:
if(message.getIsVail() == 0x0){
currentOffset+=(4+messageLen);
continue;
}
//再将message里的offsetbegin和offsetend给初始化:
message.setOffsetBeg(currentOffset+4);
message.setOffsetEnd(currentOffset+4+messageLen);
//正常读完后,别忘了,将currentoffset更新
currentOffset+=(4+messageLen);
//再将消息加入到链表当中
messages.add(message);
}
}catch (EOFException e){
System.out.println("[MessageFileManager] 恢复Message 数据完成!!!");
}
}
return messages;
}
实现垃圾回收(GC)
- 上述删除操作, 只是把消息在⽂件上标记成了⽆效. 并没有腾出硬盘空间. 最终⽂件⼤⼩可能会越积越多. 因此需要定期的进⾏批量清除.
- 此处使⽤类似于复制算法. 当总消息数超过 2000, 并且有效消息数⽬少于 50% 的时候, 就触发 GC。GC 的时候会把所有有效消息加载出来, 写⼊到⼀个新的消息⽂件中, 使⽤新⽂件, 代替旧⽂件即可.
public void gc(MESGQueue queue) throws MqException, IOException, ClassNotFoundException {
//根据以前的写代码经验,次GC过程可能有线程安全问题,所以我们直接加锁:
//其实这也是为什么形参传入的是一个队列,而不是队列的名字的其中一个原因。
synchronized (queue){
//由于GC的执行时间可能很慢,我们手动的将时间计算出来,如果将来服务器运行半天无响应了,如果是GC的问题
//我们也能知道
long gcBegin = System.currentTimeMillis();
//先创建新的文件:
File newQueueFile = new File(getQueueDataNewPath(queue.getName()));
//如果文件已经存在了,可能上一次gc有残留,这是不正常的,所以抛出异常
if(newQueueFile.exists()){
throw new MqException("[MessageFilemanager] 队列的queue_new_data.txt已经存在!!queueName:"+queue.getName());
}
//如果执行到这,说明文件不存在,则创建新文件:
boolean ok = newQueueFile.createNewFile();
//如果创建文件失败,则抛出异常:
if(!ok){
throw new MqException("[MessageFileManager] 创建文件失败!!newQueueDataFile:"+newQueueFile.getAbsolutePath());
}
//先创建一个链表用来存储从原来的文件中取出来的message对象:此处可以用到之前的方法:
//取出原来文件里的所有有效文件:
LinkedList<Message> messages = loadAllMessagesFromQueueDataFile(queue.getName());
//new出相应的流对象用来写入新文件:
//这里我写错了,将queue.getName()传入了,但是明明是一个不存在的路径,他竟然还能正常写?底层也不抛出异常,
//我真是又惊讶,又惊吓。
//之后我又去查了查资料:原来是FileOutputStream的问题啊,FileOutputStream太nb了,
//如果传入的字符串对应的路径不存在,FileOutputStream会自动给你创建一个文件用于写入,这这这也太贴心了吧,
//不过我还是希望他能直接抛异常,毕竟我找bug也找了这么久了,况且天知道他会把我的数据写到哪里:
//其实也知道:如果是绝对路径,他会自动创建路径下的文件;如果是相对路径,他会在当前工作空间创建一个文件。
//找了一圈以后发下,在我的mq路径下,就存在一个queuetest1的文件,里面正好是之前我写入的数据,呜呜呜,要哭了。
// try(OutputStream outputStream = new FileOutputStream(queue.getName())){
try(OutputStream outputStream = new FileOutputStream(newQueueFile)){
try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
//循环读取messages,将对象重新写入新文件:
for(Message m : messages){
//先将消息序列化:一个字节数组:
byte[] buffer = BinaryTool.toBytes(m);
//将二进制数组写入新的文件:注意遵循之前的约定:
dataOutputStream.writeInt(buffer.length);
dataOutputStream.write(buffer);
}
}
}
//删除旧文件,这里以前传入的旧文件的路径写错了,直接传成了名字,所以写代码一定要细心啊。
// File oldQueueFile = new File(queue.getName());
File oldQueueFile = new File(getQueueDataDir(queue.getName()));
boolean ok2 = oldQueueFile.delete();
System.out.println("[ok2]oldQueueFile 文件删除:"+ok2);
//如果删除失败,可能是没有权限之类的,抛出异常:
if(!ok2){
throw new MqException("MessageFileManager 删除旧文件失败!! oldDataQueueFile:"+oldQueueFile.getAbsolutePath());
}
//重命名新文件:
boolean ok3 = newQueueFile.renameTo(oldQueueFile);
//如果重命名失败,抛出异常:
if(!ok3) {
throw new MqException("[MessageFileManager] 新文件重命名失败!!oldDataQueueFile:"+oldQueueFile.getAbsolutePath()
+" , newDataQueueFile="+newQueueFile.getAbsolutePath());
}
//不要忘记更新统计文件:
Stat stat = readStat(queue.getName());
stat.totalCount = messages.size();
stat.validCount = messages.size();
writeStat(queue.getName(),stat);
long gcEnd = System.currentTimeMillis();
System.out.println("[MessageFileManager] GC执行完毕!!! 执行的时间:"+(gcEnd - gcBegin)+"ms");
}
}
测试MessageFileManager
创建MessageFileManagerTest类用于测试:
测试前的准备:
- 创建两个队列, ⽤来辅助测试.
- 使⽤ ReflectionTestUtils.invokeMethod 来调⽤私有⽅法(这就是传说中的反射,注意它的参数,用法)。
-
ReflectionTestUtils.invokeMethod(messageFileManager,"writeStat",queueName1,stat);
这个反射的参数:
第一个参数:类的实例。
第二个参数:你想调用的方法
后面的参数就是不定参数了(数量不确定),能确定的是:后面的参数的就是你想调用的方法的参数。
@SpringBootTest
public class MessageFileManagerTest {
private MessageFileManager messageFileManager = new MessageFileManager();
private static final String queueName1 = "queuetest1";
private static final String queueName2 = "queuetest2";
@BeforeEach
public void setUp() throws IOException, MqException {
//由于我们要测试的是队列,所以准备工作就是先创建队列文件:
messageFileManager.createQueueFile(queueName1);
messageFileManager.createQueueFile(queueName2);
}
//这个@AfterEach注解我试过了,即使测试方法执行过程中抛出了异常,这个方法还是在每次执行完测试单元以后该执行他还是执行他,
//无关乎异常,真nb
@AfterEach
public void tearDown() throws IOException {
//首尾工作,将刚才创建的队列文件删掉:
messageFileManager.deleteQueueFile(queueName1);
messageFileManager.deleteQueueFile(queueName2);
}
}
测试代码:
@Test
public void testCreateFile(){
//其实就测试创建的队列文件是否存在:
//由于我们在MessageFileManager里的get路径方法是 private修饰的,所以不能直接调用get路径方法,只能手动写上
//检验 队列数据文件是否存在:
File queueDataFile1 = new File("./data/"+queueName1+"/queue_data.txt");
//此处用的方法是isFile 而不是exists,因为要判定这是个文件,并不是只是存在就行,存在了也可能是个目录。
Assertions.assertEquals(true,queueDataFile1.isFile());
//检验 队列统计文件是否存在:
File queueStatFile1 = new File("./data/"+queueName1+"/queue_stat.txt");
Assertions.assertEquals(true,queueStatFile1.isFile());
//检验 队列数据文件是否存在:
File queueDataFile2 = new File("./data/"+queueName2+"/queue_data.txt");
Assertions.assertEquals(true,queueDataFile2.isFile());
//检验 队列统计文件是否存在:
File queueStatFile2 = new File("./data/"+queueName2+"/queue_stat.txt");
Assertions.assertEquals(true,queueStatFile2.isFile());
System.out.println("[CreateFileText] 测试创建队列文件成功!!!");
}
@Test
public void testReadAndWriteStat(){
//先创建出stat类,由于他是内部类,所以要类名. 调用出来:
MessageFileManager.Stat stat = new MessageFileManager.Stat();
stat.totalCount = 200;
stat.validCount = 100;
//此时写入stat 到统计文件当中:但是如果直接用messageFileManager. 由于writeStat是private修饰,所以肯定调用不出来,
//此时就要用spring带的 反射方法了:
// messageFileManager.
//用反射将 stat写入统计文件:
ReflectionTestUtils.invokeMethod(messageFileManager,"writeStat",queueName1,stat);
//用反射将 写入的统计文件读出来:
MessageFileManager.Stat statNew = ReflectionTestUtils.invokeMethod(messageFileManager,"readStat",queueName1);
//判断读出来的stat和我们设定的stat是否一样
Assertions.assertEquals(200,statNew.totalCount);
Assertions.assertEquals(100,statNew.validCount);
System.out.println("[testReadAndWriteStat] 测试成功!!!");
}
//要想测试发送消息,首先要有队列和消息吧,所以,我们先写创建队列和消息的方法:
private MESGQueue createQueue(){
MESGQueue queue = new MESGQueue();
//这里的队列名字不能随便取,因为随便取的队列名字也没有对应的文件啊,要用就要用已经创建了文件的队列名,
//考虑到这个队列要与文件交互,而我们只创建了queuename1和queuename2两个名字对应的文件,所以只能用这两个名字的一个。
queue.setName(queueName1);
queue.setDurable(true);
queue.setExclusive(false);
queue.setAutoDelete(false);
HashMap<String, Object> hashMap = new HashMap<>();
hashMap.put("aaa", "111");
hashMap.put("bbb", "222");
queue.setArguments(hashMap);
return queue;
}
private Message createMessage(String context){
//此时能用到我们之前在message里写的创建 message的工厂类了:
Message message = Message.createMessageWithId("testRoutingKey",null,context.getBytes());
return message;
}
@Test
public void testSendMessage() throws IOException, MqException, ClassNotFoundException {
//先创建队列与消息:
MESGQueue queue = createQueue();
Message message = createMessage("abcdefghijklmnopqrstuvwxyz");
//发送消息:
messageFileManager.sendMessage(queue,message);
//验证stat文件:
MessageFileManager.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManager,"readStat",queueName1);
Assertions.assertEquals(1,stat.totalCount);
Assertions.assertEquals(1,stat.validCount);
//验证data文件:使用loadAllMessagesFromQueueDataFile读取文件内容:
LinkedList<Message> messages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);
//验证:
Assertions.assertEquals(1,messages.size());
Message message1 = messages.get(0);
//判断这个message1和我们之前的消息message是否一样:
Assertions.assertEquals(message.getMessageId(),message1.getMessageId());
Assertions.assertEquals(message.getRoutingKey(),message1.getRoutingKey());
Assertions.assertEquals(message.getDeliverMode(),message1.getDeliverMode());
Assertions.assertArrayEquals(message.getBody(),message1.getBody());
System.out.println("[testSendMessage] 测试成功!!!");
}
//虽然上一个testSendMessage 已经间接测试过这个方法,但是为了求稳,再测试一遍
@Test
public void testLoadAllMessagesFromQueueDataFile() throws IOException, MqException, ClassNotFoundException {
//我们需要准备200条数据用来加载:
//先创建一个队列用来存放消息:注意这个方法使用的是queueName1创建的队列
MESGQueue queue = createQueue();
//先创建一个链表用来保存消息,和后面新加载的消息作对比:
LinkedList<Message> expectedMessages = new LinkedList<>();
//使用for循环创建消息:
for(int i =0;i<200;i++){
Message message = createMessage("testMessage"+i);
//将消息写入文件:
messageFileManager.sendMessage(queue,message);
//记录消息:
expectedMessages.add(message);
}
//调用loadAllMessagesFromQueueDataFile取出所有消息:
LinkedList<Message> realMessages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);
//先验证队列的数目是否一致:
Assertions.assertEquals(200,realMessages.size());
//验证基本属性:
for(int i= 0;i<realMessages.size();i++){
Message realMessage = realMessages.get(i);
Message expectMessage = expectedMessages.get(i);
System.out.println("["+i+"]"+realMessage.toString());
Assertions.assertEquals(realMessage.getMessageId(),expectMessage.getMessageId());
Assertions.assertEquals(realMessage.getIsVail(),expectMessage.getIsVail());
Assertions.assertEquals(realMessage.getDeliverMode(),expectMessage.getDeliverMode());
Assertions.assertEquals(realMessage.getRoutingKey(),expectMessage.getRoutingKey());
Assertions.assertArrayEquals(realMessage.getBody(),expectMessage.getBody());
}
System.out.println("[testLoadAllMessagesFromQueueDataFile] 测试成功!!!");
}
//测试删除消息:
@Test
public void testDeleteMessage() throws IOException, MqException, ClassNotFoundException {
//先创建一个队列:
MESGQueue queue = createQueue();
//创建一个链表用来保存预期消息:
LinkedList<Message> expectedMessages = new LinkedList<>();
//再将20条消息都写入队列:
for(int i =0;i<20;i++){
Message message = createMessage("testMessage"+i);
//将消息写入队列文件:
messageFileManager.sendMessage(queue,message);
//记录消息:
expectedMessages.add(message);
}
//这里 就以删除前三条消息为例:
messageFileManager.deleteMessage(queue,expectedMessages.get(0));
messageFileManager.deleteMessage(queue,expectedMessages.get(1));
messageFileManager.deleteMessage(queue,expectedMessages.get(2));
//读出消息,对比:
LinkedList<Message> realMessages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);
//先判断个数:
Assertions.assertEquals(17,realMessages.size());
for(int i =3;i<20;i++){
Message realMessage = realMessages.get(i-3);
Message expectMessage = expectedMessages.get(i);
System.out.println("["+i+"]"+realMessage.toString());
Assertions.assertEquals(realMessage.getMessageId(),expectMessage.getMessageId());
Assertions.assertEquals(realMessage.getIsVail(),expectMessage.getIsVail());
Assertions.assertEquals(realMessage.getDeliverMode(),expectMessage.getDeliverMode());
Assertions.assertEquals(realMessage.getRoutingKey(),expectMessage.getRoutingKey());
Assertions.assertArrayEquals(realMessage.getBody(),expectMessage.getBody());
}
System.out.println("[testDeleteMessage] 测试成功!!!");
}
//测试GC,这里的GC其实只是测试,不用管消息总数是否大于2000或有效消息占比不到50%,因为那是业务上的判定,会有专门的类来进一步封装
//而此处我们只进行测试GC这个方法
//计划将100条消息都存入队列,然后将奇数下标的消息都删除,然后执行GC,验证现在的文件是否比原来的文件小:
@Test
public void testGC() throws IOException, MqException, ClassNotFoundException {
//创建一个队列:
MESGQueue queue = createQueue();
//创建一个链表用来记录消息:
LinkedList<Message> expectedMessages = new LinkedList<>();
//先发送100条消息:
for(int i =0;i<100;i++){
Message message = createMessage("testMessage"+i);
//发送到队列:
messageFileManager.sendMessage(queue,message);
//记录
expectedMessages.add(message);
}
//删除奇数下标的消息:
for(int i =1;i<100;i+=2){
messageFileManager.deleteMessage(queue,expectedMessages.get(i));
}
//先记录执行GG之前的文件大小:
File oldFile = new File("./data/"+queueName1+"/queue_data.txt");
long oldFileLength = oldFile.length();
//执行GC
messageFileManager.gc(queue);
//记录执行完GC之后的文件大小:
File newFile = new File("./data/"+queueName1+"/queue_data.txt");
long newFileLength = newFile.length();
//取出真实的消息:
LinkedList<Message> realMessages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);
//先验证消息数量是否对的上:
Assertions.assertEquals(50,realMessages.size());
//挨个验证消息:
for(int i = 0;i<50;i++){
Message realMessage = realMessages.get(i);
Message expectMessage = expectedMessages.get(i*2);
System.out.println("["+i+"]"+realMessage.toString());
Assertions.assertEquals(realMessage.getMessageId(),expectMessage.getMessageId());
Assertions.assertEquals(realMessage.getIsVail(),expectMessage.getIsVail());
Assertions.assertEquals(realMessage.getDeliverMode(),expectMessage.getDeliverMode());
Assertions.assertEquals(realMessage.getRoutingKey(),expectMessage.getRoutingKey());
Assertions.assertArrayEquals(realMessage.getBody(),expectMessage.getBody());
}
//验证文件大小:
//这个验证的原理其实是:
//删除一个文件并不是直接删除,而是逻辑删除,通过标记统计文件里的vail来标识的,此时的数据文件即使有很多无效的文件,但是他的大小依旧是total
//而非vail有效文件的大小。但是如果进行了文件的GC迁移,此时的新文件的大小就是旧文件的vail有效文件的大小了。所以,新文件会小于旧文件的大小。
System.out.println("[oldFileLength]:"+oldFileLength);
System.out.println("[newFileLength]:"+newFileLength);
Assertions.assertTrue(newFileLength<oldFileLength);
System.out.println("[testGC] 测试成功!!!");
}