模拟实现消息队列项目(系列5) -- 服务器模块(虚拟主机)

目录

前言

1. 创建VirtualHost

1.1 定义虚拟主机的相关属性

1.2 VirtualHost 构造方法 

1.3 交换机和队列的创建和删除

1.3.1 交换机操作

1.3.2 队列操作 

1.4 绑定的创建和删除

1.5 发送消息到指定的队列/交换机

2. 实现路由规则Router

2.1 checkBindingKey()

2.2 checkRoutingKey()

2.3 route()

2.4 单元测试

3. 订阅消息

3.1 添加一个订阅者

3.2 创建订阅者管理类ConsumerManager

3.3 订阅消息小结

4. 消息确认 basicAck()

5. VirtualHost单元测试

结语


前言

        写到这里,内存和硬盘的数据就组织完毕了,接下来我们就会引入在消息队列初识中提出的一个概念 --- 虚拟主机.简单回顾一下虚拟主机的概念: 它类似于MySQL的database,是一个逻辑的集合,一个BrokerServer上可以存在多个VirtualHost.在一个BrokerServer上可以组织不同的数据,可以使用不同的虚拟主机做出逻辑上的区分.本章节就是进行进一步的封装,同时实现一些消息队列的API.这里需要注意的是在RabbitMq中,虚拟主机是可以随便创建和删除的,在本项目目前只是默认只有一个虚拟主机的存在,后续根据情况会进行扩展,这里也提前预留了对于多虚拟主机的管理的数据结构.保证了不同虚拟机中的交换机 队列 绑定 消息都是相互隔离的.本项目全部代码已上传Gitee,链接放在文章末尾,欢迎大家访问!


1. 创建VirtualHost

👇👇👇

注意: 这一块比较重要也比较复杂,所以将代码进行截图加标注的形式进行总结,完整的VirtualHost.class代码会在讲解完给出.

👆👆👆

1.1 定义虚拟主机的相关属性

Router: 是用来定义交换机转发的规则,主要实现的是对routingKey进行验证以及判断,具体的细节会在后面给出.

ConsumerManager: 实现的是管理消费者进行消费.

以上两者就是锁对象了,后续我们要对硬盘和内存进行数据的读写,为了保证操作的原子性,以及线程安全我们会给相关操作进行加锁. 

1.2 VirtualHost 构造方法 

主要就是传入虚拟主机的名字,对该虚拟主机的数据库以及文件信息进行初始化,主要是对数据库进行初始化.具体DataBaseManager.init()

初始化内容如下:

 初始化完成,将硬盘中的数据恢复到内存中

至此前置工作就差不多了.下面对一些重要的方法进行创建.

1.3 交换机和队列的创建和删除

1.3.1 交换机操作

如果交换机不存在就进行创建,存在就直接返回(ExchangeDeclare)

  • 1. 更改交换机的名字: 交换机名字 = 虚拟主机的名字 + 交换机的名字(更加方便后续的管理)
  • 2. 判断交换机是否存在: 存在直接返回true即可,不存在就直接创建新的交换机即可.设置交换机的属性,根据是否持久化写入到硬盘,然后在写入到内存.这里需要注意的是,我们一定要先写硬盘再写内存,因为些硬盘是一个失败率很高的事情,经常会因为文件权限问题导致数据写入不进去.如果先写内存,而硬盘写入不进去,就还需要堆内存的数据进行删除,这就很繁琐了.
  • 3. 以上整个操作是对交换机进行读写操作,为了保证线程安全,我们进行加锁操作.
/**
     * 1. 创建交换机
     * 如果交换机不存在就进行创建,存在就直接返回
     */
    // 创建交换机
    // 如果交换机不存在, 就创建. 如果存在, 直接返回.
    // 返回值是 boolean. 创建成功, 返回 true. 失败返回 false
    public boolean exchangeDeclare(String exchangeName,
                                   ExchangeType exchangeType,
                                   boolean durable,
                                   boolean autoDelete,
                                   Map<String, Object> arguments) {

        // 1. 更改交换机的名字 交换机的名字 = 虚拟主机 + 交换机
        exchangeName = virtualHostName + exchangeName;
        try{
            synchronized (exchangeLocker){
                // 2. 判定该交换机是否存在
                Exchange existsExchange = memoryDataCenter.getExchange(exchangeName);

                if (existsExchange != null){
                    System.out.println("[VirtualHost] 交换机已经存在!");
                    return true;
                }
                // 3. 不存在,直接进行创建新的交换机
                Exchange exchange = new Exchange();
                exchange.setName(exchangeName);
                exchange.setType(exchangeType);
                exchange.setDurable(durable);
                exchange.setAutoDelete(autoDelete);
                exchange.setArguments(arguments);
                // 4. 将构造好的交换机进行写入硬盘(含有持久化信息的交换机)  先写硬盘后写内存
                if (durable){
                    diskDataCenter.insertExchange(exchange);
                }
                // 5. 将交换机写入到内存中
                memoryDataCenter.insertExchange(exchange);
                System.out.println("[VirtualHost] 交换机创建完成! exchangeName="+exchangeName);
                // 上述操作为什么不先写内存后写硬盘?
                // 因为写硬盘操作比较容易出现异常,如果写入硬盘失败,写入内存成功,再进行从内存中进行删除就比较麻烦了
            }
            return true;
        } catch (Exception e){
            System.out.println("[VirtualHost] 交换机创建失败! exchangeName="+exchangeName);
            e.printStackTrace();
            return false;
        }
    }

删除交换机

  • 1. 更改交换机的名字: 交换机名字 = 虚拟主机的名字 + 交换机的名字(更加方便后续的管理)
  • 2. 根据交换机的名字得到交换机对象,判断交换机是否为空,不为空进行删除操作,还是先进行删除硬盘的数据,再删除内存中数据
  • 3. 以上整个操作是对交换机进行读写操作,为了保证线程安全,我们进行加锁操作.
/**
     * 2.删除交换机
     * @param exchangeName 交换机名字
     * @return
     */
    public boolean exchangeDelete(String exchangeName) {
        exchangeName = virtualHostName + exchangeName;
        try {
            synchronized (exchangeLocker) {
                // 1. 先找到对应的交换机.
                Exchange toDelete = memoryDataCenter.getExchange(exchangeName);
                if (toDelete == null) {
                    throw new MqException("[VirtualHost] 交换机不存在无法删除!");
                }
                // 2. 删除硬盘上的数据
                if (toDelete.isDurable()) {
                    diskDataCenter.deleteExchange(exchangeName);
                }
                // 3. 删除内存中的交换机数据
                memoryDataCenter.deleteExchange(exchangeName);
                System.out.println("[VirtualHost] 交换机删除成功! exchangeName=" + exchangeName);
            }
            return true;
        } catch (Exception e) {
            System.out.println("[VirtualHost] 交换机删除失败! exchangeName=" + exchangeName);
            e.printStackTrace();
            return false;
        }
    }

1.3.2 队列操作 

针对队列创建和删除操作,这里就不做过多的解释了,过程跟上述交换机的操作一样. 下面给出代码:

