RocketMq详解:一、RocketMQ 介绍及基本概念

文章目录

  • 前言
  • 1.RocketMQ简介
  • 2.RocketMQ 特点
  • 3.核心特性
  • 4.应用场景
  • 5.RocketMQ 优势
  • 6.RocketMQ 四大核心组件
    • 6.1 NameServer
      • 1.NameServer作用
      • 2.NameServer被设计为无状态的原因
      • 3.和NameServer和Zookeeper的区别
      • 4.NameServer的高可用保障
    • 6.2 Broker
      • 1.Broker部署方式
      • 2.高可用与负载均衡
    • 6.3 生产者(Producer)
      • 1.生产者类型
      • 2.发送方式
      • 3.生产者组
      • 4.启动流程
      • 3.高可用保障
    • 6.4 消费者(Consumer)
      • 1.消费者类型
      • 2.消费者组(Consumer Group)
      • 3.高可用保障
  • 7.RocketMq的运转流程
  • 8.常见的名词解释
  • 9.设计理念
  • 10.RocketMQ 架构
    • 10.1 NameServer 集群
    • 10.2 Broker 集群
    • 10.3 Producer 集群
    • 10.4 Consumer 集群
    • 10.5 集群间交互方式的说明
    • 10.6 RocketMq的消息
      • 1.顺序消费
      • 2.消息过滤
      • 3.消息存储
      • 4.消息高可用性

前言

由于之前都是用的RabbitMq,但是RabbitMq较RocketMq而言,无论是学习和维护成本还是性能上都远远逊色于RocketMQ,因为开发者需要花费更多的时间来理解和掌握其丰富的交换器类型、路由键规则、绑定机制以及高级特性,如插件系统、事务、确认模型等。

这种复杂性可能增加开发和维护的难度。且在高并发、大规模消息处理场景下,RocketMQ 通常被认为能够提供更高的吞吐量和更低的延迟

这意味着在对消息队列性能有严格要求的业务环境中,RocketMQ 可能是更合适的选择。

所以从本章开始带着大家系统的学习一下RocketMq,本章将带着大家详细的去了解RocketMq四个核心组件的原理、特点以及RocketMQ的整体架构设计

1.RocketMQ简介

RocketMQ 是阿里巴巴开源的分布式消息中间件,现已成为 Apache 软件基金会的顶级项目。支持事务消息顺序消息批量消息定时消息消息回溯等

它里面有几个区别于标准消息中件间的概念,如GroupTopicQueue等。

系统组成则由ProducerConsumerBrokerNameServer等组件组成。

2.RocketMQ 特点

  • 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式等特点
  • Producer、Consumer、队列都可以分布式
  • Producer 向一些队列轮流发送消息,队列集合称为 Topic,Consumer 如果做广播消费,则一个 Consumer 实例消费这个 Topic 对应的所有队列,如果做集群消费,则多个 Consumer 实例平均消费这个 Topic 对应的队列集合能够保证严格的消息顺序
  • 支持拉(pull)和推(push)两种消息模式

pull其实就是消费者主动从MQ中去拉消息,而push则像rabbit MQ一样,是MQ给消费者推送消息。
但是RocketMQ的push其实是基于pull来实现的。
它会先由一个业务代码从MQ中pull消息,然后再由业务代码push给特定的应用/消费者。其实底层就是一个pull模式

  • 高效的订阅者水平扩展能力
  • 实时的消息订阅机制
  • 亿级消息堆积能力
  • 支持多种消息协议,如 JMS、OpenMessaging 等
  • 较少的依赖

3.核心特性

高性能
1.单机万级队列:单台 Broker 节点支持创建超过一万个持久化队列,能够应对极高并发的生产与消费场景。

2.顺序写磁盘:采用 Commit Log 结构,所有消息按序写入一个大文件,利用磁盘顺序写入的高效率,实现高吞吐量和低延迟。

3.零拷贝技术:在消息读取过程中采用零拷贝技术,减少数据在内核空间和用户空间之间的复制操作,提高 I/O 效率。

高可靠与高可用
1.分布式架构:系统由 NameServer、Broker、Producer 和 Consumer 四大组件构成,各组件均可水平扩展,实现无单点故障。

2.多副本机制:支持主从副本模式,保证消息在 Broker 节点间的冗余备份,防止数据丢失。

3.故障自动切换:当主节点发生故障时,系统能够自动将流量切换至备节点,保证服务连续性。

消息模型与丰富功能
1.多种消息类型:支持普通消息、顺序消息、事务消息、批量消息、定时(延时)消息、消息回溯等,满足不同业务场景需求。

2.消息顺序性:提供单分区严格顺序消息和全局顺序消息,确保消息在特定场景下的消费顺序。

3.事务消息:支持分布式事务,通过两阶段提交回查机制,确保消息发送与数据库操作的最终一致性

易用性与灵活性
1.多种发送与消费模式:Producer 提供同步、异步、单向发送等多种方式;Consumer 支持拉取(Pull)和推送(Push)两种消费模式。

2.丰富的客户端支持:提供 Java、Python、Go 等多种语言的 SDK,便于不同开发环境集成。

3.易于运维与管理:具备完善的监控指标、报警机制、命令行工具及图形化控制台,便于日常运维和故障排查。

4.应用场景

RocketMQ 广泛应用于各种消息驱动的场景,包括但不限于:

  • 订单系统:处理订单创建、支付、退款等流程中的异步通知和状态同步。
  • 数据同步:在大数据处理中作为数据采集和分发的管道,实现数据的实时或准实时流转。
  • 日志收集:作为日志聚合系统的一部分,接收、存储和转发各类应用产生的日志数据。
  • 通知推送:用于发送短信、邮件、APP 推送等各类通知消息,实现用户触达。
  • 任务调度:配合定时(延时)消息功能,执行定时任务或延迟任务。

