RabbitMQ死信队列

RabbitMQ死信队列

1、过期时间TTL

过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被

删除。RabbitMQ可以对消息和队列设置TTL,目前有两种方法可以设置:

  • 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。

  • 第二种方法是对消息进行单独设置,每条消息TTL可以不同。

如果上述两种方法同时使用,则消息的过期时间以两者TTL较小的那个数值为准。消息在队列的生存时间一旦超

过设置的TTL值,就称为dead message被投递到死信队列,消费者将无法再收到该消息。

1.1 设置队列TTL

pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.4</version>
        <relativePath/>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-boot-rabbitmq-ttl</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-rabbitmq-ttl</name>
    <description>spring-boot-rabbitmq-ttl</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

配置类

package com.example.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TTLRabbitMQConfiguration {

    // 1.声明注册direct模式的交换机
    @Bean
    public DirectExchange ttldirectExchange() {
        return new DirectExchange("ttl_direct_exchange", true, false);
    }

    // 2.队列的过期时间
    @Bean
    public Queue directttlQueue() {
        //设置过期时间
        Map<String, Object> args = new HashMap<>();
        //这里一定是int类型
        args.put("x-message-ttl", 5000);
        return new Queue("ttl.direct.queue", true, false, false, args);
    }

    @Bean
    public Binding ttlBingding() {
        return BindingBuilder.bind(directttlQueue()).to(ttldirectExchange()).with("ttl");
    }
}

Service

package com.example.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

@Service
public class OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //模拟用户下单
    public void makeOrder() {
        //1.根据商品id查询库存是否足够
        //2.保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生产成功:" + orderId);
        //3.通过MQ来完成消息的分发
        //参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容
        String exchangeName = "ttl_direct_exchange";
        String routingKey = "ttl";
        // 队列中会产生一条消息并且5秒钟后会消失
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
    }
}

启动类

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitmqDemoApplication {

	public static void main(String[] args) {
		SpringApplication.run(RabbitmqDemoApplication.class, args);
	}

}

配置文件

# RabbitMQ基本配置
# RabbitMQ的主机地址(默认为:localhost)
spring.rabbitmq.host=localhost
# 指定该用户要连接到的虚拟host端(注:如果不指定,那么默认虚拟host为“/”)
spring.rabbitmq.virtual-host = /
# amqp协议端口号:5672; 集群端口号:25672;http端口号:15672;
spring.rabbitmq.port=5672
# 登录到RabbitMQ的用户名、密码
spring.rabbitmq.username=zsx242030
spring.rabbitmq.password=zsx242030

测试

package com.example;

import com.example.service.OrderService;
import com.example.service.OrderService1;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = RabbitmqDemoApplication.class)
@RunWith(SpringRunner.class)
public class MessageProducerTest {

    @Autowired
    private OrderService orderService;

    @Test
    public void test() {
        orderService.makeOrder();
    }

}
订单生产成功:8a965457-330b-4e2b-9087-a40cbfdee033

在这里插入图片描述

在这里插入图片描述

1.2 设置消息TTL

配置类

package com.example.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TTLRabbitMQConfiguration1 {

    //1.声明注册direct模式的交换机
    @Bean
    public DirectExchange ttlMessageDirectExchange() {
        return new DirectExchange("ttl_message_direct_exchange", true, false);
    }

    @Bean
    public Queue directttlMessageQueue() {
        return new Queue("ttl.message.direct.queue", true, false, false);
    }

    @Bean
    public Binding ttlMessageBingding() {
        return BindingBuilder.bind(directttlMessageQueue()).to(ttlMessageDirectExchange()).with("ttlmessage");
    }
}

Service

package com.example.service;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

@Service
public class OrderService1 {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //模拟用户下单
    public void makeOrder() {
        //1.根据商品id查询库存是否足够
        //2.保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生产成功:" + orderId);
        //3.通过MQ来完成消息的分发
        //参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容
        String exchangeName = "ttl_message_direct_exchange";
        String routingKey = "ttlmessage";
        //给消息设置过期时间
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            public Message postProcessMessage(Message message) {
                //这里就是字符串
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId, messagePostProcessor);
    }
}

测试类

