仿 RabbitMQ 的消息队列3(实战项目)

七. 消息存储设计

上一篇博客已经将消息统计文件的读写代码实现了,下一步我们将实现创建队列文件和目录。

实现创建队列文件和目录

初始化 0\t0 这样的初始值.

//创建队列对应的文件和目录:
    public void createQueueFile(String queueName) throws IOException, MqException {
        //先创建对应的目录:
        File file = new File(getQueueDir(queueName));
        if(!file.exists()){
            boolean ok = file.mkdirs();
            if(!ok) throw new IOException("创建队列目录失败。baseDir:"+file.getAbsolutePath());
        }else{
            throw new MqException("[createQueueFile] 队列对应的目录已经被创建过了,创建失败");
        }
        //下面开始创建 数据文件:
        File dataFile = new File(getQueueDataDir(queueName));
        if(!dataFile.exists()){
            boolean ok = dataFile.createNewFile();
            if(!ok) throw new IOException("创建数据文件失败。queuedataDir:"+dataFile.getAbsolutePath());
        }else{
            throw new MqException("[createQueueFile] 队列对应的数据文件已经被创建过了,创建失败");
        }
        //创建 统计文件:
        File statFile = new File(getQueueStatDir(queueName));
        if(!statFile.exists()){
            boolean ok = statFile.createNewFile();
            if(!ok) throw new IOException("创建统计文件失败。queuestatDir:"+statFile.getAbsolutePath());
        }else{
            throw new MqException("[createQueueFile] 队列对应的统计文件已经被创建过了,创建失败");
        }
        //给消息统计文件设定初始值 0\t0, (消息数量:0,有效消息数量:0)
        // 目的:不用在今后使用的时候对空文件做一些特殊的判定
        Stat stat = new Stat();
        stat.totalCount = 0;
        stat.validCount = 0;
        //再写入:
        writeStat(queueName,stat);
    }

实现删除文件或目录

注意:File 类的 delete ⽅法只能删除空⽬录. 因此需要先把内部的⽂件先删除掉,如果还存在多余文件,就会删除失败。

//删除队列的文件或目录:
    //队列也是可以删除的,当队列删除后,对应的消息文件啥的,也要随之删除。
    public void deleteQueueFile(String queueName) throws IOException{
        //先删除 数据文件:
        File queueDataFile = new File(getQueueDataDir(queueName));
        boolean ok1 = queueDataFile.delete();
        //再删除 统计文件:
        File queueStatFile = new File(getQueueStatDir(queueName));
        boolean ok2 = queueStatFile.delete();
        //再删除目录;
        File file = new File(getQueueDir(queueName));
        boolean ok3 = file.delete();
        if(!(ok1 && ok2 && ok3)){
            //任意一个删除失败,就失败,抛出异常:
            throw new IOException("删除队列文件或目录失败");
        }
    }

检查队列⽂件是否存在

判定该队列的消息⽂件和统计⽂件是否存在. ⼀旦出现缺失, 则不能进⾏后续⼯作.

//检查队列的 文件或目录 是否存在: 目的:判断是否队列之前被 别人用过。
    //用处1:如果后续有生产者给 broker server 生产消息了,这个消息可能需要记录到文件上,此时需要判断文件是否存在(持久化的应用)。
    public boolean checkFilesExists(String queueName){
        //如果队列的 数据文件和统计文件都存在,才存在:
        File queueDataFiles = new File(getQueueDataDir(queueName));
        if(!queueDataFiles.exists()) return false;
        File queueStatFiles = new File(getQueueStatDir(queueName));
        if(!queueStatFiles.exists()) return false;
        //都存在,则返回true;
        return true;
    }

实现消息对象序列化/反序列化

先创建工具类BinaryTool用与序列化/反序列化。
在这里插入图片描述

  • 使⽤ ByteArrayInputStream / ByteArrayOutputStream 针对 byte[] 进⾏封装, ⽅便后续操作. (这两个流对象是纯内存的, 不需要进⾏ close).
  • 使⽤ ObjectInputStream / ObjectOutputStream 进⾏序列化 / 反序列化操作. 通过内部的readObject / writeObject 即可完成对应操作.
/**
 * 这个类用来序列化 与反序列化
 * 此处我们采用的是java标注库里的 ObjectOutputStream 和ObjectInputStream 两个流对象,但是序列化的对象必须要实现Serializable接口、
 *
 * 由于将序列化,反序列化当做一个工具,很多数据都可能用到,所以我们将它的方法搞成静态的
 *
 */
public class BinaryTool {
    //序列化:
    public static byte[] toBytes(Object object) throws IOException {
        //由于在try里面写流对象能自动关闭省去我们不少事,所以,直接写在try()里
        //这里 使用ByteArrayOutputStream是因为 未知的byte数组的长度,这个类能自动记录。
        try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()){
            //将byteArrayOutputStream 传入ObjectOutputStream 就相当于将他们相互关联了,当objectOutputStream调用writeObject方法
            //时会将这个对象写入关联的byteArrayOutputStream里,然后直接调用byteArrayOutputStream里的方法,将序列化的数据转换成直接数组就行了
            //其实这个 ObjectOutputStream 不仅可以关联数组,还可以是文件,网络。关联了文件就将对象序列化到文件里,关联了网络,就是网络数据的传输socket。
            try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){
                objectOutputStream.writeObject(object);
            }
            //这个操作就是把byteArrayOutputStream中持有的二进制数据取出来,转成byte[]
            return byteArrayOutputStream.toByteArray();
        }
    }
    //反序列化:
    public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)){
            try(ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)){
                return objectInputStream.readObject();
            }
        }
    }

}