5.RocketMQ 优势

目前主流的 MQ 主要是 RocketMQ、kafka、RabbitMQ,ActiveMQ其主要优势有:

1.支持事务型消息(消息发送和 DB 操作保持两方的最终一致性,RabbitMQ 和 Kafka 不支持)
支持结合 RocketMQ 的多个系统之间数据最终一致性(多方事务,二方事务是前提)

2.支持 18 个级别的延迟消息(Kafka 不支持)

3.支持指定次数和时间间隔的失败消息重发(Kafka 不支持,RabbitMQ 需要手动确认)

4.支持 Consumer 端 Tag 过滤,减少不必要的网络传输(即过滤由MQ完成,而不是由消费者完成。RabbitMQ 和 Kafka 不支持)

5.支持重复消费(RabbitMQ 不支持,Kafka 支持)

6.在一个队列中可靠的先进先出(FIFO)和严格的顺序传递 (RocketMQ可以保证严格的消息顺序,而ActiveMQ无法保证)

6.RocketMQ 四大核心组件

RocketMQ主要有四大核心组成部分:NameServerBrokerProducer以及Consumer四部分。

其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。
在这里插入图片描述

Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。

Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。

ConsumerGroup 由多个Consumer 实例构成

Topic:区分消息的种类。一个发送者可以发送消息给一个或多个 Topic,一个消息的接收者可以订阅一个或多个 Topic 消息。

Message Queue:相当于是 Topic 的分区,用于并行发送和接收消息。

6.1 NameServer

NameServer 是一个几乎无状态节点,可集群部署,节点之间没有任何信息同步

所以 RocketMQ 需要先启动 NameServer 再启动 Broker

1.NameServer作用

名称服务器(NameServer) 是整个 RocketMQ 的 “大脑”,它相当于是服务注册中心的角色,用来管理 Broker

名称服务器(NameServer)用来保存 Broker 相关元信息并给 Producer 和 Consumer 查找Broker 信息。

NameServer 被设计成几乎无状态的,可以横向扩展,节点之间相互之间无通信,通过部署多台机器来标记自己是一个伪集群。

每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到NameServer 获取到 Broker 的路由信息,进而和Broker取得连接。

Consumer 也会定时获取 Topic 的路由信息。所以从功能上看应该是和 ZooKeeper 差不多,据说 RocketMQ 的早期版本确实是使用的ZooKeeper ,后来改为了自己实现NameServer 。

2.NameServer被设计为无状态的原因

RocketMQ 的 NameServer 之所以被称为无状态,是因为它不依赖持久化存储,节点间不进行状态同步,仅在内存中临时保存由 Broker 等角色主动上报的动态状态信息,且这些信息在 NameServer 重启后不会保留。

这种设计简化了系统架构,提升了服务的可伸缩性和容错能力,同时也降低了运维复杂度

我们可以通过以下几点去理解为何NameServer要被设计为无状态:

1.内存存储
NameServer 不依赖于持久化的存储介质(如硬盘)来保存其管理的元数据。

它通过接收 Broker、Producer 和 Consumer 的定期心跳消息,将集群中的Broker信息、Topic路由信息、队列分布情况等动态状态数据全部存储在内存中。

这意味着 NameServer 在重启后,如果不接收到心跳更新,将失去之前维护的所有状态信息,需重新从各个角色处接收心跳并重建内存中的状态

2.节点独立
NameServer 节点之间相互独立,它们并不进行任何状态数据的同步。每个 NameServer 实例都独立处理心跳请求、注册信息和查询请求,对外提供的服务完全基于各自内存中的实时状态。由于没有状态共享或复制机制,任意 NameServer 节点的宕机或加入都不会影响其他节点的正常运行,也不会导致整体系统的状态变化。

3.服务发现与负载均衡
NameServer 主要负责服务发现和路由寻址功能,即为客户端提供 Broker 列表Topic与 Broker 之间的映射关系 等。

客户端(Producer、Consumer)根据 NameServer 返回的数据与具体的 Broker 建立连接,进行消息的生产和消费。

这种设计使得 NameServer 不需要保存客户端的任何会话状态或连接状态,只关注集群的整体拓扑结构

4.简化架构与运维
无状态的设计极大地简化了 NameServer 的架构,这种设计使得 NameServer 无需考虑数据一致性问题、数据同步开销以及潜在的分布式一致性算法复杂性

每个 NameServer 实例都是平等且可替换的,这简化了集群的扩容、缩容和故障恢复过程,提高了系统的运维效率。

5.高可用与容错
由于 NameServer 无状态,可以很容易地通过增加节点数量来实现水平扩展和高可用

客户端通常配置多个 NameServer 地址列表,当某个 NameServer 不可用时,可以快速切换到其他节点继续获取服务。

此外,无状态特性也意味着故障节点恢复后无需复杂的数据恢复过程,只需重新开始接收心跳更新即可恢复正常服务。

3.和NameServer和Zookeeper的区别

Name Server和ZooKeeper的作用大致是相同的,从宏观上来看,Name Server做的东西很少,就是保存一些运行数据,Name Server之间不互连,这就需要broker端连接所有的Name Server,运行数据的改动要发送到每一个Name Server来保证运行数据的一致性(这个一致性确实有点弱),这样就变成了Name Server很轻量级,但是broker端就要做更多的东西了。

而ZooKeeper呢,broker只需要连接其中的一台机器,运行数据分发、一致性都交给了ZooKeeper来完成.

尽管 NameServer 和 Zookeeper 都承担着服务注册与发现、配置管理等职责,服务于分布式系统中,但它们之间存在一些根本性的区别,主要体现在以下几个方面:

