RocketMQ-订阅一致及解决方案

背景

这里借用Rocketmq官方的一句话来描述订阅关系一致: 

订阅关系一致指的是同一个消费者分组Group ID下,所有Consumer实例所订阅的Topic和Tag必须完全一致。如果订阅关系不一致,可能导致消息消费逻辑混乱,消息被重复消费或遗漏。

具体的问题和实例请看阿里云关于Rocketmq订阅关系一致的说明 ,里面写的非常详细,这边主要是讨论一下关于经典的会出现的一个订阅不一致问题。

当前问题

我司由于历史问题,java侧服务mq使用泛滥,每多一个topic订阅就伴随着新建一个group,导致维护成本越来越高,所以我们在2.0 sdk第一版即支持 【一个消费group消费多个topic】,也就是如下面这张图的预期:
在这里插入图片描述
看起来没有问题,RocketMQ官方也支持多topic的订阅逻辑,我们也是这么去推动大家升级的。但是随着对MQ的深入了解,逐渐发现一个很可怕的问题: 如果一个正在使用的group我希望去对它进行订阅关系的变更(添加/删除topic订阅),这个是绝对没有办法走灰度发布的!因为它会直接出现

RocketMQ领域经典的订阅不一致问题,详情见下图(模拟了一个使用中的group变更订阅关系时的灰度发布过程)
在这里插入图片描述
由图中可知,当前sdk虽然支持了一个group监听多个topic,但是这仅限于新业务,一个全新的group才可以在一开始用这种方式去升级,但却没有办法支持后续的订阅关系变更,看起来之前的sdk升级没什么用,可扩展性太差。如果消息的收发都是新业务还好一点,假如是订阅一个发送量非常大的现有topic,一发版就会喜提告警,严重的会存在消息丢失的风险,并且无法回放。

解决方案

其实问题的关键在于: 每个客户端虽然知道其他客户端的存在,但是并不知道大家的订阅关系,就导致了在实际平衡的时候产生【我觉得他应该去消费这些队列】的错觉,所以解决问题的关键就是我们只要让每个客户端都知道整个group集群中所有客户端的订阅关系就行了。参考之前发表的rocketmq灰度方案,可以利用ClientId的特性,将当前客户端的订阅关系加密追加在ID后面。

public String buildMQClientId() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.getClientIP());
 
        sb.append("@");
        sb.append(this.getInstanceName());
        if (!UtilAll.isBlank(this.unitName)) {
            sb.append("@");
            sb.append(this.unitName);
        }
 
        if (enableStreamRequestType) {
            sb.append("@");
            sb.append(RequestType.STREAM);
        }
        # 关键在于下面这几行代码
        MessageInstance instance = MessageStorage.getInstance(this.getInstanceName());
            if (instance != null) {
                sb.append("#");
                sb.append(MessageStorage.generateInstanceSubInfoEncode(instance));
            } else {
                sb.append("#[]");
            }
        return sb.toString();
    }

关于instance、group、topic的关系可以看下面这张图:

在这里插入图片描述

每个服务进程使用binder可以收发不同实例下的消息,因此在SDK中ClientId是以订阅的实例为维度创建的,在RocketMQ源码中是单例模式。

然后可以自己实现一个负载均衡策略:

/**
 * 消息队列分配策略增强--保证不出现订阅不一致的情况
 *
 * @author mobai
 * @since 2024/6/9 12:57 AM
 */
