【JavaEE Spring 项目】消息队列的设计

消息队列的设计

  • 一、消息队列的背景知识
  • 二、需求分析
    • 核心概念
    • ⼀个⽣产者, ⼀个消费者
    • N 个⽣产者, N 个消费者
    • Broker Server 中的相关概念
    • 核⼼ API
    • 交换机类型 (Exchange Type)
    • 持久化
    • ⽹络通信
    • 消息应答
  • 三、 模块划分
  • 四、 项⽬创建
  • 五、创建核心类
    • 创建 Exchange
    • 创建 MSGQUeue
    • 创建 Binding
    • 创建Message
  • 六、 数据库设计
    • 配置 SQLite
    • 实现创建表
    • 实现数据库基本操作
    • 实现 DataBaseManager
    • 测试 DataBaseManager
  • 七、消息存储设计
    • 设计思路
    • 创建 MessageFileManager 类
    • 实现统计⽂件读写
    • 实现创建队列⽬录
    • 实现删除队列⽬录
    • 检查队列⽂件是否存在
    • 实现消息对象序列化/反序列化
    • 实现写⼊消息⽂件
    • 实现删除消息
    • 实现消息加载
    • 实现垃圾回收(GC)
    • 测试 MessageFileManager
  • ⼋、 整合数据库和⽂件
    • 创建 DiskDataCenter
    • 封装 Exchange ⽅法
    • 封装 Queue ⽅法
    • 封装 Binding 方法
    • 封装 Message ⽅法
  • 九、 内存数据结构设计
    • 创建 MemoryDataCenter
    • 封装 Exchange ⽅法
    • 封装 Queue ⽅法
    • 封装 Binding ⽅法
    • 封装 Message ⽅法
    • 针对未确认的消息的处理
    • 实现重启后恢复内存
    • 测试 MemoryDataCenter
  • ⼗、 虚拟主机设计
    • 创建 VirtualHost
    • 实现构造⽅法和 getter
    • 创建交换机
    • 删除交换机
    • 创建队列
    • 删除队列
    • 创建绑定
    • 删除绑定
    • 发布消息
    • 路由规则
    • 订阅消息
    • 消息确认
    • 测试 VirtualHost
  • ⼗⼀、 ⽹络通信协议设计
    • 明确需求
    • 设计应⽤层协议
    • 定义 Request / Response
    • 定义参数⽗类
    • 定义返回值⽗类
    • 定义其他参数类
  • ⼗⼆、 实现 BrokerServer
    • 创建 BrokerServer 类
    • 启动/停⽌服务器
    • 实现处理连接
    • 实现 readRequest
    • 实现 writeResponse
    • 实现处理请求
    • 实现 clearClosedSession
  • ⼗三、 实现客⼾端
    • 创建 ConnectionFactory
    • Connection 和 Channel 的定义
    • 封装请求响应读写操作
    • 创建 channel
    • 发送请求
    • 关闭 channel
    • 创建交换机
    • 删除交换机
    • 创建队列
    • 删除队列
    • 创建绑定
    • 删除绑定
    • 发送消息
    • 订阅消息
    • 确认消息
    • 处理响应
    • 关闭 Connection
    • 测试代码
  • 项目结果

一、消息队列的背景知识

  • Java标准库中有提供阻塞队列的数据结构, 阻塞队列最重要的用途是实现生产者消费者模型;
  • 生产者消费者模型存在诸多好处, 是后端开发的常用编程方式
    • 解耦合
    • 削峰填谷
  • 在实际的后端开发中, 尤其是分布式系统里, 跨主机之间使用生产者消费者模型, 也是非常普遍的需求, 因此我们通常会把阻塞队列, 封装成一个独立的服务器程序, 并且赋予其更丰富的功能, 这样的程序就被称为 消息队列 (Message Queue, MQ)
  • 市场上有许多消息队列, 如
    • RabbitMQ
    • Kafka
  • 这里仿照 RabbitMQ 来模拟实现一下消息队列

二、需求分析

核心概念

  • ⽣产者 (Producer)
  • 消费者 (Consumer)
  • 中间⼈ (Broker)
  • 发布 (Publish) : 生产者向中间人这里投递消息的过程
  • 订阅 (Subscribe) : 哪些消费者要从这个中间人这里取数据, 这个注册的过程, 称为 “订阅”
  • 消费 (Consume): 消费者从中间人这里取走消息后处理数据的动作

通过取快递来理解上述概念

  1. 商家就是生产者
  2. "我"就是消费者
  3. 菜鸟驿站就是中间人
  4. 首先可以是商家向菜鸟驿站发快递 (发布)
  5. 接着 ''我" 关注哪个商家发的快递 (订阅)
  6. 最后"我"从菜鸟驿站中取走快递后, 并使用快递里的商品 (消费)

⼀个⽣产者, ⼀个消费者

在这里插入图片描述

N 个⽣产者, N 个消费者

在这里插入图片描述
其中, Broker 是最核⼼的部分. 负责消息的存储和转发.

Broker Server 中的相关概念

  • 虚拟主机 (VirtualHost): 类似于 MySQL 的 “database”, 是一个逻辑上的集合, 一个 BrokerServer 上可以存在多个 VirtualHost
  • 交换机(Exchange): 生产者把消息先发到Broker的Exchange上, 在根据不同的规则, 把消息转发给不同的Queue
  • 队列 (Queue): 真正用来存储消息的部分, 每个消费者决定自己从哪个Queue上读取消息
  • 绑定(Binding): Exchange 和 Queue 之间的关联关系, Exchange 和 Queue可以理解成 “多对多” 关系, 使用一个关联表就可以把这两个概念联系起来
  • 消息 (Message): 传递的内容

所谓的Exchange 和 Queue 可以理解成 “多对多” 关系, 和数据库中的 “多对多” 一样, 意思是:
一个Exchange 可以绑定多个 Queue (可以向多个 Queue 中转发消息)
一个 Queue 也可被多个Exchange 绑定(一个 Queue 中的消息可以来自于多个 Exchange)

关系结构图

在这里插入图片描述
这些概念, 既需要在内存中存储, 也需要在硬盘上存储.

  • 内存存储: 方便使用
  • 磁盘存储: 重启服务器后数据不丢失

核⼼ API

对于 Broker 来说, 要实现以下核心 API, 通过这些 API 来实现消息队列的基本功能

  1. 创建队列 (queueDeclare)
  2. 销毁队列 (queueDelete)
  3. 创建交换机 (exchangeDeclare)
  4. 销毁交换机 (exchangeDelete)
  5. 创建绑定 (queueBind)
  6. 解除绑定 (queueUnbind)
  7. 发布消息 (basicPublish)
  8. 订阅消息 (basicConsume)
  9. 确认消息 (basicAck)

另⼀⽅⾯, Producer 和 Consumer 则通过⽹络的⽅式, 远程调⽤这些 API, 实现 ⽣产者消费者模型.

交换机类型 (Exchange Type)

对于 RabbitMQ 来说, 主要支持四种交换机类型

  • Direct
  • Fanout
  • Topic
  • Header

其中 Header 这种⽅式⽐较复杂, ⽐较少⻅. 常⽤的是前三种交换机类型. 此处也主要实现这三种.

  • Direct: 生产者发送消息时, 直接指定被该交换机绑定的队列名
  • Fanout: 生产者发送的消息会被复制到交换机的所有队列中
  • Topic: 绑定队列到交换机上时, 指定一个字符串为 bindingKey, 发送消息指定一个字符串为 routingKey, 当 routingKey 和 bindingKey 满足一定匹配条件的时候, 则把消息投递到指定队列中

这三个操作就像有发奖品一样

  • Direct是发一个专属的奖品给特定的人, 只有指定的人才能领取
  • Fanout 就是给每一个人都发一个安慰奖
  • Topic是有奖竞猜, 出了一道题, 只有作答并正确的人才能领取到奖品

持久化

Exchange, Queue, Binding, Message 都有持久化需求.

当程序重启 / 主机重启, 保证上述内容不丢失.

⽹络通信

⽣产者和消费者都是客⼾端程序, broker 则是作为服务器. 通过⽹络进⾏通信.

在⽹络通信的过程中, 客⼾端部分要提供对应的 api, 来实现对服务器的操作

  1. 创建 Connection
  2. 关闭 Connection
  3. 创建 Channel
  4. 关闭 Channel
  5. 创建队列 (queueDeclare)
  6. 销毁队列 (queueDelete)
  7. 创建交换机 (exchangeDeclare)
  8. 销毁交换机 (exchangeDelete)
  9. 创建绑定 (queueBind)
  10. 解除绑定 (queueUnbind)
  11. 发布消息 (basicPublish)
  12. 订阅消息 (basicConsume)
  13. 确认消息 (basicAck)

可以看到, 在 broker 的基础上, 客⼾端还要增加 Connection 操作和 Channel 操作.

Connection 对应⼀个 TCP 连接

Channel 则是 Connection 中的逻辑通道

⼀个 Connection 中可以包含多个 Channel.

Channel 和 Channel 之间的数据是独⽴的. 不会相互⼲扰.

这样的设定主要是为了能够更好的复⽤ TCP 连接, 达到⻓连接的效果, 避免频繁的创建关闭 TCP 连接.

Connection 可以理解成⼀根⽹线. Channel 则是⽹线⾥具体的线缆.
在这里插入图片描述

消息应答

被消费的消息, 需要进行应答

应答模式分成两种

  • 自动应答: 消费者只要消费了消息, 就算应答完毕了, Broker 直接删除这个消息
  • 手动应答: 消费者手动调用应答接口, Broker 收到应答请求之后, 才真正删除这个消息

手动应答的目的, 是为了保证消息确实被消费者处理成功了, 在一些对于数据可靠性要求高的场景, 比较常见

三、 模块划分

在这里插入图片描述
可以看到, 像 交换机, 队列, 绑定, 消息, 这⼏个核⼼概念在内存和硬盘中都是存储了的.

其中内存为主, 是⽤来实现消息转发的关键; 硬盘为辅, 主要是保证服务器重启之后, 之前的信息都可以正常保持.

四、 项⽬创建

创建 SpringBoot 项⽬.
使⽤ SpringBoot 2 系列版本, Java 8.
依赖引⼊ Spring Web 、 MyBatis 和 lombok.

五、创建核心类

创建包 mqserver.mq

创建 Exchange

/**
 * Created with IntelliJ IDEA.
 * Description:这个类表示一个交换机
 *
 * @author: zxj
 * @date: 2024-02-25
 * @time: 20:05:48
 */
@Data
public class Exchange {
    // 此处使用 name 来作为交换机的身份标识 (唯一的)
    private String name;
    // 交换机类型: Direct, Fanout, Topic
    private ExchangeType type = ExchangeType.DIRECT;
    // 该交换机是否需要持久化存储, true 表示需要持久化存储, false 表示不必持久化.
    private Boolean durable = false;

    // RabbitMQ 有的字段, 相关功能待开发
    // 该属性表示 如果当前交换机没有人用了, 就会自动删除
    private Boolean autoDelete = false;
    // arguments 表示的是创建交换机时指定的一些额外的参数选项, 待开发
    private Map<String,Object> arguments = new HashMap<>();
}
package en.edu.zxj.mq.mqserver.core;

/**
 * Created with IntelliJ IDEA.
 * Description:交换机类型
 *     • Direct: ⽣产者发送消息时, 直接指定被该交换机绑定的队列名.
 *     • Fanout: ⽣产者发送的消息会被复制到该交换机的所有队列中.
 *     • Topic: 绑定队列到交换机上时, 指定⼀个字符串为 bindingKey. 发送消息指定⼀个字符串为 routingKey.
 *              当 routingKey 和 bindingKey 满⾜⼀定的匹配条件的时候, 则把消息投递到指定队列.
 *
 * @author: zxj
 * @date: 2024-02-25
 * @time: 20:10:02
 */
public enum ExchangeType {
    DIRECT(0),
    FANOUT(1),
    TOPIC(2)
    ;

    private Integer type;

    ExchangeType(Integer type) {
        this.type = type;
    }

    public Integer getType() {
        return type;
    }
}
  • name: 交换机的名字, 相当于交换机的身份标识
  • type: 交换机的类型, 三种取值, DIRECT, FANOUT, TOPIC
  • durable: 交换机是否要持久化存储, true 为持久化, false 不持久化
  • autoDelete: 使用完毕后是否自动删除
  • arguments: 交换机的其他参数属性

创建 MSGQUeue

/**
 * Created with IntelliJ IDEA.
 * Description:消息队列,
 * 类名叫做 MSGQueue, ⽽不是 Queue, 是为了防⽌和标准库中的 Queue 混淆
 *
 * @author: zxj
 * @date: 2024-02-25
 * @time: 20:19:52
 */
@Data
public class MSGQueue {
    // 表示队列的身份标识.
    private String name;
    // 该消息队列是否需要持久化存储, true 表示需要持久化存储, false 表示不必持久化.
    private Boolean durable = false;


    // 以下为保留字段
    // exclusive 为 true, 表示这个队列只能被一个消费者使用(别人用不了), 如果为 false 则是大家都能使用
    private Boolean exclusive = false;
    // 该属性表示 如果当前交换机没有人用了, 就会自动删除
    private Boolean autoDelete = false;
    // arguments 表示的是创建交换机时指定的一些额外的参数选项, 待开发
    private Map<String, Object> arguments = new HashMap<>();
}    

类名叫做MSGQUeue, 而不是 Queue, 是为了防止和标准库中的Queue混淆

  • name: 队列名字, 相当于队列的身份标识
  • durable: 交换机是否要持久化存储, true 为持久化, false 不持久化
  • exclusive: 独占(排他), 队列只能被一个消费者使用
  • autoDelete: 使用完毕后是否自动删除
  • arguments: 消息队列的其他参数属性

创建 Binding

/**
 * Created with IntelliJ IDEA.
 * Description:表示队列和交换机之间的关联关系
 *
 * @author: zxj
 * @date: 2024-02-25
 * @time: 20:24:23
 */
@Data
public class Binding {
    // exchangeName 交换机名字
    private String exchangeName;
    // queueName 队列名字
    private String queueName;
    // bindingKey 只在交换机类型为 TOPIC 时才有效. ⽤于和消息中的 routingKey 进⾏匹配.
    private String bindingKey;
}
  • exchangeName: 交换机名字
  • queueName: 队列名字
  • bindingKey: 只在交换机类型为 TOPIC 时才有效, 用于和消息中的 routingKey 进行匹配

创建Message

/**
 * Created with IntelliJ IDEA.
 * Description:表示一个要传递的消息
 * 此处的 Message 对象, 是需要能够在网络上传输, 并且也需要能写入到文件中.
 * 此时就需要针对 Message 进行序列化和反序列化.
 * 此处使用 标准库 自带的 序列化/反序列化 操作.
 *
 * @author: zxj
 * @date: 2024-02-25
 * @time: 20:55:17
 */
@Data
@Component
public class Message implements Serializable {
    // Message 核心属性
    private BasicProperties basicProperties = new BasicProperties();
    // 存储需要传输的消息, 使用字节的方式存储
    private byte[] body;

    // 辅助属性

    /**
     * 一个文件中会存储很多信息, 如何找到某个消息, 在文件中的具体位置呢?
     * 使用下列的两个偏移量进行表示, [offset, offsetEnd)
     * 这两个属性并不需要被反序列化保存到文件中, 此时信息一旦被写入文件之后, 所在的位置就固定了, 并不需要单独存储
     * 这两个属性存在的目的, 主要是为了让内存中的 Message 对象, 能够快速找到对应硬盘上的 Message 位置.
     **/
    private long offsetBeg = 0; // 消息数据开头距离文件开头的偏移位置 (单位是字节)
    private long offsetEnd = 0; // 消息数据结尾距离文件开头的偏移位置 (单位是字节)

    /**
     * 使用这个属性表示该消息在文件中是否是有效信息. (针对文件中的信息, 如果删除, 使用逻辑删除的方式)
     * 0x1 表示有效, 0x0 表示无效
     **/
    private byte isValid = 0x1;
        /**
     * @description: 工厂模式创建 Message 实例
     * 创建一个工厂方法, 让工厂方法帮我们封装一下创建 Message 对象的过程
     * 这个方法中创建的 Message 对象, 会自动生成唯一的 MessageId
     * 万一 routingKey 和 basicProperties 里的 routingKey 冲突, 以外面的为主
     **/
    public static Message createMessageWithId(String routingKey, BasicProperties basicProperties, byte[] body) {
        Message message = new Message();
        if (basicProperties != null) {
            message.setBasicProperties(basicProperties);
        }
        // 此处生成的 MessageId 以 M- 作为前缀.
        message.setMessageId("M-" + UUID.randomUUID());
        message.setRoutingKey(routingKey);
        message.setBody(body);
        /* 此处是把 body 和 basicProperties 先设置出来. 他俩是 Message 的核心内容.
        而 offsetBeg, offsetEnd, isValid, 则是消息持久化的时候才会用到. 在把消息写入文件之前再进行设定.
        此处只是在内存中创建一个 Message 对象.*/

        return message;
    }
}
/**
 * Created with IntelliJ IDEA.
 * Description:
 *
 * @author: zxj
 * @date: 2024-02-25
 * @time: 21:18:11
 */
@Component
@Data
public class BasicProperties implements Serializable {
    // 消息的唯一身份标识, 此处为了保证 id 的唯一性, 使用 UUID 来作为 messageId
    private String messageId;
    /**
     * 是一个消息上带有的内容, 和 bindingKey 做匹配
     * 如果当前交换机类型是 DIRECT, 此时 routingKey 就表示要转发的队列名
     * 如果当前交换机类型是 FANOUT, 此时 routingKey 没有意义
     * 如果当前交换机类型是 TOPIC, 此时 routingKey 就要和 bindingKey做匹配, 符合要求的才能转发给对应的队列
     **/
    private String routingKey;

    // 这个属性表示消息是否要持久化: 1 表示不持久化, 2 表示持久化. (RabbitMQ 就是这样的)
    private Integer deliverMode = 1;
}

  • Message 需要实现 Serializable 接口, 后续需要把Message写入文件以及进行网络传输
  • basicProperties: 是消息的属性信息, body 是消息体
  • offsetBegoffsetEnd 表示消息在消息文件中所在的起始位置和结束位置, 这一块具体的设计后续再说; 使用 transient 关键字避免属性被序列化
  • isValid 用来标识消息在文件中是否有效, 这一块具体设计后续再说
  • createMessageWithId 相当于一个工厂方法, 用来创建一个 Message 实例, messageId 通过 UUID 的方式来生成

六、 数据库设计

对于 Exchange,MSGQUeue,Binding,我们使用数据库进行持久化保存

此处我们使用的数据库是 SQLite,是一个更轻量的数据库

SQLite 只是一个动态库,我们在 Java 中直接注入 SQLite 依赖即可直接使用,不必安装其他的软件

配置 SQLite

引入 pom.xml 依赖

 <!--导入 sqlite 数据库-->
 <dependency>
     <groupId>org.xerial</groupId>
     <artifactId>sqlite-jdbc</artifactId>
     <version>3.45.1.0</version>
 </dependency>

配置数据源 application.yml

spring:
  datasource:
    url: jdbc:sqlite:./data/meta.db
    username:
    password:
    driver-class-name: org.sqlite.JDBC
mybatis:
  configuration:
    map-underscore-to-camel-case: true #配置驼峰自动转换
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl #打印sql语句
  mapper-locations: classpath:mapper/**Mapper.xml

username 和 password 空着即可

此处我们约定, 把数据库文件放到 ./data/meta.db 中

SQLite 只是把数据单存的存储到一个文件中, 非常简单方便

实现创建表

@Mapper
public interface MetaMapper {
    // 建表操作
    void createExchangeTable();

    void createMSGQueueTable();

    void createBindingTable();
}

本身 MyBatis 针对 MySQL 和 Oracle 是可以执行多个 sql 语句的, 但是 SQLite 不行

MetaMapper.xml 中 具体实现 sql 语句

<update id="createExchangeTable">
        create table if not exists exchange_table
        (
            name          varchar(64) primary key,
            type          int comment '0 表示 Direct, 1 表示 Fanout, 2 表示 Topic',
            durable       boolean,
            auto_delete   boolean,
            arguments     varchar(1024),

            `delete_flag` tinyint(4) DEFAULT '0',
            `create_time` datetime   DEFAULT CURRENT_TIMESTAMP,
            `update_time` datetime   DEFAULT CURRENT_TIMESTAMP
        );
    </update>
    <update id="createMSGQueueTable">
        create table if not exists msg_queue_table
        (
            name          varchar(64) primary key,
            durable       boolean,
            exclusive     boolean,
            auto_delete   boolean,
            arguments     varchar(1024),

            `delete_flag` tinyint(4) DEFAULT '0',
            `create_time` datetime   DEFAULT CURRENT_TIMESTAMP,
            `update_time` datetime   DEFAULT CURRENT_TIMESTAMP
        );
    </update>
    <update id="createBindingTable">
        create table if not exists binding_table
        (
            exchange_name varchar(64),
            queue_name    varchar(64),
            binding_key   varchar(64),

            `delete_flag` tinyint(4) DEFAULT '0',
            `create_time` datetime   DEFAULT CURRENT_TIMESTAMP,
            `update_time` datetime   DEFAULT CURRENT_TIMESTAMP
        );
    </update>

实现数据库基本操作

给 mapper.MetaMapper 中添加

// 相关的增删改查操作
 Integer insertExchange(Exchange exchange);

 Integer deleteExchangeByName(String name);

 List<Exchange> selectAllExchanges();

 Integer insertMSGQueue(MSGQueue msgQueue);

 Integer deleteMSGQueueByName(String name);

 List<MSGQueue> selectAllMSGQueues();


 Integer insertBinding(Binding binding);

 Integer deleteBinding(String exchangeName, String queueName);

 List<Binding> selectAllBindings();

相关sql语句实现

    <insert id="insertExchange">
        insert into exchange_table (name, type, durable, auto_delete, arguments)
        values (#{name}, #{type}, #{durable}, #{autoDelete}, #{arguments});
    </insert>
    <insert id="insertMSGQueue">
        insert into msg_queue_table (name, durable, exclusive, auto_delete, arguments)
        values (#{name}, #{durable}, #{exclusive}, #{autoDelete}, #{arguments});
    </insert>
    <insert id="insertBinding">
        insert into binding_table (exchange_name, queue_name, binding_key)
        values (#{exchangeName}, #{queueName}, #{bindingKey});
    </insert>

    <update id="deleteExchangeByName">
        update exchange_table
        set delete_flag = 1
        where name = #{nanme};
    </update>
    <update id="deleteMSGQueueByName">
        update msg_queue_table
        set delete_flag = 1
        where name = #{nanme};
    </update>
    <update id="deleteBinding">
        update binding_table
        set delete_flag = 1
        where exchange_name = #{exchangeName}
          and queue_name = #{queueName};
    </update>

    <select id="selectAllExchanges" resultType="en.edu.zxj.mq.mqserver.core.Exchange">
        select name,
               type,
               durable,

               auto_delete,
               arguments,
               delete_flag,
               create_time,
               update_time
        from exchange_table
        where delete_flag = 0;
    </select>
    <select id="selectAllMSGQueues" resultType="en.edu.zxj.mq.mqserver.core.MSGQueue">
        select name,
               durable,
               exclusive,
               auto_delete,
               arguments,
               delete_flag,
               create_time,
               update_time
        from msg_queue_table
        where delete_flag = 0;
    </select>

    <select id="selectAllBindings" resultType="en.edu.zxj.mq.mqserver.core.Binding">
        select exchange_name,
               queue_name,
               binding_key,
               delete_flag,
               create_time,
               update_time
        from binding_table
        where delete_flag = 0;
    </select>

实现 DataBaseManager

mqserver.datacenter.DatabaseManager

  1. 创建 DatabaseManager 类 – 通过这个类来封装对数据库的操作

/**
 * Created with IntelliJ IDEA.
 * Description:通过这个类来封装针对数据库的操作.
 *
 * @author: zxj
 * @date: 2024-02-26
 * @time: 21:21:21
 */
@Slf4j
public class DatabaseManager {
    // 由于 DataBaseManager 不是⼀个 Bean
    // 需要⼿动来获取实例
    private MetaMapper metaMapper;

    public void init() {
        metaMapper = MqApplication.context.getBean(MetaMapper.class);

        if (!checkDBExits()) {
            // 数据库不存在
            // 1. 先创建目录
            File file = new File("./data/");
            if (!file.exists()) {
                file.mkdirs();
            }
            // 2. 建表
            createTables();
            // 3. 插入默认的数据
            createDefaultData();

            log.info("创建数据库成功~");
        } else {
            log.info("数据库已经存在!");
        }
    }

}

如果数据库已经存在了, 就不必建库建表了

针对 MqApplication, 需要新增⼀个 context 属性. 并初始化

@SpringBootApplication
public class MqApplication {
    public static ConfigurableApplicationContext context = null;