1.设计目的和应用场景
Zookeeper:是一个分布式协调服务,设计初衷是为了解决分布式系统中的一致性问题,如选举Master节点、分布式锁、配置管理、组服务等。

它提供了强大的一致性保证(CP原则),常用于需要强一致性的分布式场景,如Hadoop、Kafka等

NameServer:是RocketMQ中特有的组件,专注于消息中间件的场景,主要负责Broker的注册与发现、Topic路由信息的管理

它的设计更加轻量级,侧重于提供高效的路由信息查询服务,牺牲了一定的一致性保证,追求更高的吞吐量和更低的延迟

2.数据模型
Zookeeper:提供了一个层次化的命名空间(ZNode树结构),允许在树中存储数据,并且支持监听机制,可以对节点变更进行监听,实现分布式通知

NameServer:数据模型相对简单,主要存储Broker信息和Topic路由信息,没有Zookeeper那样复杂的节点和数据结构,也不支持监听机制,数据更新通过Broker的心跳机制主动推送给NameServer

3.一致性模型
Zookeeper:实现了强一致性(CP),保证任何时候所有客户端看到的服务端数据都是一致的,这得益于其使用了ZAB协议,能够确保数据在所有节点间的一致性。

NameServer:不保证强一致性,而是追求高可用性和高吞吐量,它是无状态的,不维护持久化状态,数据存储在内存中,通过Broker定期发送心跳更新路由信息,因此在极端情况下可能会出现短暂的路由信息不一致。

4.可靠性与容错
Zookeeper:采用集群部署,通过投票选举Leader来保证高可用,即使部分节点故障,只要多数节点存活就能维持服务,但其复杂性也意味着运维成本相对较高。

NameServer:同样推荐集群部署以增强可用性,但因为其无状态特性,某个节点故障不会影响其他节点服务,客户端通常配置多个NameServer地址,某个节点失效后可以迅速切换到其他节点,故障恢复更快,运维相对简单。

5.资源消耗
Zookeeper:由于其强一致性保证和复杂的选举机制,资源消耗相对较高,尤其是在大型集群中。

NameServer:设计为轻量级服务,资源消耗较低,更适合大规模分布式消息系统中对资源敏感的场景。

RocketMQ选择自建NameServer而非直接使用Zookeeper,主要是为了更好地匹配消息队列的特定需求,比如追求更高的消息处理速度和更低的延迟,同时简化运维复杂度

4.NameServer的高可用保障

Broker 在启动时向所有 NameServer 注册(主要是服务器地址等) ,生产者在发送消息之前先从NameServer 获取 Broker 服务器地址列表(消费者一样),然后根据负载均衡算法从列表中选择一台服务器进行消息发送。

NameServer 与每台 Broker 服务保持长连接,并间隔 30S 检查 Broker 是否存活,如果检测到Broker 宕机,则从路由注册表中将其移除,这样就可以实现 RocketMQ 的高可用。

当然其高可用的保证除了心跳机制以及负载均衡外,由于NameServer 本身设计为无状态服务,所以RocketMQ鼓励部署多个实例形成集群,且实施有效的监控系统来跟踪 NameServer 的运行状态也是十分必要的,包括CPU、内存使用情况、网络状况以及服务响应时间等,一旦发现异常,立即触发告警,便于运维人员及时介入处理,预防潜在的故障

6.2 Broker

消息服务器(Broker)是消息存储中心,主要作用是接收来自 Producer 的消息并存储,Consumer 从这里取得消息。

它还存储与消息相关的元数据,包括用户组、消费进度偏移量、队列信息等

从如下的部署结构图中可以看出 Broker 有 Master 和 Slave 两种类型:

  • Master 既可以写又可以读
  • Slave不可以写只可以读
    在这里插入图片描述

1.Broker部署方式

Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的Broker Name,不同的BrokerId来定义,BrokerId为0表Master,非0表示Slave。

Master也可以部署多个。

从物理结构上看 Broker 的集群部署方式有四种:单 Master 、多 Master 、多 Master 多Slave(同步刷盘)、多 Master多 Slave(异步刷盘)。

  • 单 Master

这种方式一旦 Broker 重启或宕机会导致整个服务不可用,这种方式风险较大,所以显然不建议线上环境使用。

  • 多 Master

所有消息服务器都是 Master ,没有 Slave 。这种方式优点是配置简单,单个 Master 宕机或重启维护对应用无影响。缺点是单台机器宕机期间,该机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受影响。

  • 多 Master 多 Slave(异步复制)

每个 Master 配置一个 Slave,所以有多对 Master-Slave,消息采用异步复制方式,主备之间有毫秒级消息延迟。

这种方式优点是消息丢失的非常少,且消息实时性不会受影响,Master 宕机后消费者可以继续从 Slave 消费,中间的过程对用户应用程序透明,不需要人工干预,性能同多 Master 方式几乎一样。缺点是 Master 宕机时在磁盘损坏情况下会丢失极少量消息。

  • 多 Master 多 Slave(同步双写)

每个 Master 配置一个 Slave,所以有多对 Master-Slave ,消息采用同步双写方式,主备都写成功才返回成功。这种方式优点是数据与服务都没有单点问题,Master 宕机时消息无延迟,服务与数据的可用性非常高。缺点是性能相对异步复制方式略低,发送消息的延迟会略高。

2.高可用与负载均衡

Broker的高可用与负载均衡主要依赖于:NameServer和自动故障转移进行实现

  • NameServer

每个Broker与Name Server集群中的所有节点建立长连接,定时(每隔30s)注册Topic信息到所有Name Server。Name Server定时(每隔10s)扫描所有存活broker的连接,如果Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的连接。

  • 自动故障转移

