面试集中营—rocketmq架构篇

一、基本定义

        Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

  • Topic:消息主题,用于将一类的消息进行归类,比如订单主题,就是所有订单相关的消息都可以由这个主题去承载,生产者向这个主题发送消息。
  • 生产者:负责生产消息并发送消息到 Topic 的角色。
  • 消费者:负责从 Topic 接收并消费消息的角色。
  • 消息:生产者向 Topic 发送的内容,会被消费者消费。
  • 消息属性:生产者发送的时候可以为消息自定义一些业务相关的属性,比如 Mesage Key 和 Tag 等。
  • Group:一类生产者或消费者,这类生产者或消费者通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。

        Apache RocketMQ 自诞生以来,因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者以及云厂商广泛采用。历经十余年的大规模场景打磨,RocketMQ 已经成为业内共识的金融级可靠业务消息首选方案,被广泛应用于互联网、大数据、移动互联网、物联网等领域的业务场景。

二、消息存储

1、基础

        分布式队列因为有可靠性的要求,一般要对消息进行持久化的处理。对于存储介质的选择:

        1、关系型数据库DB:如Avtivemq,关系型数据库在数据量变大之后性能会显著下降,同时还需要进行关系型数据库的连接,数据库持久化的操作,最终还是数据还是进入的文件系统。

        2、文件系统:如RocketMQ、Kafka、RabbitMQ。直接把数据保存在文件系统中,可靠快速。

2、消息存储和发送性能保证

        如果磁盘配置得当,磁盘的顺序写入速度可以达到600MB/s,这超过了一般网卡的传输速度。但是随机写的速度只有大概100KB/s,和顺序写入相比相差了大概6000倍。所以RocketMQ就使用了顺序写来保证消息的存储速度。

        默认情况下,从磁盘中读取文件或者通过网络向文件中写入数据,需要内核态与用户态的多次拷贝操作。RocketMQ使用了零拷贝的技术,省去了向用户态拷贝的步骤,提高了消息存盘和网络发送的速度。零拷贝机制在Java中有一个MappedByteBuffer来实现

       采用MappedByteBuffer技术对文件有大小的要求,故RocketMQ的默认的一个CommitLog日志数据文件大小为1g。

3、消息存储结构

       消息存储架构图中主要有下面三个跟消息存储相关的文件构成。

  • commitLog:  存储消息的元数据,可能会有多个文件,每个文件默认大小1g。
  • consumequeue:存储消息在CommitLog的索引,这个消费逻辑队列。为了加快commitLog的读取速度。当我们创建了一个消息队列就会对应产生一个对应的consumequeue。
  • indexFile: 为了消息查询提供了一种通过key或者时间区间来查询消息的方法,这种提供indexFile来查找消息的方法不影响发送和消费消息的主流程

        消息发送过来先存储到commitLog,为了加快commitLog读取速度,就出现了一个consumequeue,相当于消息的主键索引,而indexFile其他索引例如key或者时间

4、刷盘机制

        同步和异步刷屏如下图所示

        (1) 同步刷盘:如上图所示,只有在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。

        (2) 异步刷盘:能够充分利用OS的PageCache的优势,只要消息写入PageCache(内存)即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。

三、高可用机制

通信机制

        RocketMQ消息队列集群主要包括NameServer、Broker(Master/Slave)、Producer、Consumer4个角色,基本通讯流程如下:

        1、Broker启动后,向NameServer注册并每隔30s时间定时向NameServer上报Topic路由信息;

        2、消息生产者Producer作为客户端发送消息时候,需要根据消息的Topic从本地缓存的TopicPublishInfoTable获取路由信息。如果没有则更新路由信息会从NameServer上重新拉取,同时Producer会默认每隔30s向NameServer拉取一次路由信息;

        3、消息生产者Producer根据第二步获得路由信息,选择一个消息队列(MessageQueue)进行消息的投递,消息的接收方是Broker,Broker接收消息并落盘;

        4、消息的消费者Consumer也根据第二步获得的路由信息,先进行客户端的负载均衡,然后选择一个或多个消息队列中的信息进行消费;

高可用集群架构

        如上图所示,生产者有生产者集群,消费者有消费者集群,Name Server也就Name Server集群。Broker集群一般采用多主多从的机制。一组主从节点的broker名称相同,其中master节点的brokerId为0,从节点的brokerId大于0;从节点只负责读的工作

        当消费者读取消息时,出现master节点故障,broker集群会立即把读取的目标转到slave节点中,保证消费的高可用;

        生产者发送消息时,即创建topic的时候,把topic中的多个messagequeue创建在多个broker组上,如果其中一个主节点挂了,还有一个主节点提供服务,此时保证生产者的高可用;