    public static void main(String[] args) {
        context = SpringApplication.run(MqApplication.class, args);
    }
}
  1. 实现 checkDBExists
 private boolean checkDBExits() {
     File file = new File("./data/meta.db");
     return file.exists();
 }
  1. 实现 createTable
 /**
  * 这个方法用来建表.
  * 建库操作并不需要手动执行. (不需要手动创建 meta.db 文件)
  * 首次执行这里的数据库操作的时候, 就会自动的创建出 meta.db 文件来 (MyBatis 帮我们完成的)
  **/
 private void createTables() {
     metaMapper.createMSGQueueTable();
     metaMapper.createBindingTable();
     metaMapper.createExchangeTable();

     log.info("建表成功");
 }
  1. 实现 createDefaultData
/**
 * @description: 给数据库表中, 添加默认的数据
 * 此处主要是添加一个默认的交换机
 * RabbitMQ 里有一个这样的设定: 带有一个 匿名 的交换机, 类型是 DIRECT
 **/
private void createDefaultData() {
    // 构造一个默认的交换机
    Exchange exchange = new Exchange();
    exchange.setName("");
    exchange.setType(ExchangeType.DIRECT);
    exchange.setDurable(false);
    exchange.setAutoDelete(false);
    metaMapper.insertExchange(exchange);

    log.info("创建默认的数据成功~");
}

默认数据主要是创建⼀个默认的交换机. 这个默认交换机没有名字, 并且是直接交换机.

  1. 封装其他的数据库操作
// 封装其他的数据库操作
    Integer insertExchange(Exchange exchange) {
        return metaMapper.insertExchange(exchange);
    }

    Integer deleteExchangeByName(String name) {
        return metaMapper.deleteExchangeByName(name);
    }

    List<Exchange> selectAllExchanges() {
        return metaMapper.selectAllExchanges();
    }

    Integer insertMSGQueue(MSGQueue msgQueue) {
        return metaMapper.insertMSGQueue(msgQueue);
    }

    Integer deleteMSGQueueByName(String name) {
        return metaMapper.deleteMSGQueueByName(name);
    }

    List<MSGQueue> selectAllMSGQueues() {
        return metaMapper.selectAllMSGQueues();
    }


    Integer insertBinding(Binding binding) {
        return metaMapper.insertBinding(binding);
    }

    Integer deleteBinding(String exchangeName, String queueName) {
        return metaMapper.deleteBinding(exchangeName, queueName);
    }

    List<Binding> selectAllBindings() {
        return metaMapper.selectAllBindings();
    }

    // 清理资源
    public void deleteDB() {
        File file = new File("./data/meta.db");
        if (file.delete()) {
            log.info("删除数据库文件成功~");
        } else {
            log.info("删除数据库文件失败~");
        }

        File dataDir = new File("./data/");
        // 使用 delete 删除目录的时候, 需要保证目录是空的.
        if (dataDir.delete()) {
            log.info("删除数据库目录成功~");
        } else {
            log.info("删除数据库目录失败~");
        }
    }

测试 DataBaseManager

使⽤ Spring ⾃带的单元测试, 针对上述代码进⾏测试验证.

在 test ⽬录中, 创建 DataBaseManagerTests

  1. 准备⼯作

@SpringBootTest
class DatabaseManagerTest {
    private final DatabaseManager databaseManager = new DatabaseManager();

    // 接下来需要编写多个方法, 每个方法都是一个/一组单元测试用例
    // 还需要做一个准备工作, 需要写两个方法, 分别用于今 "准备工作" 和 "收尾工作"

    // 使用这个方法, 来执行准备工作, 每个用例执行前, 都要调用这个方法
    @BeforeEach
    public void setUp() {
        // 由于在 init 中, 需要通过 context 对象拿到 metaMapper 实例的
        // 所以就需要先把 context 对象给搞出来, 给搞出来
        MqApplication.context = SpringApplication.run(MqApplication.class);
        databaseManager.init();
    }

    // 使用这个方法, 来执行收尾工作, 每个用例执行后, 都要调用这个方法
    @AfterEach
    public void tearDown() {
        /*
         这里需要进行操作, 就是把数据库给清空~ (把数据库文件, meta.db 直接删了就行了)
         注意, 此处不能直接就删除, 而需要先关闭上述 context 对象!
         此处的 context 对象, 持有了 MetaMapper 的实例, MetaMapper 实例又打开了 meta.db 数据库文件
         如果 meta.db 被别人打开了, 次数的删除文件操作是不会成功的 (Windows 系统的限制, Linux 没有这个问题)
         另一方面, 获取 context 操作, 会占用 8080 端口, 此处的 close 也是释放 8080
         */
        MqApplication.context.close();
        databaseManager.deleteDB();
    }
}
  • @SpringBootTest 注解表示该类是一个测试类
  • @BeforeEach 每个测试用例之前执行, 一般用来做准备工作, 此处进行数据库初始化, 以及针对 Spring 服务的初始化
  • @AfterEach 每个测试用例之后执行, 一般用来做收尾工作, 此处需要先关闭 Spring 项目, 再删除数据库
  1. 编写测试用例
  • @Test 注解表示一个测试用例
  • Assertions 是断言, 用来判定结果的
  • 每个用例执行之前, 都会先调用 setUp, 每次用例执行后, 都会调用 tearDown
  • 确保每个用例执行的都是 “clean” 的, 也就是每个测试用例不会被上一个测试用例干扰

具体代码


    @Test
    void init() {
        // 由于 init 方法, 已经在上面 setUp 方法中调用了, 直接在测试用例代码中, 检查当前的数据库状态即可
        // 直接从数据库中查询, 看数据是否符合预期.
        // 查交换机表, 里面应该有一个数据 (匿名的 exchange); 查消息队列表, 没有数据; 查绑定表, 没有数据
        List<Exchange> exchanges = databaseManager.selectAllExchanges();
        List<MSGQueue> msgQueues = databaseManager.selectAllMSGQueues();
        List<Binding> bindings = databaseManager.selectAllBindings();

        /*
           直接打印结果, 通过肉眼来检查结果, 可以但是不优雅, 不方便
           更好的方法是使用断言
           System.out.println(exchanges.size());
           assertEquals 判定结果是不是相等
           注意 assertEquals 两个参数的顺序, 虽然比较相等, 谁在前, 谁在后, 无所谓
           但是 assertEquals 的形参, 第一个形参叫做 expected (预期), 第二个形参叫做 actual (实际的)
         */
        Assertions.assertEquals(1, exchanges.size());
        Assertions.assertEquals("", exchanges.get(0).getName());
        Assertions.assertEquals(ExchangeType.DIRECT, exchanges.get(0).getType());
        Assertions.assertEquals(0, msgQueues.size());
        Assertions.assertEquals(0, bindings.size());
    }


    @org.jetbrains.annotations.NotNull
    private Exchange createTestExchange(String exchangeName) {
        Exchange exchange = new Exchange();
        exchange.setName(exchangeName);
        exchange.setType(ExchangeType.FANOUT);
        exchange.setDurable(true);
        exchange.setAutoDelete(false);
        exchange.setArguments("11", "aa");
        exchange.setArguments("22", "bb");

        return exchange;
    }


    @Test
    void insertExchange() {
        // 构造一个 Exchange 对象, 插入到数据库中. 再查询出来, 看结果是否符合预期.
        String exchangeName = "exchangeTest";
        Exchange exchange = createTestExchange(exchangeName);
        databaseManager.insertExchange(exchange);
        // 插入完毕后, 查询结果
        List<Exchange> exchanges = databaseManager.selectAllExchanges();

        Exchange newExchange = exchanges.get(1);
        Assertions.assertEquals(2, exchanges.size());
        Assertions.assertEquals(ExchangeType.FANOUT, newExchange.getType());
        Assertions.assertEquals(true, newExchange.getDurable());
        Assertions.assertEquals(false, newExchange.getAutoDelete());
        Assertions.assertEquals("aa", newExchange.getArguments("11"));
        Assertions.assertEquals("bb", newExchange.getArguments("22"));
    }

    @Test
    void deleteExchangeByName() {
        // 先构造一个交换机, 插入数据库; 然后再按照名字删除即可!
        String exchangeName = "exchangeTest";
        Exchange exchange = createTestExchange(exchangeName);
        databaseManager.insertExchange(exchange);
        // 插入完毕后, 查询结果
        List<Exchange> exchanges = databaseManager.selectAllExchanges();
        Exchange newExchange = exchanges.get(1);
        Assertions.assertEquals(2, exchanges.size());
        Assertions.assertEquals(ExchangeType.FANOUT, newExchange.getType());

        // 删除交换机
        databaseManager.deleteExchangeByName(exchangeName);
        exchanges = databaseManager.selectAllExchanges();
        Assertions.assertEquals(1, exchanges.size());
        Assertions.assertEquals("", exchanges.get(0).getName());
    }

    @Test
    void selectAllExchange() {
        // 构造一个 Exchange 对象, 插入到数据库中. 再查询出来, 看结果是否符合预期.
        String exchangeName = "exchangeTest";
        Exchange exchange = createTestExchange(exchangeName);
        databaseManager.insertExchange(exchange);
        // 插入完毕后, 查询结果
        List<Exchange> exchanges = databaseManager.selectAllExchanges();

        Exchange newExchange = exchanges.get(1);
        Assertions.assertEquals(2, exchanges.size());
        Assertions.assertEquals(ExchangeType.FANOUT, newExchange.getType());
        Assertions.assertEquals(true, newExchange.getDurable());
        Assertions.assertEquals(false, newExchange.getAutoDelete());
        Assertions.assertEquals("aa", newExchange.getArguments("11"));
        Assertions.assertEquals("bb", newExchange.getArguments("22"));
    }


    private MSGQueue createTestMSGQueue(String msgQueueName) {
        MSGQueue msgQueue = new MSGQueue();
        msgQueue.setName(msgQueueName);
        msgQueue.setDurable(true);
        msgQueue.setAutoDelete(false);
        msgQueue.setExclusive(false);
        msgQueue.setArguments("a", 1);
        msgQueue.setArguments("b", 2);
        return msgQueue;
    }


    @Test
    void insertMSGQueue() {
        // 插入数据
        String msgQueueName = "testMSGQueueName";
        MSGQueue msgQueue = createTestMSGQueue(msgQueueName);
        databaseManager.insertMSGQueue(msgQueue);

        // 查询数据, 判断插入的数据是否正确
        List<MSGQueue> msgQueues = databaseManager.selectAllMSGQueues();

        MSGQueue msgQueueNew = msgQueues.get(0);

        Assertions.assertEquals(1, msgQueues.size());
        Assertions.assertEquals(msgQueueName, msgQueueNew.getName());
        Assertions.assertEquals(true, msgQueueNew.getDurable());
        Assertions.assertEquals(false, msgQueueNew.getAutoDelete());
        Assertions.assertEquals(false, msgQueueNew.getExclusive());
        Assertions.assertEquals(1, msgQueueNew.getArguments("a"));
        Assertions.assertEquals(2, msgQueueNew.getArguments("b"));

    }

    @Test
    void deleteMSGQueueByName() {
        // 插入数据
        String msgQueueName = "testMSGQueueName";
        MSGQueue msgQueue = createTestMSGQueue(msgQueueName);
        databaseManager.insertMSGQueue(msgQueue);
        // 查询数据, 判断插入的数据是否正确
        List<MSGQueue> msgQueues = databaseManager.selectAllMSGQueues();
        MSGQueue msgQueueNew = msgQueues.get(0);
        Assertions.assertEquals(1, msgQueues.size());
        Assertions.assertEquals(msgQueueName, msgQueueNew.getName());

        // 依据名字删除
        databaseManager.deleteMSGQueueByName(msgQueueName);
        msgQueues = databaseManager.selectAllMSGQueues();
        Assertions.assertEquals(0, msgQueues.size());

    }

    @Test
    void selectAllMSGQueue() {
        // 插入数据
        String msgQueueName = "testMSGQueueName";
        MSGQueue msgQueue = createTestMSGQueue(msgQueueName);
        databaseManager.insertMSGQueue(msgQueue);
        // 查询数据, 判断插入的数据是否正确
        List<MSGQueue> msgQueues = databaseManager.selectAllMSGQueues();
        MSGQueue msgQueueNew = msgQueues.get(0);
        Assertions.assertEquals(1, msgQueues.size());
        Assertions.assertEquals(msgQueueName, msgQueueNew.getName());
    }

    private @NotNull Binding createTestBinding(String exchangeName, String msgQueueName) {
        Binding binding = new Binding();
        binding.setExchangeName(exchangeName);
        binding.setQueueName(msgQueueName);
        binding.setBindingKey("Hello word");
        return binding;
    }


    @Test
    void insertBinding() {
        // 插入 binding 数据
        String exchangeName = "testBindingExchangeName";
        String msgQueueName = "testBindingMSGQueueName";
        Binding binding = createTestBinding(exchangeName, msgQueueName);
        databaseManager.insertBinding(binding);

        // 查询
        List<Binding> bindings = databaseManager.selectAllBindings();
        Binding bindingNew = bindings.get(0);

        Assertions.assertEquals(1, bindings.size());
        Assertions.assertEquals(exchangeName, bindingNew.getExchangeName());
        Assertions.assertEquals(msgQueueName, bindingNew.getQueueName());
        Assertions.assertEquals("Hello word", bindingNew.getBindingKey());
    }

    @Test
    void deleteBinding() {
        // 插入 binding 数据
        String exchangeName = "testBindingExchangeName";
        String msgQueueName = "testBindingMSGQueueName";
        Binding binding = createTestBinding(exchangeName, msgQueueName);
        databaseManager.insertBinding(binding);
        // 查询
        List<Binding> bindings = databaseManager.selectAllBindings();
        Binding bindingNew = bindings.get(0);
        Assertions.assertEquals(1, bindings.size());
        // 删除
        databaseManager.deleteBinding(exchangeName, msgQueueName);
        bindings = databaseManager.selectAllBindings();
        Assertions.assertEquals(0, bindings.size());

    }

    @Test
    void selectAllBinding() {
        // 插入 binding 数据
        String exchangeName = "testBindingExchangeName";
        String msgQueueName = "testBindingMSGQueueName";
        Binding binding = createTestBinding(exchangeName, msgQueueName);
        databaseManager.insertBinding(binding);
        // 查询
        List<Binding> bindings = databaseManager.selectAllBindings();
        Binding bindingNew = bindings.get(0);
        Assertions.assertEquals(1, bindings.size());
    }

七、消息存储设计

设计思路

消息需要再硬盘上存储, 但是并不是直接放到数据库中,而是直接使用文件存储。

原因如下:

  1. 对于消息的操作不需要复杂的 增删改查
  2. 对于文件的操作效率比数据库会高很多

主流 mq 的实现 (包括 RabbitMQ), 都是把消息存储在文件中, 而不是数据库中

我们给每个队列分配一个目录, 目录的名字为 data + 队列名, 形如 ./data/testQueue, 该目录中包含两个固定名字的文件

  • queue_data.txt 消息数据文件, 用来保存消息内容
  • queue_stat.txt 消息统计文件, 用来保存消息统计信息

queue_data.txt 文件格式:

使用二进制方式存储.

每个消息分成两个部分:

  • 前四个字节, 表示 Message 对象的长度(字节为单位)
  • 后面若干个字节, 表示 Message 内容
  • 消息和消息之间收尾相连

每个 Message 基于 Java 标准库的 ObjectInputStream / ObjectOutputStream 序列化

在这里插入图片描述

Message 对象中的 offsetBeg 和 offsetEnd 正是用来描述每个消息体所在的位置

queue_static.txt 文件格式:
使用文本方式存储

文件中只包含一行, 里面包含两列(都是整数), 使用 \t 分割.

第一列表示当前总的消息数目. 第二列表示有效消息数目.

形如:

2000\t1500

创建 MessageFileManager 类

创建 mqserver.datacenter.MessageFileManager


/**
 * Created with IntelliJ IDEA.
 * Description:消息持久化存储
 * 存储单位是以队列名字为单位存储的
 *
 * @author: zxj
 * @date: 2024-02-28
 * @time: 13:43:23
 */
@Slf4j
public class MessageFileManger {

    private final static String BASIC_DIR = "./data/";

    /**
     * @description: 内部类, 用于管理 queue_stat.txt 中的数据
     * 存储格式: totalCount \t validCount
     * 作用: 为了后面垃圾回收功能做准备的
     * 约定: 当 有效信息的占比低于 50% 时, 并且 总的消息数目大于 2000 时, 触发 GC 功能
     **/
    static public class Stat {
        // 总信息的存储数目
        public Integer totalCount;
        // 有效的信息数目
        public Integer validCount;


        // 最少消息数目
        private static final Integer atLeastCount = 2000;
        // 最低有效信息占比
        private static final Double minProportion = 0.5;

    }

    public void init() {
        // 暂时不需要任何初始化操作, 方便后续扩展
    }

    // 设定信息存储的目录和文件

    /**
     * @description: 用来获取指定队列信息存储的目录
     **/
    @Contract(pure = true)
    private @NotNull String getQueueDir(String queueName) {
        return BASIC_DIR + queueName;
    }

    /**
     * @description: 用来获取指定队列信息数据存储路径
     **/
    @Contract(pure = true)
    private @NotNull String getQueueDataPath(String queueName) {
        return getQueueDir(queueName) + "/queue_data.txt";
    }

    /**
     * @description: 用来获取指定队列信息记录存储路径
     **/
    @Contract(pure = true)
    private @NotNull String getQueueStatPath(String queueName) {
        return getQueueDir(queueName) + "/queue_stat.txt";
    }

    /**
     * @description: 用来获取指定队列新数据存储路径
     **/
    @Contract(pure = true)
    private @NotNull String getQueueDataNewPath(String queueName) {
        return getQueueDir(queueName) + "/queue_data_new.txt";
    }

}
  • 内部包含一个 Stat 类, 用来标识消息统计文件的内容
  • getQueueDir, getQueueDataPath, getQueueStatPath, getQueueDataNewPath 用来表示这几个文件的位置

实现统计⽂件读写

这是后续操作的一些准备工作


    /**
     * @description: 读取 queue_stat.txt 文件里面的内容
     **/
    private @NotNull Stat readStat(String queueName) throws IOException {
        Stat stat = new Stat();
        try (InputStream inputStream = Files.newInputStream(Paths.get(getQueueStatPath(queueName)))) {
            // 因为 queue_stat.txt 里面存储的内容是文本的, 所以可以使用 Scanner 来读取
            Scanner scanner = new Scanner(inputStream);
            stat.totalCount = scanner.nextInt();
            stat.validCount = scanner.nextInt();
        }
        return stat;
    }

    /**
     * @description: 向 queue_stat.txt 文件里面写入内容
     **/
    private void writeStat(String queueName, @NotNull Stat stat) throws IOException {
        // 使用 PrintWrite 来写文件.
        // OutputStream 打开文件, 默认情况下, 会直接把原文件清空. 此时相当于新的数据覆盖了旧的.
        try (OutputStream outputStream = Files.newOutputStream(Paths.get(getQueueStatPath(queueName)))) {
            PrintWriter printWriter = new PrintWriter(outputStream);
            printWriter.write(stat.totalCount + "\t" + stat.validCount);
            printWriter.flush();
        }
    }

直接使用 Scanner 和 Printer 写即可

实现创建队列⽬录

每个队列都是自己的目录和配置的文件, 通过下列方法把目录和文件先准备好


    /**
     * @description: 创建相关目录信息, 并进行相关的初始化操作
     **/
    public void createQueueFiles(String queueName) throws IOException {
        // 1. 创建对应的目录
        File fileDir = new File(getQueueDir(queueName));
        if (!fileDir.exists()) {
            // 不存在对应目录
            if (!fileDir.mkdirs()) {
                throw new IOException("创建目录失败, fileDir: " + fileDir.getAbsoluteFile());
            }
        }
        // 2. 创建 queue_data.txt 文件
        File fileData = new File(getQueueDataPath(queueName));
        if (!fileData.exists()) {
            // 不存在对应文件
            if (!fileData.createNewFile()) {
                throw new IOException("创建目录失败, fileData: " + fileData.getAbsoluteFile());
            }
        }
        // 3. 创建 queue_stat.txt 文件
        File fileStat = new File(getQueueDataPath(queueName));
        if (!fileStat.exists()) {
            // 不存在对应文件
            if (!fileStat.createNewFile()) {
                throw new IOException("创建目录失败, fileStat: " + fileStat.getAbsoluteFile());
            }
        }
        // 4. 初始化 queue_stat.txt 文件
        Stat stat = new Stat();
        stat.totalCount = 0;
        stat.validCount = 0;
        writeStat(queueName, stat);
    }

把上述约定的文件都创建出来, 并对消息统计文件进行初始化

初始化 0\t0 这样的初始值

实现删除队列⽬录

如果队列需要删除, 则队列对应的⽬录/⽂件也需要删除

    /**
     * @description: 销毁消息的目录文件
     **/
    public void destroyQueueFiles(String queueName) throws IOException {
        // 先删除文件, 在删除目录
        File fileData = new File(getQueueDataPath(queueName));
        boolean ok1 = fileData.delete();
        File fileStat = new File(getQueueStatPath(queueName));
        boolean ok2 = fileStat.delete();
        File fileDir = new File(getQueueDir(queueName));
        boolean ok3 = fileDir.delete();
        if (!ok1 || !ok2 || !ok3) {
            // 但凡有一个失败, 就算整体是失败的
            throw new IOException("删除指定文件和目录失败, dir: " + fileDir.getAbsoluteFile());
        }
    }

注意: File 类的 delete ⽅法只能删除空⽬录. 因此需要先把内部的⽂件先删除掉

检查队列⽂件是否存在

    /**
     * @description: 判断 queueName 对应的文件是否存在
     * 比如后续生产者给 broker server 生产消息了, 这消息可能需要被记录到文件上(取决于该信息是否需要持久化)
     * @return:
     **/
    public boolean checkFilesExists(String queueName) {
        // 判断队里的数据文件和状态文件是否存在即可
        File fileData = new File(getQueueDataPath(queueName));
        File fileStat = new File(getQueueStatPath(queueName));
        return fileStat.exists() && fileData.exists();
    }

实现消息对象序列化/反序列化

Message 对象需要转成⼆进制写⼊⽂件. 并且也需要把⽂件中的⼆进制读出来解析成 Message 对象. 此处针对这⾥的逻辑进⾏封装.

创建 common.BinaryUtils


/**
 * Created with IntelliJ IDEA.
 * Description:操作二级制数据相关的工具类 -- 提供将 java 对象 反序列化和序列化
 *
 * @author: zxj
 * @date: 2024-02-28
 * @time: 14:33:24
 */
public class BinaryUtils {

    /**
     * @description: 反序列化, 将字节数组转化为 java 对象
     **/
    public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        Object object = null;
        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {
            try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
                // 这里的 readObject 就是将字节数组 反序列为 java 对象
                object = objectInputStream.readObject();
            }
        }
        return object;
    }

    public static byte[] toBytes(Object object) throws IOException {
        // 这个流对象相当于一个变长的字节数据
        // 可以把 Object 序列化的数据逐渐的写入到 byteArrayOutputStream 中, 再统一转成 byte[]
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()){
            try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){
                // 此处的 writeObject 就会把该对象进行序列化, 生成的二进制字节数据, 就会写入到 ObjectOutputStream中
                // 由于 ObjectOutputStream 又是关联到了 ByteArrayOutputStream 中, 最终结果就写入到 ByteArrayOutputStream 中了
                objectOutputStream.writeObject(object);
            }
            // 这个操作就是把 byteArrayOutputStream 中持有的二进制数据提取出现, 转化为 byte[]
            return byteArrayOutputStream.toByteArray();
        }
    }

}
  • 使用 ByteArrayInputStream / ByteArrayOutputStream 针对 byte[ ]进行封装, 方便后续操作 (这两个流对象是纯内存的, 不需要进行 close)
  • 使用 ObjectInputStream / ObjectOutputStream 进行序列化和反序列化操作, 通过内部的 readObject / writeObject 即可完成对应操作
  • 此处涉及到的序列化对象, 需要实现 Serializable 接口

