模拟实现消息队列项目(系列4) -- 服务器模块(内存管理)

目录

前言

1. 创建MemoryDataCenter

2. 封装Exchange 和 Queue方法

3. 封装Binding操作

4. 封装Message操作

4.1 封装消息中心集合messageMap

4.2 封装消息与队列的关系集合queueMessageMap的操作

5. 封装未确认消息集合waitMessage的操作

6. 从硬盘中恢复数据到内存中

7. MemoryDataCenter单元测试

结语


前言

        上一节我们总结了服务器模块的硬盘管理,将交换机,队列,绑定存书到Sqlite数据库中,将消息按照队进行创建文件存储在本地硬盘中.并且封装了对于数据库和文件的各种操作.实现了持久化的效果,但是实际的消息存储/转发,主要靠内存的结构.对于消息队列来说,内存部分是更关键的,内存速度更快,可以达到更高的并发.本节就对内存管理进行封装.本项目全部代码已上传Gitee,链接放在文章末尾,欢迎大家访问!


1. 创建MemoryDataCenter

路径:mqserver.datacenter.MemoryDataCenter

考虑到多线程的原因,我们将HashMap替换成ConcurrentHashMap (对每个哈希桶进行加锁,相对来说是线程安全的)

@Data
public class MemoryDataCenter {
    // 1. 交换机  多线程环境下使用,使用ConcurrentHashMap会相对线程安全
    //         key:ExchangeName,value:Exchange对象
    private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();

    // 2. 队列  key:QueueName,value:MSQueue对象
    private ConcurrentHashMap<String, MSQueue> queueMap = new ConcurrentHashMap<>();

    // 3. 绑定  key:ExchangeName,value:HashMap(key:QueueName,value:MSQueue对象)
    private ConcurrentHashMap<String,ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();

    // 4. 消息  key:MessageID,value:Message对象
    private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();

    // 5. 消息和队列的映射关系 HashMap: key:QueueName,value:LinkedList(Message对象)
    private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();

    // 6. 未确认的消息  HashMap: key:QueueName,value:HashMap(key:MessageID,value:Message对象)
    private ConcurrentHashMap<String,ConcurrentHashMap<String, Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();
}

2. 封装Exchange 和 Queue方法

主要就是插入和获取数据以及删除

   /**
     * 1. 针对内存中的交换机,队列设置操作
     */
    public void insertExchange(Exchange exchange) {
        exchangeMap.put(exchange.getName(), exchange);
        System.out.println("[MemoryDataCenter] 新交换机添加成功! exchangeName=" + exchange.getName());
    }

    public Exchange getExchange(String exchangeName) {
        return exchangeMap.get(exchangeName);
    }

    public void deleteExchange(String exchangeName) {
        exchangeMap.remove(exchangeName);
        System.out.println("[MemoryDataCenter] 交换机删除成功! exchangeName=" + exchangeName);
    }

    public void insertQueue(MSQueue queue) {
        queueMap.put(queue.getName(), queue);
        System.out.println("[MemoryDataCenter] 新队列添加成功! queueName=" + queue.getName());
    }

    public MSQueue getQueue(String queueName) {
        return queueMap.get(queueName);
    }

    public void deleteQueue(String queueName) {
        queueMap.remove(queueName);
        System.out.println("[MemoryDataCenter] 队列删除成功! queueName=" + queueName);
    }

3. 封装Binding操作

这里呢之所以将绑定的操作单独列举出来,是因为存储绑定信息的数据结构是相对比较复杂的,是嵌套的HashMap.

对于插入绑定信息:

1, 首先按照交换机的名字进行查找,如果查找不到就进行创建一个HashMap的数据结构存储到含有绑定信息的HashMap中,如果存在的话在按照队列名字进行查找绑定信息,如果查找到了,说明改绑定信息已经插入过就不要进行插入了,如果没找到就进行插入操作.

2. 在上述查找和插入的操作比并不是原子的,所以我们要给是上述操作,按照bindingMap进行加锁.以保证我们的线程操作是安全的.

