消息中间件之RocketMQ源码分析(七)

并行消费和顺序消费

ConsumeMessageService是一个通用的消费服务接口,它包含两个实现类org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService和
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService,这两个实现类分别用于并发消费和顺序消费

核心方法

在这里插入图片描述

  • start()方法和shudown()方法分别在启动和关闭服务时使用
  • updateCorePoolSize():更新消费线程池的核心线程数
  • incCorePoolSize():增加一个消费线程池的核心线程数
  • decCorePoolSize():减少一个消费者线程池的核心线程数
  • getCorePoolSize():获取消费线程池的核心线程数
  • consumeMessageDirectly():如果一个消息已经被消费过了,但是还项再消费一次,就需要实现这个方法
  • submitConsumeRequest():将消息封装成线程池任务,提交给消费服务,消费服务再将消息传递给业务消费进行处理

1.ConsumeMessageService消息消费分发。ConsumeMessageService服务通过

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest接口接收消息消费任务后,将消息按照固定条数封装成多个ConsumeRequest任务对象,并发送到消费线程池,等待分发给业务消费;ConsumeMessageOrderlyService先将Pull的全部消息放在一个本地队列中然后提交一个ConsumeRequest到消费者线程池

ConsumeMessageConcurrentlyService
在这里插入图片描述
ConsumeMessageOrderlyService
在这里插入图片描述

2.消费消息。消费的主要逻辑再ConsumeMessageService接口的两个实现类中,以并发消费为例.

消费消息主要分为消费前预处理、消费回调、消费结构统计、消费结果处理4个步骤
在这里插入图片描述
第一步:消费执行前进行预处理。执行消费前的hook和重试消息预处理。消费前的hook可以理解为消费前的消息预处理(比如消息格式校验)。如果拉取的消息来自重试队列,则将Topic重置为原来的Topic,而不用重试Topc名
在这里插入图片描述
第二步:消费回调。首先设置消息开始消费时间为当前时间,再将消息列表转为不可修改的List,
然后通过status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
方法将消息传递给用户编写的业务消费代码进行处理
在这里插入图片描述
第三步:消费结果统计和执行消费后的hook.客户端原声支持基本消费指标统计,比如消费耗时;消费后的hook和消费前的hook要一一对应,
用户可以用消费后的hook统计与自身业务相关的指标
在这里插入图片描述
第四步:消费结果处理。包含消费指标统计、消费重试处理和消费位点处理。消费指标主要是对消费成功和失败的TPS的统计;消费重试处理主要将消费重试次数+1;消费位点处理主要根据消费结果更新消费位点记录
在这里插入图片描述

RocketMQ是一个消息队列,FIFO先进先出规则如何再消费失败时保证消息的顺序呢?

可以从消费任务实现类ConsumeRequest和本地缓存队列ProcessQueue的设计来看主要差异

并发消费
在这里插入图片描述
顺序消费
在这里插入图片描述
顺序消息的ConsumeRequest中并没有保存需要消费的消息,再顺序消费时通过调用ProcessQueue.takeMessage()
方法获取需要消费的消息,而且消费也是同步进行的。
在这里插入图片描述
takeMessages()方法实现
在这里插入图片描述
msgTreeMap:是一个TreeMap<Long, MessageExt>类型,key是物理位点值,value是消息对象,该容器是ProcessQueue用来缓存本地顺序消息的,保存的数据是按照key(就是物理位点值)顺序排列的

consumingMssgOrderlyTreeMap:是一个TreeMap<Long,MessagExt>类型,key是消息物理位点值,value是消息对象,保存当前正在处理的顺序消息集合,是msgTreeMap的一个子集,保存的数据是按照key(就是物理位点值)顺序排列的

batchSize:一次从本地缓存中获取多少条消息回调给用户消费。顺序消息是如何通过ProcessQueue.takeMesages()
获取消息给业务代码消费的呢?

从msgTreeMap中获取batchSize数量的消息放入consumingMsgOrderlyTreeMap中,并返回给用户消费,
由于当前的MessageQueue是被Synchronized锁住的,并且获取的消费消息也是按照消费位点顺序排列的,
所以消费时用户能按照物理位点顺序消费消息

如果消费失败,又是怎么保证顺序的呢?来看processConsumeResult()实现
在这里插入图片描述
RocketMQ支持自动提交offset和手动提交offset两种方式。以自动提交offset为例,手动提交与其完全一致,先看入参
在这里插入图片描述
msg:当前处理的一批消息
status:消费结果的状态。目前支持SUCCESS和SUSPEND_CURRENT_QUEUE_A_MOMENT两种状态

