【程序大侠传】服务发布引发mq消息重复消费

前序

在编程武侠世界中,有一个门派“天机楼”,连接并协调各大门派之间的关系,确保整个江湖的运作流畅无阻。天机楼住要的业务范围主要如下:

  • 信息传递的信使:
    天机楼就像是江湖中的飞鸽传书,确保各门派之间的信息能够快速、准确地传递。无论是战斗指令、情报交换还是紧急求援,天机楼都能可靠地完成任务。
  • 系统稳定的守护者:
    它如同一位隐形的护法,时刻监控着江湖的运作,确保各门派的系统稳定运行,避免因系统故障而引发的江湖动乱。
  • 性能优化的高手:
    天机楼精通各种优化技巧,能够在复杂的江湖环境中找到最佳的解决方案,提升系统的性能,让各系统的操作更加高效与流畅。
  • 负载均衡的调度者:
    如同武林盟主一般,天机楼可以合理调配各门派的资源,确保每个门派都能均衡发展,避免资源过度集中或分配不均。
  • 问题解决的医者:
    当江湖中出现问题时,天机楼能够迅速诊断并修复问题,像神医华佗一样,确保江湖的和平与稳定。

天机楼的武器:

  • 消息队列:
    如同传递消息的飞鸽,确保信息快速且准确地到达目的地。
  • 缓存系统:
    犹如藏在暗处的密库,能快速提供所需的资源。
  • 负载均衡:
    如同武林盟主,能够调配各系统的资源,确保每个系统都能均衡发展。

阿强所在的世界中,天机楼一直是基石般的存在,这个神秘的组织中每个人的技术功力都非常强悍。以至于天机楼虽然人数不多,但是地位与实力却一直很高。此组织成员一般散落在世界各地,统一由天机阁管理。而天机阁中的成员所接的任务难度跟积分跟平常门派所发布的任务都要高出不少,也正是因为如此,很多觉得自己实力还不错的人都想加入其中,但很多人都低估 了其考核难度,而此组织如果你实力没有达到他的考核标准是没有其他的途径进入的。而加入天机楼除了能够获取高积分的任务之外,还有的好处是,此组织跟自己的门派不冲突,也就是说,天机楼的身份不影响你可以在任何门派中担任职位。而阿强就是除了是目前所在代码剑宗的武林高手身份外,也在天机阁中担任中级侠客一职。而所谓的天机阁内部也有着自己的一套等级划分:

  • 初级侠客(新手):
    职责:负责基础的中间件配置和维护工作,处理常见的小问题和简单的优化任务。
    能力:掌握基础的中间件技术,能够进行基本的安装、配置和常见问题排查。
  • 中级侠客(高手):
    职责:负责复杂的中间件部署和优化任务,能够独立解决较为复杂的问题。
    能力:精通常见的中间件技术,具备较强的系统优化和问题解决能力,能够进行性能调优和负载均衡配置。
  • 高级侠客(大侠):
    职责:负责整个系统的中间件架构设计和优化,能够处理高难度的技术问题和进行系统级的优化。
    能力:深谙各种中间件技术,具备系统架构设计能力,能够进行复杂的系统集成和全面的性能优化。
  • 宗师(顶级专家):
    职责:负责中间件技术的战略规划和创新,领导团队进行技术攻关和前沿技术探索。
    能力:拥有深厚的技术积累和丰富的实战经验,能够在技术上引领团队,推动中间件技术的发展和创新。

阿强处理完上次门派中的pagehelper的紧急任务后就一直沉浸在进行门派中的另一个开发任务中,不知道过了多久,阿强满脸兴奋地从自己的洞府走出,他经过这段时间的闭关,终于将那个开发任务给开发完并交给断点神教小美测试,此时的他一身轻松地望向天空不禁有些恍惚,大约过了2刻钟,天机阁突然发布了一条任务“L服务出现mq重新消费,请及时处理”,本来此时的阿强就是无事一身轻,正想着去天机阁找个任务,因此看到任务这么合时宜地发布,阿强毫不犹豫就认领了下来,不多时,阿强就全身心地投入到此任务中…。

第四章 什么?mq重复消费了?

15分钟过后,阿强了解到此次的问题暴露是由于L服务侧下游F服务(没错,就是上次pagehelper 分页sql问题服务)生成了两条一样的由L服务调用产生的订单。而L调用F服务生成订单的时序图如下:
在这里插入图片描述

从时序图不难看出,这个生成订单的过程是一个异步过程,而L服务的异步处理是通过MQ来实现。知道了整体交互逻辑的阿强打开idea查看L服务中MQ的消费逻辑:

//下面是L服务消息消费的伪代码
public MsgStatus onMessage(ConsumeMessage msg){
		log.info("【xxx异步处理】接收MQ消息:{}", message);
		if (StringUtils.isBlank(message)) {
            return MsgStatus.SUCCEED;
        }
        Entity entity = dataConversion();//数据转换
        LOrderEntity  LOrderEntity = LOrderRepository.selectByLOrderId(entity.getLOrderId());
        //幂等校验,但是在某些场景下没有用,如本案例中
		if(StringUtils.isNotEmpty(LOrderEntity.getTaskId()))){
			log.warn("【xxx异步处理】mq重复发送消息");
			return MsgStatus.SUCCEED;
		}
		//构建请求入参
		FOrderReq req = buildReq(LOrderEntity,msg);
		RpcResponse res = FRpc.createOrder(req);
		//构建更新字段实体
		LOrderEntity  LOrderEntity1  = buildLOrderField(res.getTaskId(),res.getStates());
		LOrderRepository.updateOrder(LOrderEntity1);
        return MsgStatus.SUCCEED;
    }

阿强梳理完L服务的异步消息处理逻辑,随即就开始通过天书法器查看L服务的日志,不多时,阿强就通过“【xxx异步处理】接收MQ消息”关键字在天书上看到了两条很奇怪的日志,这两条日志都是mq消费逻辑打印出来,且后面输出的mq消息体字段全都是一样。
在这里插入图片描述
看到两条一样的消息体,阿强陷入了沉思,随即他又通过“发送mq”关键字,在这条链路上搜索,发现只出现了一条日志,也就是说,mq生产者只发送了一条消息,却消费了两次。
在这里插入图片描述
看到这种情况,阿强脑子里面浮现几种猜想,第一种是:天机楼的消息处理发生抖动导致的重复消费;第二种是:L服务的消费者没有去ack。此时的阿强也没办法确定是那种情况导致的重复消费,此时最好的办法就是使用排除法。同身为天机阁成员,阿强有权限去查看某个消息的一个消费情况跟整个消息处理服务的监控。当阿强打开天眼系统查看消息处理服务的监控,发现整个服务的指标都很正常,并没有什么抖动。此时第一种猜想已经验证是没有问题的,阿强快马加鞭地开始第二种猜想验证。只见他打开天书系统的链路耗时
在这里插入图片描述
发现整个消费的耗时远远没有达到消息ack处理超时时间3分钟的。此时的阿强双眉紧凑,脑中已经开始了头脑风暴。如果没有达到消息ack处理超时时间,那么还会有什么场景会让mq消息ack失效呢?阿强在脑中重新回顾了一下RocketMQ的ack机制(因为L系统使用的RocketMQ消息中间件):

  1. 消息消费确认机制
    RocketMQ 采用了“消费者主动确认”的机制,即消费者在成功处理完消息后,主动向 Broker 发送 ACK 确认。这与一些消息队列系统的自动确认机制不同,能够确保消息被成功处理后才被确认。
  2. 消息消费过程
    消息发送:生产者将消息发送到 RocketMQ Broker。
    消息存储:Broker 接收到消息后,将消息存储在磁盘中,并将消息写入 CommitLog。
    消息拉取:消费者从 Broker 拉取消息进行消费。
    消息处理:消费者接收消息并进行处理。
    确认消息:消息处理成功后,消费者向 Broker 发送 ACK 确认。
  3. 消息重试机制
    如果消费者在处理消息过程中出现异常或失败,没有发送 ACK 确认,RocketMQ 会认为该消息未被成功消费,并会进行重试。重试机制确保消息不会丢失,但可能会出现消息重复消费的情况。为此,消费者需要实现幂等性处理。

此时的阿强已经没有一开始接收这个任务时轻松的心态,他的脑海不断浮现各种知识内容,去思考可能出现场景。大约过了2小时,阿强的眼睛闪过一丝光亮,他连忙打开了天剑部署系统去查看最后一次L服务部署时间,不多时,阿强深沉的眼神中闪过一丝兴奋,嘴角露出了一丝笑容。但是为了确认自己的猜想,他回顾了一下rocketmq消费的存储方式、消费模式:

  1. 消费进度的存储方式
    RocketMQ 支持两种消费进度(offset)的存储方式:
    Broker 端存储:消费进度保存在 Broker 上,消费者重启后会从 Broker 获取最新的消费进度。
    消费者本地存储:消费进度保存在本地磁盘,消费者重启后会从本地磁盘读取消费进度。
    如果消费进度存储在 Broker 上,那么即使消费者重新部署,重新启动后也会从 Broker 获取最新的消费进度,避免消息重复消费的情况。
  2. 消费模式
    RocketMQ 支持两种消费模式:
    集群消费(Clustering):同一个消费组内的多个消费者实例会均摊消息,每条消息只会被其中一个消费者实例消费。当某个消费者实例重启时,其他实例会接管它的消费任务。
    广播消费(Broadcasting):每个消费者实例都会消费所有的消息。当某个消费者实例重启时,重新启动后会重新消费所有未确认的消息。