package com.example;

import com.example.service.OrderService1;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = RabbitmqDemoApplication.class)
@RunWith(SpringRunner.class)
public class MessageProducerTest1 {

    @Autowired
    private OrderService1 orderService1;

    @Test
    public void test1() {
        orderService1.makeOrder();
    }

}
订单生产成功:9eb9e120-1379-4e54-bc43-1944b3c22713

在这里插入图片描述

在这里插入图片描述

2、死信队列

DLX,全称 Dead-Letter-Exchange,可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变

成死信之后,它能被重新发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列就称之为死信队列。消

息变成死信,可能是由于以下原因:

  • 消息被拒绝

  • 消息过期

  • 队列达到最大长度

DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队

列的属性,当这个队列中存在死信时,RabbitMQ就会自动地将这个消息重新发布到设置的DLX上去,进而被路由

到另一个队列,即死信队列。

要想使用死信队列,只需要在定义队列的时候设置队列参数x-dead-letter-exchange指定交换机即可。

pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.4</version>
        <relativePath/>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-boot-rabbitmq-dlx</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-rabbitmq-dlx</name>
    <description>spring-boot-rabbitmq-dlx</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

配置类

package com.example.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeadRabbitMqConfiguration {

    //1.声明注册direct模式的交换机
    @Bean
    public DirectExchange deadDirect() {
        return new DirectExchange("dead_direct_exchange", true, false);
    }

    //2.队列的过期时间
    @Bean
    public Queue deadQueue() {
        return new Queue("dead.direct.queue", true);
    }

    @Bean
    public Binding deadbinds() {
        return BindingBuilder.bind(deadQueue()).to(deadDirect()).with("dead");
    }
}
package com.example.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TTLRabbitMQConfiguration {

    //1.声明注册direct模式的交换机
    @Bean
    public DirectExchange ttldirectExchange() {
        return new DirectExchange("ttl_direct_exchange", true, false);
    }

    //2.队列的过期时间
    @Bean
    public Queue directttlQueue() {
        //设置过期时间
        Map<String, Object> args = new HashMap<>();
        // ttl队列最大可以接受5条消息,超过的条数也是会被移入死信队列,过期之后依然会被移入死信队列
        // args.put("x-max-length",5);
        // 这里一定是int类型
        args.put("x-message-ttl", 5000);
        // 死信队列的交换机
        args.put("x-dead-letter-exchange", "dead_direct_exchange");
        // fanout不需要配置
        // 路由
        args.put("x-dead-letter-routing-key", "dead");
        return new Queue("ttl.direct.queue", true, false, false, args);
    }

    @Bean
    public Binding ttlBingding() {
        return BindingBuilder.bind(directttlQueue()).to(ttldirectExchange()).with("ttl");
    }


}

Service

package com.example.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

@Service
public class OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //模拟用户下单
    public void makeOrder() {
        //1.根据商品id查询库存是否足够
        //2.保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生产成功:" + orderId);
        //3.通过MQ来完成消息的分发
        //参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容
        String exchangeName = "ttl_direct_exchange";
        String routingKey = "ttl";
        // 队列中会产生一条消息并且5秒钟后会消失
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
    }
}

启动类

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitmqDemoApplication {

	public static void main(String[] args) {
		SpringApplication.run(RabbitmqDemoApplication.class, args);
	}

}

配置文件

# RabbitMQ基本配置
# RabbitMQ的主机地址(默认为:localhost)
spring.rabbitmq.host=localhost
# 指定该用户要连接到的虚拟host端(注:如果不指定,那么默认虚拟host为“/”)
spring.rabbitmq.virtual-host = /
# amqp协议端口号:5672; 集群端口号:25672;http端口号:15672;
spring.rabbitmq.port=5672
# 登录到RabbitMQ的用户名、密码
spring.rabbitmq.username=zsx242030
spring.rabbitmq.password=zsx242030

测试类

package com.example;

import com.example.service.OrderService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = RabbitmqDemoApplication.class)
@RunWith(SpringRunner.class)
public class MessageProducerTest {

    @Autowired
    private OrderService orderService;