消费成功后,程序会执行commit()方法提交当前位点,统计消费成功的TPS。消费失败后,程序会统计消费失败的TPS,通过执行makeMessageToCOnsumeAgin()方法,删除消费失败的消息,通过定时任务将消费失败的消息在延迟一定时间后,重新提交到消费线程池

makeMessagToConsumeAgin()方法将消息从consumingMsgOrderlyTreeMap中删除再重新放入本地缓存度列msgTreeMap中,等待下次被重新消费
在这里插入图片描述
submitConsumeRequestLater()方法会执行一个定时任务,延迟一定时间后重新将消费请求发送到消费线程池中,以供下一轮消费
在这里插入图片描述
做完这两个操作后,试想以下,消费线程在下一次消费时会发生什么事情?如果是从msgTreeMap中获取一批消息,
那么返回的消息又是那些呢?消息物理位点最小的,也就是之前未成功消费的消息,如果顺序消息消费失败,会再次投递给消费者消费,
直到消费成功,以此来保证顺序性

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/375061.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

学习并用好大模型

大模型是个好东西&#xff0c;学好并用好益处多多~ 1. 运用大模型服务我们的工作 运用大模型服务于工作&#xff0c;可以从以下几个方面着手&#xff1a; 知识管理与检索&#xff1a; 利用大模型强大的自然语言处理能力&#xff0c;建立企业内部的知识库系统。员工可以通过提问…

03-Java单例模式 ( Singleton Pattern )

单例模式 单例模式设计要点单例模式基础实现摘要实现范例 单例模式的几种实现方式1. 懒汉式&#xff0c;线程不安全2. 懒汉式&#xff0c;线程安全3. 饿汉式4. 双检锁/双重校验锁&#xff08;DCL&#xff0c;即 double-checked locking&#xff09;5. 登记式/静态内部类6. 枚举…

记录关于node接收并解析前端上传excel文件formData踩的坑

1.vue2使用插件formidable实现接收文件&#xff0c;首先接口不可以使用任何中间件&#xff0c;否则form.parse()方法不执行。 const express require(express) const multipart require(connect-multiparty); const testController require(../controller/testController)/…

【论文精读】多模态情感分析 —— VLP-MABSA

Vision-Language Pre-Training for Multimodal Aspect-Based Sentiment Analysis 本篇论文发表于ACL-2022 原文链接 https://arxiv.org/abs/2204.07955 源码 GitHub - NUSTM/VLP-MABSA 模态&#xff1a;图像文本 基于多模态方面的情感分析(MABSA)近年来越来越受到关注。然而&am…

【Power Automate】规避流程30天的运行时限(只针对审批流)

众所周知&#xff0c;Power Automate的流程最多只能运行30天&#xff0c;到点之后直接超时&#xff0c;如果我们有超时时间设置的比较长的审批就会很麻烦&#xff0c;可能我们把审批节点的超时时间都设置为25天&#xff0c;结果第一个审批人就把25天拉满了&#xff0c;那第二个…

SpringBoot实战第三天

今天主要完成了&#xff1a; 新增棋子分类 棋子分类列表 获取棋子分类详情 更新棋子分类 更新棋子分类和添加棋子分类_分组校验 新增棋子 新增棋子参数校验 棋子分类列表查询(条件分页) 先给出分类实体类 Data public class Category {private Integer id;//主键IDNot…

七月论文审稿GPT第2.5和第3版:分别微调GPT3.5、Llama2 13B以扩大对GPT4的优势

前言 自去年7月份我带队成立大模型项目团队以来&#xff0c;我司至今已有5个项目组&#xff0c;其中 第一个项目组的AIGC模特生成系统已经上线在七月官网第二项目组的论文审稿GPT则将在今年3 4月份对外上线发布第三项目组的RAG知识库问答第1版则在春节之前已就绪至于第四、第…

C语言-3