实现写⼊消息⽂件


    /**
     * @description: 增
     * 将 message 放到 msgQueue 对应的队列文件中
     * @param: [msgQueue 消息队列, message 需要存储的信息 - 内存中也会管理该对象]
     **/
    public void sendMessage(@NotNull MSGQueue msgQueue, Message message) throws MqException, IOException {
        // 1. 检查当前队列要写入的文件是否存在
        if (!checkFilesExists(msgQueue.getName())) {
            throw new MqException("[MessageFileManager] 队列所对应的文件不存在! queueName=" + msgQueue.getName());
        }

        // 2. 把 message 转化为 字节数组
        byte[] messageBinary = BinaryUtils.toBytes(message);

        // 将 messageBinary 写入到 msgQueue 所对应的队列文件中
        // 文件属于一个公共资源, 此时进行写操作, 存在线程安全的问题
        // 需要对对应的队列进行加锁, 确保同时向同一个队列中写入信息是线程安全的
        synchronized (msgQueue) {
            // 3. 设置 Message 对象中 offsetBeg 和 offsetEnd 字段
            // 3.1 获取此时对应文件的总长度, fileQueueData.length() 就可以获取
            File fileQueueData = new File(getQueueDataPath(msgQueue.getName()));
            // 3.2 计算
            // 把新的 message 写入到文件中, offsetBeg = 旧的总长度 + 4, offsetEnd = 旧的总长度 + messageBinary.length + 4
            message.setOffsetBeg(fileQueueData.length() + 4);
            message.setOffsetEnd(fileQueueData.length() + messageBinary.length + 4);

            // 4. 将 messageBinary 写入到文件的默认, 注意: 这里是追加写
            try (OutputStream outputStream = new FileOutputStream(getQueueDataPath(msgQueue.getName()), true)) {
                try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
                    // 4.1 先写入新 messageBinary 的长度 -- 固定占四个字节
                    // 知识点: outputStream.write() 参数看似是 int 类型, 但是实际上只是写入一个字节的数据, dataOutputStream.writeInt() 就是写四个字节的数据
                    dataOutputStream.writeInt(messageBinary.length);
                    // 4.2 写入主体信息
                    dataOutputStream.write(messageBinary);
                }
            }

            // 5. 更新信息统计文件的信息
            Stat stat = readStat(msgQueue.getName());
            stat.validCount += 1;
            stat.totalCount += 1;
            writeStat(msgQueue.getName(), stat);
        }
    }
  • 考虑线程安全, 按照队列维度进行加锁
  • 使用 DataOutputStream 进行二进制写操作, 比原生 OutputStream 要方便
  • 需要记录 Message 对象在文件中的偏移量, 后续的删除操作依赖这个偏移量定位到信息, offsetBeg是原文件大小的基础上 + 4, 4个字节是存放消息大小的空间
  • 写完消息, 要同时更新统计消息

创建 common.MqException , 作为⾃定义异常类. 后续业务上出现问题, 都统⼀抛出这个异常.

/**
 * Created with IntelliJ IDEA.
 * Description:自定义异常信息
 *
 * @author: zxj
 * @date: 2024-02-28
 * @time: 14:30:32
 */
public class MqException extends Exception {
    public MqException() {
    }

    public MqException(String message) {
        super(message);
    }
}

实现删除消息

此处的删除只是 “逻辑删除”, 即把 Message 类中的 isValid 字段设置为 0.

这样删除速度⽐较快. 实际的彻底删除, 则通过我们⾃⼰实现的 GC 来解决.


    /**
     * @description: 删除 message
     * 这里的删除是逻辑删除, 也就是把硬盘上存储的这个数据里面的那个 isValid 属性, 设置为 0x0
     * 1. 先把这个文件中的这一段数据, 读出现来, 还原回 Message 对象
     * 2. 把 isValid 该成 0;
     * 3. 把上述数据重新写回文件
     * 此处这个参数中的 message 对象, 必须要包含有效的 offsetBeg 和 offsetEnd
     **/
    public void deleteMessage(@NotNull MSGQueue msgQueue, @NotNull Message message) throws IOException, ClassNotFoundException {
        // 修改文件, 存在线程安全问题
        synchronized (msgQueue) {
            try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(msgQueue.getName()), "rw")) {
                // 1. 先从文件中读取对应的 message 数据
                byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];
                randomAccessFile.seek(message.getOffsetBeg());
                randomAccessFile.read(bufferSrc);
                // 2. 将当前读出来的二进制数据, 转换成 Message 对象
                Message diskMessage = (Message) BinaryUtils.fromBytes(bufferSrc);
                // 3. 把 isValid 设置为无效
                diskMessage.setIsValid((byte) 0x0);
                // 此处不需要宰割参数这个 Message 的 isValid 设为 0, 因为这个参数代表的内容中管理的 Message 对象, 而这个对象也马上要被从内存中销毁了
                // 4. 重新写入文件
                byte[] buffDest = BinaryUtils.toBytes(diskMessage);
                // 虽然上面已经 seek 过了, 但是上面 seek 完了之后, 进行了读操作, 这一读, 就导致, 文件光标往后移动, 移动到下一个信息的位置了,
                // 因此想要接下来的写入, 能能够刚好写回到之前的位置, 就需要重新调整文件光标
                randomAccessFile.seek(message.getOffsetBeg());
                randomAccessFile.write(buffDest);
                // 通过上述这些操作, 对于文件来说, 只是有一个字节发生了改变了而已~
            }
            // 更新统计文件, 把一个消息设置为无效了, 此时有效信息个数就需要 -1
            Stat stat = readStat(msgQueue.getName());
            if (stat.validCount > 0) {
                stat.validCount -= 1;
            }
            writeStat(msgQueue.getName(), stat);
        }
    }
  • 使用 RandomAccessFile 来随机访问到文件的内容
  • 根据 Message 中的 offsetBeg 和 offsetEnd 定位到消息在文件中的位置, 通过 randomAccessFile.seek 操作文件指针偏移过去, 在读取
  • 读出的结果解析成 Message 对象, 修改 isValid 字段, 再重新写回文件, 注意写的时候要重新设定文件指针的位置, 文件指针会随着上述的读操作产生改变
  • 最好, 要记得更新统计文件, 把合法消息 -1

实现消息加载

把消息内容从⽂件加载到内存中. 这个功能在服务器重启, 和垃圾回收的时候都很关键.

    /**
     * @description: 查
     * 使用这个方法, 从文件中, 读取所有的消息内容, 加载到内存中 (具体来说是放到一个链表里面)
     * 这个方法,使用一个 LinkedList, 主要目的是为了后续进行头删操作
     * 这个方法的参数, 只是一个 queueName 而不是 MSGQueue 对象, 因为这个方法不需要加锁,只使用 queueName 就够了
     * 由于该方法是在程序启动调用, 此时服务还不能处理请求, 不涉及多线程操作文件
     **/
    public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
        LinkedList<Message> messages = new LinkedList<>();
        try (InputStream inputStream = Files.newInputStream(Paths.get(getQueueDataPath(queueName)))) {
            try (DataInputStream dataInputStream = new DataInputStream(inputStream)) {
                // 这个变量记录当前文件的光标
                long currentOffset = 0;
                while (true) {
                    // 1. 先读取当前消息长度字段
                    int messageSize = dataInputStream.readInt();
                    // 2. 按照这个长度, 读取后续的数据长度
                    byte[] buffSrc = new byte[messageSize];
                    int actualSize = dataInputStream.read(buffSrc);
                    if (messageSize != actualSize) {
                        // 如果不匹配, 说明文件有问题, 格式错了
                        throw new MqException("[MessageFileManager] 文件格式错误, queueName=" + queueName);
                    }
                    // 3. 把读到的二进制数据, 反序列化为 Message 对象
                    Message message = (Message) BinaryUtils.fromBytes(buffSrc);
                    // 4. 判定一下, 看看这个消息对象, 是不是无效对象
                    if (message.getIsValid() != 0x1) {
                        // 无效数据, 直接跳过
                        // 虽然消息是无效数据, 但是 offset 仍要更新
                        currentOffset += (4 + messageSize);
                        continue;
                    }
                    // 5. 有效数据, 则需要将这个 Message 对象加入到链表中, 加入之前还需要添加 OffsetBeg 和 OffsetEnd
                    //    进行计算 Offset 的时候, 需要当前文件光标的位置, 由于当下使用的 DataInputStream 并不方便直接获取文件光标位置
                    //    因此需要手动计算下文件光标
                    message.setOffsetBeg(currentOffset + 4);
                    message.setOffsetEnd(currentOffset + 4 + messageSize);
                    currentOffset += (4 + messageSize);
                    messages.add(message);
                }
            } catch (EOFException e) {
                // 这个异常是表示读取到文件的末尾了, 这是 DataInputStream 中规定的
                // 不需要做什么特殊处理
                log.info("恢复磁盘文件数据完成");
            }
        }
        return messages;
    }

  • 使用 DataInputStream 读取数据, 先读 4 个字节为消息懂得长度, 然后在按照这个长度来读取实际消息内容
  • 读取完毕之后, 转化成 Message 对象
  • 同时计算出该对象的 offsetBeg 和 offsetEnd
  • 最终把结果整理成链表, 返回出去
  • 注意, 对于DataInputStream 来说, 如果读到 EOF, 就会抛出一个 EOFException, 而不是返回特定值, 因此需要注意上述循环的结束条件

实现垃圾回收(GC)

上述删除操作, 只是把消息在文件上标记成了无效, 并没有腾出磁盘空间, 因此需要定期的进行批量删除.

此处使用类似于复制算法, 当总消息数超过 2000, 并且有效消息数目少于 50%的时候, 就触发 GC.

GC 的时候会把所有有效消息加载出来, 写入一个新的消息文件中, 使用新文件, 替代旧文件即可

    /**
     * @description: 检查当前是否需要针对队列的消息数据文件进行 GC
     **/
    public boolean checkGC(String queueName) throws IOException {
        // 判断是否要 GC, 是根据总消息数和有效消息数, 这两个值都是在 消息统计文件中的
        Stat stat = readStat(queueName);
        return stat.totalCount > Stat.atLeastCount && (double) stat.validCount / (double) stat.totalCount < Stat.minProportion;
    }


    /**
     * @description: 垃圾回收, 防止存储过多垃圾信息
     * 通过这个方法, 真正执行消息数据文件的垃圾回收操作
     * 使用复制算法来完成
     * 创建一个新的文件, 名字就是 queue_data_new.txt
     * 把之前消息数据文件中的有效消息都读出来, 写到新的文件中
     * 删除旧的文件,在把新的文件改名回 queue_data.txt
     * 同时要更新消息统计文件
     **/
    public void gc(MSGQueue msgQueue) throws MqException, IOException, ClassNotFoundException {
        // 进行 gc 的过程, 是针对消息数据文件进行大洗牌, 这个过程中, 其他线程不能针对该队列的消息文件做任何修改
        synchronized (msgQueue) {
            // 由于 gc 操作可能比较耗时, 此处统计一下消耗时间
            long gcBeg = System.currentTimeMillis();

            // 1. 创建一个新的文件
            File queueDataNewFile = new File(getQueueDataNewPath(msgQueue.getName()));
            if (queueDataNewFile.exists()) {
                // 正常情况下, 这个文件不应该存在, 如果存在, 就是意外, 说明上次 gc 了一半, 程序意味崩溃了
                throw new MqException("[MessageFileManager] gc 时发现该队列的 queue_data_new 已经存在, queueName=" + msgQueue.getName());
            }
            boolean ok = queueDataNewFile.createNewFile();
            if (!ok) {
                throw new MqException("[MessageFileManager] 创建文件失败 ,queueDataNewFile=" + queueDataNewFile.getAbsoluteFile());
            }

            // 2. 从旧文件中, 读取出所有的有效消息对象
            LinkedList<Message> messages = loadAllMessageFromQueue(msgQueue.getName());

            // 3. 把有效信息写入到新的文件中
            try (OutputStream outputStream = Files.newOutputStream(Paths.get(getQueueDataNewPath(msgQueue.getName())))) {
                try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
                    for (Message message : messages) {
                        byte[] buffer = BinaryUtils.toBytes(message);
                        // 先写长度
                        dataOutputStream.writeInt(buffer.length);
                        // 在写整体数据
                        dataOutputStream.write(buffer);
                    }
                }
            }


            // 4. 删除旧的数据文件, 并把新的文件进行重命名
            File queueDataOldFile = new File(getQueueDataPath(msgQueue.getName()));
            ok = queueDataOldFile.delete();
            if (!ok) {
                throw new MqException("[MessageFileManager] 删除旧的数据文件失败, queueDataOldFile: " + queueDataOldFile.getAbsoluteFile());
            }
            // 把 queue_data_new.txt => queue_data.txt
            ok = queueDataNewFile.renameTo(queueDataOldFile);
            if (!ok) {
                throw new MqException("[MessageFileManager] 文件重命名失败, queueDataOldFile: " + queueDataOldFile.getAbsoluteFile() + ", queueDataNewFile: " + queueDataNewFile.getAbsoluteFile());

            }

            // 5. 更新统计文件
            Stat stat = readStat(msgQueue.getName());
            stat.totalCount = messages.size();
            stat.validCount = messages.size();

            long gcEnd = System.currentTimeMillis();
            log.info("gc 执行消耗时间: {} ms", (gcEnd - gcBeg));
        }
    }

如果文件很大, 消息非常多, 可能比较低效, 这种就需要把文件做拆分和合并了
Rabbitmq 是这样实现了, 此处实现简答, 就不做了

测试 MessageFileManager

  • 创建两个队列, 用来辅助测试
  • 使用 ReflectionTestUtils.invokeMethod 来调用私有方法
package en.edu.zxj.mq.mqserver.datacenter;

import en.edu.zxj.mq.common.MqException;
import en.edu.zxj.mq.mqserver.core.MSGQueue;
import en.edu.zxj.mq.mqserver.core.Message;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.util.ReflectionTestUtils;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

import static org.junit.jupiter.api.Assertions.*;

/**
 * Created with IntelliJ IDEA.
 * Description:
 *
 * @author: zxj
 * @date: 2024-02-28
 * @time: 18:30:12
 */
@SpringBootTest
class MessageFileMangerTest {
    private MessageFileManger messageFileManger = new MessageFileManger();


    private static final String queueName1 = "testQueue1";
    private static final String queueName2 = "testQueue2";


    /**
     * @description: 每个方法执行前的 准备工作
     **/
    @BeforeEach
    void setUp() throws IOException {
        // 创建两个队列
        messageFileManger.createQueueFiles(queueName1);
        messageFileManger.createQueueFiles(queueName2);
    }

    /**
     * @description: 每个方法执行前的 收尾工作工作
     **/
    @AfterEach
    void tearDown() throws IOException {
        // 删除两个队列文件
        messageFileManger.destroyQueueFiles(queueName1);
        messageFileManger.destroyQueueFiles(queueName2);
    }


    // @Test
    // void init() {
    //    // 功能待开发
    // }

    @Test
    void createQueueFiles() {
        // 创建文件已经在上面 setUp 阶段执行过了, 此处主要是验证看看文件是否存在
        File queueDataFile1 = new File("./data/" + queueName1 + "/queue_data.txt");
        Assertions.assertEquals(true, queueDataFile1.isFile());
        File queueStatFile1 = new File("./data/" + queueName1 + "/queue_stat.txt");
        Assertions.assertEquals(true, queueStatFile1.isFile());

        File queueDataFile2 = new File("./data/" + queueName2 + "/queue_data.txt");
        Assertions.assertEquals(true, queueDataFile2.isFile());
        File queueStatFile2 = new File("./data/" + queueName2 + "/queue_stat.txt");
        Assertions.assertEquals(true, queueStatFile2.isFile());
    }

    @Test
    void destroyQueueFiles() throws IOException {
        // 删除文件, 看看是否存在, 不存在才对
        File queueDataFile1 = new File("./data/" + queueName1 + "/queue_data.txt");
        Assertions.assertEquals(true, queueDataFile1.isFile());
        File queueStatFile1 = new File("./data/" + queueName1 + "/queue_stat.txt");
        Assertions.assertEquals(true, queueStatFile1.isFile());
        // messageFileManger.destroyQueueFiles(queueName1);
        // Assertions.assertEquals(false,queueDataFile1.isFile());
        // Assertions.assertEquals(false,queueStatFile1.isFile());


        File queueDataFile2 = new File("./data/" + queueName2 + "/queue_data.txt");
        Assertions.assertEquals(true, queueDataFile2.isFile());
        File queueStatFile2 = new File("./data/" + queueName2 + "/queue_stat.txt");
        Assertions.assertEquals(true, queueStatFile2.isFile());

    }

    @Test
    void checkFilesExists() {
        // 在 setUp 阶段, 创建了两个队列, 此处只要判断接口返回的是否是 true 即可
        Assertions.assertEquals(true, messageFileManger.checkFilesExists(queueName1));
        Assertions.assertEquals(true, messageFileManger.checkFilesExists(queueName2));
    }


    private Message createTestMessage(String content) {
        return Message.createMessageWithId("testRoutingKey", null, content.getBytes());
    }

    private MSGQueue createTestMSGQueue(String queueName) {
        MSGQueue msgQueue = new MSGQueue();
        msgQueue.setName(queueName);
        msgQueue.setDurable(true);
        msgQueue.setAutoDelete(false);
        msgQueue.setExclusive(false);
        return msgQueue;
    }

    @Test
    void sendMessage() throws IOException, MqException, ClassNotFoundException {
        // 构造出消息, 并构造出队列
        Message message = createTestMessage("testSendMessage");
        // 此处创建的 queue 对象的 name, 不能随便填写, 只能使用 queueName1 和 queueName2,
        // 需要保证这个队列对象对应的目录和文件都存在才行
        MSGQueue queue = createTestMSGQueue(queueName1);

        // 调用发送信息方法
        messageFileManger.sendMessage(queue, message);

        // 检查 stat 文件
        MessageFileManger.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManger, "readStat", queueName1);
        Assertions.assertEquals(1, stat.validCount);
        Assertions.assertEquals(1, stat.totalCount);


        // 检查 data 文件
        LinkedList<Message> messages = messageFileManger.loadAllMessageFromQueue(queueName1);
        Message newMessage = messages.get(0);
        Assertions.assertEquals(true, newMessage.equals(message));

        System.out.println("message: " + newMessage);

    }

    @Test
    void deleteMessage() throws IOException, MqException, ClassNotFoundException {
        // 此处创建的 queue 对象的 name, 不能随便填写, 只能使用 queueName1 和 queueName2,
        // 需要保证这个队列对象对应的目录和文件都存在才行
        MSGQueue queue = createTestMSGQueue(queueName1);

        // 构造 10 条数据, 在进行删除
        List<Message> exceptedMessages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Message message = createTestMessage("testMessage" + i);
            messageFileManger.sendMessage(queue,message);
            exceptedMessages.add(message);
        }

        // 删除数据
        messageFileManger.deleteMessage(queue,exceptedMessages.get(9));
        messageFileManger.deleteMessage(queue,exceptedMessages.get(8));

        // 判断数据是否正确
        LinkedList<Message> actualMessages = messageFileManger.loadAllMessageFromQueue(queueName1);

        Assertions.assertEquals(8,actualMessages.size());

        for (int i = 0; i < 8; i++) {
            Message exceptedMessage = exceptedMessages.get(i);
            Message actualMessage = actualMessages.get(i);

            System.out.println(i + ":  " + actualMessage);

            Assertions.assertEquals(exceptedMessage.getMessageId(),actualMessage.getMessageId());
            Assertions.assertArrayEquals(exceptedMessage.getBody(),actualMessage.getBody());
            Assertions.assertEquals(0x1
                    ,actualMessage.getIsValid());
            Assertions.assertEquals(exceptedMessage.getRoutingKey(),actualMessage.getRoutingKey());
            Assertions.assertEquals(exceptedMessage.getDeliverMode(),actualMessage.getDeliverMode());

        }

    }

    @Test
    void loadAllMessageFromQueue() throws IOException, MqException, ClassNotFoundException {
        // 此处创建的 queue 对象的 name, 不能随便填写, 只能使用 queueName1 和 queueName2,
        // 需要保证这个队列对象对应的目录和文件都存在才行
        MSGQueue queue = createTestMSGQueue(queueName1);

        // 构造 100 条数据
        List<Message> exceptedMessages = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            Message message = createTestMessage("testMessage" + i);
            messageFileManger.sendMessage(queue,message);
            exceptedMessages.add(message);
        }

        // 读取所有数据, 看释放和原来的数据相同
        // 判断数据是否正确
        LinkedList<Message> actualMessages = messageFileManger.loadAllMessageFromQueue(queueName1);

        Assertions.assertEquals(100,actualMessages.size());

        for (int i = 0; i < 100; i++) {
            Message exceptedMessage = exceptedMessages.get(i);
            Message actualMessage = actualMessages.get(i);

            System.out.println(i + ":  " + actualMessage);

            Assertions.assertEquals(exceptedMessage.getMessageId(),actualMessage.getMessageId());
            Assertions.assertArrayEquals(exceptedMessage.getBody(),actualMessage.getBody());
            Assertions.assertEquals(0x1
                    ,actualMessage.getIsValid());
            Assertions.assertEquals(exceptedMessage.getRoutingKey(),actualMessage.getRoutingKey());
            Assertions.assertEquals(exceptedMessage.getDeliverMode(),actualMessage.getDeliverMode());

        }
    }

    @Test
    void gc() throws IOException, MqException, ClassNotFoundException {
        // 此处创建的 queue 对象的 name, 不能随便填写, 只能使用 queueName1 和 queueName2,
        // 需要保证这个队列对象对应的目录和文件都存在才行
        MSGQueue queue = createTestMSGQueue(queueName1);

        // 构造 100 条数据
        List<Message> exceptedMessages = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            Message message = createTestMessage("testMessage" + i);
            messageFileManger.sendMessage(queue,message);
            exceptedMessages.add(message);
        }

        // 保存 gc 前文件的大小
        File file = new File("./data/" + queueName1 + "/queue_data.txt");
        long beforeGC = file.length();

        // 删除偶数下标的数据
        for (int i = 0; i < 100; i+=2) {
            messageFileManger.deleteMessage(queue,exceptedMessages.get(i));
        }

        // 手动调用 gc
        messageFileManger.gc(queue);

        // 读取所有数据
        LinkedList<Message> actualMessages = messageFileManger.loadAllMessageFromQueue(queueName1);

        Assertions.assertEquals(50,actualMessages.size());

        for (int i = 0; i < 50; i++) {
            Message exceptedMessage = exceptedMessages.get(i * 2 + 1);
            Message actualMessage = actualMessages.get(i);

            System.out.println(i + ":  " + actualMessage);

            Assertions.assertEquals(exceptedMessage.getMessageId(),actualMessage.getMessageId());
            Assertions.assertArrayEquals(exceptedMessage.getBody(),actualMessage.getBody());
            Assertions.assertEquals(0x1
                    ,actualMessage.getIsValid());
            Assertions.assertEquals(exceptedMessage.getRoutingKey(),actualMessage.getRoutingKey());
            Assertions.assertEquals(exceptedMessage.getDeliverMode(),actualMessage.getDeliverMode());

        }
        File file1 = new File("./data/" + queueName1 + "/queue_data.txt");
        long afterGC = file1.length();

        Assertions.assertEquals(true,afterGC < beforeGC);
    }
}

⼋、 整合数据库和⽂件

上述代码中, 使⽤数据库存储了 Exchange, Queue, Binding, 使⽤⽂本⽂件存储了 Message.

接下来我们把两个部分整合起来, 统⼀进⾏磁盘管理.

创建 DiskDataCenter

使⽤ DiskDataCenter 来综合管理数据库和⽂本⽂件的内容.

DiskDataCenter 会持有 DataBaseManager 和 MessageFileManager 对象.

/**
 * Created with IntelliJ IDEA.
 * Description:封装访问磁盘数据操作: 数据库 + 文件
 * 1. 数据库: 交换机, 消息队列, 绑定
 * 2. 文件: 消息
 * 上层逻辑如果需要访问磁盘, 就直接调用这个类, 不需要知道下层访问的是数据库还是文件
 *
 * @author: zxj
 * @date: 2024-02-28
 * @time: 21:28:00
 */
public class DiskDataCenter {
    private DatabaseManager databaseManager = new DatabaseManager();
    private MessageFileManger messageFileManger = new MessageFileManger();


    public void init() {
        databaseManager.init();
        messageFileManger.init();
    }
}

封装 Exchange ⽅法

    /*
     * 封装交换机操作
     */
    public Integer insertExchange(Exchange exchange) {
        return databaseManager.insertExchange(exchange);
    }

    public Integer deleteExchangeByName(String name) {
        return databaseManager.deleteExchangeByName(name);
    }

    public List<Exchange> selectAllExchanges() {
        return databaseManager.selectAllExchanges();
    }

封装 Queue ⽅法

    /*
     * 封装消息队列操作
     */
    public Integer insertMSGQueue(MSGQueue msgQueue) throws IOException {
        Integer ret = databaseManager.insertMSGQueue(msgQueue);
        messageFileManger.createQueueFiles(msgQueue.getName());
        return ret;
    }

    public Integer deleteMSGQueueByName(String name) throws IOException {
        Integer ret = databaseManager.deleteMSGQueueByName(name);
        messageFileManger.destroyQueueFiles(name);
        return ret;
    }

    public List<MSGQueue> selectAllMSGQueues() {
        return databaseManager.selectAllMSGQueues();
    }

封装 Binding 方法


    /*
     * 封装绑定机操作
     */
    public Integer insertBinding(Binding binding) {
        return databaseManager.insertBinding(binding);
    }

    public Integer deleteBinding(String exchangeName, String queueName) {
        return databaseManager.deleteBinding(exchangeName, queueName);
    }

    public List<Binding> selectAllBindings() {
        return databaseManager.selectAllBindings();
    }