/**
     * 3. 创建队列
     * @param queueName 队列名
     * @param durable 持久化
     * @param exclusive 队列独有
     * @param autoDelete 自动删除
     * @param arguments 其他声明
     * @return
     */
    public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,
                                Map<String, Object> arguments) {
        // 把队列的名字, 给拼接上虚拟主机的名字.
        queueName = virtualHostName + queueName;
        try {
            synchronized (queueLocker) {
                // 1. 判定队列是否存在
                MSQueue existsQueue = memoryDataCenter.getQueue(queueName);
                if (existsQueue != null) {
                    System.out.println("[VirtualHost] 队列已经存在! queueName=" + queueName);
                    return true;
                }
                // 2. 创建队列对象
                MSQueue queue = new MSQueue();
                queue.setName(queueName);
                queue.setDurable(durable);
                queue.setExclusive(exclusive);
                queue.setAutoDelete(autoDelete);
                queue.setArguments(arguments);
                // 3. 写硬盘
                if (durable) {
                    diskDataCenter.insertQueue(queue);
                }
                // 4. 写内存
                memoryDataCenter.insertQueue(queue);
                System.out.println("[VirtualHost] 队列创建成功! queueName=" + queueName);
            }
            return true;
        } catch (Exception e) {
            System.out.println("[VirtualHost] 队列创建失败! queueName=" + queueName);
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 4. 删除队列
     * @param queueName 队列名
     * @return
     */
    public boolean queueDelete(String queueName) {
        queueName = virtualHostName + queueName;
        try {
            synchronized (queueLocker) {
                // 1. 根据队列名字, 查询下当前的队列对象
                MSQueue queue = memoryDataCenter.getQueue(queueName);
                if (queue == null) {
                    throw new MqException("[VirtualHost] 队列不存在! 无法删除! queueName=" + queueName);
                }
                // 2. 删除硬盘数据
                if (queue.isDurable()) {
                    diskDataCenter.deleteQueue(queueName);
                }
                // 3. 删除内存数据
                memoryDataCenter.deleteQueue(queueName);
                System.out.println("[VirtualHost] 删除队列成功! queueName=" + queueName);
            }
            return true;
        } catch (Exception e) {
            System.out.println("[VirtualHost] 删除队列失败! queueName=" + queueName);
            e.printStackTrace();
            return false;
        }
    }

1.4 绑定的创建和删除

  • 1. 更改交换机和队列的名字: 交换机名字 = 虚拟主机的名字 + 交换机的名字  队列名字 = 虚拟主机的名字 + 队列的名字
  • 2. 根据交换机和队列的名字得到绑定信息的对象,判断绑定是否为空,不为空抛出异常
  • 3. 绑定对象为空: 1, 判断绑定的bindingKey是否合法. 2.合法就创建绑定对象,设置响应的绑定属性.
  • 4. 获取一下对应的交换机和队列. 如果交换机或者队列不存在, 这样的绑定也是无法创建的.
  • 5. 写入硬盘,再写内存
  • 6. 以上整个操作是对交换机和队列进行读写操作,为了保证线程安全,我们进行加锁操作.

 这一步我们在Router进行设置一个方法,等下面更加详细的介绍router类.

/**
     * 5. 创建绑定
     * @param queueName 队列名字
     * @param exchangeName 交换机名字
     * @param bindingKey 绑定规则
     * @return
     */
    public boolean queueBind(String queueName, String exchangeName, String bindingKey) {
        // 1. 转换交换机和队列的名字
        queueName = virtualHostName + queueName;
        exchangeName = virtualHostName + exchangeName;
        try {
            synchronized (exchangeLocker){
                synchronized (queueLocker){
                    // 2. 判断交换机和队列是否已经绑定成功
                    Binding existBinding = memoryDataCenter.getBinding(exchangeName,queueName);
                    if (existBinding != null){
                        throw new MqException("[VirtualHost] binding 已经存在! queueName=" + queueName+ ", exchangeName=" + exchangeName);
                    }

                    // 3. 验证bing中的bindingKey 是否合法
                    if (!router.checkBindingKey(bindingKey)){
                        throw new MqException("[VirtualHost] bindingKey 非法! bindingKey=" + bindingKey);
                    }
                    // 4. 创建绑定对象
                    Binding binding = new Binding();
                    binding.setExchangeName(exchangeName);
                    binding.setQueueName(queueName);
                    binding.setBindingKey(bindingKey);
                    // 5. 获取对应的交换机和队列,判断是否是存在的
                    MSQueue queue = memoryDataCenter.getQueue(queueName);
                    if (queue == null) {
                        throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName);
                    }
                    Exchange exchange = memoryDataCenter.getExchange(exchangeName);
                    if (exchange == null) {
                        throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);
                    }
                    // 5. 先写硬盘
                    if (queue.isDurable() && exchange.isDurable()) {
                        diskDataCenter.insertBinding(binding);
                    }
                    // 6. 写入内存
                    memoryDataCenter.insertBinding(binding);
                    System.out.println("[VirtualHost] 绑定创建成功! exchangeName=" + exchangeName
                            + ", queueName=" + queueName);
                }
                return true;
            }
        } catch (MqException e) {
            System.out.println("[VirtualHost] 绑定创建失败! exchangeName=" + exchangeName
                    + ", queueName=" + queueName);
            e.printStackTrace();
            return false;
        }
    }

删除绑定 

  • 1. 更改交换机和队列的名字: 交换机名字 = 虚拟主机的名字 + 交换机的名字  队列名字 = 虚拟主机的名字 + 队列的名字
  • 2. 根据交换机和队列的名字得到绑定信息的对象,判断绑定是否为空,为空抛出异常
  • 3. 从硬盘进行删除,从内存进行删除
  • 4. 以上整个操作是对交换机和队列进行读写操作,为了保证线程安全,我们进行加锁操作.这里需要注意的是,我们对交换机和队列进行加锁的时候,顺序要和创建绑定的顺序是一致的.不然会出现死锁的现象.
/**
     * 6. 删除绑定
     * @param queueName 队列名
     * @param exchangeName 交换机名字
     * @return
     */
    public boolean queueUnbind(String queueName, String exchangeName) {
        queueName = virtualHostName + queueName;
        exchangeName = virtualHostName + exchangeName;
        try {
            synchronized (exchangeLocker) {
                synchronized (queueLocker) {
                    // 1. 获取 binding 看是否已经存在~
                    Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);
                    if (binding == null) {
                        throw new MqException("[VirtualHost] 删除绑定失败! 绑定不存在! exchangeName=" + exchangeName + ", queueName=" + queueName);
                    }
                    // 2. 无论绑定是否持久化了, 都尝试从硬盘删一下. 就算不存在, 这个删除也无副作用.
                    diskDataCenter.deleteBinding(binding);
                    // 3. 删除内存的数据
                    memoryDataCenter.deleteBinding(binding);
                    System.out.println("[VirtualHost] 删除绑定成功!");
                }
            }
            return true;
        } catch (Exception e) {
            System.out.println("[VirtualHost] 删除绑定失败!");
            e.printStackTrace();
            return false;
        }
    }

1.5 发送消息到指定的队列/交换机