当主Broker故障时,消费者可以从NameServer获取最新的Broker状态信息,自动切换到从Broker进行消息消费,确保服务的连续性。

6.3 生产者(Producer)

也称为消息发布者,)是消息队列系统中的关键组件,负责创建并发送消息到消息服务器(Broker)

1.生产者类型

RocketMQ 提供了三种类型的生产者:

  • NormalProducer:标准生产者,用于发送常规消息,不保证消息的顺序性或事务性。
  • OrderProducer:顺序生产者,保证同一主题内消息的顺序性。适合那些对消息顺序有严格要求的应用场景。
  • TransactionProducer:事务生产者,用于支持分布式事务,确保消息发送与本地事务操作的原子性。在消息发送前执行预提交操作,并在事务成功或回滚后确认或撤销消息。

2.发送方式

生产者向brokers发送由业务应用程序系统生成的消息。RocketMQ提供了发送:同步、异步和单向(one-way)的多种范例。

  • 同步发送

同步发送指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,例如重要通知邮件、营销短信。

  • 异步发送

异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。

假如过一段时间检测到某个信息发送失败,可以选择重新发送。

  • 单向发送

单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集。

3.生产者组

生产者组(Producer Group)是一类 Producer 的集合,这类 Producer 通常发送一类消息并且发送逻辑一致,所以将这些 Producer 分组在一起。

从部署结构上看生产者通过 Producer Group 的名字来标记自己是一个集群

4.启动流程

初始化:创建DefaultMQProducer实例,设置生产者组名、NameServer地址等配置。
连接NameServer:生产者会连接到NameServer集群,获取Topic的路由信息。
发送消息:生产者根据路由信息选择合适的Broker,通过网络通信客户端(如Netty)发送消息到Broker。
失败重试:如果消息发送失败,生产者会根据配置的重试策略进行自动重试。

3.高可用保障

Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳

Producer完全无状态,可集群部署

Producer每隔30s(由ClientConfig的pollNameServerInterval)从Name server获取所有topic队列的最新情况,这意味着如果Broker不可用,Producer最多30s能够感知,在此期间内发往Broker的所有消息都会失败。

Producer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s中扫描所有存活的连接,如果Broker在2分钟内没有收到心跳数据,则关闭与Producer的连接。

6.4 消费者(Consumer)

也称为消息订阅者,负责从 Topic 接收并消费消息,它从Broker拉取消息或者由Broker推送消息给消费者,具体是拉还是推取决于所使用的消费模式

1.消费者类型

RocketMQ提供了两种类型的消费者客户端:

  • DefaultMQPushConsumer:推模式消费者。

这种消费者注册订阅关系后,Broker会根据订阅关系主动将消息推送给消费者。适用于大多数需要实时处理消息的场景。推模式下,消费者需要设置消息处理的回调函数,当有新消息到达时,RocketMQ会调用这个回调函数来处理消息。

  • DefaultMQPullConsumer:拉模式消费者。

这种消费者需要自己主动从Broker拉取消息。相较于Push模式,Pull模式提供了更多的控制权,比如消费者可以决定何时以及如何拉取消息,适用于需要更多控制逻辑或者对消息消费时机有特殊要求的场景。

2.消费者组(Consumer Group)

消费者组一类 Consumer 的集合名称,这类 Consumer 通常消费同一类消息并且消费逻辑一致,所以将这些 Consumer 分组在一起。

消费者组与生产者组类似,都是将相同角色的分组在一起并命名。

RocketMQ中的消息有个特点:同一条消息,只能被某一消费组其中的一台机器消费,但是可以同时被不同的消费组消费
这样说可能不太直观,我们具体看一个例子:
在这里插入图片描述

上图中的消息就只能被A中的某一台机器消费,但是同时也可以被B中的某一台机器消费

3.高可用保障

与生产这一样,Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。

Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定

Consumer每隔30s从Name server获取topic的最新队列情况,这意味着Broker不可用时,Consumer最多最需要30s才能感知。

Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接;并向该Consumer Group的所有Consumer发出通知,Group内的Consumer重新分配队列,然后继续消费。

当Consumer得到master宕机通知后,转向slave消费,slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。

但是一旦master恢复,未同步过去的消息会被最终消费掉

7.RocketMq的运转流程

有了上面的知识储备, 下面我们看一下各角色之间完整的交互过程。
在这里插入图片描述

  • 1.由于NameServer是整个 RocketMQ 的 “大脑”,所以NameServer 先启动

NameServer 是轻量级的服务,负责管理和维护集群的元数据信息,包括 Broker 服务器的地址、Topic 路由信息等。启动 NameServer 实例后,它们之间并不进行数据同步,每个 NameServer 独立存储自己的元数据信息。

  • 2.NameServer启动后,Broker启动,并在启动时向 NameServer 注册

Broker 启动后会向所有 NameServer 注册,发送心跳信息,包括 Broker 的地址、服务状态以及它所承载的 Topic 信息。Broker 既可以是主 Broker 也可以是从 Broker,主从之间通过同步机制保证数据的一致性。

  • 3.生产者在发送某个主题的消息之前先从 NamerServer 获取 Broker 服务器地址列表(有可能是集群),然后根据负载均衡算法从列表中选择一台Broker 进行消息发送

生产者首先连接到 NameServer,获取到当前 Topic 可用的 Broker 信息。

生产者根据负载均衡策略选择一个 Broker,通过 Netty 建立连接,并发送消息到 Broker。

Broker 接收到消息后,会将其存储到 CommitLog,并根据消息的 Topic 和 Queue 信息,更新相应的 ConsumeQueue。