@Slf4j
public class EnhanceAllocateMessageQueueStrategyImpl extends AllocateMessageQueueAveragely {
 
 
    /**
     * 保证订阅一致的分配算法
     * 如果有任意客户端sdk版本低于当前版本,则降为默认的平均分配算法
     * <p> 1.如果是重试topic,则使用平均分配策略(重试的topic走的是内部回传broker,写到哪一个队列是随机的)
     * <p> 2.通过clientId获取每个client的订阅信息,然后获取客户端中对应当前group的topic监听列表,判断当前需要平衡的topic是否在监听列表中,
     * 如果不在则认为订阅不一致,让所有订阅了当前topic的客户端去分配所有的队列
     * <p> 3.如果订阅一致,则使用平均分配策略
     * 同时提供了一个允许覆盖的分配方法,默认是平均分配。子类可以根据实际情况自行覆盖,该方法会传入当前的订阅是否不出现不一致
     *
     * @param consumerGroup 当前消费者组
     * @param currentCID    当前客户端id
     * @param mqAll         当前topic下的所有队列
     * @param cidAll        当前group的云端所有客户端实例
     * @return 分配结果
     */
    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
        if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
            return Collections.emptyList();
        }
        if (mqAll.stream().anyMatch(mq -> mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))) {
            return super.allocate(consumerGroup, currentCID, mqAll, cidAll);
        }
        String topic = mqAll.get(0).getTopic();
        boolean isSomeClientVersionLower = cidAll.stream().anyMatch(c -> c.lastIndexOf(MqConstant.GROUP_ENHANCE_TAG) == -1);
        if (isSomeClientVersionLower) {
            //避免当前这个增强sdk版本在灰度的时候,出现低版本客户端
            log.warn("[enhance allocate]: group:{}sub topic:{} has lower version client,use the default avg strategy", consumerGroup, topic);
            return super.allocate(consumerGroup, currentCID, mqAll, cidAll);
        }
        if (log.isDebugEnabled()) {log.info("[enhance allocate]: group:{} start topic rebalance:{},current client num:{},current queues num:{}", consumerGroup, topic, cidAll.size(), mqAll.size());
        }
        Map<String, List<MessageConsumer>> allClientsSubInfo = MessageStorage.getDecodeSubInfo(cidAll);
        Map<String, MessageConsumer> eachClientGroup = new HashMap<>(allClientsSubInfo.size());
        allClientsSubInfo.forEach((k, v) -> {
            for (MessageConsumer messageConsumer : v) {
                if (messageConsumer.getActualGroup().equals(consumerGroup)) {
                    eachClientGroup.put(k, messageConsumer);
                    break;
                }
            }
        });
        List<String> validCids = new ArrayList<>(eachClientGroup.size());
        for (Map.Entry<String, MessageConsumer> consumerEntry : eachClientGroup.entrySet()) {
            List<MessageConsumer.ListenTopic> currentConsumerSubTopics = consumerEntry.getValue().getTopics();
            if (currentConsumerSubTopics.stream()
                    .anyMatch(listenTopic ->
                            listenTopic.getActualTopic().equals(topic)
                                    || listenTopic.getTopic().equals(topic)
                                    || listenTopic.getSourceTopic().equals(topic))) {
                validCids.add(consumerEntry.getKey());
            }
        }
        //如果存在订阅不一致的情况,则让所有订阅了当前topic的客户端去分配所有的队列,并且此逻辑不允许扩展,优先保证消息安全不丢失、不堆积
        if (validCids.size() != cidAll.size()) {
            List<MessageQueue> messageQueues = balanceAllocate(consumerGroup, currentCID, mqAll, validCids);
            log.warn("[enhance allocate]: group:{}sub topic:{} has not-balance-sub condition,sdk start enhance,clients {} complete {} queues rebalance,currentId:{},\n allocate result:{}", consumerGroup, topic,
                    MessageStorage.getClientsIp(validCids), mqAll.size(), currentCID, MessageStorage.joinMessageQueue(messageQueues));
            return messageQueues;
        } else {
            return doAllocate(consumerGroup, currentCID, mqAll, cidAll);
        }
 
    }
 /**
     * 可扩展的分配算法,默认是平均分配
     *
     * @param consumerGroup 消费组
     * @param currentCID    当前消费者
     * @param mqAll         所有消息队列
     * @param cidAll        所有消费者
     * @param isSubBalance  是否订阅均衡
     * @return 分配结果
     */
    public List<MessageQueue> doAllocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
        return balanceAllocate(consumerGroup, currentCID, mqAll, cidAll);
    }
 
 
    /**
     * 平均分配算法
     *
     * @param consumerGroup 消费组
     * @param currentCID    当前消费者
     * @param mqAll         所有消息队列
     * @param cidAll        所有消费者
     * @return 消息队列
     */
    public final List<MessageQueue> balanceAllocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
        return super.allocate(consumerGroup, currentCID, mqAll, cidAll);
    }
 
 
    @Override
    public String getName() {
        return "Enhance";
    }