封装 Message ⽅法


    /*
     *   封装消息操作
     */
    public void createQueueFiles(String queueName) throws IOException {
        messageFileManger.createQueueFiles(queueName);
    }

    public void destroyQueueFiles(String queueName) throws IOException {
        messageFileManger.destroyQueueFiles(queueName);
    }


    public boolean checkFilesExists(String queueName) {
        return messageFileManger.checkFilesExists(queueName);
    }


    public void sendMessage(@NotNull MSGQueue msgQueue, Message message) throws MqException, IOException {
        messageFileManger.sendMessage(msgQueue, message);
    }


    public void deleteMessage(@NotNull MSGQueue msgQueue, @NotNull Message message) throws IOException, ClassNotFoundException, MqException {
        messageFileManger.deleteMessage(msgQueue, message);
        // 删除一条信息之后, 判断是否需要 gc
        if (messageFileManger.checkGC(msgQueue.getName())) {
            messageFileManger.gc(msgQueue);
        }
    }

    public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
        return messageFileManger.loadAllMessageFromQueue(queueName);
    }
  • 在 deleteMessage 的时候判定是否进⾏ GC.

⼩结

通过上述封装, 把数据库和硬盘⽂件两部分合并成⼀个整体. 上层代码在调⽤的时候则不再关⼼该数据是存储在哪个部分的.

九、 内存数据结构设计

硬盘上存储数据, 只是为了实现 “持久化” 这样的效果. 但是实际的消息存储/转发, 还是主要靠内存的结构.

对于 MQ 来说, 内存部分是更关键的, 内存速度更快, 可以达成更⾼的并发.

创建 MemoryDataCenter

创建 mqserver.datacenter.MemoryDataCenter

/**
 * Created with IntelliJ IDEA.
 * Description:内存数据管理类 -- 实际消息转发/存储的类
 * 该类后续提供的一些方法, 可能会在多线程环境下使用, 因此需要注意线程安全的问题
 *
 * @author: zxj
 * @date: 2024-02-29
 * @time: 20:58:38
 */
@Slf4j
public class MemoryDataCenter {
    // key: 为 exchangeName, value: Exchange 对象
    private final ConcurrentHashMap<String, Exchange> exchangesMap = new ConcurrentHashMap<>();
    // key: 为 messageId, value: Message 对象
    private final ConcurrentHashMap<String, Message> messagesMap = new ConcurrentHashMap<>();
    // key: 为 exchangeName, value: Exchange 对象
    private final ConcurrentHashMap<String, MSGQueue> msgQueuesMap = new ConcurrentHashMap<>();
    // key1: exchangeName, key2: msgQueueName, value: Binding 对象
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();

}
  • 使用四个哈希表, 管理 Exchange, Queue, Binding, Message
  • 使用一个哈希表 + 链表管理队列 -> 消息之间的关系
  • 使用一个哈希表 + 哈希表管理所有的未被确认的消息

为了保证消息被正确消费了, 会使用两种方式进行确认, 自动 Ack, 和 手动 Ack
其中自动 Ack 是指当消息被消费之后, 就会立即被销毁释放
其中手动 Ack 是指当消息被消费之后, 由消费者主动调用一个 basicAck方法, 进行主动确认, 服务器收到这个确认之后, 才能真正被销毁消息
此处的 “未确认消息” 就是指在手动Ack模式下, 该消息还没有被调用 basicAck, 此时消息不能被删除, 但是要和其他未消费的消息区分开, 于是另搞个结构
当后续basicAck到了, 就可以删除消息了

封装 Exchange ⽅法


    /**
     * 封装 Exchange 操作
     **/
    public void insertExchange(Exchange exchange) {
        exchangesMap.put(exchange.getName(), exchange);
        log.info("新交换机添加成功! exchangeName: {}", exchange.getName());
    }

    public Exchange getExchange(String exchangeName) {
        return exchangesMap.get(exchangeName);
    }

    public void deleteExchange(String exchangeName) {
        exchangesMap.remove(exchangeName);
        log.info("删除交换机成功! exchangeName: {}", exchangeName);
    }

封装 Queue ⽅法


    /**
     * 封装 MSGQueue 操作
     **/
    public void insertMSGQueue(MSGQueue msgQueue) {
        msgQueuesMap.put(msgQueue.getName(), msgQueue);
        log.info("新交换机添加成功! msgQueueName: {}", msgQueue.getName());
    }

    public MSGQueue getMSGQueue(String msgQueueName) {
        return msgQueuesMap.get(msgQueueName);
    }

    public void deleteMSGQueue(String msgQueueName) {
        msgQueuesMap.remove(msgQueueName);
        log.info("删除交换机成功! msgQueueName: {}", msgQueueName);
    }

封装 Binding ⽅法


    /**
     * 封装 Binding 操作
     **/
    public void insetBinding(Binding binding) throws MqException {
        // 传统的创建 Map 的步骤, 因为不是原子性操作, 存在线程安全的问题, 为了保证线程安全, 可以加上 synchronized 加锁
        // ConcurrentHashMap<String, Binding> stringBindingConcurrentHashMap = bindingsMap.get(binding.getExchangeName()) ;
        // if (stringBindingConcurrentHashMap == null) {
        //     stringBindingConcurrentHashMap = new ConcurrentHashMap<>();
        //     bindingsMap.put(binding.getExchangeName(),stringBindingConcurrentHashMap);
        // }
        // ConcurrentHashMap 中有提供了 computeIfAbsent 方法, 简化了上述步骤, 并且是线程安全的 --
        // 先使用 exchangeName 查询一下, 如果存在就直接返回, 如果不存在就创建
        ConcurrentHashMap<String, Binding> stringBindingConcurrentHashMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), (k) -> {
            return new ConcurrentHashMap<>();
        });


        synchronized (stringBindingConcurrentHashMap) {
            // 这里先查询在插入, 具有强的顺序关系, 数据存在二次覆盖,  存在线程安全的问题
            // 在根据 msgQueueName 查找, 如果存在, 就直接抛异常, 不存在才能插入
            if (stringBindingConcurrentHashMap.get(binding.getQueueName()) != null) {
                throw new MqException("[MemoryDataCenter] 绑定已经存在, exchangeName: " + binding.getExchangeName() +
                        "; msgQueueName: " + binding.getQueueName());
            }

            stringBindingConcurrentHashMap.put(binding.getQueueName(), binding);
        }

        log.info("绑定添加成功成功, binding.exchangeName: {}, binding.queueName: {},", binding.getExchangeName(), binding.getQueueName());
    }

    // 获取绑定
    // 1. 依据 exchangeName 和 queueName 获取唯一的 binding
    public Binding getBinding(String exchangeName, String queueName) {
        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);
        if (bindingMap == null) {
            return null;
        }
        return bindingMap.get(queueName);
    }

    // 2. 依据 exchangeName 获取所有的 binding
    public ConcurrentHashMap<String, Binding> getBindings(String exchangeName) {
        return bindingsMap.get(exchangeName);
    }

    public void deleteBinding(Binding binding) throws MqException {
        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
        if (bindingMap == null) {
            // 该交换机没有绑定任何队列, 报错
            throw new MqException("[MemoryDataCenter] 绑定不存在! binding: " + binding);
        }

        bindingsMap.remove(binding.getExchangeName());
        log.info("删除绑定成功! binding: {}", binding);
    }

封装 Message ⽅法


    /**
     * 封装信息操作
     **/
    // 添加信息
    public void addMessage(Message message) {
        messagesMap.put(message.getMessageId(), message);
        log.info("添加信息成功! messageId: {}", message.getMessageId());
    }

    // 依据 Id 查询信息
    public Message getMessage(String messageId) {
        return messagesMap.get(messageId);
    }

    // 依据 Id 删除信息
    public void deleteMessage(String messageId) {
        messagesMap.remove(messageId);
        log.info("消息被删除! messageId: {}", messageId);
    }

    // 发送消息到指定队列
    public void sendMessage(MSGQueue queue, Message message) {
        // 把消息放到对应的队列数据结构中
        // 先根据队列的名字, 找到该队列对应的消息链表
        LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), (k) -> {
            return new LinkedList<>();
        });
        // 把数据假如到 messages 里面
        synchronized (messages) {
            messages.add(message);
        }
        // 在这里把该消息也往消息中心中插入一下, 假设如果 message 已经在消息中心存在, 重复插入也没有关系
        // 主要就是相同 messageId, 对应的 message 的内容一定是一样的. (服务器不会对 Message 内容修改 basicProperties 和 body)
        addMessage(message);
        log.info("消息被投递到队列中! messageId = " + message.getMessageId());
    }

    // 从队列中取消息
    public Message pollMessage(String queueName) {
        // 根据队列名, 查找一下, 对应的队列的消息链表
        LinkedList<Message> messages = queueMessageMap.get(queueName);
        // 为空
        if (messages == null) {
            return null;
        }

        synchronized (messages) {
            // 队列中没有任何消息
            if (messages.isEmpty()) {
                return null;
            }
            // 链表中有元素, 就进行头删
            Message currentMessage = messages.remove(0);
            log.info("从消息从队列中取出! messageId: {}", currentMessage.getMessageId());
            return currentMessage;
        }
    }

    // 获取指定队列中消息的个数
    public int getMessageCount(String queueName) {
        // 根据队列名, 查找一下, 对应的队列的消息链表
        LinkedList<Message> messages = queueMessageMap.get(queueName);
        if (messages == null) {
            return 0;
        }

        synchronized (messages) {
            return messages.size();
        }
    }

针对未确认的消息的处理


    // 添加未确认的消息
    public void addMessageWaitAck(String queueName, Message message) {
        ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAck.computeIfAbsent(queueName, (k) -> {
            return new ConcurrentHashMap<>();
        });
        messageHashMap.put(message.getMessageId(), message);
        log.info("消息进入待确认队列! messageId: {}", message.getMessageId());
    }

    // 删除未确认的消息(消息已经确认了)
    public void removeMessageWaitAck(String queueName, String messageId) {
        ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAck.computeIfAbsent(queueName, (k) -> {
            return new ConcurrentHashMap<>();
        });
        messageHashMap.remove(messageId);
        log.info("消息从待确认队列中删除! messageId: {}", messageId);
    }


    // 获取指定的未确认的信息
    public Message getMessageWaitAck(String queueName, String messageId) {
        ConcurrentHashMap<String, Message> messageConcurrentHashMap = queueMessageWaitAck.get(queueName);
        if (messageConcurrentHashMap == null) {
            return null;
        }
        return messageConcurrentHashMap.get(messageId);
    }

实现重启后恢复内存


    // 这个方法就是从硬盘上读取数据, 把硬盘中之前持久化存储的各个维度的数据都恢复到内存中 -- 交换机, 消息队列, 绑定, 消息
    public void recovery(DiskDataCenter diskDataCenter) throws MqException, IOException, ClassNotFoundException {
        // 0. 清空之前的所有数据
        exchangesMap.clear();
        msgQueuesMap.clear();
        bindingsMap.clear();
        messagesMap.clear();
        queueMessageMap.clear();
        // 1. 恢复所有的交换机数据
        List<Exchange> exchanges = diskDataCenter.selectAllExchanges();
        for (Exchange exchange : exchanges) {
            exchangesMap.put(exchange.getName(), exchange);
        }
        log.info("恢复所有的 交换机 数据成功!");
        // 2. 恢复所有的队列数据
        List<MSGQueue> msgQueues = diskDataCenter.selectAllMSGQueues();
        for (MSGQueue msgQueue : msgQueues) {
            msgQueuesMap.put(msgQueue.getName(), msgQueue);
        }
        log.info("恢复所有的 队列 数据成功!");
        // 3. 恢复所有的绑定数据
        List<Binding> bindings = diskDataCenter.selectAllBindings();
        for (Binding binding : bindings) {
            ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), (k) -> {
                return new ConcurrentHashMap<>();
            });
            bindingMap.put(binding.getQueueName(), binding);
        }
        log.info("恢复所有的 绑定 数据成功!");
        // 4. 恢复所有的消息队列
        //    遍历所有的队列, 根据每个队列的名字, 获取到所有的消息
        for (MSGQueue msgQueue : msgQueues) {
            LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(msgQueue.getName());
            queueMessageMap.put(msgQueue.getName(), messages);
            for (Message message : messages) {
                messagesMap.put(message.getMessageId(), message);
            }
        }
        log.info("恢复所有的 消息队列 成功!");

        log.info("从磁盘中恢复所有数据到内存成功");

        // 规定:
        // 针对 "未确认的消息" 这部分内存中的数据, 不需要从硬盘恢复, 之前考虑硬盘存储的时候, 也没有设定这一块
        // 这个消息在硬盘上存储的时候, 就是当做 "为被取走"

    }

测试 MemoryDataCenter

package en.edu.zxj.mq.mqserver.datacenter;

import en.edu.zxj.mq.MqApplication;
import en.edu.zxj.mq.common.MqException;
import en.edu.zxj.mq.mqserver.core.*;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created with IntelliJ IDEA.
 * Description:
 *
 * @author: zxj
 * @date: 2024-02-29
 * @time: 23:07:51
 */
@SpringBootTest
class MemoryDataCenterTest {
    private MemoryDataCenter memoryDataCenter;

    @BeforeEach
    public void setUp() {
        memoryDataCenter = new MemoryDataCenter();
    }

    @AfterEach
    public void tearDown() {
        memoryDataCenter = null;
    }


    /**
     * 创建测试 消息 对象
     **/
    private @NotNull Message createTestMessage(@NotNull String content) {
        return Message.createMessageWithId("testRoutingKey", null, content.getBytes());
    }

    /**
     * 创建 消息队列 对象
     **/
    private @NotNull MSGQueue createTestMSGQueue(String queueName) {
        MSGQueue msgQueue = new MSGQueue();
        msgQueue.setName(queueName);
        msgQueue.setDurable(true);
        msgQueue.setAutoDelete(false);
        msgQueue.setExclusive(false);
        return msgQueue;
    }

    /**
     * 创建 绑定 对象
     **/
    private @NotNull Binding createTestBinding(String exchangeName, String msgQueueName) {
        Binding binding = new Binding();
        binding.setExchangeName(exchangeName);
        binding.setQueueName(msgQueueName);
        binding.setBindingKey("Hello word");
        return binding;
    }


    /**
     * 创建交换机对象
     **/
    @org.jetbrains.annotations.NotNull
    private Exchange createTestExchange(String exchangeName) {
        Exchange exchange = new Exchange();
        exchange.setName(exchangeName);
        exchange.setType(ExchangeType.FANOUT);
        exchange.setDurable(true);
        exchange.setAutoDelete(false);
        exchange.setArguments("11", "aa");
        exchange.setArguments("22", "bb");

        return exchange;
    }


    @Test
    void testExchange() {
        // 对 交换机 相关操作统一测试
        Exchange exceptedExchange = createTestExchange("exchangeTest");
        // 1. 插入操作
        memoryDataCenter.insertExchange(exceptedExchange);
        // 2. 查找操作
        Exchange actualExchange = memoryDataCenter.getExchange(exceptedExchange.getName());
        // 此时 exceptedExchange 和 actualExchange 应该指向同一个空间才对
        Assertions.assertEquals(exceptedExchange, actualExchange);
        // 3. 删除操作
        memoryDataCenter.deleteExchange(exceptedExchange.getName());
        actualExchange = memoryDataCenter.getExchange(exceptedExchange.getName());
        // 判断是否为空
        Assertions.assertNull(actualExchange);
    }

    @Test
    void testMSGQueue() {
        // 对 消息队列 相关操作统一测试
        MSGQueue exceptedMSGQueue = createTestMSGQueue("testMSGQueue");
        // 1. 插入操作
        memoryDataCenter.insertMSGQueue(exceptedMSGQueue);
        // 2. 查找操作
        MSGQueue actualMSGQueue = memoryDataCenter.getMSGQueue(exceptedMSGQueue.getName());
        Assertions.assertEquals(exceptedMSGQueue, actualMSGQueue);
        // 3. 删除操作
        memoryDataCenter.deleteMSGQueue(exceptedMSGQueue.getName());
        actualMSGQueue = memoryDataCenter.getMSGQueue(exceptedMSGQueue.getName());
        // 判断是否为空
        Assertions.assertNull(actualMSGQueue);

    }


    @Test
    void testBinding() throws MqException {
        // 对 绑定 相关操作的统一测试
        Binding exceptedBinding = createTestBinding("testExchangeName", "testMSGQueueName");
        // 1. 插入操作
        memoryDataCenter.insetBinding(exceptedBinding);
        // 2. 查找操作
        Binding actrualBinding = memoryDataCenter.getBinding(exceptedBinding.getExchangeName(), exceptedBinding.getQueueName());
        Assertions.assertEquals(exceptedBinding, actrualBinding);

        ConcurrentHashMap<String, Binding> bindings = memoryDataCenter.getBindings(exceptedBinding.getExchangeName());
        Assertions.assertEquals(1, bindings.size());
        Assertions.assertEquals(exceptedBinding, bindings.get(exceptedBinding.getQueueName()));

        // 3. 删除操作
        memoryDataCenter.deleteBinding(exceptedBinding);
        actrualBinding = memoryDataCenter.getBinding(exceptedBinding.getExchangeName(), exceptedBinding.getQueueName());
        bindings = memoryDataCenter.getBindings(exceptedBinding.getExchangeName());

        // 判断是否为空
        Assertions.assertNull(actrualBinding);
        Assertions.assertNull(bindings);
    }


    @Test
    void testMessage() {
        // 测试 消息相关的增删查 操作
        Message exceptedMessage = createTestMessage("testMessage");
        // 1. 插入操作
        memoryDataCenter.addMessage(exceptedMessage);
        // 2. 查找操作
        Message actualMessage = memoryDataCenter.getMessage(exceptedMessage.getMessageId());
        Assertions.assertEquals(exceptedMessage, actualMessage);
        // 3. 删除操作
        memoryDataCenter.deleteMessage(exceptedMessage.getMessageId());
        actualMessage = memoryDataCenter.getMessage(exceptedMessage.getMessageId());
        Assertions.assertNull(actualMessage);
    }


    @Test
    void sendMessage() {
        // 1. 创建一个队列, 创建十条消息, 把这些消息都插入到队列中
        MSGQueue queue = createTestMSGQueue("testQueue");
        List<Message> exceptedMessages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Message message = createTestMessage("testMessage" + i);
            memoryDataCenter.sendMessage(queue, message);
            exceptedMessages.add(message);
        }

        // 2. 从这个队列中取出这些消息
        List<Message> actualMessages = new ArrayList<>();
        while (true) {
            Message message = memoryDataCenter.pollMessage(queue.getName());
            if (message == null) {
                break;
            }
            actualMessages.add(message);
        }


        // 3. 比较取出的消息和之前的消息是否是一致的
        Assertions.assertEquals(exceptedMessages.size(), actualMessages.size());
        for (int i = 0; i < actualMessages.size(); i++) {
            Assertions.assertEquals(exceptedMessages.get(i), actualMessages.get(i));
        }

    }

    @Test
    void testMessageWaitAck() {
        // 测试 消息相关的增删查 操作
        Message exceptedMessage = createTestMessage("testMessage");
        // 1. 插入操作
        memoryDataCenter.addMessageWaitAck("testQueueName", exceptedMessage);
        // 2. 查找操作
        Message actualMessage = memoryDataCenter.getMessageWaitAck("testQueueName", exceptedMessage.getMessageId());
        Assertions.assertEquals(exceptedMessage, actualMessage);
        // 3. 删除操作
        memoryDataCenter.removeMessageWaitAck("testQueueName", exceptedMessage.getMessageId());
        actualMessage = memoryDataCenter.getMessageWaitAck("testQueueName", exceptedMessage.getMessageId());
        Assertions.assertNull(actualMessage);
    }

    @Test
    void recovery() throws IOException, MqException, ClassNotFoundException {

        MqApplication.context = SpringApplication.run(MqApplication.class);


        // 1. 在硬盘上构造好数据
        DiskDataCenter diskDataCenter = new DiskDataCenter();
        diskDataCenter.init();
        // 构造交换机
        Exchange exceptedExchange = createTestExchange("testExchangeName");
        diskDataCenter.insertExchange(exceptedExchange);
        // 构造消息队列
        MSGQueue exceptedMSGQueue = createTestMSGQueue("testQueueName");
        diskDataCenter.insertMSGQueue(exceptedMSGQueue);
        // 构造绑定
        Binding exceptedBinding = createTestBinding("testExchangeName", "testMSGQueueName");
        diskDataCenter.insertBinding(exceptedBinding);
        // 构造消息
        Message exceptedMessage = createTestMessage("testContent");
        diskDataCenter.sendMessage(exceptedMSGQueue, exceptedMessage);


        // 2. 执行恢复操作
        memoryDataCenter.recovery(diskDataCenter);

        // 3. 对比结果
        Exchange actualExchangeName = memoryDataCenter.getExchange("testExchangeName");
        Assertions.assertEquals(exceptedExchange.getName(), actualExchangeName.getName());
        Assertions.assertEquals(exceptedExchange.getType(), actualExchangeName.getType());
        Assertions.assertEquals(exceptedExchange.getDurable(), actualExchangeName.getDurable());
        Assertions.assertEquals(exceptedExchange.getAutoDelete(), actualExchangeName.getAutoDelete());


        MSGQueue actualMSGQueue = memoryDataCenter.getMSGQueue("testQueueName");
        Assertions.assertEquals(exceptedMSGQueue.getName(), actualMSGQueue.getName());
        Assertions.assertEquals(exceptedMSGQueue.getExclusive(), actualMSGQueue.getExclusive());
        Assertions.assertEquals(exceptedMSGQueue.getAutoDelete(), actualMSGQueue.getAutoDelete());
        Assertions.assertEquals(exceptedMSGQueue.getDurable(), actualMSGQueue.getDurable());

        Binding actualBinding = memoryDataCenter.getBinding("testExchangeName", "testMSGQueueName");
        Assertions.assertEquals(exceptedBinding.getExchangeName(), actualBinding.getExchangeName());
        Assertions.assertEquals(exceptedBinding.getBindingKey(), actualBinding.getBindingKey());
        Assertions.assertEquals(exceptedBinding.getQueueName(), actualBinding.getQueueName());

        Message actualMessage = memoryDataCenter.pollMessage("testQueueName");
        Assertions.assertEquals(exceptedMessage.getMessageId(), actualMessage.getMessageId());
        Assertions.assertEquals(exceptedMessage.getRoutingKey(), actualMessage.getRoutingKey());
        Assertions.assertEquals(exceptedMessage.getIsValid(), actualMessage.getIsValid());
        Assertions.assertEquals(exceptedMessage.getDeliverMode(), actualMessage.getDeliverMode());
        Assertions.assertArrayEquals(exceptedMessage.getBody(), actualMessage.getBody());


        // 4. 清理磁盘
        MqApplication.context.close();
        File dataDir = new File("./data");
        FileUtils.deleteDirectory(dataDir);

    }
}

⼗、 虚拟主机设计

⾄此, 内存和硬盘的数据都已经组织完成. 接下来使⽤ “虚拟主机” 这个概念, 把这两部分的数据也串起来

并且实现⼀些 MQ 的关键 API.

创建 VirtualHost

创建 mqserver.VirtualHost

/**
 * Created with IntelliJ IDEA.
 * Description:通过这个类, 来标识虚拟主机
 * 每个虚拟主机下面都管理着自己的 交换机, 队列, 绑定, 消息, 数据
 * 同时提供 API 供上层调用
 * 针对 VirtualHost 这个类, 作为业务逻辑的整合者, 就需要对于代码中抛出的异常进行处理了
 *
 * @author: zxj
 * @date: 2024-03-01
 * @time: 19:23:34
 */
@Getter
@Slf4j
public class VirtualHost {
    private final String virtualHostName;
    private final DiskDataCenter diskDataCenter = new DiskDataCenter();
    private final MemoryDataCenter memoryDataCenter = new MemoryDataCenter();
    // Router 定义转发规则
    private final Router router = new Router();
    // ConsumerManager 实现消费逻辑
    private final ConsumerManager consumerManager = new ConsumerManager(this);
}
  • 其中 Router ⽤来定义转发规则, ConsumerManager ⽤来实现消息消费. 后续介绍

实现构造⽅法和 getter

构造⽅法中会针对 DiskDataCenter 和 MemoryDataCenter 进⾏初始化.

同时会把硬盘的数据恢复到内存中.

    public VirtualHost(String virtualHostName) {
        this.virtualHostName = virtualHostName;
        // 先初始化硬盘操作
        diskDataCenter.init();
        // 后初始化内存操作
        memoryDataCenter.init();

        // 从磁盘中恢复数据到内存中
        try {
            memoryDataCenter.recovery(diskDataCenter);
        } catch (Exception e) {
            log.error("从磁盘中恢复数据到内存失败!");
            e.printStackTrace();
            return;
        }

        log.info("初始化 VirtualHost 成功, VirtualHostName: {}", virtualHostName);
    }

