消息队列,FIFO :异步 解耦 削峰
复杂度上升 幂等 重复消费 消息丢失 / 可用性降低 mq故障 / 一致性要求
mq对比:
activeMQ:jms规范,支持事务 xa协议
rabbitMQ:erlang 性能👌 高并发 多语言 amqp协议 吞吐量低不易堆积(ait)
镜像队列:集群 blockingqueue存储数据,commit/rollback/ack
死信队列:被消费者否定 requeue=false,在队列存活时间超过ttl,消息数量超过max队列长度
死信交换机direct fanout topic
延迟队列:设置了ttl 消息最大存活时间
直连队列:直连queue,生产者消费者相同参数声明队列
kafka:高性能 高可用 大规模使用 单机容量有限 吞吐量百万 (livu)
rocketMQ:java实现 高可用 可靠 单机十万(商城)
- nameServer producer consumer broker
- b 启动时向all的nameServer注册,长连接 30s一次心跳
- p发送消息时从nameServer获取broker服务器地址,负载均衡选服务器发送消息
- c消费消息 从n获取b地址,主动拉取来消息
异步发送消息,先缓存 达到一定数量 批量发送
pull和push:
pull主动拉,消费者自己决定 控制速率,不及时
push:broker主动推,实时推送,消费者压力大
高可靠:
不多发 不少发,不重复消费 broker持久化 ack机制
消息发送可靠:
ack = 0不重试 1 leader写入成功就返回 all/-1等待isr同步完
unclean.leader.election.enable:false 禁止isr以外follower成为leader
tries>1
min.insync.replicas>1同步副本数,没满足前 不提供读写 写异常
消费可靠:
手工提交offset
broker可靠:pageCache 然后刷盘
减少刷盘间隔,insync;事务消息 commit rollback
rabbitMQ:
- 事务消息
- 消息确认
- 发送方设置channel=confirm模式,消费分配id ,信道ack回调confirmCallback,如错误nack 回调returnCallback
- 接收方声明队列noack=false,broker等待消费者手动ack;broker的ack无超时机制只判断连接是否断开,断了 重新发送另一个 去重
消息不丢失
rockerMQ:
生产者:1.同步阻塞发送 失败重试(broker存储失败)
2.异步发送回调 校验发送结果
3.ack机制,commitLog,存储consumerQueue失败 (风险)
broker:1.同步刷盘 集群模式同步 等待slave复制完成才返回确认
消费者:1.offset手动提交 2.消费保证幂等
环节上:
生产者不丢失:
发送+回调
rocketMQ:半事务 ack确认
rabbitMQ:手动事务 channel txSelect开启 txCommit提交 txRollback回滚 阻塞式
消息同步不丢失:
rocketMQ:普通集群配置 同步同步 异步同步
dledger:两阶段提交
rabbitMQ:普通集群 分散存储 不会主动同步
镜像集群:节点间主动数据同步
kafka:容许少量丢失 acks参数 0 1 all
MQ存盘不丢失:
rocketMQ:配置 同步/异步
rabbitMQ:持久化队列 3.x quorum队列采用raft协议消息同步(两阶段方式)
MQ消费消息不丢失:
rocketmq:默认消费方式 不异步
rabbitmq:autoCommit — 手动提offset
kafka:手动提交offset
不重复消费
幂等,标识
高性能
基于硬盘,消息堆积能力强
顺序写:磁盘顺序访问速度接近内存 append操作 partition有序的 节省寻道时间
批量操作 节省写入次数
partition物理上分多个segment存储,方便删除
零拷贝:将内核缓冲区数据发送到网卡传输
传统:读取磁盘文件 到 内核缓冲区 copy用户缓冲区 copy到socket发送缓冲区 发送网卡
Mmap的MappedByteBuffer 1.5g-2g
transfile通过FileChannel 无文件限制
rocketMQ:Mmap方式读写
kafka:index日志Mmap读写,其他日志无零拷贝;transfile将硬盘数据加载网卡
操作系统pageCache ;如果生产消费速率相当 直接用pageCache 不需要磁盘IO
zk:老版本
/brokers/ids临时节点 所有broker节点信息,物理地址 版本 启动时间 brokerID 定时发心跳
/brokers/topics:临时节点
子节点一个固定partitions节点 子节点上topic分区 保存state节点 leader分区 isr的brokerId
/consumers/group-id/owners/topic/broker_id-partion_id:消费者和分区的注册关系
/consumers/group-id/offsets/topic/broker_id-partion_id:分区消息消费进度offset
消费者 state 找到broker的绑定关系
分布式事务:最终一致性
生产者100%消息投递,消费者保证幂等消费 唯一id+校验
at least once:至少一次确认 发
at most once:最多发一次
exactly once:rocketMQ商业版本提供
rabbitMQ:事务消息
信道设置:
channel.txSelect开启事务,服务器返回tx.select-ok
basicPublish发送多条消息,
txCommit提交
txRollback回滚
消费者使用事务:
autoAck=false 手动ack
rocketMQ:两阶段
transactionListener接口:
executeLocalTransaction发送消息后调用 执行本地事务 成功 再提交消息
checkLocalTransaction本地事务检查,rocketmq依赖此做补偿
两阶段:
prepare:将消息投递给rms_sys_trans_half_topic的topic
commit/rollback:product通过executeLocalTransaction执行事务 据结果向broker发送commit/rollback,commit则rms_sys_trans_half_topic的消息投递到真实topic,删除消息投递rmq_sys_trans_op_half_topic中表示事务完成,如果rollback则只投递删除消息
消息顺序性:
只需要保证局部有序,不需要全局有序
rocketMQ:messageSelector一组有序消息放到同一个队列 消费者一次消费整个队列
发送 消费同一queue
发送保证顺序,消息体上设置消息顺序
发送者实现messageQueueSelector接口,选择queue
selectMessageQueueByHash按参数hash与可选队列 求余选择
selectMessageQueueByRandom:随机选择
mq:本身顺序追加 一个队列一个时间一个consumer消费 加锁 consumer上的顺序消费有一个定时任务来请求延长锁定
消费者:pull模式 顺序拉取消息 顺序消费
push实现mqPushConsumer接口,注册监听消息消息,registerMessageListener
messageListenerConcurrently并行消费
messageListenerOrderly串行消费 consumer会吧消息放入本地队列并加锁 定时任务锁同步
rabbitMQ:一个目标exchange只对应一个队列,一个队列只对应一个消费者
kafka:生产者定制partition分配规则,将消息分配同一partition,topic只对应一个消费者
其他:
rocketMq:每个消息 messageId 消费者自己判断,数据量大不保证唯一
延时队列:指定时间被处理的元素的队列,过期性操作的业务
kafka副本同步
partino:
leo下一跳消息写入位置
hw前的数据可见,isr列表分区信息,
firstUnstableOffset第一条未提交的数据 ,lastStableOffset最后一条提交的数据
isolation.level=read_committed消费到lastStableOffset,read_uncommitted
kafka的rebalance机制
避免,consumer group中消费者 与 topic下的partion 重新匹配
group 成员个数变化 / 消费超时 / group订阅topic个数变化 / group订阅topic分区数变化
coordinator:partition的leader所在broker 监控group中consumer存活 维持心跳 判断消费超时
心跳返回通知consumer进行rebalance
consumer请求coordinator加入组,coordinator选举leader consumer
leader从coordinator获取所有consumer,发送syncGroup分配信息给coordinator
coor心跳将syncGroup下发给consumer
leader consumer监听topic变化,通知coordinator 触发rebalance
C1消费超时 rebalance 重新分配 消息被其他消费 C1完成提交offset
coordinator每次rebalance,标记一个generation给consumer,每次rebalance该generation+1,consumer提交offset时,coordinator对比generaton 不一致拒绝提交
rabbitMQ
持久化
交换机持久化:exchange_declare 参数
队列持久化:queue_declare 参数
消息持久化:new AMOPMessage 参数
append写文件 据大小自动新文件,两个进程一个持久化存储,另一个非持久化存储(内存不够)
存储:ets表记录消息在文件中的映射关系:id 偏移量 有效数据 左边文件 右边文件
删除:从ets删除,垃圾数据 超出50% 文件数>3 垃圾回收,锁定左右两个文件 整理 更新 合并
先写buffer缓冲区,buffer满 写入文件(操作系统页) 25ms刷盘 buffer+页 落盘
交换机
先找到绑定的队列在判断routekey,发到哪个队列
routerKey/bindid
fanout扇形交换机 不判断routekey 直接发送绑定的队列 发布订阅
direct:routekey是否完全匹配,
topic:模糊匹配 routekey
header:队列 交换机指定键值对 交换机分发消息 先解开消息体headers数据 是否设置键值对 匹配成功 发送到队列 ,性能差
普通集群
元数据:
队列:名称和属性
交换器:名称 类型 属性
绑定元数据:简单表哥展示如何将信息路由到队列
vhost元数据:vhost内队列 交换器 提供命名空间和安全属性
客户端连接到非队列所在节点 ,路由转发 发送和消费
节点类型:
磁盘节点:配置信息 元信息 存储在磁盘
内存节点:配置信息 和元信息存储在内存,依赖磁盘节点持久化
至少一个磁盘节点,节点加入离开必须通知磁盘节点,唯一的磁盘崩溃 集群可保持运行 但不能改东西:创建队列 交换器 绑定 用户 数据
架构
bingkey:exchange与queue绑定关系
routingkey:producer指定,交换机拿到 和bingkey 联合
信道:connnection 虚拟连接 amqp信道 长连接
rocketMQ
持久化
commitlog:所有的queue共享,1g 加锁 再写; 写满重生生成 顺序写
避免分区过多 日志文件过多磁盘IO读写压力大
consumeQueue:逻辑queue,消息先到commitlog,异步转发consumeQueue,物理偏移位置offset 内容大小 messageTag的hash值 600w个字节
indexFile:key 时间区间查找commitlog消息,文件名 创建时间戳命名 400m 2000w索引
queue存储少量数据,轻量化,串行化访问磁盘
顺序写 随机读 先读consumeQueue 再读commitLog 降低读效率
同步刷盘:持久化再ack
异步刷盘:pageCache再ack
设计MQ
message对象 可伸缩FIFO队列 分布式队列
producer:消息体 标识id 类型 长度
exchange:队列 丰富路由策略 提前缓冲好对应关系 删除机制
queue:队列 内存 异步持久(数据库) 删除机制
consumer:集群 topic下partion,配置一对多 多对多 一对一消费模式 是否自动ack
java的Mmap的MappedBetyBuffer零拷贝 commitlog consumerLog