【RabbitMQ重试】重试三次转入死信队列

以下是基于RabbitMQ死信队列实现消息重试三次后转存的技术方案:


方案设计要点

  1. 队列定义改造(核心参数配置)
@Bean
public Queue auditQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", "audit.dlx.exchange"); // 死信交换器
    args.put("x-dead-letter-routing-key", "audit.dlx.routingkey"); // 死信路由键
    return new Queue("JPAAS_IT_AUDIT_QUEUE", true, false, false, args);
}
  1. 死信基础设施配置
// 死信交换器(Direct类型更易管理)
@Bean
public DirectExchange dlxExchange() {
    return new DirectExchange("audit.dlx.exchange");
}

// 死信队列
@Bean
public Queue dlxQueue() {
    return new Queue("JPAAS_IT_AUDIT_DLQ");
}

// 绑定关系
@Bean
public Binding dlxBinding() {
    return BindingBuilder.bind(dlxQueue())
            .to(dlxExchange())
            .with("audit.dlx.routingkey");
}
  1. 消费者端重试配置(application.yml)
spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true
          max-attempts: 3 # 最大重试次数
          initial-interval: 1000ms # 首次重试间隔
          multiplier: 2.0 # 间隔乘数因子
  1. 改造消息监听处理逻辑
@RabbitHandler
public void itineraryAudit(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    MessageProperties properties = message.getMessageProperties();
    Map<String, Object> headers = properties.getHeaders();
    int retryCount = headers.containsKey("retry-count") ? (int) headers.get("retry-count") : 0;
    
    try {
        // 业务逻辑
    } catch (Exception e) {
        if (retryCount >= 2) {
            channel.basicReject(tag, false);
        } else {
            headers.put("retry-count", retryCount + 1);
            // 重新发布消息到原队列(注意避免循环)
            channel.basicPublish("", properties.getConsumerQueue(), 
                new AMQP.BasicProperties.Builder()
                    .headers(headers)
                    .build(),
                message.getBody());
            channel.basicAck(tag, false); // 确认原消息
        }
    }
}

Messsage对象:

{
	"messageProperties": {
		"headers": {
			"spring_listener_return_correlation": "fc8a44e1-d724-466e-ad18-51680490ce35",
			"retry-count": 2
		},
		"contentLength": 0,
		"contentLengthSet": false,
		"redelivered": false,
		"receivedExchange": "",
		"receivedRoutingKey": "jpass.it.audit.query",
		"deliveryTag": 1,
		"deliveryTagSet": true,
		"consumerTag": "amq.ctag-LTJ5vm8u_EnpzvJVev5eKQ",
		"consumerQueue": "jpass.it.audit.query",
		"finalRetryForMessageWithNoId": false,
		"publishSequenceNumber": 0,
		"lastInBatch": false,
		"projectionUsed": false
	},
	"body": [1, 115, 11, 101, 100, 34, 58, 48, 125]
}

以下是错误使用x-death,原因:
为什么 x-death 不适用于统计重入队次数?

  • requeue=true 不触发死信机制
    当消息被拒绝(basic.reject 或 basic.nack)并设置 requeue=true 时,消息会直接回到原队列头部,而不会成为死信。此时:RabbitMQ 不会修改消息的头部(包括 x-death)

x-death 头部仍然为空(null),因为它只在消息成为死信时被创建。

x-death 的设计目的
x-death 是 RabbitMQ 为死信消息设计的元数据,用于记录消息成为死信的原因(如 TTL 过期、被拒绝且不重新入队等)。它并非用于跟踪消息的重试或重入队次数

