背景
这里借用Rocketmq官方的一句话来描述订阅关系一致:
订阅关系一致指的是同一个消费者分组Group ID下,所有Consumer实例所订阅的Topic和Tag必须完全一致。如果订阅关系不一致,可能导致消息消费逻辑混乱,消息被重复消费或遗漏。
具体的问题和实例请看阿里云关于Rocketmq订阅关系一致的说明 ,里面写的非常详细,这边主要是讨论一下关于经典的会出现的一个订阅不一致问题。
当前问题
我司由于历史问题,java侧服务mq使用泛滥,每多一个topic订阅就伴随着新建一个group,导致维护成本越来越高,所以我们在2.0 sdk第一版即支持 【一个消费group消费多个topic】,也就是如下面这张图的预期:
看起来没有问题,RocketMQ官方也支持多topic的订阅逻辑,我们也是这么去推动大家升级的。但是随着对MQ的深入了解,逐渐发现一个很可怕的问题: 如果一个正在使用的group我希望去对它进行订阅关系的变更(添加/删除topic订阅),这个是绝对没有办法走灰度发布的!因为它会直接出现
RocketMQ领域经典的订阅不一致问题,详情见下图(模拟了一个使用中的group变更订阅关系时的灰度发布过程)
由图中可知,当前sdk虽然支持了一个group监听多个topic,但是这仅限于新业务,一个全新的group才可以在一开始用这种方式去升级,但却没有办法支持后续的订阅关系变更,看起来之前的sdk升级没什么用,可扩展性太差。如果消息的收发都是新业务还好一点,假如是订阅一个发送量非常大的现有topic,一发版就会喜提告警,严重的会存在消息丢失的风险,并且无法回放。
解决方案
其实问题的关键在于: 每个客户端虽然知道其他客户端的存在,但是并不知道大家的订阅关系,就导致了在实际平衡的时候产生【我觉得他应该去消费这些队列】的错觉,所以解决问题的关键就是我们只要让每个客户端都知道整个group集群中所有客户端的订阅关系就行了。参考之前发表的rocketmq灰度方案,可以利用ClientId的特性,将当前客户端的订阅关系加密追加在ID后面。
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
if (enableStreamRequestType) {
sb.append("@");
sb.append(RequestType.STREAM);
}
# 关键在于下面这几行代码
MessageInstance instance = MessageStorage.getInstance(this.getInstanceName());
if (instance != null) {
sb.append("#");
sb.append(MessageStorage.generateInstanceSubInfoEncode(instance));
} else {
sb.append("#[]");
}
return sb.toString();
}
关于instance、group、topic的关系可以看下面这张图:
每个服务进程使用binder可以收发不同实例下的消息,因此在SDK中ClientId是以订阅的实例为维度创建的,在RocketMQ源码中是单例模式。
然后可以自己实现一个负载均衡策略:
/**
* 消息队列分配策略增强--保证不出现订阅不一致的情况
*
* @author mobai
* @since 2024/6/9 12:57 AM
*/
@Slf4j
public class EnhanceAllocateMessageQueueStrategyImpl extends AllocateMessageQueueAveragely {
/**
* 保证订阅一致的分配算法
* 如果有任意客户端sdk版本低于当前版本,则降为默认的平均分配算法
* <p> 1.如果是重试topic,则使用平均分配策略(重试的topic走的是内部回传broker,写到哪一个队列是随机的)
* <p> 2.通过clientId获取每个client的订阅信息,然后获取客户端中对应当前group的topic监听列表,判断当前需要平衡的topic是否在监听列表中,
* 如果不在则认为订阅不一致,让所有订阅了当前topic的客户端去分配所有的队列
* <p> 3.如果订阅一致,则使用平均分配策略
* 同时提供了一个允许覆盖的分配方法,默认是平均分配。子类可以根据实际情况自行覆盖,该方法会传入当前的订阅是否不出现不一致
*
* @param consumerGroup 当前消费者组
* @param currentCID 当前客户端id
* @param mqAll 当前topic下的所有队列
* @param cidAll 当前group的云端所有客户端实例
* @return 分配结果
*/
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
return Collections.emptyList();
}
if (mqAll.stream().anyMatch(mq -> mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))) {
return super.allocate(consumerGroup, currentCID, mqAll, cidAll);
}
String topic = mqAll.get(0).getTopic();
boolean isSomeClientVersionLower = cidAll.stream().anyMatch(c -> c.lastIndexOf(MqConstant.GROUP_ENHANCE_TAG) == -1);
if (isSomeClientVersionLower) {
//避免当前这个增强sdk版本在灰度的时候,出现低版本客户端
log.warn("[enhance allocate]: group:{}sub topic:{} has lower version client,use the default avg strategy", consumerGroup, topic);
return super.allocate(consumerGroup, currentCID, mqAll, cidAll);
}
if (log.isDebugEnabled()) {log.info("[enhance allocate]: group:{} start topic rebalance:{},current client num:{},current queues num:{}", consumerGroup, topic, cidAll.size(), mqAll.size());
}
Map<String, List<MessageConsumer>> allClientsSubInfo = MessageStorage.getDecodeSubInfo(cidAll);
Map<String, MessageConsumer> eachClientGroup = new HashMap<>(allClientsSubInfo.size());
allClientsSubInfo.forEach((k, v) -> {
for (MessageConsumer messageConsumer : v) {
if (messageConsumer.getActualGroup().equals(consumerGroup)) {
eachClientGroup.put(k, messageConsumer);
break;
}
}
});
List<String> validCids = new ArrayList<>(eachClientGroup.size());
for (Map.Entry<String, MessageConsumer> consumerEntry : eachClientGroup.entrySet()) {
List<MessageConsumer.ListenTopic> currentConsumerSubTopics = consumerEntry.getValue().getTopics();
if (currentConsumerSubTopics.stream()
.anyMatch(listenTopic ->
listenTopic.getActualTopic().equals(topic)
|| listenTopic.getTopic().equals(topic)
|| listenTopic.getSourceTopic().equals(topic))) {
validCids.add(consumerEntry.getKey());
}
}
//如果存在订阅不一致的情况,则让所有订阅了当前topic的客户端去分配所有的队列,并且此逻辑不允许扩展,优先保证消息安全不丢失、不堆积
if (validCids.size() != cidAll.size()) {
List<MessageQueue> messageQueues = balanceAllocate(consumerGroup, currentCID, mqAll, validCids);
log.warn("[enhance allocate]: group:{}sub topic:{} has not-balance-sub condition,sdk start enhance,clients {} complete {} queues rebalance,currentId:{},\n allocate result:{}", consumerGroup, topic,
MessageStorage.getClientsIp(validCids), mqAll.size(), currentCID, MessageStorage.joinMessageQueue(messageQueues));
return messageQueues;
} else {
return doAllocate(consumerGroup, currentCID, mqAll, cidAll);
}
}
/**
* 可扩展的分配算法,默认是平均分配
*
* @param consumerGroup 消费组
* @param currentCID 当前消费者
* @param mqAll 所有消息队列
* @param cidAll 所有消费者
* @param isSubBalance 是否订阅均衡
* @return 分配结果
*/
public List<MessageQueue> doAllocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
return balanceAllocate(consumerGroup, currentCID, mqAll, cidAll);
}
/**
* 平均分配算法
*
* @param consumerGroup 消费组
* @param currentCID 当前消费者
* @param mqAll 所有消息队列
* @param cidAll 所有消费者
* @return 消息队列
*/
public final List<MessageQueue> balanceAllocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
return super.allocate(consumerGroup, currentCID, mqAll, cidAll);
}
@Override
public String getName() {
return "Enhance";
}
策略继承于平均分配策略,大概的思路如下:
- 排除掉重试topic
- 通过clientId判断是否存在不同版本的SDK,这点也很重要,当这个增强的策略在发布时,因为线上的服务并没有该ClientId标识,所以此时退化成标准的平均分配是最安全的。
- 通过将所有客户端Id进行信息提取和解密,判断当前balance的topic有哪一些客户端在监听(当前group肯定会监听,不然这个方法链路进不来)
- 如果发现过滤出来的客户端个数和云上记录的所有客户端个数不同,即认定为订阅不一致,此时让有当前topic订阅关系的客户端分配所有队列,这个逻辑禁止覆盖
- 在保证订阅一致的前提下,提供了一个允许扩展的分配算法,默认使用平均分配(灰度消息就是通过继承此类,扩展该方法实现的保证一致性的前提下做的灰度)
- 那些没有订阅当前topic的客户端进程不会进到这个topic的平衡方法
升级了SDK之后,以下是对应的交互变更效果图(只讨论新增订阅关系的场景,删除订阅关系也是一个道理)
验证
接下来通过一个服务来验证此逻辑的可行性(包含了灰度消息逻辑),首先准备了一个订阅了一个topic的group,sdk版本是2.0.8(没有该增强逻辑)
已知:topic有64个队列,存在8个broker上,消费已做好幂等。
升级该服务的sdk版本到2.1.0(当前增强版本),订阅关系不变,发布灰度
sdk判断当前客户端存在版本不一致,因此降级为默认平均分配算法,发送10条消息测试一下
消费正常。
升级该服务SDK到2.1.0,直接发布上线,无订阅关系变更
队列分配正确,再发送10条消息:
消费正常。
新增加一个topic的订阅关系,发布灰度(新topic48个队列,分布在6个brokder上)
控制台提示订阅不一致
灰度pod日志: 独自接管了新topic全部队列,旧topic获取到每个brokder最后一个分区
正常pod日志:不受影响,只和消费之前的topic(灰度pod消费每个broker最后一个分区),所以只分配到到56个队列
此时发送10条消息到新的topic上,结果消息全部被灰度也就是新加订阅关系的客户端全部消费
再发送10条老消息到旧topic上,9条在正常的pod,1条在灰度的pod,也符合灰度只负载1/10分区的策略
验证通过,灰度验证通过
订阅一致了
减少其中一个topic的订阅关系,再次发布灰度
控制台订阅不一致
灰度pod(减少订阅的客户端)日志:只参与旧topic的分配,且是灰度分区,其他无影响
正常pod(完整订阅关系)日志: 新topic提示不一致,进入增强逻辑,分配到全部的48个队列,旧topic分配正常
发送10条消息到被删除订阅关系的新topic: 全部被有订阅关系的正常客户端消费
发送10条消息到老的共有的老topic: 9比1的比例被俩客户端平均消费
验证通过。
结论
该方案被验证是安全可行的,但是在实际接入时需要注意:
- 不要在首次升级sdk时就变更订阅关系发灰度,这样的话还是会出现订阅不一致,无解,一个比较好的做法是先将SDK版本全部升级(允许灰度),等后续版本迭代再做订阅关系的变更,就可以正常发灰度验证。
- 生产环境永远不要使用公网接入点,除了安全问题之外,阿里云公网接入点架构模式是服务端负载,该策略会失效,而且原则上生产也不应该开放公网接入点。