RocketMQ
- 1、基础入门
- 1、消息中间件(MQ)的定义
- 2、为什么要用消息中间件?
- 2、RocketMQ 产品发展
- 1、RocketMQ 版本发展
- 2、RocketMQ 的物理架构
- 1、核心概念
- 2、物理架构中的整体运转
- 3、RocketMQ 的概念模型
- 1、分组(Group)
- 2、主题(Topic)
- 3、标签(Tag)
- 4、消息队列(Message Queue)
- 5、偏移量(Offset)
- 4、消息发送的方式
- 5、消息消费的方式
- 6、顺序消息
- 1、顺序消息生产
- 2、顺序消息消费
- 7、延时消息
- 1、概念介绍
- 2、适用场景
- 3、使用方式
- 8、事物消息
- 1、事物消息状态
- 2、使用场景
- 3、使用限制
- 9、分布式事务
1、基础入门
1、消息中间件(MQ)的定义
其实并没有标准定义。一般认为,消息中间件属于分布式系统中一个子系统,关注于数据的发送和接收,利用高效可靠的异步消息传递机制对分布式系统中的其余各个子系统进行集成。
高效:对于消息的处理处理速度快。
可靠:一般消息中间件都会有消息持久化机制和其他的机制确保消息不丢失。
异步:指发送完一个请求,不需要等待返回,随时可以再发送下一个请求,既不需要等待。
一句话总结,消息中间件不生产消息,只是消息的搬运工。
2、为什么要用消息中间件?
1、应用解耦
系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。
使用消息中间件,系统的耦合性就会提高了。比如物流系统发生故障,需要几分钟才能来修复,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统恢复后,继续处理存放在消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。
2、流量削峰
应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提到系统的稳定性和用户体验。
互联网公司的大促场景(双十一、店庆活动、秒杀活动)都会使用到 MQ。
3、数据分发
通过消息队列可以让数据在多个系统更加之间进行流通。数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可。
接口调用的弊端,无论是新增系统,还是移除系统,代码改造工作量都很大。
使用 MQ 做数据分发好处,无论是新增系统,还是移除系统,代码改造工作量较小。
所以使用 MQ 做数据的分发,可以提高团队开发的效率。
2、RocketMQ 产品发展
1、RocketMQ 版本发展
Metaq1.x 是 RocketMQ 前身的第一个版本,本质上把 Kafka 做了一次 java 版本的重写(Kafka 是 sacla)。
Meta2.x,主要是对存储部分进行了优化,因为 kafka 的数据存储,它的 partition 是一个全量的复制,在阿里、淘宝的这种海量交易,Kafka这种机制的横向拓展是非常不好的。2012 年阿里同时把 Meta2.0 从阿里内部开源出来,取名 RocketMQ,同时为了命名上的规范(版本上延续),所以这个就是 RocketMQ3.0。
现在 RocketMQ 主要维护的是 4.x 的版本,也是大家使用得最多的版本,2017 年从 Apache 顶级项目毕业。
2、RocketMQ 的物理架构
消息队列 RocketMQ 是阿里巴巴集团基于高可用分布式集群技术,自主研发的云正式商用的专业消息中间件,既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性,是阿里巴巴双 11 使用的核心产品。
RocketMQ 的设计基于主题的发布与订阅模式,其核心功能包括消息发送、消息存储(Broker)、消息消费,整体设计追求简单与性能第一。
1、核心概念
1、NameServer
NameServer 是整个 RocketMQ 的“大脑”,它是 RocketMQ 的服务注册中心,所以 RocketMQ 需要先启动 NameServer 再启动 Rocket 中的 Broker。
Broker 在启动时向所有 NameServer 注册(主要是服务器地址等),生产者在发送消息之前先从 NameServer 获取 Broker 服务器地址列表(消费者一样),然后根据负载均衡算法从列表中选择一台服务器进行消息发送。
NameServer 与每台 Broker 服务保持长连接,并间隔 30S 检查 Broker 是否存活,如果检测到 Broker 宕机,则从路由注册表中将其移除。这样就可以实现 RocketMQ 的高可用。
2、生产者(Producer)
生产者:也称为消息发布者,负责生产并发送消息至 RocketMQ。
3、消费者(Consumer)
消费者:也称为消息订阅者,负责从 RocketMQ 接收并消费消息。
4、消息(Message)
消息:生产或消费的数据,对于 RocketMQ 来说,消息就是字节数组。
5、主机(Broker)
RocketMQ 的核心,用于暂存和传输消息。
2、物理架构中的整体运转
- NameServer 先启动;
- Broker 启动时向 NameServer 注册;
- 生产者在发送某个主题的消息之前先从 NamerServer 获取 Broker 服务器地址列表(有可能是集群),然后根据负载均衡算法从列表中选择一台Broker 进行消息发送。
- NameServer 与每台 Broker 服务器保持长连接,并间隔 30S 检测 Broker 是否存活,如果检测到 Broker 宕机(使用心跳机制,如果检测超过120S),则从路由注册表中将其移除。
- 消费者在订阅某个主题的消息之前从 NamerServer 获取 Broker 服务器地址列表(有可能是集群),但是消费者选择从 Broker 中订阅消息,订阅规则由 Broker配置决定。
3、RocketMQ 的概念模型
1、分组(Group)
生产者:标识发送同一类消息的 Producer,通常发送逻辑一致。发送普通消息的时候,仅标识使用,并无特别用处。主要作用用于事务消息:事务消息中如果发送某条消息的producer-A宕机,使得事务消息一直处于PREPARED状态并超时,则broker会回查同一个group的其它producer,确认这条消息应该 commit 还是 rollback。
消费者:标识一类 Consumer 的集合名称,这类 Consumer 通常消费一类消息,且消费逻辑一致。同一个 Consumer Group 下的各个实例将共同消费 topic的消息,起到负载均衡的作用。
消费进度以 Consumer Group 为粒度管理,不同 Consumer Group 之间消费进度彼此不受影响,即消息 A 被 Consumer Group1 消费过,也会再给 Consumer Group2 消费。
2、主题(Topic)
标识一类消息的逻辑名字,消息的逻辑管理单位。无论消息生产还是消费,都需要指定 Topic。
区分消息的种类;一个发送者可以发送消息给一个或者多个 Topic;一个消息的接收者可以订阅一个或者多个 Topic 消息。
3、标签(Tag)
RocketMQ 支持给在发送的topic 打 tag,同一个 topic 的消息虽然逻辑管理是一样的。但是消费 topic1 的时候,如果你消费订阅的时候指定的是 tagA,那么 tagB 的消息将不会投递。
4、消息队列(Message Queue)
简称 Queue 或 Q。消息物理管理单位。一个 Topic 将有若干个 Q。若一个 Topic 创建在不同的 Broker,则不同的 broker 上都有若干 Q,消息将物理地存储落在不同 Broker 结点上,具有水平扩展的能力。
无论生产者还是消费者,实际的生产和消费都是针对 Q 级别。例如 Producer 发送消息的时候,会预先选择(默认轮询)好该 Topic 下面的某一条 Q发送;Consumer 消费的时候也会负载均衡地分配若干个 Q,只拉取对应 Q 的消息。
每一条 message queue 均对应一个文件,这个文件存储了实际消息的索引信息。并且即使文件被删除,也能通过实际纯粹的消息文件(commit log)恢复回来。
5、偏移量(Offset)
RocketMQ 中,有很多 offset 的概念。一般我们只关心暴露到客户端的 offset。不指定的话,就是指 Message Queue 下面的 offset。
Message queue 是无限长的数组。一条消息进来下标就会涨 1,而这个数组的下标就是 offset,Message queue 中的 max offset 表示消息的最大 offset。
Consumer offset 可以理解为标记 Consumer Group 在一条逻辑 Message Queue 上,消息消费到哪里即消费进度。但从源码上看,这个数值是消费过的最新消费的消息 offset+1,即实际上表示的是下次拉取的 offset 位置。
4、消息发送的方式
5、消息消费的方式
1、集群模式:适用场景&注意事项
- 消费端集群化部署,每条消息只需要被处理一次。
- 由于消费进度在服务端维护,可靠性更高。
- 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
- 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。
2、广播模式:适用场景&注意事项
- 广播消费模式下不支持顺序消息。
- 广播消费模式下不支持重置消费位点。
- 每条消息都需要被相同逻辑的多台机器处理。
- 消费进度在客户端维护,出现重复的概率稍大于集群模式。
- 广播模式下,消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
- 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
- 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
- 目前仅 Java 客户端支持广播模式。
- 广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。
6、顺序消息
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ 可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取 Round Robin 轮询方式把消息发送到不同的 queue(分区队列);而消费消息的时候从多个 queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个 queue 中,消费的时候只从这个 queue 上依次拉取,则就保证了顺序。当发送和消费参与的 queue 只有一个,则是全局有序;如果多个 queue 参与,则为分区有序,即相对每个 queue,消息都是有序的。
1、顺序消息生产
使用顺序消息:首先要保证消息是有序进入 MQ 的,消息放入 MQ 之前,对 id 等关键字进行取模,放入指定 messageQueue,consume 消费消息失败时,不能返回 reconsume——later,这样会导致乱序,应该返回 suspend_current_queue_a_moment,意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里。
2、顺序消息消费
7、延时消息
1、概念介绍
延时消息:Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到 Consumer 进行消费,该消息即延时消息。
2、适用场景
消息生产和消费有时间窗口要求:比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略。
3、使用方式
Apache RocketMQ 目前只支持固定精度的定时消息,因为如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。(阿里云 RocketMQ 提供了任意时刻的定时消息功能,Apache 的 RocketMQ 并没有,阿里并没有开源)
发送延时消息时需要设定一个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。
延迟消息是根据延迟队列的 level 来的,延迟队列默认msg.setDelayTimeLevel(3)代表延迟 10 秒:
“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”
源码中:org/apache/rocketmq/store/config/MessageStoreConfig.java
8、事物消息
其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
正常事务流程
(1) 发送消息(half 消息):图中步骤 1。
(2) 服务端响应消息写入结果:图中步骤 2。
(3) 根据发送结果执行本地事务(如果写入失败,此时 half 消息对业务不可见,本地逻辑不执行):图中步骤 3。
(4) 根据本地事务状态执行 Commit 或者 Rollback(Commit 操作生成消息索引,消息对消费者可见):图中步骤 4。
事务补偿流程
(1) 对没有 Commit/Rollback 的事务消息(pending 状态的消息),从服务端发起一次“回查”:图中步骤 5。
(2) Producer 收到回查消息,检查回查消息对应的本地事务的状态:图中步骤 6。
(3) 根据本地事务状态,重新 Commit 或者 Rollback:图中步骤 6。
其中,补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况。
1、事物消息状态
事务消息共有三种状态,提交状态、回滚状态、中间状态:
1、 TransactionStatus.CommitTransaction: 提交状态,它允许消费者消费此消息(完成上方图中了 1,2,3,4 步,第 4 步是 Commit)。
2、 TransactionStatus.RollbackTransaction: 回滚状态,它代表该消息将被删除,不允许被消费(完成上方图中了 1,2,3,4 步, 第 4 步是 Rollback)。
3、 TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态(完成上方图中了 1,2,3 步, 但是没有 4 或者没有 7,无法 Commit 或 Rollback)。
2、使用场景
用户提交订单后,扣减库存成功、扣减优惠券成功、使用余额成功,但是在确认订单操作失败,需要对库存、库存、余额进行回退。如何保证数据的完整性?
可以使用 RocketMQ 的分布式事务保证在下单失败后系统数据的完整性
3、使用限制
- 事务消息不支持延时消息和批量消息。
- 事务回查的间隔时间:BrokerConfig. transactionCheckInterval 通过 Broker 的配置文件设置好。
- 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的transactionCheckMax 参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
- 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于transactionMsgTimeout 参数。
- 事务性消息可能不止一次被检查或消费。
- 事务性消息中用到了生产者群组,这种就是一种高可用机制,用来确保事务消息的可靠性。
- 提交给用户的目标主题消息可能会失败,目前依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
- 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ 服务器能通过它们的生产者ID 查询到消费者。
9、分布式事务
业务场景:用户 A 转账 100 元给用户 B,这个业务比较简单,具体的步骤:
1、用户 A 的账户先扣除 100 元
2、再把用户 B 的账户加 100 元
如果在同一个数据库中进行,事务可以保证这两步操作,要么同时成功,要么同时不成功。这样就保证了转账的数据一致性。
但是在微服务架构中,因为各个服务都是独立的模块,都是远程调用,都没法在同一个事务中,都会遇到事务问题。
因为各个服务都是独立的模块,都是远程调用,都没法在同一个事务中,都会遇到事务问题。
消息中间件的方式,把扣款业务和加钱业务异步化,扣款成功后,发送“扣款成功消息”到消息中间件;加钱业务订阅“扣款成功消息”,再对用户 B 加钱(系统怎么知道给用户 B 加钱呢?是消息体里面包含了源账户和目标账户 ID,以及钱数)
场景一:先扣款后向 MQ 发消息
先扣款再发送消息,万一发送消息失败了,那用户 B 就没法加钱。
场景二:先向 MQ 发像消息,后扣款
扣款成功消息发送成功,但用户 A 扣款失败,可加钱业务订阅到了消息,用户 B 加了钱。
问题所在,也就是没法保证扣款和发送消息,同时成功,或同时失败;导致数据不一致。
RocketMq 消息中间件把消息分为两个阶段:半事务阶段和确认阶段阶段。
半事务阶段:
该阶段主要发一个消息到 rocketmq,但该消息只储存在 commitlog 中,但 consumeQueue 中不可见,也就是消费端(订阅端)无法看到此消息。
commit/rollback 阶段(确认阶段):
该阶段主要是把 prepared 消息保存到 consumeQueue 中,即让消费端可以看到此消息,也就是可以消费此消息。如果是 rollback 就不保存。
整个流程:
1、A 在扣款之前,先发送半事务消息
2、发送预备消息成功后,执行本地扣款事务
3、扣款成功后,再发送确认消息
4、B 消息端(加钱业务)可以看到确认消息,消费此消息,进行加钱。
注意:上面的确认消息可以为 commit 消息,可以被订阅者消费;也可以是 Rollback 消息,即执行本地扣款事务失败后,提交 rollback 消息,即删除那个预备消息,订阅者无法消费。
异常 1:如果发送半事务消息失败,下面的流程不会走下去;这个是正常的。
异常 2:如果发送半事务消息成功,但执行本地事务失败;这个也没有问题,因为此预备消息不会被消费端订阅到,消费端不会执行业务。
异常 3:如果发送半事务消息成功,执行本地事务成功,但发送确认消息失败;这个就有问题了,因为用户 A 扣款成功了,但加钱业务没有订阅到确认消息,无法加钱。这里出现了数据不一致。
RocketMq 如何解决上面的问题,核心思路就是【事务回查】,也就是 RocketMq 会定时遍历 commitlog 中的半事务消息。
异常 3,发送半事务消息成功,本地扣款事务成功,但发送确认消息失败;因为 RocketMq 会进行回查半事务消息,在回查后发现业务已经扣款成功了,就补发“发送 commit 确认消息”;这样加钱业务就可以订阅此消息了。
这个思路其实把异常 2 也解决了,如果本地事务没有执行成功,RocketMQ 回查业务,发现没有执行成功,就会发送 RollBack 确认消息,把消息进行删除。
同时还要注意的点是,RocketMQ 不能保障消息的重复,所以在消费端一定要做幂等性处理。
除此之外,如果消费端发生消费失败,同时也需要做重试,如果重试多次,消息会进入死信队列,这个时候也需要进行特殊的处理。(一般就是把 A已经处理完的业务进行回退)
什么是幂等性?
对于消息接收端的情况,幂等的含义是采用同样的输入多次调用处理函数,得到同样的结果。例如,一个 SQL 操作: update
stat_table set count= 10 where id =1 这个操作多次执行,id 等于 1 的记录中的 count
字段的值都为 10,这个操作就是幂等的,我们不用担心这个操作被重复。 再来看另外一个 SQL 操作: update stat_table
set count= count +1 where id= 1; 这样的 SQL 操作就不是幂等的,一旦重复,结果就会产生变化。
常见办法
- MVCC:
多版本并发控制,乐观锁的一种实现,在生产者发送消息时进行数据更新时需要带上数据的版本号,消费者去更新时需要去比较持有数据的版本号,版本号不一致的操作无法成功。例如博客点赞次数自动+1 的接口:
public boolean addCount(Long id, Long version);
update blogTable set count= count+1,version=version+1 where id=321 and version=123
每一个 version 只有一次执行成功的机会,一旦失败了生产者必须重新获取数据的最新版本号再次发起更新。 - 去重表:
利用数据库表单的特性来实现幂等,常用的一个思路是在表上构建唯一性索引,保证某一类数据一旦执行完毕,后续同样的请求不再重复处理了(利用一张日志表来记录已经处理成功的消息的 ID,如果新到的消息 ID 已经在日志表中,那么就不再处理这条消息。)
以电商平台为例子,电商平台上的订单 id 就是最适合的 token。当用户下单时,会经历多个环节,比如生成订单,减库存,减优惠券等等。每一个环节执行时都先检测一下该订单 id 是否已经执行过这一步骤,对未执行的请求,执行操作并缓存结果,而对已经执行过的 id,则直接返回之前的执行结果,不做任何操作。这样可以在最大程度上避免操作的重复执行问题,缓存起来的执行结果也能用于事务的控制等。
如果本地事务执行了很多张表,那是不是我们要把那些表都要进行判断是否执行成功呢?这样是不是太麻烦了,而且和业务很耦合。
好的方案是设计一张 Transaction 表,将业务表和 Transaction 绑定在同一个本地事务中,如果扣款本地事务成功时,Transaction 中应当已经记录该TransactionId 的状态为「已完成」。当 RocketMq 事务回查时,只需要检查对应的 TransactionId 的状态是否是「已完成」就好,而不用关心具体的业务数据。
如果是银行业务,对数据要求性极高,一般 A 与 B 需要进行手动对账,手动补偿。