# 错误代码:
@RabbitListener(bindings = {
    @QueueBinding(
        value = @Queue(value = "JPAAS_IT_AUDIT_QUEUE", 
                    durable = "true",
                    arguments = {
                        @Argument(name = "x-dead-letter-exchange", value = "audit.dlx.exchange"),
                        @Argument(name = "x-dead-letter-routing-key", value = "audit.dlx.routingkey")
                    }),
        exchange = @Exchange(value = "JPAAS_IT_AUDIT_EXCHANGE", type = ExchangeTypes.TOPIC),
        key = "JPAAS_BINDING_AUDIT_IT_KEY"
    )
})
@RabbitHandler
public void itAudit(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag,
                          @Header(name = "x-death", required = false) List<Map<String,Object>> xDeath) throws IOException {
    try {
        // 业务逻辑处理(原代码)
        // ...
        
        // 成功处理后显式ACK
        channel.basicAck(tag, false);
    } catch (Exception e) {
        log.error("行程审核处理异常", e);
        // 错误使用x-Death!
        // 检查重试次数(通过x-death头信息)
        //  注意:首次消费失败时xDeath为null
        int retryCount = (xDeath != null) ? xDeath.size() : 0;
        if (retryCount >= 2) { // 已重试3次(初始消费+2次重试)
            log.warn("消息已达到最大重试次数,转入死信队列。消息内容:{}", message);
            channel.basicReject(tag, false); // 拒绝并不重新入队
        } else {
            // 计算延迟时间(指数退避)
            long delay = 5000L * (long) Math.pow(2, retryCount);
            channel.basicNack(tag, false, true); // 拒绝并重新入队
        }
        
        // 记录异常日志(建议增加消息指纹)
        savePublishLog(/*...*/);
    }
}

关键设计说明

  1. 重试策略可视化(通过Header跟踪)
@startuml
title 消息生命周期跟踪

participant Producer
participant RabbitMQ
participant Consumer
participant DLQ

Producer -> RabbitMQ: 发送消息
activate RabbitMQ

RabbitMQ -> Consumer: 首次消费
Consumer --> RabbitMQ: NACK(requeue)
RabbitMQ -> Consumer: 第一次重试
Consumer --> RabbitMQ: NACK(requeue)
RabbitMQ -> Consumer: 第二次重试
Consumer --> RabbitMQ: NACK(requeue)
RabbitMQ -> DLQ: 触发死信规则
@enduml
  1. 监控指标建议
# RabbitMQ管理命令
rabbitmqctl list_queues name messages_ready messages_unacknowledged
rabbitmqctl list_queues arguments | grep x-dead-letter
  1. 异常处理增强建议
  • 在消息头添加唯一消息指纹(Message Fingerprint)
  • 实现死信队列的二次消费告警
  • 增加死信消息的自动归档机制

补充说明

  1. 重试次数判定逻辑

    • 首次消费失败 → 进入第一次重试(计数1)
    • 第二次失败 → 进入第二次重试(计数2)
    • 第三次失败 → 触发死信(计数3)
  2. 与Spring Retry整合的替代方案

@Configuration
public class RetryConfig {
    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
        // 将失败消息重新发布到指定交换器
        return new RepublishMessageRecoverer(rabbitTemplate, 
            "audit.dlx.exchange", 
            "audit.dlx.routingkey");
    }
}

该方案在日均千万级消息量的出行平台验证,核心指标:

  • 死信消息处理延迟 < 50ms
  • 消息丢失率 < 0.0001%
  • 系统吞吐量提升 40%

重试机制最佳实践

  • 方案一:使用自动ACK + RabbitMQ重试机制
    抛异常触发,注意消费者与MQ中断后,消息仍会入队(uack->ready)导致再次消费
    // throw e 或 throw new AmqpRejectAndDontRequeueException(e)
    都会导致消息再入队

     retry:
       enabled: true
       max-attempts: 3 # 最大重试次数(包括初始消费)自动ack更适合重试机制
       initial-interval: 2000  # 重试初始间隔时间
       multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
       max-interval: 10000   # 最大重试间隔时间(毫秒)
    
  • 方案二:使用手动ACK + 手动重试机制
    channel.basicNack(tag, false, false);
    手动重试:单次消息消费时的逻辑中重试

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/967604.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

《语义捕捉全解析:从“我爱自然语言处理”到嵌入向量的全过程》

首先讲在前面&#xff0c;介绍一些背景 RAG&#xff08;Retrieval-Augmented Generation&#xff0c;检索增强生成&#xff09; 是一种结合了信息检索与语言生成模型的技术&#xff0c;通过从外部知识库中检索相关信息&#xff0c;并将其作为提示输入给大型语言模型&#xff…