下述是相关对于绑定的操作的代码:

    /**
     * 2. 针对绑定进行操作
     */
    /**
     * 2.1插入绑定信息
     * @param binding
     * @throws MqException
     */
    public void insertBinding(Binding binding) throws MqException {
//        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
//        if (bindingMap == null) {
//            bindingMap = new ConcurrentHashMap<>();
//            bindingsMap.put(binding.getExchangeName(), bindingMap);
//        }
        // 先使用 exchangeName 查一下, 对应的哈希表是否存在. 不存在就创建一个.
        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),
                k -> new ConcurrentHashMap<>());

        synchronized (bindingMap) {
            // 再根据 queueName 查一下目前的绑定的交换机绑定的是否是当前传入的队列. 如果已经存在(存在相同的绑定关系了,就不需要进行传入), 就抛出异常. 不存在才能插入.
            if (bindingMap.get(binding.getQueueName()) != null) {
                throw new MqException("[MemoryDataCenter] 绑定已经存在! exchangeName=" + binding.getExchangeName() +
                        ", queueName=" + binding.getQueueName());
            }
            // 最后将绑定关系传入到bingMap中
            bindingMap.put(binding.getQueueName(), binding);
        }
        System.out.println("[MemoryDataCenter] 新绑定添加成功! exchangeName=" + binding.getExchangeName()
                + ", queueName=" + binding.getQueueName());
    }

    /**
     * 2.2 获取绑定1: 根据exchangeName, queueName 获取唯一的绑定
     * @param exchangeName
     * @param queueName
     */
    public Binding getBinding(String exchangeName, String queueName){
        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);
        if (bindingMap == null){
            return null;
        }
        synchronized (bindingMap){
            // 防止当别的操作删除了这个队列的绑定信息,而导致的线程错误
            return bindingMap.get(queueName);
        }
    }

    /**
     * 2.3 获取绑定2: 根据exchangeName 查询所有绑定
     * @param exchangeName
     * @return
     */
    public ConcurrentHashMap<String, Binding> getBindings(String exchangeName) throws MqException {
        if (bindingsMap.get(exchangeName) == null){
            return null;
        }
        return bindingsMap.get(exchangeName);
    }

    /**
     * 2.4 删除绑定关系(单个) 一个交换机对应的单个队列的绑定关系
     * @param binding
     * @throws MqException
     */
    public void deleteBinding(Binding binding) throws MqException {
        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
        if (bindingMap == null) {
            // 该交换机没有绑定任何队列. 报错.
            throw new MqException("[MemoryDataCenter] 绑定不存在! exchangeName=" + binding.getExchangeName()
                    + ", queueName=" + binding.getQueueName());
        }
        bindingMap.remove(binding.getQueueName());
        System.out.println("[MemoryDataCenter] 绑定删除成功! exchangeName=" + binding.getExchangeName()
                + ", queueName=" + binding.getQueueName());
    }

    /**
     * 2.5 删除绑定关系(多个) 1个交换机对应的多个队列的绑定关系.
     */
    public void deleteBinding(String exchangeName){
        bindingsMap.remove(exchangeName);
    }

4. 封装Message操作

4.1 封装消息中心集合messageMap

  • 1. 添加消息到消息中心
  • 2. 根据消息ID查询消息
  • 3. 根据消息ID删除消息
 /**
     * 3. 针对消息进行操作
     */
    /**
     * 3.1 添加消息
     * @param message
     */
    public void addMessage(Message message) {
        messageMap.put(message.getMessageID(), message);
        System.out.println("[MemoryDataCenter] 新消息添加成功! messageId=" + message.getMessageID());
    }

    /**
     * 3.2 根据 id 查询消息
     * @param messageId
     * @return
     */
    public Message getMessage(String messageId) {
        return messageMap.get(messageId);
    }

    /**
     * 3.3 根据 id 删除消息
     * @param messageId
     */
    public void removeMessage(String messageId) {
        messageMap.remove(messageId);
        System.out.println("[MemoryDataCenter] 消息被移除! messageId=" + messageId);
    }

