RocketMQ 和 Kafka 关于消息队列的推拉模式是怎么做的?

引言:在当今的大数据和分布式系统中,消息队列扮演着至关重要的角色,它们作为系统之间通信和数据传输的媒介,为各种场景下的数据流动提供了可靠的基础设施支持。在消息队列的设计中,推拉模式是两种常见的消息传递机制,它们各自具有独特的优势和适用场景。本文将聚焦于两个著名的消息队列系统:RocketMQ 和 Kafka,并探讨它们在消息传递过程中是如何实现拉模式的。虽然两者都选择了拉模式,但它们的具体实现方式略有不同,从内部机制到性能优化,都反映了对不同应用场景的思考和针对性的改进。

题目

RocketMQ 和 Kafka 关于消息队列的推拉模式是怎么做的?

推荐解析

那到底是推还是拉?

推模式和拉模式各有优缺点,到底该如何选择呢?

RocketMQ 和 Kafka 都选择了拉模式,当然业界也有基于推模式的消息队列如 ActiveMQ。

我个人觉得拉模式更加的合适,因为现在的消息队列都有持久化消息的需求,也就是说本身它就有个存储功能,它的使命就是接受消息,保存好消息使得消费者可以消费消息即可。

而消费者各种各样,身为 Broker 不应该有依赖于消费者的倾向,我已经为你保存好消息了,你要就来拿好了。

虽说一般而言 Broker 不会成为瓶颈,因为消费端有业务消耗比较慢,但是 Broker 毕竟是一个中心点,能轻量就尽量轻量。

那么竟然 RocketMQ 和 Kafka 都选择了拉模式,它们就不怕拉模式的缺点么? 怕,所以它们操作了一波,减轻了拉模式的缺点。

长轮询

RocketMQ 和 Kafka 都是利用“长轮询”来实现拉模式,我们就来看看它们是如何操作的。

为了简单化,下面我把消息不满足本次拉取的条数啊、总大小啊等等都统一描述成还没有消息,反正都是不满足条件。

RocketMQ 中的长轮询

RocketMQ 中的 PushConsumer 其实是披着拉模式的方法,只是看起来像推模式而已

因为 RocketMQ 在被背后偷偷的帮我们去 Broker 请求数据了。

后台会有个 RebalanceService 线程,这个线程会根据 topic 的队列数量和当前消费组的消费者个数做负载均衡,每个队列产生的 pullRequest 放入阻塞队列 pullRequestQueue 中。然后又有个PullMessageService 线程不断的从阻塞队列 pullRequestQueue 中获取 pullRequest,然后通过网络请求 broker,这样实现的准实时拉取消息。

这一部分代码我不截了,就是这么个事儿,稍后会用图来展示。

然后 Broker 的 PullMessageProcessor 里面的 processRequest 方法是用来处理拉消息请求的,有消息就直接返回,如果没有消息怎么办呢?我们来看一下代码。

Snipaste_2024-06-17_05-49-17.jpg

我们再来看下 suspendPullRequest 方法做了什么。

Snipaste_2024-06-17_05-50-03.jpg

而 PullRequestHoldService 这个线程会每 5 秒从 pullRequestTable 取PullRequest请求,然后看看待拉取消息请求的偏移量是否小于当前消费队列最大偏移量,如果条件成立则说明有新消息了,则会调用 notifyMessageArriving ,最终调用 PullMessageProcessor 的 executeRequestWhenWakeup() 方法重新尝试处理这个消息的请求,也就是再来一次,整个长轮询的时间默认 30 秒。

Snipaste_2024-06-17_05-51-17.jpg

简单的说就是 5 秒会检查一次消息时候到了,如果到了则调用 processRequest 再处理一次。这好像不太实时啊? 5秒?

别急,还有个 ReputMessageService 线程,这个线程用来不断地从 commitLog 中解析数据并分发请求,构建出 ConsumeQueue 和 IndexFile 两种类型的数据,并且也会有唤醒请求的操作,来弥补每 5s 一次这么慢的延迟

代码我就不截了,就是消息写入并且会调用 pullRequestHoldService 的 notifyMessageArriving 方法。

最后我再来画个图,描述一下整个流程。

Snipaste_2024-06-17_05-53-18.jpg

Kafka 中的长轮询

像 Kafka 在拉请求中有参数,可以使得消费者请求在 “长轮询” 中阻塞等待。

简单的说就是消费者去 Broker 拉消息,定义了一个超时时间,也就是说消费者去请求消息,如果有的话马上返回消息,如果没有的话消费者等着直到超时,然后再次发起拉消息请求。

并且 Broker 也得配合,如果消费者请求过来,有消息肯定马上返回,没有消息那就建立一个延迟操作,等条件满足了再返回。

我们来简单的看一下源码,为了突出重点,我会删减一些代码。