发布消息其实就是把消息发送到指定的交换机中,然后根据绑定关系发送到指定的队列

  • 1. 更改交换机和队列的名字: 交换机名字 = 虚拟主机的名字 + 交换机的名字  队列名字 = 虚拟主机的名字 + 队列的名字 
  • 2. 检查消息的routingKey是否合法,不合法抛出异常
  • 3. 根据传入的交换机的名字进行查找交换机对象,然后判断交换机的类型,而进行下一步的行为.
  • 4. 如果交换机类型为DIRECT,则表示为直接交换机,则把routingKey作为队列的名字,先进行根据传入的参数,创建消息对象,然后按照刚才组合好的队列名字进行查找队列,查找队列进行发送消息,没查找进行抛出异常.发送消息的时候判断消息是否是持久化的,是持久化就往硬盘中写入,否则只写内存就可以.发送完消息之后,要进行重要的操作.通知消费者进行消费消息.这一块是在管理消费者进行消费消息实现的.
  • 5. 如果交换机类型为Fanout 或者 Topic 我们需要在Router中进行设置相应的路由规则.
/**
     * 9. 发送消息到指定的队列/交换机
     */
    public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {
        try {
            // 1. 转换交换机的名字
            exchangeName = virtualHostName + exchangeName;
            // 2. 检查 routingKey 是否合法.
            if (!router.checkRoutingKey(routingKey)) {
                throw new MqException("[VirtualHost] routingKey 非法! routingKey=" + routingKey);
            }
            // 3. 查找交换机对象
            Exchange exchange = memoryDataCenter.getExchange(exchangeName);
            if (exchange == null) {
                throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);
            }
            // 4. 判定交换机的类型
            if (exchange.getType() == ExchangeType.DIRECT) {
                // 按照直接交换机的方式来转发消息
                // 以 routingKey 作为队列的名字, 直接把消息写入指定的队列中.
                // 此时, 可以无视绑定关系.
                String queueName = virtualHostName + routingKey;
                // 5. 构造消息对象
                Message message = Message.createMessageWithId(routingKey, basicProperties, body);
                // 6. 查找该队列名对应的对象
                MSQueue queue = memoryDataCenter.getQueue(queueName);
                if (queue == null) {
                    throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName);
                }
                // 7. 队列存在, 直接给队列中写入消息
                sendMessage(queue, message);
            } else {
                // 按照 fanout 和 topic 的方式来转发.
                // 5. 找到该交换机关联的所有绑定, 并遍历这些绑定对象
                ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);
                for (Map.Entry<String, Binding> entry : bindingsMap.entrySet()) {
                    // 1) 获取到绑定对象, 判定对应的队列是否存在
                    Binding binding = entry.getValue();
                    MSQueue queue = memoryDataCenter.getQueue(binding.getQueueName());
                    if (queue == null) {
                        // 此处咱们就不抛出异常了. 可能此处有多个这样的队列.
                        // 希望不要因为一个队列的失败, 影响到其他队列的消息的传输.
                        System.out.println("[VirtualHost] basicPublish 发送消息时, 发现队列不存在! queueName=" + binding.getQueueName());
                        continue;
                    }
                    // 2) 构造消息对象
                    Message message = Message.createMessageWithId(routingKey, basicProperties, body);
                    // 3) 判定这个消息是否能转发给该队列.
                    //    如果是 fanout, 所有绑定的队列都要转发的.
                    //    如果是 topic, 还需要判定下, bindingKey 和 routingKey 是不是匹配.
                    if (!router.route(exchange.getType(), binding, message)) {
                        continue;
                    }
                    // 4) 真正转发消息给队列
                    sendMessage(queue, message);
                }
            }
            return true;
        } catch (Exception e) {
            System.out.println("[VirtualHost] 消息发送失败!");
            e.printStackTrace();
            return false;
        }
    }

    private void sendMessage(MSQueue queue, Message message) throws IOException, MqException, InterruptedException {
        // 此处发送消息, 就是把消息写入到 硬盘 和 内存 上. 根据此条消息时是否要进行持久化进行判断
        int deliverMode = message.getDeliverMode();
        // deliverMode 为 1 , 不持久化. deliverMode 为 2 表示持久化.
        if (deliverMode == 2) {
            diskDataCenter.sendMessage(queue, message);
        }
        // 写入内存
        memoryDataCenter.sendMessage(queue, message);

        // 此处还需要补充一个逻辑, 通知消费者可以消费消息了.
        consumerManager.notifyConsume(queue.getName());
    }

2. 实现路由规则Router

 这个类我们实现具体的路由转发规则,对之前还没实现的方法进行实现.还未实现的方法具体如下:

1. 在创建绑定的时候我们对bindingKey进行验证是否合法checkBindingKey();

2. 在往交换机进行发送消息的时候,我们对消息的routingKey进行验证\checkRoutingKey();

3. 当消息插入到交换机之后,根据交换机的主题往队列中分发消息的时候.对不同主题的交换机实现不同的路由规则route();

以上是我们在虚拟主机类中还没有进行实现的方法.下面进行一一实现:

2.1 checkBindingKey()

 以下是我们合法的BindingKey的规则

/**
     * 验证bindingKey是否是合法的
     *     1. 数字, 字母, 下划线
     *     2. 使用 . 分割成若干部分
     *     3. 允许存在 * 和 # 作为通配符. 但是通配符只能作为独立的分段.
     * @return
     */
    public boolean checkBindingKey(String bindingKey){
        if (bindingKey.length() == 0) {
            // 空字符串, 也是合法情况. 比如在使用 direct / fanout 交换机的时候, bindingKey 是用不上的.
            return true;
        }
        // 检查字符串中不能存在非法字符
        for (int i = 0; i < bindingKey.length(); i++) {
            char ch = bindingKey.charAt(i);
            if (ch >= 'A' && ch <= 'Z') {
                continue;
            }
            if (ch >= 'a' && ch <= 'z') {
                continue;
            }
            if (ch >= '0' && ch <= '9') {
                continue;
            }
            if (ch == '_' || ch == '.' || ch == '*' || ch == '#') {
                continue;
            }
            return false;
        }
        // 检查 * 或者 # 是否是独立的部分.
        // aaa.*.bbb 合法情况;  aaa.a*.bbb 非法情况.
        String[] words = bindingKey.split("\\.");
        for (String word : words) {
            // 检查 word 长度 > 1 并且包含了 * 或者 # , 就是非法的格式了.
            if (word.length() > 1 && (word.contains("*") || word.contains("#"))) {
                return false;
            }
        }
        // 约定一下, 通配符之间的相邻关系(人为约定的).
        // 为啥这么约定? 因为前三种相邻的时候, 实现匹配的逻辑会非常繁琐, 同时功能性提升不大~~
        // 1. aaa.#.#.bbb    => 非法
        // 2. aaa.#.*.bbb    => 非法
        // 3. aaa.*.#.bbb    => 非法
        // 4. aaa.*.*.bbb    => 合法
        for (int i = 0; i < words.length - 1; i++) {
            // 连续两个 ##
            if (words[i].equals("#") && words[i + 1].equals("#")) {
                return false;
            }
            // # 连着 *
            if (words[i].equals("#") && words[i + 1].equals("*")) {
                return false;
            }
            // * 连着 #
            if (words[i].equals("*") && words[i + 1].equals("#")) {
                return false;
            }
        }
        return true;
    }

2.2 checkRoutingKey()

验证routingKey是合法的.routingKey是与BindingKey进行匹配的,所以必须是具体的.        

 

