消息队列在分布式架构的作用
消息队列:在消息的传输过程中保存消息的容器,生产者和消费者不直接通讯,依靠队列保证消息的可靠性,避免了系统间的相互影响。
主要作用:
- 业务解耦
- 异步调用
- 流量削峰
业务解耦
将模块间的 RPC调用改为通过消息队列中转,解除系统间的耦合。
- 提升系统稳定性(自身模块不受其他模块影响)
- 通过广播消息避免多次调用(调用下游多次改为下游自己订阅,自己只发一次消息通知)
- 提高编码效率(不用关注下游代码更改)
异步调用
对于无需关注调用结果的场景,可以通过消息队列异步处理。
下单链路的通知卖家以及数据记录都可以改为异步流程,不影响主流程。
下单处理原逻辑:风控拦截->扣减库存->通知卖家->记录数据
下单优化后流程:风控拦截->扣减库存->消息异步通知卖家->消息异步记录数据
流量削峰
系统的吞吐量往往取决于底层存储服务的处理能力,数据访问层可以调整消费速度缓解存储服务压力,避免短暂的高峰将系统压垮。
主流消息队列选型对比
- Kafka:系统间的数据流通道
- RocketMQ:高性能可靠消息传输
- RabbitMQ:可靠消息传输
RocketMQ深入剖析
RocketMQ功能特性
- 支持事务型消息
- 支持延时消息
- 支持消息重发
- 支持consumer端tag过滤
- 支持消息回放
RocketMQ拓扑图
- NameServer: 类似注册中心的定位。他们之间完全独立,互相没有交互。所有的broker把自己的信息注册到所有的NameServer上(IP、key-cluser-value-[brokers]、key-broker-value-主从节点、key-topic-value-[brokers],可以后面把这个结构debug一下放到这里todo)。
- Broker:用于消息存储
- Producer:生产者,投递消息到broker上。从NameServer做服务发现。
- Consumer:消费者,从broker上消费消息。从NameServer做服务发现。
RocketMQ架构
- Broker主从部署,自身注册信息在NameServer中
- Client(Producer、Consumer)从NameServer中获取Broker信息
- NameServer节点相对独立,无数据交互
RocketMQ消息存储
- CommitLog:存储消息主体
- 所有topic的消息都写在CommitLog文件中
- 顺序写、随机读,当某几个topic的消费速度比较快的时候数据就相对比较离散,怎么去优化?
- ConsumeQueue:消息消费队列
- dispatch线程将CommitLog里的消息按主题-topic分发到ConsumeQueue队列里(ConsumeQueue也是物理文件,一个topic可能对应多个ConsumeQueue队列,注1:这里的负载均衡是Producer做的,每个消息都会带上要存储的队列序号。注2:这里只存储commitLog的偏移量-commitLogOffset和大小msgSize,实际消费时还是去CommitLog里拿)
- maxOffset:下次消息往这里写。
- cusomerOffset:消费者当前消费到的位置。
- minOffset:rocketMQ使用这个字段保证消息不丢。consumer消费每次会拉一批数据,拉完之后就会更新cusomerOffset,但是minOffset到cusomerOffset之间还存在没消费完的。当consumer成功把所有拉去的数据消费完之后,会给broker发一个ack,此时minOffset才会移动到上次拉走数据的位置。这样能保证拉走的数据都能成功消费完。比如consumer拉了10条数据,但是只消费了9条,然后cosumer端服务挂了重启之后就会从minOffset再去拉数据。
- IndexFile:消息索引文件
优化手段
RocketMQ应对CommitLog随机读顺序写的性能优化
- CommitLog切分,默认1G
- MMap提升文件访问性能
- 磁盘升级SSD
消息存储流程
broker消息接收入口
- 获取主题信息
- 获取队列信息
- 延时消息,事务消息处理
CommitLog写入
- 获取CommitLog文件
- 判断CommitLog是不是满了,满了就获取一个新的CommitLog(对应代码mapedFile)
- 追加写入
- 写入不成功则创建新文件
- 刷盘
- 复制
Dispatch消息分发
- 获取消息类型,如延时消息、事务消息
- 普通消息则获取消息信息
- topic、队列id、大小等
- 通过主题和队列id找到队列
- 往队列里写值
特性分析
- 可靠性分析
- 可用性分析
- 生产、消费方式
- 负载均衡
可靠性分析
可靠性的问题主要是存的数据会不会丢? 按需配置
单机情况下的优化
- 同步刷盘
- 性能低,可靠性高
- 异步刷盘
- 性能高,可靠性低
- 性能高,可靠性低
集群下的优化
这里主要是主从之间的数据同步
- 异步复制
- 生产者发送消息至master后,master就会反馈给生产者消息发送成功。master会单独开启一个线程,将数据copy到slave中。这样造成的风险就是如果master挂了,而数据还没有copy到slave节点,就会造成数据的丢失。
- 同步双写
- 生产者发送消息至master节点后,master节点copy数据到slave节点。当copy成功时,master节点才会告诉生产者,消息发送成功。这样保证了数据的备份,但是会影响性能。为了高可用,还是要选择这种模式,因为选用异步模式,根本没解决多master模式所造成的问题。
最可靠的方式就是 同步刷盘+同步双写。一般情况下用异步刷盘、异步复制就够了。
可用性分析
这里需要引入RocketMQ集群架构。
公司目前使用rocketMQ,raft协议Dledger搭建集群。
RocketMQ Dledger高可用集群的Leader消息写入是同步刷盘的方式。在Dledger中,当Leader节点收到消息后,会首先将消息写入本地磁盘并进行同步刷盘,以保证数据的可靠性和一致性。只有当消息被成功写入本地磁盘后,Leader节点才会将消息发送给Follower节点进行复制。
而在Dledger中,Leader和Follower之间的同步是异步复制的方式。当Leader节点将消息写入本地磁盘并进行同步刷盘后,会异步地将消息发送给Follower节点进行复制。这样可以提高消息发送的性能和吞吐量。
普通集群:
- Master
- 主节点,可以进行读和写操作。
- Slave
- 从节点,只可以读,不进行写操作。
也就是 Producer 只能和 Master 角色的 Broker 连接写入消息;Consumer 可以连接 Master 角色的 Broker,也可以连接 Slave 角色的 Broker 来读取消息。
Master 和 Slave 的区别:在 Broker 的配置文件中,参数 brokerId 的值为 0 表明这个 Broker 是 Master,大于 0 表明这个 Broker 是 Slave,同时 brokerRole 参数也会说明这个 Broker 是 Master 还是 Slave。
- 主从模式Master宕机
- Broker可读不可写
- 集群搭建方式
- 单Master模式(线上一般不考虑)
- 多Master模式
- 优点:多master集群,一个topic在每个master中都有,相当于对topic进行了横向扩展。当有很多生产者往topic中发送消息时,可以负载到多个master节点上,提高写入数据的效率。
- 缺点:如果某个master宕机,则这个master上的数据将不可用。根本原因还是没有对master上的数据进行主从备份的原因。多个master节点各自存着自己的数据,不会相互备份。如当主1挂了、主2还在时,主2还可以投递,但是主1没消费完的消息就丢失了,直到主1恢复以后才能继续消费。
- 多Master模式多Slave模式-异步复制
- 多Master模式多Slave模式-同步双写
RcoketMQ普通集群与Dledger高可用集群
普通集群:
这种集群模式下会给每个节点分配一个固定的角色,master负责响应客户端的请求,并存储消息。slave则只负责对master的消息进行同步保存,并响应部分客户端的读请求。消息同步方式分为同步同步和异步同步。这种集群模式下各个节点的角色无法进行切换,也就是说,master节点挂了,这一组Broker就不可用了。
Dledger高可用集群:
Dledger是RocketMQ自4.5版本引入的实现高可用集群的一项技术。这个模式下的集群会随机选出一个节点作为master,而当master节点挂了后,会从slave中自动选出一个节点升级成为master。Dledger技术做的事情:
- 接管Broker的CommitLog消息存储,普通集群是由broker自己去写CommitLog消息存储,现在是由Dledger去写CommitLog消息存盘
- 从集群中选举出master节点,Dledger最主要的功能
- 完成master节点往slave节点的消息同步
Dledger是怎样进行leader选举的呢?
在探究这个问题之前,我们先要明确一件事情,Dledger使RocketMQ集群的每个节点都有三个状态:
- **Leader 主节点:**主要用于接收客户端请求
- **Follower 从节点 :**主要用于Leader节点的数据备份,当有客户端请求到Follower时,Follower节点会把请求转到Leader节点
- **Candidate 候选者节点:**只有Candidate状态的节点才会参与主节点的选举。
原文链接:https://blog.csdn.net/qq_45076180/article/details/113775278
生产方式
- 同步(sync)
- 投递完后等待MQ回ACK 生产者一般都用同步的方式去发消息
- 异步(async)
- 投递过去后不关注结果
- 单向(oneway)
- 投递过去MQ不用回包
消息消费
消息消费一般分Pull和Push的两种消费方式,两种方式都有各自的特点,同时也会带来不同的影响。
- PUSH
- 消息队列主动地将消息推送给消费者。
- 优点:消息实时性高。
- 缺点:没有考虑到客户端的消费能力,导致客户端消息堆积。
- PULL
- 由消费者客户端主动向消息队列拉取消息。
- 优点:客户端按消费能力拉取消息。
- 缺点:消息实时性低,可能造成大量无效请求。
RocketMQ消费模式
RocketMQ的消费方式基于拉模式,但是使用了一种长轮询机制,来平衡上面Push/Pull模型的各自缺点。
- LongPoll
- Consumer发送拉取消息
- Broker hold住请求,直到有新消息再返回(类似一个push的动作)
- 请求超时,Consumer再次发起请求
- 请求超时时间默认30s
消息消费方式
- 集群消费
- 集群内竞争消费,单条消息只消费一次
- 广播消费
- 各集群消费全量消息,单条消息在每个集群都会被消费一次
- 集群内广播消费
- 集群内每个消费者消费一次消息
- 大部分场景下都是集群内竞争,集群间广播
负载均衡
- Producer端负载均衡
- Consumer端负载均衡
Producer端负载均衡
- 定时获取Queue信息
- 从NameServer定时获取队列信息
- 负载均衡算法:随机递增取模(轮询)
- 按照队列数量轮询分发
- 容错机制:故障延迟
- eg.500ms发送超时,下次发送到这个队列的延迟改为1s。
Consumer端负载均衡
- 客户端定时上报数据
- 服务端知道有多少的客户端
- 定时Rebalance 20S
- 获取队列信息
- 获取消费者信息
- 排序平均分配(按照获得的信息平均分配)
- 与上次结果对比(左边是本次计算出的队列,entity1、entity2是上次负载均衡计算的结果,需要去除。entity3、entity4是上次就在的,entity5、entity6是本次新分配的)
消息消费失败
消费失败策略
- 默认重试16次
- 失败后消息重新投递到重试队列
- 重试时间间隔递增
- 失败进入死信队列
实现原理 - 自动创建失败消息主题
- 客户端默认订阅
- 结合延时消息实现重试间隔 支持18种延时,先进入延时队列,再进入重试队列。默认的一个topic重试队列只有一个,导致只有一个consumer能消费。
RocketMQ高级功能
- 事务消息
- 延时消息
事务消息
业务场景分析:
- 用户下单
- 创建订单 + 减库存
- 发布或更新商品信息
- 写商品库 + 更新外置索引
分布式事务
- 强一致
- 柔性事务
- 事务消息
强一致性协议
- 一致性协议
-
两阶段提交 2PC
- 事务协调者:程序
- 事务参与者A:数据库A
- 事务参与者B:数据库B
- 缺陷:第一阶段没问题,第二阶段假如只commit了事务参与者A,发完后事务协调者挂了,则B未提交。
-
三阶段提交 3PC
- 将第二阶段拆成两段,第二阶段对齐事务状态,所有事务参与者在此刻都知道事务可以提交了。
- 第三阶段参与者等待事务协调者触发,若事务协调者挂了未执行触发。则指定一个超时时间,超过超时时间未触发则事务参与者自动提交。
-
- 落地规范
- XA规范(很少使用,性能衰减严重)
- 资源管理器 - 事务参与者
- 写入加锁
- 事务管理器 - 事务协调者
- 本地记录事务执行状态
- 资源管理器 - 事务参与者
- XA规范(很少使用,性能衰减严重)
柔性事务(最终一致性方案)
-
TCC (Try-Confirm-Cancel)
- 尝试执行业务,预留资源
- 确认执行业务,使用Try阶段资源
- 取消执行业务,释放Try阶段预留的资源
- TCC成功处理流程
- TCC失败处理流程
- 缺点:逻辑全部由开发处理。非常复杂
-
SAGA 模型
- 一个分布式事务拆分为多个本地事务;
- 本地事务都有相应的执行模块(commit事务)和补偿模块(对补偿模块做逆操作);
- 事务管理器负责在事务失败时调度执行补偿逻辑;
事务消息
- 简化了分布式事务模型
- 两次RPC调用 -> 一次RPC + 一次事务消息。只保证落DB + 消息发送的事务。
- 对业务友好
RocketMQ事务消息机制
两阶段提交:
- 发送半消息
- 投递到MQ里不会被消费,发送完后MQ返回半消息发送成功通知。
- 执行本地事务
- 写db等操作
- 发送Commit/Rollback
- 提供回查接口(业务侧提供,缺点就是侵入业务)
- 如果MQ-server很久都没有接收到业务系统发送commit或者rollback的消息的话,MQ-Server就会做半消息的回查,查业务系统来获取这个Message是该提交还是该Rollback。
- 如果MQ-server很久都没有接收到业务系统发送commit或者rollback的消息的话,MQ-Server就会做半消息的回查,查业务系统来获取这个Message是该提交还是该Rollback。
RocketMQ事务消息原理
半消息主题(Topic)
- HALF消息:RMQ_SYS_TRANS_HALF_TOPIC(临时存放消息信息)
- MQ会有一个HALF消息的topic:RMQ_SYS_TRANS_HALF_TOPIC(临时存放消息信息)
- 在发送消息的时候用半消息的主题替换原主题去做投递,保存原主题和队列信息(数据投递到半消息队列中)
- 半消息对Consumer不可见,不会被投递
- OP消息:RMQ_SYS_TRANS_OP_HALF_TOPIC(记录二阶段操作)
- Rollback:只做记录
- Commit:根据备份信息重新构造消息并投递
- 回查
- 对比HALF消息和OP消息进行回查
延时消息
延迟需求:由业务场景或现实情况决定,需要在当前时候之后某一时间触发指定的业务逻辑或操作。
业务场景分析
- 即时通讯消息重发
- IM系统为了确保消息触达接收方,消息发出几秒后没有收到ack,需要重发 消息。
- 订单状态流转
- 取消规定时间内未付款的订单。商家发货后,长时间未确认的订单自动确认收货。
RocketMQ延时消息
RocketMQ支持18个级别的延时等级,默认值为“1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h”,生产者发消息时通过设置delayLevel选择
客户端调用
- 消息体里设置延时属性,值为延时的级别
public void setDelayTimeLevel(intlevel{
this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL,String.valueOf(level));
}
实现原理
- 缓存缓存消息
- 替换主题SCHEDULE_TOPIC_XXX,根据延时级别放入对应的队列
- 18个Queue对应18个延时级别
- 每个队列创建定时任务进行调度
- 恢复到期消息重新投递到真实的Topic(Delay queue投递到CommitLog)
代码分析
- 替换主题代码
- 替换主题
- 根据延时级别放到对应的延时队列中去
特点
- 不支持任意时间延时
- Broker配置中指定,只支持18个level
- 不灵活、配置调整需要重启
消息队列拓展功能实现
RocketMQ可扩展功能:
- 事务消息
- 业务侵入大
- 延时消息
- 支持场景有限
事务消息
- 通过客户端实现
- 事务消息表记录发消息事件
- 数据库每个实例里面都创建一个事务消息表
- 本地事务保证业务数据与写消息表原子性
- 事务管理器维护事务消息表
- 扫库发送、清理
5 - 消息投递成功了,删除消息记录
- 扫库发送、清理
客户使用示例
- 本地数据库写入
- 事务消息客户端发送消息(把落db和发送消息都封装起来)
- con.getAutoCommit代表本次有没有开启事务。
MybatisransactionMsgClient类设计
- 获取数据库连接
- 调用sendMsg
事务消息客户端实现
-
消息内容落库
- con.getAutoCommit代表本次有没有开启事务
-
发送消息(消息放到队列)
-
后台发送队列设计
- 发送失败将消息放到优先级队列里
- 发送失败将消息放到优先级队列里
-
发送消息后台任务
- 因为事务还没处理完队列里就已经有消息了
- 因为事务还没处理完队列里就已经有消息了
-
若事务还没提交,则可以将消息放到重试队列。
-
若消息发送成功,修改数据库状态
-
后台优先队列维护
- 从队列中取出数据,判断是否到期,到期则重新放入待发送队列
- 从队列中取出数据,判断是否到期,到期则重新放入待发送队列
-
事务消息表设计
- status 是否已投递出去
- status 是否已投递出去
延时消息
时间轮算法
时间轮算法:可以用于高效的执行大量的定时任务,实现简单并且有非常高的精度。
- 基本概念
- 事件:可以认为是一个个定时任务,比如下单未支付事件。
- 桶:每个数组元素可以认为是一个桶,桶内的链表为一个个事件(桶里的事件的特点是同一时刻到期、触发)
- 游标:指到哪个桶,那这个桶里的事件就要触发了
- 时间精度:游标移动的速度,1s移动一次,表示支持秒级别的触发。1分钟移动一次,表示支持分钟级别的触发。
- 循环数组
- 数据结构
- 数组+链表
- 数组+链表
长时间跨度问题
如果要支持高精度调度,15天的长度,那数组空间的占用就会非常大!
- 使用文件+内存的方式处理
磁盘文件的内容按时间倒序,每个文件存储半小时。若过来一个请求,在当前半小时内的,直接将半小时内的这条数据落在redis,如果在半小时之外的,则落到磁盘文件中。然后每经过半小时时间(提前个5分钟),将文件中的内容读取到redis内存中。
延时消息服务端设计
- Dispatch改造
- 延时消息存储
- 使用内存+文件替代RocketMQ原来的18个队列
- 内存索引(时间轮)
- 延时消息投递
Dispatch改造
- 拓展消息类型
- 根据类型标记判断延时消息
- 根据时间分发到ScheduleLog或内存时间轮
延时消息存储设计(ScheduleLog)
存放延时消息的一组文件,将延时消息按到期时间划分,以半小时为一个区间,存放到指定的ScheduleLog文件中。
- 文件命名:时间 + 偏移量(将文件切分,因为每个时间周期内的ScheduleLog文件大小太不固定了,因此按128M将文件做多个切分,该时段内的时间都一样,偏移量按文件顺序排序)
- 202002171430(时间)+ 000000000
- 文件大小:默认128M
- 存储内容
- 消息体 OR 消息索引(offset + length)
存储内容是按消息体存还是消息索引存??
- 消息索引
- 优点:节省空间
- 缺点:
- 随机读
- CommitLog里的历史文件(15天前)无法删除,而且这15天前的commitLog还得加载到内存里。
- 消息体
- 消息存两份
- 避免随机读
结论:空间目前不是瓶颈,性能是瓶颈,因此按消息体存储。
ScheduleLog文件管理
- 大文件切分
- 一对多
- 文件命名规则
- 时间 + 偏移量
一个逻辑上的ScheduleLog文件由多个小文件组成。
- 时间 + 偏移量
代码实现
- DelayLogManager
- Key:所代表时间;Value:ScheduleLog
- Key:所代表时间;Value:ScheduleLog
- DelayLog
- 管理一个区间内所有文件
- mapedFIles(Key:启始偏移量;Value:文件内容)
- 文件名到内存文件映射
- 文件名到内存文件映射
- ScheduleLog文件写入
- 判读是否需要新建文件
- 根据要写的数据长度来判断
- 按照DelayLog整体文件维度偏移量与128M大小(mapedFileSize)取余得到当前偏移量
- 当前偏移量 + 数据大小 + 结束标识长度是否大于mapedFileSize
- 定位要写入的文件
- 数据写入
- 判读是否需要新建文件
内存时间轮
TimeWheel: 内存时间轮,其中保存的是30分钟内到期的延时消息索引
-
TimeWheel类
- 使用Map(Key 消息到期时间,Value 相同时间的消息队列) + Queue
- addToWheel(DelayMsgIndex delayMsgIndex)
- 从ScheduleLog中装载数据进时间轮
- getQueue(long delayEndSecond)
- 拿到当前到期的队列
- remove(long delayEndSecond)
- 投递完之后清空队列
- 投递完之后清空队列
-
时间轮写入
-
数据装载(非实时写入)
- 获取当前ScheduleLog
- 根据时间从delayLog中取出下一个要到期的文件
- 逐条装载消息索引信息
- 迭代器逐条读取信息生成索引信息
- 将索引信息装载到时间轮内
- 获取当前ScheduleLog
-
实时写入
- 在当前时段内的消息需要同时写入时间轮
- 在当前时段内的消息需要同时写入时间轮
-
到期投递
- 获取当前ScheduleLog
- 逐条装载消息索引信息