    @Test
    public void test() {
        orderService.makeOrder();
    }

}
订单生产成功:74adb096-734c-4484-8947-032ef1ee7c5d

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/91540.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

#systemverilog# 之 event region 和 timeslot 仿真调度(六)疑惑寄存器采样吗

一 象征性啰嗦 想必大家在刚开始尝试写Verilig HDL代码的时候,都是参考一些列参考代码,有些来自于参考书,有些来自于网上大牛的笔记,甚至有写来自于某宝FPGA开发板的授权代码。我还记得自己当时第一次写代码,参考的是一款Altera 芯片,结合Quartus 开发软件, 在上面练习…

【Go Web 篇】Go 语言进行 Web 开发:构建高性能网络应用

随着互联网的快速发展&#xff0c;Web 开发已经成为了软件开发领域中不可或缺的一部分。随之而来的是对于更高性能、更高效的网络应用的需求。在这个领域&#xff0c;Go 语言因其并发性能、简洁的语法以及丰富的标准库而备受关注。本篇博客将深入探讨如何使用 Go 语言进行 Web …

requests模板成功下载,但是不能在pycharm中运行

在做实验的过程中&#xff0c;需要用到requests&#xff0c;但是在pycharm中成功下载&#xff0c;仍然无法使用&#xff0c;找了很久&#xff0c;解决方法如下&#xff1a; 进入win中的命令提示符 下载requests模块 pip install requests输入python显示你的python的基本信息&…

分布式数据库架构:高可用、高性能的数据存储

在现代信息时代&#xff0c;数据是企业发展的核心。为了支持海量数据的存储、高并发访问以及保证数据的可靠性&#xff0c;分布式数据库架构应运而生。分布式数据库架构是一种将数据存储在多个物理节点上&#xff0c;并通过一系列复杂的协调和管理机制来提供高可用性和高性能的…

[Linux]文件IO

文章目录 1. 文件描述符1.1 虚拟地址空间1.1.1 存在的意义1.1.2 分区 1.2 文件描述符1.2.1 文件描述符1.2.2 文件描述符表 2. Linux系统文件IO2.1 open/close2.1.1 函数原型2.1.2 close函数原型2.1.3 打开已存在文件2.1.4 创建新文件2.1.5 文件状态判断 2.2 read/write2.2.1 re…

【Go Web 篇】从零开始:构建最简单的 Go 语言 Web 服务器

随着互联网的迅速发展&#xff0c;Web 服务器成为了连接世界的关键组件之一。而在现代编程语言中&#xff0c;Go 语言因其卓越的性能和并发能力而备受青睐。本篇博客将带你从零开始&#xff0c;一步步构建最简单的 Go 语言 Web 服务器&#xff0c;让你对 Go 语言的 Web 开发能力…

【UniApp开发小程序】私聊功能后端实现 (买家、卖家 沟通商品信息)【后端基于若依管理系统开发】

声明 本文提炼于个人练手项目&#xff0c;其中的实现逻辑不一定标准&#xff0c;实现思路没有参考权威的文档和教程&#xff0c;仅为个人思考得出&#xff0c;因此可能存在较多本人未考虑到的情况和漏洞&#xff0c;因此仅供参考&#xff0c;如果大家觉得有问题&#xff0c;恳…

vue关闭弹窗刷新父页面 this.$refs

代码截图 主页面 弹出框页面 接这一篇文章后续 参考链接

【C++】AVL树(高度平衡二叉树)

AVL树 概念AVL树节点定义AVL树节点插入AVL树四种旋转情况左单旋右单旋先左单旋再右单旋先右单旋后左单旋 元素的插入及控制平衡判断最后节点是否平衡 概念 二叉搜索树虽然可以缩短查找的效率&#xff0c;但如果数据有序或者接近有序二叉搜索树将退化为单支树&#xff0c;查找元…

视频云存储/安防监控EasyCVR视频汇聚平台接入GB国标设备时,无法显示通道信息该如何解决?

安防视频监控/视频集中存储/云存储/磁盘阵列EasyCVR平台可拓展性强、视频能力灵活、部署轻快&#xff0c;可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等&#xff0c;以及支持厂家私有协议与SDK接入&#xff0c;包括海康Ehome、海大宇等设备的SDK等。平台既具备传统安…