定义指针 /*指针的概念:1.为了方便访问内存中的内容&#xff0c;给每一个内存单元&#xff0c;进行编号&#xff0c;那么我们称这个编号为地址&#xff0c;也就是指针。2.指针也是一种数据类型&#xff0c;指针变量有自己的内存&#xff0c;里面存储的是地址&#xff0c;也就是…

Vue ElementUI中el-table表格嵌套样式问题

一、表格嵌套要求&#xff1a; 两个表格嵌套&#xff0c;当父表格有children数组时子表格才展示&#xff1b;子表格数据少于父表格展示字段&#xff0c;且对应固定操作列不同&#xff1b; 二、嵌套问题&#xff1a; 当使用el-table的typeexpand实现表格嵌套时&#xff0c;样…

hbuiderX打包为apk后无法停止录音的解决方案

同一个APP在hbuilder和hbuilderX打包&#xff0c;出现没有麦克风权限 - DCloud问答 第一步&#xff1a; 在manifest.json的“模块权限配置”中勾选以下权限&#xff1a; <uses-permission android:name"android.permission.MODIFY_AUDIO_SETTINGS" /> <use…

Linus进程概念

冯诺依曼体系结构 我们常见的计算机&#xff0c;如笔记本。我们不常见的计算机&#xff0c;如服务器&#xff0c;大部分都遵守冯诺依曼体系 截至目前&#xff0c;我们所认识的计算机&#xff0c;都是有一个个的硬件组件组成 输入单元&#xff1a;包括键盘, 鼠标&#xff0c;扫…

golang 引入swagger(iris、gin)

golang 引入swagger&#xff08;iris、gin&#xff09; 在开发过程中&#xff0c;我们不免需要调试我们的接口&#xff0c;但是有些接口测试工具无法根据我们的接口变化而动态变化。文档和代码是分离的。总是出现文档和代码不同步的情况。这个时候就可以在我们项目中引入swagge…

如何利用边缘计算网关进行机床数据采集,以提高数据采集的效率和准确性-天拓四方

边缘计算网关集成了数据采集、处理和传输功能的嵌入式设备。它位于传感器和执行器组成的设备层与云计算平台之间&#xff0c;能够实时处理和响应本地设备的数据请求&#xff0c;减轻云平台的压力&#xff0c;提高数据处理的速度和效率。同时&#xff0c;边缘计算网关还可以将处…

0206作业

TCP&#xff08;传输控制协议&#xff09;和 UDP&#xff08;用户数据报协议&#xff09;是两种常用的网络传输协议。它们之间的主要区别在于&#xff1a; 可靠性&#xff1a;TCP 是一种可靠的传输协议&#xff0c;它提供了数据传输的确认、重传和排序功能。如果数据在传输过程…

ROS笔记二:launch

目录 launch node标签 参数 参数服务器 节点分组 launch launch文件是一种可以可实现多节点启动和参数配置的xml文件,launch文件用于启动和配置ROS节点、参数和其他相关组件。launch文件通常使用XML格式编写&#xff0c;其主要目的是方便地启动ROS节点和设置节点之间的连…

一周学会Django5 Python Web开发-Django5介绍及安装

锋哥原创的Python Web开发 Django5视频教程&#xff1a; 2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~共计10条视频&#xff0c;包括&#xff1a;2024版 Django5 Python we…

有奖讨论丨你能看出来哪些是 AI 写的代码么?

随着 AI 智能浪潮到来&#xff0c;AI 智能编码助手成为越来越多开发者的必备工具&#xff0c;Github Copilot、Amazon CodeWhisperer 等 AI 编码工具陆续登场&#xff0c;去年云栖大会阿里云发布的 “通义灵码” 同样令人期待。 通义灵码&#xff1a; https://tongyi.aliyun.co…

数据加密算法多样化的安全需求

数据加密算法是信息安全领域中非常重要的一环&#xff0c;它能够确保数据在传输和存储过程中的机密性和完整性。随着技术的发展&#xff0c;数据加密算法也在不断地演进和改进&#xff0c;以满足更为复杂和多样化的安全需求。 数据加密算法的基本原理是使用加密密钥和加密算法对…

86.分布式锁理论分析

文章目录 前言一、为什么需要分布式锁&#xff1f;二、基于 Redis 分布式锁怎么实现&#xff1f;三、Redis 分布锁存在的问题3.1 死锁问题3.2 锁过期时间问题3.3 锁被别人释放问题 四、Redis 分布锁小结五、Redis 主从同步对分布式锁的影响六、Redlock 方案七、Redlock 的争论7…

Java笔记 --- 七、多线程

七、多线程 线程 线程是操作系统能够运行调度的最小单位 被包含在进程之中&#xff0c;是进程的实际运行单位 应用软件中相互独立&#xff0c;可以同时运行的功能 每一个线程都有自己的栈 并发和并行 并发&#xff1a;在同一时刻&#xff0c;有多个指令在单个CPU上交替执…