RocketMQ源码阅读-Broker消息接收

RocketMQ源码阅读-Broker消息接收

  • 1. 从单元测试入手
  • 2. Broker启动流程
  • 3. Broker接收消息
  • 4. Broker接收消息时序图
  • 5. 小结

Broker接收 Producer发送的消息。

Broker在RocketMQ中也是一个独立的Model,rocketmq-broker。

Broker的核心类为SendMessageProcessor。
image.png

1. 从单元测试入手

同样从单元测试入手,看Broker接收消息的流程。
SendMessageProcessor的单元测试类为org.apache.rocketmq.broker.processor.SendMessageProcessorTest。
image.png
包含上面这些方法,其中init()方法是启动类,其他是测试流程的方法。
先看init()方法中Broker的启动流程。

2. Broker启动流程

单元测试中的inti()方法为:

@Before
public void init() {
    brokerController.setMessageStore(messageStore);
    when(messageStore.now()).thenReturn(System.currentTimeMillis());
    Channel mockChannel = mock(Channel.class);
    when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
    when(handlerContext.channel()).thenReturn(mockChannel);
    when(messageStore.lookMessageByOffset(anyLong())).thenReturn(new MessageExt());
    sendMessageProcessor = new SendMessageProcessor(brokerController);
}

init方法新建一个SendMessageProcessor,并一个传入brokerController,指定一个messageStore:

  • BrokerController: Broker的管理器
  • MessageStore: 消息存储的接口

继续看接收消息的流程SendMessageProcessor#sendMessage。

3. Broker接收消息

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                      RemotingCommand request) throws RemotingCommandException {
    SendMessageContext mqtraceContext;
    switch (request.getCode()) {
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.consumerSendMsgBack(ctx, request);
        default:
            // 解析请求
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return null;
            }

            // 发送请求Context。在hook场景下使用
            mqtraceContext = buildMsgContext(ctx, requestHeader);
            // hook:处理发送消息前逻辑
            this.executeSendMessageHookBefore(ctx, request, mqtraceContext);

            RemotingCommand response;
            if (requestHeader.isBatch()) {
                // 处理批量发消息逻辑
                response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
            } else {
                // 处理发送消息逻辑
                response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
            }

            this.executeSendMessageHookAfter(response, mqtraceContext);
            return response;
    }
}

可以看到,此方法负责解析RPC请求,最终是调用SendMessageProcessor#sendMessage方法:

private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                                    final RemotingCommand request,
                                    final SendMessageContext sendMessageContext,
                                    final SendMessageRequestHeader requestHeader) throws RemotingCommandException {

    // 初始化响应
    final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

    response.setOpaque(request.getOpaque());

    response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
    response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

    log.debug("receive SendMessage request command, {}", request);

    // 如果未开始接收消息,抛出系统异常
    final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
    if (this.brokerController.getMessageStore().now() < startTimstamp) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
        return response;
    }

    // 消息配置(Topic配置)校验
    response.setCode(-1);
    super.msgCheck(ctx, requestHeader, response);
    if (response.getCode() != -1) {
        return response;
    }

    final byte[] body = request.getBody();

    // 如果队列小于0,从可用队列随机选择
    int queueIdInt = requestHeader.getQueueId();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

    if (queueIdInt < 0) {
        queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
    }

    // 创建MessageExtBrokerInner
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);

    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
        return response;
    }

    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());
    MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
    msgInner.setPropertiesString(requestHeader.getProperties());
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    PutMessageResult putMessageResult = null;
    Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    // 校验是否不允许发送事务消息
    if (traFlag != null && Boolean.parseBoolean(traFlag)) {
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                    + "] sending transaction message is forbidden");
            return response;
        }
        putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
    } else {
        // 添加消息
        putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    }
	// 处理result
    return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);

}

可以看到,此方法进行了topic的校验,并创建创建MessageExtBrokerInner,随后添加消息的流程主要是调用了MessageStore#putMessage方法:
MessageStore是接口,其默认实现为DefaultMessageStore:
image.png
DefaultMessageStore#putMessage方法:

public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    if (this.shutdown) {
        log.warn("message store has shutdown, so putMessage is forbidden");
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }

	// 从节点不允许写入
    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is slave mode, so putMessage is forbidden ");
        }

        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }
	// store是否允许写入
    if (!this.runningFlags.isWriteable()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
        }

        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    } else {
        this.printTimes.set(0);
    }
	// 消息过长
    if (msg.getTopic().length() > Byte.MAX_VALUE) {
        log.warn("putMessage message topic length too long " + msg.getTopic().length());
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
    }
	// 消息附加属性过长
    if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
        return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
    }

    if (this.isOSPageCacheBusy()) {
        return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
    }

    long beginTime = this.getSystemClock().now();
	// 添加消息到commitLog
    PutMessageResult result = this.commitLog.putMessage(msg);

    long eclipseTime = this.getSystemClock().now() - beginTime;
    if (eclipseTime > 500) {
        log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
    }
    this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);

    if (null == result || !result.isOk()) {
        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }

    return result;
}