先来看消费者端的代码。

Snipaste_2024-06-17_05-54-55.jpg

上面那个 poll 接口想必大家都很熟悉,其实从注解直接就知道了确实是等待数据的到来或者超时,我们再简单的往下看。

Snipaste_2024-06-17_05-55-33.jpg

我们再来看下最终 client.poll 调用的是什么。

Snipaste_2024-06-17_05-55-58.jpg

最后调用的就是 Kafka 包装过的 selector,而最终会调用 Java nio 的 select(timeout)

现在消费者端的代码已经清晰了,我们再来看看 Broker 如何做的

Broker 处理所有请求的入口其实我在之前的文章介绍过,就在 KafkaApis.scala 文件的 handle 方法下,这次的主角就是 handleFetchRequest 。

Snipaste_2024-06-17_05-57-27.jpg

这个方法进来,我截取最重要的部分。

Snipaste_2024-06-17_05-57-59.jpg

下面的图片就是 fetchMessages 方法内部实现,源码给的注释已经很清晰了,大家放大图片看下即可。

Snipaste_2024-06-17_05-58-31.jpg

这个炼狱名字取得很有趣,简单的说就是利用时间轮来执行定时任务,例如这里是 delayedFetchPurgatory ,专门用来处理延迟拉取操作。

我们先简单想一下,这个延迟操作都需要实现哪些方法,首先构建的延迟操作需要有检查机制,来查看消息是否已经到了,然后呢还得有个消息到了之后该执行的方法,还需要有执行完毕之后该干啥的方法,当然还得有个超时之后得干啥的方法。

这几个方法其实对应的就是代码里的 DelayedFetch ,这个类继承了 DelayedOperation 内部有:

  • isCompleted 检查条件是否满足的方法
  • tryComplete 条件满足之后执行的方法
  • onComplete 执行完毕之后调用的方法
  • onExpiration 过期之后需要执行的方法

判断是否过期就是由时间轮来推动判断的,但是总不能等过期的时候再去看消息到了没吧?

这里 Kafka 和 RocketMQ 的机制一样,也会在消息写入的时候提醒这些延迟请求消息来了,具体代码我不贴了, 在 ReplicaManager 和 appendRecords 方法内部再深入个两方法可以看到。

不过虽说代码不贴,图还是要画一下的。

Snipaste_2024-06-17_06-01-34.jpg

小结一下

可以看到 RocketMQ 和 Kafka 都是采用“长轮询”的机制,具体的做法都是通过消费者等待消息,当有消息的时候 Broker 会直接返回消息,如果没有消息都会采取延迟处理的策略,并且为了保证消息的及时性,在对应队列或者分区有新消息到来的时候都会提醒消息来了,及时返回消息。

一句话说就是消费者和 Broker 相互配合,拉取消息请求不满足条件的时候 hold 住,避免了多次频繁的拉取动作,当消息一到就提醒返回。

最后

总的而言推拉模式各有优劣,而我个人觉得一般情况下拉模式更适合于消息队列。

其他补充

鱼聪明 AI 的回答:

鱼聪明 AI 地址:https://www.yucongming.com/

RocketMQ

优点:

  1. 高吞吐量和低延迟: RocketMQ 能够处理每秒数百万条消息的高吞吐量,并提供低延迟的消息传递能力,适合需要快速数据处理和响应的场景。
  2. 丰富的消息模型支持: RocketMQ 支持丰富的消息模型,包括顺序消息、事务消息等,能够满足复杂应用中对消息可靠性和顺序性的需求。
  3. 分布式特性和高可用性: RocketMQ 设计为分布式系统,具备良好的水平扩展性和高可用性,支持消息的持久化存储和数据备份,保障数据的安全性和可靠性。
  4. 灵活的部署模式: RocketMQ 支持多种部署模式,包括单机部署、主从模式和集群部署,可以根据应用需求选择合适的部署方式。

缺点:

  1. 社区和生态系统相对较小: 相较于Kafka,RocketMQ 的社区规模和生态系统相对较小,可能在某些特定功能支持和第三方集成方面不如Kafka成熟。
  2. 复杂性较高: RocketMQ 在配置和管理上相对较复杂,需要一定的运维经验和专业知识来保障系统的稳定性和性能表现。

主要使用场景:

  • 电商和金融行业的实时数据处理: RocketMQ 的低延迟和高吞吐量特性使其非常适合处理电商和金融领域的实时数据流,如订单处理、支付通知等。
  • 分布式事务应用: RocketMQ 的事务消息模型支持确保消息的可靠性传递,适合需要事务性保证的应用场景,如分布式事务、应用程序集成等。
  • 大规模日志收集与分析: RocketMQ 的高吞吐量和顺序消息特性,使其成为大规模日志收集和分析系统的理想选择。