主从复制

        主从复制指的是broker集群中的主从节点的数据复制,包括同步复制和异步复制。同步复制牺牲了一部分的性能但是数据可靠性高基本不会都丢失数据;

        生产环境中,建议刷盘方式配置成异步SYNC_FLUSH,保证一个吞吐量,然后主从复制使用同步复制ASYNC_FLUSH组合来保证数据的安全性;

四、消息投递与消费       

负载均衡

        生产者负载均衡

         生产者在发送消息的时候,要根据topic的路由来完成消息的投递,投递的目的地是broker。一个topic关联了多个messageQueue,客户端正常会轮询的方式依次投递到不同的消息队列中,由于消息队列配置在不同的broker上,这样就完成了消息投递的负载均衡,我们看下官网中给出的解释:

        Producer端在发送消息的时候,会先根据Topic找到指定的TopicPublishInfo,在获取了TopicPublishInfo路由信息后,RocketMQ的客户端在默认方式下selectOneMessageQueue()方法会从TopicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息。

        具体的容错策略均在MQFaultStrategy这个类中定义。这里有一个sendLatencyFaultEnable开关变量,如果开启,在随机递增取模的基础上,再过滤掉not available的Broker代理。所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的latency超过550L ms,就退避30000L ms;超过1000L,就退避60000L;如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,latencyFaultTolerance机制是实现消息发送高可用的核心关键所在。

        消费者负载均衡

         在RocketMQ中,Consumer端的两种消费模式(Push/Pull)都是基于拉模式来获取消息的,而在Push模式只是对pull模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。如果未拉取到消息,则延迟一下又继续拉取。在两种基于拉模式的消费方式(Push/Pull)中,均需要Consumer端知道从Broker端的哪一个消息队列中去获取消息。因此,有必要在Consumer端来做负载均衡,即Broker端中多个MessageQueue分配给同一个ConsumerGroup中的哪些Consumer消费。

        在Consumer启动后,它就会通过定时任务不断地向RocketMQ集群中的所有Broker实例发送心跳包(其中包含了,消息消费分组名称、订阅关系集合、消息通信模式和客户端id的值等信息)。Broker端在收到Consumer的心跳消息后,会将它维护在ConsumerManager的本地缓存变量中;

        在Consumer实例的启动后,会完成负载均衡服务线程—RebalanceService的启动(每隔20s执行一次)。这个线程会根据消费者通信类型为“广播模式”还是“集群模式”做不同的逻辑处理。

        集群模式

        1)获取该Topic主题下的消息消费队列集合;

        2)向Broker端发送获取该消费组下消费者Id列表;

        3)先对Topic下的消息消费队列、消费者Id排序,然后用消息队列分配策略算法(默认为:消息队列的平均分配算法),计算出待拉取的消息队列。

        广播模式

        Consumer实例把所有的消息队列中的消息都拉取过来

五、事务消息

        Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示。