可以看到,首先是检查Broker是否可以写入,从节点不能写入,然后消息经过一系列的格式与大小校验,最终通过CommitLog.putMessage进行存储。

4. Broker接收消息时序图

SendMessageProcessor_processRequest.png

5. 小结

Producer作为客户端发送消息后,Broker作为服务端需要接收消息并存储消息。
本篇分析了broker接收消息的流程:

  • SendMessageProcessor#processRequest是RPC执行接收消息的方法,此方法主要负责解析RPC请求
  • processRequest调用SendMessageProcessor#sendMessage方法,进行了topic的校验,并创建创建MessageExtBrokerInner,随后添加消息的流程主要是调用了DefaultMessageStore#putMessage方法
  • DefaultMessageStore#putMessage方法检查Broker是否可以写入,从节点不能写入,然后消息经过一系列的格式与大小校验,最终通过commitLog.putMessage进行存储

消息存储是通过CommitLong#putMessage进行的,这个流程在下一篇《RocketMQ源码阅读-Broker消息存储》学习。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/320156.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

opencv多张图片实现全景拼接

最近camera项目需要用到全景拼接&#xff0c;故此查阅大量资料&#xff0c;终于将此功能应用在实际项目上&#xff0c;下面总结一下此过程中遇到的一些问题及解决方式&#xff0c;同时也会将源码附在结尾处&#xff0c;供大家参考&#xff0c;本文采用的opencv版本为3.4.12。 首…

一天吃透JVM面试八股文

内容摘自我的学习网站&#xff1a;topjavaer.cn 什么是JVM&#xff1f; JVM&#xff0c;全称Java Virtual Machine&#xff08;Java虚拟机&#xff09;&#xff0c;是通过在实际的计算机上仿真模拟各种计算机功能来实现的。由一套字节码指令集、一组寄存器、一个栈、一个垃圾回…

前端基础知识整理汇总(下)

react 生命周期 React v16.0前的生命周期 初始化(initialization)阶段 此阶段只有一个生命周期方法&#xff1a;constructor。 constructor() 用来做一些组件的初始化工作&#xff0c;如定义this.state的初始内容。如果不初始化 state 或不进行方法绑定&#xff0c;则不需…

Jenkins自动化部署docker