创建交换机

  • 约定, 交换机 / 队列的名字, 都加上 VirtualHostName 作为前置, 这样不同 VirtualHost 中就可以存在同名的交换机或者队列了
  • exchangeDeclare 的语义是, 不存在就创建, 存在则直接返回, 因此不叫做"exchangeCreate"
  • 先写硬盘, 后写内存, 因为硬盘失败概率更大, 如果硬盘写失败了,也就不必写内存了



    /**
     * @description: 创建交换机, declare 表示存在就不创建, 因此不叫做 "exchangeCreate"
     * 此处的 autoDelete, arguments 其实并没有使用, 只是先预留出来. (RabbitMQ 是支持的)
     * 约定, 交换机/队列的名字, 都加上 VirtualHostName 作为前缀, 这样不同 VirtualHost 中就可以存在同名的交换机或者队列了
     * 先写磁盘, 后写内存, 因为写磁盘失败概率更大, 如果磁盘写失败了, 也就不必要写内存了
     * @param: [exchangeName, exchangeType, durable, autoDelete, arguments]
     * @return: 抛异常就返回 false, 正常执行逻辑就返回 true
     **/
    public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
        // 真实的 exchangeName 要加上 virtualHostName 前缀
        exchangeName = virtualHostName + exchangeName;
        synchronized (lockerExchange) {

            try {
                // 1. 判断交换机是否存在
                Exchange exsitsExchange = memoryDataCenter.getExchange(exchangeName);
                if (exsitsExchange != null) {
                    log.info("交换机已经存在, exchangeName: {}, exchangeType: {}", exchangeName, exchangeType);
                    return true;
                }

                // 2. 构造 Exchange 对象
                Exchange exchange = new Exchange();
                exchange.setName(exchangeName);
                exchange.setType(exchangeType);
                exchange.setDurable(durable);
                exchange.setAutoDelete(autoDelete);
                exchange.setArguments(arguments);

                // 3. 先写入磁盘
                if (exchange.getDurable()) {
                    diskDataCenter.insertExchange(exchange);
                }

                // 4. 后写入内存
                memoryDataCenter.insertExchange(exchange);

                log.info("交换机创建成功, exchangeName: {}, exchangeType: {}", exchangeName, exchangeType);

                return true;
            } catch (Exception e) {
                log.warn("创建交换机失败, exchangeName: {}, exchangeType: {}", exchangeName, exchangeType);
                e.printStackTrace();
                return false;
            }
        }
    }

删除交换机


    /**
     * @description: 删除交换机
     **/
    public boolean exchangeDelete(String exchangeName) {
        // 真正存储的 exchangeName
        exchangeName = virtualHostName + exchangeName;
        synchronized (lockerExchange) {
            try {
                // 1. 判断交换机是否存在
                Exchange exsitsExchange = memoryDataCenter.getExchange(exchangeName);
                if (exsitsExchange == null) {
                    throw new MqException("交换机不存在, 无法删除! exchangeName; " + exchangeName);
                }
                // 2. 删除磁盘中的交换机
                diskDataCenter.deleteExchangeByName(exchangeName);
                // 3. 删除内存中的交换机
                memoryDataCenter.deleteExchange(exchangeName);
                log.info("删除交换机成功, exchangeName: {},", exchangeName);
                return true;
            } catch (Exception e) {
                log.warn("删除交换机失败, exchangeName: {},", exchangeName);
                e.printStackTrace();
                return false;
            }
        }
    }

创建队列


    /**
     * @description: 创建队列
     **/
    public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) {
        // 真实的 queueName
        queueName = virtualHostName + queueName;
        synchronized (lockerQueue) {

            try {
                // 1. 判断队列是否存在
                MSGQueue existsQueue = memoryDataCenter.getMSGQueue(queueName);
                if (existsQueue != null) {
                    log.info("队列已经存在, queueName: {}", queueName);
                    return true;
                }
                // 2. 创建队列
                MSGQueue queue = new MSGQueue();
                queue.setName(queueName);
                queue.setDurable(durable);
                queue.setExclusive(exclusive);
                queue.setAutoDelete(autoDelete);
                queue.setArguments(arguments);
                // 3. 先存入磁盘
                if (queue.getDurable()) {
                    diskDataCenter.insertMSGQueue(queue);
                }
                // 4. 后存入到内存
                memoryDataCenter.insertMSGQueue(queue);
                log.info("队列创建成功, queueName: {}", queueName);
                return true;
            } catch (Exception e) {
                log.warn("创建队列失败, queueName: {}", queueName);
                e.printStackTrace();
                return false;
            }
        }
    }

删除队列


    /**
     * @description: 删除队列
     **/
    public boolean queueDelete(String queueName) {
        queueName = virtualHostName + queueName;
        synchronized (lockerQueue) {
            try {
                // 1. 判断队列是否存在
                MSGQueue existsQueue = memoryDataCenter.getMSGQueue(queueName);
                if (existsQueue == null) {
                    throw new MqException("要删除的队列不存在, 无法删除! queueName=" + queueName);
                }
                // 2. 删除磁盘中的队列
                if (existsQueue.getDurable()) {
                    diskDataCenter.deleteMSGQueueByName(queueName);
                }
                // 3. 删除内存中的队列
                memoryDataCenter.deleteMSGQueue(queueName);
                log.info("删除队列成功, queueName: {}", queueName);

                return true;
            } catch (Exception e) {
                log.warn("删除队列失败, queueName: {}", queueName);
                e.printStackTrace();
                return false;
            }
        }
    }

创建绑定

  • bindingKey 是进⾏ topic 转发时的⼀个关键概念. 使⽤ router 类来检测是否是合法的 bindingKey.
  • 后续再介绍 router.checkBindingKeyValid 的实现. 此处先留空

    /**
     * @description: 添加绑定
     **/
    public boolean queueBind(String queueName, String exchangeName, String bindingKey) {
        // 加上 virtualHostName 前缀
        queueName = virtualHostName + queueName;
        exchangeName = virtualHostName + exchangeName;
        synchronized (lockerQueue) {
            synchronized (lockerExchange) {
                try {
                    // 1. 判断绑定是否存在
                    Binding existedBinding = memoryDataCenter.getBinding(exchangeName, queueName);
                    if (existedBinding != null) {
                        log.info("绑定存在, queueName: {}, exchangeName: {}, bindingKey: {}", queueName, exchangeName, bindingKey);
                        return true;
                    }
                    // 2. 判断 bindingKey 是否合法
                    if (!router.checkBindingKeyValid(bindingKey)) {
                        throw new MqException("bindingKey 不合法! bindingKey: " + bindingKey);
                    }
                    // 3. 创建绑定
                    Binding binding = new Binding();
                    binding.setExchangeName(exchangeName);
                    binding.setQueueName(queueName);
                    binding.setBindingKey(bindingKey);
                    // 4. 获取绑定对应的队列和交换机, 判断这两个是否存在, 都存在才能创建
                    MSGQueue msgQueue = memoryDataCenter.getMSGQueue(queueName);
                    if (msgQueue == null) {
                        throw new MqException("队列不存在, queueName: " + queueName);
                    }
                    Exchange exchange = memoryDataCenter.getExchange(exchangeName);
                    if (exchange == null) {
                        throw new MqException("交换机不存在, exchangeName: " + exchangeName);
                    }
                    // 5. 写入磁盘
                    if (msgQueue.getDurable() && exchange.getDurable()) {
                        diskDataCenter.insertBinding(binding);
                    }
                    // 6. 写入内存
                    memoryDataCenter.insetBinding(binding);
                    log.info("添加绑定成功, queueName: {}, exchangeName: {}, bindingKey: {}", queueName, exchangeName, bindingKey);
                    return true;
                } catch (Exception e) {
                    log.warn("添加绑定失败, queueName: {}, exchangeName: {}, bindingKey: {}", queueName, exchangeName, bindingKey);
                    e.printStackTrace();
                    return false;
                }
            }
        }
    }

删除绑定


    /**
     * @description: 删除绑定
     **/
    public boolean queueUnBind(String queueName, String exchangeName) {
        // 加上 virtualHostName 前缀
        queueName = virtualHostName + queueName;
        exchangeName = virtualHostName + exchangeName;
        synchronized (lockerExchange) {
            synchronized (lockerQueue) {

                try {
                    // 1. 判断绑定是否存在
                    Binding existedBinding = memoryDataCenter.getBinding(exchangeName, queueName);
                    if (existedBinding == null) {
                        throw new MqException("要删除的绑定不存在, 无法删除, exchangeName: " + exchangeName + " , queueName: " + queueName);
                    }
                    // 2. 无论绑定是否持久化了, 都试着删除一下磁盘中的数据, 影响不大
                    diskDataCenter.deleteBinding(exchangeName, queueName);
                    // 3. 删除内存
                    memoryDataCenter.deleteBinding(existedBinding);
                    log.info("删除绑定成功, queueName: {}, exchangeName: {}", queueName, exchangeName);
                    return true;
                } catch (Exception e) {
                    log.warn("删除绑定失败, queueName: {}, exchangeName: {}", queueName, exchangeName);
                    e.printStackTrace();
                    return false;
                }
            }
        }
    }

发布消息

  • 发布消息其实是吧消息发送给指定的Exchange, 在根据 Exchange 和 Queue 的binding关系, 转发到对应队列中
  • 发送消息需要指定 routingKey, 这个值的作用和 ExchangeType是相关的
    • Direct: routingKey 就是对应的队列名字, 此时不需要binding关系, 也不需要bindingKey,就可以直接转发消息
    • Fanout: routingKey 不起作用, bindingKey 也不起作用, 此时消息会转发给绑定到该交换机上的所有队列中
    • Topic: routingKey 是一个特定的字符串, 会和bindingKey进行匹配, 如果匹配成功, 则发到对应的队列中, 具体规则后续介绍
  • BasicProperties 是消息的元消息, body是消息本体

    /**
     * @description: 发布消息
     * 发布消息其实就是把消息发送给指定的exchange, 再根据 Exchange 和 Queue 的 Binding 关系, 转发到对应队列中
     * 发送消息需要指定 routingKey, 这个值的作用和 ExchangeType 相关的
     * Direct: routingKey 就是对应的队列名字, 此时不需要 binding 关系, 也不需要 bindingKey, 就可以直接转发消息
     * Fanout: routingKey 不起作用, bindingKey 也不起作用, 此时消息会转发给绑定该交换机上的所有队列中
     * Topic: routingKey 是一个特定的字符串, 会和 bindingKey 按照一定规则进行匹配, 如果匹配成功, 则发送到对应的队列中, 具体规则在 Router 类中介绍
     * @param: [exchangeName, routingKey, basicProperties 消息的元消息, body 消息本体]
     * @return:
     **/
    public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {
        try {
            // 1. 转换交换机的名字, 如果为 null, 就使用默认的交换机名字
            if (exchangeName == null) {
                exchangeName = "";
            }
            exchangeName = virtualHostName + exchangeName;

            // 2. 检验 routingKey 的合法性
            if (!router.checkRoutingKeyValid(routingKey)) {
                throw new MqException("routingKey 非法! routingKey: " + routingKey);
            }

            // 3. 查找交换机对象
            Exchange exchange = memoryDataCenter.getExchange(exchangeName);
            if (exchange == null) {
                throw new MqException("交换机不存在! exchangeName: " + exchangeName);
            }

            // 4. 依据交换机的类型来进行消息转发
            if (exchange.getType() == ExchangeType.DIRECT) {
                // 按照直接交换机的方式来转发消息
                // 此时 routingKey 作为队列的名字, 直接把消息写入指定的队里中, 此时可以无视绑定关系
                String queueName = virtualHostName + routingKey;
                // 5. 构造消息对象
                Message message = Message.createMessageWithId(routingKey, basicProperties, body);
                // 6. 找找队列名字对应的对象
                MSGQueue msgQueue = memoryDataCenter.getMSGQueue(queueName);
                if (msgQueue == null) {
                    throw new MqException("队列不存在, queueName=" + queueName);
                }
                // 7. 队列存在, 直接给队列中写入消息 -- 执行一次方法就消费一次消息
                sendMessage(msgQueue, message);
            } else {
                // 按照 fanout 和 topic 的方式来转发
                // 5. 找到该交换机关联的所有绑定, 并遍历这些绑定对象
                ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);
                for (Map.Entry<String, Binding> entry : bindingsMap.entrySet()) {
                    // ① 获取到绑定对象, 判断对应的队列是否存在
                    Binding binding = entry.getValue();
                    MSGQueue msgQueue = memoryDataCenter.getMSGQueue(binding.getQueueName());
                    if (msgQueue == null) {
                        // 此处就不抛异常, 可能此处有多个这样的队列
                        // 希望不要因为一个队列的失败, 影响到其他队列的消息的传输
                        log.warn("basicPublish 发送消息是, 发现队列不存在! queueName: {}", binding.getQueueName());
                        continue;
                    }
                    // ② 构造消息对象, 发送给每一个队列的对象都是一个新的复制体
                    Message message = Message.createMessageWithId(routingKey, basicProperties, body);
                    // ③ 判定这个消息是否能转发给改队列
                    //    如果是 fanout, 所有绑定的队列都是要转发的
                    //    如果是 topic, 还需要判定下, bindingKey 和 routingKey 是不是匹配
                    if (!router.route(exchange.getType(), binding, message)) {
                        continue;
                    }
                    // ④ 真正的转发消息给队列
                    sendMessage(msgQueue, message);
                }
            }
            log.info("发送信息成功, exchangeName: {}, routingKey: {}", exchangeName, routingKey);
            return true;
        } catch (Exception e) {
            log.warn("发送信息失败, exchangeName: {}, routingKey: {}", exchangeName, routingKey);
            e.printStackTrace();
            return false;
        }
    }

    /**
     * @description: 发送一次消息
     **/
    private void sendMessage(MSGQueue msgQueue, Message message) throws IOException, MqException, InterruptedException {
        // 此处发送消息, 就是把消息写入到 硬盘 和 内存 上
        int deliverMode = message.getDeliverMode();
        // deliverMode 为 1, 不持久化, 为 2 表示持久化
        if (deliverMode == 2) {
            diskDataCenter.sendMessage(msgQueue, message);
        }
        // 写入内存
        memoryDataCenter.sendMessage(msgQueue, message);

        // 通知消费者可以消费消息, 就是让消费者从对应的内存中取出消息
        consumerManager.notifyConsume(msgQueue.getName());
    }

路由规则

实现 mqserver.core.Router

  1. 实现 route ⽅法
    /**
     * @description: 路由选择
     * @param: [type 交换机类型, binding 绑定对象 -- 里面提取 routingKey, message 消息对象]
     * @return: 返回该交换机是否可以将该消息转发给绑定的队列中, true 表示可以, false 表示不可以
     **/
    public boolean route(ExchangeType type, Binding binding, Message message) throws MqException {
        // 根据不同的转发类型来进行不同的转发逻辑
        if (type == ExchangeType.FANOUT) {
            // 如果是 FANOUT 类型, 该交换机上所有绑定的队列都需要进行转发
            return true;
        } else if (type == ExchangeType.TOPIC) {
            // 如果是 TOPIC 类型, 规则复杂
            return routeTopic(binding,message);
        } else {
            // 其他情况是不应该存在的
            throw new MqException("[Router] 交换机类型违法! type: " + type);
        }
    }
  1. 实现 checkRoutingKeyValid

    /**
     * @description: 判断 routingKey 是否合法
     * routingKey 组成规则如下:
     *      1. 组成: 数字, 字母, 下划线
     *      2. 使用符号 . 将 routingKey 划分成多个部分
     * 形如:
     *      aaa.bbb.ccc
     *      a.1.b
     *      a
     **/
    public boolean checkRoutingKeyValid(String routingKey) {
        if (!StringUtils.hasLength(routingKey)) {
            // null or 空字符串, 合法的情况, 比如在使用 fanout 交换机的时候, routingKey 用不上, 就可以设为 ""
            return true;
        }
        for (int i = 0; i < routingKey.length(); i++) {
            char ch = routingKey.charAt(i);
            // 判断该字符是否是大写字母
            if (ch >= 'A' && ch <= 'Z') {
                continue;
            }
            // 判断字符是否是小写字母
            if (ch >= 'a' && ch <= 'z') {
                continue;
            }
            // 判断该字符阿拉伯数字
            if (ch >= '0' && ch <= '9') {
                continue;
            }
            // 判断字符是否 '_' 或 '.'
            if (ch == '.' || ch == '_') {
                continue;
            }

            // 走到这里, 都不是上述任何一种合法的情况, 就直接返回 false
            return false;
        }
        return true;
    }
  1. 实现 checkBindingKeyValid

在这里插入图片描述


    /**
     * @description: 判断 bindingKey 是否合法
     * bindingKey 组成规则如下:
     *      1. 组成: 数字, 字母, 下划线
     *      2. 使用符号 . 将 routingKey 划分成多个部分
     *      3. 支持两种特殊符号作为通配符 (* 和 # 必须是作为被 . 分割出来的独立的部分)
     *          1) *:  * 可以匹配任何一个独立的部分
     *          2) #:  # 监听匹配任何 0 个或者多个独立的部分
     * 形如:
     *      aaa.bbb.ccc
     *      a.1.b
     *      a
     *      #
     *      a.*.b
     **/
    public boolean checkBindingKeyValid(String bindingKey) {
        if (!StringUtils.hasLength(bindingKey)) {
            // null or 空字符串, 合法的情况, 比如在使用 direct / fanout 交换机的时候, bindingKey 用不上, 就可以设为 ""
            return true;
        }
        for (int i = 0; i < bindingKey.length(); i++) {
            char ch = bindingKey.charAt(i);
            // 判断该字符是否是大写字母
            if (ch >= 'A' && ch <= 'Z') {
                continue;
            }
            // 判断字符是否是小写字母
            if (ch >= 'a' && ch <= 'z') {
                continue;
            }
            // 判断该字符阿拉伯数字
            if (ch >= '0' && ch <= '9') {
                continue;
            }
            // 判断字符是否 '_' 或 '.''
            if (ch == '.' || ch == '_') {
                continue;
            }
            // 判断字符是否为通配符 '*' 或 '#'
            if (ch == '*' || ch == '#') {
                continue;
            }

            // 走到这里, 都不是上述任何一种合法的情况, 就直接返回 false
            return false;
        }

        // 检查 * 或者 # 是否是独立的部分
        // aaa.*.bbb 合法情况, aaa.a*.bbb 非法情况
        String[] words = bindingKey.split("\\.");
        for (String word : words) {
            // 检查 word 长度 > 1 并且包含了 * 或者 #, 就是非法的格式了
            if (word.length() > 1 && (word.contains("*") || word.contains("#"))) {
                return false;
            }
        }

        return true;
    }
  1. 实现 routeTopic
  • 使用动态规划的方式来进行规则的匹配

    private boolean routeTopic(@NotNull Binding binding, @NotNull Message message) {
        String bindingKey = binding.getBindingKey();
        String routingKey = message.getRoutingKey();
        String[] bindingStr = bindingKey.split("\\.");
        String[] routingStr = routingKey.split("\\.");
        return mate(bindingStr,routingStr);
    }

    private boolean mate(String @NotNull [] bindingStr, String @NotNull [] routingStr) {
        int m = bindingStr.length;
        int n = routingStr.length;
        boolean[][] dp = new boolean[m + 1][n + 1];
        dp[0][0] = true;
        for (int i = 0; i < m; i++) {
            if ("#".equals(bindingStr[i])) {
                dp[i+1][0] = true;
            } else {
                break;
            }
        }
        for (int i = 1; i <= m; i++) {
            String wordBinding = bindingStr[i - 1];
            for (int j = 1; j <= n; j++) {
                String wordRouting = routingStr[j - 1];
                if (!"#".equals(wordBinding) && !"*".equals(wordBinding)) {
                    if (wordBinding.equals(wordRouting)) {
                        dp[i][j] = dp[i - 1][j - 1];
                    } else {
                        dp[i][j] = false;
                    }
                } else if ("*".equals(wordBinding)) {
                    dp[i][j] = dp[i - 1][j - 1];
                } else {
                    dp[i][j] = dp[i - 1][j] || dp[i][j - 1];
                }
            }
        }

        return dp[m][n];
    }
  1. 测试 Router 的匹配规则
    测试代码如下:
package en.edu.zxj.mq.common;

import en.edu.zxj.mq.mqserver.core.Binding;
import en.edu.zxj.mq.mqserver.core.ExchangeType;
import en.edu.zxj.mq.mqserver.core.Message;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import static org.junit.jupiter.api.Assertions.*;

/**
 * Created with IntelliJ IDEA.
 * Description:
 *
 * @author: zxj
 * @date: 2024-03-02
 * @time: 19:32:53
 */
@SpringBootTest
class RouterTest {
    private Router router = new Router();
    private Binding binding = null;
    private Message message = null;

    @BeforeEach
    public void setUp() {
        binding = new Binding();
        message = new Message();
    }

    @AfterEach
    public void tearDown() {
        binding = null;
        message = null;
    }


    @Test
    void checkBindingKeyValid1() {
        boolean ok = router.checkBindingKeyValid("aaa.bbb.ccc");
        Assertions.assertTrue(ok);
    }


    @Test
    void checkBindingKeyValid2() {
        boolean ok = router.checkBindingKeyValid("1.a.c");
        Assertions.assertTrue(ok);
    }

    @Test
    void checkBindingKeyValid3() {
        boolean ok = router.checkBindingKeyValid("a");
        Assertions.assertTrue(ok);
    }

    @Test
    void checkBindingKeyValid4() {
        boolean ok = router.checkBindingKeyValid("");
        Assertions.assertTrue(ok);
    }

    @Test
    void checkBindingKeyValid5() {
        boolean ok = router.checkBindingKeyValid("a.*.ccc");
        Assertions.assertTrue(ok);
    }

    @Test
    void checkBindingKeyValid6() {
        boolean ok = router.checkBindingKeyValid("#");
        Assertions.assertTrue(ok);
    }

    @Test
    void checkBindingKeyValid7() {
        boolean ok = router.checkBindingKeyValid("aaa.bb*b.ccc");
        Assertions.assertFalse(ok);
    }

    @Test
    void checkBindingKeyValid8() {
        boolean ok = router.checkBindingKeyValid("123.bbb.ccc");
        Assertions.assertTrue(ok);
    }


    @Test
    void checkRoutingKeyValid1() {
        boolean ok = router.checkRoutingKeyValid("a.b.c");
        Assertions.assertTrue(ok);
    }

    @Test
    void checkRoutingKeyValid2() {
        boolean ok = router.checkRoutingKeyValid("a.b._c");
        Assertions.assertTrue(ok);
    }

    @Test
    void checkRoutingKeyValid3() {
        boolean ok = router.checkRoutingKeyValid("a.b.c@");
        Assertions.assertFalse(ok);
    }

    @Test
    void checkRoutingKeyValid4() {
        boolean ok = router.checkRoutingKeyValid("a");
        Assertions.assertTrue(ok);
    }

    @Test
    void checkRoutingKeyValid5() {
        boolean ok = router.checkRoutingKeyValid("a.1.b");
        Assertions.assertTrue(ok);
    }

    @Test
    void checkRoutingKeyValid6() {
        boolean ok = router.checkRoutingKeyValid("12222222223123123");
        Assertions.assertTrue(ok);
    }

    @Test
    void checkRoutingKeyValid7() {
        boolean ok = router.checkRoutingKeyValid("aaaa");
        Assertions.assertTrue(ok);
    }

    @Test
    void checkRoutingKeyValid8() {
        boolean ok = router.checkRoutingKeyValid("_____________._______________");
        Assertions.assertTrue(ok);
    }

    @Test
    void checkRoutingKeyValid9() {
        boolean ok = router.checkRoutingKeyValid("!!!!!!!!!!!!!!");
        Assertions.assertFalse(ok);
    }

    @Test
    void checkRoutingKeyValid10() {
        boolean ok = router.checkRoutingKeyValid("a.2._.!");
        Assertions.assertFalse(ok);
    }

    @Test
    void checkRoutingKeyValid11() {
        boolean ok = router.checkRoutingKeyValid("_a_.1_a.b");
        Assertions.assertTrue(ok);
    }

    @Test
    void checkRoutingKeyValid12() {
        boolean ok = router.checkRoutingKeyValid("a.b.c.12.7.234.4234.adf.___");
        Assertions.assertTrue(ok);
    }

    @Test
    void checkRoutingKeyValid13() {
        boolean ok = router.checkRoutingKeyValid("123.468a.sdfa.w");
        Assertions.assertTrue(ok);
    }


    @Test
    void route1() throws MqException {
        binding.setBindingKey("aaa");
        message.setRoutingKey("aaa");
        boolean ok = router.route(ExchangeType.FANOUT, binding, message);
        Assertions.assertTrue(ok);
    }