/**
     * 验证routingKey是否是合法的
     *      1. 数字, 字母, 下划线
     *      2. 使用 . 分割成若干部分
     * @return
     */
    public boolean checkRoutingKey(String routingKey){
        if (routingKey.length() == 0){
            // 空字符串,合法的情况  当交换机的类型为fanout的时候,是不需要的,所以可以设置为""
            return true;
        }
        for (int i = 0; i < routingKey.length(); i++) {
            char ch = routingKey.charAt(i);
            // 判定该字符是否是大写字母
            if (ch >= 'A' && ch <= 'Z') {
                continue;
            }
            // 判定该字母是否是小写字母
            if (ch >= 'a' && ch <= 'z') {
                continue;
            }
            // 判定该字母是否是阿拉伯数字
            if (ch >= '0' && ch <= '9') {
                continue;
            }
            // 判定是否是 _ 或者 .
            if (ch == '_' || ch == '.') {
                continue;
            }
            // 该字符, 不是上述任何一种合法情况, 就直接返回 false
            return false;
        }
        // 把每个字符都检查过, 没有遇到非法情况. 此时直接返回 true
        return true;
    }

2.3 route()

判断交换机的类型进而得出是否可以进行给队列进行转发消息.

1. 交换机的类型为fanout.代表给交换机进行绑定的所有队列进行转发消息.

2. 交换机的类型为Topic,需要对routingKey进行判断.进而设置给队列转发消息

/**
     *  判断是否可以给绑定的交换机进行转发消息
     * @return
     */
    public boolean route(ExchangeType type, Binding binding, Message message) throws MqException {
        if (type == ExchangeType.FANOUT){
            // 如果交换机类型为 fan-out 就直接进行返回true,表示转发给当前当前绑定的所有对列
            return true;
        }else if(type == ExchangeType.TOPIC){
            // 如果是主题交换机,规则就比较复杂
            return routerTopic(binding,message);
        }else {
            throw new MqException("[Router] 交换机类型有误 exchangeType=" + type);
        }
    }

 对于主题交换机,我们进行详细的讲解.

  • 1. 将bindingKey 和 routingKey 进行按照"."进行分割成字符串数组
  • 2. 定义下标进行遍历数组
  • 3. 遍历两个数组,主要分为5种情况.
    • 3.1  当bindingKey遇到*号时直接跳过*,两个下标都进行自增1
    • 3.2 当bindingKey遇到#号,如果此时#号是bindingKey的最后一位,那么直接返回true
    • 3.3 当bindingKey遇到#号,如果此时#号不是最后一位,就去匹配#号下一位在routingKey的部分,匹配到了就将routingIndex指到匹配的位置,进而在进行上述循环,如果没匹配到就返回false
    • 3.4 此时没有遇见通配符,所有的内容部都要进行匹配上,匹配不上就返回false
    • 3.5 最后判断此时两个数组的下标是否都比较到了末尾.比如 aaa.bbb.ccc 和 aaa.bbb 是要匹配失败的
/**
     * 用来实现:topic类型的交换机的转发规则
     * @param binding  绑定信息对象
     * @param message  消息对象
     * @return
     */
    private boolean routerTopic(Binding binding, Message message) {
        // 1. 将bindingKey 和 routingKey 进行按照"."进行分割
        String[] bindingTokens = binding.getBindingKey().split("\\.");
        String[] routingTokens = message.getRoutingKey().split("\\.");
        // 2. 定义用来遍历数组的下标
        int bindingIndex = 0;
        int routingIndex = 0;
        // 3. 进行遍历两个数组
        while (bindingIndex < bindingTokens.length && routingIndex < routingTokens.length){
            if (bindingTokens[bindingIndex].equals("*")){
                // (1.)遇到*号两个下标直接跳过 * 可以匹配一个部分
                bindingIndex++;
                routingIndex++;
            }else if (bindingTokens[bindingIndex].equals("#")){
                bindingIndex += 1;
                // (2.)遇到#号   # 可以匹配多个部分
                if (bindingIndex == bindingTokens.length){
                    // (3.)当遇到#号,#号的下标为最后一个元素的时候,直接返回true,因为可以直接匹配后面所有的内容
                    return true;
                }else {
                    // (4.)当遇到#号,后面后还有内容的时候,就去匹配#号下一个部分在routingKey的部分,
                    // 匹配了就直接将bindingIndex指到bindingTokens下一个部分,同时将routingIndex指到匹配的地方
                    // 没匹配配到就返回false
                    routingIndex = findNextMatch(routingIndex,routingTokens,bindingTokens[bindingIndex]);
                    if (routingIndex == -1){
                        return false;
                    }
                    bindingIndex++;
                    routingIndex++;
                }
            }else {
                // (5.)此时没有遇见通配符,所有的内容部都要进行匹配上
                if (!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])){
                    return false;
                }
                bindingIndex++;
                routingIndex++;
            }
        }
        // (6.)最后判断此时两个数组的下标是否都比较到了末尾
        // 比如 aaa.bbb.ccc  和  aaa.bbb 是要匹配失败的
        if (bindingIndex == bindingTokens.length && routingIndex == routingTokens.length) {
            return true;
        }
        return false;
    }


    /**
     * 给定起始下标去在一个数组中寻找指定数组元素,找到就返回该元素在数组的下标,没找到就返回-1;
     * @param routingIndex   起始下标
     * @param routingTokens  目标数组
     * @param bindingToken   目标元素
     * @return
     */
    private int findNextMatch(int routingIndex, String[] routingTokens, String bindingToken) {
        for (int i = routingIndex; i < routingTokens.length; i++) {
            if (routingTokens[i].equals(bindingToken)){
                return i;
            }
        }
        return -1;
    }

以上就是整个Router的所有方法.我们对上述代码进行单元测试.

2.4 单元测试

 

 

package com.example.demo.mqserver.core;

import com.example.demo.common.MqException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import static org.junit.jupiter.api.Assertions.*;

/**
 * Created with IntelliJ IDEA.
 * Description:测试交换机的转发规则(交换机类型为topic)
 * User: YAO
 * Date: 2023-08-01
 * Time: 13:56
 */
@SpringBootTest
class RouterTest {

    private Router router = new Router();
    private Binding binding = null;
    private Message message = null;

    @BeforeEach
    public void setUp() {
        binding = new Binding();
        message = new Message();
    }

    @AfterEach
    public void tearDown() {
        binding = null;
        message = null;
    }