4.2 封装消息与队列的关系集合queueMessageMap的操作

  • 1. 发送消息到指定队列名字的队列
  • 2. 从指定队列中获取消息集合
  • 3. 获取指定队列名字队列中消息的个数
 /**
     * 4 针对消息和队列的关系进行操作
     */
    /**
     * 4.1 发送消息到指定队列
     * @param queue
     * @param message
     */
    public void sendMessage(MSQueue queue, Message message) {
        // 先根据队列的名字, 找到该队列对应的消息链表.
        // 先根据队列的名字进行查询,查不到就进行创建该队列对应的链表  // computeIfAbsent线程安全的
        LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(),k-> new LinkedList<>());
        // 再把数据加到 messages 里面
        synchronized (messages) {
            // 对该队列进行添加的时候需要进行加锁
            messages.add(message);
        }
        // 在这里把该消息也往消息中心中插入一下. 假设如果 message 已经在消息中心存在, 重复插入也没关系.
        // 主要就是相同 messageId, 对应的 message 的内容一定是一样的. (服务器代码不会对 Message 内容做修改 basicProperties 和 body)
        addMessage(message);
        System.out.println("[MemoryDataCenter] 消息被添加到队列中! messageId=" + message.getMessageID());
    }

    /**
     * 4.2 从指定队列名字中进行提取信息
     * @param queueName
     * @return
     */
    public Message pollMessage(String queueName){
        LinkedList<Message> messages = queueMessageMap.get(queueName);
        // 队列中没有信息
        if (messages == null){
            System.out.println("[MemoryDataCenter] 该队列中没有信息! queueName=" + queueName);
            return null;
        }
        // 将队列进行头删除(提取信息)
        synchronized (messages){
            if (messages.size() == 0){
                System.out.println("[MemoryDataCenter] 该队列中没有信息! queueName=" + queueName);
                return null;
            }
            Message currentMessage = messages.remove(0); System.out.println
                    ("[MemoryDataCenter] 消息已经从队列中取出! queueName=" + queueName + ", MessageID=" + currentMessage.getMessageID() );
            return currentMessage;
        }
    }

    /**
     * 4.3 获取指定队列名字中消息的个数
     * @param queueName
     * @return
     */
    public int getMessageCount(String queueName){
        LinkedList<Message> messages = queueMessageMap.get(queueName);
        // 队列中没有信息
        if (messages == null){
            System.out.println("[MemoryDataCenter] 该队列中没有信息! queueName=" + queueName);
            return 0;
        }
        // 将队列进行头删除(提取信息)
        synchronized (messages){
            if (messages.size() == 0){
                System.out.println("[MemoryDataCenter] 该队列中没有信息! queueName=" + queueName);
                return 0;
            }
            return messages.size();
        }
    }

5. 封装未确认消息集合waitMessage的操作

  • 1. 添加消息到等待确认队列
  • 2. 从指定未确认队列中删除消息
  • 3. 根据指定的消息ID与未确认队列名字获取消息内容
/**
     * 5. 未确认消息Map的操作
     */

    /**
     * 5.1 添加消息到指定等待确认队列
     * @param queueName
     * @param message
     */
    public void addMessageWaitAck(String queueName, Message message){
        ConcurrentHashMap<String,Message> waitMessage = queueMessageWaitAckMap
                .computeIfAbsent(queueName, k-> new ConcurrentHashMap<>());
            waitMessage.put(message.getMessageID(),message);
            System.out.println("[MemoryDataCenter] 消息进入等待确认队列! messageID=" + message.getMessageID());

    }

    /**
     * 5.2 从指定的未确认消息队列中进行删除消息
     * @param queueName
     * @param messageId
     */
    public void removeMessageWaitAck(String queueName, String messageId){
        ConcurrentHashMap<String,Message> waitMessage = queueMessageWaitAckMap.get(queueName);
        if (waitMessage == null){
            System.out.println("[MemoryDataCenter] 该队列为空! queueName=" + queueName);
            return;
        }

        waitMessage.remove(messageId);
        System.out.println("[MemoryDataCenter] 消息已经从等待确认队列中移除! messageId=" + messageId);

    }

    /**
     * 5.3 根据指定消息ID从队列中进行获取信息
     * @param queueName
     * @param messageId
     * @return
     */
    public Message geMessageWaitAck(String queueName, String messageId){
        ConcurrentHashMap<String,Message> waitMessage = queueMessageWaitAckMap.get(queueName);
        if (waitMessage == null){
            System.out.println("[MemoryDataCenter] 该队列为空! queueName=" + queueName);
            return null;
        }
        return waitMessage.get(messageId);
    }