策略继承于平均分配策略,大概的思路如下:

  1. 排除掉重试topic
  2. 通过clientId判断是否存在不同版本的SDK,这点也很重要,当这个增强的策略在发布时,因为线上的服务并没有该ClientId标识,所以此时退化成标准的平均分配是最安全的。
  3. 通过将所有客户端Id进行信息提取和解密,判断当前balance的topic有哪一些客户端在监听(当前group肯定会监听,不然这个方法链路进不来)
  4. 如果发现过滤出来的客户端个数和云上记录的所有客户端个数不同,即认定为订阅不一致,此时让有当前topic订阅关系的客户端分配所有队列,这个逻辑禁止覆盖
  5. 在保证订阅一致的前提下,提供了一个允许扩展的分配算法,默认使用平均分配(灰度消息就是通过继承此类,扩展该方法实现的保证一致性的前提下做的灰度)
  6. 那些没有订阅当前topic的客户端进程不会进到这个topic的平衡方法

升级了SDK之后,以下是对应的交互变更效果图(只讨论新增订阅关系的场景,删除订阅关系也是一个道理)

在这里插入图片描述

验证

接下来通过一个服务来验证此逻辑的可行性(包含了灰度消息逻辑),首先准备了一个订阅了一个topic的group,sdk版本是2.0.8(没有该增强逻辑)

已知:topic有64个队列,存在8个broker上,消费已做好幂等。
在这里插入图片描述

升级该服务的sdk版本到2.1.0(当前增强版本),订阅关系不变,发布灰度

在这里插入图片描述在这里插入图片描述

sdk判断当前客户端存在版本不一致,因此降级为默认平均分配算法,发送10条消息测试一下

在这里插入图片描述
消费正常。

升级该服务SDK到2.1.0,直接发布上线,无订阅关系变更

在这里插入图片描述
在这里插入图片描述
队列分配正确,再发送10条消息:

在这里插入图片描述
消费正常。

新增加一个topic的订阅关系,发布灰度(新topic48个队列,分布在6个brokder上)

控制台提示订阅不一致

在这里插入图片描述
灰度pod日志: 独自接管了新topic全部队列,旧topic获取到每个brokder最后一个分区

在这里插入图片描述
在这里插入图片描述
正常pod日志:不受影响,只和消费之前的topic(灰度pod消费每个broker最后一个分区),所以只分配到到56个队列

在这里插入图片描述
在这里插入图片描述
此时发送10条消息到新的topic上,结果消息全部被灰度也就是新加订阅关系的客户端全部消费

在这里插入图片描述
再发送10条老消息到旧topic上,9条在正常的pod,1条在灰度的pod,也符合灰度只负载1/10分区的策略

在这里插入图片描述
验证通过,灰度验证通过

在这里插入图片描述
订阅一致了

减少其中一个topic的订阅关系,再次发布灰度

控制台订阅不一致

在这里插入图片描述
灰度pod(减少订阅的客户端)日志:只参与旧topic的分配,且是灰度分区,其他无影响

在这里插入图片描述
在这里插入图片描述
正常pod(完整订阅关系)日志: 新topic提示不一致,进入增强逻辑,分配到全部的48个队列,旧topic分配正常

在这里插入图片描述
在这里插入图片描述
发送10条消息到被删除订阅关系的新topic: 全部被有订阅关系的正常客户端消费

在这里插入图片描述
发送10条消息到老的共有的老topic: 9比1的比例被俩客户端平均消费

在这里插入图片描述
验证通过。

结论

该方案被验证是安全可行的,但是在实际接入时需要注意:

  1. 不要在首次升级sdk时就变更订阅关系发灰度,这样的话还是会出现订阅不一致,无解,一个比较好的做法是先将SDK版本全部升级(允许灰度),等后续版本迭代再做订阅关系的变更,就可以正常发灰度验证。
  2. 生产环境永远不要使用公网接入点,除了安全问题之外,阿里云公网接入点架构模式是服务端负载,该策略会失效,而且原则上生产也不应该开放公网接入点。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/776685.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

BS结构的毕业设计题目管理系统-计算机毕业设计源码92342