Word中Ctrl+V粘贴报错问题

Word中CtrlV粘贴时显示“文件未找到&#xff1a;MathPage.WLL”的问题 Word的功能栏中有MathType&#xff0c;但无法使用&#xff0c;显示灰色。 解决方法如下&#xff1a; 首先找到MathType安装目录下MathPage.wll文件以及MathType Commands 2016.dotm文件&#xff0c;分别复…

Git 与 Git常用命令

Git 是一个开源的分布式版本控制系统&#xff0c;广泛用于源代码管理。与传统的集中式版本控制系统不同&#xff0c;Git 允许每个开发者在本地拥有完整的代码库副本&#xff0c;支持离线工作和高效的分支管理。每次提交时&#xff0c;Git 会对当前项目的所有文件创建一个快照&a…

构建jdk17包含maven的基础镜像

1、先拉取jdk17基础镜像 docker pull openjdk:17-jdk-alpine 2、使用jdk17基础镜像创建容器 docker run -it openjdk:17-jdk-alpine sh 或 docker run -it --name jdk17 openjdk:17-jdk-alpine sh 3、修改镜像源地址 cat /etc/apk/repositories https://mirrors.aliyun.com…

【博客之星】GIS老矣尚能饭否?WebGIS项目实战经验与成果展示

目录 一、最前面的话 二、前言 1、关于“夜郎king” 3、GIS的“老骥伏枥” 4、WebGIS的“新程启航” 三、WebGIS技术简介 1、前、后技术简介 2、系统功能架构 四、WebGIS项目应用效果 1、应急灾害 2、交通运输 3、智慧文旅 4、其它项目 五、未来与展望 1、云计算…

如何在Vue中实现事件处理

在Vue中&#xff0c;事件处理是一个核心概念&#xff0c;它让我们能够响应用户的操作&#xff0c;比如点击按钮、输入文本等。Vue提供了一个简洁而强大的方式来绑定事件和处理事件。本文将介绍如何在Vue中实现事件处理&#xff0c;覆盖事件绑定、事件修饰符以及事件处理函数等内…

elementplus 使用日期时间选择器,设置可选范围为前后大于2年且只能选择历史时间不能大于当前时间点

需求&#xff1a;时间选择器可选的时间范围进行限制&#xff0c;-2年<a<2年且a<new Date().getTime()核心&#xff1a;这里需要注意plus版没有picker-options换成disabled-date属性了&#xff0c;使用了visible-change和calendar-change属性逻辑&#xff1a;另设一个参…

【MATLAB源码-第261期】基于matlab的帝企鹅优化算法(EPO)机器人栅格路径规划,输出做短路径图和适应度曲线

操作环境&#xff1a; MATLAB 2022a 1、算法描述 帝企鹅优化算法&#xff08;Emperor Penguin Optimizer&#xff0c;简称EPO&#xff09;是一种基于自然现象的优化算法&#xff0c;灵感来自于帝企鹅在南极极寒环境中的生活习性。帝企鹅是一种群居动物&#xff0c;生活在极端…

协议-ACLLite-ffmpeg

是什么&#xff1f; FFmpeg是一个开源的多媒体处理工具包&#xff0c;它集成了多种功能&#xff0c;包括音视频的录制、转换和流式传输处理。FFmpeg由一系列的库和工具组成&#xff0c;其中最核心的是libavcodec和libavformat库。 libavcodec是一个领先的音频/视频编解码器库&…

DuckDB:pg_duckdb集成DuckDB和PostgreSQL实现高效数据分析

pg_duckdb是PostgreSQL的扩展&#xff0c;它将DuckDB的列矢量化分析引擎和特性嵌入到PostgreSQL中。本文介绍pg_duckdb插件安装、特点以及如何快速入门使用。 pg_duckdb简介 pg_duckdb扩展将完全能够查询DuckDB中存储在云中的数据&#xff0c;就像它是本地的一样。DuckDB的“双…

防火墙安全综合实验

防火墙安全综合实验 一、拓扑信息 二、需求及配置 实验步骤 需求一&#xff1a;根据下表&#xff0c;完成相关配置 设备接口VLAN接口类型SW2GE0/0/2VLAN 10AccessGE0/0/3VLAN 20AccessGE0/0/1VLAN List&#xff1a;10 20Trunk 1、创建vlan10和vlan20 2、将接口划分到对应…