6. 从硬盘中恢复数据到内存中

使用之前封装过的diskDataCenter进行恢复数据.

1. 清空当前内存数据结构中的数据

2. 恢复所有的交换机,队列,绑定,消息数据,恢复消息数据的时候,要将消息中心和消息与队列的映射进行恢复.

/**
     * 6. 从硬盘中恢复数据到内存中 (使用之前封装好的管理硬盘的类进行实现)
     */
    public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {
        // 1. 清空内存中各种数据信息
        queueMap.clear();
        exchangeMap.clear();
        bindingsMap.clear();
        messageMap.clear();
        queueMessageMap.clear();
        // 2. 恢复所有的交换机信息
        List<Exchange> exchanges = diskDataCenter.selectAllExchange();
        for (Exchange exchange :exchanges) {
             exchangeMap.put(exchange.getName(),exchange);
        }
        // 3. 恢复所有的队列信息
        List<MSQueue> queues = diskDataCenter.selectAllMSQueue();
        for (MSQueue msQueue :queues) {
            queueMap.put(msQueue.getName(),msQueue);
        }

        // 4. 恢复所有的绑定数据
        List<Binding> bindings = diskDataCenter.selectAllBinding();
        for (Binding binding: bindings){
            ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.
                    computeIfAbsent(binding.getExchangeName(), k-> new ConcurrentHashMap<>());
            bindingMap.put(binding.getQueueName(),binding);
        }

        // 4. 恢复所有的消息数据

        // 4.1 遍历所有的队列
        // List<MSQueue> queues = diskDataCenter.selectAllMSQueue();
        for (MSQueue msQueue:queues) {
            LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(msQueue.getName());
            // 4.2 将获取的消息进行进行加入到队列
            queueMessageMap.put(msQueue.getName(),messages);
            // 4.3 将消息添加上到消息中心
            for (Message message : messages) {
                messageMap.put(message.getMessageID(),message);
            }
        }

7. MemoryDataCenter单元测试

package com.example.demo.mqserver.datacenter;

import com.example.demo.DemoApplication;
import com.example.demo.common.MqException;
import com.example.demo.mqserver.core.*;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

import static org.junit.jupiter.api.Assertions.*;

/**
 * Created with IntelliJ IDEA.
 * Description:
 * User: YAO
 * Date: 2023-07-31
 * Time: 10:30
 */
@SpringBootTest
class MemoryDataCenterTest {
    MemoryDataCenter memoryDataCenter = null;

    @BeforeEach
    void setUp() {
        memoryDataCenter = new MemoryDataCenter();
    }

    @AfterEach
    void tearDown() {
        memoryDataCenter = null;
    }

    // 创建一个测试交换机
    private Exchange createTestExchange(String exchangeName) {
        Exchange exchange = new Exchange();
        exchange.setName(exchangeName);
        exchange.setType(ExchangeType.DIRECT);
        exchange.setAutoDelete(false);
        exchange.setDurable(true);
        return exchange;
    }

    // 创建一个测试队列
    private MSQueue createTestQueue(String queueName) {
        MSQueue queue = new MSQueue();
        queue.setName(queueName);
        queue.setDurable(true);
        queue.setExclusive(false);
        queue.setAutoDelete(false);
        return queue;
    }

    /**
     * 1. 针对交换机进行操作
     */
    @Test
    public void testExchange(){
        // 1. 创建交换机进行插入
        Exchange expectExchange = createTestExchange("testExchange");
        memoryDataCenter.insertExchange(expectExchange);
        // 2. 查询交换机
        Exchange actualExchange = memoryDataCenter.getExchange("testExchange");
        // 比较内存中的引用是否是同一个引用
        Assertions.assertEquals(expectExchange,actualExchange);
        // 3. 删除交换机
        memoryDataCenter.deleteExchange("testExchange");
        // 4. 查询交换机,比较结果
        actualExchange = memoryDataCenter.getExchange("testExchange");
        Assertions.assertNull(actualExchange);
    }

    /**
     * 2. 针对队列进行操作
     */
    @Test
    public void testQueue(){
        // 1. 创建交换机进行插入
        MSQueue expectQueue = createTestQueue("testQueue");
        memoryDataCenter.insertQueue(expectQueue);
        // 2. 查询交换机
        MSQueue actualQueue = memoryDataCenter.getQueue("testQueue");
        // 比较内存中的引用是否是同一个引用
        Assertions.assertEquals(expectQueue,actualQueue);
        // 3. 删除交换机
        memoryDataCenter.deleteQueue("testQueue");
        // 4. 查询交换机,比较结果
        actualQueue = memoryDataCenter.getQueue("testQueue");
        Assertions.assertNull(actualQueue);
    }

    /**
     * 3. 针对绑定进行测试
     */
    @Test
    public void testBinding() throws MqException {
        // 1.创建绑定并加入到集合中
        Binding expectedBinding = new Binding();
        expectedBinding.setExchangeName("testExchange");
        expectedBinding.setQueueName("testQueue");
        memoryDataCenter.insertBinding(expectedBinding);
        // 2. 查询绑定(单个)
        Binding actualBinding = memoryDataCenter.getBinding("testExchange","testQueue");
        Assertions.assertEquals(expectedBinding,actualBinding);
        // 2.1 查询所有的绑定
        ConcurrentHashMap<String, Binding> bindingMap = memoryDataCenter.getBindings("testExchange");
        Assertions.assertEquals(1, bindingMap.size());
        Assertions.assertEquals(expectedBinding, bindingMap.get("testQueue"));

        // 3. 删除绑定
        memoryDataCenter.deleteBinding("testExchange");
        actualBinding = memoryDataCenter.getBinding("testExchange","testQueue");
        Assertions.assertNull(actualBinding);
        bindingMap = memoryDataCenter.getBindings("testExchange");
        Assertions.assertNull(bindingMap);
    }

    private Message createTestMessage(String content) {
        Message message = Message.createMessageWithId("testRoutingKey", null, content.getBytes());
        return message;
    }
    /**
     * 4. 针对消息进行测试
     */
    @Test
    public void testMessage(){
        // 1. 创建消息并插入
        Message expectedMessage = createTestMessage("testMessage");
        memoryDataCenter.addMessage(expectedMessage);

        // 2. 查询消息并比较
        Message actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageID());
        Assertions.assertEquals(expectedMessage, actualMessage);

        // 4. 删除消息
        memoryDataCenter.removeMessage(expectedMessage.getMessageID());

        // 5. 查询消息并比较
        actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageID());
        Assertions.assertNull(actualMessage);
    }

    /**
     * 5. 测试将消息发送到对列中
     */
    @Test
    public void sendMessage(){
        // 1. 创建一个队列. 创建10条消息,进行插入到队列
        MSQueue expectQueue = createTestQueue("testQueue");
        List<Message> expectMessage = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Message message = createTestMessage("testMessage" + i);
            memoryDataCenter.sendMessage(expectQueue,message);
            expectMessage.add(message);
        }
        // 2.从队列进行取出消息
        List<Message> actualMessage = new ArrayList<>();
        while (true){
            Message message = memoryDataCenter.pollMessage("testQueue");
            if (message == null){
                break;
            }
            actualMessage.add(message);
        }
        // 3. 比较消息前后是否一致
        Assertions.assertEquals(expectMessage.size(),actualMessage.size());
        for (int i = 0; i < expectMessage.size(); i++) {
            Assertions.assertEquals(expectMessage.get(i),actualMessage.get(i));
        }
    }

    /**
     * 6. 测试未被确认的消息
     */
    @Test
    public void testMessageWaitAck(){
        // 1. 创建消息,插入到未被确认的队列中
        Message expectedMessage = createTestMessage("expectedMessage");
        memoryDataCenter.addMessageWaitAck("testQueue", expectedMessage);

        // 2. 获取消息从未被确认的队列中
        Message actualMessage = memoryDataCenter.geMessageWaitAck("testQueue", expectedMessage.getMessageID());
        Assertions.assertEquals(expectedMessage, actualMessage);

        // 3. 从未被确认的队列中进行删除消息
        memoryDataCenter.removeMessageWaitAck("testQueue", expectedMessage.getMessageID());
        // 4. 比较删除之后的队列是否还有消息
        actualMessage = memoryDataCenter.geMessageWaitAck("testQueue", expectedMessage.getMessageID());
        Assertions.assertNull(actualMessage);
    }

    /**
     * 7. 测试从硬盘中恢复数据到内存
     */
    @Test
    public void testRecovery() throws IOException, MqException, ClassNotFoundException {
        // 由于后续需要进行数据库操作, 依赖 MyBatis. 就需要先启动 SpringApplication, 这样才能进行后续的数据库操作.
        DemoApplication.context = SpringApplication.run(DemoApplication.class);

        // 1. 在硬盘上构造好数据
        DiskDataCenter diskDataCenter = new DiskDataCenter();
        diskDataCenter.init();

        // 构造交换机
        Exchange expectedExchange = createTestExchange("testExchange");
        diskDataCenter.insertExchange(expectedExchange);

        // 构造队列
        MSQueue expectedQueue = createTestQueue("testQueue");
        diskDataCenter.insertQueue(expectedQueue);

        // 构造绑定
        Binding expectedBinding = new Binding();
        expectedBinding.setExchangeName("testExchange");
        expectedBinding.setQueueName("testQueue");
        expectedBinding.setBindingKey("testBindingKey");
        diskDataCenter.insertBinding(expectedBinding);

        // 构造消息
        Message expectedMessage = createTestMessage("testContent");
        diskDataCenter.sendMessage(expectedQueue, expectedMessage);

        // 2. 执行恢复操作
        memoryDataCenter.recovery(diskDataCenter);

        // 3. 对比结果
        Exchange actualExchange = memoryDataCenter.getExchange("testExchange");
        Assertions.assertEquals(expectedExchange.getName(), actualExchange.getName());
        Assertions.assertEquals(expectedExchange.getType(), actualExchange.getType());
        Assertions.assertEquals(expectedExchange.isDurable(), actualExchange.isDurable());
        Assertions.assertEquals(expectedExchange.isAutoDelete(), actualExchange.isAutoDelete());

        MSQueue actualQueue = memoryDataCenter.getQueue("testQueue");
        Assertions.assertEquals(expectedQueue.getName(), actualQueue.getName());
        Assertions.assertEquals(expectedQueue.isDurable(), actualQueue.isDurable());
        Assertions.assertEquals(expectedQueue.isAutoDelete(), actualQueue.isAutoDelete());
        Assertions.assertEquals(expectedQueue.isExclusive(), actualQueue.isExclusive());

        Binding actualBinding = memoryDataCenter.getBinding("testExchange", "testQueue");
        Assertions.assertEquals(expectedBinding.getExchangeName(), actualBinding.getExchangeName());
        Assertions.assertEquals(expectedBinding.getQueueName(), actualBinding.getQueueName());
        Assertions.assertEquals(expectedBinding.getBindingKey(), actualBinding.getBindingKey());

        Message actualMessage = memoryDataCenter.pollMessage("testQueue");
        Assertions.assertEquals(expectedMessage.getMessageID(), actualMessage.getMessageID());
        Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());
        Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());
        Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());

        // 4. 清理硬盘的数据, 把整个 data 目录里的内容都删掉(包含了 meta.db 和 队列的目录).
        DemoApplication.context.close();
        File dataDir = new File("./data");
        FileUtils.deleteDirectory(dataDir);
    }
}