实现写入消息文件

  • 考虑线程安全, 按照队列维度进⾏加锁.
  • 使⽤ DataOutputStream 进⾏⼆进制写操作. ⽐原⽣ OutputStream 要⽅便.
  • 需要记录 Message 对象在⽂件中的偏移量. 后续的删除操作依赖这个偏移量定位到消息,这也是message里的偏移量初始化的时候,就是发送消息的时候。offsetBeg是原有⽂件⼤⼩的基础上, 再 + 4. 4 个字节是存放消息大小的空间.
  • 写完消息, 要同时更新统计信息.
 //该方法将传来的一个新的消息放到对应的文件当中:新增消息
    public void sendMessage(MESGQueue queue, Message message) throws IOException, MqException {
        //先检查文件是否存在:如果不存在怎抛出异常,这个异常可以自定义。
        if(!checkFilesExists(queue.getName())){
            throw new MqException("[MessageFileManager对应的文件不存在]!queueName"+queue.getName());
        }
        //先进行序列化:
        byte[] binaryMessage = BinaryTool.toBytes(message);
        //为了解决线程安全问题,我们引入锁,如果此时的锁对象 是同一个队列,那就阻塞等待。
        synchronized (queue){
            //先将数据文件new出来,看看此时文件里已经写入的数据长度,方便我们后续计算offsetbegin和offsetend
            File file = new File(getQueueDataDir(queue.getName()));
            //在写入消息的时候才对message里的offsetbegin和offsetend 进行赋值:
            message.setOffsetBeg(file.length()+4);
            message.setOffsetEnd(file.length()+4+binaryMessage.length);
            //由于我们的写入是追加写入,所以不要忘记 true
            try (OutputStream outputStream = new FileOutputStream(file,true)){
                try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
                    //先写入4个字节的消息长度:
                    dataOutputStream.writeInt(binaryMessage.length);
                    //再写入offsetbegin 和 offsetend
                    dataOutputStream.write(binaryMessage);
                }
            }
            //此时已经将消息数据文件写完了,不要忘记消息统计文件:
            Stat stat = readStat(queue.getName());
            stat.validCount++;
            stat.totalCount++;
            writeStat(queue.getName(),stat);
        }
    }

创建异常类MqException

作为⾃定义异常类. 后续业务上出现问题, 都统⼀抛出这个异常
在这里插入图片描述

public class MqException extends Exception{
    public MqException(String reason){
        super(reason);
    }
}

实现删除消息

此处的删除只是 “逻辑删除”, 即把 Message 类中的 isValid 字段设置为 0.

  • 使⽤ RandomAccessFile 来随机访问到⽂件的内容.(随机访问其实没什么玄乎的,就像数组一样,能通过下标快速访问某个元素,这就是随机访问的原理。内存是支持随机访问的)。
  • 根据 Message 中的 offsetBeg 和 offsetEnd 定位到消息在⽂件中的位置. 通过randomAccessFile.seek 操作⽂件指针偏移过去. 再读取.
  • 读出的结果解析成 Message 对象, 修改 isValid 字段, 再重新写回⽂件. 注意写的时候要重新设定文件指针的位置. ⽂件指针会随着上述的读操作产⽣改变,所以要重新seek,将光标移动到开始。
  • 最后, 要记得更新统计⽂件, 把合法消息 - 1.
 //删除消息:主要的操作歩奏:
    // 1,将消息读出来
    //2,将消息里的isVail 改成0x0
    //3,将消息放回文件中
    public void deleteMessage(MESGQueue queue,Message message) throws IOException, ClassNotFoundException {
        //由于删除消息的时候也可能收到线程安全问题,所以我们要加锁:
        synchronized (queue){
            //先将消息读出来:
            //由于我们正常使用的FileInputStream,只能从头开始读。而此时的场景更倾向于 随机读取,所以我们使用到了RandomAccessFile进行随机读取
            //注意这个RandomAccessFile类的第二个参数:rw可读可写。
            try(RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataDir(queue.getName()),"rw")){
                //先准备一个byte数组用来放 读出来的二进制数据:
                byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];
                //先将光标刷新到 offsetbegin
                randomAccessFile.seek(message.getOffsetBeg());
                //将message二进制数据读出来
                randomAccessFile.read(bufferSrc);
                //转换成message对象:
                Message message1 = (Message) BinaryTool.fromBytes(bufferSrc);
                //将message里的isVail 改成0x0
                message1.setIsVail((byte) 0x0);
                //将新的message1 转成二进制:
                byte[] bufferDest = BinaryTool.toBytes(message1);
                //由于上一次读文件光标已经发生了变化,所以此时还要调整光标到offsetbegin
                randomAccessFile.seek(message.getOffsetBeg());
                //将数据写入文件:
                randomAccessFile.write(bufferDest);

                //此时已经将数据文件里的vail改成无效,那我们需不需要将这个内存中的 message对象里的vail也改成无效呢?
                //可以是可以,但是没有必要:想象一下我们将一个文件标记成无效的场景是不是我们此时要删掉这个文件的时候,
                //此时我们都要删掉这个文件了,当然要连同文件里的数据和内存中的数据都删了呀,文件里的数据可能需要一些歩奏,
                //但是在内存中删一个对象实在太容易了,今后会有内存中的删除消息操作。这就相当于让一个将死之人多活几秒,但他终究逃不过死亡
                //这就是message里的vail 其实不需要改动的原因。
            }
            //不要忘了统计文件也要更新, 由于我们此时已经将数据文件里的一个消息改成无效的,所以此时统计文件里的有效消息就要--了
            Stat stat = readStat(queue.getName());
            if(stat.validCount >0){
                stat.validCount --;
            }
            //再将更新后的统计信息 写入文件
            writeStat(queue.getName(),stat);
        }
    }