再结合L项目的消费模式,真相大白了,L项目当时由于有人在重启项目,消息还未ack,机器重启导致的消息重复消费。知道原因的阿强呼出一大口浊气,脸上的眉目也放松下来。接下来,就只需怎么处理这种特殊场景。
在这里插入图片描述
怎么处理这种问题,阿强心里已经有了方案,第一种方案是让F服务做幂等处理;第二种方案是在L服务的mq消息消费逻辑里面做一个幂等处理。最终阿强结合本次任务选择了第二种方案,他用idea
打开了L系统的代码,并进行了一个改造:

//下面是L服务消息消费的伪代码
public MsgStatus onMessage(ConsumeMessage msg){
		log.info("【xxx异步处理】接收MQ消息:{}", message);
		if (StringUtils.isBlank(message)) {
            return MsgStatus.SUCCEED;
        }
        Entity entity = dataConversion();//数据转换
        RedisLock lock = RedisClientManagement.createLock(entity.getUserId,entity.getApplyNo());
        try {
        	if (lock.blockAcquireLock(RedisTimeOut.FIVE_MINUTE,RedisTimeOut.SECOUND)) {
		        LOrderEntity  LOrderEntity = LOrderRepository.selectByLOrderId(entity.getLOrderId());
		        //幂等校验,但是在某些场景下没有用,如本案例中
				if(StringUtils.isNotEmpty(LOrderEntity.getTaskId()))){
					log.warn("【xxx异步处理】mq重复发送消息");
					return MsgStatus.SUCCEED;
				}
				//构建请求入参
				FOrderReq req = buildReq(LOrderEntity,msg);
				RpcResponse res = FRpc.createOrder(req);
				//构建更新字段实体
				LOrderEntity  LOrderEntity1  = buildLOrderField(res.getTaskId(),res.getStates());
				LOrderRepository.updateOrder(LOrderEntity1);
        	}
        }else{
          log.info("Existing lock key = {}",key);
        }
        }catch (Exception e) {
           log.error("消费异常,不重新消费",e);
           return MsgStatus.SUCCEED;
        } finally {
            lock.releaseLock();
        }

        return MsgStatus.SUCCEED;
    }

当阿强把改完的代码提交完,恋恋不舍地看了看美好世界,一会后又开始进行回到洞府内修炼起来…

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/803364.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

泛微Ecology8明细表对主表赋值

文章目录 [toc]1.需求及效果1.1 需求1.2 效果2.思路与实现3.结语 1.需求及效果 1.1 需求 在明细表中的项目经理,可以将值赋值给主表中的项目经理来作为审批人员 1.2 效果 在申请人保存或者提交后将明细表中的人名赋值给主表中对应的值2.思路与实现 在通过js测…

【大型实战】企业网络实验(华为核心交换、ESXI7.0vmware虚拟机、DHCP中继、服务端网络及用户端网络配置)

需求 实验 vmware网络配置(企业内部一般为ESXI) 这样服务器虚拟机使用192.168.200.X网段才能与用户侧互通 vmware虚拟机配置(DHCP服务器网络配置) 打开网络管理页面 nmtui重置一下网络连接(重启网卡) …

JAVA @interface自定义注解(自定义注解+环绕通知 记录操作日志)

简介 注解interface是一种在Java代码中添加元数据(metadata)的方式,它可以用于提供程序的额外信息,但本身并不会直接影响程序的执行。注解可以应用于类、方法、字段和其他程序元素,用于提供关于这些元素的额外信息。 …

计算机组成原理 运算器

运算方法和运算器(重点) B二进制(binary), D十进制(decimal), H十六进制(hexadecimal) 纯小数和纯整数表示范围 设机器字长n1位,规定最高位(第n1位)为符号位 纯小数最大范围中的可理解为小数部分全为0的“1”&#…

TCP连接三次握手的过程,为什么是三次,可以是两次或者更多吗?

(1) 三次握手的过程 第一次握手:客户端向服务器发送一个包含SYN (同步序列编号)和初始序列号(ISN)的报文,请求建立连接,客户端进入SYN_SENT (同步已发送)状态。第二次握手:服务器收…

Python酷库之旅-第三方库Pandas(027)

目录 ​一、用法精讲 68、pandas.infer_freq函数 68-1、语法 68-2、参数 68-3、功能 68-4、返回值 68-5、说明 68-6、用法 68-6-1、数据准备 68-6-2、代码示例 68-6-3、结果输出 69、pandas.interval_range函数 69-1、语法 69-2、参数 69-3、功能 69-4、返回值…

Open3D Ransac拟合空间直线

