目录
一、虚拟主机 + Consume设计
1.1、承接问题
1.2、具体实现
1.2.1、消费者订阅消息实现思路
1.2.2、消费者描述自己执行任务方式实现思路
1.2.3、消息推送给消费者实现思路
1.2.4、消息确认
一、虚拟主机 + Consume设计
1.1、承接问题
前面已经实现了虚拟主机大部分功能以及转发规则的判定,也就是说,现在消息已经可以通过 转换机 根据对应的转发规则发送给对应的 队列 了.
那么接下来要解决的问题就是,消费者该如何订阅消息(队列),如何把消息推送给消费者,以及消费者如何描述自己怎么执行任务~
1.2、具体实现
1.2.1、消费者订阅消息实现思路
消费者是以队列为维度订阅消息的,并且一个队列可以被多个消费者订阅,那么一旦队列中有消息,这个消息到底因该给谁呢?此处就约定,消费者之间按照 “轮询” 的方式来进行消费.
这里我们就需要定义一个类(ConsumerEnv),用来描述一个消费者,如下
public class ConsumerEnv {
private String consumerTag;
private String queueName;
private boolean autoAck;
//通过这个回调来处理收到的消息
private Consumer consumer;
public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
this.consumerTag = consumerTag;
this.queueName = queueName;
this.autoAck = autoAck;
this.consumer = consumer;
}
public String getConsumerTag() {
return consumerTag;
}
public void setConsumerTag(String consumerTag) {
this.consumerTag = consumerTag;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public boolean isAutoAck() {
return autoAck;
}
public void setAutoAck(boolean autoAck) {
this.autoAck = autoAck;
}
public Consumer getConsumer() {
return consumer;
}
public void setConsumer(Consumer consumer) {
this.consumer = consumer;
}
}
再给每个队列对象(MSGQueue 对象)添加一个属性 List,用来包含若干个上述消费者(有哪些消费者订阅了当前队列),如下图:
//当前队列都有哪些消费者订阅了
private List<ConsumerEnv> consumerEnvList = new ArrayList<>();
//记录当取到了第几个消费者(AtomicInteger 是线程安全的)
private AtomicInteger consumerSeq = new AtomicInteger(0);
/**
* 添加一个新的订阅者
* @param consumerEnv
*/
public void addConsumerEnv(ConsumerEnv consumerEnv) {
consumerEnvList.add(consumerEnv);
}
/**
* 删除订阅者暂时先不考虑
*/
/**
* 挑选一个订阅者,来处理当前的消息(按照轮询的方式)
* @return
*/
public ConsumerEnv chooseConsumer() {
if(consumerEnvList.size() == 0) {
//该队列暂时没有人订阅
return null;
}
//计算当前要取的下标
int index = consumerSeq.get() % consumerEnvList.size();
consumerSeq.getAndIncrement();// 自增
return consumerEnvList.get(index);
}
VirtualHost 中订阅消息实现
/**
* 订阅消息
* 添加一个队列的订阅者,当队列收到消息之后,就要把消息推送给对应的订阅者
* @param consumerTag 消费者的身份标识
* @param queueName
* @param autoAck 消息被消费之后,应答的方式,true 标识自动应答,false 标识手动应答
* @param consumer 是一个回调函数,此处设定成函数式接口,这样后续调用 basicConsume 并且传实参的时候,就可以写作 lambda 样子了
* @return
*/
public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
//构造一个 ConsumerEnv 对象,把这个对应的队列找到,再把 Consumer 对象添加到队列中
queueName = virtualHostName + queueName;
try {
consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);
System.out.println("[VirtualHost] basicConsume 成功! queueName=" + queueName);
return true;
} catch (Exception e) {
e.printStackTrace();
System.out.println("[VirtualHost] basicConsume 失败! queueName=" + queueName);
return false;
}
}
1.2.2、消费者描述自己执行任务方式实现思路
当执行订阅消息的时候,我们就让消费者自己去实现处理消息的操作(消息的内容通过参数传递,具体要干啥,取决于消费者自己的业务路基),最后再让线程池来执行回调函数.
这里我们使用函数式接口(回调函数)的方式(lambda 表达式),让消费者在订阅消息的时候,就可以实现未来收到消息后如何去处理消息的操作.
@FunctionalInterface
public interface Consumer {
/**
* Delivery 的意思是 ”投递“,这个方法预期是在服务器收到消息之后来调用
* 通过这个方法,把消息推送给对应的消费者
* (注意,这里的方法名和参数,也都是参考 RabbitMQ 来展开的)
* @param consumerTag
* @param basicProperties
* @param body
*/
void handlerDelivery(String consumerTag, BasicProperties basicProperties, byte[] body);
}
为什么要这样实现?
一方面,这种思路也是参考 RabbitMQ。
另一方面,这是由于Java 的函数是不能脱离类存在的,为了实现这种 lambda,java 曲线救国,引入 函数式接口.
对于函数式接口来说:
- 首先是 interface 类型
- 只能有一个方法
- 添加 @FunctionalInterface 注解.
实际上,这也是 lambda 的底层实现(本质)
1.2.3、消息推送给消费者实现思路
这里我们可以添加一个扫描线程,让他来去队列中拿任务.
为什么用了扫描线程还需要用线程池?
如果就一个扫描线程,既要获取消息,又要执行回调,这一个线程可能会忙不过来,因为消费者给出的回调,具体干什么的,咱们是不知道的.
扫描线程怎么知道哪个队列来了新的消息?
- 一个简单粗暴的办法,就是直接让扫描线程不停的循环遍历所有队列,发现有元素就立即处理。
- 另一个更优雅的办法(我采取的办法),就是用一个阻塞队列,队列中的元素就是接收消息的队列的名字,扫描线程只需要盯住这一个阻塞对垒即可,此时阻塞队列中传递的队列名,就相当于 “令牌”
每次拿到令牌,才能调动一次军队,也就是从对应的队列中取一个消息.
具体的,实现一个 ConsumerManager 类,用来管理消费者的上述行为.
public class ConsumerManager {
// 持有上层的 VirtualHost 对象的引用,用来操作数据
private VirtualHost parent;
// 指定一个线程池,负责取执行具体的回调任务
private ExecutorService workerPool = Executors.newFixedThreadPool(4);
//存放令牌的队列
private BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>();
//扫描线程
private Thread scannerThread = null;
/**
* 初始化
* @param parent
*/
public ConsumerManager(VirtualHost parent) {
this.parent = parent;
//创建扫描线程,取队列中消费消息
scannerThread = new Thread(() -> {
while(true) {
try {
//1.拿到令牌
String queueName = tokenQueue.take();
//2.根据令牌,找到队列
MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
if(queue == null) {
throw new MqException("[ConsumerManager] 取到令牌后发现,该队列名不存在!queueName=" + queueName);
}
//3.从这个队列中消费一个消息
synchronized (queue) {
consumeMessage(queue);
}
} catch (InterruptedException | MqException e) {
throw new RuntimeException(e);
}
}
});
//设置为后台线程
scannerThread.setDaemon(true);
scannerThread.start();
}
public void notifyConsume(String queueName) throws InterruptedException {
tokenQueue.put(queueName);
}
/**
* 添加消费者
* 找到对应队列的 List 列表, 把消费者添加进去,最后判断,如果有消息,就立刻消费
* @param consumerTag 消费者身份标识
* @param queueName
* @param autoAck 消息被消费之后,应答的方式,true 标识自动应答,false 标识手动应答
* @param consumer 是一个回调函数,此处设定成函数式接口,这样后续调用 basicConsume 并且传实参的时候,就可以写作 lambda 样子了
* @throws MqException
*/
public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {
//找到对应的队列
MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
if(queue == null) {
throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);
}
ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag, queueName, autoAck, consumer);
synchronized (queue) {
queue.addConsumerEnv(consumerEnv);
//如果当前队列中已经有一些消息了,需要立即消费掉
int n = parent.getMemoryDataCenter().getMessageCount(queueName);
for(int i = 0; i < n; i++) {
//这个方法调用一次就消费一条消息
consumeMessage(queue);
}
}
}
/**
* 扫描线程:找到对应的队列后,消费者从队列中拿出消息并消费
* @param queue
*/
private void consumeMessage(MSGQueue queue) {
//1.按照轮询的方式,找个消费者出来
ConsumerEnv luckDog = queue.chooseConsumer();
if(luckDog == null) {
//当前队列中没有消费者,暂时不用消费,等后面有消费者了再说
return;
}
//2.从队列中取出一个消息
Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());
if(message == null) {
//当前队列中还没有消息,也不需要消费
return;
}
//3.把消息带入到消费者的回调方法中,丢给线程池执行
workerPool.submit(() -> {
try {
//1.把消息放到待确认的集合当中,这个操作一定要在执行回调之前(防止执行回调过程中出现异常,导致消息丢失)
parent.getMemoryDataCenter().addMessageWaitAck(luckDog.getQueueName(), message);
//2.真正执行回调操作
luckDog.getConsumer().handlerDelivery(luckDog.getConsumerTag(), message.getBasicProperties(),
message.getBody());
//3.如果当前是 ”自动应答“ ,就可以直接把消息删除了
// 如果当前是 ”手动应答“ ,则先不处理,交给后续消费者调用 basicAck 方法来处理
if(luckDog.isAutoAck()) {
//1) 删除硬盘上的消息
if(message.getDeliverMode() == 2) {
parent.getDiskDataCenter().deleteMessage(queue, message);
}
//2) 删除上面的待确认集合中的消息
parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());
//3) 删除内存上的消息中心的消息
parent.getMemoryDataCenter().removeMessage(message.getMessageId());
System.out.println("[ConsumerManager] 消息被成功消费!queueName=" + queue.getName());
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
1.2.4、消息确认
消息确认,就是保证消息被正确消费~~
正确消费就是指消费者的回调方法顺利执行完了(没有抛异常之类的),这条消息的使命就完成了,此时就可以删除了。
为了达成消息不丢失这样的效果,具体步骤如下:
- 在真正执行回调之前,把消息放到 “待确认的集合” 中,避免应为回调失败,导致消息丢失.
- 执行回调
- 当去消费者采取的是 autoAck=true ,就认为回调执行完毕不抛异常,就算消费成功,然后就可以删除消息了
- 硬盘
- 内存中的消息中心
- 待确认的消息集合
- 当前消费者若采取的是 autoAck=false,手动应答,需要消费者这边,在自己的回调方法内部,显式调用 basicAck 这个核心 API 表示应答.
basicAck 完成主动应答
/**
* 确认消息
* 各个维度删除消息即可
* @param queueName
* @param messageId
* @return
*/
public boolean basicAck(String queueName, String messageId) {
queueName = virtualHostName + queueName;
try {
//1.获取消息和队列
MSGQueue queue = memoryDataCenter.getQueue(queueName);
if(queue == null) {
throw new MqException("[VirtualHost] 要确认的队列不存在!queueName=" + queueName);
}
Message message = memoryDataCenter.getMessage(messageId);
if(message == null) {
throw new MqException("[VirtualHost] 要确认的消息不存在!messageId=" + messageId);
}
//2.各个维度删除消息
if(message.getDeliverMode() == 2) {
diskDataCenter.deleteMessage(queue, message);
}
memoryDataCenter.removeMessage(messageId);
memoryDataCenter.removeMessageWaitAck(queueName, messageId);
System.out.println("[VirtualHost] basicAck 成功,消息确认成功!queueName=" + queueName +
", messageId=" + messageId);
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] basicAck 失败,消息确认失败!queueName=" + queueName +
", messageId=" + messageId);
e.printStackTrace();
return false;
}
}
扫描线程完成自动应答
/**
* 扫描线程:找到对应的队列后,消费者从队列中拿出消息并消费
* @param queue
*/
private void consumeMessage(MSGQueue queue) {
//1.按照轮询的方式,找个消费者出来
ConsumerEnv luckDog = queue.chooseConsumer();
if(luckDog == null) {
//当前队列中没有消费者,暂时不用消费,等后面有消费者了再说
return;
}
//2.从队列中取出一个消息
Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());
if(message == null) {
//当前队列中还没有消息,也不需要消费
return;
}
//3.把消息带入到消费者的回调方法中,丢给线程池执行
workerPool.submit(() -> {
try {
//1.把消息放到待确认的集合当中,这个操作一定要在执行回调之前(防止执行回调过程中出现异常,导致消息丢失)
parent.getMemoryDataCenter().addMessageWaitAck(luckDog.getQueueName(), message);
//2.真正执行回调操作
luckDog.getConsumer().handlerDelivery(luckDog.getConsumerTag(), message.getBasicProperties(),
message.getBody());
//3.如果当前是 ”自动应答“ ,就可以直接把消息删除了
// 如果当前是 ”手动应答“ ,则先不处理,交给后续消费者调用 basicAck 方法来处理
if(luckDog.isAutoAck()) {
//1) 删除硬盘上的消息
if(message.getDeliverMode() == 2) {
parent.getDiskDataCenter().deleteMessage(queue, message);
}
//2) 删除上面的待确认集合中的消息
parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());
//3) 删除内存上的消息中心的消息
parent.getMemoryDataCenter().removeMessage(message.getMessageId());
System.out.println("[ConsumerManager] 消息被成功消费!queueName=" + queue.getName());
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
如果在回调方法中抛异常了?
回调方法中抛异常了,后续逻辑执行不到,这个消息就会始终呆在待确认的集合中, RabbitMQ 的做法是另外搞一个扫描线程(其实 RabbitMQ 中不叫线程,人家是叫进程,但是注意,这个进程不是操作系统中的进程,而是 erlang 中的概念),负责关注这个 待确认集合中,每个消息待了多久了,如果超出了一定的时间范围,就会把这个消息放到一个特定的队列 —— “死信队列”(这里就不展示了,需要的可以私聊我)
如果在执行回调过程中,broker server 崩了,内存数据全没了?
此时硬盘的数据还在,broker server 重启之后,这个消息就又被加载回内存了,就像从来没有被消费过一样,消费者就又机会重新拿到这个消息,重新消费(重复消费的问题,是由消费者的业务代码负责保证的,broker server 管不了).