RocketMq源码解析四:生产者Producer启动

一、主要接口和类

        生产者服务核心接口和类的关系如下图所示:

        MQProducer是生产者解耦,这里找几个有代表性的方法

// 同步发送消息

SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,InterruptedException;

// 同步超时发送消息 如果超过了timeout的时间就抛出异常

SendResult send(final Message msg, final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;

// 异步发送消息

void send(final Message msg, final SendCallback sendCallback) throws MQClientException,RemotingException, InterruptedException;
// 异步超时发送消息 如果超过了timeout的时间就抛出异常
void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException;

// 指定消息队列同步发送消息

SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;

        DefaultMQProducer除了实现MQProducer的方法外,还继承了ClientConfig类,ClientConfig中主要记录了客户端的一些连接配置信息,我们重点看下DefaultMQProducer中有哪些核心属性

producerGroup:生产者所属组
createTopickey:默认Topic

defaultTopicQueueNums:默认主题在每一个Broker队列数量

sendMsgTimeout:发送消息默认超时时间,默认3s

compressMsgBodyOverHowmuch:消息体超过该值则启用压缩,默认4k

retryTimeswhensendFailed:同步方式发送消息重试次数,默认为2,总共执行3次retryTimeswhensendAsyncFailed:异步方法发送消息重试次数,默认为2

retryAnotherBrokerwhenNotstoreOK:消息重试时选择另外一个Broker时,是否不等待存储结果就返回,默认为false

maxMessagesize:允许发送的最大消息长度,默认为4M 

        我们看到 DefaultMQProducer中持有了一个transient 修饰的DefaultMQProducerImpl类的成员属性defaultMQProducerImpl,实际上核心的功能都封装在了这个DefaultMQProducerImpl类中,下面我们逐一来为读者展开说明。

二、生产者启动流程 

        我们先来看生产者启动的方法 DefaultMQProducer::start

    @Override
    public void start() throws MQClientException {
        this.setProducerGroup(withNamespace(this.producerGroup));
        this.defaultMQProducerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

        第一步是获取并设置生产者组的信息;

        第二步调用defaultMQProducerImpl的start方法,我们上文讲过DefaultMQProducer的大部分核心功能都是封装在DefaultMQProducerImpl类中。我们来看下defaultMQProducerImpl中的start方法:

        流程图如下:

        源码如下:

public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            // 如果是启动
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // 1、先检查一下配置
                this.checkConfig();
                // 2、设置自身的客户端名称为进程ID
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
                // 3、获取MQClientManager并获得MQClientInstance实例
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

                // 4、把当前的生产者注入MQClientFactory中
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                
                // 5、调用start方法启动
                if (startFactory) {
                    mQClientFactory.start();
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            ...
        }
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        RequestFutureHolder.getInstance().startScheduledTask(this);
    }

       重点讲一下第3步:获取MQClientManager并获得MQClientInstance实例。整个JVM中只存在一个MQClienManager实例,维护一个MQClientInstance缓存表

ConcurrentMap<String/* clientld */, MQClientinstance> factoryTable = newConcurrentHashMap<String,MQClientlnstance>():

        同一个clientld只会创建一个MQClientInstance。MQClientinstance封装了RocketMQ网络处理API,是消息生产者和消息消费者与NameServer、Broker打交道的网络通道
        代码:MQClientManager::getAndCreateMQClientInstance 

    public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }

        return instance;
    }

a:构建客户端的id;

b:从缓存表factoryTable中获取对应clientId的实例;

c:如果没有就生成一个并放入到缓存表中;

d:返回 

        最后我们来看下mQClientFactory.start()当中的源码。

// 先把服务状态改为失败

this.serviceState = ServiceState.START_FAILED;
// 如果配置中的namesrv地址为空,重新获取
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel  启动一个netty服务用于处理请求
this.mQClientAPIImpl.start();
// Start various schedule tasks 启动一系列定时任务用于更新namesrv地址,topic消费情况等
this.startScheduledTask();
// Start pull service 启动拉取消息服务
this.pullMessageService.start();
// Start rebalance service 启动relalance服务
this.rebalanceService.start();
// Start push service 启动推送服务
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);