实现消息加载

这个功能在服务器重启, 和垃圾回收的时候都很关键

  • 使⽤ DataInputStream 读取数据. 先读 4 个字节为消息的⻓度, 然后再按照这个⻓度来读取实际消息内容.
  • 读取完毕之后, 转换成 Message 对象.
  • 同时计算出该对象的 offsetBeg 和 offsetEnd.
  • 最终把结果整理成链表, 返回出去.
  • 注意, 对于 DataInputStream 来说, 如果读取到 EOF, 会抛出⼀个 EOFException , ⽽不是返回特定值. 因此需要注意上述循环的结束条件.
//从消息数据文件当中读出所有消息:
    //由于是服务器刚启动的时候才会调用这个方法,此时的队列还不能处理各种请求,所以不需要考虑线程安全问题。
    public LinkedList<Message> loadAllMessagesFromQueueDataFile(String queueName) throws IOException, ClassNotFoundException, MqException {
        //先new出来一个linkedList来放所有消息:使用链表是因为要进行头删和尾删等操作:
        LinkedList<Message> messages = new LinkedList<>();
        //创建流对象:
        try(InputStream inputStream = new FileInputStream(getQueueDataDir(queueName))){
            //与上面的DataOutputStream对应,此时用的是DataInputStream
            try(DataInputStream dataInputStream = new DataInputStream(inputStream)){
                //由于要读的消息可能不止一条,所以用一个while循环:
                //但是如果我们直接这样写会一直重复的读一个消息,而DataInputStream不能控制光标的移动,所以要定义一个量来
                //记录我们读到哪里了,另外,这个量也为后续message对象的offsetbegin和offsetend的初始化提供便利
                long currentOffset = 0;
                //写完大概得逻辑以后不知道不会不会有疑问,这个while的条件可是true啊,这不死循环了嘛,
                //其实这也是无奈之举,主要原因是dataInputStream.readInt()读到文件的末尾并不会返回-1,EOF啥的,而是
                //直接抛出 EOFException异常,直接结束循环,因此我们只用在外层catch住这个异常就行了,这是一个很特别的预料之内的循环结束方式
                while(true){
                    //先读4个字节,求出数据的长度:
                    int messageLen = dataInputStream.readInt();
                    //创建一个刚好能装messageLen长度的字节数组:
                    byte[] messageBinary = new byte[messageLen];
                    //读出消息数据:并且用变量接收,判断读出的数据是否和预期的数据长度一致,若不一致,说明格式不正确,错乱了则抛出异常
                    int realMessageLen = dataInputStream.read(messageBinary);
                    if(realMessageLen != messageLen){
                        throw new MqException("[MessageFileManager] 文件格式错误!!!queueName:"+queueName);
                    }
                    //将数组反序列化成message对象
                    Message message = (Message) BinaryTool.fromBytes(messageBinary);
                    //如果读到的消息是无效的,则跳过这个无效消息,更新currentoffset:
                    if(message.getIsVail() == 0x0){
                        currentOffset+=(4+messageLen);
                        continue;
                    }
                    //再将message里的offsetbegin和offsetend给初始化:
                    message.setOffsetBeg(currentOffset+4);
                    message.setOffsetEnd(currentOffset+4+messageLen);
                    //正常读完后,别忘了,将currentoffset更新
                    currentOffset+=(4+messageLen);
                    //再将消息加入到链表当中
                    messages.add(message);
                }
            }catch (EOFException e){
                System.out.println("[MessageFileManager] 恢复Message 数据完成!!!");
            }
        }
        return messages;
    }

