文章目录
- 虚拟主机设计
- 虚拟主机分析
- 交换机和虚拟主机之间的从属关系
- 核心 API
- 发布消息
- 订阅消息
- 应答消息
- 消费者管理类
虚拟主机设计
虚拟主机分析
类似于 MySQL 的 database,把交换机,队列,绑定,消息…进⾏逻辑上的隔离,⼀个服务器可以有多
个虚拟主机~,此处我们项⽬就设计了⼀个虚拟主机(VirtualHost)来提供 API 供上层调用
咱们这里采取的方案是,在客户提供的交换机等的身份标识(交换机名字),前加上虚拟机的名字. 即 客户要在虚拟机
VirtualHostA中创建交换机 exchangeC,咱们服务器存储的交换机名字是 VirtualHostAexchangeC.
交换机和虚拟主机之间的从属关系
- ⽅案⼀:参考数据库设计,“⼀对多”⽅案,⽐如给交换机表,添加个属性,虚拟主机 id/name
- ⽅案⼆:交换机的名字 = 虚拟主机名字 + 交换机的真实名字(按照⽅案⼆,也可以去区分不同的队列,进⼀步由于,绑定和队列和交换机都相关,直接就隔离开了,再进⼀步,消息和队列是强相关的,队列名区分开,消息⾃然区分开。)
核心 API
发布消息
发布消息API:
- 其实就是生产者将消息发送给对应的交换机,交换机再根据不同的转发规则,转发给与之相绑定且符合规则的消息队列.
- 绑定关系 Binding 中有一个 bindingKey 属性
- 消息 Message 中 有一个 routingKey 属性
下面就来讲解一下三种交换机的转发规则已经这两个 Key 的不同含义.
-
直接交换机 DIRECT 转发规则
- 在直接交换机中,bindingKye是无意义的,routingKey是要转发到的队列的队列名.
- 直接交换机的转发规则, 是无视 bindingKey的,即 直接交换机是否与这个队列绑定都没有关系,而直接将消息转发到 routingKey指定的队列名的队列中.
-
扇出交换机 FANOUT 转发规则
- 在扇出交换机中,bindingKye是绑定的要转发的队列,routingKey是无意义的.
- 扇出交换机的转发规则,是将收到的消息转发到与之绑定的所有队列中.与bindingKye和routingKey是没有任何关系的.
-
主题交换机 TOPIC 转发规则
- 在主题交换机中,
- bindingKey是创建绑定时,给绑定指定的特殊字符串(相当于一把锁),
- routingKey是转发消息时,给消息指定的特殊字符串(相当于一把钥匙).
- 主题交换机的转发规则,是将收到的消息的routingKey与绑定的所有队列中的 bindingKey 进行匹配,当且仅当匹配成功时,才将消息转发给该队列.
匹配规则 - AMQP 协议
- routingKey规则
由数字,字母,下划线组成,使用 . 将routingKey分成多个部分. - bindingKey规则
由数字,字母,下划线组成,使用 . 将routingKey分成多个部分(支持两种特殊的符号作为通配符 * 与 # (和#必须是作为被 . 分割出来的单独部分如 aaa.bb就是非法的,* 可以匹配任何一个独立的部分,# 可以匹配0个或多个的独立部分)
相关代码实现
import com.example.demo.common.MqException;
// 使用这个类来实现交换机的转发规则
// 同时通过这个类来验证 bindingKey 是否合法
public class Router {
public boolean checkBindingKey(String bindingKey){
if (bindingKey.length() == 0) {
return true;
}
// 检查字符串中不能存在非法字符
for (int i = 0; i < bindingKey.length(); i++) {
char ch = bindingKey.charAt(i);
if (ch >= 'A' && ch <= 'Z') {
continue;
}
if (ch >= 'a' && ch <= 'z') {
continue;
}
if (ch >= '0' && ch <= '9') {
continue;
}
if (ch == '_' || ch == '.' || ch == '*' || ch == '#') {
continue;
}
return false;
}
// 检查 * 或者 # 是否是独立的部分.
String[] words = bindingKey.split("\\.");
for (String word : words) {
// 检查 word 长度 > 1 并且包含了 * 或者 # , 就是非法的格式了.
if (word.length() > 1 && (word.contains("*") || word.contains("#"))) {
return false;
}
}
// 约定一下, 通配符之间的相邻关系(人为约定)
// 只有 aaa.*.*.bbb => 合法
for (int i = 0; i < words.length - 1; i++) {
// 连续两个 ##
if (words[i].equals("#") && words[i + 1].equals("#")) {
return false;
}
// # 连着 *
if (words[i].equals("#") && words[i + 1].equals("*")) {
return false;
}
// * 连着 #
if (words[i].equals("*") && words[i + 1].equals("#")) {
return false;
}
}
return true;
}
// 数字 + 字母 + 下划线
// 使用.分割若干部分
public boolean checkRoutingKey(String routingKey){
if(routingKey.length()==0){
return true;
}
for (int i = 0; i < routingKey.length(); i++) {
char ch = routingKey.charAt(i);
// 判断
if (ch >= 'A' && ch <= 'Z') {
continue;
}
if (ch >= 'a' && ch <= 'z') {
continue;
}
if (ch >= '0' && ch <= '9') {
continue;
}
if (ch == '_' || ch == '.') {
continue;
}
// 该字符, 不是上述任何一种合法情况, 就直接返回 false
return false;
}
return true;
}
// 判定该消息是否可以转发给这个绑定对应的队列
public boolean route(ExchangeType exchangeType,Binding binding,Message message) throws MqException {
if(exchangeType == ExchangeType.FANOUT){
return true;
} else if (exchangeType == ExchangeType.TOPIC){
return routeTopic(binding,message);
} else {
throw new MqException("[Router] 交换机类型非法! exchange= " + exchangeType);
}
}
// 约定匹配规则
private boolean routeTopic(Binding binding, Message message) {
String[] bindingTokens = binding.getBindingKey().split("\\.");
String[] routingTokens = message.getRoutingKey().split("\\.");
// 引入两个下标
int bindingIndex = 0;
int routingIndex = 0;
while (bindingIndex < bindingTokens.length && routingIndex < routingTokens.length) {
if (bindingTokens[bindingIndex].equals("*")) {
// * 可以匹配到任意部分
bindingIndex++;
routingIndex++;
continue;
} else if (bindingTokens[bindingIndex].equals("#")) {
// 如果遇到 #, 需要先看看有没有下一个位置.
bindingIndex++;
if (bindingIndex == bindingTokens.length) {
// # 匹配成功
return true;
}
// # 拿着后面的内容, 去 routingKey 中往后找, 找到对应的位置.
// findNextMatch 查找 返回该下标. 没找到, 就返回 -1
routingIndex = findNextMatch(routingTokens, routingIndex, bindingTokens[bindingIndex]);
if (routingIndex == -1) {
// 没找到匹配的结果. 匹配失败
return false;
}
// 找到的匹配的情况, 继续往后匹配.
bindingIndex++;
routingIndex++;
} else {
// 普通字符串, 要求两边的内容一致.
if (!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])) {
return false;
}
bindingIndex++;
routingIndex++;
}
}
// 判定是否是双方同时到达末尾
if (bindingIndex == bindingTokens.length && routingIndex == routingTokens.length) {
return true;
}
return false;
}
// # 查找
private int findNextMatch(String[] routingTokens, int routingIndex, String bindingToken) {
for (int i = routingIndex; i < routingTokens.length; i++) {
if (routingTokens[i].equals(bindingToken)) {
return i;
}
}
return -1;
}
}
订阅消息
-
新来的消息要转发给哪个消费者呢?
咱们在这里采取轮询策略,即让消费者排队,依次将消息发送给消费者,当消费者收到消息后,则移动到队伍的最后等待下个消息.
因此咱们要给核心类 Message类再增加几个属性和方法,来管理消费者 -
自动发送消息至订阅者
那么消费者要如何拿到消息呢?即如何将消息发送给消费者,咱们这里采取的是自动发送,即队列中来了新消息,就自动将新消息发送给订阅了这个队列的消费者.
咱们实现的方法是,使用一个阻塞队列,当生产者发布消息到交换机时,交换机转发消息到对应的队列后,就把队列名当作令牌添加到这个阻塞队列中,再配置一个扫描线程,去时刻扫描这个阻塞队列中是否有新的令牌了,有了新令牌,则根据令牌去对应的队列中,去把新消息安装轮询策略转发给消费者.
应答消息
应答消息共有两种模式.
自动应答:将消息发送给消费者就算应答了(不关心消费者收没收到,相当于没应答)
手动应答:需要消费者手动调用应答方法(确保消费者收到消息了)
消费者管理类
关于消费者,咱们并不打算持久化存储消费者的信息,即只在内存中存储消费者信息,如果服务器重启后,那么内存中的消费者信息也会清空,此时消费者就需要重新订阅消息.