如果是事务消息,生产者还需要完成事务提交或回滚的流程,确保消息的最终一致。

  • 4.NameServer 与每台 Broker 服务器保持长连接,并间隔 30S 检测 Broker 是否存活,如果检测到Broker 宕机(使用心跳机制, 如果检测超120S),则从路由注册表中将其移除

4.消费者在订阅某个主题的消息之前从 NamerServer 获取 Broker 服务器地址列表(有可能是集群),但是消费者选择从 Broker 中 订阅消息,订阅规则由 Broker 配置决定

  • 消费者启动时,也会连接 NameServer 获取 Topic 路由信息,并选择一个 Broker 进行消费。
  • 根据消费模式(集群消费或广播消费),消费者从 Broker 拉取消息或接收 Broker 推送的消息。
  • 在集群消费模式下,消息只会被一个消费者消费;在广播消费模式下,消息会被所有订阅该 Topic 的消费者消费。
  • 消费者消费消息后,会根据消费结果(成功或失败)向 Broker 发送 ACK(确认),Broker 根据 ACK 更新消费位点。
  • 如果消费失败,消息可能根据配置的重试策略进行重试,或者最终进入死信队列。

8.常见的名词解释

消息
消息(Message)就是要传输的信息。一条消息必须有一个主题(Topic),主题可以看做是你的信件要邮寄的地址。

一条消息也可以拥有一个可选的标签(Tag)和额处的键值对,它们可以用于设置一个业务 key 并在 Broker 上查找此消息以便在开发期间查找问题。

主题
主题(Topic)可以看做消息的规类,它是消息的第一级类型。比如一个电商系统可以分为:交易消息、物流消息等,一条消息必须有一个 Topic 。

Topic 与生产者和消费者的关系非常松散,一个 Topic可以有0个、1个、多个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息。一个Topic 也可以被 0个、1个、多个消费者订阅。

标签
标签(Tag)可以看作子主题,它是消息的第二级类型,用于为用户提供额外的灵活性。使用标签,同一业务模块不同目的的消息就可以用相同 Topic 而不同的 Tag 来标识。

比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有 Tag 。标签有助于保持您的代码干净和连贯,并且还可以为 RocketMQ 提供的查询系统提供帮助。

简单来说TOPIC可以看作衣服,而TAG可以看作衣服下的【短袖】、【外套】、【卫衣】等

消息队列
消息队列(Message Queue),主题被划分为一个或多个子主题,即消息队列。一个 Topic 下可以设置多个消息队列,发送消息时执行该消息的 Topic ,RocketMQ 会轮询该 Topic 下的所有队列将消息发出去。下图 Broker 内部消息情况:
在这里插入图片描述

消息消费模式
消息消费模式有两种:集群消费(Clustering)和广播消费(Broadcasting)

默认情况下就是集群消费,该模式一条消息只能被某一消费者组中的某一台机器消费,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。

而广播消费消息会发给消费者组中的每一个消费者进行消费。

消息顺序
消息顺序(Message Order)有两种:顺序消费(Orderly)和并行消费(Concurrently)。

顺序消费表示消息消费的顺序和生产者为每个消息队列发送信息时候的顺序一致,所以如果正在处理全局顺序是强制性的场景,需要确保使用的主题只有一个消息队列。

并行消费不再保证消息顺序,消费的最大并行数量受每个消费者客户端指定的线程池限制。

9.设计理念

RocketMQ 的设计基于主题的发布与订阅模式(也叫观察者模式),这种一种十分常用的设计模式,如果不了解的可以移步至此处:《观察者模式》,其核心功能包括:消息发送消息存储(Broker)消息消费

整体设计追求简单与性能第一,主要体现在以下三个方面:

  • NameServer设计及其简单

RocketMQ摒弃了业界常用的zookeeper作为注册中心,而是使用自研的NameServer来实现元数据的管理,因为Topic路由信息无须在集群间保持强一致性,追求最终一致性,并且能容忍分钟级的不一致,所以RocketMQ的NameServer集群间互不通信,极大降低了设计的复杂度,降低了对网络的要求,提升性能。

  • 高效的IO存储机制

RocketMQ追求消息发送的高吞吐量,RocketMQ消息存储文件设计成文件组的概念,组内单个文件大小固定,方便引入内存映射机制。

所有主题的消息存储基于顺序写,提升写性能,同时为了兼顾消息消费与消息查找,引入了消息消费队列文件与索引文件。

  • 容忍存在的设计缺陷

适当将某些工作下放给RocketMQ使用者。消息中间件的实现者经常会遇到一个难题:
如何保证消息一定能被消息消费者消费,并且保证只消费一次。

RocketMQ的设计者给出的解决办法是不解决这个难题,而是退而求其次,只保证消息被消费者消费,但设计上允许消息被重复消费,如果你们要用RocketMQ那么你们自己在消费端用逻辑实现只消费一次的功能,后面的实战章节中,我会具体讲解RocketMq幂等消费的控制

10.RocketMQ 架构

我们先看一个架构图:
在这里插入图片描述

上图中,展示了四个集群:NameServer 集群Broker 集群Producer 集群Consumer 集群,也就是我们上文中所提到的RocketMq四大核心组件对应的集群

10.1 NameServer 集群

提供轻量级的服务发现及路由,每个 NameServer 记录完整的路由信息,提供相应的读写服务,支持快速存储扩展。

NameServer是一个功能齐全的服务器,主要包含两个功能:

  • Broker 管理,接收来自 Broker 集群的注册请求,提供心跳机制检测 Broker 是否存活
  • 路由管理,每个 NameServer 持有全部有关 Broker 集群和客户端请求队列的路由信息

10.2 Broker 集群

通过提供轻量级的 Topic 和Queue 机制处理消息存储。

同时支持推(Push)和拉(Pull)两种模型,包含容错机制