实现垃圾回收(GC)

  • 上述删除操作, 只是把消息在⽂件上标记成了⽆效. 并没有腾出硬盘空间. 最终⽂件⼤⼩可能会越积越多. 因此需要定期的进⾏批量清除.
  • 此处使⽤类似于复制算法. 当总消息数超过 2000, 并且有效消息数⽬少于 50% 的时候, 就触发 GC。GC 的时候会把所有有效消息加载出来, 写⼊到⼀个新的消息⽂件中, 使⽤新⽂件, 代替旧⽂件即可.
 public void gc(MESGQueue queue) throws MqException, IOException, ClassNotFoundException {
        //根据以前的写代码经验,次GC过程可能有线程安全问题,所以我们直接加锁:
        //其实这也是为什么形参传入的是一个队列,而不是队列的名字的其中一个原因。
        synchronized (queue){
            //由于GC的执行时间可能很慢,我们手动的将时间计算出来,如果将来服务器运行半天无响应了,如果是GC的问题
            //我们也能知道
            long gcBegin = System.currentTimeMillis();
            //先创建新的文件:
            File newQueueFile = new File(getQueueDataNewPath(queue.getName()));
            //如果文件已经存在了,可能上一次gc有残留,这是不正常的,所以抛出异常
            if(newQueueFile.exists()){
                throw new MqException("[MessageFilemanager] 队列的queue_new_data.txt已经存在!!queueName:"+queue.getName());
            }
            //如果执行到这,说明文件不存在,则创建新文件:
            boolean ok = newQueueFile.createNewFile();
            //如果创建文件失败,则抛出异常:
            if(!ok){
                throw new MqException("[MessageFileManager] 创建文件失败!!newQueueDataFile:"+newQueueFile.getAbsolutePath());
            }
            //先创建一个链表用来存储从原来的文件中取出来的message对象:此处可以用到之前的方法:
            //取出原来文件里的所有有效文件:
            LinkedList<Message> messages = loadAllMessagesFromQueueDataFile(queue.getName());
            //new出相应的流对象用来写入新文件:
            //这里我写错了,将queue.getName()传入了,但是明明是一个不存在的路径,他竟然还能正常写?底层也不抛出异常,
            //我真是又惊讶,又惊吓。
            //之后我又去查了查资料:原来是FileOutputStream的问题啊,FileOutputStream太nb了,
            //如果传入的字符串对应的路径不存在,FileOutputStream会自动给你创建一个文件用于写入,这这这也太贴心了吧,
            //不过我还是希望他能直接抛异常,毕竟我找bug也找了这么久了,况且天知道他会把我的数据写到哪里:
            //其实也知道:如果是绝对路径,他会自动创建路径下的文件;如果是相对路径,他会在当前工作空间创建一个文件。
            //找了一圈以后发下,在我的mq路径下,就存在一个queuetest1的文件,里面正好是之前我写入的数据,呜呜呜,要哭了。
//            try(OutputStream outputStream = new FileOutputStream(queue.getName())){
            try(OutputStream outputStream = new FileOutputStream(newQueueFile)){
                try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
                    //循环读取messages,将对象重新写入新文件:
                    for(Message m : messages){
                        //先将消息序列化:一个字节数组:
                        byte[] buffer = BinaryTool.toBytes(m);
                        //将二进制数组写入新的文件:注意遵循之前的约定:
                        dataOutputStream.writeInt(buffer.length);
                        dataOutputStream.write(buffer);
                    }
                }
            }
            //删除旧文件,这里以前传入的旧文件的路径写错了,直接传成了名字,所以写代码一定要细心啊。
//            File oldQueueFile = new File(queue.getName());
            File oldQueueFile = new File(getQueueDataDir(queue.getName()));
            boolean ok2 = oldQueueFile.delete();
            System.out.println("[ok2]oldQueueFile 文件删除:"+ok2);
            //如果删除失败,可能是没有权限之类的,抛出异常:
            if(!ok2){
                throw new MqException("MessageFileManager 删除旧文件失败!! oldDataQueueFile:"+oldQueueFile.getAbsolutePath());
            }
            //重命名新文件:
            boolean ok3 = newQueueFile.renameTo(oldQueueFile);
            //如果重命名失败,抛出异常:
            if(!ok3) {
                throw new MqException("[MessageFileManager] 新文件重命名失败!!oldDataQueueFile:"+oldQueueFile.getAbsolutePath()
                        +" , newDataQueueFile="+newQueueFile.getAbsolutePath());
            }
            //不要忘记更新统计文件:
            Stat stat = readStat(queue.getName());
            stat.totalCount = messages.size();
            stat.validCount = messages.size();
            writeStat(queue.getName(),stat);
            long gcEnd = System.currentTimeMillis();
            System.out.println("[MessageFileManager] GC执行完毕!!! 执行的时间:"+(gcEnd - gcBegin)+"ms");
        }

    }

测试MessageFileManager

创建MessageFileManagerTest类用于测试:
在这里插入图片描述

测试前的准备:

  • 创建两个队列, ⽤来辅助测试.
  • 使⽤ ReflectionTestUtils.invokeMethod 来调⽤私有⽅法(这就是传说中的反射,注意它的参数,用法)。
  •     ReflectionTestUtils.invokeMethod(messageFileManager,"writeStat",queueName1,stat);
    

这个反射的参数:
第一个参数:类的实例。
第二个参数:你想调用的方法
后面的参数就是不定参数了(数量不确定),能确定的是:后面的参数的就是你想调用的方法的参数。


