文章目录
- 一、实现 broker server 服务器
- 1.1 创建一个SpringBoot项目
- 1.2 创建Java类
- 二、硬盘持久化存储 broker server 里的数据
- 2.1 数据库存储
- 2.1.1 浅谈SQLite
- MyBatis
- 2.1.2 如何使用SQLite
- 2.2 文件存储
- 三、将broker server 里的数据存储在内存上
- 四、使用DataBaseManager类封装数据库操作
一、实现 broker server 服务器
上次我们提到生产者-消费者模型中,最重要的是实现broker server(服务器模块)。 为什么最重要的是实现 broker server 呢?broker server 作为一个通用服务器,它可以给任何具有数据存储、转发需求的客户端服务器使用。就像现在的数据库程序一样,不论开发者实现什么业务,只要该业务中涉及数据的持久化存储,就需要使用数据库。
broker server 中管理着许多重要的概念(虚拟主机、交换机、队列、消息、绑定),这些概念的数据组织是实现 broker server 的关键。我们 以 如何使用代码实现一个 broker server 为核心,对 broker server 的实现进行详细介绍。
1.1 创建一个SpringBoot项目
之前的博客有介绍过如何使用IDEA创建一个 SpringBoot 项目,可以点击此处链接浅谈如何创建一个SpringBoot项目进行学习。
创建好一个SpringBoot项目之后,我们在pom.xml文件中引入SQLite的依赖,
再在配置文件application.yaml中针对SQLite、MyBatis的使用进行配置。
1.2 创建Java类
我们为 broker server 的实现创建一个包mqServer(这只是一个包名,可以自己随意进行自定义):
再在mqServer包下创建一个core包,core包里存放交换机、队列、消息、绑定这些概念的实体类:
交换机类(Exchange):
/*
* 这个类表示一个交换机
*/
public class Exchange {
// 此处使用 name 来作为交换机的身份标识. (唯一的)
private String name;
// 交换机类型, DIRECT, FANOUT, TOPIC
private ExchangeType type = ExchangeType.DIRECT;
// 该交换机是否要持久化存储. true 表示需要持久化; false 表示不必持久化.
private boolean durable = false;
// 如果当前交换机, 没人使用了, 就会自动被删除.
// 这个属性暂时先列在这里, 后续的代码中并没有真的实现这个自动删除功能~~ (RabbitMQ 是有的)
private boolean autoDelete = false;
// arguments 表示的是创建交换机时指定的一些额外的参数选项. 后续代码中并没有真的实现对应的功能, 先列出来. (RabbitMQ 也是有的)
// 为了把这个 arguments 存到数据库中, 就需要把 Map 转成 json 格式的字符串.
private Map<String, Object> arguments = new HashMap<>();
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public ExchangeType getType() {
return type;
}
public void setType(ExchangeType type) {
this.type = type;
}
public boolean isDurable() {
return durable;
}
public void setDurable(boolean durable) {
this.durable = durable;
}
public boolean isAutoDelete() {
return autoDelete;
}
public void setAutoDelete(boolean autoDelete) {
this.autoDelete = autoDelete;
}
// 这里的 get set 用于和数据库交互使用.
public String getArguments() {
// 是把当前的 arguments 参数, 从 Map 转成 String (JSON)
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.writeValueAsString(arguments);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
// 如果代码真异常了, 返回一个空的 json 字符串就 ok
return "{}";
}
// 这个方法, 是从数据库读数据之后, 构造 Exchange 对象, 会自动调用到
public void setArguments(String argumentsJson) {
// 把参数中的 argumentsJson 按照 JSON 格式解析, 转成
// 上述的 Map 对象
ObjectMapper objectMapper = new ObjectMapper();
try {
this.arguments = objectMapper.readValue(argumentsJson, new TypeReference<HashMap<String, Object>>() {});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
// 在这里针对 arguments, 再提供一组 getter setter , 用来去更方便的获取/设置这里的键值对.
// 这一组在 java 代码内部使用 (比如测试的时候)
public Object getArguments(String key) {
return arguments.get(key);
}
public void setArguments(String key, Object value) {
arguments.put(key, value);
}
public void setArguments(Map<String, Object> arguments) {
this.arguments = arguments;
}
}
交换机类型 类(ExchangeType):
这个类是个枚举类,作为Exchange类的成员变量。
public enum ExchangeType {
DIRECT(0),
FANOUT(1),
TOPIC(2);
private final int type;
private ExchangeType(int type) {
this.type = type;
}
public int getType() {
return type;
}
}
队列类(MSGQueue):
/*
* 这个类表示一个存储消息的队列
* MSG => Message
*/
public class MSGQueue {
// 表示队列的身份标识.
private String name;
// 表示队列是否持久化, true 表示持久化保存, false 表示不持久化.
private boolean durable = false;
// 这个属性为 true, 表示这个队列只能被一个消费者使用(别人用不了). 如果为 false 则是大家都能使用
// 这个 独占 功能, 也是先把字段列在这里, 具体的独占功能暂时先不实现.
private boolean exclusive = false;
// 为 true 表示没有人使用之后, 就自动删除. false 则是不会自动删除.
// 这个 自动删除 功能, 也是先把字段列在这里, 具体的独占功能暂时先不实现.
private boolean autoDelete = false;
// 也是表示扩展参数. 当前也是先列在这里, 先暂时不实现
private Map<String, Object> arguments = new HashMap<>();
/**
* 因为消息是存在队列里的,每个队列下面都有相关的两个消息文件,生产者生产出消息之后,将
* 消息投递给了broker server 服务器,消费者通过服务器订阅消息,消费者订阅到消息之后
* 消费者具体想要使用这些消息做些什么,我们不知道,这得由消费者自己决定,所以消费者使用回调函数
* Consumer之后,使用里面的抽象方法handleDelivery自己决定自己想要将此消息用来做些什么(重写)
*
*
* 因此在MSGQueue类里面,我们需要定义一个具有ConsumerEnv类型的List,
* 表示当前队列里的消息都有哪些消费者订阅了
*/
// 当前队列都有哪些消费者订阅了.
private List<ConsumerEnv> consumerEnvList = new ArrayList<>();
// 记录当前取到了第几个消费者. 方便实现轮询策略.
// 压根没懂这个 todo
private AtomicInteger consumerSeq = new AtomicInteger(0);
// 添加一个新的订阅者
public void addConsumerEnv(ConsumerEnv consumerEnv){
synchronized (this){
consumerEnvList.add(consumerEnv);
}
}
// 删除一个订阅者
// 挑选一个订阅者,用来处理当前的消息(按照轮询的方式)
public ConsumerEnv chooseConsumer(){
if(consumerEnvList.size() == 0){
// 当前队列没人订阅
return null;
}
// 压根没懂这个 todo
int index = consumerSeq.get() % consumerEnvList.size();
consumerSeq.getAndIncrement();
return consumerEnvList.get(index);
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public boolean isDurable() {
return durable;
}
public void setDurable(boolean durable) {
this.durable = durable;
}
public boolean isExclusive() {
return exclusive;
}
public void setExclusive(boolean exclusive) {
this.exclusive = exclusive;
}
public boolean isAutoDelete() {
return autoDelete;
}
public void setAutoDelete(boolean autoDelete) {
this.autoDelete = autoDelete;
}
public String getArguments() {
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.writeValueAsString(arguments);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return "{}";
}
public void setArguments(String argumentsJson) {
ObjectMapper objectMapper = new ObjectMapper();
try {
this.arguments = objectMapper.readValue(argumentsJson, new TypeReference<HashMap<String, Object>>() {});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
public Object getArguments(String key) {
return arguments.get(key);
}
public void setArguments(String key, Object value) {
arguments.put(key, value);
}
public void setArguments(Map<String, Object> arguments) {
this.arguments = arguments;
}
}
绑定类(Binding):
/*
* 表示队列和交换机之间的关联关系
*/
public class Binding {
private String exchangeName;
private String queueName;
// bindingKey 就是在出题, 要求领红包的人要画个 "桌子" 出来~~
private String bindingKey;
// Binding 这个东西, 依附于 Exchange 和 Queue 的!!!
// 比如, 对于持久化来说, 如果 Exchange 和 Queue 任何一个都没有持久化,
// 此时你针对 Binding 持久化是没有任何意义的
public String getExchangeName() {
return exchangeName;
}
public void setExchangeName(String exchangeName) {
this.exchangeName = exchangeName;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public String getBindingKey() {
return bindingKey;
}
public void setBindingKey(String bindingKey) {
this.bindingKey = bindingKey;
}
}
消息类(Message):
/*
* 表示一个要传递的消息
*/
public class Message implements Serializable {
// 这两个属性是 Message 最核心的部分.
private BasicProperties basicProperties = new BasicProperties();
private byte[] body;
// 下面的属性则是辅助用的属性.
// Message 后续会存储到文件中(如果持久化的话).
// 一个文件中会存储很多的消息. 如何找到某个消息, 在文件中的具体位置呢?
// 使用下列的两个偏移量来进行表示. [offsetBeg, offsetEnd)
// 这俩属性并不需要被序列化保存到文件中~~ 此时消息一旦被写入文件之后, 所在的位置就固定了. 并不需要单独存储.
// 这俩属性存在的目的, 主要就是为了让内存中的 Message 对象, 能够快速找到对应的硬盘上的 Message 的位置.
private transient long offsetBeg = 0; // 消息数据的开头距离文件开头的位置偏移(字节)
private transient long offsetEnd = 0; // 消息数据的结尾距离文件开头的位置偏移(字节)
// 使用这个属性表示该消息在文件中是否是有效消息. (针对文件中的消息, 如果删除, 使用逻辑删除的方式)
// 0x1 表示有效. 0x0 表示无效.
private byte isValid = 0x1;
// 创建一个工厂方法, 让工厂方法帮我们封装一下创建 Message 对象的过程.
// 这个方法中创建的 Message 对象, 会自动生成唯一的 MessageId
// 万一 routingKey 和 basicProperties 里的 routingKey 冲突, 以外面的为主.
public static Message createMessageWithId(String routingKey, BasicProperties basicProperties, byte[] body) {
Message message = new Message();
if (basicProperties != null) {
message.setBasicProperties(basicProperties);
}
// 此处生成的 MessageId 以 M- 作为前缀.
message.setMessageId("M-" + UUID.randomUUID());
message.setRoutingKey(routingKey);
message.body = body;
// 此处是把 body 和 basicProperties 先设置出来. 他俩是 Message 的核心内容.
// 而 offsetBeg, offsetEnd, isValid, 则是消息持久化的时候才会用到. 在把消息写入文件之前再进行设定.
// 此处只是在内存中创建一个 Message 对象.
return message;
}
public String getMessageId() {
return basicProperties.getMessageId();
}
public void setMessageId(String messageId) {
basicProperties.setMessageId(messageId);
}
public String getRoutingKey() {
return basicProperties.getRoutingKey();
}
public void setRoutingKey(String routingKey) {
basicProperties.setRoutingKey(routingKey);
}
public int getDeliverMode() {
return basicProperties.getDeliverMode();
}
public void setDeliverMode(int mode) {
basicProperties.setDeliverMode(mode);
}
public BasicProperties getBasicProperties() {
return basicProperties;
}
public void setBasicProperties(BasicProperties basicProperties) {
this.basicProperties = basicProperties;
}
public byte[] getBody() {
return body;
}
public void setBody(byte[] body) {
this.body = body;
}
public long getOffsetBeg() {
return offsetBeg;
}
public void setOffsetBeg(long offsetBeg) {
this.offsetBeg = offsetBeg;
}
public long getOffsetEnd() {
return offsetEnd;
}
public void setOffsetEnd(long offsetEnd) {
this.offsetEnd = offsetEnd;
}
public byte getIsValid() {
return isValid;
}
public void setIsValid(byte isValid) {
this.isValid = isValid;
}
@Override
public String toString() {
return "Message{" +
"basicProperties=" + basicProperties +
", body=" + Arrays.toString(body) +
", offsetBeg=" + offsetBeg +
", offsetEnd=" + offsetEnd +
", isValid=" + isValid +
'}';
}
}
BasicProperties 类(是Message类的成员变量):
public class BasicProperties implements Serializable {
// 消息的唯一身份标识. 此处为了保证 id 的唯一性, 使用 UUID 来作为 message id
private String messageId;
// 是一个消息上带有的内容, 和 bindingKey 做匹配.
// 如果当前的交换机类型是 DIRECT, 此时 routingKey 就表示要转发的队列名.
// 如果当前的交换机类型是 FANOUT, 此时 routingKey 无意义(不使用).
// 如果当前的交换机类型是 TOPIC, 此时 routingKey 就要和 bindingKey 做匹配. 符合要求的才能转发给对应队列.
private String routingKey;
// 这个属性表示消息是否要持久化. 1 表示不持久化, 2 表示持久化. (RabbitMQ 就是这样搞的....)
private int deliverMode = 1;
// 其实针对 RabbitMQ 来说, BasicProperties 里面还有很多别的属性. 其他的属性暂时先不考虑了.
public String getMessageId() {
return messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
public String getRoutingKey() {
return routingKey;
}
public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
}
public int getDeliverMode() {
return deliverMode;
}
public void setDeliverMode(int deliverMode) {
this.deliverMode = deliverMode;
}
@Override
public String toString() {
return "BasicProperties{" +
"messageId='" + messageId + '\'' +
", routingKey='" + routingKey + '\'' +
", deliverMode=" + deliverMode +
'}';
}
}
二、硬盘持久化存储 broker server 里的数据
上述我们创建了 交换机、队列、绑定、消息 的实体Java类,这些概念的数据需要在内存、硬盘上各存一份。所以此时我们首先来讨论 交换机(Exchange)、队列(MSGQueue)、消息(Message)、绑定(Binding) 他们以何种方式存储在硬盘上。 至于他们在内存中以什么样的数据结构进行存储,我们后续再讨论。
2.1 数据库存储
对于交换机、队列、绑定来说,由于他们的数据量没有消息多,同时他们还需经常性进行 增删改查 操作,因此他们使用数据库进行数据的持久化存储。一般我们都是使用MySQL进行数据存储,但是由于MySQL是个客户端-服务器结构的程序,它本身具有一定的重量。在这里,我们为了简便,采用轻量的SQLite数据库进行数据存储。
2.1.1 浅谈SQLite
可能很多同学一开始学习使用数据库进行数据存储时,接触到的就是SQL Server、MySQL、Oracle这些数据库(包括我寄自),没怎么听说过SQLite。
但SQLite的应用也很广泛。
SQLite很轻量,它是一个可执行文件exe,主要在一些性能不高的设备上使用,尤其是在移动端(手机、APP)和嵌入式设备(投影仪、冰箱、交换机、路由器…),Android系统就内置了SQLite。虽然SQLite 轻量,但其具有的功能不输MySQL,其sql语句使用起来与MySQL基本无异,也支持通过MyBatis这样的框架来使用。(学习过安卓开发课程的同学肯定知道SQLite)。
SQLite 是一个本地数据库,把数据存储在当前硬盘的某个指定文件中,其无法跨网络使用,这个数据库相当于直接操作本地的硬盘文件,其将每个数据库database抽取成一个单独文件。
MyBatis
在介绍如何使用SQLite之前,先了解一下如何使用MyBatis吧。因为我发现有些同学还不太理解如何使用MyBatis操作数据库,当然了,掌握了MyBatis的同学可以直接跳过此处内容科普。
MyBatis是Spring框架集成的一个操作数据库的框架,以前我们想要在项目中连接数据库来进行数据的存储,使用的是JDBC,但是使用JDBC将后端代码与数据库连接时,发现JDBC其代码量较大,代码重复率高,同时sql语句以及Java代码都杂揉在了一起,代码显得不太优雅。因此现在Java项目中主流使用MyBatis操作数据库。
使用MyBatis操作数据库的流程:
1、一般是在项目中创建一个名叫mapper的包,在该包下创建interface(interface的数量由自己项目的业务代码决定),描述有哪些方法要给Java代码使用。
2、创建对应的xml文件,通过xml文件来实现上述interface中的抽象方法。
如果觉得MyBatis这里讲得比较抽象,不理解,我再找个时间专门出一篇关于MyBatis详解的博客。
2.1.2 如何使用SQLite
1、在Java中使用SQLite,无需安装以及下载任何东西。直接使用Maven,将SQLite的依赖引进pom.xml文件中即可,此时Maven会自动从中央仓库加载SQLite的jar包和dll(动态库)。
SQLite的依赖:
<dependency>
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
<version>3.41.0.1</version>
</dependency>
2、我们还需要在项目的配置文件application.yaml中,针对SQLite、MyBatis进行配置:
此时,我们就可以在项目中使用数据库SQLite进行数据持久化存储了。
在将 交换机、队列、绑定 持久化存储在数据库时,有一个需要注意的点:交换机类、队列类,他们都有一个Map类型的成员变量arguments。
由于数据库存储数据时,不支持Map类型的数据,因此当我们需要将Map类型的arguments变量在数据库中存储时,Map类型的arguments在数据库中就需要使用字符串类型表示。
我们知道MyBatis操作数据库进行写数据操作时,是调用对象属性对应的getter方法,将从getter方法获取到的属性的值写入数据库中;数据库进行读数据操作时,是调用对象的对应属性的setter方法,将从数据库获取到的值赋值给对应属性。
那么针对arguments这个变量,其数据类型在对象中是Map类型,但在数据库中其数据类型是字符串,那么如果数据库想要进行写操作时,就需要通过arguments变量的getter方法获取到一个String(字符串)类型的arguments,此时获取到的arguments类型才会与数据库中的字符串类型arguments相匹配,才能正确的进行数据库的写操作。但是由于在对象中的arguments它的数据类型是Map类型,此时对象中提供的getArguments()方法,其返回值类型是Map类型,当数据库调用这个getter方法时,数据库获取到的是一个Map类型的arguments,由于arguments在数据库中的存储类型是字符串,因此此时数据类型不匹配,获取到的Map类型的arguments无法存入数据库,就会出错。
所以我们需要在交换机类、队列类中里再提供一个返回值类型是字符串(String)的getArguments()方法,在该getter方法内部,将对象中是Map类型的arguments转化成字符串类型的arguments,此时数据库进行写数据时,调用getter方法将获取到的argumrnts是字符串类型的了,此时就可以将此字符串类型的argumrnts数据写入数据库中了,不会出错。
如果数据库想要进行读操作,就需要通过arguments变量的setter方法将从数据库中获取到的字符串类型的arguments设置给对象中的是Map类型的arguments,此时由于从数据库中获取到的arguments值是字符串类型的,而对象中的arguments是Map类型的,因此此时数据类型不匹配,无法设置,会出错。
所以我们需要在 交换机类、队列类中提供再提供一个参数是字符串类型的arguments,该setter方法内部使用JSON将字符串类型的arguments转化成一个Map类型的arguments,当数据库进行读操作时,就会调用这个setArguments(String arguments) 方法,就从数据库中获取到的字符串类型的arguments通过setter方法将其转换成Map类型的arguments,此时就不会出错了。
2.2 文件存储
由于咱们的消息数量庞大,且不需要频繁使用数据库进行增删改查,因此此时我们使用文件存储消息。一个文件是需要存储多条消息的,不是一个文件只存储一条消息哈!
由于一个文件可以存储多条消息,因此当我们想要从一个文件中获取某条消息时,就需要确定某条消息的位置,这会比较困难。因此我们在消息类(Message)中,定义了两个成员变量,分别是 offsetBeg、offsetEnd。offsetBeg 表示此条消息的头部到文件头部的距离,offsetEnd 表示此条消息的尾部到文件头部的距离。
三、将broker server 里的数据存储在内存上
这里先不讲。
四、使用DataBaseManager类封装数据库操作
在我们实现 broker server 的代码中,进行数据库操作的类主要有 mqserver 下的mapper包下的MetaMapper接口,此时我们在mqserver包下新创建一个datacenter的包,datacenter包下创建一个DataBaseManager类,在该类中对数据库操作进行整合和调用。
该类中会定义一个初始化方法 init(),使用该方法进行数据库初始化:在 broker server 启动时进行逻辑判定,如果数据库已经存在,数据库表也已经存在,此时我们就不创建数据库了,只打印一个日志提醒数据库已经存在;如果数据库不存在,此时我们就创建数据库、创建表、插入一些默认的数据,同时打印日志提醒数据库初始化完成!
由于DataBaseManager类是进行数据库的封装,MataMapper中含有一系列数据库操作的方法,因此DataBaseManager类中需要引入MetaMapper实例。
DataBaseManager类中除了 metaMapper实例,init()初始化数据库方法外,还需要封装MetaMapper接口中的一系列操作数据库的方法,同时还有一个删除数据库的方法。
封装数据库操作这里还有一个需要注意的点:
**有没有发现引入DataaBaseManager类中的metaMapper实例是空引用?**我们从Spring框架中拿到Bean,一般都是使用注解@Autowired,但是此时我们并没有使用注解将MetaMapper的Bean拿到,所以此时写在DataBaseManager类中的metaMapper是个空对象。
那么有同学可能觉得,那就加个注解不就行了。但是只在MetaMapper对象上加还不行,外面使用它的类DataBaseManager也需要加上五大注解注册到Spring中,此时metaMapper对象才有效。
但是我们这里由于业务逻辑,打算自己管理这个类,因此并不打算将DataBaseManager这个类交给Spring管理。因此此时我们需要手动将MetaMapper类的Bean构造出来:
1、找到项目的入口启动类,在里面添加一个静态成员变量。
入口启动类:打开项目的src目录,点击main目录之后出现的XXApplication(是一个蓝色的带着C的图标),每个项目由于项目命名不同,其入口启动类都有差别,以自己的为准。
2、点击入口启动类,在类里面添加一个静态的成员变量context,同时在main方法中使用静态成员context接收run方法的返回值:
3、回到DataBaseManager类里的数据库初始化方法init(),对metaMapper对象进行手动获取:
metaMapper = MqApplication.context.getBean(MetaMapper.class);
此时metaMapper的Bean就获取到了,不再是空对象。