死信队列理解与使用

一、简介

在rabbitMQ中常用的交换机有三种,直连交换机、广播交换机、主题交换机;

直连交换机中队列与交换机需要约定好routingKey去进行绑定;

广播交换机并不需要routingKey绑定,只需队列与交换机绑定即可;

主题交换机最大的特点可以通过*和#去匹配队列;

而死信队列其实就是平常的队列的一种,通常我会使用直连交换机来作为死信队列;所以说,死信队列其实就是我们在处理业务中慢慢衍生出来的一个名词、一种方案;它和普通的队列是一样的。

二、使用场景

        我们知道在使用队列时有几种应答模式,比如自动应答(auto)、手动应答(manual)等,而在使用自动应答时,无论消息是否成功消费,达到重试次数后就会自动的把此消息给删除掉了,当然我们是不想把没有消费成功的消息给删除掉的。而开启手动应答时,配置的重试机制会失效 当有消费失败的消息时 会进入死循环。

        那么为了解决此场景,就引入了死信队列。当有不能正常消费的消息时 就把此消息给打到死信队列中,然后再根据实际情况去处理此信息。

关于自动应答和手动应答可参考这篇博客: 

rabbitMQ手动应答与自动应答_骑着蜗牛打天下的博客-CSDN博客

 

在 RabbitMQ 中充当主角的就是消息,在不同场景下,消息会有不同地表现。

死信就是消息在特定场景下的一种表现形式,这些场景包括:

1. 消息被拒绝访问,即 RabbitMQ返回 basicNack 的信号时。 或者拒绝basicReject

2. 消费者发生异常,超过重试次数 。

3. 消息的 TTL 过期时。

4. 消息队列达到最大长度。

三、代码实现

父pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.1</version>
<!--        <version>2.2.5.RELEASE</version>-->
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.chensir</groupId>
    <artifactId>spring-boot-rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-rabbitmq</name>

    <properties>
        <java.version>8</java.version>
        <hutool.version>5.8.3</hutool.version>
        <lombok.version>1.18.24</lombok.version>
    </properties>

    <description>spring-boot-rabbitmq</description>

    <packaging>pom</packaging>

    <modules>
        <module>direct-exchange</module>
        <module>fanout-exchange</module>
        <module>topic-exchange</module>
        <module>game-exchange</module>
        <module>dead-letter-queue</module>
        <module>delay-queue</module>
        <module>delay-queue2</module>
    </modules>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>cn.hutool</groupId>
                <artifactId>hutool-all</artifactId>
                <version>${hutool.version}</version>
            </dependency>

            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>${lombok.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>


</project>

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.chensir</groupId>
        <artifactId>spring-boot-rabbitmq</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>

    <artifactId>dead-letter-queue</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

配置文件

server.port=8084
#host
spring.rabbitmq.host=121.40.100.66
#默认5672
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
#连接到代理时用的虚拟主机
spring.rabbitmq.virtual-host=/
#每个消费者每次可最大处理的nack消息数量
spring.rabbitmq.listener.simple.prefetch=1
#表示消息确认方式,其有三种配置方式,分别是none、manual(手动)和auto(自动);默认auto
spring.rabbitmq.listener.simple.acknowledge-mode=auto
#监听重试是否可用
spring.rabbitmq.listener.simple.retry.enabled=true
#最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=5
#最大重试时间间隔
spring.rabbitmq.listener.simple.retry.max-interval=3000ms
#第一次和第二次尝试传递消息的时间间隔
spring.rabbitmq.listener.simple.retry.initial-interval=1000ms
#应用于上一重试间隔的乘数
spring.rabbitmq.listener.simple.retry.multiplier=2
#决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
spring.rabbitmq.listener.simple.default-requeue-rejected=false

config

正常队列config

package com.chensir.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public MessageConverter messageConverter(){
        return  new Jackson2JsonMessageConverter();
    }

    @Bean
    public DirectExchange directExchange(){
        return  new DirectExchange("DirectExchange",true,false);
    }

    @Bean
    public Queue directQueueLong(){
        return   QueueBuilder.durable("DirectQueue")
                .deadLetterExchange("DeadLetterExchange")
                .deadLetterRoutingKey("dead")
                //20s还没消费就打到死信队列中
                .ttl(20000)
                //当队列中长度有500个消息,也打入死信队列
                .maxLength(500)
                .build();
    }

    @Bean
    public Binding binding(){
        return  BindingBuilder.bind(directQueueLong()).to(directExchange()).with("direct123");
    }
}