@SpringBootTest
public class MessageFileManagerTest {
    private MessageFileManager messageFileManager = new MessageFileManager();
    private static final String queueName1 = "queuetest1";
    private static final String queueName2 = "queuetest2";

    @BeforeEach
    public void setUp() throws IOException, MqException {
        //由于我们要测试的是队列,所以准备工作就是先创建队列文件:
        messageFileManager.createQueueFile(queueName1);
        messageFileManager.createQueueFile(queueName2);
    }
    //这个@AfterEach注解我试过了,即使测试方法执行过程中抛出了异常,这个方法还是在每次执行完测试单元以后该执行他还是执行他,
    //无关乎异常,真nb
    @AfterEach
    public void tearDown() throws IOException {
        //首尾工作,将刚才创建的队列文件删掉:
        messageFileManager.deleteQueueFile(queueName1);
        messageFileManager.deleteQueueFile(queueName2);
    }
  }

测试代码:

@Test
    public void testCreateFile(){
        //其实就测试创建的队列文件是否存在:
        //由于我们在MessageFileManager里的get路径方法是 private修饰的,所以不能直接调用get路径方法,只能手动写上
        //检验 队列数据文件是否存在:
        File queueDataFile1 = new File("./data/"+queueName1+"/queue_data.txt");
        //此处用的方法是isFile 而不是exists,因为要判定这是个文件,并不是只是存在就行,存在了也可能是个目录。
        Assertions.assertEquals(true,queueDataFile1.isFile());
        //检验 队列统计文件是否存在:
        File queueStatFile1 = new File("./data/"+queueName1+"/queue_stat.txt");
        Assertions.assertEquals(true,queueStatFile1.isFile());

        //检验 队列数据文件是否存在:
        File queueDataFile2 = new File("./data/"+queueName2+"/queue_data.txt");
        Assertions.assertEquals(true,queueDataFile2.isFile());
        //检验 队列统计文件是否存在:
        File queueStatFile2 = new File("./data/"+queueName2+"/queue_stat.txt");
        Assertions.assertEquals(true,queueStatFile2.isFile());
        System.out.println("[CreateFileText] 测试创建队列文件成功!!!");
    }
    @Test
    public void testReadAndWriteStat(){
        //先创建出stat类,由于他是内部类,所以要类名. 调用出来:
        MessageFileManager.Stat stat = new MessageFileManager.Stat();
        stat.totalCount = 200;
        stat.validCount = 100;
        //此时写入stat 到统计文件当中:但是如果直接用messageFileManager. 由于writeStat是private修饰,所以肯定调用不出来,
        //此时就要用spring带的 反射方法了:
//        messageFileManager.
        //用反射将 stat写入统计文件:
        ReflectionTestUtils.invokeMethod(messageFileManager,"writeStat",queueName1,stat);
        //用反射将 写入的统计文件读出来:
        MessageFileManager.Stat statNew = ReflectionTestUtils.invokeMethod(messageFileManager,"readStat",queueName1);
        //判断读出来的stat和我们设定的stat是否一样
        Assertions.assertEquals(200,statNew.totalCount);
        Assertions.assertEquals(100,statNew.validCount);
        System.out.println("[testReadAndWriteStat] 测试成功!!!");
    }

    //要想测试发送消息,首先要有队列和消息吧,所以,我们先写创建队列和消息的方法:
    private MESGQueue createQueue(){
        MESGQueue queue = new MESGQueue();
        //这里的队列名字不能随便取,因为随便取的队列名字也没有对应的文件啊,要用就要用已经创建了文件的队列名,
        //考虑到这个队列要与文件交互,而我们只创建了queuename1和queuename2两个名字对应的文件,所以只能用这两个名字的一个。
        queue.setName(queueName1);
        queue.setDurable(true);
        queue.setExclusive(false);
        queue.setAutoDelete(false);
        HashMap<String, Object> hashMap = new HashMap<>();
        hashMap.put("aaa", "111");
        hashMap.put("bbb", "222");
        queue.setArguments(hashMap);
        return queue;
    }
    private Message createMessage(String context){
        //此时能用到我们之前在message里写的创建 message的工厂类了:
        Message message = Message.createMessageWithId("testRoutingKey",null,context.getBytes());
        return message;
    }

    @Test
    public void testSendMessage() throws IOException, MqException, ClassNotFoundException {
        //先创建队列与消息:
        MESGQueue queue = createQueue();
        Message message = createMessage("abcdefghijklmnopqrstuvwxyz");
        //发送消息:
        messageFileManager.sendMessage(queue,message);

        //验证stat文件:
        MessageFileManager.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManager,"readStat",queueName1);
        Assertions.assertEquals(1,stat.totalCount);
        Assertions.assertEquals(1,stat.validCount);

        //验证data文件:使用loadAllMessagesFromQueueDataFile读取文件内容:
        LinkedList<Message> messages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);
        //验证:
        Assertions.assertEquals(1,messages.size());
        Message message1 = messages.get(0);
        //判断这个message1和我们之前的消息message是否一样:
        Assertions.assertEquals(message.getMessageId(),message1.getMessageId());
        Assertions.assertEquals(message.getRoutingKey(),message1.getRoutingKey());
        Assertions.assertEquals(message.getDeliverMode(),message1.getDeliverMode());
        Assertions.assertArrayEquals(message.getBody(),message1.getBody());
        System.out.println("[testSendMessage] 测试成功!!!");
    }

    //虽然上一个testSendMessage 已经间接测试过这个方法,但是为了求稳,再测试一遍
    @Test
    public void testLoadAllMessagesFromQueueDataFile() throws IOException, MqException, ClassNotFoundException {
        //我们需要准备200条数据用来加载:
        //先创建一个队列用来存放消息:注意这个方法使用的是queueName1创建的队列
        MESGQueue queue = createQueue();
        //先创建一个链表用来保存消息,和后面新加载的消息作对比:
        LinkedList<Message> expectedMessages = new LinkedList<>();
        //使用for循环创建消息:
        for(int i =0;i<200;i++){
            Message message = createMessage("testMessage"+i);
            //将消息写入文件:
            messageFileManager.sendMessage(queue,message);
            //记录消息:
            expectedMessages.add(message);
        }
        //调用loadAllMessagesFromQueueDataFile取出所有消息:
        LinkedList<Message> realMessages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);
        //先验证队列的数目是否一致:
        Assertions.assertEquals(200,realMessages.size());
        //验证基本属性:
        for(int i= 0;i<realMessages.size();i++){
            Message realMessage = realMessages.get(i);
            Message expectMessage = expectedMessages.get(i);
            System.out.println("["+i+"]"+realMessage.toString());
            Assertions.assertEquals(realMessage.getMessageId(),expectMessage.getMessageId());
            Assertions.assertEquals(realMessage.getIsVail(),expectMessage.getIsVail());
            Assertions.assertEquals(realMessage.getDeliverMode(),expectMessage.getDeliverMode());
            Assertions.assertEquals(realMessage.getRoutingKey(),expectMessage.getRoutingKey());
            Assertions.assertArrayEquals(realMessage.getBody(),expectMessage.getBody());
        }
        System.out.println("[testLoadAllMessagesFromQueueDataFile] 测试成功!!!");
    }

    //测试删除消息:
    @Test
    public void testDeleteMessage() throws IOException, MqException, ClassNotFoundException {
        //先创建一个队列:
        MESGQueue queue = createQueue();
        //创建一个链表用来保存预期消息:
        LinkedList<Message> expectedMessages = new LinkedList<>();
        //再将20条消息都写入队列:
        for(int i =0;i<20;i++){
            Message message = createMessage("testMessage"+i);
            //将消息写入队列文件:
            messageFileManager.sendMessage(queue,message);
            //记录消息:
            expectedMessages.add(message);
        }
        //这里 就以删除前三条消息为例:
        messageFileManager.deleteMessage(queue,expectedMessages.get(0));
        messageFileManager.deleteMessage(queue,expectedMessages.get(1));
        messageFileManager.deleteMessage(queue,expectedMessages.get(2));

        //读出消息,对比:
        LinkedList<Message> realMessages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);
        //先判断个数:
        Assertions.assertEquals(17,realMessages.size());
        for(int i =3;i<20;i++){
            Message realMessage = realMessages.get(i-3);
            Message expectMessage = expectedMessages.get(i);
            System.out.println("["+i+"]"+realMessage.toString());
            Assertions.assertEquals(realMessage.getMessageId(),expectMessage.getMessageId());
            Assertions.assertEquals(realMessage.getIsVail(),expectMessage.getIsVail());
            Assertions.assertEquals(realMessage.getDeliverMode(),expectMessage.getDeliverMode());
            Assertions.assertEquals(realMessage.getRoutingKey(),expectMessage.getRoutingKey());
            Assertions.assertArrayEquals(realMessage.getBody(),expectMessage.getBody());
        }
        System.out.println("[testDeleteMessage] 测试成功!!!");
    }

    //测试GC,这里的GC其实只是测试,不用管消息总数是否大于2000或有效消息占比不到50%,因为那是业务上的判定,会有专门的类来进一步封装
    //而此处我们只进行测试GC这个方法
    //计划将100条消息都存入队列,然后将奇数下标的消息都删除,然后执行GC,验证现在的文件是否比原来的文件小:
    @Test
    public void testGC() throws IOException, MqException, ClassNotFoundException {
        //创建一个队列:
        MESGQueue queue = createQueue();
        //创建一个链表用来记录消息:
        LinkedList<Message> expectedMessages = new LinkedList<>();
        //先发送100条消息:
        for(int i =0;i<100;i++){
            Message message = createMessage("testMessage"+i);
            //发送到队列:
            messageFileManager.sendMessage(queue,message);
            //记录
            expectedMessages.add(message);
        }
        //删除奇数下标的消息:
        for(int i =1;i<100;i+=2){
            messageFileManager.deleteMessage(queue,expectedMessages.get(i));
        }
        //先记录执行GG之前的文件大小:
        File oldFile = new File("./data/"+queueName1+"/queue_data.txt");
        long oldFileLength = oldFile.length();

        //执行GC
        messageFileManager.gc(queue);
        //记录执行完GC之后的文件大小:
        File newFile = new File("./data/"+queueName1+"/queue_data.txt");
        long newFileLength = newFile.length();
        //取出真实的消息:
        LinkedList<Message> realMessages = messageFileManager.loadAllMessagesFromQueueDataFile(queueName1);
        //先验证消息数量是否对的上:
        Assertions.assertEquals(50,realMessages.size());
        //挨个验证消息:
        for(int i = 0;i<50;i++){
            Message realMessage = realMessages.get(i);
            Message expectMessage = expectedMessages.get(i*2);
            System.out.println("["+i+"]"+realMessage.toString());
            Assertions.assertEquals(realMessage.getMessageId(),expectMessage.getMessageId());
            Assertions.assertEquals(realMessage.getIsVail(),expectMessage.getIsVail());
            Assertions.assertEquals(realMessage.getDeliverMode(),expectMessage.getDeliverMode());
            Assertions.assertEquals(realMessage.getRoutingKey(),expectMessage.getRoutingKey());
            Assertions.assertArrayEquals(realMessage.getBody(),expectMessage.getBody());
        }
        //验证文件大小:
        //这个验证的原理其实是:
        //删除一个文件并不是直接删除,而是逻辑删除,通过标记统计文件里的vail来标识的,此时的数据文件即使有很多无效的文件,但是他的大小依旧是total
        //而非vail有效文件的大小。但是如果进行了文件的GC迁移,此时的新文件的大小就是旧文件的vail有效文件的大小了。所以,新文件会小于旧文件的大小。
        System.out.println("[oldFileLength]:"+oldFileLength);
        System.out.println("[newFileLength]:"+newFileLength);
        Assertions.assertTrue(newFileLength<oldFileLength);
        System.out.println("[testGC] 测试成功!!!");
    }

