RocketMQ、Kafka、RabbitMQ 消费原理,顺序消费问题【图文理解】

B站视频地址



文章目录

  • 一、开始
  • 二、结果
    • 1、RocketMQ 消费关系图
      • 1-1、queue和consumer的关系
      • 1-2、consumer 和线程的关系
    • 2、Kafka 消费关系图
      • 1-1、partitions和consumer的关系
      • 1-2、consumer 和线程的关系
    • 3、RabbitMQ 消费关系图
      • 1-1、queue和consumer的关系
      • 1-2、consumer 和线程的关系
    • 4、总结
  • 三、实践
    • 1、全局有序
    • 2、局部有序


一、开始


先来定义一下何为顺序消息,比如有A、B两条消息,消息处理的流程是 1、2、3 … 10,只有当A消息执行10完毕后,B消息才可以进行1流程。

注:如果A执行到7,B开始执行1,这其实不一定是顺序消息,因为各种原因最终可能导致B先执行完10。


目前比较流行的队列:RocketMQ、RabbitMQ、Kafka

  • RocketMQ 消息发送到 topic,再到topic关联的 queue
  • RabbitMQ 消息发送到 exchange,再由exchange通过规则到 queue
  • Kafka 消息发送到 topic,再到topic关联的 partitions (partitions可以理解就是一个queue)

基于消息队列的规则,想要达到我们的目标就要求A、B两个消息先后发送到同一个 queue/partitions,且只能有一个消费者,且消费的时候必须是单线程非异步的才可满足。


二、结果


三种MQ都支持消息发送到指定的 queue/partition,简单来说就是基于一个标识去计算看它应该落在哪个queue/partition,同一批顺序消息的标识是一样的,所以最终进入的queue/partition也是一致的。 进入 queue/partition 之后的消息都是顺序的,它们是 FIFO的。

顺序消息的控制主要是在消费端,那问题就变成了2个

  1. queue/partition 和消费者之间是如何对应的
  2. 消费者对同一个 queue/partition 的消息,是多线程还是单线程

只有满足一个queue/partition 只能对应一个消费者,一个消费者对于一个queue/partition 是单线程消费的,才可以做到消费顺序。


注:MQ有集群消费和广播消费,顺序消费肯定是建立在集群消费模式下的。

最终结果:RocketMQ和Kafka是支持顺序消息的,RabbitMQ不支持顺序消息。


1、RocketMQ 消费关系图


1-1、queue和consumer的关系


在这里插入图片描述


从上面的图可以看到,一个queue最多只能对一个 consumer,如果某个 topic需要更大的并发,那就需要,那就增加 queue,然后增加 consumer


1-2、consumer 和线程的关系


正常使用SpringBoot开发项目的时候,都是引入 rocketmq-spring-boot-starter,然后用 @RocketMQMessageListener 来做消费处理,所以下面图也是基于这个用法来画的

在这里插入图片描述


通过这个图可以看到使用 @RocketMQMessageListener 做消费者的时候,本质上消息是被多线程去消费了,那就存在A、B消息的真正处理顺序不一致了。


RocketMQ的解决办法是,当你设置消费为顺序消费的时候,在消息处理的时候它会基于 queue加锁,这样就只能单线程处理这个queue的消息了。


设置顺序消费的代码

@RocketMQMessageListener(
        topic = "Topic1", 
        consumerGroup = "springboot3_producer_group", 
        consumeMode = ConsumeMode.ORDERLY
)

2、Kafka 消费关系图


Kafka 里面没有queue的概念,转而用partitions,但其本质上queue和partitions是一样的,就把它理解成一个queue完事


1-1、partitions和consumer的关系


partitions 和 consumer的关系和 RocketMQ的一模一样,只是把queue改为partitions即可,就不画了


1-2、consumer 和线程的关系


在这里插入图片描述


  1. Kafa消费消息的时候是主动去拉,拉到了就去消费,消费完了,再去拉。 拉和消费的线程是一个
  2. 当自定义线程数大于 partitions 的时候,没用,这个没用的意思是 Kafka压根不会创建比分配给自己 partitions 数量更多的线程
  3. 添加消费者的时候,会自平衡(这点所有的MQ都一样的)
  4. 默认如果没有给consumer设置线程数的话,是单线程

Kafka的解决办法是,每一个 partitions 最多只有一个线程来消费它,单线程自然就是顺序消费的咯。


3、RabbitMQ 消费关系图


1-1、queue和consumer的关系


在这里插入图片描述


  1. RabbitMQ新增了exchange(交换机)的概念,所有的数据都是先发送到交换机,再由exchange基于规则下发到具体的queue
  2. 可以通过设置交换机的类型的,让消息投递到一个或多个 queue
  3. 广播消息:可以设置exchange类型为fanout,这样消息就会投递到所有与之绑定的queue(前提是没有设置特殊的 routingkey)
  4. 集群消费:可以设置多个 consumer去消费一个queue,或一个消费者设置多线程去消费,以此来增加消费速率