死信队列config

package com.chensir.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 死信队列 一般由运维在rebbitMQ服务创建交换机和队列 不需要代码配置
 */
//@Configuration
public class DeadLetterConfig {
    @Bean
    public MessageConverter messageConverter(){

        return  new Jackson2JsonMessageConverter();
    }

    @Bean
    public DirectExchange directExchange() {
        DirectExchange directExchange = new DirectExchange("DeadLetterExchange");
        return directExchange;
    }

    @Bean
    public Queue queue() {
        Queue deadLetterQueue = QueueBuilder.durable("DeadLetterQueue").build();
        return deadLetterQueue;
    }

    @Bean
    public Binding binding() {
        Binding binding = BindingBuilder.bind(queue()).to(directExchange()).with("dead");
        return binding;
    }


}

生产者

package com.chensir.provider;

import cn.hutool.json.JSONUtil;
import com.chensir.model.OrderIngOk;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class DirectProvider {

    @Resource
    private RabbitTemplate rabbitTemplate;


    public  void  send(){

        // 死信队列
//        rabbitTemplate.convertAndSend("DeadLetterExchange", "dead","123");

        for (int i=1;i<7;i++){
            OrderIngOk orderIngOk = new OrderIngOk();
            orderIngOk.setOrderNo("202308289687-"+i);
            orderIngOk.setId(i);
            orderIngOk.setUserName("倪海杉");
//            String s = JSONUtil.toJsonStr(orderIngOk);
            rabbitTemplate.convertAndSend("DirectExchange", "direct123",orderIngOk);
        }


    }

}

消费者

正常队列消费者

package com.chensir.consumer;

import cn.hutool.json.JSONUtil;
import com.chensir.model.OrderIngOk;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class DirectConsumer {

    @RabbitHandler
    @RabbitListener(queues = "DirectQueue" )
    public void  process(OrderIngOk orderIngOk) throws IOException {

        try {
           // 处理业务开始
            if(orderIngOk.getId().equals(5)){

                int a =  0;
                int b= 2/a;
            }
            System.out.println("接受到消息,并正常处理结束"+ JSONUtil.toJsonStr(orderIngOk));
        } catch (Exception ex){
            System.out.println(ex.getMessage());
            System.out.println("接受到消息,发生异常"+ JSONUtil.toJsonStr(orderIngOk));
            //自动应答 当消费者成功消费消息时会自动把消息删除,而没有成功消费消息时需要给重试机制抛出个异常 重试机制才会开启重试
            throw ex;
            //手动模式
            //channel.basicReject(deliveryTag,true);
            //channel.basicNack(deliveryTag,false,true);
        }
    }
}

死信队列消费者

package com.chensir.consumer;

import com.chensir.model.OrderIngOk;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DeadConsumer {
    @RabbitHandler
    @RabbitListener(queues = "DeadLetterQueue")
    public void  process(OrderIngOk orderIngOk)  {

        System.out.println("这条信息在运行时发生了未知的异常,此信息被打到了死信队列,被死信队列消费者消费成功"+orderIngOk);
    }
}

结果