    /**
     *  [测试用例]
     *      binding key          routing key         result
     *      aaa                  aaa                 true
     *      aaa.bbb              aaa.bbb             true
     *      aaa.bbb              aaa.bbb.ccc         false
     *      aaa.bbb              aaa.ccc             false
     *      aaa.bbb.ccc          aaa.bbb.ccc         true
     *      aaa.*                aaa.bbb             true
     *      aaa.*.bbb            aaa.bbb.ccc         false
     *      *.aaa.bbb            aaa.bbb             false
     *      #                    aaa.bbb.ccc         true
     *      aaa.#                aaa.bbb             true
     *      aaa.#                aaa.bbb.ccc         true
     *      aaa.#.ccc            aaa.ccc             true
     *      aaa.#.ccc            aaa.bbb.ccc         true
     *      aaa.#.ccc            aaa.aaa.bbb.ccc     true
     *      #.ccc                ccc                 true
     *      #.ccc                aaa.bbb.ccc         true
     */
    @Test
    public void test1() throws MqException {
        binding.setBindingKey("aaa");
        message.setRoutingKey("aaa");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test2() throws MqException {
        binding.setBindingKey("aaa.bbb");
        message.setRoutingKey("aaa.bbb");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test3() throws MqException {
        binding.setBindingKey("aaa.bbb");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test4() throws MqException {
        binding.setBindingKey("aaa.bbb");
        message.setRoutingKey("aaa.ccc");
        Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test5() throws MqException {
        binding.setBindingKey("aaa.bbb.ccc");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test6() throws MqException {
        binding.setBindingKey("aaa.*");
        message.setRoutingKey("aaa.bbb");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test7() throws MqException {
        binding.setBindingKey("aaa.*.bbb");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test8() throws MqException {
        binding.setBindingKey("*.aaa.bbb");
        message.setRoutingKey("aaa.bbb");
        Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test9() throws MqException {
        binding.setBindingKey("#");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test10() throws MqException {
        binding.setBindingKey("aaa.#");
        message.setRoutingKey("aaa.bbb");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test11() throws MqException {
        binding.setBindingKey("aaa.#");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test12() throws MqException {
        binding.setBindingKey("aaa.#.ccc");
        message.setRoutingKey("aaa.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test13() throws MqException {
        binding.setBindingKey("aaa.#.ccc");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test14() throws MqException {
        binding.setBindingKey("aaa.#.ccc");
        message.setRoutingKey("aaa.aaa.bbb.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test15() throws MqException {
        binding.setBindingKey("#.ccc");
        message.setRoutingKey("ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test16() throws MqException {
        binding.setBindingKey("#.ccc");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }
}

 单元测试通过.

3. 订阅消息

        在我们的虚拟主机中进行添加方法完成消息的订阅.要想完成消息的订阅,就需要在消息队列中新建一个列表consumerEnvList用来存储消费者的信息,当有消息进行存储到队列的时候,此时选出消费者进行消费消息.而消费者消费信息的这个环境需要单独定义一个类ConsumerEnv进行描述.以上这个消费信息的过程我们定义一个类ConsumerManager进行管理这些逻辑.

3.1 添加一个订阅者

给队列添加消费者,当队列接收到消息的时候,就要将消息推送给订阅者

public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
        // 构造一个 ConsumerEnv 对象, 把这个对应的队列找到, 再把这个 Consumer 对象添加到该队列中.
        queueName = virtualHostName + queueName;
        try {
            consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);
            System.out.println("[VirtualHost] basicConsume 成功! queueName=" + queueName);
            return true;
        } catch (Exception e) {
            System.out.println("[VirtualHost] basicConsume 失败! queueName=" + queueName);
            e.printStackTrace();
            return false;
        }
    }

此处插入的参数Consumer相当于一个回调函数,就是一个函数式接口.我们在common中进行定义Consumer

@FunctionalInterface
public interface Consumer {
    // Delivery 的意思是 "投递", 这个方法预期是在每次服务器收到消息之后, 来调用.
    // 通过这个方法把消息推送给对应的消费者.
    // (注意! 这里的方法名和参数, 也都是参考 RabbitMQ 展开的)
    void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws IOException, MqException;
}

定义这个回调函数表示:收到消息之后要对消息进行处理.

3.2 创建订阅者管理类ConsumerManager

1.  这个类是和虚拟主机是一一对应的,每个虚拟主机都有一个管理消费者的对象,而管理的消费者的对象对应的是与之对应的.

2. 我们采用一个堵塞队列来记录收到消息的的队列名字,每次队列收到消息,就会往这个队列中进行添加队列的名字,然后后续进行通知这个队列的消费者进行消费消息.

3. 单独使用一个线程池用来执行消息的回调.(主要是获取到消息之后,给响应设置消息的属性与消息本体发送给客户端.)

4. 我们设置一个扫描线程,从堵塞队列不断地取出元素,进而找到队列,在这个队列进行消费消息,并且设置扫描线程为后台线程,这样就不会阻止进程的结束.

public class ConsumerManager {

    // 1. 持有虚拟主机对象的引用,用来操作数据
    private VirtualHost parent;

    // 2. 指定一个线程池,负责执行具体的回调任务
    private ExecutorService workPool = Executors.newCachedThreadPool();

    // 3. 存放令牌的队列,存放接收到消息的队列名字(堵塞队列)
    // 当这个堵塞队列一接收到队列的名字,扫描线程就会就会找到虚拟主机,然后找到这个队列,进而消费消息
    private BlockingQueue<String> tokenQueue = new LinkedBlockingDeque<>();

    // 4. 扫描线程  (关注令牌队列中添加了哪些队列的名字,就知道哪些队列添加了消息,取出消息,进而交给线程池,进行消费这些消息)
    private Thread scannerThread = null;
}

1. 给堵塞队列设置接口,供虚拟主机进行调用.

/**
     * 1. 收到消息,通知消费者进行消费消息(将消息对应的队列名字添加到堵塞队列中)
     */
    public void notifyConsume(String queueName) throws InterruptedException {
        tokenQueue.put(queueName);
    }

2. 实现扫描线程

public ConsumerManager(VirtualHost p) {
        parent = p;

        scannerThread = new Thread(() -> {
            while (true) {
                try {
                    // 1. 拿到令牌
                    String queueName = tokenQueue.take();
                    // 2. 根据令牌, 找到队列
                    MSQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
                    if (queue == null) {
                        throw new MqException("[ConsumerManager] 取令牌后发现, 该队列名不存在! queueName=" + queueName);
                    }
                    // 3. 从这个队列中消费一个消息.
                    synchronized (queue) {
                        consumeMessage(queue);
                    }
                } catch (InterruptedException | MqException e) {
                    e.printStackTrace();
                }
            }
        });
        // 把线程设为后台线程.
        // 后台线程不会影响进程的结束
        scannerThread.setDaemon(true);
        scannerThread.start();
    }

3. 添加消费者环境ConsumerEnv到指定的队列

我们在common中实现这个类

@Data
public class ConsumerEnv {
    // 1. 消费者的身份标识
    private String consumerTag;
    // 2. 消费者消费队列的名字
    private String queueName;
    // 3. 是否自动应答
    private boolean autoAck;
    // 4. 通过这个回调函数来处理收到的消息.
    private Consumer consumer;

    public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
        this.consumerTag = consumerTag;
        this.queueName = queueName;
        this.autoAck = autoAck;
        this.consumer = consumer;
    }
}

(1) 按照指定的队列名找到这个类.

(2) 创建消费者环境对象,进行添加,同时如果这个队列的消息存在,就需要进行消费这些信息,调用consumeMessage()方法传入队列的名字.

/**
     * 2. 新增Consumer对象到指定的对列
     */
    public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {
        // 找到对应的队列.
        MSQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
        if (queue == null) {
            throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);
        }
        ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag, queueName, autoAck, consumer);
        synchronized (queue) {
            queue.addConsumerEnv(consumerEnv);
            // 如果当前队列中已经有了一些消息了, 需要立即就消费掉.
            int n = parent.getMemoryDataCenter().getMessageCount(queueName);
            for (int i = 0; i < n; i++) {
                // 这个方法调用一次就消费一条消息.
                consumeMessage(queue);
            }
        }
    }

4. 消费消息 consumeMessage()

(1) 因为一个队列中可能会有多个消费者,我们按照轮询的方式进行挑选消费者进行消费消息,在队列的类中,设置方法chooseConsumer()

/**
     * 挑选订阅者 进行消费队列中的消息 (轮询的方式)
     * @return
     */
    public ConsumerEnv chooseConsumer(){
        // 1. 如果当前队列对应的消费者的数量为0,直接返回null,表示没有筛选到消费者
        if (consumerEnvList.size() == 0){
            return null;
        }
        // 2. 使用当前订阅到的下标进行对消费者列表取模,然后进行挑选消费者记性消费消息,实现消息的轮询消费
        int index = consumerSeq.get() % consumerEnvList.size();
        consumerSeq.getAndIncrement();
        return consumerEnvList.get(index);
    }

(2) 从队列中取出消息

(3) 把消息带入到回调方法,交给线程池进行执行

/**
     * 消费者进行消费信息
     * @param queue
     */
    private void consumeMessage(MSQueue queue) {
        // 1. 按照轮询的方式, 找个消费者出来.
        ConsumerEnv luckyDog = queue.chooseConsumer();
        if (luckyDog == null) {
            // 当前队列没有消费者, 暂时不消费. 等后面有消费者出现再说.
            return;
        }
        // 2. 从队列中取出一个消息
        Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());
        System.out.println(message);
        if (message == null) {
            // 当前队列中还没有消息, 也不需要消费.
            return;
        }
        // 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.
        workPool.submit(() -> {
            try {
                // 1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前.
                parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);
                // 2. 真正执行回调操作
                luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(),
                        message.getBody());
                // 3. 如果当前是 "自动应答" , 就可以直接把消息删除了.
                //    如果当前是 "手动应答" , 则先不处理, 交给后续消费者调用 basicAck 方法来处理.

                if (luckyDog.isAutoAck()) {
                    // 此时是自动应答,表示直接删除
                    // 1) 删除硬盘上的消息
                    if (message.getDeliverMode() == 2) {
                        parent.getDiskDataCenter().deleteMessage(queue, message);
                    }
                    // 2) 删除上面的待确认集合中的消息
                    parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageID());

                    // 3) 删除内存中消息中心里的消息
                    parent.getMemoryDataCenter().removeMessage(message.getMessageID());
                    System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

3.3 订阅消息小结

 4. 消息确认 basicAck()

 此处是消费者在回调函数中对消息进行处理之后再回调函数中执行的.

  • 1. 获取要删除消息以及所在队列的对象
  • 2. 删除硬盘和内存的数据
  • 3. 删除未确认消息集合的数据
/**
     * 消费者消费完消息进行手动应答
     * @return
     */
    public boolean basicAck(String queueName, String messageId){
        queueName = virtualHostName + queueName;
        try {
            // 1. 获取要删除消息以及所在队列的对象
            Message message = memoryDataCenter.getMessage(messageId);
            if (message == null){
                throw new MqException("[VirtualHost] 确认的信息不存在 messageId="+messageId);
            }
            MSQueue queue = memoryDataCenter.getQueue(queueName);
            if (queue == null){
                throw new MqException("[VirtualHost] 确认的队列不存在 queueName="+queueName);
            }

            // 2
            // 1.)删除硬盘中的数据
            if(message.getDeliverMode() == 2){
                diskDataCenter.deleteMessage(queue,message);
            }
            // 2.) 删除消息中心的消息
            memoryDataCenter.removeMessage(message.getMessageID());

            // 3.) 删除委未确认消息集合的消息
            memoryDataCenter.removeMessageWaitAck(queue.getName(),message.getMessageID());
            System.out.println("[VirtualHost] basicAck成功 消息被确认成功  queueName=" + queueName
            + ",messageId:." + messageId);
            return true;
        } catch (MqException | ClassNotFoundException | IOException e) {
            e.printStackTrace();
            System.out.println("[VirtualHost] basicAck失败 消息被确认失败  queueName=" + queueName
                    + ",messageId:." + messageId);
            return false;
        }
    }

至此以上就是VirtualHost的全部内容,内容很多,很繁琐需要,静下心来仔细的体会.

5. VirtualHost单元测试

 

package com.example.demo.mqserver;

import ch.qos.logback.core.util.FileUtil;
import com.example.demo.DemoApplication;
import com.example.demo.common.Consumer;
import com.example.demo.mqserver.core.BasicProperties;
import com.example.demo.mqserver.core.ExchangeType;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import static org.junit.jupiter.api.Assertions.*;

/**
 * Created with IntelliJ IDEA.
 * Description:虚拟主机的操作测试
 * User: YAO
 * Date: 2023-08-01
 * Time: 18:26
 */
class VirtualHostTest {@Autowired
    public VirtualHost  virtualHost = null;

    @BeforeEach
    void setUp() {
        DemoApplication.context = SpringApplication.run(DemoApplication.class);
        // 创建好虚拟主机对象
        virtualHost = new VirtualHost("default");
    }

    @AfterEach
    void tearDown() throws IOException {
        DemoApplication.context.close();
        //把硬盘的目录进行删除
        File dataDir = new File("./data");
        FileUtils.deleteDirectory(dataDir);
    }

    @Test
    void exchangeDeclare() {
        boolean ok = virtualHost.exchangeDeclare("testExchange",
                ExchangeType.DIRECT,true,false,null);
        Assertions.assertTrue(ok);
    }

    @Test
    void exchangeDelete() {
        boolean ok = virtualHost.exchangeDeclare("testExchange",
                ExchangeType.DIRECT,true,false,null);
        ok = virtualHost.exchangeDelete("testExchange");
        Assertions.assertTrue(ok);
    }

    @Test
    void queueDeclare() {
        boolean ok = virtualHost.queueDeclare("testQueue",
                true,false,false,null);
        Assertions.assertTrue(ok);
    }

    @Test
    void queueDelete() {
        boolean ok = virtualHost.queueDeclare("testQueue",
                true,false,false,null);
        ok = virtualHost.queueDelete("testQueue");
        Assertions.assertTrue(ok);
    }

    @Test
    void queueBind() {
        boolean ok = virtualHost.exchangeDeclare("testExchange",
                ExchangeType.DIRECT,true,false,null);
        ok = virtualHost.queueDeclare("testQueue",
                true,false,false,null);
        ok = virtualHost.queueBind("testQueue","testExchange",
                "testBindingKey");
        Assertions.assertTrue(ok);
    }

    @Test
    void queueUnbind() {
        boolean ok = virtualHost.exchangeDeclare("testExchange",
                ExchangeType.DIRECT,true,false,null);
        ok = virtualHost.queueDeclare("testQueue",
                true,false,false,null);
        ok = virtualHost.queueBind("testQueue","testExchange",
                "testBindingKey");
        ok = virtualHost.queueUnbind("testQueue","testExchange");
        Assertions.assertTrue(ok);
    }

    @Test
    void basicPublish() {
        boolean ok = virtualHost.exchangeDeclare("testExchange",
                ExchangeType.DIRECT,true,false,null);
        ok = virtualHost.queueDeclare("testQueue",
                true,false,false,null);
        ok = virtualHost.basicPublish("testExchange","testQueue",null,"Hello".getBytes(StandardCharsets.UTF_8));
        Assertions.assertTrue(ok);
    }

    /**
     * 1. 先订阅, 后发布消息
     */
    @Test
    public void testBasicConsume1() throws InterruptedException {
        boolean ok = virtualHost.queueDeclare("testQueue", true,
                false, false, null);
        Assertions.assertTrue(ok);
        ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
                true, false, null);
        Assertions.assertTrue(ok);

        // 先订阅队列
        ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
                // 消费者自身设定的回调方法.
                System.out.println("messageId=" + basicProperties.getMessageId());
                System.out.println("body=" + new String(body, 0, body.length));

                Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());
                Assertions.assertEquals(1, basicProperties.getDeliverMode());
                Assertions.assertArrayEquals("hello".getBytes(), body);
            }
        });
        Assertions.assertTrue(ok);

        Thread.sleep(500);

        // 再发送消息
        ok = virtualHost.basicPublish("testExchange", "testQueue", null,
                "hello".getBytes());
        Assertions.assertTrue(ok);
    }

    /**
     *  先发送消息, 后订阅队列.
     */
    @Test
    public void testBasicConsume2() throws InterruptedException {
        boolean ok = virtualHost.queueDeclare("testQueue", true,
                false, false, null);
        Assertions.assertTrue(ok);
        ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
                true, false, null);
        Assertions.assertTrue(ok);

        // 先发送消息
        ok = virtualHost.basicPublish("testExchange", "testQueue", null,
                "hello".getBytes());
        Assertions.assertTrue(ok);

        // 再订阅队列
        ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
                // 消费者自身设定的回调方法.
                System.out.println("messageId=" + basicProperties.getMessageId());
                System.out.println("body=" + new String(body, 0, body.length));

                Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());
                Assertions.assertEquals(1, basicProperties.getDeliverMode());
                Assertions.assertArrayEquals("hello".getBytes(), body);
            }
        });
        Assertions.assertTrue(ok);
        Thread.sleep(500);
    }

    @Test
    public void testBasicConsumeFanout() throws InterruptedException {
        // 创建一个交换机,并且绑定两个队列
        boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.FANOUT, false, false, null);
        Assertions.assertTrue(ok);

        ok = virtualHost.queueDeclare("testQueue1", false, false, false, null);
        Assertions.assertTrue(ok);

        ok = virtualHost.queueBind("testQueue1", "testExchange", "");
        Assertions.assertTrue(ok);

        ok = virtualHost.queueDeclare("testQueue2", false, false, false, null);
        Assertions.assertTrue(ok);

        ok = virtualHost.queueBind("testQueue2", "testExchange", "");
        Assertions.assertTrue(ok);
        // 发布消息发到交换机
        ok = virtualHost.basicPublish("testExchange", "", null, "hello".getBytes());
        Assertions.assertTrue(ok);

        Thread.sleep(500);

        // 两个消费者订阅上述的两个队列.
        ok = virtualHost.basicConsume("testConsumer1", "testQueue1", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
                System.out.println("consumerTag=" + consumerTag);
                System.out.println("messageId=" + basicProperties.getMessageId());
                Assertions.assertArrayEquals("hello".getBytes(), body);
            }
        });
        Assertions.assertTrue(ok);

        ok = virtualHost.basicConsume("testConsumer2", "testQueue2", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
                System.out.println("consumerTag=" + consumerTag);
                System.out.println("messageId=" + basicProperties.getMessageId());
                Assertions.assertArrayEquals("hello".getBytes(), body);
            }
        });
        Assertions.assertTrue(ok);

        Thread.sleep(500);
    }

    @Test
    public void testBasicConsumeTopic() throws InterruptedException {
        // 1. 创建交换机(主题交换机)
        boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.TOPIC, false, false, null);
        Assertions.assertTrue(ok);
        // 2. 创建队列
        ok = virtualHost.queueDeclare("testQueue", false, false, false, null);
        Assertions.assertTrue(ok);
        // 3. 将交换机和队列进行绑定(设置bindingKey)
        ok = virtualHost.queueBind("testQueue", "testExchange", "aaa.*.bbb");
        Assertions.assertTrue(ok);
        // 4. 发布消息(设置routingKey)
        ok = virtualHost.basicPublish("testExchange", "aaa.ccc.bbb", null, "hello".getBytes());
        Assertions.assertTrue(ok);
        // 5. 订阅消息
        ok = virtualHost.basicConsume("testConsumer", "testQueue", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
                System.out.println("consumerTag=" + consumerTag);
                System.out.println("messageId=" + basicProperties.getMessageId());
                Assertions.assertArrayEquals("hello".getBytes(), body);
            }
        });
        Assertions.assertTrue(ok);

        Thread.sleep(500);
    }

    @Test
    public void testBasicAck() throws InterruptedException {
        boolean ok = virtualHost.queueDeclare("testQueue", true,
                false, false, null);
        Assertions.assertTrue(ok);
        ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
                true, false, null);
        Assertions.assertTrue(ok);

        // 先发送消息
        ok = virtualHost.basicPublish("testExchange", "testQueue", null,
                "hello".getBytes());
        Assertions.assertTrue(ok);

        // 再订阅队列 [要改的地方, 把 autoAck 改成 false]
        ok = virtualHost.basicConsume("testConsumerTag", "testQueue", false, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
                // 消费者自身设定的回调方法.
                System.out.println("messageId=" + basicProperties.getMessageId());
                System.out.println("body=" + new String(body, 0, body.length));

                Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());
                Assertions.assertEquals(1, basicProperties.getDeliverMode());
                Assertions.assertArrayEquals("hello".getBytes(), body);

                // [要改的地方, 新增手动调用 basicAck]
                boolean ok = virtualHost.basicAck("testQueue", basicProperties.getMessageId());
                Assertions.assertTrue(ok);
            }
        });
        Assertions.assertTrue(ok);

        Thread.sleep(500);
    }
}