Kafka

优点:

  1. 极高的吞吐量和存储容量: Kafka 是为大规模数据流设计的,能够处理每秒数百万条消息和PB级别的存储数据,适合大数据场景下的实时数据处理和分析。
  2. 持久性和可靠性: Kafka 提供了持久化存储和数据复制机制,确保消息的可靠传递和数据的安全性,支持高可用性和故障容错。
  3. 灵活的消息发布订阅模型: Kafka 提供了灵活的发布订阅模型,支持多种消费者订阅方式和数据分发模式,如分区、复制和副本机制,能够满足复杂数据流的处理需求。
  4. 成熟的生态系统和社区支持: Kafka 拥有庞大的开发社区和丰富的生态系统,支持大量的第三方集成和工具,如数据流处理、流媒体处理等。

缺点:

  1. 运维复杂度较高: Kafka 的配置和管理较为复杂,需要专业的运维团队来维护和管理整个集群,包括监控、扩展和故障处理等方面的工作。
  2. 实时性稍逊: 尽管 Kafka 在吞吐量上表现出色,但在一些低延迟的场景下可能无法满足实时性要求,特别是对于顺序消息的处理。

主要使用场景:

  • 大数据流处理和实时分析: Kafka 的高吞吐量和大容量存储能力使其成为大数据场景下的首选,如日志收集、实时监控、用户行为分析等。
  • 事件驱动架构: Kafka 的消息发布订阅模型适合构建事件驱动的微服务架构和实时数据处理流水线,如事件溯源、实时通知等。
  • 流媒体处理和持续集成: Kafka 的持久化存储和可靠传输特性使其适合于流媒体处理和持续集成的场景,如实时推荐系统、数据流管道等。

总结

RocketMQ 和 Kafka 都是功能强大的消息队列系统,各自在不同的应用场景中有着显著的优势和适用性。选择合适的系统取决于具体的业务需求,包括数据处理的速度、可靠性要求以及整体架构设计等方面的考量。

欢迎交流

本文主要介绍两种不同的消息队列的推拉模式、以及各种的优缺点和使用场景,在文末还有三个关于消息队列的问题,欢迎小伙伴在评论区留言!近期面试鸭小程序已全面上线,想要刷题的小伙伴可以积极参与!

1)消息队列系统在分布式系统中的角色和优化策略是什么?

2)消息队列系统如何确保消息顺序性?

3)消息队列如何保证消息的可靠性传递?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/747475.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

搜索引擎的原理与相关知识

搜索引擎是一种网络服务,它通过互联网帮助用户找到所需的信息。搜索引擎的工作原理主要包括以下几个步骤: 网络爬虫(Web Crawler):搜索引擎使用网络爬虫(也称为蜘蛛或机器人)来遍历互联网&#…

云计算【第一阶段(21)】引导过程与服务控制

目录 一、linux操作系统引导过程 1.1、开机自检 1.2、MBR引导 1.3、GRUB菜单 1.4、加载 Linux 内核 1.5、init进程初始化 1.6、简述总结 1.7、初始化进程centos 6和7的区别 二、排除启动类故障 2.1、修复MBR扇区故障 2.1.1、 实验 2.2、修复grub引导故障 2.2.1、实…

这5款国内可用的宝藏AI视频工具,不允许有人还不知道!(建议收藏)

文章首发于公众号:X小鹿AI副业 大家好,我是程序员X小鹿,前互联网大厂程序员,自由职业2年,也一名 AIGC 爱好者,持续分享更多前沿的「AI 工具」和「AI副业玩法」,欢迎一起交流~ 前几天一位粉丝说给…

40.连接假死-空闲检测-发送心跳

连接假死情况 1.网络设备出现故障,例如网卡,机房等。底层的TCP连接已经断开,但应用程序没有感知到,仍然占着资源。 2.公网网络不稳定,出现丢包。若果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,就这么一直耗着。 3.应用程序线程阻塞,无法…

postman汉化中文(Windows)

Postman 是一款专业的 API 开发工具,为开发者提供了创建、测试、调试和分享 HTTP 请求的便利性和灵活性。其主要功能包括请求构建与发送、自动化测试、团队协作与分享、实时监视与调试以及环境与变量管理。无论是个人开发者还是团队,Postman 都能有效地提…

UDS - 10.2 DiagnosticSessionControl (10) service