测试结果:没问题

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/958362.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

无人机 PX4 飞控 | PX4源码添加自定义参数方法并用QGC显示与调整

无人机 PX4 飞控 | PX4源码添加自定义参数方法并用QGC显示与调整 0 前言 之前文章添加了一个自定义的模块&#xff0c;本篇文章在之前的自定义模块中&#xff0c;添加两个自定义参数 使用QGC显示出来&#xff0c;并通过QGC调整参数值&#xff0c;代码实现参数更新 新增的参…

【真机调试】前端开发:移动端特殊手机型号有问题,如何在电脑上进行调试?

目录 前言一、怎么设置成开发者模式&#xff1f;二、真机调试基本步骤&#xff1f; &#x1f680;写在最后 前言 edge浏览器 edge://inspect/#devices 谷歌浏览器&#xff08;开tizi&#xff09; chrome://inspect 一、怎么设置成开发者模式&#xff1f; Android 设备 打开设…

2024年第十五届蓝桥杯青少组国赛(c++)真题—快速分解质因数

快速分解质因数 完整题目和在线测评可点击下方链接前往&#xff1a; 快速分解质因数_C_少儿编程题库学习中心-嗨信奥https://www.hixinao.com/tiku/cpp/show-3781.htmlhttps://www.hixinao.com/tiku/cpp/show-3781.html 若如其他赛事真题可自行前往题库中心查找&#xff0c;题…