事务消息流程概要

        分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程

        1.事务消息发送及提交:

        (1) 发送消息(half消息)。

        (2) 服务端响应消息写入结果。

        (3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。

        (4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

        2.补偿流程:

        (1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”

        (2) Producer收到回查消息,检查回查消息对应的本地事务的状态

        (3) 根据本地事务状态,重新Commit或者Rollback

        其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。

事务消息设计

        我们看到上图中有一个halfmsg,这就是事务消息的关键;

        1、事务消息在一阶段对用户不可见

         如果消息是half消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故消费端无法消费half类型的消息,然后RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。

        2、Commit和Rollback操作以及Op消息的引入

        在完成一阶段写入一条对用户不可见的消息后,二阶段如果是Commit操作,则需要让消息对用户可见;如果是Rollback则需要撤销一阶段的消息。

        Rollback

        本身一阶段的消息对用户就是不可见的, 也就是无法消费的,其实不需要真正撤销消息(实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的);但还是需要一个操作来标识这条消息的最终状态。RocketMQ事务消息方案中引入了Op消息的概念,用Op消息标识事务消息已经确定的状态(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。引入Op消息后,事务消息无论是Commit或者Rollback都会记录一个Op操作。

        OP消息

        RocketMQ将Op消息写入到全局一个特定的Topic中,这个Topic是一个内部的Topic(像Half消息的Topic一样),不会被用户消费。Op消息的内容为对应的Half消息的存储的Offset,这样通过Op消息能索引到Half消息进行后续的回查操作。

        Commit

        在执行二阶段Commit操作时,需要构建出Half消息的索引。一阶段的Half消息由于是写到一个特殊的Topic,所以二阶段构建索引时需要读取出Half消息,并将Topic和Queue替换成真正的目标的Topic和Queue,之后通过一次普通消息的写入操作来生成一条对用户可见的消息。所以RocketMQ事务消息二阶段其实是利用了一阶段存储的消息的内容,在二阶段时恢复出一条完整的普通消息,然后走一遍消息写入流程。

六、消息消费幂等性

     必要性

       Rocketmq中的消息是会出现重复的,主要有以下三个内容

       1、发送时消息重复:生产者在成功发送了消息之后,此时由于网络抖动等问题造成服务端没有对客户端应答失败,故而生产者会重复投递一个messageId相同的消息;

       2、投递时消息重复:消费者成功消费之后,由于网络抖动等问题造成客户端没有给服务端应答,此时服务端基于投递策略(比如至少成功一次),会再次尝试投递之前已经成功的消息;

       3、扩容时rebalance造成的消息重复投递:当客户端重启,扩缩容时会造成rebanlance就是重新负载均衡,也会造成消息的重复投递;

     解决方案

       首先不建议使用messageId来做幂等性检查,但是rocketmq不保证默认消息id的唯一性;

       通常在消息中设置一个业务key,在消费者端通过判定业务key是否已经消费过了来判定幂等性;这个业务key可以通过数据库来存储,也可以通过redis等缓存工具来存储;

参考:

3.RocketMQ消息存储结构_哔哩哔哩_bilibili

分布式事务-阿里云MQ事务消息踩坑记录_localtransactionchecker-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/626883.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

多格式兼容的在线原型查看:Axure RP的便捷解决方案

Axure rp不仅可以绘制详细的产品构思,还可以在浏览器中生成html页面,但需要安装插件才能打开。安装Axure后 rpchrome插件后,还需要在扩展程序中选择“允许访问文件网站”,否则无法在Axure中成功选择 rp在线查看原型。听起来很麻烦…

添砖Java之路(其七)——static

目录 static: 1.被类的所有对象所共享(和c有点像) 2.多了一种调用方法,可以通过类名调用 3.随着类的加载而加载,是优先于对象的存在。 工具类: 为什么主类的方法要加static: 理解 public static void main&#…

喜大普奔!VMware Workstation Pro 17.5 官宣免费!

Broadcom 已经正式收购 VMware,【VMware中国】官方公众号已于3月11日更名为【VMware by Broadcom中国】。 13日傍晚,该公众号发表推文 V风拂面,好久不见 - 来自VMware 中国的问候 ,意味着 VMware 带着惊喜和美好的愿景再次归来。 …

​​​【收录 Hello 算法】6.2 哈希冲突

目录 6.2 哈希冲突 6.2.1 链式地址 6.2.2 开放寻址 1. 线性探测 2. 平方探测 3. 多次哈希 6.2.3 编程语言的选择 6.2 哈希冲突 上一节提到,通常情况下哈希函数的输入空间远大于输出空间,因此理论上哈希冲突是不可避免的。比如&a…

基于GWO灰狼优化的CNN-GRU-Attention的时间序列回归预测matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1卷积神经网络(CNN)在时间序列中的应用 4.2 GRU网络 4.3 注意力机制(Attention) 4.4 GWO优化 5.算法完整程序工程 1.算法运行效果图预览…

S32K144 EB 和 MCAL 安装

首先安装 EB Design : Product Information : Automotive SW - AUTOSAR MCAL / QM (flexnetoperations.com) 参考 NXP_AUTOSAR_MCAL开发环境搭建引导_S32K14x系列_nxp的s32k144 sdk文档-CSDN博客 然后安装 MCAL 需要把 P1 的 Plugins 和 AUTOSAR\S32K14X_MCAL4_3_RTM_1_0_0\S…

暴雨分布式存储集群助重庆高校打造智慧校园

教育是国家发展的基石,教育兴则国家兴,教育强则国家强。党的二十大报告指出,“加快建设教育强国”,并提出到2035年“建成教育强国”的总体目标。随着数字时代的到来,以物联网、大数据、云计算和人工智能为代表的数字技…

【C语言】4.C语言数组(2)

文章目录 6. 二维数组的创建6.1 ⼆维数组的概念6.2 ⼆维数组的创建 7. 二维数组的初始化7.1 不完全初始化7.2 完全初始化7.3 按照⾏初始化7.4 初始化时省略⾏,但是不能省略列 8. 二维数组的使用8.1 ⼆维数组的下标8.2 ⼆维数组的输⼊和输出 9. 二维数组在内存中的存…

数据挖掘(三)特征构造

前言 基于国防科技大学 丁兆云老师的《数据挖掘》课程 数据挖掘 数据挖掘(一)数据类型与统计 数据挖掘(二)数据预处理 3、特征构造 3.1 基本特征构造方法: 3.1.1 运用已有知识直接构造: 一般是根据原有…

Elasticsearch分词及其自定义

文章目录 分词发生的阶段写入数据阶段执行检索阶段 分词器的组成字符过滤文本切分为分词分词后再过滤 分词器的分类默认分词器其他典型分词器 特定业务场景的自定义分词案例实战问题拆解实现方案 分词发生的阶段 写入数据阶段 分词发生在数据写入阶段,也就是数据索…

centos7.9安装PHP运行环境

MySQL安装 报错:源 "MySQL 8.0 Community Server" 的 GPG 密钥已安装,但是不适用于此软件包。请检查源的公钥 URL 是否配置正确。 解决: yum install mysql-server -y --nogpgcheck 查询初始密码 grep temporary password /var…

振弦式应变计的与实际测量值不一致怎么办

在进行结构健康监测或其他工程测量时,精确性和可靠性至关重要。振弦式表面应变计是一种广泛使用的测量工具,它通过测量材料表面的应变来评估结构的应力状态。然而,在实际应用中,振弦式应变计的测量值与实际应变值之间的不一致问题…

Springboot+MybatisPlus如何实现分页和模糊查询

实现分页查询的时候我们需要创建一个config配置类 1、创建MybatisPlusConfig类 Configuration //表明这是一个配置类 ConditionalOnClass(Value{PaginationInterceptor.class} //ConditionalOnClass:当指定的类存在时,才会创建对应的Bean // 这里当PaginationInt…

单调栈练习

最大矩形面积 如果栈为空&#xff0c;或者新的元素是大于栈顶元素的&#xff0c;那么新来的元素不会破坏栈的单调性&#xff0c;那么就把这个柱子入栈。 特别注意&#xff1a;这里的s.empty()和s.top().height < a不能调换顺序&#xff0c;包括后面的判断也要先判断栈是否为…

基于yolov2深度学习网络的单人口罩佩戴检测和人脸定位算法matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 MATLAB2022A 3.部分核心程序 ..............................................................I0 imresize…

C++初阶学习第六弹——string(1)——标准库中的string类

前言&#xff1a; 在前面&#xff0c;我们学习了C的类与对象&#xff0c;认识到了C与C语言的一些不同&#xff0c;今天&#xff0c;我们将进入C的 关键部分——STL&#xff0c;学习完这部分之后&#xff0c;我们就可以清楚的认识到C相比于C语言的快捷与便利 目录 一、为什么有s…

实时网络监控 - 一目了然网络状况

网络问题排查一直是IT管理员头痛的问题。随着网络规模的不断扩大和业务复杂度的提升&#xff0c;如何快速定位和解决网络故障变得尤为关键。本文详细介绍了一款名为 AnaTraf 的网络流量分析工具,它能提供全流量回溯分析、实时网络监控、性能分析等功能,助力企业快速诊断和解决各…

Linux/ubuntu build编译make时出现has modification time int the future的问题解决方法

针对Linux由于双系统之间的时间冲突导致linux时间经常变化&#xff0c;出现执行make命令时出现“make[2]: Warning: File xxx.c’ has modification time 1.6e05 s in the future “警告的问题&#xff0c;亦或者虚拟机出现相同的问题。 由于时钟同步问题&#xff0c;出现 warn…

ChatGlm的部署和训练

一、chatGlm的环境部署 1.安装anocoda 下载安装anaconda。具体教程详见官网教程。 2.安装CUDA 1&#xff09;首先在终端查看你的Nividian版本&#xff0c;命令如下&#xff1a; 2)如果你没有下载你要去下载cuda下载网站&#xff0c;这里是12.3是因为我cuda version版本12…

正则表达式教程

正则表达式在线工具网站&#xff1a;https://regexr.com