目 录 摘要 1 绪论 1.1 研究背景 1.2目的及意义 1.3论文结构与章节安排 2 毕业设计题目管理系统设计分析 2.1 可行性分析 2.1.1 技术可行性分析 2.1.2 经济可行性分析 2.1.3 法律可行性分析 2.2 系统功能分析 2.2.1 功能性分析 2.2.2 非功能性分析 2.3 系统用例分…

3D打印推动透气钢革命

在科技日新月异的今天&#xff0c;3D打印技术如同一股强劲的潮流&#xff0c;正悄然改变着制造业。从简单的塑料玩具到复杂的工业部件&#xff0c;再到高精尖的医疗器械&#xff0c;3D打印技术凭借其独特的优势&#xff0c;不断拓宽着应用的边界。今天&#xff0c;我们一起深度…

Linux-DNS

DNS域名解析服务 1.DNS介绍 DNS 是域名系统 (Domain Name System) 的缩写&#xff0c;是因特网的一项核心服务&#xff0c;它作为可以将域名和IP地址相互映射的一个分布式数据库&#xff0c;能够使人更方便的访问互联网&#xff0c;而不用去记住能够被机器直接读取的IP数串。…

MySQL/SqlServer 跨服务器 增删改查(CRUD) 的一种方法

前言&#xff1a;主要是利用SqlServer 的链接服务器功能 1.准备一台 SqlServer Server&#xff0c;服务如下图&#xff1a; 这台服务器专门用于 链接服务器 IP&#xff1a;10.x.x.3 和数据源服务器&#xff08;10.x.x.5&#xff09; 在一个局域网 1.1 版本 是 2017 2.在 10.…

算法体系-26 第二十六节:第26节:单调栈结构 (5节)

一 单调栈知识讲解 1.1描述 一个数组里面想的到每个位置与他最近的左边和右边比他小的最近的信息 1.2 分析 通过单调栈的特点&#xff0c;for遍历数组中的每个数&#xff0c;当前数来的时候对比单调栈中的数进行每个数的左右判断完满足条件的进行更新到当前i种的 int[][] re…

MySQL索引教程(01):创建索引

文章目录 MySQL 创建索引索引介绍MySQL CREATE INDEX 语法MySQL 索引类型MySQL CREATE INDEX 实例结论 MySQL 创建索引 对于一个具有大量数据行的表&#xff0c;如果你根据某个查询条件检索数据时很慢&#xff0c;可能是因为你没有在检索条件相关的列上创建索引。 索引类似于…

平价猫粮新选择!福派斯鲜肉猫粮,让猫咪享受美味大餐!

福派斯鲜肉猫粮&#xff0c;作为一款备受铲屎官们青睐的猫粮品牌&#xff0c;凭借其卓越的品质和高性价比&#xff0c;为众多猫主带来了健康与美味的双重享受。接下来&#xff0c;我们将从多个维度对这款猫粮进行解析&#xff0c;让各位铲屎官更加全面地了解它的魅力所在。 1️…

查看电脑显卡(NVIDIA)应该匹配什么版本的CUDA Toolkit

被串行计算逼到要吐时&#xff0c;决定重拾CUDa了&#xff0c;想想那光速般的处理感觉&#xff08;夸张了&#xff09;不要太爽&#xff0c;记下我的闯关记录。正好我的电脑配了NVIDIA独显&#xff0c;GTX1650&#xff0c;有菜可以炒呀&#xff0c;没有英伟达的要绕道了。回到正…

详细分析SQL语句中的硬解析、软解析、软软解析基本知识

目录 前言1. 基本知识2. Demo 前言 从实战中探索 图为全局搜索且在高并发下&#xff0c;会引发硬解析&#xff0c;导致CPU崩溃 1. 基本知识 解析 (parsing) 是数据库在处理 SQL 语句时必不可少的一步&#xff0c;它将 SQL 语句转换为数据库可以执行的低级指令 硬解析 (Hard…

昇思25天学习打卡营第18天|Pix2Pix实现图像转换