注:RabbitMQ的queue和consumer是可以设置为多对多的关系


1-2、consumer 和线程的关系


在这里插入图片描述


  1. RabbitMQ默认也是一个线程消费
  2. 当开启了多个线程的时候,消息最终顺序就可能不一致,因为各个线程之间其实是相互独立的

4、总结


从上述结果来看其实三种队列都是支持顺序消息的(前提消息都发送到一个 queue/partitions),但支持的程度和结果不同

  1. RocketMQ,一个queue只能有一个consumer,消费者是多线程的,但开启顺序消费的时候,会对 queue加锁从而保证顺序
  2. Kafka,一个 partitions只能由一个consumer的一个线程去消费,基于单线程就保证了顺序性
  3. RabbitMQ,queue和consumer是多对多的,consumer的多个线程是独立的,要想保证顺序,只能让一个queue只有一个consumer,且consumer只有一个线程(但这样做效率就很低)

三、实践


1、全局有序


基于上述分析,三种MQ都可以做到全局有序,因为一旦要求全局有序,消费者就必须是单线程消费。


2、局部有序


比如用户订单业务,对于不同的用户它们的消费顺序可以不用关注,但是对于同一个用户的消息必须是严格有序的(简单的如先下单、再支付)。

对于这种场景RabbitMQ基本上就不满足的,它只有一个队列,如果消费者是单线程的会阻塞其它的消息,一定会造成消息积压。

RocketMQ和Kafka在发送消息的时候都可以指定一个queue/partitions(发消息的时候指定一个key,通过key的hash找一个queue,相同的key得到的就是同一个queue)。

  1. RocketMQ 通过顺序消息对queue加锁变成单线程消费
  2. Kafka 的每一个partitions 就只有一个线程去消费

消息可能重复消费这个和顺序消息没关系,所以在写消费逻辑的时候应该做幂等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/445649.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

爬虫练习:获取某招聘网站Python岗位信息

一、相关网站 二、相关代码 import requests from lxml import etree import csv with open(拉钩Python岗位数据.csv, w, newline, encodingutf-8) as csvfile:fieldnames [公司, 规模,岗位,地区,薪资,经验要求]writer csv.DictWriter(csvfile, fieldnamesfieldnames)writer…

每日OJ题_牛客WY28 跳石板(动态规划)

目录 牛客WY28 跳石板 解析代码 牛客WY28 跳石板 跳石板_牛客题霸_牛客网 解析代码 #include <iostream> #include <vector> #include <climits> #include <cmath> using namespace std;void get_div_num(int n, vector<int>& arr) {for…

selenium元素定位问题

具体网页信息如下&#xff1a; 定位的时候driver.find_element(By.CLASS_NAME, 方法搞不定。 定位方法&#xff1a; 方法一&#xff1a;通过文本定位 driver.find_element(By.XPATH, "//*[text()高分一号]").click() time.sleep(3) 如果是部分文字 #部分文字py…

怎么写品牌方流量打造抖音运营规划方案

【干货资料持续更新&#xff0c;以防走丢】 怎么写品牌方流量打造抖音运营规划方案 部分资料预览 资料部分是网络整理&#xff0c;仅供学习参考。 抖音运营资料合集&#xff08;完整资料包含以下内容&#xff09; 目录 Step 1: 人货沟通策略 人群定位与细分 1. 从品牌及产品…

【备战蓝桥杯系列】蓝桥杯国二选手笔记二:算法模版笔记(Java)

感谢大家的点赞&#xff0c;关注&#xff0c;评论。准备蓝桥杯的同学可以关注一下本专栏哦&#xff0c;不定期更新蓝桥杯笔记以及经验分享。本人多次参加过蓝桥杯&#xff0c;并获得过蓝桥杯国二的成绩。 算法模版笔记&#xff08;Java&#xff09; 这篇文章给大家分享我的蓝桥…

寒假作业Day 10

寒假作业Day 10 一、选择题 1、下列数据结构中&#xff0c;不属于线性表的是( ) A.队列 B.顺序表 C.二叉树 D.链表 A. 队列&#xff1a;队列是一种特殊的线性表&#xff0c;它只允许在表的前端&#xff08;front&#xff09;进行删除操作&#xff0c;而在表的后端&#xff08…

【经管数据-更新】华证ESG评级得分数据(2009-2023年)

一、数据说明 参考《经济研究》中方先明&#xff08;2023&#xff09;的做法&#xff0c;将华证ESG评级进行赋值&#xff0c;指标包含C、CC、CCC、B、BB、BBB、A、AA、AAA共9个等级&#xff0c;将上市公司ESG 等级从低到高分别赋值为1至9 二、数据来源&#xff1a;世界银行&am…

Springboot进行web开发

创建springboot工程&#xff0c;基于2022版idea pom.xml文件中的插件爆红&#xff1a; 解决方法&#xff1a;给插件加<version>版本号</version> 版本号和<parent></parent>中的版本号一样。 另外有人说重启也可以解决爆红&#xff0c;可以试一下&a…

