消息中间件之RocketMQ源码分析(六)

Consumer消费方式

RocketMQ的消费方式包含Pull和Push两种

  • Pull方式。
    用户主动Pull消息,自主管理位点,可以灵活地掌控消费进度和消费速度,适合流计算、消费特别耗时等特殊的消费场景。
    缺点也显而易见,需要从代码层面精准地控制消费,对开发人员有一定要求,再RocketMQ中
    org.apache.rocketmq.client.consumer.DefaultMQPullConsume是默认的Pull消费者实现类
    在这里插入图片描述
    1.fetchSubscribeMessageQueues(String topic).拉取全部可以消费的Queue.如果某一个Broker下线,这里也可以实时感知到
    2.遍历全部Queue,拉取每个Queue可以消费的消息
    3.如果拉取到消息,则执行用户编写的消费代码
    4.保存消费进度。消费进度可以执行updateConsumeOffset()方法,将消息位点上报给Broker,也可以自行保存消费位点。比如流计算平台Flink使用Pull方式拉取消息消费,通过Checkpoint管理消费进度
  • Push方式。
    代码介入非常简单,适合大部分业务场景。缺点灵活度差,在了解消费原理后,排查消费问题可简单快捷.在RocketMQ中org.apache.rocketmq.client.consumer.DefaultMQPushConsumer是默认的Push消费者实现类
    在这里插入图片描述
    1.初始化Push消费者实例。业务代码初始化DefaultMQPushConsumer实例,启动Push服务PullMessageService.该服务是一个线程服务,不断执行run()方法拉取已经订阅Topic的全部队列的消息,将消息保存在本地的缓存队列中
    启动PullMessageService的拉取服务
    在这里插入图片描述

在这里插入图片描述
PullMessageService不断拉取消息。pullRequestQueue中保存着待拉取地Topic和Queue消息,程序不断从pullRequestQueue中获取PullRequest并执行拉取消息方法
在这里插入图片描述
消费者拉取消息并消费,org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

基本校验。校验ProcessQueue是否dropped;校验消费者服务状态是否正常;校验消费者是否被挂起。在Rebalance时,org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance方法在运行时设置ProcessQueue.setDropped(true)的逻辑,,设置成功后,在执行拉取消息时,将不再拉取dropped为true的ProcessQueue
在这里插入图片描述
在这里插入图片描述

拉取条数、字节数限制检查。如果本地缓存消息数量大于配置的最大拉取条数(默认1000,可以调整),则延迟一段时间再拉取;
如果本地缓存消息字节数大于配置的最大缓存字节数,则延迟i短时间再拉取,这两种校验方式都相当于本地流控
在这里插入图片描述
并发消费和顺序消费校验。
在并发消费时,processQueue.getMaxSpan()方法是用于计算本地缓存队列中的哥消息和最后一个消息的offset差值。
顺序消费时,如果当前拉取的队列在Broker没有被锁定,说明已经由拉取正在执行,当前拉取请求晚点执行,如果不是第一次拉取,
需要先计算最新的拉取位点并修正最新的待拉取位点信息,再执行拉取
在这里插入图片描述
本地缓存队列的Span如果大于配置的最大差值(默认2000,可以调整),
则认为本地消费过慢,需要执行本地流控
在这里插入图片描述
队列锁定
在这里插入图片描述
订阅关系校验。如果待拉取的Topic在本地缓存中订阅关系为空,则本地拉取不执行,待下一个正常心跳或者Rebalance后订阅关系恢复正常,方可正常拉取
在这里插入图片描述
封装拉取请求和拉取后的回调对象PullCallback。这里主要将消息拉取请求和拉取结果处理封装成PullCallback,
并通过调用PullAPIWrapper.pullKernelImpl()方法将拉取请求发出去。
如果拉取到消息,那么将消息保存到对应的本地缓存队列ProcessQueue中,然后将这些消息交给ConsumeService服务;
在这里插入图片描述