Linux系统下速通stm32的clion开发环境配置

陆陆续续搞这个已经很久了。 因为自己新电脑是linux系统无法使用keil&#xff0c;一开始想使用vscode里的eide但感觉不太好用&#xff1b;后面想直接使用cudeide但又不想妥协&#xff0c;想趁着这个机会把linux上的其他单片机开发配置也搞明白&#xff1b;而且非常想搞懂cmake…

【FFmpeg】FLV 格式分析 ③ ( Tag Body 数据块体结构 - Vedio Data 视频数据 )

文章目录 一、Tag Body 数据块体结构 - Video Data 视频数据1、Vedio Data 视频数据 类型标识2、Vedio Data 视频数据 结构分析3、Composition Time Offset 字段涉及的时间计算4、AVC Packet Type 字段说明① AVC Sequence Header 类型② AVC NALU 类型③ AVC End of Sequence …

尚硅谷大数据数仓项目superset db upgrade报错解决(2025.1.23解决)

尚硅谷大数据数仓项目superset db upgrade报错解决&#xff08;2025.1.23解决&#xff09;和 superset安装MySQL报错解决 解决方法&#xff08;2025.1.23解决&#xff09; 0.卸载之前安装好的Superset -- 退出当前环境 conda deactivate-- 卸载Superset conda remove -n sup…

vue3+uniapp开发鸿蒙初体验

去年7月20号&#xff0c;uniapp官网就已经开始支持鸿蒙应用开发了&#xff0c;话不多说&#xff0c;按照现有规则进行配置实现一下鸿蒙开发效果&#xff1b; 本文基于macOS Monterey 版本 12.6.5实现 开发鸿蒙的前置准备 这里就直接说我的版本&#xff1a; DevEco Studio 5.…

996引擎 - 前期准备-配置开发环境