结语

        本文将整个VirtualHost进行了实现,实现了供BrokerServer调用的API.基础的消息队列框架已经搭建好了,接下来就是搭建服务器和客户端了.请持续关注,谢谢!!!

完整的项目代码已上传Gitee,欢迎大家访问.👇👇👇

模拟实现消息队列icon-default.png?t=N6B9https://gitee.com/yao-fa/advanced-java-ee/tree/master/My-mq

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/70467.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【软件测试】接口测试工具APIpost

说实话&#xff0c;了解APIpost是因为&#xff0c;我的所有接口相关的文章下&#xff0c;都有该APIpost水军的评论&#xff0c;无非就是APIpost是中文版的postman&#xff0c;有多么多么好用&#xff0c;虽然咱也还不是什么啥网红&#xff0c;但是不知会一声就乱在评论区打广告…

springboot项目问题

目录标题 问题后端1.[mybatis报错Parameter start not found. Available parameters are [1, 0, param1, param2]](https://www.cnblogs.com/josephcnblog/articles/7077244.html) 知识后端1. [Select 数据表的字段与实体类的属性值](https://www.cnblogs.com/yanguobin/p/1191…

【SpringBoot学习笔记】04. Thymeleaf模板引擎

模板引擎 templates下的只能通过Controller来跳转&#xff0c;templates前后端分离&#xff0c;需要模板引擎thymeleaf支持 模板引擎的作用就是我们来写一个页面模板&#xff0c;比如有些值呢&#xff0c;是动态的&#xff0c;我们写一些表达式。而这些值&#xff0c;从哪来呢…

【JavaWeb】MySQL基础操作

1 通用语法规则 SQL语句可以单行或者多行书写&#xff0c;以分号结尾SQL语句不区分大小写&#xff0c;关键字建议使用大写单行注释 --注释内容&#xff08;通用&#xff09; # 注释内容&#xff08;MySQL独有&#xff09;多行注释 /* 注释内容 */ 2 语句 数据库 -- 查…

【uniapp】uniapp使用微信开发者工具制作骨架屏:

文章目录 一、效果&#xff1a;二、过程&#xff1a; 一、效果&#xff1a; 二、过程&#xff1a; 【1】微信开发者工具打开项目&#xff0c;生成骨架屏&#xff0c;将wxml改造为vue页面组件&#xff0c;并放入样式 【2】页面使用骨架屏组件 【3】改造骨架屏&#xff08;去除…

微信小程序开发价格

小程序开发费用 小程序的开发费用是很多企业和个人在规划项目时需要重点考虑的一个方面。本文将从微信认证费、域名、服务器、程序开发费用、微信支付费率以及维护费用等多个角度为大家分析小程序开发费用的组成。 1. 微信认证费&#xff1a;作为小程序的一种信任凭证&#xf…

【Spring】实现FactoryBean接口

FactoryBean FactoryBean是一个接口&#xff0c;需要创建一个类来实现该接口&#xff0c;该接口中有三个方法&#xff0c;通过重写其中的两个方法&#xff0c;获得一个对象&#xff0c;三个方法分别是&#xff1a; 1.getObject():通过一个对象交给IOC容器管理2.getObjectType(…

ArcGIS在洪水灾害普查、风险评估及淹没制图中应用教程

详情点击链接&#xff1a;ArcGIS在洪水灾害普查、风险评估及淹没制图中应用教程 一&#xff1a;洪水普查技术规范 1.1 全国水旱灾害风险普查实施方案 1.2 洪水风险区划及防治区划编制技术要求 1.3 山丘区中小河流洪水淹没图编制技术要求 二&#xff1a;ArcGIS及数据管理 …

REDIS主从配置

目录 前言 一、概述 二、作用 三、缺点 四、redis主从复制的流程 五、搭建redis主从复制 总结 前言 Redis的主从配置是指在Redis集群中&#xff0c;将一个Redis节点配置为主节点&#xff08;master&#xff09;&#xff0c;其他节点配置为从节点&#xff08;slave&#xff09;…

用友移动管理系统 任意文件上传漏洞复现(HW0day)

0x01 产品简介 用友移动系统管理是用友公司推出的一款移动办公解决方案&#xff0c;旨在帮助企业实现移动办公、提高管理效率和员工工作灵活性。它提供了一系列功能和工具&#xff0c;方便用户在移动设备上管理和处理企业的系统和业务。 0x02 漏洞概述 用友移动管理系统 uploa…

【学会动态规划】买卖股票的最佳时机 III(17)

目录 动态规划怎么学&#xff1f; 1. 题目解析 2. 算法原理 1. 状态表示 2. 状态转移方程 3. 初始化 4. 填表顺序 5. 返回值 3. 代码编写 写在最后&#xff1a; 动态规划怎么学&#xff1f; 学习一个算法没有捷径&#xff0c;更何况是学习动态规划&#xff0c; 跟我…

【ARM64 常见汇编指令学习 15 -- ARM 标志位的学习】

文章目录 ARM 标志位介绍Zero Condition flag(零标志位)零标志位判断实例 上篇文章&#xff1a;ARM64 常见汇编指令学习 14 – ARM 汇编 .balign,.balignw,.balign 伪指令学习 下篇文章&#xff1a;ARM64 常见汇编指令学习 16 – ARM64 SMC 指令 ARM 标志位介绍 在ARM架构中&am…

JavaWeb-Servlet服务连接器(二)

目录 Request&#xff08;获取请求信息&#xff09; 1.获取请求行内容 2.解决乱码问题 3.获取请求头部分 4.获取请求体 5.其他功能 Request&#xff08;获取请求信息&#xff09; 工作流程&#xff1a; 1.通过请求的url的资源路径&#xff0c;tomcat会生成相应的Servlet实…

Mysql:Access denied for user ‘root‘@‘localhost‘ (using password:YES)解决方案

最近在配置Maven以及Mybatis时&#xff0c;连接localhost数据库时出现无法连接&#xff0c;用cmd测试时报错&#xff1a;Access denied for user ‘ODBC’‘localhost’ (using password: NO)&#xff0c;这个意思就是不允许远程访问&#xff0c;一开始笔者进入mysql试了一下是…

由于找不到d3dx9_42.dll,无法继续执行代码。重新安装程序可能会解决此问题

d3dx9_42.dll是一个动态链接库文件&#xff0c;它是Microsoft DirectX 9的一部分。这个文件包含了DirectX 9的一些函数和资源&#xff0c;用于支持计算机上运行基于DirectX 9的应用程序和游戏。它通常用于提供图形、音频和输入设备的支持&#xff0c;以及其他与图形和游戏相关的…

Playable 动画系统

Playable 基本用法 Playable意思是可播放的&#xff0c;可运行的。Playable整体是树形结构&#xff0c;PlayableGraph相当于一个容器&#xff0c;所有元素都被包含在里面&#xff0c;图中的每个节点都是Playable&#xff0c;叶子节点的Playable包裹原始数据&#xff0c;相当于输…

Vue+SpringBoot后台管理系统:Vue3+TypeScript项目搭建(一)

写在开始:一个搬砖程序员的随缘记录文章目录 一、Node安装二、Vue CLI安装三、相关的版本四、创建Vue3TypeScript项目五、Vue项目初始化六、项目启动 一、Node安装 查看Note版本 node -v查看npm版本 npm -v然后将npm升级至最新版本 npm -g install npm将npm下载源换至http:…

RS-232标准

目录 1、概述2、RS-232接口的特点3、RS-232接口协议【仿真】 1、概述 RS-232接口是在1970年由美国电子工业协会(EIA)联合贝尔系统、调制解调器厂家及计算机终端生产厂家共同制定的用于串行通讯的标准。它的全名是“数据终端设备(DTE)和数据通讯设备(DCE)之间串行二进制数据交换…

C语言 二级指针和多级指针

什么是二级指针&#xff1f; 假设&#xff1a; int a 10;int * p &a;如上&#xff0c;p是指针变量&#xff0c;寄存的是a的地址&#xff0c;指向的是元素a 那么&#xff0c;指针变量p有地址吗&#xff1f;指针变量p的指针指向的是&#xff1f; int * * pp &p; …

【Spring Boot 源码学习】自动装配流程源码解析(上)

自动装配流程源码解析&#xff08;上&#xff09; 引言往期内容主要内容1. 自动配置开关2. 加载自动配置组件3. 自动配置组件去重 总结 引言 上篇博文&#xff0c;笔者带大家从整体上了解了AutoConfigurationImportSelector 自动装配逻辑的核心功能及流程&#xff0c;由于篇幅…