文章目录
- RocketMQ介绍
- RocketMQ架构:
- NameServer:
- Broker
- Producer
- Topic(主题):
- Queue(队列):
- Message(消息):
- RocketMQ的工作流程
- RocketMQ的使用场景
- 异步消息传递:
- 分布式事务消息:
- 日志收集与分析:
- 实时数据处理:
- 流量削峰与流量控制:
- 系统解耦:
- 为什么选择RocketMQ
- Apache Kafka:
- RabbitMQ:
- RocketMQ:
- ActiveMQ:
- RocKetMQ的消费模式:
- 集群消费(Clustering):
- 广播消费(Broadcasting):
- 顺序消费(Orderly Consumption):
- 如何保证消息的可用性/可靠性/不丢失呢?
- 1、生产阶段
- 2、存储阶段
- 3、消费阶段
- 如何保证消息不会被重复消费
- RocketMQ 怎么处理消息积压
- 死信队列
- 为什么RocketMQ不使用Zookeeper作为注册中心呢?
RocketMQ介绍
RocketMQ是一种开源的分布式消息传递平台,最初由阿里巴巴开发并于2012年开源。它是一个高性能、低延迟、高可靠性、可扩展性强的分布式消息中间件,旨在解决大规模分布式系统中的消息通信问题。RocketMQ具有以下特点:
1.高性能和低延迟: RocketMQ能够处理大量的消息,并以非常低的延迟进行消息传递,使其适用于高吞吐量和低延迟的场景。
2.高可靠性: RocketMQ采用了多种机制来保证消息的可靠性传递,包括消息持久化、消息复制和容错机制等,确保消息不会丢失。
3.可扩展性强: RocketMQ支持水平扩展,可以通过增加Broker节点来扩展消息处理能力,同时还支持动态增加和删除节点,方便根据业务需求调整集群规模。
4.丰富的特性: RocketMQ提供了丰富的特性,包括顺序消息、事务消息、消息过滤、消息轨迹等,满足了不同场景下的需求。
5.分布式架构: RocketMQ采用了分布式架构,包括Name Server、Broker和Producer/Consumer等组件,各组件之间通过网络通信实现协作。
6.社区活跃: RocketMQ拥有活跃的开发社区和用户社区,持续地进行功能更新和性能优化,同时提供了丰富的文档和示例,方便用户使用和扩展。
RocketMQ架构:
RocketMQ 一共有四个部分组成:NameServer,Broker,Producer 生产者,Consumer 消费者,它们对应了:发现、发、存、收,为了保证高可用,一般每一部分都是集群部署的。
NameServer
NameServer 是一个无状态的服务器,角色类似于 Kafka使用的 Zookeeper,但比 Zookeeper 更轻量。
特点:
NameServer:
每个 NameServer 结点之间是相互独立,彼此没有任何信息交互。
Nameserver 被设计成几乎是无状态的,通过部署多个结点来标识自己是一个伪集群,Producer 在发送消息前从 NameServer 中获取 Topic 的路由信息也就是发往哪个 Broker,Consumer 也会定时从 NameServer 获取 Topic 的路由信息,Broker 在启动时会向 NameServer 注册,并定时进行心跳连接,且定时同步维护的 Topic 到 NameServer。
功能主要有两个:
1、和Broker 结点保持长连接。
2、维护 Topic 的路由信息。
Broker
消息存储和中转角色,负责存储和转发消息。
Broker 内部维护着一个个 Consumer Queue,用来存储消息的索引,真正存储消息的地方是 CommitLog(日志文件)。
单个 Broker 与所有的 Nameserver 保持着长连接和心跳,并会定时将 Topic 信息同步到 NameServer,和 NameServer 的通信底层是通过 Netty 实现的。
Producer
消息生产者,业务端负责发送消息,由用户自行实现和分布式部署。
Producer由用户进行分布式部署,消息由Producer通过多种负载均衡模式发送到Broker集群,发送低延时,支持快速失败。
RocketMQ 提供了三种方式发送消息:同步、异步和单向
同步发送:同步发送指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,例如重要通知邮件、营销短信。
异步发送:异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。
单向发送:单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集。
Topic(主题):
主题是消息的逻辑分类,生产者将消息发送到特定的主题,消费者则从特定的主题订阅消息。主题在 RocketMQ 中起到消息分类和路由的作用。
Queue(队列):
队列是主题的分区,每个主题可以分为多个队列,用于并行处理消息。队列在 RocketMQ 中用于提高消息处理的并发性能。
Message(消息):
消息是 RocketMQ 中的基本数据单元,由生产者产生并发送到主题,然后由消费者订阅并处理。消息通常包含标识符、消息体和其他元数据信息。
RocketMQ的工作流程
消息队列(MQ)的工作流程通常包括以下几个关键步骤:
1.生产者发送消息: 生产者将消息发送到消息队列中,消息可以是任意格式的数据,通常包括消息内容和相关的元数据(如消息标识、优先级等)。
2.消息队列存储消息: 消息队列接收到生产者发送的消息后,将消息存储在内部的队列或者主题中。存储的方式可以是持久化存储或者内存存储,具体取决于消息队列的实现方式和配置。
3.消费者订阅消息: 消费者从消息队列中订阅感兴趣的消息,可以订阅一个或多个主题或者队列。一旦有新消息到达,消息队列会将消息推送给已经订阅的消费者。
4.消息队列传递消息: 当有消息到达时,消息队列会将消息传递给订阅了相应主题或队列的消费者。消息可以按照一定的策略进行传递,如轮询、广播或者基于订阅关系等。
5.消费者处理消息: 消费者接收到消息后,进行相应的处理逻辑。处理逻辑可以是业务逻辑、数据处理、通知等。处理完成后,消费者通常会向消息队列发送确认消息,表示消息已经成功处理。
6.消息确认与回馈: 消费者在处理完消息后,向消息队列发送确认消息(ACK)。消息队列根据收到的确认消息,更新消息的状态,并根据需要进行消息的删除、重发或者转发等操作。如果消费者长时间未发送确认消息,消息队列可能会将消息重新投递给其他消费者或者进行相应的处理。
7.监控与管理: 消息队列通常提供监控和管理功能,用于监控系统的运行状态、消息的流量和延迟情况,并提供管理界面和命令行工具,用于管理队列、主题、消费者和权限等。
RocketMQ的使用场景
异步消息传递:
RocketMQ 可用于实现系统之间的异步消息传递。例如,当某个系统产生了一条重要的事件或数据更新时,可以将消息发送到 RocketMQ 中,其他系统可以异步地订阅并处理这些消息,从而实现系统解耦和异步通信。
分布式事务消息:
RocketMQ 提供了分布式事务消息的支持,可以保证消息的原子性和一致性。在分布式事务场景中,可以使用 RocketMQ 发送事务消息,然后由事务消费者对消息进行处理,确保在事务成功提交后消息被消费,从而实现分布式事务的可靠性。
日志收集与分析:
RocketMQ 可用于日志收集和分析,例如在大规模分布式系统中收集和处理日志数据。应用程序可以将日志消息发送到 RocketMQ 中,然后使用日志消费者将日志数据传输到日志存储系统(如 Hadoop、Elasticsearch 等)进行分析和检索。
实时数据处理:
RocketMQ 可以作为实时数据处理系统的消息中间件,用于传输和处理实时数据流。例如,在电子商务平台中,可以使用 RocketMQ 实时传输和处理用户行为数据,从而实现个性化推荐、实时监控等功能。
消息通知与广播:RocketMQ 可用于消息通知和广播,例如在电商平台中,可以使用 RocketMQ 发送订单状态更新通知给用户,以及发送促销活动消息给用户。
流量削峰与流量控制:
RocketMQ 可以用于流量削峰和流量控制,例如在高并发情况下,可以使用 RocketMQ 缓存和平滑处理突发的请求,以保护后端系统不受过载的影响。
系统解耦:
引入消息队列之前,下单完成之后,需要订单服务去调用库存服务减库存,调用营销服务加营销数据……引入消息队列之后,可以把订单完成的消息丢进队列里,下游服务自己去调用就行了,这样就完成了订单服务和其它服务的解耦合。
为什么选择RocketMQ
Apache Kafka:
Kafka是一个高吞吐量、分布式、持久化的消息队列系统,主要用于大数据实时处理和日志收集等场景。
优点:高吞吐量、持久化存储、水平扩展能力强。
缺点:不支持消息顺序传递、延迟相对较高。
RabbitMQ:
RabbitMQ是一个开源的消息代理软件,实现了AMQP协议,主要用于企业级应用。
优点:支持多种消息传递模式、消息确认机制、可靠性高。
缺点:性能相对较低、扩展性不如Kafka。
RocketMQ:
RocketMQ是一个开源的分布式消息中间件,特点是高可靠性、低延迟和高吞吐量。
优点:高可靠性、低延迟、高吞吐量、支持丰富的特性。
缺点:相对于Kafka,扩展性稍逊一筹。
ActiveMQ:
ActiveMQ是一个流行的开源消息代理软件,支持多种传输协议和消息协议。
优点:功能丰富、易于使用、社区活跃。
缺点:性能较差、可靠性不如RabbitMQ和RocketMQ。
一个中间件的使用,应该根据自己的业务类型和场景,来选择适配的中间件,没有最好的中间件,只有最适配业务的中间件。
RocKetMQ的消费模式:
集群消费(Clustering):
集群消费模式下,多个消费者实例共同消费同一个主题的消息,每条消息只被消费一次。
消费者实例通过负载均衡的方式从消息队列中获取消息,每个消息只被其中一个消费者实例消费。
集群消费,怎么能够保证消息只能被消费者消费一次,而不是多次消费:
RocketMQ使用消费确认机制来确保消息只被消费一次。在消费者消费消息后,会向消息服务器发送消费确认请求,确认消息已经被处理。
消费者消费消息后,RocketMQ会将消息的消费进度(消费偏移量)保存在消息服务器上。这样,即使消费者实例重启或者发生故障,也能够通过消费进度来恢复之前的消费状态。
广播消费(Broadcasting):
RocketMQ通过广播消费模式来保证消息被所有消费者实例都处理,以此来实现广播消息的功能。在RocketMQ中,广播消费模式下的消息会被所有订阅了该主题的消费者实例都消费,而不会像集群消费模式那样只被其中一个消费者实例消费。以下是RocketMQ如何保证广播消息的实现方式:
消息发送:
生产者向RocketMQ服务器发送广播消息,将消息发送到指定的主题(Topic)中。
RocketMQ服务器会将消息保存在各个消息队列中,等待消费者实例来消费。
消费者订阅:
消费者实例通过指定消费者组(Consumer Group)来订阅特定的主题。
当消费者实例启动时,会向RocketMQ服务器发送订阅请求,表示它要订阅该主题的广播消息。
消息消费:
RocketMQ服务器收到消费者实例的订阅请求后,会将该主题的广播消息发送给所有订阅了该主题的消费者实例。
每个消费者实例独立地消费消息,不会像集群消费模式那样只被其中一个消费者实例消费。
消费者实例通过消费确认机制来确认消息已经被处理,以便RocketMQ服务器可以更新消息的消费进度。
顺序消费(Orderly Consumption):
顺序消费模式用于保证消息按照一定的顺序进行处理,即消息队列中的消息顺序和消息在应用程序中的处理顺序一致。
RocketMQ通过消息队列和分区来保证消息的顺序性,确保同一个分区的消息按照顺序被消费。
RocketMQ 通过一些机制来确保顺序消费:
1.单个队列保证顺序: 在RocketMQ中,消息是按照队列进行分区存储的,因此,如果你发送的消息都发送到同一个队列中,那么消费者就可以保证按照发送的顺序来消费消息。
2.顺序消息发送: 如果你的应用程序需要发送顺序消息,你可以指定消息的 key 字段。RocketMQ 将根据消息的 key 值,将具有相同 key 的消息发送到同一个队列中,从而保证消费者能够按照 key 的顺序消费消息。
3.顺序消费者: 在 RocketMQ 中,你可以为某个 Topic 的某个 Consumer Group 创建顺序消费者。顺序消费者会保证按照消息的存储顺序来消费消息,这样就可以确保消息的顺序性。
4.消息队列锁定: RocketMQ 支持通过消息队列锁定的方式来确保顺序消费。这种方式下,只有一个消费者可以消费某个队列的消息,从而保证了消息的顺序性。
如何保证消息的可用性/可靠性/不丢失呢?
保证消息的可用性,可靠新,不丢失我们要从三个阶段入手:
1、生产阶段
请求确认机制:消息队列的客户端会把消息发送到 Broker,Broker收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。
消息重试机制: 如果消息发送失败或者消费失败,RocketMQ 会自动进行消息重试。在消息发送失败时,RocketMQ 会根据配置的重试次数和时间间隔进行重试;在消费者消费失败时,RocketMQ 会将消息重新投递给其他消费者进行消费。
消息事务: RocketMQ 提供了消息事务机制,允许应用程序发送半消息,然后根据业务逻辑决定是否要提交或回滚消息。这种方式可以确保消息在发送和消费之间的一致性,从而提高消息的可靠性。
2、存储阶段
消息持久化: RocketMQ 默认将消息持久化到磁盘上,确保即使在消息发送后,即使 Broker 宕机,消息也不会丢失。这种持久化机制保证了消息的可靠性。
数据备份和故障转移: RocketMQ 支持将消息数据进行备份,并且在节点发生故障时能够进行快速的故障转移,从而提高了消息的可用性。
同步复制: RocketMQ 支持主从架构,消息会被同步复制到多个 Broker 节点上,这样即使主节点宕机,备份节点也可以继续提供服务,确保消息的可用性。
3、消费阶段
消息 ACK 机制: RocketMQ 的消费者在消费消息后需要发送 ACK 给 Broker,告诉 Broker 消息已经被成功消费。如果消费者长时间没有发送 ACK,Broker 会认为消息没有被成功消费,将消息重新投递给其他消费者,从而保证消息不会丢失。
如何保证消息不会被重复消费
对分布式消息队列来说,同时做到确保一定投递和不重复投递是很难的,就是所谓的“有且仅有一次” 。 RocketMQ择了确保一定投递,保证消息不丢失,但有可能造成消息重复。
处理消息重复问题,主要有业务端自己保证,主要的方式有两种:业务幂等和消息去重。
业务幂等:第一种是保证消费逻辑的幂等性,也就是多次调用和一次调用的效果是一样的。这样一来,不管消息消费多少次,对业务都没有影响。这个都是从数据库唯一索引来做处理的。
消息去重:对重复的消息就不再消费了。这种方法,需要保证每条消息都有一个惟一的编号,通常是业务相关的,比如订单号,消费的记录需要落库,而且需要保证和消息确认这一步的原子性。
具体做法是:一般会用redis来做临时插入的处理,根据业务设置一个过期时间,每次去做处理,都判断号是否存在key,如果存在就不做消费处理。如果不存在就做消费处理,并且消费完成后,将key存到redis,并且设置好过期时间。
RocketMQ 怎么处理消息积压
1.消费者扩容:如果当前Topic的Message Queue的数量大于消费者数量,就可以对消费者进行扩容,增加消费者,来提高消费能力,尽快把积压的消息消费玩。
2.消息迁移Queue扩容:如果当前Topic的Message Queue的数量小于或者等于消费者数量,这种情况,再扩容消费者就没什么用,就得考虑扩容Message Queue。可以新建一个临时的Topic,临时的Topic多设置一些Message Queue,然后先用一些消费者把消费的数据丢到临时的Topic,因为不用业务处理,只是转发一下消息,还是很快的。接下来用扩容的消费者去消费新的Topic里的数据,消费完了之后,恢复原状。
3.优化消息消费逻辑: 仔细审查消费者的逻辑,确保消费者能够高效地处理消息。可能存在一些效率低下的处理步骤,例如数据库查询、网络请求等,可以通过优化这些操作来提高消息处理速度。
4.调整消息处理策略: 考虑调整消息的处理策略,例如增加消息处理的并发度、优化消息处理的顺序等。有时候可以牺牲一定的顺序性以换取更高的处理速度。
5.增加队列或者分区: 如果系统中的消息队列容量有限,可以考虑增加队列或者分区来分散消息的负载。这样可以避免单个队列过载而导致消息积压。
6.消息重试策略: 如果消息处理失败或者超时,可能会导致消息积压。可以考虑调整消息的重试策略,例如增加重试次数、增加重试间隔等,以降低消息处理失败的概率。
7.限流和流量控制: 实施流量控制策略,限制消息的产生速率,以免超过系统处理能力。可以根据系统的负载情况,动态调整流量控制策略,避免消息积压。
8.监控和报警: 部署监控系统,实时监控消息队列的状态和负载情况。设置合适的报警规则,及时发现消息积压问题并采取相应的措施进行处理。
9.数据清理和整理: 定期清理和整理系统中的过期数据和无效数据,释放系统资源,减少消息队列的负载压力。
死信队列
死信队列是消息队列中的一种机制,用于处理无法被消费者正常消费的消息。当消息无法被消费者消费时,这些消息会被发送到死信队列中,而不是被丢弃或者无限期地保留在原始队列中。
死信队列通常用于以下几种情况:
1.消息消费失败: 当消息被消费者消费失败,例如消费者抛出异常或者超时未处理等情况时,这些消息可以被发送到死信队列中,以便后续进行处理或者分析失败原因。
2.消息过期: 如果消息设置了过期时间,在消息过期后仍然没有被消费者消费,则可以被发送到死信队列中。
3.消息重试次数达到上限: 当消息在一定次数的重试后仍然无法被消费者成功消费时,可以将这些消息发送到死信队列,防止无限制地重试消费。
实现死信队列的方式因消息队列的具体实现而有所不同。一般来说,消息队列系统会提供配置死信队列的功能,例如 RabbitMQ、Kafka 和 RocketMQ 都支持死信队列的配置。
在使用死信队列时,需要注意以下几点:
1.死信队列需要单独创建并配置,以便将无法被消费者正常消费的消息发送到该队列中。
2.需要合理设置死信队列的 TTL(Time to Live)和消费者消费死信队列中的消息的逻辑,以确保及时处理死信消息,避免资源浪费和系统负载过高。
3.对于进入死信队列的消息,可以进行监控和分析,以找出系统中存在的问题,并采取相应的措施进行修复。
为什么RocketMQ不使用Zookeeper作为注册中心呢?
1.基于可用性的考虑,根据CAP理论,同时最多只能满足两个点,而Zookeeper满足的是CP,也就是说Zookeeper并不能保证服务的可用性,Zookeeper在进行选举的时候,整个选举的时间太长,期间整个集群都处于不可用的状态,而这对于一个注册中心来说肯定是不能接受的,作为服务发现来说就应该是为可用性而设计。
2.基于性能的考虑,NameServer本身的实现非常轻量,而且可以通过增加机器的方式水平扩展,增加集群的抗压能力,而Zookeeper的写是不可扩展的,Zookeeper要解决这个问题只能通过划分领域,划分多个Zookeeper集群来解决,首先操作起来太复杂,其次这样还是又违反了CAP中的A的设计,导致服务之间是不连通的。
3.持久化的机制来带的问题,ZooKeeper 的 ZAB 协议对每一个写请求,会在每个 ZooKeeper 节点上保持写一个事务日志,同时再加上定期的将内存数据镜像(Snapshot)到磁盘来保证数据的一致性和持久性,而对于一个简单的服务发现的场景来说,这其实没有太大的必要,这个实现方案太重了。而且本身存储的数据应该是高度定制化的。
4.消息发送应该弱依赖注册中心,而RocketMQ的设计理念也正是基于此,生产者在第一次发送消息的时候从NameServer获取到Broker地址后缓存到本地,如果NameServer整个集群不可用,短时间内对于生产者和消费者并不会产生太大影响。