提供强大的峰值填充和以原始时间顺序累积数千亿条消息的能力。此外还提供灾难恢复,丰富的指标统计数据和警报机制,这些都是传统的消息系统缺乏的。

Broker 有几个重要的子模块:

  • 远程处理模块,Broker 入口,处理来自客户端的请求
  • 客户端管理,管理客户端(包括消息生产者和消费者),维护消费者的主题订阅
  • 存储服务,提供在物理硬盘上存储和查询消息的简单 API
  • HA 服务,提供主从 Broker 间数据同步
  • 索引服务,通过指定键为消息建立索引并提供快速消息查询

10.3 Producer 集群

消息生产者支持分布式部署,分布式生产者通过多种负载均衡模式向 Broker 集群发送消息。

10.4 Consumer 集群

消息消费者也支持 Push 和 Pull 模型的分布式部署,还支持集群消费和消息广播。

提供了实时的消息订阅机制,可以满足大多数消费者的需求。

10.5 集群间交互方式的说明

1.Broker Master 和 Broker Slave 是主从结构,会执行数据同步 Data Sync

2.每个 Broker 与 NameServer 集群中所有节点建立长连接,定时注册 Topic 信息到所有NameServer

3.Producer 与 NameServer 集群中的其中一个节点(随机)建立长连接,定期从 NameServer 获取Topic 路由信息,并与提供 Topic 服务的 Broker Master 建立长连接,定时向 Broker 发送心跳

4.Producer 只能将消息发送到 Broker Master,但是 Consumer 同时和Broker Master和 Broker Slave 建立长连接,既可以从 Master 订阅消息,也可以从 Slave 订阅消息。

10.6 RocketMq的消息

1.顺序消费

顺序消息(FIFO:First Input First Output)是一种严格按照顺序进行发布和消费的消息类型。要求消息的发布和消息消费都按照顺序进行,RocketMQ可以严格保证消息有序

RocketMQ可以严格的保证消息有序

但这个顺序,不是全局顺序,只是分区(queue)顺序

要全局顺序只能一个分区,但是同一条queue里面,RocketMQ的确是能保证FIFO的。

所以我们可以简单总结一下:RocketMQ 支持两种类型的顺序消费模式,以满足不同场景下对消息顺序性的要求:

1.全局顺序消费: 全局顺序消费是指对于一个特定的主题(Topic)下的所有消息,都能按照生产者发送的顺序被消费者消费。为了实现这一特性,RocketMQ要求:

所有消息必须发送到同一个队列(Queue)中。这意味着如果要保证全局顺序,Topic只能配置一个队列。

只能有一个消费者实例来消费这个队列中的消息,因为多消费者实例会破坏消息的顺序性。

全局顺序消费适用于对消息顺序有严格要求的场景,例如金融交易、订单处理等。

2.分区顺序消费(局部顺序消费): 分区顺序消费是在消息有序的基础上进行了折衷,允许消息在一定程度上保持顺序,同时又能够横向扩展消费能力。

具体来说:

消息根据某个业务逻辑上的Sharding Key(如订单ID)进行分区,相同Sharding Key的消息会被发送到同一个队列中。

消费者端可以配置多个实例,并且每个实例通过MessageQueueSelector选择器(例如根据消息的某个属性进行哈希)确保相关消息被同一个消费者实例消费,从而在每个队列内部保持消息的顺序。

这种方式在保证了相同分区内的消息顺序的同时,也允许不同分区的消息并行处理,提高了系统的处理能力

在实现顺序消费时,RocketMQ的消费者需要使用特定的API和配置,例如使用MessageListenerOrderly接口来实现顺序消费逻辑,确保消息按照预期的顺序被处理。

此外,RocketMQ通过在队列级别加锁等机制来确保同一时刻只有一个消费者线程能够从一个队列中消费消息,以此来维持消息的顺序性

2.消息过滤

消息过滤是指在消息消费时,消息消费者可以对同一主题下的消息按照规则只消费自己感兴趣的消息。

RocketMQ消息过滤支持在服务端与消费端的消息过滤机制。

消息在Broker端过滤,Broker只将消息消费者感兴趣的消息发送给消息消费者。

消息在消息消费端过滤,消息过滤方式完全由消息消费者自定义,但缺点是有很多无用的消息会从Broker传输到消费端

3.消息存储

消息中间件的一个核心实现是消息的存储,对消息存储一般有如下两个维度的考量:

  • 消息堆积能力
  • 消息存储性能

RocketMQ追求消息存储的高性能,引入内存映射机制,所有主题的消息顺序存储在同一个文件中。

同时为了避免消息无限在消息存储服务器中累积,引入了消息文件过期机制与文件存储空间报警机制

RocketMQ 的消息存储机制是其高性能、高可靠性的核心之一,主要采用了文件系统存储方式,并设计了精心优化的数据结构来保证消息的高效存储和检索。以下是RocketMQ消息存储的关键组成部分和机制:

  • CommitLog

所有消息主体均存储在一个名为CommitLog的文件中,无论属于哪个主题。这种设计简化了消息的存储逻辑,提高了写入速度。

CommitLog文件默认大小为1GB,达到上限后会创建新的文件。消息以追加的方式顺序写入,利用了磁盘顺序写入的高效性。

这种存储方式确保了高吞吐量和低延迟,因为顺序写磁盘操作远比随机写入更快。

  • ConsumeQueue(消费队列)

为了加快消息的查询速度,RocketMQ为每个主题的每个消息队列创建了单独的ConsumeQueue文件。
ConsumeQueue存储了CommitLog中消息的逻辑偏移量、消息大小、消息在CommitLog中的物理偏移量等信息,形成了消息的索引。