目录 一、概述 1.1实现步骤 1.2优势与局限 二、代码实现 2.1关键代码 2.2完整代码 三、实现效果 前期试读,后续会将博客加入该专栏,欢迎订阅 Open3D点云算法与点云深度学习案例汇总(长期更新)-CSDN博客 一、概述 RANSAC&…

VScode终端和外部终端中文乱码问题

VScode终端和外部终端中文乱码问题 前言VScode终端VScode的第二大特点方法一方法二外部终端(命令为ctrlf5) 总结实现VScode终端和外部终端都能运行可执行文件 心得 前言 如果只想要看解决方案可直接跳转到总结部分,其余的章节只是用来说明原…

解决C#读取US7ASCII字符集oracle数据库的中文乱码

👨 作者简介:大家好,我是Taro,全栈领域创作者 ✒️ 个人主页:唐璜Taro 🚀 支持我:点赞👍📝 评论 ⭐️收藏 文章目录 前言一、解决方法二、安装System.Data.OleDb连接库三…

第7章 模块(2)

目录 7.3 插入和删除模块 7.3.1 模块的表示 7.3.2 依赖关系和引用 7.3.3 模块的二进制结构 7.3.4 插入模块 7.3.5 移除模块 本专栏文章将有70篇左右,欢迎关注,查看后续文章。 7.3 插入和删除模块 两个系统调用: init_module&#xff1…

考研数学二战,怎么准备才能提升大?

一战70多...二战提升空间那是相当的大 我身边很多一战甚至不到60,二战成绩飙到120的,真的很猛 所以你根本不用担心是自己学数学没天赋,其实知识方法没用对而已 本人属于基础很差相当于是零基础的23考研党,经过一年备考成功上岸…

k8s集群 安装配置 Prometheus+grafana+alertmanager

k8s集群 安装配置 Prometheusgrafanaalertmanager k8s环境如下:机器规划: node-exporter组件安装和配置安装node-exporter通过node-exporter采集数据显示192.168.40.180主机cpu的使用情况显示192.168.40.180主机负载使用情况 Prometheus server安装和配置…

JayChou周杰伦的歌曲网易云音乐怎么听

听Jay自由 网易云导入 专辑介绍 周杰伦(Jay Chou)是一位著名的台湾流行歌手、词曲创作人和演员。他以其独特的音乐风格和才华横溢的创作能力而闻名于世。以下是对周杰伦所有专辑的简要介绍: 《Jay》(2000年)&#xf…

独立开发者系列(26)——域名与解析

域名(英语:Domain Name),又称网域,是由一串用点分隔的名字组成的互联网上某一台计算机或计算机组的名称,用于在数据传输时对计算机的定位标识(有时也指地理位置)。 由于IP地址不方便…

Leaflet集成wheelnav在WebGIS中的应用

目录 前言 一、两种错误的实现方式 1、组件不展示 2、意外中的空白 二、不同样式的集成 1、在leaflet中集成wheelnav 2、给marker绑定默认组件 2、面对象绑定组件 3、如何自定义样式 三、总结 前言 在之前的博客中,我们曾经介绍了使用wheelnav.js构建酷炫…

Flink底层原理解析:案例解析(第37天)

系列文章目录 一、flink架构 二、Flink底层原理解析 三、Flink应用场景解析 四、fink入门案例解析 文章目录 系列文章目录前言一、flink架构1. 作业管理器(JobManager)2. 资源管理器(ResourceManager)3. 任务管理器(Ta…

【八股系列】CSS盒模型:掌握网页布局的核心

🎉 博客主页:【剑九 六千里-CSDN博客】 🎨 上一篇文章:【Vue中的<keep-alive>组件:深入解析与实践指南】 🎠 系列专栏:【面试题-八股系列】 💖 感谢大家点赞&…

夏日狂欢水上漂流的爆笑奇遇记

【夏日狂欢,水上漂流的爆笑奇遇记 —— 月亮姐姐的“睫毛漂流记”】在这个炎炎夏日,当烈日炙烤着大地,每一寸空气弥漫着对清凉的渴望时,一场别开生面的“暑期嘉年华”正悄然掀起一场水上狂欢的浪潮。而在这场盛宴中,月…

FPGA实训报告DAY 1(Verilog HDL)

实习日志与总结 日期:2024 年 7 月 10 日 星期三 姓名:XXX 一、实习日志 上午 9:00 - 9:30 按时到达工位,参加部门早会,了解了今天的实习任务和目标,即初步学习 FPGA 简介和 Verilog 基础语法知识。 9:30 - 10:30…

springboot 集成minio,启动报错

springboot 集成 minio 8.5.10 报错 *************************** APPLICATION FAILED TO START *************************** Description: An attempt was made to call a method that does not exist. The attempt was made from the following location: io.minio.S3Base.…