目录
引言
创建 Spring Boot 项目
编写 Exchange 实体类
编写 Queue 实体类
编写 Binding 实体类
编写 Message 实体类
引言
- 上图为模块设计图
- 此处实现核心类为了简便,我们引用 Lombok(可点击下方链接了解 Lombok 的使用)
IDEA 配置 Lombok
创建 Spring Boot 项目
1、创建一个 Spring Boot 项目 并 创建相应的目录结构
注意:
- 消息队列中存在下列比较核心的概念
- 交换机(exchange)
- 队列(queue)
- 绑定(binding)
- 消息(message)
- 上述这些均存在于 Broker Server 中,所以我们在 mqserver 目录中进行创建实体类
编写 Exchange 实体类
1、使用一个枚举类 枚举出三种交换机类型
public enum ExchangeType { DIRECT(0), FANOUT(1), TOPIC(2); private final int type; private ExchangeType(int type) { this.type = type; } private int getType() { return type; } }
2、编写 Exchange 实体类
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.Data; import java.util.HashMap; import java.util.Map; /* * 这个类表示一个交换机 * */ @Data public class Exchange { // 此处使用 name 来作为交换机的身份标识 (唯一的) private String name; // 交换机类型,DIRECT,FANOUT,TOPIC private ExchangeType type = ExchangeType.DIRECT; // 该交换机是否要持久化存储 ture 表示需要持久化;false 表示不必持久化 private boolean durable = false; // 针对 交换机 队列,绑定,消息.... 内存中也需要存储(执行效率高),硬盘上也需要存储(持久化) // 有些交换机,队列,绑定等,是需要持久化存储,但是有些则不需要 // 用户使用的时候,就可以通过 开关(boolean 值)来决定是否真的需要持久化 // 如果当前交换机没人使用了,就会自动被删除(对于生产者而言) // 这个属性暂时先列在这里,后续的代码中并没有真的实现这个自动删除功能(RabbitMQ 是有的) private boolean autoDelete = false; // argument 表示的是创建交换机时指定的一些额外的参数选项,后续代码中并没有真的实现对应的功能,先列出来(RabbitMQ 也是有的) // 为了把这个 argument 给存到数据库中,就需要把 Map 转成 json 格式的字符串 private Map<String,Object> arguments = new HashMap<>(); // argument 可以称为 "参数" 或 "选项", 可以有,也可以没有,通过选项来开启不同的功能 }
编写 Queue 实体类
1、编写 Queue 实体类
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.Data; import java.util.HashMap; import java.util.Map; /* * 这个类表示一个存储消息的队列 * MSG ——> Message * */ @Data public class MSGQueue { // 表示队列的身份标识 private String name; // 标识队列是否持久化,true 持久化保存,false 表示不持久化 private boolean durable = false; // 这个属性为 true,表示这个队列只能被一个消费者使用(别人用不了)如果为 false 则是大家都能使用 // 这个 独占 功能,也是先把字段列在这里,具体的独占功能暂时先不实现 private boolean exclusive = false; // 为 ture 表示没有人使用之后,就自动删除 false 则是不会自动删除(对于消费者而言) private boolean autoDelete = false; // 表示拓展参数,当前也是先列在这里,先暂时不实现 private Map<String,Object> arguments = new HashMap<>(); }
注意:
- 此处为了与标准库中的 Queue 类稍作区分,我们直接取名为 MSGQueue 类
编写 Binding 实体类
1、编写 Binding 实体类
import lombok.Data; /* * 表示队列和交换机之间的关联关系 * */ @Data public class Binding { private String exchangeName; private String queueName; // 这里的 bindingKey 就是在出题 private String bindingKey; // binding 这个东西,依附于 Exchange 和 Queue 的 // 比如,对于持久化来说,如果 Exchange 和 Queue 任何一个都没有持久化 // 此时你针对 Binding 持久化是没有任何意义的 }
编写 Message 实体类
- Message 主要包含三个部分
- 属性部分 BasicProperties
- 正文部分 byte[]
- 辅助属性 offsetBeg、offsetEnd 、isValid
1、编写 Message 实体类
import lombok.Data; import java.io.Serializable; import java.util.UUID; /* * 表示一个要传递的消息 * 注意!!此处的 Message 对象,是需要能够在网络上删除,并且也需要能写入到文件中 * 此时就需要针对 Message 进行序列化和反序列化 * 此处使用 标准库自带的 序列化/反序列化 操作 * */ @Data public class Message implements Serializable { // 这两个属性是 Message 最核心的部分 private BasicProperties basicProperties = new BasicProperties(); private byte[] body; // 下面的属性则是辅助用的属性 // Message 后续会持久化到文件中(如果持久化的话) // 一个文件中会存储很多的消息,如何找到某个消息,在文件中的具体位置呢? // 使用下面两个偏移量来进行表示 [offsetBeg,offsetEnd) // 这俩属性并不需要被序列化保存到文件中 此时消息一旦被写入文件之后,所在的位置就固定了,并不需要单独存储 // 这俩属性存在的目的,主要就是为了让内存中的 Message 对象,能够快速找到对应的硬盘上的 Message 的位置 private transient long offsetBeg = 0; // 消息数据的开头距离文件开头的位置偏移(字节) private transient long offsetEnd = 0; // 消息数据的结尾距离文件开头的位置便宜(字节) // 使用这个属性表示该消息在文件中是否是有效消息(针对文件中的消息,如果删除,使用逻辑删除的方式) // 0x1 表示有效,0x0 表示无效 private byte isValid = 0x1; // 创建一个工厂方法,让工厂方法来帮我们封装一下创建 Message 对象的过程 // 这个方法中创建 Message 对象,会自动生成唯一的 MessageId // 万一 routingKey 和 basicProperties 里的 routingKey 冲突,以外面的为主 public static Message createMessageWithId(String routingKey, BasicProperties basicProperties,byte[] body) { Message message = new Message(); if (basicProperties != null) { message.setBasicProperties(basicProperties); } // 此处生成的 MessageId 以 M- 作为前缀 message.setMessageId("M-" + UUID.randomUUID()); message.setRoutingKey(routingKey); message.body = body; // 此处是把 body 和 basicProperties 先设置出来,这俩个是 Message 的核心内容 // 而 offsetBeg,offsetEnd,isValid,则是消息持久化的时候才会用到,在把消息写入文件之前再进行设定 // 此处只是在内存中创建一个 Message 对象 return message; } public String getMessageId() { return basicProperties.getMessageId(); } private void setMessageId(String messageId) { basicProperties.setMessageId(messageId); } public String getRoutingKey() { return basicProperties.getRoutingKey(); } public void setRoutingKey(String routingKey) { basicProperties.setRoutingKey(routingKey); } public int getDeliverMode() { return basicProperties.getDeliverMode(); } public void setDeliverMode(int deliverMode) { basicProperties.setDeliverMode(deliverMode); } }
注意点一:
- 此处我们将使用标准库自带的方式进行序列化/反序列化,而不使用 JSON 方式
- JSON 本质上是文本格式,即里面放的也是文本类型的数据
- 而 Message 中存储的是 二进制数据
- 所以我们直接让需要被序列化的类实现一个 Serializable 接口即可
注意点二:
- 以往实现一个接口,都是为了重写里面的某个/某些方法
- 但是 Serializable 接口无需重写任何方法!
注意点三:
- 如图所示,辅助属性 offsetBeg 和 offsetEnd 仅用于标识某一消息在文件中的具体位置
- 而不是将这俩属性,像如同 属性部分 和 正文部分 序列化保存到文件中!
- 所以我们给这两个成员变量加上 trasient 关键字,用于防止被序列化
注意点四:
- UUID 是编程中,一种用来生成唯一 id 的算法
注意点五:
- 数据库也好,硬盘文件也好,在删除数据时,很多时候均会使用到 逻辑删除
- 不是真删除了,而是标记成 "无效"
2、编写属性部分 BasicProperties
import lombok.Data; import java.io.Serializable; @Data public class BasicProperties implements Serializable { // 消息的唯一身份标识 此处为了保证 id 的唯一性,使用 UUID 来作为 messageId private String messageId; // 是一个消息上带有的内容,和 bindingKey 做匹配 // 如果当前的交换机类型是 DIRECT,此时 routingKey 就表示要转发的队列名 // 如果当前的交换机类型是 FANOUT,此时 routingKey 无意义(不使用) // 如果当前的交换机类型是 TOPIC,此时 routingKey 就要和 bindingKey 做匹配,符合要求才能转发给对应队列 private String routingKey; // 表示消息是否要持久化, 1 表示不持久化, 2 表示持久化 (RabbitMQ 就是这样做的) private int deliverMode = 1; // 其实针对 RabbitMQ 来说,BasicProperties 里面还有很多别的属性,其他的属性暂时先不考虑了 }
注意:
- Message 实体类实现了 Serializable 接口用于序列化
- BasicProperties 类作为 Message 的属性部分,也需实现 Serializable 接口用于序列化