// 把服务状态改为运行中
this.serviceState = ServiceState.RUNNING;

        至此生产者启动流程已经讲述完毕。

        

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/648485.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

qt 布局学习笔记

目录 qt下载地址&#xff1a; widget 宽高 管理信息列表源码 c版&#xff1a; pro文件&#xff1a; qt 设置水平布局&#xff0c;里面有两个按钮&#xff0c;每个按钮就变的很宽&#xff0c;怎么设置按钮的精确位置 设置固定大小&#xff1a; 使用弹性空间&#xff08;…

【网络安全】勒索软件ShrinkLocker使用 windows系统安全工具BitLocker实施攻击

文章目录 威胁无不不在BitLocker 概述如何利用BitLocker进行攻击如何降低影响Win11 24H2 装机默认开启 BitLocker推荐阅读 威胁无不不在 网络攻击的形式不断发展&#xff0c;即便是合法的 Windows 安全功能也会成为黑客的攻击工具。 卡巴斯基实验室专家 发现 使用BitLocker的…

C++质数的那些事(判断指数、区间筛质数、互质等等)

质数的定义&#xff1a;若一个正整数除了1和它自身之外不能被任何自然数整除&#xff0c;则该数称为质数&#xff0c;也叫素数。否则为合数。 质数的性质&#xff1a;质数的分布较为稀疏&#xff0c;对于一个足够大的数S&#xff0c;不超过S的质数大约有个&#xff0c;也就是说…

渗透测试的测试流程与注意事项

软件测试流程 渗透测试是一种重要的软件测试技术&#xff0c;通过对系统进行模拟攻击和漏洞评估&#xff0c;帮助组织发现和修复潜在的安全风险&#xff0c;提高系统的安全性和稳定性。在进行渗透测试时&#xff0c;需要注意合法授权、技术能力、安全意识和报告质量等方面的问…

简单多状态 dp 问题

11. 按摩师&#xff08;easy&#xff09; 解法&#xff08;动态规划&#xff09;&#xff1a; 图解&#xff1a; C 算法代码&#xff1a; class Solution { public:int massage(vector<int>& nums) {// 1. 创建⼀个 dp 表// 2. 初始化// 3. 填表// 4. 返回值int n n…

用C#调用SAP 的WebServices接口

文章目录 用C#调用SAP 的WebServices接口创建C#的项目添加窗体添加引用在表单的装载事件里编写代码运行结果SAP的RFC函数 用C#调用SAP 的WebServices接口 创建C#的项目 添加窗体 添加引用 在表单的装载事件里编写代码 using System; using System.Collections.Generic; using …

在Nano上部署yolov5

确认镜像版本为JetPack4.4.1&#xff08;L4T 32.4.4&#xff09;以上版本 下载链接下载pytorchnvidia docker镜像&#xff08;pytorch1.6torchvision0.7.0&#xff09;yolov5opencv4.4.0 1. 在已经部署了镜像的机器上获取镜像   1.1 获取镜像名     docker images   …

ssm招聘信息管理系统-计算机毕业设计源码78049

摘 要 由于数据库和数据仓库技术的快速发展&#xff0c;招聘客户管理系统建设越来越向模块化、智能化、自我服务和管理科学化的方向发展。招聘客户系统对处理对象和服务对象&#xff0c;自身的系统结构&#xff0c;处理能力&#xff0c;都将适应技术发展的要求发生重大的变化。…

265 基于matlab的粒子群优化分数阶灰色预测模型

基于matlab的粒子群优化分数阶灰色预测模型&#xff0c;以误差结果为目标进行预测&#xff0c;输出多个预测结果。并输出迭代曲线。程序已调通&#xff0c;可直接运行。 265 分数阶灰色预测 粒子群优化算法 - 小红书 (xiaohongshu.com)

Mac | macOs系统安装Monuty解决外接u盘ntfs读写问题

问题 mac电脑的macOs系统无法将文件读写入外接u盘或硬盘中&#xff1b; 解决方案 安装Monuty 官网&#xff1a;mounty官网 下载软件 安装其他配置 macbook:~ uwe$ brew install --cask macfuse macbook:~ uwe$ brew install gromgit/fuse/ntfs-3g-mac macbook:~ uwe$ brew…

