第十二章 RabbitMQ之失败消息处理策略

目录

一、引言

二、RepublishMessageRecoverer 实现

2.1. 实现步骤

2.2. 实现代码

2.2.1. 异常交换机队列回收期配置类

2.2.2. 常规交换机队列配置类 

2.2.3. 消费者代码

2.2.4. 消费者yml配置 

2.2.5. 生产者代码 

2.2.6. 生产者yml配置 

 2.2.7. 运行效果


一、引言

Spring AMQP提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限地requeue到mq。我们可以通过在application.yaml文件中添加配置来开启重试机制:

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: Wangzhexiao
    password: Wangzhexiao
    virtual-host: /hangzhou
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: manual # none,关闭ack;manual,手动ack;auto:自动ack
        # 消费者重试机制配置
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初始的失败等待时长为1秒
          multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false




在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息(默认方式)

ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(推荐)

二、RepublishMessageRecoverer 实现

在实际项目的生产环境中,通过 RepublishMessageRecoverer 方式我们可以定义一个异常队列和交换机,来接收其他交换机队列转发的无法处理的异常消息。然后我们可以查看其中的异常消息并进行人工处理。

2.1. 实现步骤

1. 将失败处理策略改为RepublishMessageRecoverer

2. 定义接收失败消息的交换机、队列及其绑定关系

3. 定义RepublishMessageRecoverer

2.2. 实现代码

2.2.1. 异常交换机队列回收期配置类

package com.example.consumer;

import jakarta.annotation.Resource;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 异常交换机/队列/消息回收器配置类
 * ConditionalOnProperty 通过yml中的重试配置来选择该配置类是否启用
 */
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
public class ErrorConfig {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Bean
    Queue errorQueue() {
        return new Queue("error.queue");
    }

    @Bean
    DirectExchange errorExchange() {
        return new DirectExchange("error.direct");
    }

    @Bean
    Binding errorBind(Queue errorQueue, DirectExchange errorExchange) {
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
    }