结语

        以上内容就是针对内存管理的封装,主要是设计了6中数据机构进行存储交换机 队列 绑定 消息 消息和队列的映射 未确认信息.后续对数据进行操作的时候会更加具有效率.这样我们虚拟主机中两大核心部分:硬盘管理和内存管理都总结完成,下一节会对上述两种操作进一步封装到(VirtualHost)中,然后正式的提出消息队列服务器BrokerServer这个概念,对其进行完善和功能封装.请持续关注,谢谢!!!

完整的项目代码已上传Gitee,欢迎大家访问.👇👇👇

模拟实现消息队列https://gitee.com/yao-fa/advanced-java-ee/tree/master/My-mq

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/67468.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

介绍另外一个容器技术, Apptainer

一说到容器&#xff0c;我们往往会脱口而出&#xff0c; Docker&#xff0c; 实际上Docker 仅仅是Linux 容器化的一种&#xff0c; 今天介绍的Apptainer 就是另外一种容器技术。 那么Apptainer 具体是一个什么东西呢&#xff1f; 跟Docker 有什么区别呢&#xff1f; 首先&#…

【Python】python通过cmd创建虚拟环境(pip方式)

前言&#xff1a; 在window中使用pipenv创建虚拟环境时&#xff0c;虚拟环境默认的位置是在C:\User\Administrator\.virtualenvs\目录下&#xff1b;那如果我们想配置到自定义位置&#xff0c;该如何修改呢&#xff1f;当我们在进行python项目开发的时候&#xff0c;为了不让项…