这样,消费者在消费时不必直接扫描CommitLog,而是通过ConsumeQueue快速定位到消息在CommitLog中的位置,提升了消费效率。

  • IndexFile(索引文件)

RocketMQ还提供了基于关键字的索引功能,IndexFile用于存储消息的索引,使得可以通过消息的Key快速查找消息。

索引文件并非消息存储的必要部分,主要用于支持消息的按关键字查询,提高特定消息检索的速度。

  • 刷盘机制

RocketMQ支持同步刷盘异步刷盘两种模式

  • 同步刷盘保证了数据的绝对安全,但牺牲了一定的性能;
  • 异步刷盘则提供了更高的吞吐量,但数据安全性略低。

刷盘机制确保消息在内存中写入后,会根据配置策略及时持久化到磁盘,避免因异常导致数据丢失。

  • 主从复制

为了进一步增强消息的可靠性,RocketMQ支持Broker的主从复制,即消息不仅存储在主Broker上,还会复制到至少一个从Broker。

这样即使主Broker发生故障,也能从从Broker中恢复消息,确保消息的高可用性。

综上所述,RocketMQ的消息存储机制通过CommitLog、ConsumeQueue、IndexFile等多种文件结构和高效的刷盘策略,结合主从复制,既保证了消息的高性能存储与检索,也确保了消息的高可靠性和数据安全性。

4.消息高可用性

通常影响消息可靠性的有以下几种情况

  • 1.Broker正常关机。
  • 2.Broker异常宕机。
  • 3.操作系统宕机。
  • 4.机器断电,但是能立即恢复供电情况。
  • 5.机器无法开机(可能是CPU、主板、内存等关键设备损坏)。
  • 6.磁盘设备损坏。

针对上述情况,情况1,4的RocketMQ在同步刷盘机制下可以确保不丢失消息在异步刷盘模式下,会丢失少量消息。

情况5,6属于单点故障,一旦发生,该节点上的消息全部丢失,如果开启了异步复制机制,RoketMQ能保证只丢失少量消息,RocketMQ在后续版本中将引入双写机制,以满足消息可靠性要求极高的场合。

消息到消费低延迟

RocketMQ在消息不发生消息堆积时,以长轮询模式实现准实时的消息推送模式。

确保消息必须被消费一次(不是只消费一次!)

RocketMQ通过消息消费确认机制(ACK)来确保消息至少被消费一次,但由于ACK消息有可能丢失等其他原因,RocketMQ无法做到消息只被消费一次,有重复消费的可能。

回溯消息

回溯消息是指消息消费端已经消费成功的消息,由于业务要求需要重新消费消息。RocketMQ支持按时间回溯消息,时间维度可精确到毫秒,可以向前或向后回溯。

消息堆积

消息中间件的主要功能是异步解耦,必须具备应对前端的数据洪峰,提高后端系统的可用性,必然要求消息中间件具备一定的消息堆积能力。RocketMQ消息存储使用磁盘文件(内存映射机制),并且在物理布局上为多个大小相等的文件组成逻辑文件组,可以无限循环使用。RocketMQ消息存储文件并不是永久存储在消息服务器端,而是提供了过期机制,默认保留3天。

定时消息

定时消息是指消息发送到Broker后,不能被消息消费端立即消费,要到特定的时间点或者等待特定的时间后才能被消费。如果要支持任意精度的定时消息消费,必须在消息服务端对消息进行排序,势必带来很大的性能损耗,故RocketMQ不支持任意进度的定时消息,而只支持特定延迟级别。

消息重试机制

消息重试是指消息在消费时,如果发送异常,消息中间件需要支持消息重新投递,RocketMQ支持消息重试机制。

以上几种消息的处理在后续我们实际操作中都会一一给同学们展示。


参考文献:
《RocketMQ的消费模式》
《RocketMQ之(一)RocketMQ入门》
《RocketMQ(八)RocketMQ延时消息》
《RocketMQ介绍》
《消息中间件》
《RocketMQ 的基本概念和特性》
《RocketMQ特点和基本概念》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/593859.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ssm105基于JAVAEE技术校园车辆管理系统+jsp

校园车辆管理系统设计与实现 摘 要 现代经济快节奏发展以及不断完善升级的信息化技术,让传统数据信息的管理升级为软件存储,归纳,集中处理数据信息的管理方式。本校园车辆管理系统就是在这样的大环境下诞生,其可以帮助管理者在短…

泰克示波器电流探头如何抓浪涌电流波形?

泰克示波器是一种常见的电子测量仪器,广泛应用于电子工程、通信工程、医疗设备等领域。它的主要功能是实时显示电信号的波形,从而帮助工程师和技术人员分析和调试电路。而在一些特定的应用场景中,例如电源、电机、电器设备等,我们…

模型全参数训练和LoRA微调所需显存的分析

大家好,我是herosunly。985院校硕士毕业,现担任算法研究员一职,热衷于机器学习算法研究与应用。曾获得阿里云天池比赛第一名,CCF比赛第二名,科大讯飞比赛第三名。拥有多项发明专利。对机器学习和深度学习拥有自己独到的见解。曾经辅导过若干个非计算机专业的学生进入到算法…

TypeScript 基础学习笔记:泛型 <T> vs 断言 as

TypeScript 基础学习笔记&#xff1a;泛型 <T> vs 断言 as &#x1f525; 引言 &#x1f44b; TypeScript (TS) 以其静态类型的魔力&#xff0c;让我们的代码更加健壮、易读且易于维护。今天&#xff0c;我们将深入探讨两个核心概念——泛型&#xff08;Generics&#x…

【华为】AC三层旁挂直接转发