Pix2Pix概述 Pix2Pix是基于条件生成对抗网络实现的一种深度学习图像转换模型。Pix2Pix是将cGAN应用于有监督的图像到图像翻译&#xff0c;包括生成器和判别器。 基础原理 cGAN的生成器是将输入图片作为指导信息&#xff0c;由输入图像不断尝试生成用于迷惑判别器的“假”图像…

c++ 附赠课程的知识点记录

&#xff08;1&#xff09; 静态变量的赋值 再一个例子&#xff1a; &#xff08;2&#xff09; 一般在定义类的赋值运算符函数时&#xff0c; operator ( const A& a ) 函数&#xff0c;应避免自赋值的情况&#xff0c;就是把对象 a 又赋值给 对象a 如同 a a 这样的情况…

类和对象深入理解

目录 static成员概念静态成员变量面试题补充代码1代码2代码3如何访问private中的成员变量 静态成员函数静态成员函数没有this指针 特性 友元友元函数友元类 内部类特性1特性2 匿名对象拷贝对象时的一些编译器优化 感谢各位大佬对我的支持,如果我的文章对你有用,欢迎点击以下链接…

C++ | Leetcode C++题解之第217题存在重复元素

题目&#xff1a; 题解&#xff1a; class Solution { public:bool containsDuplicate(vector<int>& nums) {unordered_set<int> s;for (int x: nums) {if (s.find(x) ! s.end()) {return true;}s.insert(x);}return false;} };

【PB案例学习笔记】-27制作一个控制任务栏显示与隐藏的小程序

写在前面 这是PB案例学习笔记系列文章的第27篇&#xff0c;该系列文章适合具有一定PB基础的读者。 通过一个个由浅入深的编程实战案例学习&#xff0c;提高编程技巧&#xff0c;以保证小伙伴们能应付公司的各种开发需求。 文章中设计到的源码&#xff0c;小凡都上传到了gite…

视频参考帧和重构帧复用

1、 视频编码中的参考帧和重构帧 从下图的编码框架可以看出&#xff0c;每编码一帧需要先使用当前帧CU(n)减去当前帧的参考帧CU&#xff08;n&#xff09;得到残差。同时&#xff0c;需要将当前帧的重构帧CU*&#xff08;n&#xff09;输出&#xff0c;然后再读取重构帧进行预测…

Pandas数据可视化详解:大案例解析(第27天)

系列文章目录 Pandas数据可视化解决不显示中文和负号问题matplotlib数据可视化seaborn数据可视化pyecharts数据可视化优衣库数据分析案例 文章目录 系列文章目录前言1. Pandas数据可视化1.1 案例解析&#xff1a;代码实现 2. 解决不显示中文和负号问题3. matplotlib数据可视化…

HTTP代理服务器:深度解析与应用

“随着互联网的飞速发展&#xff0c;HTTP代理服务器在网络通信中扮演着越来越重要的角色。它们作为客户端和服务器之间的中介&#xff0c;不仅优化了网络性能&#xff0c;还提供了强大的安全性和隐私保护功能。” 一、HTTP代理服务器的概念与作用 HTTP代理服务器是一种能够接…

Qt扫盲-QRect矩形描述类

QRect矩形描述总结 一、概述二、常用函数1. 移动类2. 属性函数3. 判断4. 比较计算 三、渲染三、坐标 一、概述 QRect类使用整数精度在平面中定义一个矩形。在绘图的时候经常使用&#xff0c;作为一个二维的参数描述类。 一个矩形主要有两个重要属性&#xff0c;一个是坐标&am…

前端面试题16(跨域问题)

跨域问题源于浏览器的同源策略&#xff08;Same-origin policy&#xff09;&#xff0c;这一策略限制了来自不同源的“写”操作&#xff08;比如更新、删除数据等&#xff09;&#xff0c;同时也限制了读操作。当一个网页尝试请求与自身来源不同的资源时&#xff0c;浏览器会阻…

设计模式探索:代理模式

1. 什么是代理模式 定义 代理模式是一种结构型设计模式&#xff0c;通过为其他对象提供一种代理以控制对这个对象的访问。代理对象在客户端和实际对象之间起到中介作用&#xff0c;可以在不改变真实对象的情况下增强或控制对真实对象的访问。 目的 代理模式的主要目的是隐…