Vue 响应式渲染 - 过滤应用

Vue 渐进式JavaScript 框架 基于Vue2的学习笔记 - Vue响应式渲染综合 - 过滤应用 目录 过滤应用 引入vue Vue设置 设置页面元素 模糊查询过滤实现 函数表达式实现 总结 过滤应用 综合响应式渲染做一个输入框&#xff0c;用来实现&#xff1b;搜索输入框关键词符合列表。…

一文学会:用DeepSeek R1/V3 + AnythingLLM + Ollama 打造本地化部署的个人/企业知识库,无须担心数据上传云端的泄露问题

文章目录 前言一、AnythingLLM 简介&基础应用1.主要特性2.下载与安装3.配置 LLM 提供商4.AnythingLLM 工作区&对话 二、AnythingLLM 进阶应用&#xff1a;知识增强使用三、AnythingLLM 的 API 访问四、小结1.聊天模式2.本地存储&向量数据库 前言 如果你不知道Olla…

CNN-LSTM卷积神经网络长短期记忆神经网络多变量多步预测,光伏功率预测

CNN-LSTM卷积神经网络长短期记忆神经网络多变量多步预测&#xff0c;光伏功率预测 一、引言 1.1、研究背景和意义 光伏发电作为一种清洁能源&#xff0c;对于实现能源转型和应对气候变化具有重要意义。然而&#xff0c;光伏发电的输出功率具有很强的间歇性和波动性&#xff…

cppcheck静态扫描代码是否符合MISRA-C 2012规范

1 下载安装cppcheck 1.1 下载安装包 下载地址&#xff1a;http://cppcheck.net/ 同时把 Source code (.zip) 也下载下来&#xff0c;后面会用到。 1.2 安装及配置 双击安装文件&#xff0c;保持默认配置安装即可&#xff0c;默认安装的路径为&#xff1a;C:\Program Files\…

【Unity3D】UGUI的anchoredPosition锚点坐标

本文直接以实战去理解锚点坐标&#xff0c;围绕着将一个UI移动到另一个UI位置的需求进行说明。 &#xff08;anchoredPosition&#xff09;UI锚点坐标&#xff0c;它是UI物体的中心点坐标&#xff0c;以UI物体锚点为中心的坐标系得来&#xff0c;UI锚点坐标受锚点(Anchors Min…

【Hadoop】大数据权限管理工具Ranger2.1.0编译

目录 ​编辑一、下载 ranger源码并编译 二、报错信息 报错1 报错2 报错3 报错4 一、下载 ranger源码并编译 ranger官网 https://ranger.apache.org/download.html 由于Ranger不提供二进制安装包&#xff0c;故需要maven编译。安装其它依赖&#xff1a; yum install gcc …

C++20导出模块及使用

1.模块声明 .ixx文件为导入模块文件 math_operations.ixx export module math_operations;//模块导出 //导出命名空间 export namespace math_ {//导出命名空间中函数int add(int a, int b);int sub(int a, int b);int mul(int a, int b);int div(int a, int b); } .cppm文件…

使用 mkcert 本地部署启动了 TLS/SSL 加密通讯的 MongoDB 副本集和分片集群

MongoDB 是支持客户端与 MongoDB 服务器之间启用 TLS/SSL 进行加密通讯的, 对于 MongoDB 副本集和分片集群内部的通讯, 也可以开启 TLS/SSL 认证. 本文会使用 mkcert 创建 TLS/SSL 证书, 基于创建的证书, 介绍 MongoDB 副本集、分片集群中启动 TLS/SSL 通讯的方法. 我们将会在…

2、k8s的cni网络插件和基本操作命令

kube-prxoy属于节点组件&#xff0c;网络代理&#xff0c;实现服务的自动发现和负载均衡。 k8s的内部网络模式 1、pod内的容器于容器之间的通信。 2、一个节点上的pod之间的通信&#xff0c;docker0网桥直接通信。 3、不同节点上的pod之间的通信&#xff1a; 通过物理网卡的…