Stable diffusion(一)

Stable diffusion 原理解读 名词解释 正向扩散&#xff08;Fixed Forward Diffusion Process&#xff09;&#xff1a;反向扩散&#xff08;Generative Reverse Denoising Process&#xff09; VAE&#xff08;Variational AutoEncoder&#xff09;&#xff1a;一个用于压缩图…

【动态规划】【前缀和】【和式变换】100216. K 个不相交子数组的最大能量值

本文涉及知识点 动态规划汇总 C算法&#xff1a;前缀和、前缀乘积、前缀异或的原理、源码及测试用例 包括课程视频 LeetCode 100216. K 个不相交子数组的最大能量值 给你一个长度为 n 下标从 0 开始的整数数组 nums 和一个 正奇数 整数 k 。 x 个子数组的能量值定义为 stren…

Swagger修改Api文档中的数据类型

swagger不陌生,API接口利器,本次要解决的问题是:我们知道前端在接收Long类型的属性时会出现精度问题,一般我们会在序列化的时候将Long类型的数字转换成String但是swagger的API文档中的类型还是Long,我们要解决的就是这个问题 不知道swagger怎么配置得可以看之前的文章:springb…

空间复杂度的OJ练习——轮转数组

旋转数组OJ链接&#xff1a;https://leetcode-cn.com/problems/rotate-array/ 题目&#xff1a; 思路&#xff1a; 通过题目我们可以知道这是一个无序数组&#xff0c;只需要将数组中的数按给定条件重新排列&#xff0c;因此我们可以想到以下几种方法&#xff1a; 1.暴力求解法…

【Tauri】(5):本地运行candle和 qwen 大模型,并测试速度

1&#xff0c;本地运行candle 关于candle项目 https://github.com/huggingface/candle Hugging Face 使用rust开发的高性能推理框架。 语法简单&#xff0c; 风格与 PyTorch 相似。 CPU 和 Cuda Backend&#xff1a;m1、f16、bf16。 支持 Serverless&#xff08;CPU&#xff…

React 从0到1构建企业级框架基于Antd Designer

一、 create-react-app 创建 cms-front 二、 删除不必须要的文件形成如下结构 1. React版本为17版本 public 文件夹下保留 favicon.ico 偏爱图标index.html资源文件 2.src 保留 index.js 入口文件和app.js(基于spa原则)单文件即可 三、配置eslint 1. 安装 eslint. npm inst…

章六、集合(1)—— Set 接口及实现类、集合迭代、Map 接口、Collections类

一、 Set 接口及实现类 Set接口不能存储重复元素 Set接口继承了Collection接口。Set中所存储的元素是不重复的,但是是无序的, Set中的元素是没有索引的 Set接口有两个实现类&#xff1a; ● HashSet &#xff1a;HashSet类中的元素不能重复 ● TreeSet &#xff1a;可以给Set集…

低密度奇偶校验码LDPC(十)——LDPC码的密度进化

一、密度进化的概念 二、规则LDPC码的密度进化算法(SPA算法) 算法变量表 VN更新的密度进化 CN更新的密度进化 算法总结 程序仿真 参考文献 [1] 白宝明 孙韶辉 王加庆. 5G 移动通信中的信道编码[M]. 北京: 电子工业出版社, 2018. [2] William E. Ryan, Shu Lin. Channel Co…

Spring-AOP基础(全在这里)

八股文部分来源于网络&#xff0c;例子为原创 OOP(Object-oriented programming) 也就是面向对象编程&#xff0c;继承&#xff0c;封装&#xff0c;多态。 局限性 静态语言&#xff1a;类结构一旦定义&#xff0c;不容易被修改(并不是无法修改)。只能侵入性扩展&#xff1a…

太强了!最全的大模型检索增强生成(RAG)技术概览!

本文是对检索增强生成&#xff08;Retrieval Augmented Generation, RAG&#xff09;技术和算法的全面研究&#xff0c;对各种方法进行了系统性的梳理。文章中还包含了我知识库中提到的各种实现和研究的链接集合。 鉴于本文的目标是对现有的RAG算法和技术进行概览和解释&#…

【深度学习笔记】6_5 RNN的pytorch实现

注&#xff1a;本文为《动手学深度学习》开源内容&#xff0c;部分标注了个人理解&#xff0c;仅为个人学习记录&#xff0c;无抄袭搬运意图 6.5 循环神经网络的简洁实现 本节将使用PyTorch来更简洁地实现基于循环神经网络的语言模型。首先&#xff0c;我们读取周杰伦专辑歌词…

【C++】list模拟实现list迭代器失效问题

list模拟实现&list迭代器失效问题 一&#xff0c;list模拟实现1. list的主要框架接口模拟2. list构造&拷贝构造&析构3. list迭代器3.1 普通迭代器3.2 const迭代器 4. 增删查改 二&#xff0c;迭代器失效问题1. list的迭代器失效原因2. 解决办法 一&#xff0c;list…