接受到消息,并正常处理结束{"id":1,"OrderNo":"202308289687-1","userName":"倪海杉"}
接受到消息,并正常处理结束{"id":2,"OrderNo":"202308289687-2","userName":"倪海杉"}
接受到消息,并正常处理结束{"id":3,"OrderNo":"202308289687-3","userName":"倪海杉"}
接受到消息,并正常处理结束{"id":4,"OrderNo":"202308289687-4","userName":"倪海杉"}
/ by zero
接受到消息,发生异常{"id":5,"OrderNo":"202308289687-5","userName":"倪海杉"}
/ by zero
接受到消息,发生异常{"id":5,"OrderNo":"202308289687-5","userName":"倪海杉"}
/ by zero
接受到消息,发生异常{"id":5,"OrderNo":"202308289687-5","userName":"倪海杉"}
/ by zero
接受到消息,发生异常{"id":5,"OrderNo":"202308289687-5","userName":"倪海杉"}
/ by zero
接受到消息,发生异常{"id":5,"OrderNo":"202308289687-5","userName":"倪海杉"}
2023-08-28 16:45:39.848  WARN 24432 --- [ntContainer#1-1] o.s.a.r.r.RejectAndDontRequeueRecoverer  : Retries exhausted for message (Body:'[B@1a6e663a(byte[58])' MessageProperties [headers={__TypeId__=com.chensir.model.OrderIngOk}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=DirectExchange, receivedRoutingKey=direct123, deliveryTag=5, consumerTag=amq.ctag-f9Up1UES-F3rDvb-AK16xw, consumerQueue=DirectQueue])


这条信息在运行时发生了未知的异常,此信息被打到了死信队列,被死信队列消费者消费成功OrderIngOk(id=5, OrderNo=202308289687-5, userName=倪海杉)
接受到消息,并正常处理结束{"id":6,"OrderNo":"202308289687-6","userName":"倪海杉"}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/94356.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

monorepo更新组件报错,提示“无法加载文件 C:\Program Files\nodejs\pnpm.ps1,因为在此系统上禁止运行脚本”

解决方法&#xff1a; 第一步&#xff1a;管理员身份运行 window.powershell&#xff0c; win x打开powerShell命令框&#xff0c;进入到对应项目路径。 第二步&#xff1a;执行&#xff1a;get-ExecutionPolicy&#xff0c;显示Restricted&#xff0c;表示状态是禁止的; 第…

TCP协议的重点知识点

TCP协议的重点知识点 TCP(传输控制协议)是一种面向连接、可靠的数据传输协议,工作在传输层,提供可靠的字节流服务。它是互联网协议栈中最重要、最复杂的协议之一,也是面试中常被问到的知识点。本文将详细介绍TCP协议的各个重要概念。 TCP基本特性 TCP主要具有以下基本特性: …

如何用 QGIS 下载高清天地图影像机,同时解决下载质量差的问题!

使用 QGIS 我们可以获得下面这种图像,既有大范围,又有更高的细节(地图级别),基本上把整个苏州市中心城区的建筑物都囊括进去了。 还可以下载大范围、高清晰度的各种在线卫星底图服务的影像,比如大面积的哨兵2影像,但国外的服务器一般都很烂,不可能是电信、移动的问题,…

python爬虫12:实战4

python爬虫12&#xff1a;实战4 前言 ​ python实现网络爬虫非常简单&#xff0c;只需要掌握一定的基础知识和一定的库使用技巧即可。本系列目标旨在梳理相关知识点&#xff0c;方便以后复习。 申明 ​ 本系列所涉及的代码仅用于个人研究与讨论&#xff0c;并不会对网站产生不好…

QGIS 如何添加天地图

相信很多小伙伴在 QGIS 里面添加天地图的时候一定感觉很困惑,按照官网的操作申请 Key 之后,添加相对应的服务地址之后看不到地图或者地图不正常显示,今天我们就来解决这个问题 以下所有操作基于 QGIS 3.22 版本 申请 Key 1. 添加天地图的第一步需要申请 Key,首先要注册天…

Git基础教程-常用命令整理:学会Git使用方法和错误解决

目录 一、了解Git的基本概念 二、Git的安装和配置 Git的安装 Git的配置 用户信息 文本编辑器 差异分析工具 查看配置信息 三、Git的基本操作 基本原理 基本操作命令 基本操作示例 场景一&#xff1a;创建新仓库 场景二&#xff1a;拉取并编辑远程仓库 四、常见问…

MySQL之事务与引擎

目录 一、事物 1、事务的概念 2、事务的ACID特点 3、事务之间的相互影响 4、Mysql及事务隔离级别(四种) 5、演示 1、查询会话事务隔离级别 2、查询会话事务隔离级别 3、设置全局事务隔离级别 4、设置会话事务隔离级别 6、事务控制语句 7、演示 1、测试提交事务 2、测试事…

countDown+react+hook

道阻且长&#xff0c;行而不辍&#xff0c;未来可期 知识点一&#xff1a; new Date().getTime()可以得到得到1970年01月1日0点零分以来的毫秒数。单位是毫秒 new Date().getTime()/1000获取秒数1分钟60秒&#xff0c;1小时60分钟1hour:60*60>单位是秒 60*60*1000>单位…

Java的成员类可以被private修饰

说明 Java的成员类可以被private修饰&#xff0c;但外部类、局部类不能被private修饰。 示例 成员类用private修饰—允许 下面代码中的成员类Class2 被private修饰&#xff0c;是允许的&#xff1a; package com.thb;public class Parent {public class Class1 { }private…

ChatGPT Prompting开发实战(一)

一、关于ChatGPT Prompting概述 当我们使用ChatGPT或者调用OpenAI的API时&#xff0c;就是在使用prompt进行交互&#xff0c;用户在对话过程中输入的一切信息都是prompt&#xff08;提示词&#xff09;&#xff0c;当然工业级的prompt与人们通常理解的prompt可能不太一样。下面…

基于java swing和mysql实现的仓库商品管理系统(源码+数据库+运行指导视频)

一、项目简介 本项目是一套基于java swing和mysql实现的仓库商品管理系统&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的Java学习者。 包含&#xff1a;项目源码、项目文档、数据库脚本等&#xff0c;该项目附带全部源码可作为毕设使用。 项目都经…

新SDK平台下载开源全志V853的SDK

获取SDK SDK 使用 Repo 工具管理&#xff0c;拉取 SDK 需要配置安装 Repo 工具。 Repo is a tool built on top of Git. Repo helps manage many Git repositories, does the uploads to revision control systems, and automates parts of the development workflow. Repo is…

若依vue打印的简单方法

像我们后端程序员做前端的话,有时候真不需要知道什么原理,直接塞就好了 我们选用基于hiprint 的vue-plugin-hiprint来打印 目的是为了实现点击某些行的数据,然后点击某个按钮直接弹出下面的打印 此链接 大佬是原创,我拿来总结梳理一下 插件进阶功能请移步: 链接 插件模板制作页…

Leetcode每日一题:1267. 统计参与通信的服务器(2023.8.24 C++)

目录 1267. 统计参与通信的服务器 题目描述&#xff1a; 实现代码与解析&#xff1a; 写法一&#xff1a;两次遍历 hash 原理思路&#xff1a; 写法二&#xff1a;三次遍历 原理思路&#xff1a; 1267. 统计参与通信的服务器 题目描述&#xff1a; 这里有一幅服务器分…

三维模型OBJ格式轻量化压缩并行计算处理方法浅析

三维模型OBJ格式轻量化压缩并行计算处理方法浅析 三维模型的轻量化是指通过一系列技术和算法来减小三维模型的文件大小&#xff0c;以提高模型在计算机中的加载、渲染和传输效率。并行计算是利用多个计算单元同时执行任务&#xff0c;以加速计算过程的一种技术。在三维模型的O…

基于Spring Boot的软件缺陷追踪系统的设计与实现(Java+spring boot+MySQL)

获取源码或者论文请私信博主 演示视频&#xff1a; 基于Spring Boot的软件缺陷追踪系统的设计与实现&#xff08;Javaspring bootMySQL&#xff09; 使用技术&#xff1a; 前端&#xff1a;html css javascript jQuery ajax thymeleaf 微信小程序 后端&#xff1a;Java spri…

【数据分析】统计量

1. 均值、众数描述数据的集中趋势度量&#xff0c;四分位差、极差描述数据的离散程度。 2. 标准差、四分位差、异众比率度量离散程度&#xff0c;协方差是度量相关性。 期望值分别为E[X]与E[Y]的两个实随机变量X与Y之间的协方差Cov(X,Y)定义为&#xff1a; 从直观上来看&…

ppt转pdf免费的工具哪个好用?ppt在线转pdf的方法分享

在工作和学习中&#xff0c;将PPT文件转换为PDF格式具有重要意义。PDF文件的大小较小&#xff0c;适用于各种平台和设备&#xff0c;保持了原始文件的内容和格式&#xff0c;具有广泛的可读性和兼容性。那么小编就来为大家详细地说一说“ppt转pdf免费的工具哪个好用?ppt在线转…

US-DAS1、US-P2A单路及双路插头式比例放大器

US-P1、US-P2A、US-P2F插头式安装比例放大器控制不带电反馈的双路比例电磁铁的比例阀&#xff0c;如直动式或先导式比例方向阀的驱动控制。 工作电源24VDC标准&#xff1b; 兼容指令10V、4-20mA、0~10V、0~5V(电位器控制&#xff09;&#xff1b; 输出电流0~2A&#xff1b; …

JavaScript—面向对象、作用域

C#&#xff1a;从类继承 js&#xff1a;从对象继承 什么叫继承&#xff1f; 模板&#xff08;类&#xff09; 原型继承&#xff08;实体&#xff09; 有一个对象存在&#xff0c;构造函数设置原型为这个对象 创建出来的对象就继承与这个对象&#xff08;从对象那里继承&am…