语言模型(language model)

文章目录 引言1. 什么是语言模型2. 语言模型的主要用途2.1 言模型-语音识别2.2 语言模型-手写识别2.3 语言模型-输入法 3. 语言模型的分类4. N-gram语言模型4.1 N-gram语言模型-平滑方法4.2 ngram代码4.3 语言模型的评价指标4.4 两类语言模型的对比 5. 神经网络语言模型6. 语言…

百度工程师浅析解码策略

作者 | Jane 导读 生成式模型的解码方法主要有2类&#xff1a;确定性方法&#xff08;如贪心搜索和波束搜索&#xff09;和随机方法。确定性方法生成的文本通常会不够自然&#xff0c;可能存在重复或过于简单的表达。而随机方法在解码过程中引入了随机性&#xff0c;以便生成更…

改进YOLO系列:9.添加S2Attention注意力机制

添加S2Attention注意力机制 1. S2Attention注意力机制论文2. S2Attention注意力机制原理3. S2Attention注意力机制的配置3.1common.py配置3.2yolo.py配置3.3yaml文件配置1. S2Attention注意力机制论文 论文题目:S 2 -MLPV2: IMPROVED SPATIAL-SHIFT MLP ARCHITECTURE…

Unity 之 GameObject.Find()在场景中查找指定名称的游戏对象

文章目录 GameObject.Find 是 Unity 中的一个函数&#xff0c;用于在场景中查找指定名称的游戏对象。这个函数的主要作用是根据游戏对象的名称来查找并返回一个引用&#xff0c;使您能够在代码中操作该对象。以下是有关 GameObject.Find 的详细介绍&#xff1a; 函数签名&…

SpringBoot简单上手

spring boot 是spring快速开发脚手架&#xff0c;通过约定大于配置&#xff0c;优化了混乱的依赖管理&#xff0c;和复杂的配置&#xff0c;让我们用java-jar方式,运行启动java web项目 入门案例 创建工程 先创建一个空的工程 创建一个名为demo_project的项目&#xff0c;并且…

【MySQL系列】表的内连接和外连接学习

「前言」文章内容大致是对MySQL表的内连接和外连接。 「归属专栏」MySQL 「主页链接」个人主页 「笔者」枫叶先生(fy) 目录 一、内连接二、外连接2.1 左外连接2.2 右外连接 一、内连接 内连接实际上就是利用where子句对两种表形成的笛卡儿积进行筛选&#xff0c;前面篇章学习的…

Java进阶篇--创建线程的四种方式

目录 继承Thread类 扩展小知识&#xff1a; Thread类的常见方法 Thread 类的静态方法 实现Runnable接口 使用Callable和Future创建线程 使用Executor框架创建线程池 继承Thread类 创建一个继承自Thread类的子类&#xff0c;并重写其run()方法&#xff0c;将相关逻辑实现…

EG3D: Efficient Geometry-aware 3D Generative Adversarial Networks [2022 CVPR]

长期以来&#xff0c;仅使用单视角二维照片集无监督生成高质量多视角一致图像和三维形状一直是一项挑战。现有的三维 GAN 要么计算密集&#xff0c;要么做出的近似值与三维不一致&#xff1b;前者限制了生成图像的质量和分辨率&#xff0c;后者则对多视角一致性和形状质量产生不…

mmdetection基于 PyTorch 的目标检测开源工具箱 入门教程

安装环境 MMDetection 支持在 Linux&#xff0c;Windows 和 macOS 上运行。它需要 Python 3.7 以上&#xff0c;CUDA 9.2 以上和 PyTorch 1.8 及其以上。 1、安装依赖 步骤 0. 从官方网站下载并安装 Miniconda。 步骤 1. 创建并激活一个 conda 环境。 conda create --name…

windows中安装sqlite

1. 下载文件 官网下载地址&#xff1a;https://www.sqlite.org/download.html 下载sqlite-dll-win64-x64-3430000.zip和sqlite-tools-win32-x86-3430000.zip文件&#xff08;32位系统下载sqlite-dll-win32-x86-3430000.zip&#xff09;。 2. 安装过程 解压文件 解压上一步…