Java项目——设计一个消息队列
- 四. 项⽬创建
- 五. 创建核⼼类
- 创建 Exchange(名字、类型、持久化)
- 创建 MSGQueue(名字、持久化、独占标识)
- 创建 Binding(交换机名字、队列名字、bindingKey用于与routingKey匹配)
- 创建 Message(序列化、消息属性、消息体、起始位置和结束位置、有效、工厂方法)
- 六. 数据库设计
- 配置 sqlite
- 实现创建表
- 实现数据库基本操作
- 实现 DataBaseManager
- 测试 DataBaseManager
四. 项⽬创建
创建 SpringBoot 项⽬.
使⽤ SpringBoot 2 系列版本, Java 8.
依赖引⼊ Spring Web 和 MyBatis
五. 创建核⼼类
创建包 mqserver.core
创建 Exchange(名字、类型、持久化)
package com.example.mq.mqserver.core;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;
/*
* 这个类表示一个交换机
*/
public class Exchange {
// 此处使用 name 来作为交换机的身份标识. (唯一的)
private String name;
// 交换机类型, DIRECT, FANOUT, TOPIC
private ExchangeType type = ExchangeType.DIRECT;
// 该交换机是否要持久化存储. true 表示需要持久化; false 表示不必持久化.
private boolean durable = false;
// 如果当前交换机, 没人使用了, 就会自动被删除.
// 这个属性暂时先列在这里, 后续的代码中并没有真的实现这个自动删除功能~~ (RabbitMQ 是有的)
private boolean autoDelete = false;
// arguments 表示的是创建交换机时指定的一些额外的参数选项. 后续代码中并没有真的实现对应的功能, 先列出来. (RabbitMQ 也是有的)
// 为了把这个 arguments 存到数据库中, 就需要把 Map 转成 json 格式的字符串.
private Map<String, Object> arguments = new HashMap<>();
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public ExchangeType getType() {
return type;
}
public void setType(ExchangeType type) {
this.type = type;
}
public boolean isDurable() {
return durable;
}
public void setDurable(boolean durable) {
this.durable = durable;
}
public boolean isAutoDelete() {
return autoDelete;
}
public void setAutoDelete(boolean autoDelete) {
this.autoDelete = autoDelete;
}
// 这里的 get set 用于和数据库交互使用.
public String getArguments() {
// 是把当前的 arguments 参数, 从 Map 转成 String (JSON)
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.writeValueAsString(arguments);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
// 如果代码真异常了, 返回一个空的 json 字符串就 ok
return "{}";
}
// 这个方法, 是从数据库读数据之后, 构造 Exchange 对象, 会自动调用到
public void setArguments(String argumentsJson) {
// 把参数中的 argumentsJson 按照 JSON 格式解析, 转成
// 上述的 Map 对象
ObjectMapper objectMapper = new ObjectMapper();
try {
this.arguments = objectMapper.readValue(argumentsJson, new TypeReference<HashMap<String, Object>>() {});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
// 在这里针对 arguments, 再提供一组 getter setter , 用来去更方便的获取/设置这里的键值对.
// 这一组在 java 代码内部使用 (比如测试的时候)
public Object getArguments(String key) {
return arguments.get(key);
}
public void setArguments(String key, Object value) {
arguments.put(key, value);
}
public void setArguments(Map<String, Object> arguments) {
this.arguments = arguments;
}
}
创建 MSGQueue(名字、持久化、独占标识)
package com.example.mq.mqserver.core;
import com.example.mq.common.ConsumerEnv;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.boot.autoconfigure.aop.AopAutoConfiguration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/*
* 这个类表示一个存储消息的队列
* MSG => Message
*/
public class MSGQueue {
// 表示队列的身份标识.
private String name;
// 表示队列是否持久化, true 表示持久化保存, false 表示不持久化.
private boolean durable = false;
// 这个属性为 true, 表示这个队列只能被一个消费者使用(别人用不了). 如果为 false 则是大家都能使用
// 这个 独占 功能, 也是先把字段列在这里, 具体的独占功能暂时先不实现.
private boolean exclusive = false;
// 为 true 表示没有人使用之后, 就自动删除. false 则是不会自动删除.
// 这个 自动删除 功能, 也是先把字段列在这里, 具体的独占功能暂时先不实现.
private boolean autoDelete = false;
// 也是表示扩展参数. 当前也是先列在这里, 先暂时不实现
private Map<String, Object> arguments = new HashMap<>();
// 当前队列都有哪些消费者订阅了.
private List<ConsumerEnv> consumerEnvList = new ArrayList<>();
// 记录当前取到了第几个消费者. 方便实现轮询策略.
private AtomicInteger consumerSeq = new AtomicInteger(0);
// 添加一个新的订阅者
public void addConsumerEnv(ConsumerEnv consumerEnv) {
consumerEnvList.add(consumerEnv);
}
// 订阅者的删除暂时先不考虑.
// 挑选一个订阅者, 用来处理当前的消息. (按照轮询的方式)
public ConsumerEnv chooseConsumer() {
if (consumerEnvList.size() == 0) {
// 该队列没有人订阅的
return null;
}
// 计算一下当前要取的元素的下标.
int index = consumerSeq.get() % consumerEnvList.size();
consumerSeq.getAndIncrement();
return consumerEnvList.get(index);
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public boolean isDurable() {
return durable;
}
public void setDurable(boolean durable) {
this.durable = durable;
}
public boolean isExclusive() {
return exclusive;
}
public void setExclusive(boolean exclusive) {
this.exclusive = exclusive;
}
public boolean isAutoDelete() {
return autoDelete;
}
public void setAutoDelete(boolean autoDelete) {
this.autoDelete = autoDelete;
}
public String getArguments() {
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.writeValueAsString(arguments);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return "{}";
}
public void setArguments(String argumentsJson) {
ObjectMapper objectMapper = new ObjectMapper();
try {
this.arguments = objectMapper.readValue(argumentsJson, new TypeReference<HashMap<String, Object>>() {});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
public Object getArguments(String key) {
return arguments.get(key);
}
public void setArguments(String key, Object value) {
arguments.put(key, value);
}
public void setArguments(Map<String, Object> arguments) {
this.arguments = arguments;
}
}
创建 Binding(交换机名字、队列名字、bindingKey用于与routingKey匹配)
package com.example.mq.mqserver.core;
/*
* 表示队列和交换机之间的关联关系
*/
public class Binding {
private String exchangeName;
private String queueName;
// bindingKey 就是在出题, 要求领红包的人要画个 "桌子" 出来~~
private String bindingKey;
// Binding 这个东西, 依附于 Exchange 和 Queue 的!!!
// 比如, 对于持久化来说, 如果 Exchange 和 Queue 任何一个都没有持久化,
// 此时你针对 Binding 持久化是没有任何意义的
public String getExchangeName() {
return exchangeName;
}
public void setExchangeName(String exchangeName) {
this.exchangeName = exchangeName;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public String getBindingKey() {
return bindingKey;
}
public void setBindingKey(String bindingKey) {
this.bindingKey = bindingKey;
}
}
创建 Message(序列化、消息属性、消息体、起始位置和结束位置、有效、工厂方法)
package com.example.mq.mqserver.core;
import java.io.Serializable;
import java.util.Arrays;
import java.util.UUID;
/*
* 表示一个要传递的消息
* 注意!!! 此处的 Message 对象, 是需要能够在网络上传输, 并且也需要能写入到文件中.
* 此时就需要针对 Message 进行序列化和反序列化.
* 此处使用 标准库 自带的 序列化/反序列化 操作.
*/
public class Message implements Serializable {
// 这两个属性是 Message 最核心的部分.
private BasicProperties basicProperties = new BasicProperties();
private byte[] body;
// 下面的属性则是辅助用的属性.
// Message 后续会存储到文件中(如果持久化的话).
// 一个文件中会存储很多的消息. 如何找到某个消息, 在文件中的具体位置呢?
// 使用下列的两个偏移量来进行表示. [offsetBeg, offsetEnd)
// 这俩属性并不需要被序列化保存到文件中~~ 此时消息一旦被写入文件之后, 所在的位置就固定了. 并不需要单独存储.
// 这俩属性存在的目的, 主要就是为了让内存中的 Message 对象, 能够快速找到对应的硬盘上的 Message 的位置.
private transient long offsetBeg = 0; // 消息数据的开头距离文件开头的位置偏移(字节)
private transient long offsetEnd = 0; // 消息数据的结尾距离文件开头的位置偏移(字节)
// 使用这个属性表示该消息在文件中是否是有效消息. (针对文件中的消息, 如果删除, 使用逻辑删除的方式)
// 0x1 表示有效. 0x0 表示无效.
private byte isValid = 0x1;
// 创建一个工厂方法, 让工厂方法帮我们封装一下创建 Message 对象的过程.
// 这个方法中创建的 Message 对象, 会自动生成唯一的 MessageId
// 万一 routingKey 和 basicProperties 里的 routingKey 冲突, 以外面的为主.
public static Message createMessageWithId(String routingKey, BasicProperties basicProperties, byte[] body) {
Message message = new Message();
if (basicProperties != null) {
message.setBasicProperties(basicProperties);
}
// 此处生成的 MessageId 以 M- 作为前缀.
message.setMessageId("M-" + UUID.randomUUID());
message.setRoutingKey(routingKey);
message.body = body;
// 此处是把 body 和 basicProperties 先设置出来. 他俩是 Message 的核心内容.
// 而 offsetBeg, offsetEnd, isValid, 则是消息持久化的时候才会用到. 在把消息写入文件之前再进行设定.
// 此处只是在内存中创建一个 Message 对象.
return message;
}
public String getMessageId() {
return basicProperties.getMessageId();
}
public void setMessageId(String messageId) {
basicProperties.setMessageId(messageId);
}
public String getRoutingKey() {
return basicProperties.getRoutingKey();
}
public void setRoutingKey(String routingKey) {
basicProperties.setRoutingKey(routingKey);
}
public int getDeliverMode() {
return basicProperties.getDeliverMode();
}
public void setDeliverMode(int mode) {
basicProperties.setDeliverMode(mode);
}
public BasicProperties getBasicProperties() {
return basicProperties;
}
public void setBasicProperties(BasicProperties basicProperties) {
this.basicProperties = basicProperties;
}
public byte[] getBody() {
return body;
}
public void setBody(byte[] body) {
this.body = body;
}
public long getOffsetBeg() {
return offsetBeg;
}
public void setOffsetBeg(long offsetBeg) {
this.offsetBeg = offsetBeg;
}
public long getOffsetEnd() {
return offsetEnd;
}
public void setOffsetEnd(long offsetEnd) {
this.offsetEnd = offsetEnd;
}
public byte getIsValid() {
return isValid;
}
public void setIsValid(byte isValid) {
this.isValid = isValid;
}
@Override
public String toString() {
return "Message{" +
"basicProperties=" + basicProperties +
", body=" + Arrays.toString(body) +
", offsetBeg=" + offsetBeg +
", offsetEnd=" + offsetEnd +
", isValid=" + isValid +
'}';
}
}
六. 数据库设计
对于 Exchange, MSGQueue, Binding, 我们使⽤数据库进⾏持久化保存.
此处我们使⽤的数据库是 SQLite, 是⼀个更轻量的数据库.
SQLite 只是⼀个动态库(当然, 官⽅也提供了可执⾏程序 exe), 我们在 Java 中直接引⼊ SQLite 依赖, 即
可直接使⽤, 不必安装其他的软件
配置 sqlite
引⼊ pom.xml 依赖
配置数据源 application.yml
Username 和 password 空着即可.
此处我们约定, 把数据库⽂件放到 ./data/meta.db 中.
SQLite 只是把数据单纯的存储到⼀个⽂件中. ⾮常简单⽅便.
实现创建表
@Mapper
public interface MetaMapper {
// 提供三个核心建表方法
void createExchangeTable();
void createQueueTable();
void createBindingTable();
}
本⾝ MyBatis 针对 MySQL / Oracle ⽀持执⾏多个 SQL 语句的, 但是针对 SQLite 是不⽀持的, 只能写
成多个⽅法
<update id="createExchangeTable">
create table if not exists exchange (
name varchar(50) primary key,
type int,
durable boolean,
autoDelete boolean,
arguments varchar(1024)
);
</update>
<update id="createQueueTable">
create table if not exists queue (
name varchar(50) primary key,
durable boolean,
exclusive boolean,
autoDelete boolean,
arguments varchar(1024)
);
</update>
<update id="createBindingTable">
create table if not exists binding (
exchangeName varchar(50),
queueName varchar(50),
bindingKey varchar(256)
);
</update>
实现数据库基本操作
// 针对上述三个基本概念, 进行 插入 和 删除
void insertExchange(Exchange exchange);
List<Exchange> selectAllExchanges();
void deleteExchange(String exchangeName);
void insertQueue(MSGQueue queue);
List<MSGQueue> selectAllQueues();
void deleteQueue(String queueName);
void insertBinding(Binding binding);
List<Binding> selectAllBindings();
void deleteBinding(Binding binding);
<insert id="insertExchange" parameterType="com.example.mq.mqserver.core.Exchange">
insert into exchange values(#{name}, #{type}, #{durable}, #{autoDelete}, #{arguments});
</insert>
<select id="selectAllExchanges" resultType="com.example.mq.mqserver.core.Exchange">
select * from exchange;
</select>
<delete id="deleteExchange" parameterType="java.lang.String">
delete from exchange where name = #{exchangeName};
</delete>
<insert id="insertQueue" parameterType="com.example.mq.mqserver.core.MSGQueue">
insert into queue values(#{name}, #{durable}, #{exclusive}, #{autoDelete}, #{arguments});
</insert>
<select id="selectAllQueues" resultType="com.example.mq.mqserver.core.MSGQueue">
select * from queue;
</select>
<delete id="deleteQueue" parameterType="java.lang.String">
delete from queue where name = #{queueName};
</delete>
<insert id="insertBinding" parameterType="com.example.mq.mqserver.core.Binding">
insert into binding values(#{exchangeName}, #{queueName}, #{bindingKey});
</insert>
<select id="selectAllBindings" resultType="com.example.mq.mqserver.core.Binding">
select * from binding;
</select>
<delete id="deleteBinding" parameterType="com.example.mq.mqserver.core.Binding">
delete from binding where exchangeName = #{exchangeName} and queueName = #{queueName};
</delete>
实现 DataBaseManager
package com.example.mq.mqserver.datacenter;
import com.example.mq.MqApplication;
import com.example.mq.mqserver.core.Binding;
import com.example.mq.mqserver.core.Exchange;
import com.example.mq.mqserver.core.ExchangeType;
import com.example.mq.mqserver.core.MSGQueue;
import com.example.mq.mqserver.mapper.MetaMapper;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.File;
import java.util.List;
/*
* 通过这个类, 来整合上述的数据库操作.
*/
public class DataBaseManager {
// 要做的是从 Spring 中拿到现成的对象
private MetaMapper metaMapper;
// 针对数据库进行初始化
public void init() {
// 手动的获取到 MetaMapper
metaMapper = MqApplication.context.getBean(MetaMapper.class);
if (!checkDBExists()) {
// 数据库不存在, 就进行建建库表操作
// 先创建一个 data 目录
File dataDir = new File("./data");
dataDir.mkdirs();
// 创建数据表
createTable();
// 插入默认数据
createDefaultData();
System.out.println("[DataBaseManager] 数据库初始化完成!");
} else {
// 数据库已经存在了, 啥都不必做即可
System.out.println("[DataBaseManager] 数据库已经存在!");
}
}
public void deleteDB() {
File file = new File("./data/meta.db");
boolean ret = file.delete();
if (ret) {
System.out.println("[DataBaseManager] 删除数据库文件成功!");
} else {
System.out.println("[DataBaseManager] 删除数据库文件失败!");
}
File dataDir = new File("./data");
// 使用 delete 删除目录的时候, 需要保证目录是空的.
ret = dataDir.delete();
if (ret) {
System.out.println("[DataBaseManager] 删除数据库目录成功!");
} else {
System.out.println("[DataBaseManager] 删除数据库目录失败!");
}
}
private boolean checkDBExists() {
File file = new File("./data/meta.db");
if (file.exists()) {
return true;
}
return false;
}
// 这个方法用来建表.
// 建库操作并不需要手动执行. (不需要手动创建 meta.db 文件)
// 首次执行这里的数据库操作的时候, 就会自动的创建出 meta.db 文件来 (MyBatis 帮我们完成的)
private void createTable() {
metaMapper.createExchangeTable();
metaMapper.createQueueTable();
metaMapper.createBindingTable();
System.out.println("[DataBaseManager] 创建表完成!");
}
// 给数据库表中, 添加默认的数据.
// 此处主要是添加一个默认的交换机.
// RabbitMQ 里有一个这样的设定: 带有一个 匿名 的交换机, 类型是 DIRECT.
private void createDefaultData() {
// 构造一个默认的交换机.
Exchange exchange = new Exchange();
exchange.setName("");
exchange.setType(ExchangeType.DIRECT);
exchange.setDurable(true);
exchange.setAutoDelete(false);
metaMapper.insertExchange(exchange);
System.out.println("[DataBaseManager] 创建初始数据完成!");
}
// 把其他的数据库的操作, 也在这个类中封装一下.
public void insertExchange(Exchange exchange) {
metaMapper.insertExchange(exchange);
}
public List<Exchange> selectAllExchanges() {
return metaMapper.selectAllExchanges();
}
public void deleteExchange(String exchangeName) {
metaMapper.deleteExchange(exchangeName);
}
public void insertQueue(MSGQueue queue) {
metaMapper.insertQueue(queue);
}
public List<MSGQueue> selectAllQueues() {
return metaMapper.selectAllQueues();
}
public void deleteQueue(String queueName) {
metaMapper.deleteQueue(queueName);
}
public void insertBinding(Binding binding) {
metaMapper.insertBinding(binding);
}
public List<Binding> selectAllBindings() {
return metaMapper.selectAllBindings();
}
public void deleteBinding(Binding binding) {
metaMapper.deleteBinding(binding);
}
}
测试 DataBaseManager
package com.example.mq;
import com.example.mq.mqserver.core.Binding;
import com.example.mq.mqserver.core.Exchange;
import com.example.mq.mqserver.core.ExchangeType;
import com.example.mq.mqserver.core.MSGQueue;
import com.example.mq.mqserver.datacenter.DataBaseManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.List;
// 加上这个注解之后, 改类就会被识别为单元测试类.
@SpringBootTest
public class DataBaseManagerTests {
private DataBaseManager dataBaseManager = new DataBaseManager();
// 接下来下面这里需要编写多个 方法 . 每个方法都是一个/一组单元测试用例.
// 还需要做一个准备工作. 需要写两个方法, 分别用于进行 "准备工作" 和 "收尾工作"
// 使用这个方法, 来执行准备工作. 每个用例执行前, 都要调用这个方法.
@BeforeEach
public void setUp() {
// 由于在 init 中, 需要通过 context 对象拿到 metaMapper 实例的.
// 所以就需要先把 context 对象给搞出来.
MqApplication.context = SpringApplication.run(MqApplication.class);
dataBaseManager.init();
}
// 使用这个方法, 来执行收尾工作. 每个用例执行后, 都要调用这个方法.
@AfterEach
public void tearDown() {
// 这里要进行的操作, 就是把数据库给清空~~ (把数据库文件, meta.db 直接删了就行了)
// 注意, 此处不能直接就删除, 而需要先关闭上述 context 对象!!
// 此处的 context 对象, 持有了 MetaMapper 的实例, MetaMapper 实例又打开了 meta.db 数据库文件.
// 如果 meta.db 被别人打开了, 此时的删除文件操作是不会成功的 (Windows 系统的限制, Linux 则没这个问题).
// 另一方面, 获取 context 操作, 会占用 8080 端口. 此处的 close 也是释放 8080.
MqApplication.context.close();
dataBaseManager.deleteDB();
}
@Test
public void testInitTable() {
// 由于 init 方法, 已经在上面 setUp 中调用过了. 直接在测试用例代码中, 检查当前的数据库状态即可.
// 直接从数据库中查询. 看数据是否符合预期.
// 查交换机表, 里面应该有一个数据(匿名的 exchange); 查队列表, 没有数据; 查绑定表, 没有数据.
List<Exchange> exchangeList = dataBaseManager.selectAllExchanges();
List<MSGQueue> queueList = dataBaseManager.selectAllQueues();
List<Binding> bindingList = dataBaseManager.selectAllBindings();
// 直接打印结果, 通过肉眼来检查结果, 固然也可以. 但是不优雅, 不方便.
// 更好的办法是使用断言.
// System.out.println(exchangeList.size());
// assertEquals 判定结果是不是相等.
// 注意这俩参数的顺序. 虽然比较相等, 谁在前谁在后, 无所谓.
// 但是 assertEquals 的形参, 第一个形参叫做 expected (预期的), 第二个形参叫做 actual (实际的)
Assertions.assertEquals(1, exchangeList.size());
Assertions.assertEquals("", exchangeList.get(0).getName());
Assertions.assertEquals(ExchangeType.DIRECT, exchangeList.get(0).getType());
Assertions.assertEquals(0, queueList.size());
Assertions.assertEquals(0, bindingList.size());
}
private Exchange createTestExchange(String exchangeName) {
Exchange exchange = new Exchange();
exchange.setName(exchangeName);
exchange.setType(ExchangeType.FANOUT);
exchange.setAutoDelete(false);
exchange.setDurable(true);
exchange.setArguments("aaa", 1);
exchange.setArguments("bbb", 2);
return exchange;
}
@Test
public void testInsertExchange() {
// 构造一个 Exchange 对象, 插入到数据库中. 再查询出来, 看结果是否符合预期.
Exchange exchange = createTestExchange("testExchange");
dataBaseManager.insertExchange(exchange);
// 插入完毕之后, 查询结果
List<Exchange> exchangeList = dataBaseManager.selectAllExchanges();
Assertions.assertEquals(2, exchangeList.size());
Exchange newExchange = exchangeList.get(1);
Assertions.assertEquals("testExchange", newExchange.getName());
Assertions.assertEquals(ExchangeType.FANOUT, newExchange.getType());
Assertions.assertEquals(false, newExchange.isAutoDelete());
Assertions.assertEquals(true, newExchange.isDurable());
Assertions.assertEquals(1, newExchange.getArguments("aaa"));
Assertions.assertEquals(2, newExchange.getArguments("bbb"));
}
@Test
public void testDeleteExchange() {
// 先构造一个交换机, 插入数据库; 然后再按照名字删除即可!
Exchange exchange = createTestExchange("testExchange");
dataBaseManager.insertExchange(exchange);
List<Exchange> exchangeList = dataBaseManager.selectAllExchanges();
Assertions.assertEquals(2, exchangeList.size());
Assertions.assertEquals("testExchange", exchangeList.get(1).getName());
// 进行删除操作
dataBaseManager.deleteExchange("testExchange");
// 再次查询
exchangeList = dataBaseManager.selectAllExchanges();
Assertions.assertEquals(1, exchangeList.size());
Assertions.assertEquals("", exchangeList.get(0).getName());
}
private MSGQueue createTestQueue(String queueName) {
MSGQueue queue = new MSGQueue();
queue.setName(queueName);
queue.setDurable(true);
queue.setAutoDelete(false);
queue.setExclusive(false);
queue.setArguments("aaa", 1);
queue.setArguments("bbb", 2);
return queue;
}
@Test
public void testInsertQueue() {
MSGQueue queue = createTestQueue("testQueue");
dataBaseManager.insertQueue(queue);
List<MSGQueue> queueList = dataBaseManager.selectAllQueues();
Assertions.assertEquals(1, queueList.size());
MSGQueue newQueue = queueList.get(0);
Assertions.assertEquals("testQueue", newQueue.getName());
Assertions.assertEquals(true, newQueue.isDurable());
Assertions.assertEquals(false, newQueue.isAutoDelete());
Assertions.assertEquals(false, newQueue.isExclusive());
Assertions.assertEquals(1, newQueue.getArguments("aaa"));
Assertions.assertEquals(2, newQueue.getArguments("bbb"));
}
@Test
public void testDeleteQueue() {
MSGQueue queue = createTestQueue("testQueue");
dataBaseManager.insertQueue(queue);
List<MSGQueue> queueList = dataBaseManager.selectAllQueues();
Assertions.assertEquals(1, queueList.size());
// 进行删除
dataBaseManager.deleteQueue("testQueue");
queueList = dataBaseManager.selectAllQueues();
Assertions.assertEquals(0, queueList.size());
}
private Binding createTestBinding(String exchangeName, String queueName) {
Binding binding = new Binding();
binding.setExchangeName(exchangeName);
binding.setQueueName(queueName);
binding.setBindingKey("testBindingKey");
return binding;
}
@Test
public void testInsertBinding() {
Binding binding = createTestBinding("testExchange", "testQueue");
dataBaseManager.insertBinding(binding);
List<Binding> bindingList = dataBaseManager.selectAllBindings();
Assertions.assertEquals(1, bindingList.size());
Assertions.assertEquals("testExchange", bindingList.get(0).getExchangeName());
Assertions.assertEquals("testQueue", bindingList.get(0).getQueueName());
Assertions.assertEquals("testBindingKey", bindingList.get(0).getBindingKey());
}
@Test
public void testDeleteBinding() {
Binding binding = createTestBinding("testExchange", "testQueue");
dataBaseManager.insertBinding(binding);
List<Binding> bindingList = dataBaseManager.selectAllBindings();
Assertions.assertEquals(1, bindingList.size());
// 删除
Binding toDeleteBinding = createTestBinding("testExchange", "testQueue");
dataBaseManager.deleteBinding(toDeleteBinding);
bindingList = dataBaseManager.selectAllBindings();
Assertions.assertEquals(0, bindingList.size());
}
}