tcl学习之路(四)(vivado设计分析)

1.FPGA芯片架构中的对象 在打开elaborated/synthesied/implemented的情况下&#xff0c;可使用如下命令获取期望的SLICE。SLICE分为SLICEL和SLICEM&#xff0c;由LUT、FF、MUX、CARRY组成。 set all_slice [get_sites SLICE*] set col_slice [get_sites SLICEX0Y*] set all_sl…

【资料分享】全志科技T507-H工业核心板规格书

1 核心板简介 创龙科技SOM-TLT507是一款基于全志科技T507-H处理器设计的4核ARM Cortex-A53全国产工业核心板&#xff0c;主频高达1.416GHz。核心板CPU、ROM、RAM、电源、晶振等所有元器件均采用国产工业级方案&#xff0c;国产化率100%。 核心板通过邮票孔连接方式引出MIPI C…

【爱书不爱输的程序猿】内网的摄像头,远程进行访问的方式方法

欢迎来到爱书不爱输的程序猿的博客, 本博客致力于知识分享&#xff0c;与更多的人进行学习交流 快速远程访问内网的摄像头【内网穿透】 前言一、快速远程访问内网的摄像头1. 打开“允许远程桌面”开关2. 建立TCP-IP隧道3. 获取生成的TCP-IP隧道地址4. 连接另一台电脑4.1 取得该…