在这里插入图片描述
2.核心-消费消息。由消费服务ConsumeMessageConcurrentlyService或者ConsumeMessagOrderlyService
将本地缓存队列中的消息不断放入到消费线程池,异步回调业务消费代码,此时业务代码可以消费消息
在这里插入图片描述
3.核心-保存消费进度。业务代码消费后,将消费结果返回给消费服务,再由消费服务将消费进度保存在本地,
由消费进度管理服务定时和不定时地持久化到本地(LcoalFileOffsetStore支持)或者远程Broker(RemoteBrokerOffsetStore支持)中,
对于消费失败地消息,RocketMQ客户端处理后发回给Broker,并告知消费失败

  • Pull和Push方式的比较
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/375096.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

python+django+vue高校学生社团管理系统euw84

社团管理系统是一个B/S模式系统,采用django框架,MySQL数据库设计开发,充分保证系统的稳定性。在系统的测试环节,主要通过功能测试的方式,验证系统的功能设计是否符合要求,能否满足使用需求。本社团管理系统…

【实训】自动运维ansible实训(网络管理与维护综合实训)

来自即将退役学长的分享,祝学弟学妹以后发大财! 一 实训目的及意义 1.1 实训目的 1、熟悉自动化运维工具:实训旨在让学员熟悉 Ansible 这一自动化运维工具。通过实际操作,学员可以了解 Ansible 的基本概念、工作原理和使用方法…

text-generation-webui搭建大模型运行环境与踩坑记录

text-generation-webui搭建大模型运行环境 text-generation-webui环境初始化准备模型启动项目Bug说明降低版本启动项目 text-generation-webui text-generation-webui是一个基于Gradio的LLM Web UI开源项目,可以利用其快速搭建部署各种大模型环境。 环境初始化 下载…

【VSTO开发-WPS】下调试

重点2步: 1、注册表添加 Windows Registry Editor Version 5.00[HKEY_CURRENT_USER\Software\kingsoft\Office\WPP\AddinsWL] "项目名称"""2、visual studio 运行后,要选中附加到调试,并指定启动项目。 如PPT输入WPP搜…

考研高数(一阶导与二阶导)

一阶导数 导数最大的作用是判断复杂函数的单调性&#xff0c;则可用一阶导判断原函数的单调性。 一阶导数>0&#xff1a;函数单调递增&#xff1b; 一阶导数<0&#xff1a;函数单调递减&#xff1b; 一阶导数0&#xff1a;函数是常函数。 也可以通过一阶导数0的根来…

一致性哈希算法

在分布式领域中各技术组件都有实现KV形式的存储&#xff0c;在实现各类工作能力的同时还简化了算法实现。以Raft分布式协议为例&#xff0c;它通过在领导者采用KV存储来简化算法实现和共识协商&#xff0c;但同时也限制所有写请求只能在领导者节点上进行处理&#xff0c;从而导…

TS项目实战二:网页计算器

使用ts实现网页计算器工具&#xff0c;实现计算器相关功能&#xff0c;使用tsify进行项目编译&#xff0c;引入Browserify实现web界面中直接使用模块加载服务。   源码下载&#xff1a;点击下载 讲解视频 TS实战项目四&#xff1a;计算器项目创建 TS实战项目五&#xff1a;B…

龙芯安装使用搜狗输入法

CPU&#xff1a;龙芯3A6000 操作系统&#xff1a;Loongnix 桌面主题&#xff1a;Cartoon 龙芯系统切换输入法的按键一般为&#xff1a;Ctrl空格。 1 安装搜狗输入法 进入Loongnix系统自带的龙芯应用合作社&#xff0c;寻找搜狗输入法&#xff0c;点击安装。 按下Ctrl空格&…

生成树技术华为ICT网络赛道

9.生成树 目录 9.生成树 9.1.生成树技术概述 9.2.STP的基本概念及工作原理 9.3.STP的基础配置 9.4.RSTP对STP的改进 9.5.生成树技术进阶 9.1.生成树技术概述 技术背景&#xff1a;二层交换机网络的冗余性与环路 典型问题1&#xff1a;广播风暴 典型问题2&#xff1a;MA…

C++多态_C++回顾

多态的概念 通俗的说多态就是多种形态&#xff0c;具体点就是去完成某个行为&#xff0c;当不同的对象去完成时会产生出不同的概念。 什么是多态 静态的多态 静态的多态即函数重载&#xff0c;编译时是参数匹配和函数名修饰规则。 动态的多态 运行时实现&#xff0c;跟指…

