RocketMQ源码 Broker-SubscriptionGroupManager 订阅组管理组件源码分析

前言

SubscriptionGroupManager 继承了ConfigManager配置管理组件,拥有将内存数据持久化到磁盘文件subscriptionGroup.json的能力。它主要负责维护所有消费组在内存中的订阅数据。


源码版本:4.9.3

源码架构图

核心数据结构

主要的数据结构比较简单,维护了Map<消费组名称, 订阅组配置>的映射关系。

// 订阅组管理组件
public class SubscriptionGroupManager extends ConfigManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

    // Map<消费组名称,订阅组配置>
    private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
        new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
    // 内存数据版本号
    private final DataVersion dataVersion = new DataVersion();
}

深入看下SubscriptionGroupConfig 的数据结构。

public class SubscriptionGroupConfig {
    // 消费组名称
    private String groupName;
    // 是否开启消费
    private boolean consumeEnable = true;
    // 是否允许消费最早消息
    private boolean consumeFromMinEnable = true;
    // 是否允许广播消费
    private boolean consumeBroadcastEnable = true;
    // 重试队列数
    private int retryQueueNums = 1;
    // 重试最大次数
    private int retryMaxTimes = 16;
    // brokerId
    private long brokerId = MixAll.MASTER_ID;
    // 当产生慢消费时,选择第几个broker
    private long whichBrokerWhenConsumeSlowly = 1;
    // 是否通知消费者ids变化
    private boolean notifyConsumerIdsChangedEnable = true;
}

核心数据行为

数据行为主要都是对上面提到的数据结构的维护,代码 + 注释如下:

// 订阅组管理组件
public class SubscriptionGroupManager extends ConfigManager {
    
    public SubscriptionGroupManager() {
        this.init();
    }

    public SubscriptionGroupManager(BrokerController brokerController) {
        this.brokerController = brokerController;
        this.init();
    }