移动云主机ECS搭建Kubernetes集群:详细步骤与指南

目录 云主机 ECS&#xff1a;云计算的强大引擎什么是云主机ECS&#xff1f;为何选择云主机ECS&#xff1f; 使用移动云ECS进行Kubernetes集群搭建1. 环境准备2. 安装步骤2.1 在每一个节点上执行的操作2.1.1 系统准备2.1.2 安装Docker2.1.3 安装Kubernetes的安装组件 2.2 在Mast…

MyBatis中的Where标签:提升你的SQL查询效率

哈喽&#xff0c;大家好&#xff0c;我是木头左&#xff01; 理解MyBatis的Where标签 MyBatis是一款优秀的持久层框架&#xff0c;它提供了许多强大的标签来帮助编写更优雅、高效的SQL语句。其中&#xff0c;<where>标签是使用频率极高的一个&#xff0c;它能够自动处理…

自反馈 Transformer:一种针对真实世界胰腺神经内分泌肿瘤数据的多标签诊断模型

文章目录 Self-feedback Transformer: A Multi-label Diagnostic Model for Real-World Pancreatic Neuroendocrine Neoplasms Data摘要方法实验结果 Self-feedback Transformer: A Multi-label Diagnostic Model for Real-World Pancreatic Neuroendocrine Neoplasms Data 摘…

录屏软件免费版有哪些?3款软件实现一站式录制

录屏软件免费版有哪些&#xff1f;在数字化时代的浪潮中&#xff0c;录屏软件已然成为现代生活与工作的得力助手。它们不仅帮助我们轻松捕捉屏幕上的精彩瞬间&#xff0c;还提供了丰富的编辑和分享功能。无论是教学演示、游戏直播还是日常记录&#xff0c;这些软件都能满足用户…

【方法】ZIP压缩文件的密码如何设置和取消?

ZIP是一种常见的压缩文件格式&#xff0c;今天来分享一下&#xff0c;ZIP压缩文件如何设置密码保护&#xff0c;以及如何取消密码&#xff0c;不清楚的小伙伴一起来看看吧&#xff01; 设置ZIP文件密码&#xff1a; 想要给ZIP压缩包设置密码&#xff0c;需要用到支持ZIP格式的…

前端开发工程师——webpack

一.环境准备 npm init -y npm i webpack webpack-cli -D 打包命令 npx webpack ./src/main.js --modedevelopment //development开发模式 //production生产模式 npx webpack 直接运行就行 二.加载器loader 在less/stylus/css/sass/images中添加适当的样式 例如&#xff1…

使用MyBatis进行批量新增更新操作 ON CONFLICT

1.数据库增加uniques 2.mybatis <?xml version"1.0" encoding"UTF-8" ?> <!DOCTYPE mapperPUBLIC "-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd"><mapper namespace"co…

社交网络安全:保护用户数据的Facebook实践

在数字化时代&#xff0c;社交网络安全成为了人们关注的焦点之一。作为全球最大的社交平台之一&#xff0c;Facebook一直在致力于保护用户数据安全和隐私。本文将探讨Facebook在社交网络安全方面的实践&#xff0c;以及它所采取的措施来保护用户数据的安全性。 1. 数据加密与隐…

Mysql搭建主从同步,docker方式(一主一从)

服务器&#xff1a;两台Centos9 用Docker搭建主从 使用Docker拉取MySQL镜像 确保两台服务器都安装好了docker 安装docker请查看&#xff1a;Centos安装docker 1.两台服务器都先拉取mysql镜像 docker pull mysql 2.我这里是在 /opt/docker/mysql 下创建mysql的文件夹用来存…

找出缺失的观测数据

代码实现&#xff1a; 在缺失的 n 个观测数据中&#xff0c;有 y 个观测数据是 x1&#xff0c;其余观测数据都是x int* missingRolls(int *rolls, int rollsSize, int mean, int n, int *returnSize) {int m rollsSize;int sum mean * (n m);int missingSum sum;for (int i…