Jenkins自动化部署docker和普通方式构建 docker外挂目录 准备测试服务器docker环境准备jdk环境将上传jar包修改为app.jar对外暴露1000端口启动jar FROM openjdk:8-jdk-alpine ARG JAR_FILE COPY ${JAR_FILE} app.jar EXPOSE 1000 ENTRYPOINT ["java","-jar&q…

spring-boot2.7.8添加swagger

一、新建项目swaggerdemo 二、修改pom.xml 注意修改&#xff1a;spring-boot-starter-parent版本为&#xff1a;2.7.8 添加依赖&#xff1a; springfox-swagger2 springfox-swagger-ui springfox-boot-starter <?xml version"1.0" encoding"UTF-8"…

ruoyi后台管理系统部署-3-安装redis

centos7安装redis 1. yum 安装 查看是否安装了redis yum installed list | grep redis ps -ef | grep redis安装epel 仓库&#xff08;仓库是软件包下载的&#xff0c;类似maven&#xff0c;nuget&#xff09; yum install epel-release搜索 redis 包 yum search redis安装…

剪映国际版,免费无限制使用

随着抖音的爆火短视频的崛起&#xff0c;相信每一个人都感受到了短视频快节奏下的生活洪流。 现如今每个人都能成为自己生活的记录者&#xff0c;每一个人都有掌握着剪辑的基本技能。而剪映就是很多人都会使用的剪辑软件。 相对于PR、AE等剪辑软件来说&#xff0c;作为一款国…

动态规划篇-01:爬楼梯

本文为力扣70&#xff1a;爬楼梯的详细解析。 虽然这道题的标签是“简单”&#xff0c;但是只有简单的题才能让我们专注于这类题的解题框架上。 一般来说动态规划会有三种解法&#xff1a;暴力解法、使用了备忘录自上而下的递归解法、使用了数组的自下而上的迭代解法。接下来…

FineBI实战项目一(21):不同支付方式订单总额分析开发

点击新建组件&#xff0c;创建不同支付方式订单总额组件。 选择饼图&#xff0c;拖拽total_money到角度&#xff0c;拖拽pay_type到颜色&#xff0c;调节内径。 修改颜色的标识文字。 将组件拖拽到仪表板。 结果如下&#xff1a;

用C语言实现哈希表HashMap

代码仓库地址 1. 功能说明 自定义初始容量和负载因子&#xff1b;当键值对的个数与容量比值超过负载因子时&#xff0c;自动扩容&#xff1b;借鉴Java8的HashMap部分扩容逻辑&#xff0c;定义了单独的桶结构体用于记录hash值&#xff0c;以及2倍扩容&#xff0c;减少了hash运算…

K8S--service

一、简介 Service 是将集群中的 一个或一组 Pod应用程序公开为网络服务的方法。我们都知道pod是不稳定的,有可能时时刻刻都在创建和销毁,这一时刻运行的 Pod 集合可能不同于下一刻运行该应用的 Pod 集合,并且新创建的pod的ip地址会改变,所以我们不应该寄期望于pod的稳定性…

Calibre DESIGNrev Object Selection Toolbar

包括 Reference Path Polygon Edge Vertex Text的解释说明 FieldDescription用法&#xff08;勾选后&#xff09;ReferenceUsed to move or select a cell reference or array reference.可以选择一个cellPathUsed to move or select a contiguous path object.暂时不明请指教…

浏览器进程模型和JS的事件循环

一、浏览器的进程模型 1、什么是进程&#xff1f; 程序运行所需要的专属内存空间 2、什么是线程&#xff1f; ​​​​​运行​代码的称为线程&#xff08;同一个进程中的线程共享进程的资源&#xff09; ⼀个进程⾄少有⼀个线程&#xff0c;所以在进程开启后会⾃动创建⼀个线…

k8s云原生环境搭建笔记——第二篇

目录 1、使用普通方式安装prometheus和grafana1.1、安装kube-state-metrics容器1.1.1、下载并修改yaml文件1.1.2、导入kube-state-metrics镜像1.1.3、执行yaml文件目录 1.2、安装node-exploer1.2.1、创建名称空间prometheus1.2.2、执行yaml 1.3、安装prometheus1.3.1、创建集群…

【JUC进阶】14. TransmittableThreadLocal

目录 1、前言 2、TransmittableThreadLocal 2.1、使用场景 2.2、基本使用 3、实现原理 4、小结 1、前言 书接上回《【JUC进阶】13. InheritableThreadLocal》&#xff0c;提到了InheritableThreadLocal虽然能进行父子线程的值传递&#xff0c;但是如果在线程池中&#x…

e2studio开发三轴加速度计LIS2DW12(4)----测量倾斜度

e2studio开发三轴加速度计LIS2DW12.4--测量倾斜度 概述视频教学样品申请源码下载计算倾斜角度工作原理单轴倾斜检测双轴倾斜检测三轴倾斜检测通信模式管脚定义IIC通信模式速率新建工程工程模板保存工程路径芯片配置工程模板选择时钟设置UART配置UART属性配置设置e2studio堆栈e…

自编C++题目——输入程序

预估难度 简单 题目描述 小明编了一个输入程序&#xff0c;当用户的输入之中有<时&#xff0c;光标移动到最右边&#xff1b;当输入有>时&#xff0c;光标移动到最左边&#xff0c;当输入有^时&#xff0c;光标移动到前一个字符&#xff0c;当输入为#时&#xff0c;清…

大模型实战营Day4 XTuner 大模型单卡低成本微调实战 作业

按照文档操作&#xff1a; 单卡跑完训练&#xff1a; 按照要求更改微调的数据&#xff1a; 完成微调数据的脚本生成&#xff1a; 修改配置文件&#xff1a; 替换好文件后启动&#xff1a; 启动后终端如图&#xff1a; 用于微调的一些数据显示&#xff1a; 训练时间&#x…

记录一次git merge后发现有些文件不对的问题,排查过程

分支进行merge&#xff08;A merge到B&#xff09;之后&#xff0c;发现string.xml中有些字段的值没有merge过来&#xff0c;一开始还以为自己是自己merge错误&#xff0c;检查了一遍自己的merge操作没有问题。 那为啥没有merge过来呢&#xff1f;有一种可能是&#xff0c;merg…

vue2实现日历12个月平铺,显示工作日休息日

参考&#xff1a;https://blog.csdn.net/weixin_40292154/article/details/125312368 1.组件DateCalendar.vue&#xff0c;sass改为less <template><div class"cc-calendar"><div class"calendar-title"><span>{{ year }}年{{ mo…