10.3 诊断会话控制(10)服务 来自:ISO 14229-1-2020.pdf 10.2.1 服务说明 DiagnosticsSessionControl服务用于在服务器中启用不同的诊断会话。 诊断会话启用服务器中的一组特定诊断服务和/或功能。该服务提供了服务器可以报告对启用的诊断会话有效的数据链路层特定参数值(…

75101A 1553B总线测试模块

75101A 1553B总线测试模块 75101A 1553B总线测试模块是单通道多功能,符合CPCI/PXI总线的标准3U尺寸模块,可同时用作BC、RTs和BM,其中BM具有比特误码、highbit、lowbit、highword、lowword、校验错误、消息错误检测以及最大256M字节的数据捕…

【ZYNQ】VDMA 的介绍

AXI VDMA 是 Xilinx 官方提供的高带宽视频 DMA IP,用于实现 AXI4-Stream 视频数据流与 AXI4 接口数据的转换,同时提供帧缓存与帧同步控制功能。本文主要介绍 AXI VDMA 的基本结构与原理,并简要介绍 VDMA 的配置与使用方法。 目录 1 VDMA 简介…

程序员必备的ChatGPT技巧:从代码调试到项目管理

近年来,随着人工智能技术的迅猛发展,ChatGPT作为一种强大的对话式AI工具,已经广泛应用于各个领域。而对于程序员来说,ChatGPT不仅可以帮助他们解决编程中的各种问题,还能在项目管理中发挥重要作用。本篇博客将详细介绍…

微信小程序的课堂考勤系统

1 项目介绍 1.1 研究的背景及意义 在信息化快速发展的互联网时代,高校教学管理也面临着数字化转型的迫切需求。传统的课堂考勤方式,如到场点名或教师手工记录,不仅效率低下,耗费大量时间和人力资源,而且容易引发考勤…

vue3-登录小案例(借助ElementPlus+axios)

1.创建一个vue3的项目。 npm create vuelatest 2.引入Elementplus组件库 链接:安装 | Element Plus npm install element-plus --save 在main.js中引入 import ElementPlus from "element-plus";import "element-plus/dist/index.css";ap…

【unity实战】制作unity数据保存和加载系统——小型游戏存储的最优解(包含数据安全处理方案的加密解密)

前言 如何在 Unity 中正确制作一个保存和加载系统,该系统使用JSON 文件来处理保存配置文件,可以保存和加载任何类型对象!标题为什么叫小型游戏存储功能呢?因为该存储功能可能只适合存储数据比较单一的情况,它非常的方…

udp udpClient 聊天室

简介 1、UDP(User Data Protocol,用户数据报协议) (1) UDP是一个非连接的协议,传输数据之前源端和终端不建立连接,当它想传送时就简单地去抓取来自应用程序的数据,并尽可能快地把它…

三大关键技术看RAG如何提升LLM的能力

大语言模型表现出色,但是在处理幻觉、使用过时的知识、进行不透明推理等方面存在挑战。检索增强生成(RAG)作为一个新兴的解决方案,通过整合外部知识库的数据,提高了模型在知识密集型任务中的准确性和可信度&#xff0c…

数据质量管理-一致性管理

前情提要 根据GB/T 36344-2018《信息技术 数据质量评价指标》的标准文档,当前数据质量评价指标框架中包含6评价指标,在实际的数据治理过程中,存在一个关联性指标。7个指标中存在4个定性指标,3个定量指标; 定性指标&am…

星坤Type-A连接器:创新快充技术,引领电子连接!

快速发展的电子时代,消费者对电子设备的性能和便利性有着更高的要求。特别是在充电和数据传输方面,快充技术和高速传输已成为市场的新宠。中国星坤公司推出的Type-A连接器系列,以其卓越的性能和创新的设计,满足了市场对高效、稳定…

Linux-笔记 全志平台休眠功能初探

前言 全志平台支持的休眠功能主要包括两种模式:休眠模式和待机模式。这两种模式用于降低设备的功耗,并在需要时快速恢复工作状态。由于平台为T113,所以可以很方便的使用RTC来做唤醒源。唤醒源指的是能够让系统从休眠状态恢复到工作状态的信号…

《人人都是产品经理》:项目的坎坷一生

《人人都是产品经理》:项目的坎坷一生 产品VS项目产品经理和项目经理 一切项目从kick off 开始工作量预估Kick Off的大致也就15分钟 写文档咯UML图用例文档UCdemo也得做 需求活在项目中bug等级有多高bug流转过程 以终为始 产品VS项目 项目定义:是只会进…

Python 挖坑式填充Excel模板内容(包括页眉/SheetName/logo)

纵览 Python处理Excel的方式--解压缩方式1、导包2、对模板文件进行解压缩3、对解压缩后文件层级进行介绍4、准备需要载入的数据5、模板挖坑6、运行替换代码7、压缩文件8、生成文件9、完成代码10、可能遇到的问题 结语 Python处理Excel的方式–解压缩方式 在处理Excel中过程中&…

2024-6-26 石群电路-30

2024-6-26,星期三,10:38,天气:雨,心情:晴。今天没有什么事情发生,继续学习,加油!!!!! 今日观看了石群老师电路课程的视频…