目录
引言
函数式接口
消费者订阅消息 实现思路
关于消息确认
引言
函数式接口
- Lambda 表达式的本质是匿名函数
- Java 函数无法脱离类而存在,所以 Java 通过引入函数式接口以支持 Lambda 表达式
特性:
- 函数式接口为一个 interface 类
- 该类中有且仅有一个方法
- 该类需加上 @FunctionalInterface 注解
注意:
- 上述三点其实就是 Lambda 的本质,即底层实现
消费者订阅消息 实现思路
1、让 broker server 把有哪些消费者管理好
- 消费者调用 basicConsume 方法就是订阅某个指定队列的消息
注意:
- 消费者是以队列为纬度订阅的
- 一个队列可以有多个消费者
- 约定 消费者之间按照 轮询 的方式进行消费
代码编写:
- 定义一个 ConsumerEnv 类,用来描述一个消费者
- 该类中也会包含一些消费者消费过程中用到的数据
import lombok.Data; /* * 表示一个消费者(完整的执行环境) * */ @Data public class ConsumerEnv { private String consumerTag; private String queueName; private boolean autoAck; // 通过这个回调来处理收到的消息 private Consumer consumer; public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) { this.consumerTag = consumerTag; this.queueName = queueName; this.autoAck = autoAck; this.consumer = consumer; } }
- 给每个队列对象(MSGQueue 对象)添加属性 List,用于存储该队列的 消费者对象
// 当前队列都有哪些消费者订阅了 private List<ConsumerEnv> consumerEnvList = new ArrayList<>(); // 记录当前取到了第几个消费者,方便实现轮询策略 private AtomicInteger consumerSeq = new AtomicInteger(0); // 添加一个新的订阅者 public void addConsumerEnv(ConsumerEnv consumerEnv) { consumerEnvList.add(consumerEnv); } // 订阅者的删除暂时不考虑 // 挑选一个订阅者,用来处理当前的消息 (按照轮询的方式) public ConsumerEnv chooseConsumer() { if(consumerEnvList.size() == 0) { // 该队列没有人订阅 return null; } // 计算一下当前要取的元素的下标 int index = consumerSeq.get() % consumerEnvList.size(); consumerSeq.getAndDecrement(); return consumerEnvList.get(index); }
2、消费者 订阅队列消息,并使用该消息完成明确好的业务逻辑
- 所谓 消费者 消费消息,其实就是让线程池 执行对应消费者中的回调函数
- 通过回调函数,将消息的内容通过参数传递
- 回调函数中的内容由消费者编写,具体里面要干啥,取决于消费者自己的业务逻辑
代码编写:
- 此处我们使用 函数式接口 的方式,让消费者在订阅消息时,明确使用该消息进行的业务逻辑是什么
import com.example.demo.mqserver.core.BasicProperties; /* * 只是一个单纯的函数式接口(回调函数),收到消息之后要处理消息时调用的方法 * */ @FunctionalInterface public interface Consumer { // Delivery 的意思是 "投递",这个方法预期是在每次服务器收到消息之后,来调用 // 通过这个方法把消息推送给对应的消费者 // (注意!!这里的方法名和参数,也都是参考 RabbitMQ 展开的) void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body); }
- 在 VirtualHost 中实现消费者订阅某个队列的消息
// 订阅消息 // 添加一个队列的订阅者,当队列收到消息之后,就要把消息推送给对应的订阅者 // consumerTag:表示消费者的身份标识 // autoAck:消息被消费完成后,应答的方式,为 true 自动应答,为 false 手动应答 // consumer:是一个回调函数,此处类型设定成函数式接口,这样后续调用 basicConsume 并且传实参的时候,就可以写作 lambda 样子 public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) { // 构造一个 ConsumerEnv 对象,把这个对应的队列找到,再把这个 Consumer 对象添加到该队列中 queueName = virtualHostName + queueName; try { consumerManager.addConsumer(consumerTag,queueName,autoAck,consumer); System.out.println("[VirtualHost] basicConsume 成功! queueName = " + queueName); return true; }catch (Exception e) { System.out.println("[VirtualHost] basicConsume 失败! queueName = " + queueName); e.printStackTrace(); return false; } }
3、队列收到消息,并将消息推送给订阅该队列的消费者
- 为了能够让 线程池 知道要执行哪个回调函数及其参数中的 消息 来自于哪个队列
- 我们定义一个单独的扫描线程,用于感知哪个队列收到了新消息
问题一:
- 为啥搞了扫描线程,还要再搞个线程池呢?
- 既让该扫描线程获取 消息和 消费者的回调函数,又让其执行回调函数不就行了?
回答:
- 由于消费者编写的回调函数,具体是干啥的,我们并不知道
- 如果是比较耗时的业务逻辑的话,此时仅由一个线程来完成上述这些操作,就可能周转不开了!
问题二:
- 当前有多个队列,但扫描线程就一个,扫描线程如何知道是哪个队列中 来了新消息呢?
方案一:
- 直接让扫描线程不停的循环遍历所有对列,如果发现有新的元素就立即处理
方案二:
- 引入一个阻塞队列,哪个队列新增了一个消息,就将哪个队列的名字放入 阻塞队列中
- 此时 扫描线程 仅需要盯住这阻塞队列即可
- 阻塞队列中队列名相当于 "令牌",扫描线程从阻塞队列中取队列名,进而再根据队列名,从对应的队列中取一个消息
回答:
- 此处我们采用方案二!
代码编写:
- 此处我们实现一个 ConsumerManager 类,用于实现消费消息的核心逻辑
import com.example.demo.common.Consumer; import com.example.demo.common.ConsumerEnv; import com.example.demo.common.MqException; import com.example.demo.mqserver.VirtualHost; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; /* * 通过这个类来实现消费消息的核心逻辑 * */ public class ConsumerManager { // 持有上层的 VirtualHost 对象的引用,用来操作数据 private VirtualHost parent; // 指定一个线程池,负责执行具体的回调任务 private ExecutorService workerPool = Executors.newFixedThreadPool(4); // 存放令牌的阻塞队列 private BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>(); // 扫描线程 private Thread scannerThread = null; public ConsumerManager(VirtualHost p) { this.parent = p; scannerThread = new Thread(() -> { while (true) { try { // 1、拿到令牌 String queueName = tokenQueue.take(); // 2、根据令牌找到队列 MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName); if(queue == null) { throw new MqException("[ConsumerManager] 取令牌后发现,该队列名不存在!queueName = " + queueName); } // 3、从队列中消费消费一个消息 synchronized (queue) { consumeMessage(queue); } } catch (InterruptedException | MqException e) { e.printStackTrace(); } } }); // 把线程设为后台线程 scannerThread.setDaemon(true); scannerThread.start(); } // 这个方法的调用时机就是发送消息的时候 public void notifyConsume(String queueName) throws InterruptedException { tokenQueue.put(queueName); } public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException { // 找到对应的队列 MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName); if(queue == null) { throw new MqException("[ConsumerManager] 队列不存在! queueName = " + queueName); } ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag,queueName,autoAck,consumer); synchronized (queue) { queue.addConsumerEnv(consumerEnv); // 如果当前队列中已经有一些消息了,需要立即就消费掉 int n = parent.getMemoryDataCenter().getMessageCount(queueName); for (int i = 0;i < n;i++) { // 这个方法调用一次就消费一条消息 consumeMessage(queue); } } } private void consumeMessage(MSGQueue queue) { // 1、按照轮询的方式,找个消费者出来 ConsumerEnv luckyDog = queue.chooseConsumer(); if(luckyDog == null) { // 当前队列没有消费者,暂时不消费,等后面有消费者出现再说 return; } // 2、从队列中取出一个消息 Message message = parent.getMemoryDataCenter().pollMessage(queue.getName()); if(message == null) { // 当前队列中还没有消息,也不需要消费 return; } // 3、把消息带入到消费者的回调方法中,丢给线程池执行 workerPool.submit(() -> { try { // 1) 把消息放入待确认的集合中,这个操作必须要在执行回调之前 parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message); // 2) 真正执行回调函数 luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(),message.getBasicProperties(),message.getBody()); // 3) 如果当前是 "自动应答",就可以直接把消息删除了 // 如果当前是 "手动应答",则先不处理,交给后续消费者调用 basicAck 方法来处理 if(luckyDog.isAutoAck()) { // a.删除硬盘上的消息 if(message.getDeliverMode() == 2) { parent.getDiskDataCenter().deleteMessage(queue,message); } // b.删除上面的待确认集合中的消息 parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(),message.getMessageId()); // c.删除内存中消息中心里的消息 parent.getMemoryDataCenter().removeMessage(message.getMessageId()); System.out.println("[ConsumerManager] 消息被成功消费 queueName = " + queue.getName()); } }catch (Exception e) { e.printStackTrace(); } }); } }
关于消息确认
- 为了能够确保消息是被正确的消费掉了,我们需要引入 消息确认 机制
- 即消费者的回调方法 顺利执行完(未抛异常啥的),那么这条消息的历史使命就算完成了,该消息也就可以被删除了
- 消息确认机制 也就是为了保证 消息不丢失
具体思路
- 在真正执行回调之前,先将该消息放到 "待确认集合" 中,避免因为回调失败,导致消息的丢失
- 真正执行回调
- 当前消费者采取的是 autoAck=true 自动应答,就认为回调执行完毕不抛异常,就算消费成功,然后就可以删除消息了(硬盘、消息中心、待确认集合)
- 当前消费者采取的是 autoAck=false 手动应答,需要消费者这边,在自己的回调方法内部,显式调用 basicAck 这个核心 API
basicAck 代码编写:
public boolean basicAck(String queueName,String messageId) { queueName = virtualHostName + queueName; try { // 1、获取到消息和队列 Message message = memoryDataCenter.getMessage(messageId); if(message == null) { throw new MqException("[VirtualHost] 要确认的消息不存在!messageId = " + messageId); } MSGQueue queue = memoryDataCenter.getQueue(queueName); if(queue == null) { throw new MqException("[VirtualHost] 要确认的消息不存在!queueName = " + queueName); } // 2、删除硬盘上的数据 if(message.getDeliverMode() == 2) { diskDataCenter.deleteMessage(queue,message); } // 3、删除消息中心的数据 memoryDataCenter.removeMessage(messageId); // 4、删除待确认的集合中的消息 memoryDataCenter.removeMessageWaitAck(queueName,messageId); System.out.println("[VirtualHost] basicAck 成功!消息被确认成功!queueName = " + queueName + ", messageId = " + messageId); return true; }catch (Exception e) { System.out.println("[VirtualHost] basicAck 失败!消息确认失败!queueName = " + queueName + ", messageId = " + messageId); e.printStackTrace(); return false; } }
问题一:
- 执行回调方法的过程中抛异常了会产生什么影响?
回答:
- 当回调方法抛异常,后续逻辑便会执行不到,此时该消息就会始终在 待确认的集合中
- RabbitMQ 的做法是另外搞一个扫描线程(其实 RabbitMQ 里面不叫线程,人家是叫线程,但是这个进程不是操作系统的进程,而是 erlang 中的概念)
- 由该线程负责关注 待确认集合中,每个待确认的消息呆多久了,如果呆的时间超出了范围就会把这个消息放到一个特定的队列 "死信队列"
- 当然,死信对列 也是程序员手动配置的,但此处我们并未实现 死信队列逻辑
问题二:
- 执行回调过程中,broker server 崩溃了,其中的内存数据全没了,此时有什么影响?
回答:
- 此时硬盘数据还是在的!
- 正在消费的这个消息,在硬盘中仍然存在
- broker server 重启之后,这个消息就又被加载回内存了,就像从来没有消费过一样
- 消费者就有机会重新消费到这个消息
- 当然重复消费的问题,应该由消费者的业务代码负责保证,broker server 管不了!