Python自动化测试基础必备知识点总结

目录 一、自动化测试的概念 二、Python自动化测试基础必备知识点 一、自动化测试的概念 性能系统负载能力稳定性过载操作下的系统瓶颈自动化测试&#xff0c;使用程序代替人工&#xff0c;可以提高测试效率性&#xff0c;自动化测试能自动化使用代码模拟大量用户&#xff0c…

java+springboot+mysql小区宠物管理系统

项目介绍&#xff1a; 使用javaspringbootmysql开发的小区宠物管理系统&#xff0c;系统包含超级管理员&#xff0c;系统管理员、用户角色&#xff0c;功能如下&#xff1a; 超级管理员&#xff1a;管理员管理&#xff1b;用户管理&#xff1b;宠物分类&#xff1b;宠物管理&…

【Unity细节】Unity打包后UI面板消失是怎么回事

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! 本文由 秩沅 原创 收录于专栏&#xff1a;unity细节和bug ⭐关于物体的动画碰到其他碰撞器后停止播放的问题⭐ 文章目录 ⭐关于物体的动画碰…

Netty:用forEachByte遍历处理ByteBuf中的可读字节

说明 io.netty.buffer.ByteBuf的forEachByte(ByteProcessor processor)用指明的ByteProcessor 遍历ByteBuf中的可读字节。遍历的时候用升序遍历。 -这个函数可以在ByteBuf中寻找某个字节首次出现的位置&#xff0c;或者首次不是某个字节的位置。 如果已经遍历完了可读字节但还…

Spring Cloud Gateway

一 什么是Spring Cloud Gateway 网关作为流量的入口&#xff0c;常用的功能包括路由转发&#xff0c;权限校验&#xff0c;限流等。 Spring Cloud Gateway 是Spring Cloud官方推出的第二代网关框架&#xff0c;定位于取代 Netflix Zuul。相比 Zuul 来说&#xff0c;Spring Clo…

Unity背包系统与存档(附下载链接)