(篇九)MySQL常用内置函数

目录 ⌛数学函数 ⌛字符串函数 ⌛聚合函数 ⌛日期函数 &#x1f4d0;获取当前时间 &#x1f4d0;获取时间的某些内容 &#x1f4d0;​编辑 &#x1f4d0;格式化函数 &#x1f4cf;format类型&#xff1a; ⌛系统信息函数 ⌛类型转换函数 数学函数 字符串函数 聚合函…

《计算机网络简易速速上手小册》第6章:网络性能优化(2024 最新版)

文章目录 6.1 带宽管理与 QoS - 让你的网络不再拥堵6.1.1 基础知识6.1.2 重点案例&#xff1a;提高远程办公的视频会议质量实现步骤环境准备Python 脚本示例注意事项 6.1.3 拓展案例1&#xff1a;智能家居系统的网络优化实现思路Python 脚本示例 6.1.4 拓展案例2&#xff1a;提…

Go语言每日一练 ——链表篇(三)

传送门 牛客面试笔试必刷101题 ---------------- 链表中的节点每k个一组翻转 题目以及解析 题目 解题代码及解析 package main import _"fmt" import . "nc_tools" /** type ListNode struct{* Val int* Next *ListNode* }*//*** 代码中的类名、方…

矩阵的正定(positive definite)性质的作用

1. 定义 注意&#xff0c;本文中正定和半正定矩阵不要求是对称或Hermite的。 2. 性质 3. 作用 &#xff08;1&#xff09;Axb直接法求解 cholesky实对称正定矩阵求解复共轭对称正定矩阵求解LDL实对称非正定矩阵求解复共轭对称非正定矩阵求解复对称矩阵求解LU实非对称矩阵求解…

离线环境怎么下载python依赖包

公司内网环境无网络&#xff0c;运行自动化脚本需要安装python模块 1、脚本依赖包及其版本获取&#xff0c;记录在requirements.txt中 pipreqs ./script --encodingutf8 requirements.txt注意&#xff0c;这里是将./script 里的python模块自动扫描并写入到requirements.txt中…

QT学习日记 | 显示类控件

目录 前言 一、QLabel控件 1、属性介绍 2、实战演练 &#xff08;1&#xff09;文本格式属性 &#xff08;2&#xff09;图片属性 &#xff08;3&#xff09;对齐、换行、缩进、边距属性 &#xff08;4&#xff09;伙伴属性 二、QLCDNumber控件 1、属性介绍 2、实战…

图灵之旅--二叉树堆排序

目录 树型结构概念树的表示形式 二叉树概念特殊的二叉树二叉树性质二叉树的存储二叉树的遍历前中后序遍历 优先级队列(堆)概念 优先级队列的模拟实现堆的性质概念堆的存储方式堆的创建 堆常用接口介绍PriorityQueue的特性PriorityQueue常用接口介绍优先级队列的构造插入/删除/获…

闲聊电脑(5)装个 Windows(一)

​夜深人静&#xff0c;万籁俱寂&#xff0c;老郭趴在电脑桌上打盹&#xff0c;桌子上的小黄鸭和桌子旁的冰箱又开始窃窃私语…… 小黄鸭&#xff1a;冰箱大哥&#xff0c;上次说到硬盘分区和格式化&#xff0c;弄完之后&#xff0c;就该装系统了吧&#xff1f; 冰箱&#x…

确定问卷调查样本量

目录 1. 问卷数据类型1.1 定性数据&#xff06;定性分析1.2 定量数据&#xff06;定量分析 2. 确定初始样本容量&#xff1a;2.1 公式&#xff1a;2.2 Z值2.3 p2.4 e2.5 举例 3.调整初始样本容量&#xff1a;3.1 公式&#xff1a;3.2 结论就是 小结&#xff1a; 1. 问卷数据类型…

消息中间件之RocketMQ源码分析(七)

并行消费和顺序消费 ConsumeMessageService是一个通用的消费服务接口&#xff0c;它包含两个实现类org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService和 org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService&#xff0c;这两个…