    private void init() {
        {
            // 初始化系统消费组
            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
            subscriptionGroupConfig.setGroupName(MixAll.TOOLS_CONSUMER_GROUP);
            this.subscriptionGroupTable.put(MixAll.TOOLS_CONSUMER_GROUP, subscriptionGroupConfig);
        }

        {
            // 初始化过滤服务消费组
            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
            subscriptionGroupConfig.setGroupName(MixAll.FILTERSRV_CONSUMER_GROUP);
            this.subscriptionGroupTable.put(MixAll.FILTERSRV_CONSUMER_GROUP, subscriptionGroupConfig);
        }

        {
            // 初始化自测消费组
            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
            subscriptionGroupConfig.setGroupName(MixAll.SELF_TEST_CONSUMER_GROUP);
            this.subscriptionGroupTable.put(MixAll.SELF_TEST_CONSUMER_GROUP, subscriptionGroupConfig);
        }

        {
            // 初始化http代理消费组
            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
            subscriptionGroupConfig.setGroupName(MixAll.ONS_HTTP_PROXY_GROUP);
            subscriptionGroupConfig.setConsumeBroadcastEnable(true);
            this.subscriptionGroupTable.put(MixAll.ONS_HTTP_PROXY_GROUP, subscriptionGroupConfig);
        }

        {
            // 初始化ONS_API_PULL消费组
            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
            subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PULL_GROUP);
            subscriptionGroupConfig.setConsumeBroadcastEnable(true); // 激活广播模式
            this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PULL_GROUP, subscriptionGroupConfig);
        }

        {
            // 初始化ONS_API_PERMISSION消费组
            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
            subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PERMISSION_GROUP);
            subscriptionGroupConfig.setConsumeBroadcastEnable(true);
            this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PERMISSION_GROUP, subscriptionGroupConfig);
        }

        {
            // 初始化ONS_API_OWNER消费组
            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
            subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_OWNER_GROUP);
            subscriptionGroupConfig.setConsumeBroadcastEnable(true);
            this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_OWNER_GROUP, subscriptionGroupConfig);
        }
    }

    // 更新订阅配置,且更新内存数据版本号
    public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
        SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);
        if (old != null) {
            log.info("update subscription group config, old: {} new: {}", old, config);
        } else {
            log.info("create new subscription group, {}", config);
        }

        this.dataVersion.nextVersion();

        this.persist();
    }

    // 失效消费组
    public void disableConsume(final String groupName) {
        SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName);
        if (old != null) {
            old.setConsumeEnable(false);
            this.dataVersion.nextVersion();
        }
    }

    // 查找指定消费组的订阅配置
    public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
        SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);
        if (null == subscriptionGroupConfig) {
            if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {
                subscriptionGroupConfig = new SubscriptionGroupConfig();
                subscriptionGroupConfig.setGroupName(group);
                SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);
                if (null == preConfig) {
                    log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());
                }
                this.dataVersion.nextVersion();
                this.persist();
            }
        }

        return subscriptionGroupConfig;
    }

    // 将内存数据结构编码成字符串
    @Override
    public String encode() {
        return this.encode(false);
    }

    // 获取配置文件路径
    @Override
    public String configFilePath() {
        return BrokerPathConfigHelper.getSubscriptionGroupPath(this.brokerController.getMessageStoreConfig()
            .getStorePathRootDir());
    }

    // 从字符串中恢复数据,写回内存数据结构
    @Override
    public void decode(String jsonString) {
        if (jsonString != null) {
            SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);
            if (obj != null) {
                this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable);
                this.dataVersion.assignNewOne(obj.dataVersion);
                this.printLoadDataWhenFirstBoot(obj);
            }
        }
    }

    // 将内存数据结构编码成字符串
    public String encode(final boolean prettyFormat) {
        return RemotingSerializable.toJson(this, prettyFormat);
    }

    // 当第一次启动时,打印加载数据时的日志
    private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) {
        Iterator<Entry<String, SubscriptionGroupConfig>> it = sgm.getSubscriptionGroupTable().entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, SubscriptionGroupConfig> next = it.next();
            log.info("load exist subscription group, {}", next.getValue().toString());
        }
    }

    public ConcurrentMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
        return subscriptionGroupTable;
    }

    public DataVersion getDataVersion() {
        return dataVersion;
    }

    // 删除指定消费组的订阅配置
    public void deleteSubscriptionGroupConfig(final String groupName) {
        SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName);
        if (old != null) {
            log.info("delete subscription group OK, subscription group:{}", old);
            this.dataVersion.nextVersion();
            this.persist();
        } else {
            log.warn("delete subscription group failed, subscription groupName: {} not exist", groupName);
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/248186.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

CMS—评论功能设计

一、需求分析 1.1、常见行为 1.敏感词过滤 2.新增评论&#xff08;作品下、评论下&#xff09; 3.删除评论&#xff08;作品作者、上级评论者、本级作者&#xff09; 4.上级评论删除关联下级评论 5.逻辑状态变更&#xff08;上线、下线、废弃...&#xff09; 6.上逻辑状态变更…

Java报错-Non-terminating decimal expansion; no exact representable decimal result

1. 背景 在使用 BigDecimal 的 divide() 对两个数相除时&#xff0c;报了如题的错误。 public class Test {public static void main(String[] args) {BigDecimal b1 new BigDecimal(1);BigDecimal b2 new BigDecimal(3);System.out.println(b1.divide(b2)); // Sys…

jupyter notebook介绍、安装和使用

简介 Jupyter Notebook是基于网页的用于交互计算的应用程序。其可被应用于全过程计算&#xff1a;开发、文档编写、运行代码和展示结果。——Jupyter Notebook官方介绍 简而言之&#xff0c;Jupyter Notebook是以网页的形式打开&#xff0c;可以在网页页面中直接编写代码和运…

Shell三剑客:文本过滤工具——grep

一、简介&#xff1a;过滤&#xff0c;查找文档中的内容 二、分类 grepegrep——扩展支持正则\w所有字母与数字&#xff0c;称为字符[a-zA-Z0-9] l[a-zA-Z0-9]*ve l\w*ve\W所有字母与数字之外的字符&#xff0c;称为非字符 love[^a-zA-Z0-9] love\W\b词边界 \<love\>…

力扣每日一题day32[104. 二叉树的最大深度]

给定一个二叉树 root &#xff0c;返回其最大深度。 二叉树的 最大深度 是指从根节点到最远叶子节点的最长路径上的节点数。 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#xff1a;3示例 2&#xff1a; 输入&#xff1a;root [1,null,2] 输出…

winform中也可以这样做数据展示✨

1、前言 在做winform开发的过程中&#xff0c;经常需要做数据展示的功能&#xff0c;之前一直使用的是gridcontrol控件&#xff0c;今天想通过一个示例&#xff0c;跟大家介绍一下如何在winform blazor hybrid中使用ant design blazor中的table组件做数据展示。 2、效果 先来…

MYSQL练题笔记-子查询-换座位

一、题目相关内容 1&#xff09;相关的表和题目 2&#xff09;帮助理解题目的示例&#xff0c;提供返回结果的格式 二、自己初步的理解 没啥思路&#xff0c;我还没做过交换的这种题&#xff0c;所以我觉得这类交换的题以后值得做一个合集&#xff0c;是有点灵活度在里面的&a…

computed 和 watch 的奇妙世界:让数据驱动你的 Vue 应用(上)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

海思平台isp之ccm标定

文章目录 1、raw图采集2、ccm标定2.1、标定参数配置2.2、标定效果优化2.2.1、优化方式一2.2.2、优化方式二2.2.3、优化方式三1、raw图采集 raw图采集步骤及标准,请参考文章 《海思平台isp之ccm标定》。2、ccm标定 2.1、标定参数配置 (1)图像基本参数 (2)黑电平设置 (…

VR播控系统深耕VR教学领域,助力开启未来新课堂

作为提升教育质量的技术之一&#xff0c;VR技术已经逐渐成为培养新一代人才、提升教学质量的重要方式&#xff0c;相比于传统教育&#xff0c;VR技术在教学方面的应用&#xff0c;所带来的变化和效果提升都是非常明显的&#xff0c;尤其是VR播控系统的上线&#xff0c;作为VR教…

CDH6.3.2安装

文章目录 [toc]一、CM简介1、ClouderaManager的概念2、ClouderaManager的功能3、ClouderaManager的架构 二、准备清单1、部署步骤2、集群规划3、软件环境准备 三、安装清单1、操作系统iso包2、JDK包3、MySQL包4、CM和CDH包5、部署ansible 四、基础环境准备1、配置网络2、配置ho…

Doris学习笔记

目录 简介 特点 MPP数据库 PB和EB都是用来衡量数据存储量的单位。 秒级响应 Google Mesa Apache Impala 支持标准sql且兼容mysql协议 ROLAP OLAP&#xff08;On-Line Analytical Processing&#xff0c;联机分析处理&#xff09; ROLAP&#xff08;Relational On-Line An…

理解Mysql索引原理及特性

作为开发人员&#xff0c;碰到了执行时间较长的sql时&#xff0c;基本上大家都会说”加个索引吧”。但是索引是什么东西&#xff0c;索引有哪些特性&#xff0c;下面和大家简单讨论一下。 1 索引如何工作&#xff0c;是如何加快查询速度 索引就好比书本的目录&#xff0c;提高数…

智能优化算法应用:基于差分进化算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于差分进化算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于差分进化算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.差分进化算法4.实验参数设定5.算法结果6.…

FTP、U盘等传统数据安全摆渡方法的6个弊端

数据安全摆渡&#xff0c;即数据在不同的网络之间&#xff0c;进行安全流转。做网间隔离的初衷&#xff0c;就是为了保护数据安全&#xff0c;但是在数据摆渡时&#xff0c;除了安全&#xff0c;企业还是需要考虑其他的要素&#xff0c;比如可靠性、易用性、兼容性等等。而传统…

linux 防火墙systemctl (个人笔记)

查看 systemctl status firewalld 开启 systemctl start firewalld 关闭 systemctl stop firewalld.service 查看所有 firewall-cmd --zonepublic --list-ports 开放端口&#xff1a;// --permanent 永久生效,没有此参数重启后失效 firewall-cmd --zonepublic --add-port9527/…

c语言 词法分析器《编译原理》课程设计 文本形式保存

词法分析器的功能输入源程序&#xff0c;按照构词规则分解成一系列单词符号。单词是语言中具有独立意义的最小单位&#xff0c;包括关键字、标识符、运算符、界符和常量等。 (1) 关键字&#xff1a;是由程序语言定义的具有固定意义的标识符。例如begin&#xff0c;end&#xf…

SpringBoot中日志的使用log4j2

SpringBoot中日志的使用log4j2 1、log4j2介绍 Apache Log4j2 是对 Log4j 的升级&#xff0c;它比其前身 Log4j 1.x 提供了重大改进&#xff0c;并提供了 Logback 中可用的许多改 进&#xff0c;同时修复了 Logback 架构中的一些问题&#xff0c;主要有&#xff1a; 异常处理…

GPDB - 高可用特性 - 同步复制与异步复制

GPDB - 高可用特性 - 同步复制与异步复制 GreenPlum是基于PostgreSQL的分布式数据库&#xff0c;master用于接收用户请求并生成执行计划与分发&#xff0c;当然也可以参与计算&#xff1b;而segment则用于存储数据&#xff0c;将计算的结果传递给master。Segment本身具有高可用…

5.4 Linux KickStart 无人值守安装

1、概念介绍 搭建无人执行安装服务器需要从装网络引导安装操作系统&#xff0c;这样我们就可以不必走到机器那里插入CD&#xff0d;ROM光盘或者U盘手动一台一台安装操作系统&#xff0c;使用网络引导批量部署服务器操作系统。 服务架构&#xff1a;PXE DHCP TFTP Kickstar…