    @Test
    void route2() throws MqException {
        binding.setBindingKey("aaa.bbb");
        message.setRoutingKey("aaa.bbb");
        boolean ok = router.route(ExchangeType.TOPIC, binding, message);
        Assertions.assertTrue(ok);
    }

    @Test
    void route3() throws MqException {
        binding.setBindingKey("aaa");
        message.setRoutingKey("aaa");
        boolean ok = router.route(ExchangeType.TOPIC, binding, message);
        Assertions.assertTrue(ok);
    }

    @Test
    void route4() throws MqException {
        binding.setBindingKey("aaa.bbb");
        message.setRoutingKey("aaa.bbb.ccc");
        boolean ok = router.route(ExchangeType.TOPIC, binding, message);
        Assertions.assertFalse(ok);
    }

    @Test
    void route5() throws MqException {
        binding.setBindingKey("aaa.bbb");
        message.setRoutingKey("aaa.ccc");
        boolean ok = router.route(ExchangeType.TOPIC, binding, message);
        Assertions.assertFalse(ok);
    }

    @Test
    void route6() throws MqException {
        binding.setBindingKey("aaa.bbb.ccc");
        message.setRoutingKey("aaa.bbb.ccc");
        boolean ok = router.route(ExchangeType.TOPIC, binding, message);
        Assertions.assertTrue(ok);
    }

    @Test
    void route7() throws MqException {
        binding.setBindingKey("aaa.*");
        message.setRoutingKey("aaa.bbb");
        boolean ok = router.route(ExchangeType.TOPIC, binding, message);
        Assertions.assertTrue(ok);
    }

    @Test
    void route8() throws MqException {
        binding.setBindingKey("aaa.*.bbb");
        message.setRoutingKey("aaa.bbb.ccc");
        boolean ok = router.route(ExchangeType.TOPIC, binding, message);
        Assertions.assertFalse(ok);
    }

    @Test
    void route9() throws MqException {
        binding.setBindingKey("*.aaa.bbb");
        message.setRoutingKey("aaa.bbb");
        boolean ok = router.route(ExchangeType.TOPIC, binding, message);
        Assertions.assertFalse(ok);
    }

    @Test
    void route10() throws MqException {
        binding.setBindingKey("#");
        message.setRoutingKey("aaa.bbb.ccc");
        boolean ok = router.route(ExchangeType.TOPIC, binding, message);
        Assertions.assertTrue(ok);
    }

    @Test
    void route11() throws MqException {
        binding.setBindingKey("aaa.#");
        message.setRoutingKey("aaa.bbb");
        boolean ok = router.route(ExchangeType.TOPIC, binding, message);
        Assertions.assertTrue(ok);
    }

    @Test
    void route12() throws MqException {
        binding.setBindingKey("aaa.#");
        message.setRoutingKey("aaa.bbb.ccc");
        boolean ok = router.route(ExchangeType.TOPIC, binding, message);
        Assertions.assertTrue(ok);
    }

    @Test
    void route13() throws MqException {
        binding.setBindingKey("aaa.#.ccc");
        message.setRoutingKey("aaa.ccc");
        boolean ok = router.route(ExchangeType.TOPIC, binding, message);
        Assertions.assertTrue(ok);
    }

    @Test
    void route14() throws MqException {
        binding.setBindingKey("aaa.#.ccc");
        message.setRoutingKey("aaa.bbb.ccc");
        boolean ok = router.route(ExchangeType.TOPIC, binding, message);
        Assertions.assertTrue(ok);
    }

    @Test
    void route15() throws MqException {
        binding.setBindingKey("aaa.#.ccc");
        message.setRoutingKey("aaa.aaa.bbb.ccc");
        boolean ok = router.route(ExchangeType.TOPIC, binding, message);
        Assertions.assertTrue(ok);
    }

    @Test
    void route16() throws MqException {
        binding.setBindingKey("#.ccc");
        message.setRoutingKey("ccc");
        boolean ok = router.route(ExchangeType.TOPIC, binding, message);
        Assertions.assertTrue(ok);
    }

    @Test
    void route17() throws MqException {
        binding.setBindingKey("#.ccc");
        message.setRoutingKey("aaa.bbb.ccc");
        boolean ok = router.route(ExchangeType.TOPIC, binding, message);
        Assertions.assertTrue(ok);
    }

    @Test
    void route18() throws MqException {
        binding.setBindingKey("aaa.#.#.#");
        message.setRoutingKey("aaa");
        boolean ok = router.route(ExchangeType.TOPIC, binding, message);
        Assertions.assertTrue(ok);
    }

    @Test
    void route19() throws MqException {
        binding.setBindingKey("aaa.#.#.#.*");
        message.setRoutingKey("aaa");
        boolean ok = router.route(ExchangeType.TOPIC, binding, message);
        Assertions.assertFalse(ok);
    }

    @Test
    void route20() throws MqException {
        binding.setBindingKey("aaa.#.#.#.ccc");
        message.setRoutingKey("aaa.aaa.aaa.bbb.ccc");
        boolean ok = router.route(ExchangeType.TOPIC, binding, message);
        Assertions.assertTrue(ok);
    }

}

订阅消息

  1. 添加一个订阅者

    /**
     * @description: 订阅消息
     * 添加一个队列的订阅者, 当队列收到消息之后, 就要把消息推送给对应的订阅者
     * @param: [consumerTag: 消费者的身份标识,
     * queueName: 订阅的队列名字,
     * autoAck: 消息被消费后的应当方式, true 为自动应当, false 为手动应答
     * consumer: 是一个回调函数, 此处类型设定成函数是接口, 这样后续调用 basicConsume 并且传递实参的时候, 就可以写成 lambda 样子了]
     * @return: true 表示订阅成功, false 表示订阅失败
     **/
    public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
        // 构造一个 ConsumerEnv 对象, 把这个对应的队列找到, 再把这个 Consumer 对象添加到该队列中.
        queueName = virtualHostName + queueName;
        try {
            consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);
            log.info("basicConsume 成功! consumerTag: {}, queueName: {}", consumerTag, queueName);
            return true;
        } catch (Exception e) {
            log.warn("basicConsume 失败! consumerTag: {}, queueName: {}", consumerTag, queueName);
            e.printStackTrace();
            return false;
        }
    }

Consumer 相当于⼀个回调函数. 放到 common.Consumer 中.
在这里插入图片描述

  1. 创建订阅者管理管理类

创建 mqserver.core.ConsumerManager

@Slf4j
public class ConsumerManager {
    // parent 用来记录虚拟主机
    private final VirtualHost parent;
    // 存放令牌的队列, 通过令牌来触发消费线程的消费操作
    // 使用一个阻塞队列来触发消息消费, 称为令牌队列, 每次有消息过来了, 都为队列中放一个令牌(也就是队列名), 让后消费者再去消费对应的队列信息
    // 作用: 令牌队列的设定, 避免搞出来太多线程, 否则就需要给每个队列都安排一个单独的线程了, 如果队列很多则开销就比较的了
    private final BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>();
    // 使用一个线程池用来执行消息的回调
    private final ExecutorService workerPool = Executors.newFixedThreadPool(4);

    // 扫描线程
    private Thread scannerThread = null;
}
  • parent 用来记录虚拟主机
  • 使用一个阻塞队列来触发信息消费,称为令牌队列, 每次有消息过来了, 都往队列中放一个令牌(也就是队列名), 然后消费者再去消费对应队列的消息.
  • 使用一个线程池用来执行消息回调

这样令牌队列的设定避免搞出来太多线程,否则需要给每个队列到安排一个单独的线程了,如果队列很多开销就比较大了.

  1. 添加令牌接⼝
    /**
     * @description: 通知消费者去消费消息
     **/
    public void notifyConsume(String name) throws InterruptedException {
        tokenQueue.put(name);
    }
  1. 实现添加订阅者

    /**
     * @description: 添加订阅者
     * 新来的消费者需要消费掉之前的消息
     **/
    public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {
        // 找到对应的队列
        MSGQueue queue = parent.getMemoryDataCenter().getMSGQueue(queueName);
        if (queue == null) {
            throw new MqException("[ConsumerManager] 队列不存在, queueName: " + queueName);
        }
        ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag, queueName, autoAck, consumer);

        synchronized (queue) {
            queue.addConsumerEnv(consumerEnv);
            // 如果当前队列中已经有了一些消息, 需要立即消费掉
            int n = parent.getMemoryDataCenter().getMessageCount(queueName);
            for (int i = 0; i < n; i++) {
                // 这个方法调用一次就消费一次消息
                consumeMessage(queue);
            }
        }
    }

创建 ConsumerEnv , 这个类表⽰⼀个订阅者的执⾏环境.

@Data
@Slf4j
public class ConsumerEnv {
    private String consumerTag;
    private String queueName;
    private boolean autoAck;
    private Consumer consumer;

    public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
        this.consumerTag = consumerTag;
        this.queueName = queueName;
        this.autoAck = autoAck;
        this.consumer = consumer;
    }
}

MsgQueue 添加⼀个订阅者列表.

在这里插入图片描述

此处的 chooseConsumer 是实现⼀个轮询效果. 如果⼀个队列有多个订阅者, 将会按照轮询的⽅式轮流拿到消息.

  1. 实现扫描线程

在 ConsumerManager 中创建⼀个线程, 不停的尝试扫描令牌队列. 如果拿到了令牌, 就真正触发消费消息操作.

    public ConsumerManager(VirtualHost virtualHost) {
        this.parent = virtualHost;

        scannerThread = new Thread(() -> {
            while (true) {
                try {
                    // 1. 拿到令牌
                    String queueName = tokenQueue.take();
                    // 2. 根据令牌找到对应的队列
                    MSGQueue msgQueue = parent.getMemoryDataCenter().getMSGQueue(queueName);
                    if (msgQueue == null) {
                        throw new MqException("获取令牌后, 发现队列为空! queueName: " + queueName);
                    }
                    // 3. 从队列中消费一次消息
                    consumeMessage(msgQueue);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        scannerThread.start();
    }
  1. 实现消费消息
    所谓的消费消息, 其实就是调⽤消息的回调. 并把消息删除掉.

    /**
     * @description: 消费一次消息
     **/
    private void consumeMessage(MSGQueue queue) {
        // 1. 按照轮询的方式, 取出消费者
        ConsumerEnv luckyDog = queue.chooseConsumer();
        if (luckyDog == null) {
            // 暂时还没有消费者, 就暂时不消费
            return;
        }
        // 2. 从指定队列中取出一个元素
        Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());
        if (message == null) {
            return;
        }
        // 3. 把消息带入到消费者的回调方法中, 丢给线程池执行
        workerPool.submit(() -> {
            try {
                // 1. 把消息放到待确认的集合中, 这个操作势必在执行回调之前
                parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);
                // 2. 真正执行回调操作
                luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(), message.getBody());
                // 3. 如果当前是 "自动应答", 就可以直接报消息给删除了
                //    如果当前是 "手动应答", 则先不处理, 交给后续消费调用 basicAck 方法来处理
                if (luckyDog.isAutoAck()) {
                    // ① 先删除磁盘上的消息
                    if (message.getDeliverMode() == 2) {
                        parent.getDiskDataCenter().deleteMessage(queue, message);
                    }
                    // ② 删除上面的待确认集合中的消息
                    parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());
                    // ③ 删除内存中消息
                    parent.getMemoryDataCenter().deleteMessage(message.getMessageId());

                    log.info("消息被消费成功, queueName: {}", queue.getName());

                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

注意: ⼀个队列可能有 N 个消费者, 此处应该按照轮询的⽅式挑⼀个消费者进⾏消费.

⼩结

一、订阅者已经存在了, 才发送消息

  1. 这种直接获取队列的订阅者,从中按照轮询的方式挑一个消费者来调用回调即可
  2. 消息先发送到队列了,订阅者还没到,此时当订阅者到达,就快速把指定队列中的消息全部消费掉。

⼆. 关于消息不丢失的论证

每个消息在从内存队列中出队列时, 都会先进⼊ 待确认 中.

  • 如果 autoAck 为 true
    消息被消费完毕后(执⾏完消息回调之后), 再执⾏清除⼯作.
    分别清除硬盘数据, 待确认队列, 消息中⼼
  • 如果 autoAck 为 false
    在回调内部, 进⾏清除⼯作.
    分别清除硬盘数据, 待确认队列, 消息中⼼.
  1. 执⾏消息回调的时候抛出异常

此时消息仍然处在待确认队列中
此时可以⽤⼀个线程扫描待确认队列, 如果发现队列中的消息超时未确认, 则放⼊死信队列.

  1. 执⾏消息回调的时候服务器宕机
    内存所有数据都没了, 但是消息在硬盘上仍然存在. 会在服务下次启动的时候, 加载回内存. 重新被消费到.

消息确认

下列⽅法只是⼿动应答的时候才会使⽤.

应答成功, 则把消息删除掉.


    /**
     * @description: 消息确认
     **/
    public boolean basicAck(String queueName, String messageId) {
        queueName = virtualHostName + queueName;
        try {
            // 1. 获取到消息和队列
            Message message = memoryDataCenter.getMessage(messageId);
            if (message == null) {
                throw new MqException("要确认的消息不存在, messageId: " + messageId);
            }
            MSGQueue msgQueue = memoryDataCenter.getMSGQueue(queueName);
            if (msgQueue == null) {
                throw new MqException("要确认的队列不存在, queueName: " + queueName);
            }
            // 2. 删除硬盘上的数据
            if (message.getDeliverMode() == 2) {
                diskDataCenter.deleteMessage(msgQueue,message);
            }
            // 3. 删除消息中心中心的数据
            memoryDataCenter.deleteMessage(messageId);
            // 4. 删除待确认的集合中的数据
            memoryDataCenter.removeMessageWaitAck(queueName,messageId);
            log.info("basicAck 成功, 消息确认成功! queueName: {}, messageId: {}",queueName,messageId);
            return true;
        } catch (Exception e) {
            log.warn("basicAck 失败, 消息确认失败! queueName: {}, messageId: {}",queueName,messageId);
            e.printStackTrace();
            return false;
        }
    }

测试 VirtualHost

package en.edu.zxj.mq.mqserver;

import en.edu.zxj.mq.MqApplication;
import en.edu.zxj.mq.common.Consumer;
import en.edu.zxj.mq.mqserver.core.BasicProperties;
import en.edu.zxj.mq.mqserver.core.ExchangeType;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.*;

/**
 * Created with IntelliJ IDEA.
 * Description:
 *
 * @author: zxj
 * @date: 2024-03-02
 * @time: 21:45:01
 */
@SpringBootTest
class VirtualHostTest {

    private VirtualHost virtualHost = null;

    @BeforeEach
    public void setUp() {
        MqApplication.context = SpringApplication.run(MqApplication.class);
        virtualHost = new VirtualHost("default");
    }

    @AfterEach
    public void tearDown() throws IOException {
        MqApplication.context.close();
        virtualHost = null;
        // 把硬盘的目录删除掉
        File dataDir = new File("./data");
        FileUtils.deleteDirectory(dataDir);
    }


    @Test
    void exchangeDeclare() {
        boolean ok = virtualHost.exchangeDeclare("testExchangeName", ExchangeType.DIRECT, true, false, null);
        Assertions.assertTrue(ok);
    }

    @Test
    void exchangeDelete() {
        boolean ok = virtualHost.exchangeDeclare("testExchangeName", ExchangeType.DIRECT, true, false, null);
        Assertions.assertTrue(ok);

        ok = virtualHost.exchangeDelete("testExchangeName");
        Assertions.assertTrue(ok);


        ok = virtualHost.exchangeDelete("testExchangeName");
        Assertions.assertFalse(ok);
    }

    @Test
    void queueDeclare() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("a", 1);
        arguments.put("b", 2);
        boolean ok = virtualHost.queueDeclare("testQueueName", true, false, false, arguments);
        Assertions.assertTrue(ok);

    }

    @Test
    void queueDelete() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("a", 1);
        arguments.put("b", 2);
        boolean ok = virtualHost.queueDeclare("testQueueName", true, false, false, arguments);
        Assertions.assertTrue(ok);

        ok = virtualHost.queueDelete("testQueueName");
        Assertions.assertTrue(ok);
    }

    @Test
    void queueBind() {
        boolean ok = virtualHost.exchangeDeclare("testExchangeName", ExchangeType.DIRECT, true, false, null);
        Assertions.assertTrue(ok);
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("a", 1);
        arguments.put("b", 2);
        ok = virtualHost.queueDeclare("testQueueName", true, false, false, arguments);
        Assertions.assertTrue(ok);

        ok = virtualHost.queueBind("testQueueName", "testExchangeName", "testBindingKey");
        Assertions.assertTrue(ok);
    }

    @Test
    void queueUnBind() {
        boolean ok = virtualHost.exchangeDeclare("testExchangeName", ExchangeType.DIRECT, true, false, null);
        Assertions.assertTrue(ok);
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("a", 1);
        arguments.put("b", 2);
        ok = virtualHost.queueDeclare("testQueueName", true, false, false, arguments);
        Assertions.assertTrue(ok);

        ok = virtualHost.queueBind("testQueueName", "testExchangeName", "testBindingKey");
        Assertions.assertTrue(ok);

        ok = virtualHost.queueUnBind("testQueueName","testExchangeName");
        Assertions.assertTrue(ok);
    }

    @Test
    void basicPublish() {
        boolean ok = virtualHost.basicPublish("testExchangeName", "testRoutingKey", new BasicProperties(), "Hello word".getBytes());
        Assertions.assertFalse(ok);


        ok = virtualHost.exchangeDeclare("testExchangeName", ExchangeType.DIRECT, true, false, null);
        Assertions.assertTrue(ok);
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("a", 1);
        arguments.put("b", 2);
        ok = virtualHost.queueDeclare("testQueueName", true, false, false, arguments);
        Assertions.assertTrue(ok);

        ok = virtualHost.queueBind("testQueueName", "testExchangeName", "testBindingKey");
        Assertions.assertTrue(ok);

        ok = virtualHost.basicPublish("testExchangeName", "testQueueName", new BasicProperties(), "Hello word".getBytes());
        Assertions.assertTrue(ok);
    }

    // 先订阅队列, 后发送消息
    @Test
    void basicConsume1() throws InterruptedException {
        boolean ok = virtualHost.queueDeclare("testQueueName", true, false, false, null);
        Assertions.assertTrue(ok);
        ok = virtualHost.exchangeDeclare("testExchangeName",ExchangeType.DIRECT,true,false,null);
        Assertions.assertTrue(ok);
        // 先订阅队列
        ok = virtualHost.basicConsume("testConsumerTag", "testQueueName", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
                try {
                    // 消费者自身设定的回调方法
                    System.out.println("messageId: " + basicProperties.getMessageId());
                    System.out.println("body = " + new String(body,0,body.length));

                    Assertions.assertEquals("testQueueName",basicProperties.getRoutingKey());
                    Assertions.assertEquals(1,basicProperties.getDeliverMode());
                    Assertions.assertArrayEquals("hello".getBytes(),body);

                } catch (Error e) {
                    //断言如果失败, 抛出的是 Error, 而不是 Exception
                    e.printStackTrace();
                }
            }
        });
        Assertions.assertTrue(ok);
        Thread.sleep(500);

        // 在发送消息
        ok  = virtualHost.basicPublish("testExchangeName","testQueueName",null,"hello".getBytes());
        Assertions.assertTrue(ok);

        Thread.sleep(500);
    }


    // 先发送消息, 后订阅队列
    @Test
    void basicConsume2() throws InterruptedException {
        boolean ok = virtualHost.queueDeclare("testQueueName", true, false, false, null);
        Assertions.assertTrue(ok);
        ok = virtualHost.exchangeDeclare("testExchangeName",ExchangeType.DIRECT,true,false,null);
        Assertions.assertTrue(ok);

        // 先发送消息
        ok  = virtualHost.basicPublish("testExchangeName","testQueueName",null,"hello".getBytes());
        Assertions.assertTrue(ok);

        Thread.sleep(500);

        // 后订阅队列
        ok = virtualHost.basicConsume("testConsumerTag", "testQueueName", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
                try {
                    // 消费者自身设定的回调方法
                    System.out.println("messageId: " + basicProperties.getMessageId());
                    System.out.println("body = " + new String(body,0,body.length));

                    Assertions.assertEquals("testQueueName",basicProperties.getRoutingKey());
                    Assertions.assertEquals(1,basicProperties.getDeliverMode());
                    Assertions.assertArrayEquals("hello".getBytes(),body);

                } catch (Error e) {
                    //断言如果失败, 抛出的是 Error, 而不是 Exception
                    e.printStackTrace();
                }
            }
        });
        Assertions.assertTrue(ok);
        Thread.sleep(500);
    }
    @Test
    void basicAck() throws InterruptedException {
        boolean ok = virtualHost.queueDeclare("testQueue", true,
                false, false, null);
        Assertions.assertTrue(ok);
        ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
                true, false, null);
        Assertions.assertTrue(ok);

        // 先发送消息
        ok = virtualHost.basicPublish("testExchange", "testQueue", null,
                "hello".getBytes());
        Assertions.assertTrue(ok);

        // 再订阅队列 [要改的地方, 把 autoAck 改成 false]
        ok = virtualHost.basicConsume("testConsumerTag", "testQueue", false, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
                // 消费者自身设定的回调方法.
                System.out.println("messageId=" + basicProperties.getMessageId());
                System.out.println("body=" + new String(body, 0, body.length));

                Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());
                Assertions.assertEquals(1, basicProperties.getDeliverMode());
                Assertions.assertArrayEquals("hello".getBytes(), body);

                // [要改的地方, 新增手动调用 basicAck]
                boolean ok = virtualHost.basicAck("testQueue", basicProperties.getMessageId());
                Assertions.assertTrue(ok);
            }
        });
        Assertions.assertTrue(ok);

        Thread.sleep(500);
    }


    @Test
    public void testBasicConsumeFanout() throws InterruptedException {
        boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.FANOUT, false, false, null);
        Assertions.assertTrue(ok);

        ok = virtualHost.queueDeclare("testQueue1", false, false, false, null);
        Assertions.assertTrue(ok);
        ok = virtualHost.queueBind("testQueue1", "testExchange", "");
        Assertions.assertTrue(ok);

        ok = virtualHost.queueDeclare("testQueue2", false, false, false, null);
        Assertions.assertTrue(ok);
        ok = virtualHost.queueBind("testQueue2", "testExchange", "");
        Assertions.assertTrue(ok);

        // 往交换机中发布一个消息
        ok = virtualHost.basicPublish("testExchange", "", null, "hello".getBytes());
        Assertions.assertTrue(ok);

        Thread.sleep(500);

        // 两个消费者订阅上述的两个队列.
        ok = virtualHost.basicConsume("testConsumer1", "testQueue1", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
                System.out.println("consumerTag=" + consumerTag);
                System.out.println("messageId=" + basicProperties.getMessageId());
                Assertions.assertArrayEquals("hello".getBytes(), body);
            }
        });
        Assertions.assertTrue(ok);

        ok = virtualHost.basicConsume("testConsumer2", "testQueue2", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
                System.out.println("consumerTag=" + consumerTag);
                System.out.println("messageId=" + basicProperties.getMessageId());
                Assertions.assertArrayEquals("hello".getBytes(), body);
            }
        });
        Assertions.assertTrue(ok);

        Thread.sleep(500);
    }

    @Test
    public void testBasicConsumeTopic() throws InterruptedException {
        boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.TOPIC, false, false, null);
        Assertions.assertTrue(ok);

        ok = virtualHost.queueDeclare("testQueue", false, false, false, null);
        Assertions.assertTrue(ok);

        ok = virtualHost.queueBind("testQueue", "testExchange", "aaa.*.bbb");
        Assertions.assertTrue(ok);

        ok = virtualHost.basicPublish("testExchange", "aaa.ccc.bbb", null, "hello".getBytes());
        Assertions.assertTrue(ok);

        ok = virtualHost.basicConsume("testConsumer", "testQueue", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
                System.out.println("consumerTag=" + consumerTag);
                System.out.println("messageId=" + basicProperties.getMessageId());
                Assertions.assertArrayEquals("hello".getBytes(), body);
            }
        });
        Assertions.assertTrue(ok);

        Thread.sleep(500);
    }
}

⼗⼀、 ⽹络通信协议设计

明确需求

接下来需要考虑客⼾端和服务器之间的通信. 回顾交互模型.
在这里插入图片描述

⽣产者和消费者都是客⼾端, 都需要通过⽹络和 Broker Server 进⾏通信.

此处我们使⽤ TCP 协议, 来作为通信的底层协议. 同时在这个基础上⾃定义应⽤层协议, 完成客⼾端对服务器这边功能的远程调⽤.

要调⽤的功能有:

  • 创建 channel
  • 关闭 channel
  • 创建 exchange
  • 删除 exchange
  • 创建 queue
  • 删除 queue
  • 创建 binding
  • 删除 binding
  • 发送 message
  • 订阅 message
  • 发送 ack
  • 返回 message (服务器 -> 客⼾端)