下载地址: https://download.csdn.net/download/qq_58804985/88184776 视频演示: 功能: 拖动物品在背包中自由移动,当物品拖动到其他物品上时,和其交换位置.基于EPPlus的背包数据与位置保存 原理: 给定一个道具池表格与一个背包表格 道具池表格负责存储所有道具的信息 背…

[分享]STM32G070 串口 乱码 解决方法

硬件 NUCLEO-G070RB 工具 cubemx 解决方法 7bit 改为 8bit printf 配置方法 添加头文件 #include <stdio.h> 添加重定向代码 #ifdef __GNUC__#define PUTCHAR_PROTOTYPE int __io_putchar(int ch)#else#define PUTCHAR_PROTOTYPE int fputc(int ch, FILE *f)#endi…

网络安全(黑客)常用工具(附配套资料+工具安装包)

几十年来&#xff0c;攻击方、白帽和安全从业者的工具不断演进&#xff0c;成为网络安全长河中最具技术特色的灯塔&#xff0c;并在一定程度上左右着网络安全产业发展和演进的方向&#xff0c;成为不可或缺的关键要素之一。 话不多说&#xff0c;2022年全球白帽常用工具排行榜…

解决Windows:Call to undefined function exif_imagetype()

很明显,是php安装时没有打开某些扩展,以致不能执行exif_imagetype()这个方法,因此需要打开。 网上很多人说需要打开下面这两个扩展: extension=php_exif.dll extension=php_mbstring.dll 但只说对了一半,我一开始也按照网上文章说的打开这两个扩展,但是还是同样错误。…

Leetcode-每日一题【剑指 Offer 14- II. 剪绳子 II】

题目 2、3、3的三段&#xff0c;此时得到的最大乘积是18。 答案需要取模 1e97&#xff08;1000000007&#xff09;&#xff0c;如计算初始结果为&#xff1a;1000000008&#xff0c;请返回 1。 示例 1&#xff1a; 输入: 2输出: 1解释: 2 1 1, 1 1 1 示例 2: 输入: 10输出…

Tensorflow2-初识

TensorFlow2是一个深度学习框架&#xff0c;可以理解为一个工具&#xff0c;有谷歌的全力支持&#xff0c;具有易用、灵活、可扩展、性能优越、良好的社区资源等优点。 1、环境的搭建 1.1 Anaconda3的安装 https://www.anaconda.com/ Python全家桶&#xff0c;包括Python环境和…

无涯教程-Perl - int函数

描述 此函数返回EXPR的整数元素,如果省略则返回$_。 int函数不进行舍入。如果需要将值四舍五入为整数,则应使用sprintf。 语法 以下是此函数的简单语法- int EXPRint返回值 此函数返回EXPR的整数部分。 例 以下是显示其基本用法的示例代码- #!/usr/bin/perl$int_valint…

[保研/考研机试] KY180 堆栈的使用 吉林大学复试上机题 C++实现

题目链接&#xff1a; 堆栈的使用_牛客题霸_牛客网 描述 堆栈是一种基本的数据结构。堆栈具有两种基本操作方式&#xff0c;push 和 pop。其中 push一个值会将其压入栈顶&#xff0c;而 pop 则会将栈顶的值弹出。现在我们就来验证一下堆栈的使用。 输入描述&#xff1a; 对于…

springboot项目get请求下划线转驼峰@JsonProperty注解失效问题

问题&#xff1a;解决sprigboot项目get请求中有下划线的入参参数&#xff0c;如&#xff1a;first_name&#xff0c;希望在项目中将下划线格式转成firstName&#xff0c;用JsonProperty注解发现失效问题 1.核查&#xff1a;JsonProperty注解对应包是否正确 正确包&#xff1a…

虹科新闻 | 虹科与Power-MI正式建立合作伙伴关系

近日&#xff0c;虹科与Power-MI正式建立合作伙伴关系&#xff0c;双方就工业预测性维护领域进行深入的交流与合作&#xff0c;未来将共同致力于为亚洲市场提供完整的、更高质量的预测性维护解决方案&#xff0c;解决亚洲客户的工业自动化挑战。 虹科与Power-MI都表示十分期待…