项目代码
提要:此处的消息队列是仿照RabbitMQ实现(参数之类的),实现一些基本的操作:创建/销毁交互机(exchangeDeclare,exchangeDelete),队列(queueDeclare,queueDelete),绑定(queueBind,queueUnbind),发送消息(basicPublish),订阅消息(basicConsume),手动确认消息(basicAck),并且只实现了单机系统下的消息队列,下面是具体实现过程。
ps:这里的创建采用的是Declare而不用create,意义在如果没有存在就创建,存在的话就不管。
1.思路
消息队列服务器本体的两个主要核心类:VirtualHost(表示一个虚拟主机,该主机管理着当前主机上的交换机,队列,绑定和为上层api(BrokerServer)提供调用)和BrokerServer(用来接受消费者和生产者的请求,并且根据请求(这里采用自定义的应用层协议)的类型来进行处理请求,并返回响应)。
客户端这边主要的两个核心类是:Connection和Channel,由于客户端(包括消费者和生产者)要想和服务器进行可靠的通信就需要进行一些比较复杂的连接过程,如果每一次连接创建完成后,将请求传输过去,服务器这边接收到了请求,并且通过当前连接返回响应,然后就将连接断开,那么当有很多请求来临的时候,每一个请求都要建立一个连接,这样开销的话会很大,所以这里引入Channel(频道),一个Connection(每一个客户端只需要建立一个连接就可以完成后续的所有请求)中有多个Channel,每次发送一个请求都是通过新建一个Channel,此时不需要进行复杂的可靠连接操作(Connection已经完成这部分的内容),所以此时的效率提高,销毁连接时的开销也减少了。
存储数据的方式是采用内存+硬盘,硬盘上的数据存储是为了消息队列在重启或者掉电的时候,能够恢复到之间的数据状态,在进行上述消息队列功能的时候都是在进行操作内存,这样能降低IO的读取次数,从而提高效率。
2.具体实现
ps:由于是客户端和服务器之间是通过网络进行通信,传输的数据是二进制的,所以需要对类进行序列化和反序列化(也就是实现Serializable接口)
(1)数据在硬盘上存储:
①数据类型:
这里采用的数据库不是MySql而是一个更轻量的数据库SQLite,由于此处只需要进行简单的创建表,存储和查询所有结果的操作,并且Mybaits也是支持的,所以用SQLite的话更适合一点。
在这个消息队列里面可能(可以手动传入参数来判定是否要持久化存储)需要持久化存储的数据有Message(消息,没有使用SQLite存储,后面重点介绍),Exchange(交换机),MSGQueue(消息队列),Binding(绑定)
a.Exchange:
这里为啥会引入交换机?直接将信息发送给队列不就好了吗?
RabbitMQ里面就有这个概念,先将消息发送给交换机,然后通过根据交换机的类型将消息发送队列。交互机不是具体的功能的实现的地点只是进行消息的转发,就相当于公司的前台。
一个交换机可以绑定多个队列,但是一个队列只能绑定一个交换机,这里就涉及到交换机与队列之间的对应关系。同时不同的交换机类型所转发的队列也不同,在交换机这里涉及两个关键的变量:bindingKey(交换机和队列进行绑定的字符串)和routingKey(发送消息到交换机指定的字符串)
交换机这里有实现三种类型:
直接交换机(DIRECT):将routingKey作为队列名称进行转发消息,也就是转发消息给以routingKey为名字的队列,此时bindingKey对直接交换机来说没有作用
扇出交换机(FANOUT):与当前交换机绑定的所有队列都转发消息。
主题交换机(TOPIC):当前bindingKey和routingKey对应上才能进行转发。
交换机里面主要有五种参数:name(相当于每一个交换机的身份标识),type(交换机的类型,可以用一个枚举类来实现),durable(是否持久化存储),autoDelete(是否在不进行使用的时候进行删除),arguments(扩展项,是一个Map<String,Object>)
关于arguments在数据库中的存储:(后续相关类中也会采用该种形式)
在数据库中arguments是采用varchar(变长字符串)的类型存储的,所以在进行存储的要对Map进行转化,转化成json字符串进行存储,而Mybaits调用java对象进行数据存储的时候是会调用对应变量的get方法,所以此时需要借用ObjectMapper类来进行将Map到json字符串的转化。
Exchange类代码实现:
@Data
public class Exchange implements Serializable {
//使用name来作为交换机的唯一身份标识
private String name;
//交换机类型
private ExchangeType type = ExchangeType.DIRECT;
//是否要持久化存储,true表示需要,false表示不需要
private boolean durable = false;
//如果没有人用了,当前交换机是否要进行删除,这部分在后续代码中并没有实现(扩展内容)
private boolean autoDelete = false;
//额外扩展项
private Map<String,Object> arguments = new HashMap<>();
//使用下面类进行json字符串和java对象的转化
private ObjectMapper objectMapper = new ObjectMapper();
public String getArguments() {
try {
return objectMapper.writeValueAsString(arguments);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return null;
}
//下面的get和set方法就是在进行Map对象和json字符串的转化
public void setArguments(String jsonArguments) {
try {
arguments = objectMapper.readValue(jsonArguments, new
TypeReference<Map<String,Object>>(){});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
public Object getArguments(String key) {
return arguments.get(key);
}
}
ps:代码中使用 @Data注解是lombok下的一个工具类,会实现当前类中所有变量的get和set方法。
ExchangeType类的具体实现:
public enum ExchangeType {
DIRECT(0),
FANOUT(1),
TOPIC(2);
private ExchangeType(int type) {
}
}
b.MSGQueue:
当交换机类型确定后,就根据类型的不同进行转发消息到对应队列中。当前这个类只是存储一个队列名,而消息的具体存储是在硬盘上(文件中)。队列中主要有这几个参数:name(队列名),durable(是否持久化存储),autoDelete(是否开启自动删除),exclusive(是否独占,这个参数与后续消费者消费消息有关系,如果为false,表示当前队列中消息所有队列中的消费者都可以消费,如果为true,表示当前队列中的消息只能有当前队列中的消费者能够消费),arguments(扩展项,也需要进行json字符串转化),Consumer(代表消费者,后续会介绍),ArrayList(用来存放当前队列中的所有消费者),ConsumerSeq(一个atomicInteger变量,在多线程环境下变量也能够进行原子的操作,方便进行轮询,这也是采用顺序表的原因)
MSGQueue类代码实现:(关于消费者相关的在后续会介绍)
@Data
public class MSGQueue implements Serializable {
//表示当前队列的唯一身份标识
private String name;
//是否持久化
private boolean durable;
//自动删除,队列没有人用是否删除
private boolean autoDelete;
//是否要独占,如果为true,表示当队列只能一个消费者使用,为false,表示多个消费者都可以使用
//这个功能也暂时不实现(RabbitMQ中有的)
private boolean exclusive;
//由于如果要存进数据库,所以这里要将Map类型进行转化成json字符串,然后存储进去
//Mybaits操作数据库,是调用java对象的getter方法,然后赋给对应字段
//表示扩展的一些参数,先列在这里,后续如果要用,可以随时添加(RabbitMQ中有的)
private Map<String,Object> arguments = new HashMap<>();
//当前队列中管理哪些消费者
private List<ConsumerEnv> consumerEnvList = new ArrayList<>();
//当前取到了第几个消费者,方便进行轮策略(考虑到线程安全问题,可以直接使用atomic原子类)
private AtomicInteger consumerSeq = new AtomicInteger(0);
//使用下面类进行json字符串和java对象的转化
private ObjectMapper objectMapper = new ObjectMapper();
public String getArguments() {
try {
return objectMapper.writeValueAsString(arguments);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public void setArguments(String jsonArguments) {
try {
objectMapper.readValue(jsonArguments, new TypeReference<Map<String,Object>>() {
});
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
//添加消费者,是在消费者订阅消息时所添加的
public void addConsumerEnv(ConsumerEnv consumerEnv) {
consumerEnvList.add(consumerEnv);
}
//通过轮询的方式选出一个消费者进行消费消息
public ConsumerEnv chooseConsumerEnvExclusive() {
if(consumerEnvList.size() == 0) {
//当前队列没人订阅
return null;
}
int index = consumerSeq.get() % consumerEnvList.size();
consumerSeq.incrementAndGet();
return consumerEnvList.get(index);
}
}
c.Binding:
表示当前交换机绑定了哪些队列,主要有三个参数:exchangeName,queueName,bindingKey
关于Binding是否进行持久化存储,关键看当前对应的交换机和队列是否进行了持久化存储,如果其中有一个没有进行持久化存储,那么此时对Binding进行持久化存储是没有意义的(所以在进行代码编写的时候对于创建绑定这部分关于是否要进行持久化存储需要进行一定的判断)
Binding类代码实现:
@Data
public class Binding {
//交换机名
private String exchangeName;
//队列名
private String queueName;
//绑定字符串
private String bindingKey;
}
d.Message:
用当前类来表示一个消息,主要有下列几个参数:BasicProperties(每一个消息都拥有的基本属性:messageId(可以通过UUID类来生成唯一的id),routingKey,deliveryMode(1表示不持久化,2表示持久化)),body(消息的本体,是一个byte数组),offsetBeg和offsetEnd(由于当前传输的数据是二进制数据,容易产生粘包问题,所以需要引入两个偏移量来表示,当前消息从哪到哪表示一个完整的消息),isValid(表示当前消息是否有效,这里消息的删除是采用的逻辑删除的方式,并不是真正的删除,只是后续在进行垃圾回收(后续会介绍)的时候,才会回收这些被逻辑删除的消息。0x1表示有效,0x0表示无效)
在该类中还提供了一个生成消息(createMessageWithId)的工厂方法(里面messageId就是通过UUID生成的),它是一个静态方法。
Message类具体实现:
@Data
public class Message implements Serializable {
//这两个属性是Message的核心属性
private BasicProperties basicProperties;
//这里使用byte类型的正文,是为了能够序列化和反序列化
private byte[] body;
//下面是辅助的属性
//Message后续也会持久化存储,所以需要找到对应的正文部分
//由于是使用字节来传输数据,所以需要知道起始偏移量和结束偏移量才能知道从哪到哪是一个完整的正文
//使用下列两个偏移量来进行表示:[offsetBeg,offsetEnd)
private long offsetBeg = 0;
private long offsetEnd = 0;
//使用这个属性表示该消息在文件中是否为有效信息(这里是使用逻辑删除的方式来进行删除消息)
//0x1表示存在,0x0表示不存在
private byte isValid = 0x1;
//这里创建一个工厂方法(不使用构造方法封装对象了),可以直接通过调用该方法,来直接获得已经封装好的message对象
//使用UUID生成唯一id
public static Message createMessageWithId(BasicProperties basicProperties,byte[] body) {
Message message = new Message();
if(basicProperties != null) {
//以T-作为前缀
basicProperties.setMessageId("T-" + UUID.randomUUID());
message.setBasicProperties(basicProperties);
}
if(body != null) {
message.setBody(body);
}
return message;
}
}
②具体存储形式:
Exchange,MSGQueue,Binding是放在SQLite数据库中,而Message是放在对应文件中的。这里有几个关键的类:DataBaseManage,MessageFileManage,DiskDataManage
a.DataBaseManage
SQLite数据库上的相关操作的封装。由于此次操作数据使用的是Mybaits,所以操作数据库上数据是放在MetaMapper(方法调用)和MetaMapper.xml(具体实现,且当前xml文件的位置需要再application.yml中进行配置)上的。
MetaMapper代码具体实现:
@Mapper
public interface MetaMapper {
//持久化存储:表的创建
void createTableExchange();
void createTableBinding();
void createTableQueue();
//针对上述三个表,进行插入和删除
void insertExchange(Exchange exchange);
void deleteExchange(String exchangeName);
List<Exchange> selectExchanges();
void insertBinding(Binding binding);
void deleteBinding(Binding binding);
List<Binding> selectBindings();
void insertQueue(MSGQueue queue);
void deleteQueue(String QueueName);
List<MSGQueue> selectQueues();
}
ps:需要引入Mybaits的依赖,并且在相关类上实现@Mapper注解。
MetaMapper.xml具体实现:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.mq.mqserver.mapper.MetaMapper">
<update id="createTableExchange">
create table if not exists exchange(
name varchar(50) primary key,
type varchar(20),
durable boolean,
autoDelete boolean,
arguments varchar(1024));
</update>
<update id="createTableBinding">
create table if not exists binding(
exchangeName varchar(50),
queueName varchar(50),
bindingKey varchar(1024)
);
</update>
<update id="createTableQueue">
create table if not exists queue(
name varchar(50) primary key,
durable boolean,
autoDelete boolean,
exclusive boolean,
arguments varchar(1024)
);
</update>
<insert id="insertExchange" parameterType="com.example.mq.mqserver.core.Exchange">
insert into exchange values (#{name},#{type},#{durable},#{autoDelete},#
{arguments});
</insert>
<delete id="deleteExchange">
delete from exchange where name = #{exchangeName};
</delete>
<select id="selectExchanges" resultType="com.example.mq.mqserver.core.Exchange">
select * from exchange;
</select>
<insert id="insertBinding" parameterType="com.example.mq.mqserver.core.Binding">
insert into binding values(#{exchangeName},#{queueName},#{bindingKey});
</insert>
<delete id="deleteBinding">
delete from binding where queueName = #{queueName}
and exchangeName = #{exchangeName}
and bindingKey = #{bindingKey};
</delete>
<select id="selectBindings" resultType="com.example.mq.mqserver.core.Binding">
select * from binding;
</select>
<insert id="insertQueue" parameterType="com.example.mq.mqserver.core.MSGQueue">
insert into queue values (#{name},#{durable},#{autoDelete},#{exclusive},#
{arguments});
</insert>
<delete id="deleteQueue" >
delete from queue where name = #{QueueName};
</delete>
<select id="selectQueues" resultType="com.example.mq.mqserver.core.MSGQueue">
select * from queue;
</select>
</mapper>
ps:创建表的操作是放在<update>标签中实现的。 其他的标签和使用MySQL的时候相同。
在DataManage类中,不仅对上述操作数据库的操作进行了封装,还对数据库文件进行了创建(具体的方法是init,通过该方法完成数据库的创建,数据库中表的创建,和数据库中默认数据的添加(主要是exchange的添加,仿照RabbitMQ来的))。
DataBaseManage类代码具体实现:
@Slf4j
public class DataBaseManage {
private MetaMapper metaMapper;
//创建数据库和表
public void init() {
metaMapper = MqApplication.context.getBean(MetaMapper.class);
//判断数据库是否存在
if(!isExistsDataBase()) {
File file = new File("./data/meta.db");
boolean ret = file.mkdirs();
if(ret) {
log.info("创建数据库成功");
}else {
log.info("创建数据库失败");
}
//创建表
createTable();
//添加默认数据
createDefaultData();
log.info("数据库初始化已完成");
}else {
log.info("数据库已存在");
}
}
public void deleteDB() {
//删除数据库中的文件
File file = new File("./data/meta.db");
log.info(file.getAbsolutePath());
boolean ret = file.delete();
if (ret) {
log.info("删除数据库中文件成功");
}else {
log.info("删除数据库中文件失败");
}
//删除目录
file = new File("./data");
log.info(file.getAbsolutePath());
ret = file.delete();
if(ret) {
log.info("删除目录成功");
}else {
log.info("删除目录失败");
}
}
//此处主要是添加一个默认的交换机
//构造一个DIRECT类型的交换机
//在RabbitMQ中有一个设定:带有一个匿名的交换机,类型是DIRECT
private void createDefaultData() {
Exchange exchange = new Exchange();
exchange.setName("");
exchange.setType(ExchangeType.DIRECT);
exchange.setDurable(true);
exchange.setAutoDelete(false);
metaMapper.insertExchange(exchange);
log.info("插入exchange成功");
}
private void createTable() {
metaMapper.createTableQueue();
metaMapper.createTableBinding();
metaMapper.createTableExchange();
log.info("[createTable] 创建表完成");
}
private boolean isExistsDataBase() {
File file = new File("./data/meta.db");
return file.exists();
}
public void insertExchange(Exchange exchange) {
metaMapper.insertExchange(exchange);
}
public void deleteExchange(String exchangeName) {
metaMapper.deleteExchange(exchangeName);
}
public List<Exchange> selectExchanges() {
return metaMapper.selectExchanges();
}
public void insertBinding(Binding binding) {
metaMapper.insertBinding(binding);
}
public void deleteBinding(Binding binding) {
metaMapper.deleteBinding(binding);
}
public List<Binding> selectBindings() {
return metaMapper.selectBindings();
}
public void insertQueue(MSGQueue msgQueue) {
metaMapper.insertQueue(msgQueue);
}
public void deleteQueue(String queueName) {
metaMapper.deleteQueue(queueName);
}
public List<MSGQueue> selectQueues() {
return metaMapper.selectQueues();
}
}
ps:@Slf4j是日志打印注解,也是lombok工具类中的一个注解。
b .MessageFileManage
每个队列中的消息的写入,取出,删除,垃圾回收都是通过这个类来完成的。在这个类中消息的二进制形式的,所以在消息的写入和读出都需要用到Stream相关类。主要有InputStream/OutputStream,DataInputStream/DataOutputStream(继承于InputStream/OutputStream,让读和写的方法更多,不需要关注过多的数据类型转换),RandomAccessFile(可以在任意位置修改文件中的数据)
该类需要维护的文件有两个:queue_data.txt(message真实存储的地方,二进制的形式存储),queue_stat.txt(当前queue_data文件中总消息个数和有效消息个数,以便后续进行垃圾回收,形如:2000\t1000,2000:总消息个数,1000有效消息个数)
MessageFileManage代码具体实现:
@Slf4j
public class MessageFileManage {
public void init() {
}
//维护一个静态内部类Stat:表示文件之中总数和有效数
public static class Stat {
private int totalCount;
private int validCount;
public int getTotalCount() {
return totalCount;
}
public void setTotalCount(int totalCount) {
this.totalCount = totalCount;
}
public int getValidCount() {
return validCount;
}
public void setValidCount(int validCount) {
this.validCount = validCount;
}
}
//获取消息队列的文件路径
private String getQueueDir(String queueName) {
return "./data/" + queueName;
}
//获取消息队列中消息的文件路径
private String getQueueDataDir(String queueName) {
return getQueueDir(queueName) + "/queue_data.txt";
}
//获取消息中数据的文件路径
//由于是一个二进制文件,采用删除文件中消息的方式是逻辑删除(并不是真正的删除)
// 所以需要单独维护一个文件记录当前文件总消息数和有效消息数,以便在总消息数达到多少(拍脑门拍出来的),
// 且有效消息数占到多少是触发回收(这里是复制算法,单独搞一个文件,把有效消息拷贝过去,然后直接删除掉之前文件中的所有消息)
//上述垃圾回收的过程是比较消耗cpu资源的,所以不能频繁发生(具体情况具体分析)。
//约定文件中内容格式为:总数\t有效数
private String getQueueStatDir(String queueName) {
return getQueueDir(queueName) + "/queue_stat.txt";
}
//读取queue_data.txt文件中数据,并且赋值给stat
private Stat readStat(String queueName) {
Stat stat = new Stat();
//直接使用Scanner进行读取就好
try(InputStream inputStream = new FileInputStream(getQueueStatDir(queueName))) {
Scanner scanner = new Scanner(inputStream);
stat.totalCount = scanner.nextInt();
stat.validCount = scanner.nextInt();
return stat;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
//向queue_data.txt文件写入数据(stat)
private void writeStat(String queueName,Stat stat) {
try(OutputStream outputStream = new FileOutputStream(getQueueStatDir(queueName))) {
PrintWriter writer = new PrintWriter(outputStream);
writer.write(stat.totalCount + "\t" + stat.validCount);
writer.flush();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
//创建队列目录和文件
public void createQueueFiles(String queueName) throws IOException {
//1.创建队列
File queueDir = new File(getQueueDir(queueName));
if(!queueDir.exists()) {
//还有个上级目录:data目录,所以用mkdirs
boolean ok = queueDir.mkdirs();
if(!ok) {
log.warn("创建文件失败");
throw new IOException("[createQueueFiles] 创建队列失败,queueDir:" +
queueDir.getAbsolutePath());
}
}
//2.创建消息文件
File queueDataDir = new File(getQueueDataDir(queueName));
if(!queueDataDir.exists()) {
boolean ok = queueDataDir.createNewFile();
if(!ok) {
log.warn("创建文件失败");
throw new IOException("[createQueueFiles] 创建消息文件失败,queueDataDir:"
+ queueDataDir.getAbsolutePath());
}
}
//3.创建消息统计文件
File queueStatDir = new File(getQueueStatDir(queueName));
if(!queueStatDir.exists()) {
boolean ok = queueStatDir.createNewFile();
if(!ok) {
log.warn("创建文件失败");
throw new IOException("[createQueueFiles] 创建消息统计文件失败,
queueStatDir:" + queueStatDir.getAbsolutePath());
}
}
//4.给消息统计文件赋初始值
Stat stat = new Stat();
stat.validCount = 0;
stat.totalCount = 0;
writeStat(queueName,stat);
log.info("创建文件成功");
}
//删除队列对应目录和文件
//如果队列删除了,那么队列中的文件都会被删除
public void deleteQueue(String queueName) throws IOException {
//先删除里面文件
File queueDataFile = new File(getQueueDataDir(queueName));
if(queueDataFile.exists()) {
boolean ok = queueDataFile.delete();
if(!ok) {
log.warn("删除文件失败");
throw new IOException("删除文件失败");
}
}
File queueStatFile = new File(getQueueStatDir(queueName));
if(queueStatFile.exists()) {
boolean ok = queueStatFile.delete();
if(!ok) {
log.warn("删除文件失败");
throw new IOException("删除文件失败");
}
}
File queueDir = new File(getQueueDir(queueName));
if(queueDir.exists()) {
boolean ok = queueDir.delete();
if(!ok) {
log.warn("删除文件失败");
throw new IOException("删除文件失败");
}
}
log.info("删除文件成功");
}
//检查当前队列目录和文件是否存在
private boolean checkFileExist(String queueName) {
File queueDir = new File(getQueueDir(queueName));
if(!queueDir.exists()) {
return false;
}
File queueDataDir = new File(getQueueDataDir(queueName));
if(!queueDataDir.exists()) {
return false;
}
File queueStatDir = new File(getQueueStatDir(queueName));
if(!queueStatDir.exists()) {
return false;
}
return true;
}
//这个方法是将message写入queue中
public void writeMessage(MSGQueue queue, Message message) throws IOException {
//1.检查当前队列目录和文件是否存在
if(!checkFileExist(queue.getName())) {
throw new IOException("队列或队列中文件不存在,queueName:" + queue.getName());
}
//2.将message序列化成二进制数据
byte[] messageBinary = BinaryTool.toBytes(message);
//对当前队列进行加锁,防止在多线程环境下出现线程安全问题
synchronized(queue) {
//3.设置message中的offsetBeg和offsetEnd
File queueDataFile = new File(getQueueDataDir(queue.getName()));
message.setOffsetBeg(queueDataFile.length() + 4);
message.setOffsetEnd(queueDataFile.length() + messageBinary.length + 4);
messageBinary = BinaryTool.toBytes(message);
try(OutputStream outputStream = new
FileOutputStream(getQueueDataDir(queue.getName()),true)) {
try(DataOutputStream dataOutputStream = new
DataOutputStream(outputStream)) {
//约定以下面格式进行写入
//先写入当前bytes的长度
//再写入具体的bytes(二进制数据)
dataOutputStream.writeInt(messageBinary.length);
dataOutputStream.write(messageBinary);
}
}
//4.更新统计文件中的数据
Stat stat = readStat(queue.getName());
//可能出现线程安全问题
stat.totalCount += 1;
stat.validCount += 1;
writeStat(queue.getName(),stat);
}
}
//只是进行逻辑删除,触发垃圾回收时,才真正进行删除
public void deleteMessage(MSGQueue queue,Message message) throws IOException, ClassNotFoundException {
if (!checkFileExist(queue.getName())) {
throw new IOException("队列或队列中文件不存在,queueName:" + queue.getName());
}
synchronized (queue) {
try (RandomAccessFile randomAccessFile = new
RandomAccessFile(getQueueDataDir(queue.getName()), "rw")) {
//1.先从文件中把对应message的信息读取出来
//构造一个bytes数组,长度为message的length
byte[] bytes = new byte[(int) (message.getOffsetEnd() -
message.getOffsetBeg())];
//移动光标到offsetBeg为止
randomAccessFile.seek(message.getOffsetBeg());
randomAccessFile.read(bytes);
//2.反序列化成java对象
Message message1 = (Message) BinaryTool.fromBytes(bytes);
message1.setIsValid((byte) 0x0);
//3.在序列化成二进制数据写入文件中
//注意光标的位置
randomAccessFile.seek(message.getOffsetBeg());
randomAccessFile.write(BinaryTool.toBytes(message1));
}
//可以看到上述一顿操作也只是改变了文件中的一个字节
//4.更新统计文件中的数据
Stat stat = readStat(queue.getName());
if (stat.validCount > 0) {
stat.validCount -= 1;
}
writeStat(queue.getName(), stat);
}
}
//加载队列中的所有消息
public LinkedList<Message> loadAllMessagesFromQueue(String queueName) throws ClassNotFoundException, IOException {
LinkedList<Message> messages = new LinkedList<>();
try(InputStream inputStream = new FileInputStream(getQueueDataDir(queueName))){
try(DataInputStream dataInputStream = new DataInputStream(inputStream)) {
//通过下面一个while循环就可以将文件读完
//到文件末尾会抛出EOFException异常
//可以通过捕获该异常来表示已经读取到文件末尾
long currentOffset = 0;
while(true) {
//1.读取message的长度
int messageSize = dataInputStream.readInt();
//2.读取message的内容
byte[] buffer = new byte[messageSize];
if(buffer.length != messageSize) {
throw new MqException("[MessageLoad]文件读取有误,queueName:" +
queueName);
}
dataInputStream.read(buffer);
//3.反序列化成message对象
Message message = (Message) BinaryTool.fromBytes(buffer);
//判断是否为有效数据
if(message.getIsValid() == 0x0) {
currentOffset += (4 + messageSize);
continue;
}
//4.设置当前message的偏移量
message.setOffsetBeg(currentOffset + 4);
message.setOffsetEnd(currentOffset + 4 + messageSize);
currentOffset += (4 + messageSize);
messages.add(message);
}
} catch (EOFException e) {
//如果发生了该异常,说明文件已经读取完成
//只需要捕获一下当前异常,然后说明一下读取完成
log.info("文件读取完成");
}
}
return messages;
}
//检查当前是否要进行GC
//回收规则是:当总数达到2000,且有效数据个数超过总数的一半时,就触发GC回收
private boolean checkGC(String queueName) {
Stat stat = readStat(queueName);
if(stat.totalCount > 2000 && stat.validCount * 2 > stat.totalCount) {
return true;
}
return false;
}
//新文件位置
private String getNewQueueDataDir(String queueName) {
return getQueueDir(queueName) + "/queue_data_new.txt";
}
//GC回收
//思路:创建一个新文件(queue_data_new.txt),然后把旧文件中的有效数据拷贝到新文件中
// 然后直接删除掉旧文件,然后将原来旧文件的名称给新文件(queue_data.txt)
public void gc(MSGQueue queue) throws IOException, ClassNotFoundException {
//线程安全问题需要注意
synchronized (queue) {
//由于gc是比较耗时的操作,所以可以统计一哈消耗的时间
long begTime = System.currentTimeMillis();
//1.准备好新文件
File newFile = new File(getNewQueueDataDir(queue.getName()));
if (newFile.exists()) {
throw new MqException("gc过程中数据没有删除干净");
}
boolean ok = newFile.createNewFile();
if (!ok) {
throw new MqException("新文件创建失败");
}
//2.将文件里的有效数据个数拷贝到新文件中
List<Message> messages = loadAllMessagesFromQueue(queue.getName());
try (OutputStream outputStream = new FileOutputStream(getNewQueueDataDir(queue.getName()),true)) {
try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
for (Message message : messages) {
if (message.getIsValid() == 0x1) {
byte[] buffer = BinaryTool.toBytes(message);
dataOutputStream.writeInt(buffer.length);
dataOutputStream.write(buffer);
}
}
}
}
//3.删除旧文件queue_data.txt
File oldFile = new File(getQueueDataDir(queue.getName()));
if (oldFile.exists()) {
boolean ok1 = oldFile.delete();
if (!ok1) {
throw new MqException("删除queue_data.txt文件失败");
}
}
//4.将新文件重新命名为queue_data.txt
boolean ok2 = newFile.renameTo(oldFile);
if (!ok2) {
throw new MqException("重命名queue_data_new.txt文件为queue_data.txt失败");
}
//5.更新queue_stat.txt文件中的信息
Stat stat = readStat(queue.getName());
stat.totalCount = messages.size();
stat.validCount = messages.size();
writeStat(queue.getName(), stat);
long endTime = System.currentTimeMillis();
log.info("gc消耗的时间:" + (endTime - begTime));
}
}
}
关于Stat类:
是一个静态内部类,用来维护总消息个数(totalCount)和有效消息个数(validCount),队列中有消息写入来totalCount和validCount都会+1(初始化放在了创建队列相关文件方法里面);有消息删除时,validCount会-1;这里约定当toalCount>2000且validCount * 2>totalCount时候,才进行垃圾回收。这里变量的加减操作可能会出现线程安全问题,所以在相关步骤实现了加锁。
关于queue_stat文件的写和读:
主要是readStat和writeStat方法,就是正常的文件读写操作,需要注意两个变量之间以\t作为分隔符,方便后续的数据读取(使用的Scanner中nextInt方法)
关于序列化和反序列化:
在这里封装成了一个类(BinaryTool),里面有两个静态方法:toBytes(序列化,使用ObjectOutputStream),fromBytes(反序列化,使用ObjectInputStream)
具体实现:
public class BinaryTool {
//序列化
public static byte[] toBytes(Object o) throws IOException {
//由于传入的参数是一个变长的对象,所以使用ByteArrayOutputStream流对象就相当于一个变长的数组
//就可以把object序列化的二进制数据写入ByteArrayOutputStream,最终在统一转化成byte数组
try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
try(ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
//这个方法就是将object对象序列化,并写入到ObjectOutputStream中
//由于ObjectOutputStream关联到ByteArrayOutputStream,所以最终序列化后的数据会写入到ByteArrayOutputStream中
objectOutputStream.writeObject(o);
}
return byteArrayOutputStream.toByteArray();
}
}
//反序列化
public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
Object object = null;
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {
try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
// 此处的 readObject, 就是从 data 这个 byte[] 中读取数据并进行反序列化.
object = objectInputStream.readObject();
}
}
return object;
}
}
关于写入信息:
也就是writeMessage方法,需要注意的是offsetBeg和offsetEnd的设置和打开OutputStream中拼接参数的选项。
序列化是整个message,注意在设置完offsetBeg和offsetEnd之后,后续要在针对当前message进行一次序列化,防止写入的message中这两个变量的值为0。还有不要忘了对queue_stat.txt文件的更新。
关于删除信息:
也就是deleteMessage方法,需要注意的是RandomAccessFile(可读可写)类中光标的移动,所以就需要维护一个变量来存储修改前光标的位置,先将对应位置的数据读取出来(通过两个偏移量,还涉及到反序列化),然后在进行数据修改,完成之后,还需要将光标移动回修改前的位置,最后在将修改完成的数据重新写入刚才的位置(涉及到序列化)。这个过程也只是修改message属性中一个值isValid(将其修改成0x0,无效)
关于加载所有信息:
需要维护一个currentOffset变量,来记录当前数据读取到哪了。在读取过程中使用while(true)死循环,直到抛出EOFException异常,代表读取到文件末尾,只需捕获该异常,就可以停止文件地读取。currentOffset每次是进行加messageSize+4。
关于垃圾回收(gc):
这是采用复制算法,也就是当触发gc回收(总消息个数>2000且有效消息个数 * 2>总消息个数)的时候将有效消息全部拷贝到一个新的文件(queue_new_data.txt)中,然后删除刚才的queue_data.txt文件,最后在将新文件(queue_new_data.txt)命名为(queue_data.txt)。
c.DiskDataManage
该类做的主要工作是对上述两个进行了进一步的封装,为上层提供api的调用。
代码具体实现:
public class DiskDataManage {
//使用这个类来操作数据库中数据
private DataBaseManage dataBaseManage = new DataBaseManage();
//使用这个类来操作文件中的相关数据
private MessageFileManage messageFileManage = new MessageFileManage();
public void init() {
dataBaseManage.init();
//此处的init是空方法,等后续需要用的时候,直接在这初始化就行
messageFileManage.init();
}
public void deleteDB() {
dataBaseManage.deleteDB();
}
//封装交换机相关操作
public void insertExchange(Exchange exchange) {
dataBaseManage.insertExchange(exchange);
}
public void deleteExchange(String exchangeName) {
dataBaseManage.deleteExchange(exchangeName);
}
public List<Exchange> selectExchanges() {
return dataBaseManage.selectExchanges();
}
//封装队列相关操作
public void insertQueue(MSGQueue queue) {
dataBaseManage.insertQueue(queue);
}
public void deleteQueue(String queueName) {
dataBaseManage.deleteQueue(queueName);
}
public List<MSGQueue> selectQueues() {
return dataBaseManage.selectQueues();
}
//封装Binding相关操作
public void insertBinding(Binding binding) {
dataBaseManage.insertBinding(binding);
}
public void deleteBinding(Binding binding) {
dataBaseManage.deleteBinding(binding);
}
public List<Binding> selectBindings() {
return dataBaseManage.selectBindings();
}
//封装文件相关操作
public void createQueueFiles(String queueName) throws IOException {
messageFileManage.createQueueFiles(queueName);
}
public void deleteQueueFiles(String queueName) throws IOException {
messageFileManage.deleteQueue(queueName);
}
public void writeMessage(MSGQueue queue, Message message) throws IOException {
messageFileManage.writeMessage(queue,message);
}
public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException {
messageFileManage.deleteMessage(queue,message);
}
public LinkedList<Message> loadMessages(String queueName) throws IOException, ClassNotFoundException {
return messageFileManage.loadAllMessagesFromQueue(queueName);
}
public void gc(MSGQueue queue) throws IOException, ClassNotFoundException {
messageFileManage.gc(queue);
}
}
(2)数据在内存中存储:
主要是MemoryDataManage类,里面有主要有六个Map和若干方法。考虑到线程安全问题Map是直接采用ConcurrentHashMap。
存储所有信息的Map:
ConcurrentHashMap<String,Message> messageMap(key-messageId,value-message)
存储所有交换机的Map:
ConcurrentHashMap<String,Exchange> exchangeMap(key-exchangeName,value-Exchange)
存储所有队列的Map:
ConcurrentHashMap<String,MSGQueue> queueMap(key-queueName,value-MSGQueue)
存储所有绑定关系的Map:
ConcurrentHashMap<String,ConcurrentHashMap<String,Binding>> bindingMap(key-exchangeName,value-ConcurrentHashMap(field-queueName,value-Binding))
存储每一个队列中的信息的Map:
ConcurrentHashMap<String,LinkedList<Message>> queueMessageMap(key-queueName,value-LinkedList<Message>)
存储所有未被确认的消息:
ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> messageWithAckMap(key-queueName,value-ConcurrentHashMap(field-messageId,value-Message))
代码的具体实现:
@Slf4j
public class MemoryDataManage {
//考虑到线程安全问题,直接使用ConcurrentHashMap即可
private DiskDataManage diskDataManage = new DiskDataManage();
//缓存exchange:key-exchangeName,value-Exchange
private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();
//缓存queue:key-queueName,value-MSGQueue
private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();
//缓存binding:key-exchangeName,value-HashMap(key:queueName,value:Binding)
private ConcurrentHashMap<String,ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();
//缓存message:key-messageId(字符串类型),value-Message
private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();
//缓存队列与消息之间的关系:key-queueName,value-LinkedList(Message)
private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();
//缓存当前未被确认的消息(被消费者取走,但还没有应答的消息):key-queueName,value-ConcurrentHashMap<messageId(字符串类型),Message>
private ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();
public void insertExchange(Exchange exchange) {
log.info("插入exchange成功");
exchangeMap.put(exchange.getName(),exchange);
}
public void deleteExchange(String exchangeName) {
log.info("删除exchange成功");
exchangeMap.remove(exchangeName);
}
public Exchange getExchange(String exchangeName) {
log.info("根据exchangeName获取exchange成功");
return exchangeMap.get(exchangeName);
}
public void insertQueue(MSGQueue queue) {
queueMap.put(queue.getName(),queue);
log.info("插入queue成功");
}
public void deleteQueue(String queueName) {
queueMap.remove(queueName);
log.info("删除queue成功");
}
public MSGQueue getQueue(String queueName) {
log.info("根据queueName获取queue成功");
return queueMap.get(queueName);
}
public void insertBinding(Binding binding) {
//根据exchangeName查询bindingMap如果不存在就创建,否则不管
//ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
//if(bindingMap == null) {
//bindingMap = new ConcurrentHashMap<>();
//bindingsMap.put(binding.getExchangeName(),bindingMap);
//}
//下面代码就是上述代码效果
ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), t -> new ConcurrentHashMap<>());
synchronized (bindingMap) {
if (bindingMap.get(binding.getQueueName()) != null) {
throw new MqException("当前队列已经被绑定,exchangeName:" + binding.getExchangeName() + " queueName:" + binding.getQueueName());
}
bindingMap.put(binding.getQueueName(), binding);
log.info("绑定成功,exchangeName:" + binding.getExchangeName() + " queueName:" + binding.getQueueName());
}
}
public void deleteBinding(Binding binding) {
ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
if(bindingMap == null) {
throw new MqException("绑定不存在,exchangeName:" + binding.getExchangeName() + " queueName:" + binding.getQueueName());
}
bindingMap.remove(binding.getQueueName());
log.info("删除binding成功");
}
//这里写两个版本:
//根据exchangeName和queueName获取唯一的Binding
//根据exchangeName获取所有的Binding
public Binding getBinding(String queueName,String exchangeName) {
ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(exchangeName);
if(bindingMap == null) {
return null;
}
return bindingMap.get(queueName);
}
public ConcurrentHashMap<String, Binding> getBindings(String exchangeName) {
return bindingsMap.get(exchangeName);
}
public void addMessage(Message message) {
messageMap.put(message.getBasicProperties().getMessageId(),message);
log.info("插入message成功");
}
public void removeMessage(String messageId) {
messageMap.remove(messageId);
log.info("删除message成功");
}
public Message getMessage(String messageId) {
log.info("根据messageId获取message成功");
return messageMap.get(messageId);
}
//发送消息到指定队列
public void sendMessage(String queueName, Message message) {
LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queueName,t -> new LinkedList<>());
synchronized (messages) {
messages.add(message);
}
addMessage(message);
log.info("发送消息到指定队列成功");
}
//从指定队列中获取消息
public Message peekMessage(String queueName) {
LinkedList<Message> messages = queueMessageMap.get(queueName);
if(messages == null || messages.size() == 0) {
log.info("未找到对应队列");
return null;
}
//采用头删的方式获取消息
synchronized (messages) {
log.info("获取队列中消息成功");
return messages.getFirst();
}
}
public Message pollMessage(String queueName) {
LinkedList<Message> messages = queueMessageMap.get(queueName);
if(messages == null || messages.size() == 0) {
log.info("未找到对应队列");
return null;
}
//采用头删的方式获取消息
synchronized (messages) {
log.info("获取队列中消息成功");
return messages.pollFirst();
}
}
//获取指定消息队列中的消息个数
public int getCounts(String queueName) {
LinkedList<Message> messages = queueMessageMap.get(queueName);
if(messages == null || messages.size() == 0) {
log.info("当前队列中消息个数为0");
return 0;
}
synchronized (messages) {
log.info("获取指定队列中消息个数成功");
return messages.size();
}
}
//添加未确认的消息(已经被取走了,但消费者还没有做出确认,确认确实已经进行了处理该消息)
public void addMessageWaitAck(String queueName,Message message) {
ConcurrentHashMap<String,Message> messages = queueMessageWaitAckMap.computeIfAbsent(queueName,t -> new ConcurrentHashMap<>());
synchronized (messages) {
messages.put(message.getBasicProperties().getMessageId(), message);
log.info("添加未确认的消息成功");
}
}
//删除未确认的消息(消费者这边已经确认了)
public void removeMessageWaitAck(String queueName,String messageId) {
ConcurrentHashMap<String, Message> messages = queueMessageWaitAckMap.get(queueName);
if (messages == null && messages.size() == 0) {
return;
}
synchronized (messages) {
messages.remove(messageId);
log.info("删除未确认的消息成功");
}
}
//获取指定的未确认的消息
public Message getMessageWaitAck(String queueName,Message message) {
ConcurrentHashMap<String, Message> messages = queueMessageWaitAckMap.get(queueName);
if (messages == null && messages.size() == 0) {
log.info("获取指定队列未确认消息失败");
return null;
}
log.info("获取指定队列未确认消息成功");
return messages.get(message.getBasicProperties().getMessageId());
}
//加载data文件中的数据到内存中(重启后应该做的工作)
public void recovery(DiskDataManage diskDataManage) throws IOException, ClassNotFoundException {
//1.删除内存中的所有数据
messageMap.clear();
exchangeMap.clear();
queueMap.clear();
bindingsMap.clear();
queueMessageMap.clear();
//2.恢复queueMap中数据
List<MSGQueue> queues = diskDataManage.selectQueues();
for(MSGQueue queue : queues) {
queueMap.put(queue.getName(),queue);
}
log.info("queueMap中数据恢复完成");
//3.恢复messageMap和queueMessageMap中的数据
for(MSGQueue queue : queues) {
LinkedList<Message> messages = diskDataManage.loadMessages(queue.getName());
queueMessageMap.put(queue.getName(), messages);
for(Message message : messages) {
messageMap.put(message.getBasicProperties().getMessageId(),message);
}
}
//4.恢复exchangeMap中的数据
List<Exchange> exchanges = diskDataManage.selectExchanges();
for(Exchange exchange : exchanges) {
exchangeMap.put(exchange.getName(),exchange);
}
//5.恢复bindsingMap中的数据
List<Binding> bindings = diskDataManage.selectBindings();
for(Binding binding : bindings) {
ConcurrentHashMap<String,Binding> bind = new ConcurrentHashMap<>();
bind.put(binding.getQueueName(),binding);
bindingsMap.put(binding.getExchangeName(),bind);
}
log.info("bindsingMap中数据恢复完成");
//注意:"未被确认的消息"不需要从硬盘上恢复
// 在等待ack的过程中(服务器),一旦当服务器重启了过后这部分数据就会变成"未取走的消息"
//未取走的消息就是在硬盘上存储的,此时消费者需要重新从对列中取
}
}
上述类中的方法主要是对对应map的增删查改(在这个过程需要注意线程安全问题,主要是针对未使用线程安全的集合类来说,比如ArrayList,LinkedList)。
关于发送消息到指定队列:
不仅需要再messageMap中添加,还需要再queueMessageMap中添加。
关于未确认消息的集合:
消费者如果在对某个消息进行消费过后(相当于第一次确认消息已经被消费了),是需要进行消息确认(进行二次确认消息已经被消费了),这个消息确认分为自动确认(消息一经第一次消费过后,数据就会进行删除)和手动确认(手动调用最开始说的basicAck方法),在进行消息确认之后才会进行内存上相关信息数据的删除,否则不进行删除。这个消息确认是通过一个变量(autoAck)来进行的,true:自动确认,false:手动确认
关于内存中数据的恢复:
也就是recovery方法,通过这个方法可以在服务器进行重启的时候,进行内存中数据的恢复,所以该方法应该是放在最初初始化的时候执行。需要注意的是未确认消息的集合不需要进行恢复,如果已经进行了消费但是还没有进行确认的话,此时正好服务器又重启了,就可能会导致重复消费的问题,这个问题是消费者这边所考虑的问题。
(3)两个核心类:
VirtualHost和BrokerServer(消息队列服务器本体)
①VirtualHost
数据库和表的初始化操作放在了VirtualHost的构造方法中,在该方法中还需要调用recovery方法,对内存中数据的恢复。为了不同虚拟主机中能使用的队列/交换机相同,所以在下面所有方法中对queueName和exchangeName进行了处理(在对应的交换机名和队列名前面加上当前的主机名)
然后就是为上层提供九个api:exchangeDeclare,exchangeDelete,queueDeclare,queueDelete,queueBind,queueUnbind,basicPublish,basicConsume,basicAck
ps:在这个类中还有两个锁对象:queueLocker和exchangeLocker(考虑到多线程环境下操作可能会出问题,所以会对相关操作加锁)
a.exchangeDeclare(创建交换机):
在创建之前会先查看内存中是否已经存在所需创建的交换机,需要传入的参数:exchangeName(String,交换机名称),type(ExchangeType,交换机类型),durable(boolean,是否持久化),autoDelete(boolean,是否自动删除,未实现),arguments(Map<String,Object>,扩展项)
代码具体实现:
public boolean exchangeDeclare(String exchangeName, ExchangeType type, boolean durable, boolean autoDelete,
Map<String,Object> arguments) {
//区分不同虚拟主机上的交换机
exchangeName = VirtualHostName + exchangeName;
synchronized (exchangeLocker) {
try {
//1.判断对应交换机(内存上)是否存在
Exchange toAdd = memoryDataManage.getExchange(exchangeName);
if (toAdd != null) {
throw new MqException("[VirtualHost] 当前需要创建的交换机已经存在");
}
//2.如果不存在就进行创建
Exchange newExchange = new Exchange();
newExchange.setArguments(arguments);
newExchange.setType(type);
newExchange.setDurable(durable);
newExchange.setName(exchangeName);
newExchange.setAutoDelete(autoDelete);
//3.写入硬盘(持久化)
if (durable) {
diskDataManage.insertExchange(newExchange);
}
//4.写入内存
memoryDataManage.insertExchange(newExchange);
log.info("[VirtualHost] 交换机创建成功");
return true;
}catch (Exception e) {
//一旦上述过程发生异常,就说明交换机创建失败
log.info("[VirtualHost] 交换机创建失败");
e.printStackTrace();
return false;
}
}
}
ps:上述的参数是仿照这RabbitMQ来搞的(包括下面的所有方法的参数)。
b.exchangeDelete(销毁交换机):
只需要传入一个参数:exchangeName(String),根据exchangeName去查看内存中是否存在,如果存在,则根据查出来的exchange,看是否进行了持久化存储,如果是持久化存储的,则进行删除,否则则只需要删除内存中的exchange即可(也就是存储在exchangeMap中的exchange)
代码具体实现:
public boolean exchangeDelete(String exchangeName) {
exchangeName = VirtualHostName + exchangeName;
try {
synchronized (exchangeLocker) {
//1.首先判断对应交换机是否存在
Exchange toDelete = memoryDataManage.getExchange(exchangeName);
if (toDelete == null) {
throw new MqException("[VirtualHost] 当前交换机不存在");
}
//2.进行硬盘上的删除
if(toDelete.isDurable()) {
diskDataManage.deleteExchange(exchangeName);
}
//3.进行内存删除
memoryDataManage.deleteExchange(exchangeName);
log.info("[VirtualHost] 删除交换机成功");
return true;
}
}catch (Exception e) {
e.printStackTrace();
log.info("[VirtualHost] 删除交换机失败");
return false;
}
}
c.queueDeclare(创建队列):
需要传入的参数是queueName(String,队列名称),durable(boolean,是否持久化),autoDelete(boolean,是否自动删除,未实现),exclusive(boolean,是否独占),arguments(Map<String,Object>,扩展项)
代码具体实现:
public boolean queueDeclare(String queueName,boolean durable,boolean autoDelete,boolean exclusive,
Map<String,Object> argument) {
queueName = VirtualHostName + queueName;
try {
synchronized (queueLocker) {
//首先创建queue相关文件
diskDataManage.createQueueFiles(queueName);
//1.判断当前队列是否存在
MSGQueue queue = memoryDataManage.getQueue(queueName);
if (queue != null) {
throw new MqException("[VirtualHost] 当前队列已经存在");
}
//2.创建新的队列
MSGQueue toAdd = new MSGQueue();
toAdd.setExclusive(exclusive);
toAdd.setArguments(argument);
toAdd.setName(queueName);
toAdd.setDurable(autoDelete);
toAdd.setDurable(durable);
//3.存储在硬盘中
if (durable) {
diskDataManage.insertQueue(toAdd);
}
//4.存储在内存中
memoryDataManage.insertQueue(toAdd);
log.info("[VirtualHost] 创建队列成功");
return true;
}
}catch (Exception e) {
//如果出现异常,则说明创建队列失败
e.printStackTrace();
log.info("[VirtualHost] 创建队列失败");
return false;
}
}
d.queueDelete(销毁队列):
需要传入的参数:queueName(String,队列名称),核心逻辑跟销毁队列一样
代码具体实现:
public boolean queueDelete(String queueName) {
//首先对queueName进行处理
queueName = VirtualHostName + queueName;
try {
synchronized (queueLocker) {
//1.检查队列是否存在
MSGQueue queue = memoryDataManage.getQueue(queueName);
if (queue == null) {
throw new MqException("[VirtualHost] 当前队列不存在");
}
//2.进行判断当前队列是否持久化,如果持久化才进行硬盘删除
if (queue.isDurable()) {
diskDataManage.deleteQueue(queueName);
}
//3.删除内存中的存储的数据
memoryDataManage.deleteQueue(queueName);
//4.删除queue目录文件
diskDataManage.deleteQueueFiles(queueName);
}
log.info("[VirtualHost] 队列删除成功");
return true;
}catch (Exception e) {
e.printStackTrace();
log.info("[VirtualHost] 队列删除失败");
return false;
}
}
e.queueBind(创建绑定):
需要传入的参数:exchangeName(String,交换机名称),queueName(String,队列名称),bindingKey(String)
具体的代码实现:
public boolean queueBind(String queueName, String exchangeName, String bindingKey) {
queueName = VirtualHostName + queueName;
exchangeName = VirtualHostName + exchangeName;
try {
synchronized(exchangeLocker) {
synchronized (queueLocker) {
//1.先根据queueName和exchangeName查看绑定是否存在
Binding binding = memoryDataManage.getBinding(queueName,exchangeName);
if(binding != null) {
throw new MqException("[VirtualHost] 当前绑定已存在");
}
//2.检查一下bindingKey是否合法
if(!Router.checkBindingKey(bindingKey)) {
throw new MqException("[VirtualHost] bindingKey不合法,创建绑定失败");
}
//3.检查对应的交换机和队列是否存在
MSGQueue queue = memoryDataManage.getQueue(queueName);
Exchange exchange = memoryDataManage.getExchange(exchangeName);
if(queue == null || exchange == null) {
throw new MqException("[VirtualHost] 交换机或队列不存在,创建队列失败");
}
//4.构造binding对象
Binding newBinding = new Binding();
newBinding.setBindingKey(bindingKey);
newBinding.setQueueName(queueName);
newBinding.setExchangeName(exchangeName);
//5.看队列和交换机是否持久化
if(queue.isDurable() && exchange.isDurable()) {
diskDataManage.insertBinding(newBinding);
}
//6.写入内存
memoryDataManage.insertBinding(newBinding);
log.info("[VirtualHost] 创建绑定成功");
}
}
return true;
}catch (Exception e) {
e.printStackTrace();
log.info("[VirtualHost] 创建绑定失败");
return false;
}
}
ps:只要创建绑定的交换机或者队列不存在,那么此时创建绑定都是无意义的,如果都存在的话,交换机和队列都进行持久化了,此时绑定进行持久化才有意义。
f.queueUnbind(销毁绑定):
需要传入的参数:exchangeName(String,交换机名称),queueName(String,队列名称)
首先根据exchangeName和queueName在bindingMap中查找当前的binding是否存在,如果不存在直接抛异常然后返回,如果存在则直接进行硬盘(不管硬盘上是否存在都进行删除,省去查看交换机和队列是否持久化的操作)和内存上的binding的删除。
具体代码实现:
public boolean queueUnBind(String queueName,String exchangeName) {
queueName = VirtualHostName + queueName;
exchangeName = VirtualHostName + exchangeName;
try {
synchronized(exchangeLocker) {
synchronized (queueLocker) {
//1.检查当前绑定是否存在
Binding binding = memoryDataManage.getBinding(queueName,exchangeName);
if(binding == null) {
throw new MqException("[VirtualHost] 当前绑定不存在");
}
//2.无论交换机和队列是否存在都进行删除,先删除硬盘
diskDataManage.deleteBinding(binding);
//3.再删内存
memoryDataManage.deleteBinding(binding);
log.info("[VirtualHost] 删除绑定成功");
}
}
return true;
}catch (Exception e) {
e.printStackTrace();
log.info("[VirtualHost] 删除绑定失败");
return false;
}
}
g.basicPublish(发送消息):
需要传入的参数:exchangeName(String,交换机名称),routingKey(String),basicProperties(BasicProperties,消息的基本属性),body(byte[],消息的内容)
具体的代码实现:
public boolean basicPublish(String exchangeName,String routingKey,BasicProperties basicProperties,byte[] body) {
try {
synchronized (exchangeLocker) {
synchronized (queueLocker) {
exchangeName = VirtualHostName + exchangeName;
//1.查询当前交换机/队列是否存在
Exchange exchange = memoryDataManage.getExchange(exchangeName);
if(exchange == null) {
throw new MqException("[VirtualHost] 当前交换机不存在,发送消息失败");
}
//2.检查routingKey的合法性
if(!Router.checkRoutingKey(routingKey)) {
throw new MqException("[VirtualHost] routingKey不合法,发送消息失败");
}
//3.判断当前交换机的类型,不同类型转发方式不同
if(exchange.getType() == ExchangeType.DIRECT) {
//直接交换机,转发到以routingKey作为队列的名字,添加到对应队列中去
String queueName = VirtualHostName + routingKey;
MSGQueue queue = memoryDataManage.getQueue(queueName);
if(queue == null) {
throw new MqException("[VirtualHost] 队列不存在,发送消息失败");
}
//构造message对象
basicProperties.setRoutingKey(routingKey);
Message message = Message.createMessageWithId(basicProperties,body);
//发送到指定队列中
sendMessage(queueName,message);
}else {
//还有扇出(每一个队列发送信息)和主题交换机(routingKey和BindingKey对应上的队列才发送信息)
//使用这两种交换机的话,肯定需要遍历所有队列
ConcurrentHashMap<String,Binding> bindingsMap = memoryDataManage.getBindings(exchangeName);
for(Map.Entry<String,Binding> entry : bindingsMap.entrySet()) {
//(1)获取当前队列名称和binding
String queueName = entry.getKey();
Binding binding = entry.getValue();
//(2)判定当前队列是否存在,如果不存在,直接抬走下一位
MSGQueue queue = memoryDataManage.getQueue(queueName);
if(queue == null) {
log.info("[VirtualHost] 当前队列不存在");
continue;
}
basicProperties.setRoutingKey(routingKey);
//3.判断交换机类型是否为主题交换机
if(!Router.route(exchange.getType(),binding,basicProperties)) {
continue;
}
//4.队列已经存在,构造message对象
Message message = Message.createMessageWithId(basicProperties,body);
//5.添加信息
sendMessage(queueName,message);
}
}
log.info("[VirtualHost] 添加信息成功");
return true;
}
}
}catch (Exception e) {
e.printStackTrace();
log.info("[VirtualHost] 发送信息失败");
return false;
}
}
private void sendMessage(String queueName, Message message) throws IOException, InterruptedException {
//1.判断队列是否存在
MSGQueue queue = memoryDataManage.getQueue(queueName);
if(queue == null) {
throw new MqException("[VirtualHost] 当前要写的队列不存在");
}
//2.判断消息是否需要持久化
if(message.getBasicProperties().getDeliveryMode() == 2) {
diskDataManage.writeMessage(queue,message);
}
//3.写入内存
memoryDataManage.sendMessage(queueName,message);
//4.提醒消息者消费消息
consumerManager.notifyConsumer(queueName);
}
关于routingKey和bindingKey的校验:
约定routingKey的形式:包含数字/字母(大小写)/下划线,用.进行分割
约定bindingKey的形式:包含数字/字母(大小写)/下划线,用.进行分割,*表示1个部分/#表示0个或多个部分。ps:aaa.a*.ccc/aaa.*c.ccc是不合法的
eg:routingKey:aaa.bbb.ccc.ddd bindingKey:aaa.#.ddd/aaa.*.*.ddd 可以匹配成功
但bindingKey里面不允许出现aaa.*.#.ddd/aaa.#.*.ddd/aaa.#.#.ddd的形式,这种形式在后续进行匹配的时候不好进行处理,所以可以提前对这三种形式进行一下判断。
如果当前为直接交换机,bindingKey用不到,消息需要转发到以routingKey作为名称的队列,此时只主要对routingKey做一下合法性校验。
如果当前为扇出交换机,bindingKey和routingKey都用不到,消息转发到与当前交换机进行了绑定的队列中(只需要交换机的名称)
如果当前为主题交换机,bindingKey和routingKey都会用到,此时就会涉及到两个key的校验了,如果校验成功了才进行转发消息,否则则不进行转发。
具体代码实现放在一个Router类中:
public class Router {
/*
bindingKey命名规则:
1.数字,下划线,字母
2.使用.进行分割
3.允许*或#作为一部分或多部分
*/
public static boolean checkBindingKey(String bindingKey) {
//也是合法的情况,直接和扇出交换机用不到,所以可以设置为""
if(bindingKey.length() == 0) {
return true;
}
for (int i = 0; i < bindingKey.length(); i++) {
if(bindingKey.charAt(i) >= 'A' && bindingKey.charAt(i) <= 'Z') {
continue;
}
if(bindingKey.charAt(i) >= 'a' && bindingKey.charAt(i) <= 'z') {
continue;
}
if(bindingKey.charAt(i) == '.' && bindingKey.charAt(i) == '_') {
continue;
}
if(bindingKey.charAt(i) >= '0' && bindingKey.charAt(i) <= '9') {
continue;
}
if(bindingKey.charAt(i) == '*' && bindingKey.charAt(i) == '#') {
continue;
}
//除此之外其他情况都是返回不合法的
return false;
}
//检查*和#是否为独立的部分
//aaa.*.bbb合法,aaa.a*.bbb/aaa.*a.bbb/aaa.#a.bbb不合法
String[] strs = bindingKey.split("\\.");
for(String s : strs) {
if(s.length() > 1 && (s.contains("*") || s.contains("#"))) {
return false;
}
}
//这里认为在进行约定一下
//如果使用前三种的话,匹配起来就有点麻烦,同时功能性不大
//a.#.*.b 不合法
//a.#.#.b 不合法
//a.*.#.b 不合法
//a.*.*.b 合法
for(int i = 0; i < strs.length-1; i++) {
if(strs[i].equals("#") && strs[i+1].equals("*")) {
return false;
}
if(strs[i].equals("*") && strs[i+1].equals("#")) {
return false;
}
if(strs[i].equals("#") && strs[i+1].equals("#")) {
return false;
}
}
return true;
}
/*
routingKey命名规则:
1.数字。下划线,字母
2.用.进行分割
*/
public static boolean checkRoutingKey(String routingKey) {
//进行判空,有些时候(扇出交换机用不到)不需要routingKey就可以设置为"",但是并不代表是错误的
if(routingKey.length() == 0) {
return true;
}
for (int i = 0; i < routingKey.length(); i++) {
if(routingKey.charAt(i) >= 'A' && routingKey.charAt(i) <= 'Z') {
continue;
}
if(routingKey.charAt(i) >= 'a' && routingKey.charAt(i) <= 'z') {
continue;
}
if(routingKey.charAt(i) == '.' && routingKey.charAt(i) == '_') {
continue;
}
if(routingKey.charAt(i) >= '0' && routingKey.charAt(i) <= '9') {
continue;
}
//除此之外其他情况都是返回不合法的
return false;
}
return true;
}
public static boolean route(ExchangeType type, Binding binding, BasicProperties basicProperties) {
if(type == ExchangeType.FANOUT) {
return true;
}else if(type == ExchangeType.TOPIC) {
return routeTopic(binding.getBindingKey(),basicProperties.getRoutingKey());
}else {
System.out.println("传入的交换机类型非发");
return false;
}
}
// [测试用例]
// binding key routing key result
// aaa aaa true
// aaa.bbb aaa.bbb true
// aaa.bbb aaa.bbb.ccc false
// aaa.bbb aaa.ccc false
// aaa.bbb.ccc aaa.bbb.ccc true
// aaa.* aaa.bbb true
// aaa.*.bbb aaa.bbb.ccc false
// *.aaa.bbb aaa.bbb false
// # aaa.bbb.ccc true
// aaa.# aaa.bbb true
// aaa.# aaa.bbb.ccc true
// aaa.#.ccc aaa.ccc true
// aaa.#.ccc aaa.bbb.ccc true
// aaa.#.ccc aaa.aaa.bbb.ccc true
// #.ccc ccc true
// #.ccc aaa.bbb.ccc true
public static boolean routeTopic(String bindingKey,String routingKey) {
//进行routingKey和bindingKey的匹配
//将bindingKey和routingKey进行分割(按照".")
String[] bindings = bindingKey.split("\\.");
int bindLen = bindings.length;
String[] routings = routingKey.split("\\.");
int routeLen = routings.length;
int i = 0;
int j = 0;
while(i < bindLen && j < routeLen) {
if(bindings[i].equals("*")) {
i++;
j++;
continue;
}else if(bindings[i].equals("#")) {
i++;
if(i == bindLen) {
//已经到bindings末尾了,直接返回成功
return true;
}
System.out.println(i);
//还没到末尾,需要去routings去找到bindings[i],如果没找到此时两个key已然不相同
int routingIndex = findRoutingIndex(bindings,routings,i,j);
if(routingIndex == -1) {
//没找到
return false;
}
j = routingIndex;
}else if(bindings[i].equals(routings[j])){
//bindings[i]和routings[i]相同
i++;
j++;
continue;
}else {
return false;
}
}
if(i == bindLen && j == routeLen) {
return true;
}
return false;
}
private static int findRoutingIndex(String[] bindings,String[] routings, int i, int j) {
for(int k = j; k < routings.length; k++) {
if(routings[k].equals(bindings[i])) {
return k;
}
}
return -1;
}
}
h.basicConsume(订阅消息):
需要传入的参数:consumerTag(String,消费者身份标识),queueName(String,队列名称),autoAck(boolean,自动确认)),consumer(Consumer,消费者执行回调的入口)
Consumer:是一个函数式接口,里面只有一个方法:handleDelivery,delivery是投递的意思,当某个消费者订阅了消息,服务器接收到了某个生产者生产的消息就会将这个消息推送给当前订阅消息的消费者,消费者接收到这个消息,就是通过这里的这个回调的方法来进行消费信息。
主要有三个参数:
@FunctionalInterface
public interface Consumer {
//deliver是“投递”的意思,这个方法意在每次服务器接收到信息后,
// 都把消息交给消费者进行消费
//这里的参数也是参考RabbitMQ来搞的
//consumerTag表示消费者的身份标识
void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException;
}
消费者可以通过重写这个方法来进行自己的逻辑代码的实现。
在这个方法中还涉及到两个类:ConsumerEnv(真正的消费者)和ConsumerManager(消息者管理)
ConsumerEnv:
@Data
public class ConsumerEnv {
//消费者身份标识
private String consumerTag;
//队列名称
private String queueName;
//是否是自动确认消息,还是手动确认
private boolean autoAck;
private Consumer consumer;
public ConsumerEnv(String consumerTag,String queueName,boolean autoAck,Consumer consumer) {
this.autoAck = autoAck;
this.consumer = consumer;
this.consumerTag = consumerTag;
this.queueName = queueName;
}
}
ConsumerManager:
是消费者消费信息的核心类,消费者如何能够检测到服务器这边接收到了信息,以及如何将信息推送给消费者进行消费在这个类中得到了很好的回答。
在这个类中维护了一个阻塞队列,这个队列用来存放有消息的队列,一旦有生产者生产了消息,都会调用这个方法将该队列名放进该阻塞队列;而这个取阻塞队列中的元素的操作放在一个单独运行的后台线程中实现,如果队列中没有元素,那么整个线程将会被阻塞,并且这个过程是循环的(也就是在取到元素,完成后续操作(通过队列名查找队列,然后进行消费信息)后,又会回到最开始去取元素,然后继续完成后续操作);同时还维护了一个ArrayList<ConsumerEnv>顺序表,用来存放当前所有队列中的消费者,为后序队列非独占的实现做准备(eg:queue1中的消息所有队列中的消费者都能进行消费,否则就只能queue1中的消费者进行消费)
核心方法:
addConsumer:
根据queueName找到对应队列,然后将消费者添加进入队列中维护的ArrayList中,如果没有查找到对应队列,直接抛异常。还需要往ConsumerManager中维护的ArrayList添加;如果当前队列中有消息的话,还需要进行消费完。
下面两个元素是在MSGQueue中维护的:
轮询:
consumerSeq是用来进行轮询的,如果当前队列中有两个消费者,第一次取出消费者1进行消费,第二次取出消费者2进行消费,第三次取出消费者1进行消费........
每次取出consumerEnvList.get(consumerSeq.get()%consumerEnvList.size()),然后consumerSeq++,就可以完成轮询操作(也就是先完成消费的,需要重新排队进行消费,但并不是真正的把消费者移动到顺序表末尾,每次取对首元素)
consumeMessage:
当前方法的执行是在上述所说的后台线程中执行,执行的契机是阻塞队列中有元素时(也就是服务器成功接收到消息,并且成功转发到某个队列中),一旦执行到这个方法,就需要找一个消费者进行消费,这里有两种情况:
队列独占(exclusive:true):直接通过当前队列中的存放当前当前队列所有消费者的顺序表轮询出一个消费者即可,如果没有消费者直接抛异常。
MSGQueue中的一个方法:
队列不独占(exclusive:false):此时需要从ConsumerManager中维护的存放所有队列的消费者的顺序表轮询出一个消费者,如果没有消费者直接抛异常。
ConsumerManager中的一个方法:
消费消息的过程可以交给线程池来执行,这样在同一时刻可以消费多个消息。同时如果当前消费者采用的是自动确认,那么在消费完成信息过后,需要进行相关数据的删除,如果是手动确认还需要调用方法basicAck。
notifyConsumer:
向当前阻塞队列中添加有消息的队列,是在basicPublish中调用的。
具体代码实现:
@Slf4j
public class ConsumerManager {
//存储上层的VirtualHost的对象引用,用来操作数据
private VirtualHost parent;
//创建一个线程池,其中线程用来调用回调函数,消费消息
private ExecutorService executorService = Executors.newFixedThreadPool(4);
//存放令牌的队列
private BlockingQueue<String> tokens = new LinkedBlockingQueue<>();
//扫描整体队列的线程
private Thread thread;
//用来存放当前所有队列中有哪些有消费者(为后续队列实现非独占做准备)
private ArrayList<ConsumerEnv> consumerLists = new ArrayList<>();
//方便采用轮询(第一次消费过后需要重新排队进行消费)
private AtomicInteger seqNumber = new AtomicInteger(0);
public ConsumerManager(VirtualHost virtualHost) {
parent = virtualHost;
thread = new Thread(() -> {
while(true) {
try {
//1.拿到令牌(队列名)
String queueName = tokens.take();
//2.根据令牌找到队列
MSGQueue queue = parent.getMemoryDataManage().getQueue(queueName);
if(queue == null) {
throw new MqException("当前队列不存在,queueName: " + queueName);
}
//3.消费消息(有可能产生线程安全问题所以需要对当前队列加锁)
synchronized(queue) {
try {
consumeMessage(queue);
}catch (MqException e) {
e.printStackTrace();
log.info("消费信息失败");
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
//设置为后台线程
thread.setDaemon(true);
thread.start();
}
public void notifyConsumer(String queueName) throws InterruptedException {
tokens.put(queueName);
}
public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
try {
//1.根据队列名找到队列
MSGQueue queue = parent.getMemoryDataManage().getQueue(queueName);
if(queue == null) {
throw new MqException("[consumerManager] 当前队列不存在,queueName: " + queueName);
}
//2.构造消费者
ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag,queueName,autoAck,consumer);
//3.添加消费者
synchronized (queue) {
queue.addConsumerEnv(consumerEnv);
consumerLists.add(consumerEnv);
//如果当前队列中已经有了消息就直接消费掉
int totalMessages = parent.getMemoryDataManage().getCounts(queueName);
while(totalMessages > 0) {
//每一次循环就消费掉一个消息
consumeMessage(queue);
totalMessages--;
}
}
log.info("[consumerManager] 添加消费者到队列中成功");
}catch (Exception e) {
e.printStackTrace();
log.info("[consumerManager] 添加消费者到队列中失败,queueName: " + queueName + " ,consumerTag: " + consumerTag);
}
}
private void consumeMessage(MSGQueue queue){
//1.按照轮询的方式,查询出一个消费者
ConsumerEnv luckier = null;
log.info("queueName: " + queue.getName() + ",exclusive: " + queue.isExclusive());
if(queue.isExclusive()) {
//独占
luckier = queue.chooseConsumerEnvExclusive();
if (luckier == null) {
throw new MqException("[consumerManager] 当前队列中没有消费者,queueName: " + queue.getName());
}
log.info("独占 queueName: " + luckier.getQueueName() + ",exclusive: " + queue.isExclusive());
}else {
//非独占
luckier = chooseConsumerEnvNotExclusive();
if (luckier == null) {
throw new MqException("[consumerManager] 当前所有队列中没有消费者,queueName: " + queue.getName());
}
log.info("非独占 queueName: " + luckier.getQueueName() + ",exclusive: " + queue.isExclusive());
}
ConsumerEnv finalLuckier = luckier;
//2.取出一个消息(实际内存还存在)
Message message = parent.getMemoryDataManage().peekMessage(queue.getName());
if(message == null) {
throw new MqException("[consumerManager] 当前队列中没有消息,queueName " + queue.getName());
}
//3.消费消息caiyon
executorService.submit(() -> {
try {
//将未确认的消息存储在内存中
parent.getMemoryDataManage().addMessageWaitAck(queue.getName(),message);
//执行回调函数,消费消息
finalLuckier.getConsumer().handleDelivery(finalLuckier.getConsumerTag(),message.getBasicProperties(),message.getBody());
//这里有两种确认方式:
// 手动确认:需要手动basicAck才能彻底将消息进行消费
// 自动确认:无需确认消息一经调用就被消费掉
if(finalLuckier.isAutoAck()) {
//删除在硬盘上存储的消息
if(message.getBasicProperties().getDeliveryMode() == 2) {
parent.getDiskDataManage().deleteMessage(queue, message);
}
//删除在内存中的存储的消息(未确认)
parent.getMemoryDataManage().removeMessageWaitAck(queue.getName(),message.getBasicProperties().getMessageId());
//删除在内存中的存储的消息(实际存储的),删除总的存储message地方中的message
parent.getMemoryDataManage().removeMessage(message.getBasicProperties().getMessageId());
//删除对应对列中message
parent.getMemoryDataManage().pollMessage(queue.getName());
}
}catch (Exception e) {
e.printStackTrace();
log.info("消费消息失败");
}
});
log.info("消费消息成功");
}
private ConsumerEnv chooseConsumerEnvNotExclusive() {
if(consumerLists.size() == 0) {
return null;
}
int index = seqNumber.get() % consumerLists.size();
seqNumber.incrementAndGet();
return consumerLists.get(index);
}
}
i.basicAck(手动确认):
需要传入的参数有:queueName(String),messageId(String)
根据当前两个参数查找对应的队列和message,如果都存在再进行数据的删除
具体代码实现:
public boolean basicAck(String queueName,String messageId) {
try {
//1.获取到对应的队列和消息
MSGQueue queue = memoryDataManage.getQueue(queueName);
if(queue == null) {
throw new MqException("[VirtualHost] 当前队列不存在");
}
Message message = memoryDataManage.getMessage(messageId);
if(message == null) {
throw new MqException("[VirtualHost] 当前消息不存在");
}
//2.进行硬盘和内存上数据的删除
memoryDataManage.removeMessage(messageId);
memoryDataManage.pollMessage(queueName);
if(message.getBasicProperties().getDeliveryMode() == 2) {
//表示消息已经持久化,需要删除
diskDataManage.deleteMessage(queue,message);
}
memoryDataManage.removeMessageWaitAck(queueName,messageId);
log.info("[VirtualHost] 手动确认消息消费完成成功");
return true;
}catch (Exception e) {
e.printStackTrace();
log.info("[VirtualHost] 手动确认消息消费完成失败");
return false;
}
}
②BrokerServer
这个类主要就是接收请求(包括消费者,生产者发送的消息请求),然后解析请求,计算响应并且返回。
所以首先需要做的就是构造一个请求,这个过程是在Channel中做的,而提到Channel就不得不提到Connection,在文章开头也提到过这两者之间的关系,一个客户端只需要建立一次连接,通过这一次连接就可以发送多个请求,大大提高了网络传输效率。
这里采用自定义应用层协议:
客户端只需要调用对应的方法,然后服务器根据请求中的type来判断当前客户端调用的何种方法,从而进行响应计算和返回,客户端这边只会收到成功/失败的信息,客户端并不知道具体的处理过程,这种方式叫做:RPC(远程方法调用)
协议类型:
a.Channel类:
该类主要完成开始的9大方法的请求的构造。
该类中为维护了channelId(当前channel的身份标识),consumer(执行回调,一个channel中只能允许有一个consumer来执行回调,所以在订阅消息的时候,要首先进行判空),connection(当前channel属于哪个连接),ConcurrentHashMap<String,BasicReturns>(key为:rid,当前请求/响应的id,value:返回的响应)
如果是普通的响应(0x1-0xb请求返回的响应),那么使用BasicReturns(参数有:rid,channelId,ok(请求是否成功))这个类就好了,这里使用rid作为key,是为了防止请求发送的顺序与响应返回的顺序不同(eg:请求1先发送,请求2后发送,但是请求2却先返回,请求1后返回,使用map通过rid查询就可以查询出对应请求的响应)。每一次发送请求会通过UUID类生成一个唯一的rid(不同之间请求/响应的rid肯定不同嘛)
如果是服务器向客户端推送消息的响应(0xc,响应独有的类型),那么使用SubScribeReturns(继承于BasicReturns,在此之上还有参数:consumerTag,BasicProperties,body)。
类中还有个重要的方法:waitBasicReturns(等待响应),在方法实现的内部,如果当前响应未返回则会wait进行等待(就是通过查看上述Map中是否有元素),直到有方法(这里采用notifyAll,因为不知道有几个线程在进行等待,唤醒之后只需要在方法中进行再次的判断Map是否有元素,有的就直接返回当前取到的返回值,否则则继续等待)进行唤醒。
其他方法就是针对上述9大方法的请求构造(按照自定义应用层协议),先写type,再写length,最后写payload(负载:请求内容)
在实现具体代码之前还需要对相关请求类进行构造:
BasicArguments:(下面所有类(由于通过网络传输,都需要实现Serializable接口)的父类,公有属性)
@Data
public class BasicArguments implements Serializable {
//表示一次请求/响应的身份标识,可以将请求与响应对应起来
private String rid;
//当前channel的身份标识
private String channelId;
}
ExchangeDeclareArguments:
@Data
public class ExchangeDeclareArguments extends BasicArguments implements Serializable {
private String exchangeName;
private boolean durable;
private boolean autoDelete;
private ExchangeType type;
private Map<String,Object> arguments;
}
ExchangeDeleteArguments:
@Data
public class ExchangeDeleteArguments extends BasicArguments implements Serializable {
private String exchangeName;
}
QueueDeclareArguments:
@Data
public class QueueDeclareArguments extends BasicArguments implements Serializable {
private String queueName;
private boolean durable;
private boolean autoDelete;
private boolean exclusive;
private Map<String,Object> arguments;
}
QueueDeleteArguments:
@Data
public class QueueDeleteArguments extends BasicArguments implements Serializable {
private String queueName;
}
QueueBindArguments:
@Data
public class QueueBindArguments extends BasicArguments implements Serializable {
private String queueName;
private String exchangeName;
private String bindingKey;
}
QueueUnbindArguments:
@Data
public class QueueUnbindArguments extends BasicArguments implements Serializable {
private String queueName;
private String exchangeName;
}
BasicPublishArguments:
@Data
public class BasicPublishArguments extends BasicArguments implements Serializable {
private String exchangeName;
private String routingKey;
private BasicProperties basicProperties;
private byte[] body;
}
BasicConsumeArguments:
@Data
public class BasicConsumeArguments extends BasicArguments implements Serializable {
private String consumerTag;
private String queueName;
private boolean autoAck;
private Consumer consumer;
}
BasicAckArguments:
@Data
public class BasicAckArguments extends BasicArguments implements Serializable {
private String queueName;
private String messageId;
}
Channel类实现:
@Slf4j
@Data
public class Channel {
private String channelId;
//标识当前channel属于哪个连接
private Connection connection;
//维护一个哈希表,用来存储服务器返回来的响应
//key-rid
private ConcurrentHashMap<String,BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();
//如果某个channel订阅了消息,那么就需要服务器在推送回来消息的时候,执行一下回调函数,也就是消费推送回来的消息
//这里约定一个channel中只能有一个consumer来执行回调
private Consumer consumer;
public Channel(String channelId,Connection connection) {
this.connection = connection;
this.channelId = channelId;
}
//与服务器之间的交互
//通过这个类告诉服务器我要创建一个channel
public boolean createChannel() throws IOException {
//1.构造payload
BasicArguments basicArguments = new BasicAckArguments();
basicArguments.setRid(createNewRid());
basicArguments.setChannelId(channelId);
byte[] payload = BinaryTool.toBytes(basicArguments);
//2.构造响应
Request request = new Request();
request.setPayload(payload);
request.setLength(payload.length);
request.setType(0x1);
//3.发送请求
connection.writeRequest(request);
//4.等待服务器的响应
BasicReturns basicReturns = waitBasicReturns(basicArguments.getRid());
return basicReturns.isOk();
}
private BasicReturns waitBasicReturns(String rid) {
BasicReturns basicReturns = null;
while((basicReturns = basicReturnsMap.get(rid)) == null) {
synchronized (this) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
log.info("[Channel] 服务器响应返回,等待结束");
}
}
}
//在返回结果之前还需要把对应哈希表中的数据给进行清除
//虽然每一次生成的rid都不同,好像也不会影响后续连接但是这个记录会一直在哈希表中,直到服务器停止
//但是可以让内存压力小一点
basicReturnsMap.remove(rid);
return basicReturns;
}
private String createNewRid() {
return "R-" + UUID.randomUUID();
}
//在Connection中扫描到了对应的响应在调用这个方法
public void putReturns(BasicReturns basicReturns) {
basicReturnsMap.put(basicReturns.getRid(),basicReturns);
synchronized (this) {
//由于当前不知道等待的线程有多少,所以直接全部唤醒
notifyAll();
}
}
//告诉服务器销毁channel
public boolean closeChannel() throws IOException {
//1.构造payload
BasicArguments basicArguments = new BasicAckArguments();
basicArguments.setChannelId(channelId);
basicArguments.setRid(createNewRid());
byte[] payload = BinaryTool.toBytes(basicArguments);
//2.构造响应
Request request = new Request();
request.setPayload(payload);
request.setLength(payload.length);
request.setType(0x2);
//3.发送请求
connection.writeRequest(request);
//4.等待服务器的响应
BasicReturns basicReturns = waitBasicReturns(basicArguments.getRid());
return basicReturns.isOk();
}
//告诉服务器创建一个交换机
public boolean exchangeDeclare(String exchangeName, ExchangeType type, boolean durable, boolean autoDelete,
Map<String,Object> arguments) throws IOException {
//1.构造payload
ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();
exchangeDeclareArguments.setExchangeName(exchangeName);
exchangeDeclareArguments.setType(type);
exchangeDeclareArguments.setDurable(durable);
exchangeDeclareArguments.setAutoDelete(autoDelete);
exchangeDeclareArguments.setArguments(arguments);
exchangeDeclareArguments.setChannelId(channelId);
exchangeDeclareArguments.setRid(createNewRid());
byte[] payload = BinaryTool.toBytes(exchangeDeclareArguments);
//2.构造响应
Request request = new Request();
request.setPayload(payload);
request.setLength(payload.length);
request.setType(0x3);
//3.发送请求
connection.writeRequest(request);
//4.等待服务器的响应
BasicReturns basicReturns = waitBasicReturns(exchangeDeclareArguments.getRid());
return basicReturns.isOk();
}
//告诉服务器销毁交换机
public boolean exchangeDelete(String exchangeName) throws IOException {
//1.构造payload
ExchangeDeleteArguments exchangeDeleteArguments = new ExchangeDeleteArguments();
exchangeDeleteArguments.setExchangeName(exchangeName);
exchangeDeleteArguments.setChannelId(channelId);
exchangeDeleteArguments.setRid(createNewRid());
byte[] payload = BinaryTool.toBytes(exchangeDeleteArguments);
//2.构造响应
Request request = new Request();
request.setPayload(payload);
request.setLength(payload.length);
request.setType(0x4);
//3.发送请求
connection.writeRequest(request);
//4.等待服务器的响应
BasicReturns basicReturns = waitBasicReturns(exchangeDeleteArguments.getRid());
return basicReturns.isOk();
}
//告诉服务器创建一个队列
public boolean queueDeclare(String queueName,boolean durable,boolean autoDelete,boolean exclusive,
Map<String,Object> arguments) throws IOException {
//1.构造payload
QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();
queueDeclareArguments.setQueueName(queueName);
queueDeclareArguments.setDurable(durable);
queueDeclareArguments.setExclusive(exclusive);
queueDeclareArguments.setAutoDelete(autoDelete);
queueDeclareArguments.setRid(createNewRid());
queueDeclareArguments.setArguments(arguments);
queueDeclareArguments.setChannelId(channelId);
byte[] payload = BinaryTool.toBytes(queueDeclareArguments);
//2.构造响应
Request request = new Request();
request.setPayload(payload);
request.setLength(payload.length);
request.setType(0x5);
//3.发送请求
connection.writeRequest(request);
//4.等待服务器的响应
BasicReturns basicReturns = waitBasicReturns(queueDeclareArguments.getRid());
return basicReturns.isOk();
}
//告诉服务器销毁队列
public boolean queueDelete(String queueName) throws IOException {
//1.构造payload
QueueDeleteArguments queueDeleteArguments = new QueueDeleteArguments();
queueDeleteArguments.setQueueName(queueName);
queueDeleteArguments.setChannelId(channelId);
queueDeleteArguments.setRid(createNewRid());
byte[] payload = BinaryTool.toBytes(queueDeleteArguments);
//2.构造响应
Request request = new Request();
request.setPayload(payload);
request.setLength(payload.length);
request.setType(0x6);
//3.发送请求
connection.writeRequest(request);
//4.等待服务器的响应
BasicReturns basicReturns = waitBasicReturns(queueDeleteArguments.getRid());
return basicReturns.isOk();
}
//告诉服务器创建一个绑定
public boolean queueBind(String queueName,String exchangeName,String bindingKey) throws IOException {
//1.构造payload
QueueBindArguments queueBindArguments = new QueueBindArguments();
queueBindArguments.setBindingKey(bindingKey);
queueBindArguments.setQueueName(queueName);
queueBindArguments.setExchangeName(exchangeName);
queueBindArguments.setRid(createNewRid());
queueBindArguments.setChannelId(channelId);
byte[] payload = BinaryTool.toBytes(queueBindArguments);
//2.构造响应
Request request = new Request();
request.setPayload(payload);
request.setLength(payload.length);
request.setType(0x7);
//3.发送请求
connection.writeRequest(request);
//4.等待服务器的响应
BasicReturns basicReturns = waitBasicReturns(queueBindArguments.getRid());
return basicReturns.isOk();
}
//告诉服务器销毁一个绑定
public boolean queueUnBind(String queueName,String exchangeName) throws IOException {
//1.构造payload
QueueUnBindArguments queueUnBindArguments = new QueueUnBindArguments();
queueUnBindArguments.setQueueName(queueName);
queueUnBindArguments.setExchangeName(exchangeName);
queueUnBindArguments.setRid(createNewRid());
queueUnBindArguments.setChannelId(channelId);
byte[] payload = BinaryTool.toBytes(queueUnBindArguments);
//2.构造响应
Request request = new Request();
request.setPayload(payload);
request.setLength(payload.length);
request.setType(0x8);
//3.发送请求
connection.writeRequest(request);
//4.等待服务器的响应
BasicReturns basicReturns = waitBasicReturns(queueUnBindArguments.getRid());
return basicReturns.isOk();
}
//告诉服务器发送一个消息
public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {
//1.构造payload
BasicPublishArguments basicPublishArguments = new BasicPublishArguments();
basicPublishArguments.setBasicProperties(basicProperties);
basicPublishArguments.setBody(body);
basicPublishArguments.setRoutingKey(routingKey);
basicPublishArguments.setChannelId(channelId);
basicPublishArguments.setExchangeName(exchangeName);
basicPublishArguments.setRid(createNewRid());
byte[] payload = BinaryTool.toBytes(basicPublishArguments);
//2.构造响应
Request request = new Request();
request.setPayload(payload);
request.setLength(payload.length);
request.setType(0x9);
//3.发送请求
connection.writeRequest(request);
//4.等待服务器的响应
BasicReturns basicReturns = waitBasicReturns(basicPublishArguments.getRid());
return basicReturns.isOk();
}
//告诉服务器某个消费者要订阅消息
public boolean basicConsumer(String queueName, boolean autoAck, Consumer consumer) throws IOException {
//首先设置一下回调
if(this.consumer != null) {
throw new IOException("[Channel] 当前channel中已经设置过回调,不能重复设置");
}
this.consumer = consumer;
//1.构造payload
BasicConsumerArguments basicConsumerArguments = new BasicConsumerArguments();
//直接用当前channelId代表消费者
basicConsumerArguments.setConsumerTag(channelId);
basicConsumerArguments.setRid(createNewRid());
basicConsumerArguments.setAutoAck(autoAck);
basicConsumerArguments.setChannelId(channelId);
basicConsumerArguments.setQueueName(queueName);
byte[] payload = BinaryTool.toBytes(basicConsumerArguments);
//2.构造响应
Request request = new Request();
request.setPayload(payload);
request.setLength(payload.length);
request.setType(0xa);
//3.发送请求
connection.writeRequest(request);
//4.等待服务器的响应
BasicReturns basicReturns = waitBasicReturns(basicConsumerArguments.getRid());
return basicReturns.isOk();
}
//告诉服务器手动确认
public boolean basicAck(String queueName,String messageId) throws IOException {
//1.构造payload
BasicAckArguments basicAckArguments = new BasicAckArguments();
basicAckArguments.setMessageId(messageId);
basicAckArguments.setQueueName(queueName);
basicAckArguments.setRid(createNewRid());
basicAckArguments.setChannelId(channelId);
byte[] payload = BinaryTool.toBytes(basicAckArguments);
//2.构造响应
Request request = new Request();
request.setPayload(payload);
request.setLength(payload.length);
request.setType(0xb);
//3.发送请求
connection.writeRequest(request);
//4.等待服务器的响应
BasicReturns basicReturns = waitBasicReturns(basicAckArguments.getRid());
return basicReturns.isOk();
}
}
b.Connection类:
该类主要做的工作是读取响应,然后进行返回。
首先该类通过Socket与服务器进行通信,需要传入主机ip和端口号,可以直接在构造方法中实现,在此方法中还维护了一个线程负责不断从Socket中读取响应,并且进行响应类型的判断:
如果为0xc则代表为服务器向消费者推送消息,此时就需要交给线程池进行回调方法的执行,然后进行响应的返回。
如果为0x1-0xb则代表为其他的普通地操作队列的响应,此时只需要进行响应的返回就行。
其次维护了一个ConcurrentHashMap<String,Channel>,表示当前连接里面有哪些channel,后续需要通过这个channel来完成方法的回调和响应的返回。
最后还提供了两个方法,createChannel(创建频道)和writeRequest(写请求,记得flush进行刷新)
具体代码实现:
@Slf4j
public class Connection {
private Socket socket;
private InputStream inputStream;
private OutputStream outputStream;
private DataOutputStream dataOutputStream;
private DataInputStream dataInputStream;
private ExecutorService callbackPool;
//保存当前连接里面有哪些channel
private ConcurrentHashMap<String,Channel> channelMap;
public Connection(String host, int port) throws IOException {
socket = new Socket(host, port);
inputStream = socket.getInputStream();
outputStream = socket.getOutputStream();
dataInputStream = new DataInputStream(inputStream);
dataOutputStream = new DataOutputStream(outputStream);
channelMap = new ConcurrentHashMap<>();
callbackPool = Executors.newFixedThreadPool(4);
// 创建一个扫描线程, 由这个线程负责不停的从 socket 中读取响应数据. 把这个响应数据再交给对应的 channel 负责处理.
Thread t = new Thread(() -> {
try {
while (!socket.isClosed()) {
Response response = readResponse();
dispatchResponse(response);
}
} catch (SocketException e) {
// 连接正常断开的. 此时这个异常直接忽略.
log.info("[Connection] 连接正常断开!");
} catch (IOException | ClassNotFoundException | MqException e) {
log.info("[Connection] 连接异常断开!");
e.printStackTrace();
}
});
t.start();
}
public Response readResponse() throws IOException {
Response response = new Response();
response.setType(dataInputStream.readInt());
response.setLength(dataInputStream.readInt());
byte[] payload = new byte[response.getLength()];
int n = dataInputStream.read(payload);
if (n != response.getLength()) {
throw new IOException("读取的响应数据不完整!");
}
response.setPayload(payload);
log.info("[Connection] 收到响应! type=" + response.getType() + ", length=" + response.getLength());
return response;
}
//当前类用来判断当前接收到的响应是正常的操作消息队列的操作(0x1-0xb)还是服务器推送消息的(0xc)
private void dispatchResponse(Response response) throws IOException, ClassNotFoundException {
if(response.getType() == 0xc) {
SubScribeReturns scribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());
Channel channel = channelMap.get(scribeReturns.getConsumerTag());
if(channel == null) {
throw new IOException("[Connection] 当前要找的channel不存在 channelId: " + scribeReturns.getConsumerTag());
}
//执行channel内部对象的回调(交给线程池来执行)
callbackPool.submit(() -> {
try {
channel.getConsumer().handleDelivery(scribeReturns.getConsumerTag(),scribeReturns.getBasicProperties(),
scribeReturns.getBody());
}catch (Exception e) {
e.printStackTrace();
log.info("[Connection] 执行回调过程中出现错误");
}
});
channel.putReturns(scribeReturns);
}else {
//普通操作消息队列
BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());
Channel channel = channelMap.get(basicReturns.getChannelId());
if(channel == null) {
throw new IOException("[Connection] 当前要找的channel不存在 channelId: " + basicReturns.getChannelId());
}
channel.putReturns(basicReturns);
}
}
//关闭所有连接
public void close() {
try {
callbackPool.shutdownNow();
inputStream.close();
outputStream.close();
socket.close();
channelMap.clear();
} catch (IOException e) {
e.printStackTrace();
log.info("[Connection] 连接释放失败");
}
}
//通过该方法在当前Connection中创建一个channel
public Channel createChannel() throws IOException {
//使用UUID生成唯一的身份标识
String channelId = "C-" + UUID.randomUUID();
Channel channel = new Channel(channelId,this);
//通知一下服务器,向服务器发送请求
boolean ok = channel.createChannel();
if(!ok) {
throw new IOException("[Connection] 创建channel失败");
}
channelMap.put(channelId,channel);
return channel;
}
public void writeRequest(Request request) throws IOException {
dataOutputStream.writeInt(request.getType());
dataOutputStream.writeInt(request.getLength());
dataOutputStream.write(request.getPayload());
dataOutputStream.flush();
log.info("[Connection] 发送请求 type: " + request.getType() + ",length: " + request.getLength());
}
}
c.BrokerServer类:
在BrokerServer中主要就是接收channel这边发送的请求,然后根据类型调用VirtualHost对应的方法,然后进行响应的构造并且在Connection类中读取响应将响应添加到channel的响应集合中,最终channel里面等待响应结束并进行返回。
具体代码实现:
@Slf4j
public class BrokerServer {
private ServerSocket serverSocket;
//当前只考虑一个BrokerServer上只有一个虚拟主机
private VirtualHost virtualHost = new VirtualHost("default");
//使用一个哈希表来表示当前所有的会话(就是当前有哪些客户端正在与服务器进行通信)
//key-channelId value-Socket(每一个socket就代表一个客户端)
private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<>();
//引入一个线程池,来处理多个客户端的请求
private ExecutorService executorService;
//引入一个变量,控制当前服务器的是否运行
private volatile boolean runnable = true;
public BrokerServer(int port) throws IOException {
serverSocket = new ServerSocket(port);
}
//服务器启动
public void start() throws IOException {
log.info("[BrokerServer] 启动");
executorService = Executors.newCachedThreadPool();
try {
while(runnable) {
Socket clientSocket = serverSocket.accept();
executorService.submit(() -> {
processConnection(clientSocket);
});
}
}catch (SocketException e) {
log.info("[BrokerServer] 服务器停止运行");
}
}
//通过这方法来处理一个客户端的连接
//但在这个连接中可能设计很多的请求与响应
private void processConnection(Socket clientSocket) {
try(InputStream inputStream = clientSocket.getInputStream();
OutputStream outputStream = clientSocket.getOutputStream()) {
//由于需要按照特定格式进行数据的读取,所以还需要使用下面的stream类
try(DataInputStream dataInputStream = new DataInputStream(inputStream);
DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
while (true) {
//1.读取请求
Request request = readRequest(dataInputStream);
//2.根据请求计算响应
Response response = process(request,clientSocket);
//3.返回响应
writeResponse(dataOutputStream,response);
}
}
}catch (EOFException | SocketException e) {
//如果当前是这个两个异常,代表要么文件读到末尾,要么socket已经关闭
//那么此时服务器可以停止了(借助该异常)
log.info("[BrokerServer] connection关闭! 客户端的地址: " + clientSocket.getInetAddress().toString() +
": " + clientSocket.getPort());
} catch (IOException | ClassNotFoundException e) {
//如果是该异常,那么是在写读数据过程中出现了问题
e.printStackTrace();
log.info("[BrokerServer] 出现异常!");
}finally {
//关闭clientSocket
try {
clientSocket.close();
//一个TCP连接里面有多个channel,还需要清理掉与该socket相关的所有channel
clearSocketChannel(clientSocket);
} catch (IOException e) {
e.printStackTrace();
}
}
}
//该方法就是遍历上述哈希表,然后将里面与clientSocket相关联的所有的channel进行删除
private void clearSocketChannel(Socket clientSocket) {
List<String> toDelete = new LinkedList<>();
for(Map.Entry<String,Socket> entry : sessions.entrySet()) {
if(entry.getValue().equals(clientSocket)) {
toDelete.add(entry.getKey());
}
}
for(String channelId : toDelete) {
sessions.remove(channelId);
}
log.info("[BrokerServer] 删除与socket相关channel成功");
}
private Request readRequest(DataInputStream dataInputStream) throws IOException, ClassNotFoundException {
Request request = new Request();
request.setType(dataInputStream.readInt());
request.setLength(dataInputStream.readInt());
byte[] payload = new byte[request.getLength()];
int n = dataInputStream.read(payload);
if(n != request.getLength()) {
throw new IOException("[BrokerServer] 数据解析出现错误");
}
request.setPayload(payload);
log.info("[BrokerServer] 读取请求成功 type: " + request.getType() + ",length: " + n);
return request;
}
private Response process(Request request,Socket clientSocket) throws IOException, ClassNotFoundException {
//1.首先解析一下当前请求中的payload数据
BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());
log.info("rid: " + basicArguments.getRid() + ",channelId: " + basicArguments.getChannelId() +
",length: " + request.getLength() + ",type: " + request.getType());
//2.根据请求中type的类型解析一下当前请求是要干啥
int type = request.getType();
boolean ok = true;
if(type == 0x1) {
//创建channel
sessions.put(basicArguments.getChannelId(),clientSocket);
log.info("[BrokerServer] 创建channel成功 channelId: " + basicArguments.getChannelId());
}else if(type == 0x2) {
//销毁channel
sessions.remove(basicArguments.getChannelId());
log.info("[BrokerServer] 销毁channel成功 channelId: " + basicArguments.getChannelId());
}else if(type == 0x3) {
//创建交换机
ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;
ok = virtualHost.exchangeDeclare(arguments.getExchangeName(),arguments.getType(),arguments.isDurable(),
arguments.isAutoDelete(),arguments.getArguments());
log.info("[BrokerServer] 创建交换机成功 exchangeName: " + arguments.getExchangeName());
}else if(type == 0x4) {
//销毁交换机
ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;
ok = virtualHost.exchangeDelete(arguments.getExchangeName());
log.info("[BrokerServer] 销毁交换机成功 exchangeName: " + arguments.getExchangeName());
}else if(type == 0x5) {
//创建队列
QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;
ok = virtualHost.queueDeclare(arguments.getQueueName(),arguments.isDurable(),arguments.isAutoDelete(),
arguments.isExclusive(),arguments.getArguments());
log.info("[BrokerServer] 创建队列成功 queueName: " + arguments.getQueueName());
}else if(type == 0x6) {
//销毁队列
QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;
ok = virtualHost.queueDelete(arguments.getQueueName());
log.info("[BrokerServer] 销毁队列成功 queueName: " + arguments.getQueueName());
}else if(type == 0x7) {
//创建绑定
QueueBindArguments arguments = (QueueBindArguments) basicArguments;
ok = virtualHost.queueBind(arguments.getQueueName(),arguments.getExchangeName(),arguments.getBindingKey());
log.info("[BrokerServer] 创建绑定成功 queueName: " + arguments.getQueueName() +
",exchangeName: " + arguments.getExchangeName() +
",bindingKey: " + arguments.getBindingKey());
}else if(type == 0x8) {
//销毁绑定
QueueUnBindArguments arguments = (QueueUnBindArguments) basicArguments;
ok = virtualHost.queueUnBind(arguments.getQueueName(),arguments.getExchangeName());
log.info("[BrokerServer] 销毁绑定成功 queueName: " + arguments.getQueueName() +
",exchangeName: " + arguments.getExchangeName());
}else if(type == 0x9) {
//发送消息
BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;
ok = virtualHost.basicPublish(arguments.getExchangeName(),arguments.getRoutingKey(),
arguments.getBasicProperties(),arguments.getBody());
log.info("[BrokerServer] 发送消息成功 exchangeName: " + arguments.getExchangeName() +
",routingKey: " + arguments.getRoutingKey());
}else if(type == 0xa) {
//订阅消息
BasicConsumerArguments arguments = (BasicConsumerArguments) basicArguments;
ok = virtualHost.basicConsumer(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),
new Consumer() {
//该回调函数的用处是:当有生产者往对应队列中生产了消息,服务器就可以将当前队列中的消息推送给消费者客户端
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
//(1)首先需要知道当前推送的消费者是谁
//其实这里的channelId就是当前消费者的一个身份标识,也就是consumerTag
//所以就需要从存储的哈希表(sessions)中找到当前消费者所对应的socket,进而将消息给推送回去
Socket clientSocket = sessions.get(consumerTag);
if(clientSocket == null) {
throw new MqException("[BrokerServer] 订阅消息的客户端已断开连接");
}
//(2)构造payload
SubScribeReturns scribeReturns = new SubScribeReturns();
scribeReturns.setBody(body);
scribeReturns.setConsumerTag(consumerTag);
scribeReturns.setBasicProperties(basicProperties);
scribeReturns.setOk(true);
//由于这里只有响应没有请求,暂时不需要要rid进行对应
scribeReturns.setRid("");
scribeReturns.setChannelId(consumerTag);
//(3)构造响应
Response response = new Response();
try {
byte[] payload = BinaryTool.toBytes(scribeReturns);
response.setPayload(payload);
response.setLength(payload.length);
response.setType(0xc);
//(4)写回当前响应
DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());
writeResponse(dataOutputStream,response);
} catch (IOException e) {
e.printStackTrace();
log.info("[BrokerServer] 序列化失败|写回响应失败");
}
}
});
}else if(type == 0xb) {
//手动确认
BasicAckArguments arguments = (BasicAckArguments)basicArguments;
ok = virtualHost.basicAck(arguments.getQueueName(),arguments.getMessageId());
log.info("[BrokerServer] 手动确认成功 queueName: " + arguments.getQueueName() +
",messageId: " + arguments.getMessageId());
}else {
throw new MqException("[BrokerServer] 未知的type type: " + type);
}
//3.构造响应并返回
Response response = new Response();
BasicReturns basicReturns = new BasicReturns();
basicReturns.setChannelId(basicArguments.getChannelId());
basicReturns.setRid(basicArguments.getRid());
basicReturns.setOk(ok);
byte[] payload = BinaryTool.toBytes(basicReturns);
response.setType(request.getType());
response.setLength(payload.length);
response.setPayload(payload);
log.info("[Response] rid: " + basicArguments.getRid() + ",channelId: " + basicArguments.getChannelId() +
",type: " + response.getType() + ",length:" + response.getLength());
return response;
}
private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {
dataOutputStream.writeInt(response.getType());
dataOutputStream.writeInt(response.getLength());
dataOutputStream.write(response.getPayload());
//记得刷新
dataOutputStream.flush();
}
//一般来说,都是直接kill掉进程
//但这个写了一个stop方法,主要是为了后续的单元测试
public void stop() throws IOException {
log.info("[BrokerServer] 停止");
runnable = false;
executorService.shutdownNow();
serverSocket.close();
}
}
关于SubscribeReturns响应的返回:
这个响应是在当前的某个队列中有元素并且有消费者(考虑独占的时候),才会调用basicConsume(这里就将channelId作为了consumerTag作为参数传入)中的回调方法(此时这个回调方法是服务器内部回调的执行,只是构造一个0xc的响应,消费者所设定的回调方法还是通过channel中consumer来执行),构造一个类型为0xc的响应,并且此时还会构造一个0xa的响应(用于返回basicConsume这个方法是否执行成功),而0xc这个响应在被Connection中的方法读取到后,会根据响应(SubscribeReturns)中的consumerTag(等同于channelId)来获取到对应得channel,然后让channel中得consumer来执行消费者所设定的回调方法,然后将响应放入channel的响应集合中,从而终止等待,响应返回完毕。
(4)工厂方法:
ConnectionFactory类:
用来创建一个Connecting。
具体代码实现:
@Data
public class ConnectionFactory {
private String host;
private int port;
//访问BrokerServer中的哪个虚拟主机
//这几个属性先不考虑
//private String virtualHostName;
//private String password;
//private String userName;
public Connection getNewConnection() throws IOException {
Connection connection = new Connection(host,port);
return connection;
}
}
这里没有涉及到用户和密码的校验,由于是单机系统,而RabbitMQ是需要进行用户和密码验证的。