设计应⽤层协议

使⽤⼆进制的⽅式设定协议.

因为 Message 的消息体本⾝就是⼆进制的. 因此不太⽅便使⽤ json 等⽂本格式的协议.

请求:

在这里插入图片描述

响应:
在这里插入图片描述
其中 type 表⽰请求响应不同的功能. 取值如下:

  • 0x1 创建 channel
  • 0x2 关闭 channel
  • 0x3 创建 exchange
  • 0x4 销毁 exchange
  • 0x5 创建 queue
  • 0x6 销毁 queue
  • 0x7 创建 binding
  • 0x8 销毁 binding
  • 0x9 发送 message
  • 0xa 订阅 message
  • 0xb 返回 ack
  • 0xc 服务器给客⼾端推送的消息. (被订阅的消息) 响应独有的

其中 payload 部分, 会根据不同的 type, 存在不同的格式.

对于请求来说, payload 表⽰这次⽅法调⽤的各种参数信息.

对于响应来说, payload 表⽰这次⽅法调⽤的返回值.

定义 Request / Response

创建 common.Request

/**
 * Created with IntelliJ IDEA.
 * Description:定义了请求的格式
 * 一个完整的请求, 分成了三个部分
 * 1. type: 表示请求不同的功能, 调用不同的函数 -- 4 个字节
 * 2. length: 表示 payload 的长度  -- 4 个字节
 * 3. payload: 要传输的二进制数据  -- length 个字节
 *
 * @author: zxj
 * @date: 2024-03-05
 * @time: 21:16:58
 */
@Data
public class Request implements Serializable {
    private Integer type;
    private Integer length;
    private byte[] payload;

    @Override
    public String toString() {
        return "Request{" +
                "type=" + type +
                ", length=" + length +
                '}';
    }
}

创建 common.Response

/**
 * Created with IntelliJ IDEA.
 * Description: 定义一个完整的响应格式
 * 一个完整的响应, 分成了三个部分
 * 1. type: 表示响应不同的功能, 调用不同的函数 -- 4 个字节
 * 2. length: 表示 payload 的长度  -- 4 个字节
 * 3. payload: 要传输的二进制数据  -- length 个字节
 *
 * @author: zxj
 * @date: 2024-03-05
 * @time: 21:16:46
 */
@Data
public class Response implements Serializable {
    private Integer type;
    private Integer length;
    private byte[] payload;

    @Override
    public String toString() {
        return "Response{" +
                "type=" + type +
                ", length=" + length +
                '}';
    }
}

定义参数⽗类

构造⼀个类表⽰⽅法的参数, 作为 Request 的 payload.

不同的⽅法中, 参数形态各异, 但是有些信息是通⽤的, 使⽤⼀个⽗类表⽰出来. 具体每个⽅法的参数再通过继承的⽅式体现.

common.BasicArguments

/**
 * Created with IntelliJ IDEA.
 * Description:定义请求父类
 *
 * @author: zxj
 * @date: 2024-03-05
 * @time: 21:31:01
 */
@Data
public class BasicArguments implements Serializable {
    // 表示一次请求/响应的唯一 Id, 用来把响应和请求对应上
    // 此处的 rid 和 channelId 都是基于 UUID 来生成的, rid 用来标识一个请求-响应, 这一点在请求响应非常多的时候游泳
    protected String rid;
    protected String channelId;
}

  • 此处的 rid 和 channelId 都是基于 UUID 来⽣成的. rid ⽤来标识⼀个请求-响应. 这⼀点在请求响应⽐较多的时候⾮常重要.

定义返回值⽗类

和参数同理, 也需要构造⼀个类表⽰返回值, 作为 Response 的 payload

common.BasicReturns

/**
 * Created with IntelliJ IDEA.
 * Description:定义返回的父类
 *
 * @author: zxj
 * @date: 2024-03-05
 * @time: 21:43:23
 */
@Data
public class BasicReturns implements Serializable {
    // 表示一次请求/响应的唯一 Id, 用来把响应和请求对应上
    protected String rid;
    // 用来标识一个 channel
    protected String channelId;
    protected Boolean ok;
}

定义其他参数类

针对每个 VirtualHost 提供的⽅法, 都需要有⼀个类表⽰对应的参数.

  1. ExchangeDeclareArguments
/**
 * Created with IntelliJ IDEA.
 * Description:ExchangeDeclare 方法请求参数类
 *
 * @author: zxj
 * @date: 2024-03-05
 * @time: 21:46:53
 */
@Data
public class ExchangeDeclareArguments extends BasicArguments implements Serializable {
    private String exchangeName;
    private ExchangeType exchangeType;
    private Boolean durable;
    private Boolean autoDelete;
    private Map<String,Object> arguments;
}

⼀个创建交换机的请求, 形如:

  • 可以把 ExchangeDeclareArguments 转成 byte[], 就得到了下列图⽚的结构.
  • 按照 length ⻓度读取出 payload, 就可以把读到的⼆进制数据转换成ExchangeDeclareArguments 对象.

在这里插入图片描述
2) ExchangeDeleteArguments

@Data
public class ExchangeDeleteArguments extends BasicArguments implements Serializable {
    private String exchangeName;
}
  1. QueueDeclareArguments
@Data
public class QueueDeclareArguments extends BasicArguments implements Serializable {
    private String queueName;
    private boolean durable;
    private boolean exclusive;
    private boolean autoDelete;
    private Map<String, Object> arguments;
}
  1. QueueDeleteArguments
@Data
public class QueueDeleteArguments extends BasicArguments implements Serializable {
    private String queueName;
}
  1. QueueBindArguments
@Data
public class QueueBindArguments extends BasicArguments implements Serializable {
    private String queueName;
    private String exchangeName;
    private String bindingKey;
}
  1. QueueUnbindArguments
@Data
public class QueueUnBindArguments extends BasicArguments implements Serializable {
    private String queueName;
    private String exchangeName;
}

  1. BasicPublishArguments
@Data
public class BasicPublishArguments extends BasicArguments implements Serializable {
    private String exchangeName;
    private String routingKey;
    private BasicProperties basicProperties;
    private byte[] body;
}
  1. BasicConsumeArguments
@Data
public class BasicConsumeArguments extends BasicArguments implements Serializable {
    private String consumerTag;
    private String queueName;
    private boolean autoAck;
}
  1. SubScribeReturns
  • 这个不是参数, 是返回值. 是服务器给消费者推送的订阅消息.
  • consumerTag 其实是 channelId.
  • basicProperties 和 body 共同构成了 Message.
/**
 * Created with IntelliJ IDEA.
 * Description:返回值, 是服务器给消费者推送的订阅消息.
 *
 * @author: zxj
 * @date: 2024-03-05
 * @time: 22:54:08
 */
@Data
public class SubScribeReturns extends BasicReturns implements Serializable {
    private String consumerTag;
    private BasicProperties basicProperties;
    private byte[] body;
}

⼗⼆、 实现 BrokerServer

创建 BrokerServer 类

@Data
@Slf4j
public class BrokerServer {
    // 当前程序只考虑一个虚拟主机的情况
    private VirtualHost virtualHost = new VirtualHost("default-virtualHost");
    // 使用这个 哈希表, 表示当前的所有会话(也就是说有哪些客户端正在和服务器进行通信)
    // key 为 channelId, value 为 channel 对应的 socket 对象
    private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<>();

    private ServerSocket serverSocket;
    // 引入一个线程池, 来处理多个客户端的需求
    private ExecutorService executorService;
    // 引入一个 boolean 变量控制服务器是否继续运行
    private volatile boolean runnable = true;
}
  • virtualHost 表⽰服务器持有的虚拟主机. 队列, 交换机, 绑定, 消息都是通过虚拟主机管理.
  • sessions ⽤来管理所有的客⼾端的连接. 记录每个客⼾端的 socket
  • serverSocket 是服务器⾃⾝的 socket
  • executorService 这个线程池⽤来处理响应
  • runnable 这个标志位⽤来控制服务器的运⾏停⽌.

启动/停⽌服务器

  • 这⾥就是⼀个单纯的 TCP 服务器, 没啥特别的
  • 实现停⽌操作, 主要是为了⽅便后续开展单元测试.


    public BrokerServer(int port) throws IOException {
        serverSocket = new ServerSocket(port);
    }

    // begin: 单纯的 TCP 服务器模板
    public void start() {
        log.info("[BrokerServer] 服务器开始启动");
        executorService = Executors.newCachedThreadPool();
        try {
            while (runnable) {
                Socket clientSocket = serverSocket.accept();
                // 把这个处理连接的逻辑对给线程池
                executorService.submit(() -> {
                    // 处理连接的统一方法
                    processConnection(clientSocket);
                });
            }
        } catch (SocketException e) {
            log.info("[BrokerServer] 服务器停止运行!");
        } catch (IOException e) {
            log.error("[BrokerServer] 服务器出现异常!");
            e.printStackTrace();
        }
    }

    /**
     * @description: 一般来说, 停止服务器, 都是 kill 对应的进程就可以了
     * 此处还是搞一个单独的停止方法, 主要是用于后续的单元测试
     **/
    public void stop() throws IOException {
        runnable = false;
        // 把线程池中的人物都放弃了, 让线程都销毁
        executorService.shutdownNow();
        serverSocket.close();
    }
    // end: 单纯的 TCP 服务器模板

实现处理连接

  • 对于 EOFException 和 SocketException , 我们视为客⼾端正常断开连接.
    • 如果是客⼾端先 close, 后调⽤ DataInputStream 的 read, 则抛出 EOFException
    • 如果是先调⽤ DataInputStream 的 read, 后客⼾端调⽤ close, 则抛出 SocketException
    /**
     * @description: 服务方法
     * 通过这个方法, 来处里一个客户端的连接
     * 在这个连接中, 可能会涉及到多个请求和响应
     **/
    private void processConnection(@NotNull Socket clientSocket) {
        // 获取服务对象的 输入输出 流
        try (InputStream inputStream = clientSocket.getInputStream();
             OutputStream outputStream = clientSocket.getOutputStream()) {
            // 这里需要按照特定的格式来读取并解析, 此时就需要用到 DataInputStream 和 DataOutputStream
            try (DataInputStream dataInputStream = new DataInputStream(inputStream); DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
                // 循环进行服务, 保持连接, 以便处理多个请求
                while (true) {
                    // 1. 读取请求并解析
                    Request request = readRequest(dataInputStream);
                    log.info("接收到[client: {} : {}] 请求: {}",clientSocket.getInetAddress(),clientSocket.getPort(),request);
                    // 2. 根据请求计算响应
                    Response response = process(request, clientSocket);
                    log.info("响应给[client: {} : {}] 数据: {}",clientSocket.getInetAddress(),clientSocket.getPort(),response);
                    // 3. 把响应写回给客户端
                    writeResponse(dataOutputStream, response);
                }
            }
        } catch (EOFException | SocketException e) {
            log.info("connection 关闭! 客户端地址: {} : {}", clientSocket.getInetAddress(), clientSocket.getPort());
        } catch (IOException | MqException | ClassNotFoundException e) {
            log.error("connection 出现异常 e: {}", e.toString());
            e.printStackTrace();
        } finally {
            try {
                // 当连接处理完了, 一定要关闭 socket
                clientSocket.close();
                // 一个 TCP 连接中, 可能包含多个 channel, 需要把当前这个 socket 对应的所有 channel 也顺便清理掉
                clearClosedSession(clientSocket);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

实现 readRequest


    /**
     * @description: 反序列化请求消息
     **/
    private @NotNull Request readRequest(@NotNull DataInputStream dataInputStream) throws IOException, MqException {
        Request request = new Request();
        request.setType(dataInputStream.readInt());
        request.setLength(dataInputStream.readInt());
        byte[] payload = new byte[request.getLength()];
        int n = dataInputStream.read(payload);
        if (n != request.getLength()) {
            throw new MqException("读取请求格式出错");
        }
        request.setPayload(payload);
        return request;
    }

实现 writeResponse

  • 注意这⾥的 flush 操作很关键, 否则响应不⼀定能及时返回给客⼾端
    /**
     * @description: 将 Response 对象中的内容先后写入 dataOutputStream 中
     **/
    private void writeResponse(@NotNull DataOutputStream dataOutputStream, Response response) throws IOException {
        log.info("{writeResponse}: 即将发送响应为: {}",response);
        dataOutputStream.writeInt(response.getType());
        dataOutputStream.writeInt(response.getLength());
        dataOutputStream.write(response.getPayload());
        // 刷新缓冲区十分重要
        dataOutputStream.flush();
    }

实现处理请求

  • 先把请求转换成 BaseArguments , 获取到其中的 channelId 和 rid
  • 再根据不同的 type, 分别处理不同的逻辑. (主要是调⽤ virtualHost 中不同的⽅法).
  • 针对消息订阅操作, 则需要在存在消息的时候通过回调, 把响应结果写回给对应的客⼾端.
  • 最后构造成统⼀的响应.

    /**
     * @description: 依据 request 中的信息, 执行相关方法, 并构造 Response 对象返回
     **/
    private @NotNull Response process(@NotNull Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {
        // 1. 把 request 中的 payload 做一个初步的解析, 让父类来接受
        BasicArguments basicArguments = (BasicArguments) BinaryUtils.fromBytes(request.getPayload());
        log.info("request 中 payload 解析结果:  rid = {}, channelId = {}", basicArguments.getRid(),basicArguments.getChannelId());
        // 2. 根据 type 的值, 来近一步来区分这一次请求时要干啥的
        boolean ok = true; // 各个方法的返回结果基本都是 boolean
        if (request.getType() == 0x1) {
            // 创建 channel
            sessions.put(basicArguments.getChannelId(), clientSocket);
            log.info("创建 channel 完成! channelId: {}", basicArguments.getChannelId());
        } else if (request.getType() == 0x2) {
            // 销毁 channel
            sessions.remove(basicArguments.getChannelId());
            log.info("销毁 channel 完成! channelId: {}", basicArguments.getChannelId());
        } else if (request.getType() == 0x3) {
            // 创建交换机, 此时的 payload 就是 ExchangeDeclareArguments 对象了
            ExchangeDeclareArguments exchangeDeclareArguments = (ExchangeDeclareArguments) basicArguments;
            ok = virtualHost.exchangeDeclare(exchangeDeclareArguments.getExchangeName(), exchangeDeclareArguments.getExchangeType(), exchangeDeclareArguments.getDurable(), exchangeDeclareArguments.getAutoDelete(), exchangeDeclareArguments.getArguments());
        } else if (request.getType() == 0x4) {
            // 销毁交换机
            ExchangeDeleteArguments exchangeDeleteArguments = (ExchangeDeleteArguments) basicArguments;
            ok = virtualHost.exchangeDelete(exchangeDeleteArguments.getExchangeName());
        } else if (request.getType() == 0x5) {
            // 创建队列
            QueueDeclareArguments queueDeclareArguments = (QueueDeclareArguments) basicArguments;
            ok = virtualHost.queueDeclare(queueDeclareArguments.getQueueName(), queueDeclareArguments.isDurable(), queueDeclareArguments.isExclusive(), queueDeclareArguments.isAutoDelete(), queueDeclareArguments.getArguments());
        } else if (request.getType() == 0x6) {
            // 销毁队列
            QueueDeleteArguments queueDeleteArguments = (QueueDeleteArguments) basicArguments;
            ok = virtualHost.queueDelete(queueDeleteArguments.getQueueName());
        } else if (request.getType() == 0x7) {
            // 创建绑定
            QueueBindArguments queueBindArguments = (QueueBindArguments) basicArguments;
            ok = virtualHost.queueBind(queueBindArguments.getQueueName(), queueBindArguments.getExchangeName(), queueBindArguments.getBindingKey());
        } else if (request.getType() == 0x8) {
            // 删除绑定
            QueueUnBindArguments queueUnBindArguments = (QueueUnBindArguments) basicArguments;
            ok = virtualHost.queueUnBind(queueUnBindArguments.getQueueName(), queueUnBindArguments.getExchangeName());
        } else if (request.getType() == 0x9) {
            // 发布消息
            BasicPublishArguments basicPublishArguments = (BasicPublishArguments) basicArguments;
            ok = virtualHost.basicPublish(basicPublishArguments.getExchangeName(), basicPublishArguments.getRoutingKey(), basicPublishArguments.getBasicProperties(), basicPublishArguments.getBody());
        } else if (request.getType() == 0xa) {
            // 订阅消息
            BasicConsumeArguments basicConsumeArguments = (BasicConsumeArguments) basicArguments;
            ok = virtualHost.basicConsume(basicConsumeArguments.getConsumerTag(), basicConsumeArguments.getQueueName(), basicConsumeArguments.isAutoAck(), new Consumer() {
                /**
                 * 这个回调函数要做的工作, 就是把服务收到消息直接推送会给对应的消费者客户端即可, 在客户端进行对消息的消费
                 **/
                @Override
                public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
                    // 先知道当前这个收到的消息, 要发给哪个客户端
                    // 此处 consumerTag, 其实就是 channelId (这里是规定的, 客户端填写该字段的时候, 就是以 channelId 来填写的),
                    // 根据 channelId 去 sessions 中查询, 就可以得到对应的 socket 对象, 就可以往里面发送数据了
                    // 1. 根据 channelId 找到 socket 对象
                    Socket clientSocket = sessions.get(consumerTag);
                    if (clientSocket == null || clientSocket.isClosed()) {
                        throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!");
                    }

                    // 2. 构造响应数据
                    SubScribeReturns subScribeReturns = new SubScribeReturns();
                    subScribeReturns.setChannelId(consumerTag);
                    subScribeReturns.setRid(""); // 由于这里只有响应, 没有请求, 不需要去对应, rid 暂时不需要
                    subScribeReturns.setOk(true);
                    subScribeReturns.setConsumerTag(consumerTag);
                    subScribeReturns.setBasicProperties(basicProperties);
                    subScribeReturns.setBody(body);
                    byte[] payload = BinaryUtils.toBytes(subScribeReturns);

                    Response response = new Response();
                    // 0xc 表示服务器给消费者客户端推送消息数据
                    response.setType(0xc);
                    // response 的 payload 就是一个 SubScribeReturns
                    response.setLength(payload.length);
                    response.setPayload(payload);

                    // 3. 把数据写回给客户端
                    //      注意! 此处的 dataOutputStream 不能close!
                    //      如果把 dataOutputStream 关闭, 就会直接把 clientSocket 里的 OutputStream 也关闭了
                    //      此时就无法继续往 socket 中写入后续数据了
                    DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());
                    writeResponse(dataOutputStream, response);

                }
            });
        } else if (request.getType() == 0xb) {
            // 确认应答, 消费者确认收到消息
            BasicAckArguments basicAckArguments = (BasicAckArguments) basicArguments;
            ok = virtualHost.basicAck(basicAckArguments.getQueueName(), basicAckArguments.getMessageId());
        } else {
            // 当前的 type 是非法的
            throw new MqException("未知的 type: " + request.getType());
        }

        // 3. 构造响应
        BasicReturns basicReturns = new BasicReturns();
        basicReturns.setChannelId(basicArguments.getChannelId());
        basicReturns.setRid(basicArguments.getRid());
        basicReturns.setOk(ok);
        byte[] payload = BinaryUtils.toBytes(basicReturns);
        Response response = new Response();
        response.setType(request.getType());
        response.setLength(payload.length);
        response.setPayload(payload);

        log.info("构造响应完成: {}", response);

        return response;
    }

实现 clearClosedSession

  • 如果客⼾端只关闭了 Connection, 没关闭 Connection 中包含的 Channel, 也没关系, 在这⾥统⼀进⾏清理.
  • 注意迭代器失效问题.

    /**
     * @description: 用户关闭连接后, 清理对应的 channel 资源
     * 需要注意的是 迭代器失效的问题
     **/
    private void clearClosedSession(Socket clientSocket) {
        // 这里要做的事情, 主要就是遍历上述 sessions hash 表, 把该关闭的 socket 对应的键值对, 统统删除
        List<String> toDeleteChannelId = new ArrayList<>();
        for (Map.Entry<String, Socket> entry : sessions.entrySet()) {
            if (entry.getValue() == clientSocket) {
                // 不能在这里直接删除!!
                // 这属于使用集合类的一个大忌 -- 一边遍历, 一边删除!
                // sessions.remove(entry.getKey());
                toDeleteChannelId.add(entry.getKey());
            }
        }

        for (String channelId : toDeleteChannelId) {
            sessions.remove(channelId);
        }

        log.info("清理 session 完成! 被清理的 channelId: {}", toDeleteChannelId);
    }

⼗三、 实现客⼾端

创建包 mqclient

创建 ConnectionFactory

⽤来创建连接的⼯⼚类

/**
 * Created with IntelliJ IDEA.
 * Description:工厂类 -- 以工厂模式来创建 Connection 类
 *
 * @author: zxj
 * @date: 2024-03-06
 * @time: 21:55:32
 */
@Data
public class ConnectionFactory {
    // BrokerServer 的 IP 和 port
    private String host;
    private Integer port;

    // more ...

    // 建立一个 TCP 连接
    public Connection newConnection() throws IOException {
        Connection connection = new Connection(host,port);
        return connection;
    }
}

Connection 和 Channel 的定义

⼀个客⼾端可以创建多个 Connection.

⼀个 Connection 对应⼀个 socket, ⼀个 TCP 连接.

⼀个 Connection 可以包含多个 Channel

  1. Connection 的定义
@Data
@Slf4j
public class Connection {
    private Socket socket;
    private InputStream inputStream;
    private OutputStream outputStream;
    private DataInputStream dataInputStream;
    private DataOutputStream dataOutputStream;

    // 记录当前 Connection 包含的 Channel
    private ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();
    // 执行消息回调的线程池
    private ExecutorService callbackPool = null;
}    
  • Socket 是客⼾端持有的套接字. InputStream OutputStream DataInputStream DataOutputStream 均为 socket 通信的接⼝.
  • channelMap ⽤来管理该连接中所有的 Channel.
  • callbackPool 是⽤来在客⼾端这边执⾏⽤⼾回调的线程池.
  1. Channel 的定义
@Data
public class Channel {
    // channelId 为 channel 的身份标识, 使用 UUID 标识
    private String channelId;
    // connection 为 channel 对应的连接
    private Connection connection;
    // key 为 rid, 即 requestId / responseId
    // basicReturnsMap 用来保存响应的返回值, 放到这个哈希表中方便和请求匹配
    private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();
    // 订阅消息的回调 -- 为消费者的回调(用户注册的), 对应消息响应, 应该调用这个回调处理消息
    private Consumer consumer = null;

    public Channel(String channelId, Connection connection) {
        this.channelId = channelId;
        this.connection = connection;
    }
}    
  • channelId 为 channel 的⾝份标识, 使⽤ UUID 标识.
  • Connection 为 channel 对应的连接.
  • baseReturnsMap ⽤来保存响应的返回值. 放到这个哈希表中⽅便和请求匹配.
  • consumer 为消费者的回调(⽤⼾注册的). 对于消息响应, 应该调⽤这个回调处理消息.

封装请求响应读写操作

在 Connection 中, 实现下列⽅法


    /**
     * @description: 读取响应
     **/
    public Response readResponse() throws IOException, MqException {
        log.info("客户端: 开始等待读取消息");

        Response response = new Response();
        response.setType(dataInputStream.readInt());
        response.setLength(dataInputStream.readInt());

        byte[] payload = new byte[response.getLength()];
        int n = dataInputStream.read(payload);
        if (n != response.getLength()) {
            throw new MqException("读取的响应数据不完整");
        }
        response.setPayload(payload);

        log.info("收到响应: type: {}, length: {}", response.getType(), response.getLength());

        return response;
    }

    /**
     * @description: 写请求
     **/
    public void writeRequest(Request request) throws IOException {
        dataOutputStream.writeInt(request.getType());
        dataOutputStream.writeInt(request.getLength());
        dataOutputStream.write(request.getPayload());
        dataOutputStream.flush();

        log.info("发送请求:type: {}, length: {}", request.getType(), request.getLength());
    }

创建 channel

在 Connection 中, 定义下列⽅法来创建⼀个 channel


    public Channel createChannel() throws IOException {
        // 使用 UUID 生产 channelId, 以 C- 开头
        String channelId = "C-" + UUID.randomUUID().toString();
        Channel channel = new Channel(channelId, this);
        // 这里需要先把 channel 键值放到 Map 中 进行管理
        channelMap.put(channelId, channel);
        // 同时也需要把 "创建 channel" 的这个消息也告诉服务器
        boolean ok = channel.createChannel();
        if (!ok) {
            // 服务器这里创建失败了, 整个这次创建 channel 操作不顺利
            // 把刚才已经加入 hash 表的键值对, 再删了
            channelMap.remove(channelId);
            return null;
        }
        return channel;
    }

发送请求

通过 Channel 提供请求的发送操作.

  1. 创建 channel

    /**
     * @description: 在这个方法中, 和服务器进行交互, 告知服务器, 此处客户端创建了新的 channel 了
     **/
    public boolean createChannel() throws IOException {
        // 对于创建 Channel 操作来说, payload 就是一个 basicArguments 对象
        BasicArguments basicArguments = new BasicAckArguments();
        basicArguments.setChannelId(channelId);
        basicArguments.setRid(generateRid());
        byte[] payload = BinaryUtils.toBytes(basicArguments);

        Request request = new Request();
        request.setType(0x1);
        request.setLength(payload.length);
        request.setPayload(payload);

        // 构造出完整请求之后, 就可以发送这个请求
        connection.writeRequest(request);

        // 等待服务器的响应
        BasicReturns basicReturns = waitResult(basicArguments.getRid());

        return basicReturns.getOk();
    }

generateRid 的实现

    private @NotNull String generateRid() {
        return "R-" + UUID.randomUUID().toString();
    }

waitResult 的实现

  • 由于服务器的响应是异步的. 此处通过 waitResult 实现同步等待的效果

    /*
     * @description: 期望使用这个方法来阻塞等待服务器的响应
     **/
    private BasicReturns waitResult(String rid) {
        BasicReturns basicReturns = null;
        while ((basicReturns = basicReturnsMap.get(rid)) == null) {
            // 如果查询结果为 null, 说明包裹还没有回来
            // 此时就需要阻塞等待
            synchronized (this) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        return basicReturns;
    }

关闭 channel


    /**
     * @description: 关闭 channel, 给服务器发送一个 0x2 类型的请求
     **/
    public boolean close() throws IOException {
        BasicArguments basicArguments = new BasicAckArguments();
        basicArguments.setRid(generateRid());
        basicArguments.setChannelId(channelId);
        byte[] payload = BinaryUtils.toBytes(basicArguments);

        Request request = new Request();
        request.setType(0x2);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(basicArguments.getRid());

        return basicReturns.getOk();
    }

创建交换机


    /**
     * @description: 创建交换机
     **/
    public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,
                                   Map<String, Object> arguments) throws IOException {
        ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();
        exchangeDeclareArguments.setRid(generateRid());
        exchangeDeclareArguments.setChannelId(channelId);
        exchangeDeclareArguments.setExchangeName(exchangeName);
        exchangeDeclareArguments.setExchangeType(exchangeType);
        exchangeDeclareArguments.setDurable(durable);
        exchangeDeclareArguments.setAutoDelete(autoDelete);
        exchangeDeclareArguments.setArguments(arguments);
        byte[] payload = BinaryUtils.toBytes(exchangeDeclareArguments);

        Request request = new Request();
        request.setType(0x3);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(exchangeDeclareArguments.getRid());
        return basicReturns.getOk();
    }

删除交换机


    /**
     * @description: 删除交换机
     **/
    public boolean exchangeDelete(String exchangeName) throws IOException {
        ExchangeDeleteArguments exchangeDeleteArguments = new ExchangeDeleteArguments();
        exchangeDeleteArguments.setExchangeName(exchangeName);
        exchangeDeleteArguments.setChannelId(channelId);
        exchangeDeleteArguments.setRid(generateRid());
        byte[] payload = BinaryUtils.toBytes(exchangeDeleteArguments);

        Request request = new Request();
        request.setType(0x4);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(exchangeDeleteArguments.getRid());
        return basicReturns.getOk();
    }

创建队列


    /**
     * @description: 创建队列
     **/
    public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,
                                Map<String, Object> arguments) throws IOException {
        QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();
        queueDeclareArguments.setQueueName(queueName);
        queueDeclareArguments.setExclusive(exclusive);
        queueDeclareArguments.setDurable(durable);
        queueDeclareArguments.setAutoDelete(autoDelete);
        queueDeclareArguments.setArguments(arguments);
        queueDeclareArguments.setChannelId(channelId);
        queueDeclareArguments.setRid(generateRid());
        byte[] payload = BinaryUtils.toBytes(queueDeclareArguments);

        Request request = new Request();
        request.setType(0x5);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(queueDeclareArguments.getRid());
        return basicReturns.getOk();
    }

删除队列


    /**
     * @description: 删除队列
     **/
    public boolean queueDelete(String queueName) throws IOException {
        QueueDeleteArguments queueDeleteArguments = new QueueDeleteArguments();
        queueDeleteArguments.setRid(generateRid());
        queueDeleteArguments.setChannelId(channelId);
        queueDeleteArguments.setQueueName(queueName);
        byte[] payload = BinaryUtils.toBytes(queueDeleteArguments);

        Request request = new Request();
        request.setType(0x6);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(queueDeleteArguments.getRid());
        return basicReturns.getOk();
    }

创建绑定


    /**
     * @description: 创建绑定
     **/
    public boolean queueBind(String queueName, String exchangeName, String bindingKey) throws IOException {
        QueueBindArguments queueBindArguments = new QueueBindArguments();
        queueBindArguments.setBindingKey(bindingKey);
        queueBindArguments.setQueueName(queueName);
        queueBindArguments.setExchangeName(exchangeName);
        queueBindArguments.setRid(generateRid());
        queueBindArguments.setChannelId(channelId);
        byte[] payload = BinaryUtils.toBytes(queueBindArguments);

        Request request = new Request();
        request.setType(0x7);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(queueBindArguments.getRid());
        return basicReturns.getOk();
    }

删除绑定


    /**
     * @description: 删除绑定
     **/
    public boolean queueUnbind(String queueName, String exchangeName) throws IOException {
        QueueUnBindArguments queueUnBindArguments = new QueueUnBindArguments();
        queueUnBindArguments.setExchangeName(exchangeName);
        queueUnBindArguments.setQueueName(queueName);
        queueUnBindArguments.setRid(generateRid());
        queueUnBindArguments.setChannelId(channelId);
        byte[] payload = BinaryUtils.toBytes(queueUnBindArguments);

        Request request = new Request();
        request.setType(0x8);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(queueUnBindArguments.getRid());
        return basicReturns.getOk();
    }

发送消息


    /**
     * @description: 发送消息
     **/
    public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties,byte[] body) throws IOException {
        BasicPublishArguments basicPublishArguments = new BasicPublishArguments();
        basicPublishArguments.setBasicProperties(basicProperties);
        basicPublishArguments.setBody(body);
        basicPublishArguments.setExchangeName(exchangeName);
        basicPublishArguments.setRoutingKey(routingKey);
        basicPublishArguments.setRid(generateRid());
        basicPublishArguments.setChannelId(channelId);
        byte[] payload = BinaryUtils.toBytes(basicPublishArguments);

        Request request = new Request();
        request.setType(0x9);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(basicPublishArguments.getRid());
        return basicReturns.getOk();
    }