【华为】AC三层旁挂直接转发 实验需求实验拓扑配置AC和AP二层通信ACLSW1LSW2AP2获取到的管理地址AP3获取到的管理地址 AP上线配置WLAN业务ACLSW1&#xff08;作DHCP地址池&#xff09;业务成功下发 访问公网&#xff08;NAT&#xff09;LSW1AR1 配置文档ACLSW1LSW2AR1ISP 实验需…

杭电acm1013 Digital Roots 数字根 Java解法 高精度

Problem - 1013 (hdu.edu.cn) 高精度算术模拟 开long没过想到开bI 开bl一次过 import java.math.BigInteger; import java.util.Scanner;public class Main {public static void main(String[] args) {Scanner sc new Scanner(System.in);BigInteger i;while (!(i sc.nextB…

Docker新建容器 修改运行容器端口

目录 一、修改容器的映射端口 二、解决方案 三、方案 一、修改容器的映射端口 项目需求修改容器的映射端口 二、解决方案 停止需要修改的容器 修改hostconfig.json文件 重启docker 服务 启动修改容器 三、方案 目前正在运行的容器 宿主机的3000 端口 映射 容器…

【Python项目】基于时间序列的【大气污染预测系统】

技术简介&#xff1a;使用Python技术、B/S架构、MYSQL数据库等实现。 系统简介&#xff1a;本系统的主要使用角色为普通用户和管理员用户&#xff0c;两者的功能几乎是一致的&#xff0c;但管理员用户比普通用户多了用户管理的功能&#xff0c;可以对系统内的用户进行管理。普通…

Java IO编程必备:FilterInputStream类的原理与实现

哈喽&#xff0c;各位小伙伴们&#xff0c;你们好呀&#xff0c;我是喵手。运营社区&#xff1a;C站/掘金/腾讯云&#xff1b;欢迎大家常来逛逛 今天我要给大家分享一些自己日常学习到的一些知识点&#xff0c;并以文字的形式跟大家一起交流&#xff0c;互相学习&#xff0c;一…

如何构建进攻性的网络安全防护策略

进攻性安全&#xff08;Offensive security&#xff09;是指一系列主动安全策略&#xff0c;这些策略与恶意行为者在现实世界的攻击中使用的策略相同&#xff0c;区别在于其目的是加强而非损害网络安全。常见的进攻性安全方法包括红队、渗透测试和漏洞评估。 进攻性安全行动通…

C++入门 ——类和对象(二)

this指针 this指针的引出 我们先来定义一个日期类 Date class Date { public:void Init(int year, int month, int day){_year year;_month month;_day day;}void Print(){cout <<_year<< "-" <<_month << "-"<< _da…

基于springboot+vue+Mysql的自习室预订系统

开发语言&#xff1a;Java框架&#xff1a;springbootJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包&#xff1a;…

Springboot工程创建

目录 一、步骤 二、遇到的问题及解决方案 一、步骤 打开idea,点击文件 ->新建 ->新模块 选择Spring Initializr&#xff0c;并设置相关信息。其中组为域名&#xff0c;如果没有公司&#xff0c;可以默认com.example。点击下一步 蓝色方框部分需要去掉&#xff0c;软件包…

牛客NC382 切割木头【中等 二分超找 Java/Go/C++】

题目 题目链接&#xff1a; https://www.nowcoder.com/practice/707d98cee255448c838c76918a702be0 核心 二分查找Java代码 import java.util.*;public class Solution {/*** 代码中的类名、方法名、参数名已经指定&#xff0c;请勿修改&#xff0c;直接返回方法规定的值即可…

SpringBoot指标监控

一.SpringBoot指标监控_添加Actuator功能 Spring Boot Actuator可以帮助程序员监控和管理SpringBoot应用&#xff0c;比如健康检查、内存使用情况统计、线程使用情况统计等。我 们在SpringBoot项目中添加Actuator功能&#xff0c;即可使用Actuator监控 项目&#xff0c;用法如…

Vue的项目启动指令分析

通过Vue CLI脚手架创建的项目&#xff0c;默认的启动项目方式是 npm run serve 这里的serve是可以修改的。 在创建的项目目录中&#xff0c;找到package.json 双击打开&#xff0c;找到scripts部分 在scripts部分&#xff0c;有一个"serve"键值对&#xff0c;这里的…

知乎23届数据分析校招A卷——笔记

1、and 和 or的并列运用[先看and] 条件1 OR 条件2 AND 条件3 执行顺序是先执行AND操作符&#xff08;先看条件2和3&#xff09;&#xff0c;再根据其结果判断是否需要执行OR操作符&#xff0c;并最终返回整个表达式的逻辑结果。 条件1 and 条件2 or 条件3 执行逻辑是先执行…

使用socket+Python实现ping

import os import socket import struct import select import time# 计算校验和&#xff0c;用于确保数据的完整性 def checksum(source_string):sum 0count 0max_count len(source_string)# 处理成对的字节while count < max_count - 1:val source_string[count 1] *…

nginx的前世今生(二)

书接上回&#xff1a; 上回书说到&#xff0c;nginx的前世今生&#xff0c;这回我们继续说 3.缓冲秘籍&#xff0c;洪流控水 Nginx的缓冲区是其处理数据传输和提高性能的关键设计之一&#xff0c;主要用于暂存和管理进出的数据流&#xff0c;以应对不同组件间速度不匹配的问题…

PG WAL日志理解

类似于oracle的redo log&#xff0c;用于数据库恢复&#xff0c;当一条SQL语句执行&#xff0c;PG会把对应的块放到缓冲区执行&#xff0c;&#xff0c;会写进WAL缓冲区会进行写操作&#xff0c;commit后&#xff0c;WAL writer进程进行写操作&#xff0c;把日志缓冲区WAL buff…