    @Bean
    public MessageRecoverer messageRecoverer() {
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

2.2.2. 常规交换机队列配置类 

package com.example.consumer;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 常规的RabbitMQ 交换机/队列绑定配置类
 */
@Configuration
public class RabbitMQConfig {

    @Bean
    Queue simpleQueue() {
        // 使用 QueueBuilder 创建一个持久化队列
        return QueueBuilder.durable("simple.queue").build();
    }
}

2.2.3. 消费者代码

package com.example.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消费者
 */
@Slf4j
@Component
public class SimpleListener {

    @RabbitListener(queues = "simple.queue")
    public void listener1(String msg) throws Exception {
//        System.out.println("消费者1:人生是个不断攀登的过程【" + msg + "】");
        throw new Exception();
    }
}

2.2.4. 消费者yml配置 

# 消费者application.yml配置
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: Wangzhexiao
    password: Wangzhexiao
    virtual-host: /hangzhou
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto # none,关闭ack;manual,手动ack;auto:自动ack
        # 消费者重试机制配置
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初始的失败等待时长为1秒
          multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false




2.2.5. 生产者代码 

package com.example.publisher;

import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * 生产者
 */
@Slf4j
@SpringBootTest
class PublisherApplicationTests {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    void test() {
        rabbitTemplate.convertAndSend("simple.queue", "只要学不死,就往死里学!");
    }
}

2.2.6. 生产者yml配置 

# 生产者application.yml配置
spring:
  rabbitmq:
    # MQ连接配置
    host: 127.0.0.1
    port: 5672
    username: Wangzhexiao
    password: Wangzhexiao
    virtual-host: /hangzhou





 2.2.7. 运行效果

最终效果是,我们在消费者的代码逻辑中会抛出异常,消息在反复投递消费失败后被重新入列到我们定义的异常交换机队列中:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/893411.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

MiGPT让你的小爱音响更聪明

大家好,我是晓凡。 今天要给大家带来一个超级有趣的开源项目MiGPT。 这个项目,简直就是给小爱音箱装上了超级大脑,让你的小爱音箱更聪明。 想象一下,当小爱音箱接入大模型后,上知天文,下知地理&#xff…

Cuda By Example - 7 (光线追踪)

第6章以实现简单的光线追踪为例子,引入了Constant Memory和性能测量方法。 Constant Memory NVIDIA的硬件提供了64K的constant只读内存。定义constant内存的变量,使用关键字__constant__。从constant内存里读取出来的数据,可以缓存起来&…

星河飞雪计划_day1

安全见闻 编程语句应用介绍 程序介绍 操作系统介绍 操作系统 ios mac Iinux android Windows wince vxworks RT-ThreadWindows、mac0S、i0S和Linux通常被认为是非实时操作系统。 非实时操作系统: 主要致力于在各种情况下提供良好的整体性能、用户体验和多任务处理能力&…

10.13论文阅读

通过联合学习检测和描述关键点增强可变形局部特征 摘要 局部特征提取是计算机视觉中处理图像匹配和检索等关键任务的常用方法。大多数方法的核心理念是图像经历仿射变换,忽略了诸如非刚性形变等更复杂的效果。此外,针对非刚性对应的新兴工作仍然依赖于…

UE4 材质学习笔记06(布料着色器/体积冰着色器)

一.布料着色器 要编写一个着色器首先是看一些参考图片,我们需要找出一些布料特有的特征,下面是一个棉织物,可以看到布料边缘的纤维可以捕捉光线使得边缘看起来更亮 下面是缎子和丝绸的图片,与棉织物有几乎相反的效果,…

docker harbor

文章目录 一,搭建私有仓库1.1下载registry1.2在 daemon.json 中添加私有镜像仓库地址1.3重新加载重启docker1.4运行容器1.5拉取一个centos7镜像1.6给镜像加标签1.7上传镜像1.8显示私有仓库的所有镜像1.8查看私有仓库的 centos 镜像有哪些tag 二,什么是ho…

Matlab中HybridFcn参数的用法

在 MATLAB 中,HybridFcn 参数允许你在全局优化(如遗传算法 ga 或粒子群算法 particleswarm)之后使用局部优化算法进一步微调解的精确度。HybridFcn 通过在全局优化找到的解基础上,进一步调用局部优化器,如 fmincon、pa…

ARM嵌入式学习--第四天

汇编与C混合编程 -汇编指令中调用C语言 .global _start _start:mov r0,#5mov r1,#3bl add stop:b stop int add(int a,int b) {int c a b;return c; } 无优化情况:(反汇编之后,发现多了很多很多指令,运行之后结果是错误的&a…

掌握关键:全面数据分析键盘市场

键盘数据分析 一、市场分析 大盘销售额,销量: 共获取100个品牌每个品牌至多100页的数据,共计3***9个商品及其销量,销售额。 大盘数据 9月份键盘销售额:9***124.58元, 9月份键盘销量:1***0…

遇到“mfc100u.dll丢失”的系统错误要怎么处理?科学修复mfc100u.dll

遇到“mfc100u.dll丢失”的系统错误会非常麻烦,因为mfc100u.dll是Microsoft Visual C 2010 Redistributable Package的重要部分,许多应用程序和游戏在运行时都需要调用这个文件。如果这个文件缺失,可能会导致相关软件或游戏启动失败。面对这种…

操作系统实验三:基于BPF机制的系统跟踪与探测

实验内容 学习BPF机制,了解BCC(BPF Compiler Collection)和bpftarce的实现原理,利用BPF工具实现对系统的跟踪和探测,如跟踪新创建的进程,统计线程占用CPU的时间,统计某内核函数的调用次数&…

98. UE5 GAS RPG 实现技能眩晕效果

我们在技能伤害基类上面设置了对应的负面效果应用的配置项,用来实现技能的负面效果应用。 在之前实现火球术的负面效果时,我们我们在创建火球时,通过伤害基类上的创建技能配置用于后续应用。 在火球攻击到敌人时,通过函数库书写…

电脑技巧:优化Edge浏览器占用C盘空间的解决方案

大家在日常使用电脑的使用,Edge浏览器作为Windows电脑自带的浏览器,使用体验还是非常不错的。对于电脑新手来说直接使用微软自带的Edge浏览器也可以满足使用需求。但是随着电脑使用的越久,整体Edge浏览器也会占用几个G甚至更多的磁盘空间,并且还是C盘。 今天给大家分享如何…

使用Three.js和Force-Directed Graph实现3D知识图谱可视化

先看样式: 在当今信息爆炸的时代,如何有效地组织和展示复杂的知识结构成为一个重要的挑战。3D知识图谱可视化是一种直观、交互性强的方式来呈现知识之间的关系。本文将详细介绍如何使用HTML、JavaScript、Three.js和Force-Directed Graph库来实现一个交互…

【电商购物管理系统】Python+Django网页界面平台+商品管理+数据库

一、介绍 电商购物管理系统,本系统前端使用HTML、CSS、BootStrap等技术搭建前端界面,后端使用Django框架处理用户的逻辑请求。主要功能有: 管理员登录与管理:管理员可以登录后台,对用户和商品进行增删改查的操作。用…

stm32 bootloader写法

bootloader写法: 假设app的起始地址:0x08020000,则bootloader的范围是0x0800,0000~0x0801,FFFF。 #define APP_ADDR 0x08020000 // 应用程序首地址定义 typedef void (*APP_FUNC)(void); // 函数指针类型定义 /*main函数中调用rum_app&#x…

vue + 百度地图GL版判断一个点位是否在地图可视区内

利用BMapGLLib中isPointInRect 因为没有找到官方文档因此直接下载了该工具的源码,复制以下部分到自己的项目中,避免再次引用完整的BMapGLLib脚本 关键方法 isPointInRect(point, bounds) {if (!(point.toString() "Point" || point.toString(…

解锁机器人视觉与人工智能的潜力,从“盲人机器”改造成有视觉能力的机器人(下)

机器视觉产业链全景回顾 视觉引导机器人生态系统或产业链分为三个层次。 上游(供应商) 该机器人视觉系统的上游包括使其得以运行的硬件和软件提供商。硬件提供商提供工业相机、图像采集卡、图像处理器、光源设备(LED)、镜头、光…

英飞达医学影像存档与通信系统 WebUserLogin.asmx 信息泄露漏洞复现

0x01 产品简介 英飞达医学影像存档与通信系统 Picture Archiving and Communication System,它是应用在医院影像科室的系统,主要的任务就是把日常产生的各种医学影像(包括核磁,CT,超声,各种X光机,各种红外仪、显微仪等设备产生的图像)通过各种接口(模拟,DICOM,网络…

93、Python之异常:了解常见的内置异常,遇到不慌

引言 本文接着来聊Python中的异常管理,对于新手来说,一旦看到异常,就会比较慌张。其实,倒不是对异常比较害怕,而是担心不知道该怎么处理这种异常才是比较可怕的。本文就简单列举一下Python中比较常见的异常&#xff0…