订阅消息


    /**
     * @description: 订阅消息
     **/
    public boolean basicConsume(String queueName,boolean autoAck,Consumer consumer) throws MqException, IOException {
        // 先设置回调, 一个channel 只能设置一个回调方法
        if (this.consumer != null) {
            throw new MqException("该 channel 已经设置过消费消息的回调了, 不能重复设置!");
        }
        this.consumer = consumer;

        BasicConsumeArguments basicConsumeArguments = new BasicConsumeArguments();
        basicConsumeArguments.setRid(generateRid());
        basicConsumeArguments.setChannelId(channelId);
        basicConsumeArguments.setConsumerTag(channelId); // 此处 consumerTag 也使用 channelId 来标识
        basicConsumeArguments.setAutoAck(autoAck);
        basicConsumeArguments.setQueueName(queueName);
        byte[] payload = BinaryUtils.toBytes(basicConsumeArguments);

        Request request = new Request();
        request.setType(0xa);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(basicConsumeArguments.getRid());
        return basicReturns.getOk();
    }

确认消息


    /**
     * @description: 确认消息
     **/
    public boolean basicAck(String queueName,String messageId) throws IOException {
        BasicAckArguments basicAckArguments = new BasicAckArguments();
        basicAckArguments.setMessageId(messageId);
        basicAckArguments.setQueueName(queueName);
        basicAckArguments.setRid(generateRid());
        basicAckArguments.setChannelId(channelId);
        byte[] payload = BinaryUtils.toBytes(basicAckArguments);

        Request request = new Request();
        request.setType(0xb);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(basicAckArguments.getRid());
        return basicReturns.getOk();
    }

⼩结

上述发送请求的操作, 逻辑基本⼀致. 构造参数 + 构造请求 + 发送 + 等待结果

处理响应

  1. 创建扫描线程

创建⼀个扫描线程, ⽤来不停的读取 socket 中的响应数据

注意: ⼀个 Connection 中可能包含多个 channel, 需要把响应分别放到对应的 channel 中.


    public Connection(String host, Integer port) throws IOException {
        socket = new Socket(host, port);
        inputStream = socket.getInputStream();
        outputStream = socket.getOutputStream();
        dataInputStream = new DataInputStream(inputStream);
        dataOutputStream = new DataOutputStream(outputStream);


        callbackPool = Executors.newFixedThreadPool(4);

        // 创建一个扫描线程, 由这个线程负责不停的从 socket 中读取响应数据, 把这个响应数据再交给对应的 Channel 负责处理
        Thread t = new Thread(() -> {
            try {
                while (!socket.isClosed()) {
                    Response response = readResponse();
                    dispatchResponse(response);
                }
            } catch (SocketException e) {
                // 连接正常断开的. 此时这个异常直接忽略.
                System.out.println("[Connection] 连接正常断开!");
            } catch (IOException | ClassNotFoundException | MqException e) {
                // System.out.println("[Connection] 连接异常断开!");
                log.error("连接异常断开! e: {}", e);
                e.printStackTrace();
            }
        });
        t.start();
    }
  1. 实现响应的分发

给 Connection 创建 dispatchResponse ⽅法

  • 针对服务器返回的控制响应和消息响应, 分别处理.
    • 如果是订阅数据, 则调⽤ channel 中的回调.
    • 如果是控制消息, 直接放到结果集合中.

    /**
     * @description: 使用这个方法来分别处理, 当前的响应是针对控制请求的响应, 还是服务器推送的消息
     **/
    private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {
        if (response.getType() == 0xc) {
            // 服务器推送来的消息数据
            SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryUtils.fromBytes(response.getPayload());
            // 根据 ChannelId 找到对应的 Channel 对象
            Channel channel = channelMap.get(subScribeReturns.getChannelId());
            if (channel == null) {
                throw new MqException("该消息对应的 Channel 在客户端中不存在, channelId: " + subScribeReturns.getChannelId());
            }
            // 执行该 channel 对象内部的回调
            callbackPool.submit(() -> {
                try {
                    channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),
                            subScribeReturns.getBody());
                } catch (MqException | IOException e) {
                    e.printStackTrace();
                }
            });
        } else {
            // 当前相应是针对刚才的控制请求的响应
            BasicReturns basicReturns = (BasicReturns) BinaryUtils.fromBytes(response.getPayload());

            // 根据 ChannelId 找到对应的 Channel 对象
            Channel channel = channelMap.get(basicReturns.getChannelId());
            if (channel == null) {
                throw new MqException("该消息对应的 Channel 在客户端中不存在, channelId: " + basicReturns.getChannelId());
            }
            channel.putReturns(basicReturns);
        }
    }
  1. 实现 channel.putReturns
    /**
     * @description: 存入 basicReturns
     **/
    public void putReturns(BasicReturns basicReturns) {
        basicReturnsMap.put(basicReturns.getRid(), basicReturns);
        synchronized (this) {
            // 当前也不知道有多少个线程在等待上述的这个响应
            // 把所有的等待的线程都唤醒
            notifyAll();
        }
    }

关闭 Connection

    public void close() {
        // 关闭 Connection, 释放相关资源
        try {
            callbackPool.shutdownNow();
            channelMap.clear();
            inputStream.close();
            outputStream.close();
            socket.close();
        } catch (Exception e) {
            log.error("关闭资源出现异常");
            e.printStackTrace();
        }
    }

测试代码

package en.edu.zxj.mq.mqclient;

import en.edu.zxj.mq.MqApplication;
import en.edu.zxj.mq.common.Consumer;
import en.edu.zxj.mq.common.MqException;
import en.edu.zxj.mq.mqserver.BrokerServer;
import en.edu.zxj.mq.mqserver.core.BasicProperties;
import en.edu.zxj.mq.mqserver.core.ExchangeType;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.File;
import java.io.IOException;

/**
 * Created with IntelliJ IDEA.
 * Description:
 *
 * @author: zxj
 * @date: 2024-03-07
 * @time: 10:55:32
 */
@SpringBootTest
class MqClientTest {
    private ConnectionFactory factory = null;
    private Thread t = null;

    private BrokerServer brokerServer = null;

    @BeforeEach
    public void setUp() throws IOException {
        // 1. 先启动服务器
        MqApplication.context = SpringApplication.run(MqApplication.class);
        brokerServer = new BrokerServer(9090);
        t = new Thread(() -> {
            brokerServer.start();
        });
        t.start();

        // 2. 配置 ConnectionFactory
        factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(9090);
    }

    @AfterEach
    public void tearDown() throws IOException, InterruptedException {
        // 停止服务器
        brokerServer.stop();
        MqApplication.context.close();
        t.join();
        // 删除必要的文件
        File file = new File("./data");
        FileUtils.deleteDirectory(file);

        factory = null;
    }


    @Test
    public void testConnection() throws IOException {
        Connection connection = factory.newConnection();
        Assertions.assertNotNull(connection);
    }

    @Test
    public void testChannel() throws IOException {
        Connection connection = factory.newConnection();
        Assertions.assertNotNull(connection);
        Channel channel = connection.createChannel();
        Assertions.assertNotNull(channel);
    }


    @Test
    public void testExchange() throws IOException {
        Connection connection = factory.newConnection();
        Assertions.assertNotNull(connection);
        Channel channel = connection.createChannel();
        Assertions.assertNotNull(channel);

        boolean ok = channel.exchangeDeclare("testExchangeName", ExchangeType.DIRECT,true,false,null);
        Assertions.assertTrue(ok);

        ok = channel.exchangeDelete("testExchangeName");
        Assertions.assertTrue(ok);

        // 该关闭的关闭
        channel.close();
        connection.close();
    }

    @Test
    public void testQueue() throws IOException {
        Connection connection = factory.newConnection();
        Assertions.assertNotNull(connection);
        Channel channel = connection.createChannel();
        Assertions.assertNotNull(channel);

        boolean ok = channel.queueDeclare("testQueue",true,false,false,null);
        Assertions.assertTrue(ok);

        ok = channel.queueDelete("testQueue");
        Assertions.assertTrue(ok);

        // 该关闭的关闭
        channel.close();
        connection.close();
    }

    @Test
    public void testBinding() throws IOException {
        Connection connection = factory.newConnection();
        Assertions.assertNotNull(connection);
        Channel channel = connection.createChannel();
        Assertions.assertNotNull(channel);

        boolean ok = channel.queueDeclare("testQueue",true,false,false,null);
        Assertions.assertTrue(ok);

        ok = channel.exchangeDeclare("testExchangeName", ExchangeType.DIRECT,true,false,null);
        Assertions.assertTrue(ok);

        ok = channel.queueBind("testQueue","testExchangeName","testBindingKey");
        Assertions.assertTrue(ok);

        ok = channel.queueUnbind("testQueue","testExchangeName");
        Assertions.assertTrue(ok);

        ok = channel.exchangeDelete("testExchangeName");
        Assertions.assertTrue(ok);

        ok = channel.queueDelete("testQueue");
        Assertions.assertTrue(ok);

        // 该关闭的关闭
        channel.close();
        connection.close();
    }

    @Test
    public void testMessage() throws IOException, MqException, InterruptedException {
        Connection connection = factory.newConnection();
        Assertions.assertNotNull(connection);
        Channel channel = connection.createChannel();
        Assertions.assertNotNull(channel);

        boolean ok = channel.queueDeclare("testQueue",true,false,false,null);
        Assertions.assertTrue(ok);
        ok = channel.exchangeDeclare("testExchangeName", ExchangeType.DIRECT,true,false,null);
        Assertions.assertTrue(ok);

        byte[] requestBody = "hello".getBytes();
        ok = channel.basicPublish("testExchangeName","testQueue",null,requestBody);
        Assertions.assertTrue(ok);

        ok = channel.basicConsume("testQueue", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
                System.out.println("[消费数据] 开始!");
                System.out.println("consumerTag=" + consumerTag);
                System.out.println("basicProperties=" + basicProperties);
                Assertions.assertArrayEquals(requestBody,body);
                System.out.println("[消费数据] 结束!");
            }
        });
        Assertions.assertTrue(ok);

        Thread.sleep(500);


        ok = channel.exchangeDelete("testExchangeName");
        Assertions.assertTrue(ok);

        ok = channel.queueDelete("testQueue");
        Assertions.assertTrue(ok);

        // 该关闭的关闭
        channel.close();
        connection.close();
    }


}

项目结果

演示
首先启动 BrokerServer 类

@SpringBootApplication
public class MqApplication {
    public static ConfigurableApplicationContext context = null;

    public static void main(String[] args) throws IOException {
        context = SpringApplication.run(MqApplication.class, args);

        BrokerServer brokerServer = new BrokerServer(9090);
        brokerServer.start();
    }

}

在这里插入图片描述

接着分别启动消费者和生产者客户端, 不分先后启动顺序

在这里插入图片描述
在这里插入图片描述
此时消费者就会收到消息并进行处理

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/461725.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C语言数据结构基础笔记——树、二叉树简介

1.树 树是一种 非线性 的数据结构&#xff0c;它是由 n &#xff08; n>0 &#xff09;个有限结点组成一个具有层次关系的集合。 把它叫做树是因 为它看起来像一棵倒挂的树&#xff0c;也就是说它是根朝上&#xff0c;而叶朝下的。 &#xff08;图片来源于网络&#xff09;…

计算机考研|王道四本书够吗?

如果你是跨考生&#xff0c;王道的四本书只能覆盖你需要的80% 如果你是计算机专业的考生&#xff0c;王道四本书可以覆盖你需要的90% 我已经说的很明显了&#xff0c;王道的内容覆盖不了408考研的全部大纲&#xff0c;有的知识点虽然在王道书上提到了&#xff0c;但是因为不是…

拿捏指针(二)

个人主页&#xff1a;秋邱博客 所属栏目&#xff1a;C语言 &#xff08;感谢您的光临&#xff0c;您的光临蓬荜生辉&#xff09; 目录 前言 数组与指针 数组名的理解 指针数组与数组指针 指针数组 数组指针 数组传参 一维数组传参的本质 二维数组传参的本质 二维数组…

【数据结构与算法】:选择排序与快速排序

&#x1f525;个人主页&#xff1a; Quitecoder &#x1f525;专栏&#xff1a;数据结构与算法 我的博客即将同步至腾讯云开发者社区&#xff0c;邀请大家一同入驻&#xff1a;腾讯云 欢迎来到排序的第二个部分&#xff1a;选择排序与快速排序&#xff01; 目录 1.选择排序1.…

【网站项目】325企业OA管理系统

&#x1f64a;作者简介&#xff1a;拥有多年开发工作经验&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的项目或者毕业设计。 代码可以私聊博主获取。&#x1f339;赠送计算机毕业设计600个选题excel文件&#xff0c;帮助大学选题。赠送开题报告模板&#xff…

SpringBoot启动后出现Please sign in页面

1. 问题 项目启动后&#xff0c;出现莫名其妙的页面&#xff0c;如下 2. 原因 当您启动 Spring Web 应用程序后出现 “Please sign in” 页面时&#xff0c;这通常是由于引用依赖Spring Security默认的身份验证方式导致的。 <dependency><groupId>org.springfr…

Element 选择季度组件

<template><el-dialogtitle"选择季度":show-close"false":close-on-click-modal"false":close-on-press-escape"false":visible"visiable"class"dialog list"append-to-body><div><div>&…

如何本地搭建hMailServer邮件服务

文章目录 前言1. 安装hMailServer2. 设置hMailServer3. 客户端安装添加账号4. 测试发送邮件5. 安装cpolar6. 创建公网地址7. 测试远程发送邮件8. 固定连接公网地址9. 测试固定远程地址发送邮件 前言 hMailServer 是一个邮件服务器,通过它我们可以搭建自己的邮件服务,通过cpola…

51单片机基础篇系列-定时/计数器的控制工作方式

&#x1f308;个人主页&#xff1a;会编程的果子君 &#x1f4ab;个人格言:“成为自己未来的主人~” 定时/计数器的控制 80C51单片机定时/计数器的工作由两个特殊功能寄存器控制&#xff0c;TMOD用于设置其工作方式&#xff1a; 1.工作方式寄存器TMOD 工作方式寄存器TMO…

C++的类和对象(七):友元、内部类

目录 友元 友元函数 友元类 内部类 匿名对象 拷贝对象时的一些编译器优化 再次理解类和对象 友元 基本概念&#xff1a;友元提供了一种突破封装的方式&#xff0c;有时提供了便利&#xff0c;但是友元会增加耦合度&#xff0c;破坏了封装&#xff0c;所以友元不宜多用&…

AXI CANFD MicroBlaze 测试笔记

文章目录 前言测试用的硬件连接Vivado 配置Vitis MicroBlaze CANFD 代码测试代码测试截图Github Link 前言 官网: CAN with Flexible Data Rate (CAN FD) (xilinx.com) 特征: 支持8Mb/s的CANFD多达 3 个数据位发送器延迟补偿(TDC, transmitter delay compensation)32-deep T…

Jenkins 面试题及答案整理,最新面试题

Jenkins中如何实现持续集成与持续部署&#xff1f; Jenkins通过自动化构建、测试和部署应用程序来实现持续集成与持续部署&#xff08;CI/CD&#xff09;。这个过程包括以下步骤&#xff1a; 1、源代码管理&#xff1a; Jenkins支持与多种版本控制系统集成&#xff0c;如Git、…

数据结构 之 优先级队列(堆) (PriorityQueue)

&#x1f389;欢迎大家观看AUGENSTERN_dc的文章(o゜▽゜)o☆✨✨ &#x1f389;感谢各位读者在百忙之中抽出时间来垂阅我的文章&#xff0c;我会尽我所能向的大家分享我的知识和经验&#x1f4d6; &#x1f389;希望我们在一篇篇的文章中能够共同进步&#xff01;&#xff01;&…

oops-framework框架 之 启动流程(三)

引擎&#xff1a; CocosCreator 3.8.0 环境&#xff1a; Mac Gitee: oops-game-kit 回顾 上篇博客中我们通过 oops-game-kit 模版构建了基础的项目&#xff0c;另外讲解了下assets目录结构和游戏配置文件的基本使用相关&#xff0c;详情内容可参考&#xff1a; oops-framewo…

mysql5.7离线安装 windows

windows上离线安装mysql5.7 下载安装包 去官网下载对应版本的mysql官网 点击archives,接着选择自己要下载的版本&#xff0c;选择windows系统&#xff0c;并根据自己电脑的位数选择相应的版本【找到“此电脑”&#xff0c;鼠标右击&#xff0c;出来下拉框&#xff0c;选择“属性…

频率响应概述与波特图

频率响应的定义 在放大电路中&#xff0c;存在电抗元件&#xff08;如电容、电感&#xff09;、半导体管&#xff08;存在极间电容&#xff09;。由于电抗元件和极间电容的存在&#xff0c;当输入信号频率过高或过低时&#xff0c;不但放大倍数的数值会减小&#xff0c;而且将…

Python 3.x 快速安装 pip 包管理工具

目录 ℹ️ 1. 查看是否安装 pip1.1 方法一1.2 方法二 &#x1f6e0;️ 2. 安装方法2.1 通过 ensurepip 进行安装2.2 通过 get-pip.py 进行安装 参考文档&#xff1a; pip 官方安装文档&#xff1a;https://pip.pypa.io/en/stable/installation/ ℹ️ 1. 查看是否安装 pip 【…

最详细数据仓库项目实现:从0到1的电商数仓建设(数仓部分)

1、数仓 数据仓库是一个为数据分析而设计的企业级数据管理系统&#xff0c;它是一个系统&#xff0c;不是一个框架。可以独立运行的&#xff0c;不需要你参与&#xff0c;只要运行起来就可以自己运行。 数据仓库不是为了存储&#xff08;但是能存&#xff09;&#xff0c;而是…

hcia复习总结7

1&#xff0c;AR2发送2.0网段的信息给AR1&#xff0c;如果&#xff0c;AR1本身并不存在该网段的路由 信息&#xff0c;则将直接 刷新 到本地的路由表中。 Destination/Mask Proto Pre Cost Flags NextHop Interface 2.2.2.0/24 RIP 100…

Codeql复现CVE-2018-11776学习笔记

基本使用 1、首先下载struts2漏洞版本源码&#xff1a; https://codeload.github.com/apache/struts/zip/refs/tags/STRUTS_2_3_20 2、构建codeql数据库&#xff08;构建失败文末有解决办法&#xff09;&#xff1a; codeql database create ~/CodeQL/databases/struts2-2.3.…