996引擎 - 前期准备 官网搭建服务端、客户端单机搭建 开发环境配置后端开发环境配置环境 前端开发环境配置环境 后端简介前端简介GUILayoutGUIExport 官网 996传奇引擎官网 所有资料从官网首页开始&#xff0c;多探索。 文档&#xff1a; 996M2-服务端Lua 996M2-客户端Lua 搭…

Python FastAPI 实战应用指南

文章目录 1. 前言2. FastAPI 的优势3. FastAPI 快速入门3.1 安装3.2 最简单的 API 案例 4. 基础功能应用4.1 模型验证和参数校验4.2 实现高级计划&#xff1a;用于实时功能和快速发布 5. 高级应用5.1 实现 OAuth2 认证5.2 提供 WebSocket 支持 6. 总结 1. 前言 FastAPI 是一个…

Vue3 项目打包并部署到Nginx

一、安装Nginx 官网下载链接&#xff1a; nginx: downloadhttps://nginx.org/en/download.htmlhttps://nginx.org/en/download.html 下载后解压并双击 nginx.exe 启动服务&#xff1a; 打开浏览器&#xff0c;访问 http://localhost/ &#xff0c;若出现如下页面&#xff0c…

二叉树(了解)c++

二叉树是一种特殊的树型结构&#xff0c;它的特点是: 每个结点至多只有2棵子树(即二叉树中不存在度大于2的结点) 并且二叉树的子树有左右之分&#xff0c;其次序不能任意颠倒&#xff0c;因此是一颗有序树 以A结点为例&#xff0c;左边的B是它的左孩子&#xff0c;右边的C是…

会议签到系统的架构和实现

会议签到系统的架构和实现 摘要:通过定制安卓会议机开机APP呈现签到界面&#xff0c;并且通过W/B结构采集管理签到信息&#xff0c;实现会议签到的功能。为达到此目标本文将探讨使用Redis提供后台数据支持&#xff1b;使用SocketIo处理适时消息&#xff1b;使用Flask进行原型开…

PIC单片机HEX文件格式分析

在调试PIC单片机在bootloader程序时&#xff0c;需要将hex文件转换为bin文件&#xff0c;在转换之前先了解一下hex文件中数据是如何定义的。 直接打开一个LED灯闪烁的程序生成的hex文件&#xff0c;芯片型号为PIC18F46K80 可以看到每条数据都是由6部分组成的&#xff0c;下面分…

17-使用椭圆制作鼻子

17-使用椭圆制作鼻子_哔哩哔哩_bilibili17-使用椭圆制作鼻子是一次性学会 Canvas 动画绘图&#xff08;核心精讲50个案例&#xff09;2023最新教程的第18集视频&#xff0c;该合集共计53集&#xff0c;视频收藏或关注UP主&#xff0c;及时了解更多相关视频内容。https://www.bi…

通过 Visual Studio Code 启动 IPython

在Visual Studio Code 中&#xff0c;你可以使用内置的终端来启动 ipython&#xff0c;当然首先要安装好ipython。 安装ipython的方法是在cmd里面输入以下命令安装&#xff1a; pip install ipython 启动ipython的步骤如下&#xff1a; 打开 VSCode 终端&#xff1a; 在 VSCo…

网络(三) 协议

目录 1. IP协议; 2. 以太网协议; 3. DNS协议, ICMP协议, NAT技术. 1. IP协议: 1.1 介绍: 网际互连协议, 网络层是进行数据真正传输的一层, 进行数据从一个主机传输到另一个主机. 网络层可以将数据主机进行传送, 那么传输层保证数据可靠性, 一起就是TCP/IP协议. 路径选择: 确…

Qt基础项目篇——Qt版Word字处理软件

一、核心功能 本软件为多文档型程序&#xff0c;界面是标准的 Windows 主从窗口 拥有&#xff1a;主菜单、工具栏、文档显示区 和 状态栏。 所要实现的东西&#xff0c;均在下图了。 开发该软件&#xff0c;主要分为下面三个阶段 1&#xff09;界面设计开发 多窗口 MDI 程序…

Python+langchain+通义千问qwen(大模型实现自己的聊天机器人)

Langchain langchain是一个用于开发由语言模型驱动的应用程序的框架&#xff0c;致力于简化AI模型应用的开发。简单来说&#xff0c;langchain就是一个&#xff08;帮助开发者轻松完成AI模型应用开发的&#xff09;框架,现在支持python和js两个版本&#xff0c;它集成多种大语…

FPGA中场战事

2023年10月3日,英特尔宣布由桑德拉里维拉(Sandra Rivera)担任“分拆”后独立运营的可编程事业部首席执行官。 从数据中心和人工智能(DCAI)部门总经理,转身为执掌该业务的CEO,对她取得像AMD掌门人苏姿丰博士类似的成功,无疑抱以厚望。 十年前,英特尔花费167亿美元真金白银…

【超详细】ELK实现日志采集(日志文件、springboot服务项目)进行实时日志采集上报

本文章介绍&#xff0c;Logstash进行自动采集服务器日志文件&#xff0c;并手把手教你如何在springboot项目中配置logstash进行日志自动上报与日志自定义格式输出给logstash。kibana如何进行配置索引模式&#xff0c;可以在kibana中看到采集到的日志 日志流程 logfile-> l…