RabbitMQ之死信队列

文章目录

  • 一、死信的概念
  • 二、死信的来源
  • 三、实战
    • 1、消息 TTL 过期
    • 2、队列达到最大长度
    • 3、消息被拒
  • 总结


一、死信的概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

二、死信的来源

  • 消息 TTL 过期

  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)

  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

三、实战

在这里插入图片描述

1、消息 TTL 过期

生产者代码:

public class Producer {
 	private static final String NORMAL_EXCHANGE = "normal_exchange";
 	public static void main(String[] argv) throws Exception {
 		try (Channel channel = RabbitMqUtils.getChannel()) {
 			channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
 			//设置消息的 TTL 时间
 			AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
 			//该信息是用作演示队列个数限制
 			for (int i = 1; i <11 ; i++) {
 				String message="info"+i;
 				channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
 				System.out.println("生产者发送消息:"+message);
 			}
 		}
 	}
}

消费者 C1 代码(启动之后关闭该消费者 模拟其接收不到消息)

public class Consumer01 {
 	//普通交换机名称
 	private static final String NORMAL_EXCHANGE = "normal_exchange";
 	//死信交换机名称
 	private static final String DEAD_EXCHANGE = "dead_exchange";
 	public static void main(String[] argv) throws Exception {
 		Channel channel = RabbitUtils.getChannel();
 		//声明死信和普通交换机 类型为 direct
 		channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
 		channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
 		//声明死信队列
 		String deadQueue = "dead-queue";
 		channel.queueDeclare(deadQueue, false, false, false, null);
 		//死信队列绑定死信交换机与 routingkey
 		channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
 		//正常队列绑定死信队列信息
 		Map<String, Object> params = new HashMap<>();
 		//正常队列设置死信交换机 参数 key 是固定值
 		params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
 		//正常队列设置死信 routing-key 参数 key 是固定值
 		params.put("x-dead-letter-routing-key", "lisi");
 
 		String normalQueue = "normal-queue";
 		channel.queueDeclare(normalQueue, false, false, false, params);
 		channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
 		System.out.println("等待接收消息.....");
 		DeliverCallback deliverCallback = (consumerTag, delivery) -> {
 			String message = new String(delivery.getBody(), "UTF-8");
 			System.out.println("Consumer01 接收到消息"+message);
 		};
 		channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
 		});
 	}
}

在这里插入图片描述
消费者 C2 代码(以上步骤完成后 启动 C2 消费者 它消费死信队列里面的消息)

public class Consumer02 {
 	private static final String DEAD_EXCHANGE = "dead_exchange";
 	public static void main(String[] argv) throws Exception {
 		Channel channel = RabbitUtils.getChannel();
 		channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
 		String deadQueue = "dead-queue";
 		channel.queueDeclare(deadQueue, false, false, false, null);
 		channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
 		System.out.println("等待接收死信队列消息.....");
 		DeliverCallback deliverCallback = (consumerTag, delivery) -> {
 			String message = new String(delivery.getBody(), "UTF-8");
 			System.out.println("Consumer02 接收死信队列的消息" + message);
	 	};
 		channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
 		});
 	}
}

在这里插入图片描述

2、队列达到最大长度

消息生产者代码去掉 TTL 属性

public class Producer {
 	private static final String NORMAL_EXCHANGE = "normal_exchange";
	public static void main(String[] argv) throws Exception {
 		try (Channel channel = RabbitMqUtils.getChannel()) {
 			channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
 			//该信息是用作演示队列个数限制
 			for (int i = 1; i <11 ; i++) {
 				String message="info"+i;
 				channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null, message.getBytes());
 				System.out.println("生产者发送消息:"+message);
 			}
 		}
 	}
}

C1 消费者修改以下代码(启动之后关闭该消费者 模拟其接收不到消息)

public class Consumer01 {
 	//普通交换机名称
 	private static final String NORMAL_EXCHANGE = "normal_exchange";
 	//死信交换机名称
 	private static final String DEAD_EXCHANGE = "dead_exchange";
 	public static void main(String[] argv) throws Exception {
 		Channel channel = RabbitUtils.getChannel();
 		//声明死信和普通交换机 类型为 direct
 		channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
 		channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
 		//声明死信队列
 		String deadQueue = "dead-queue";
 		channel.queueDeclare(deadQueue, false, false, false, null);
 		//死信队列绑定死信交换机与 routingkey
 		channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
 		//正常队列绑定死信队列信息
 		Map<String, Object> params = new HashMap<>();
 		//正常队列设置死信交换机 参数 key 是固定值
 		params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
 		//正常队列设置死信 routing-key 参数 key 是固定值
 		params.put("x-dead-letter-routing-key", "lisi");
 		//设置正常队列长度限制
 		params.put("x-max-length",6);
 		
 		String normalQueue = "normal-queue";
 		channel.queueDeclare(normalQueue, false, false, false, params);
 		channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
 		System.out.println("等待接收消息.....");
 		DeliverCallback deliverCallback = (consumerTag, delivery) -> {
 			String message = new String(delivery.getBody(), "UTF-8");
 			System.out.println("Consumer01 接收到消息"+message);
 		};
 		channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
 		});
 	}
}

注意此时需要把原先队列删除 因为参数改变了
C2 消费者代码

public class Consumer02 {
 	private static final String DEAD_EXCHANGE = "dead_exchange";
 	public static void main(String[] argv) throws Exception {
 		Channel channel = RabbitUtils.getChannel();
 		channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
 		String deadQueue = "dead-queue";
 		channel.queueDeclare(deadQueue, false, false, false, null);
 		channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
 		System.out.println("等待接收死信队列消息.....");
 		DeliverCallback deliverCallback = (consumerTag, delivery) -> {
 			String message = new String(delivery.getBody(), "UTF-8");
 			System.out.println("Consumer02 接收死信队列的消息" + message);
	 	};
 		channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
 		});
 	}
}

在这里插入图片描述

3、消息被拒

消息生产者代码

public class Producer {
 	private static final String NORMAL_EXCHANGE = "normal_exchange";
	public static void main(String[] argv) throws Exception {
 		try (Channel channel = RabbitMqUtils.getChannel()) {
 			channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
 			//该信息是用作演示队列个数限制
 			for (int i = 1; i <11 ; i++) {
 				String message="info"+i;
 				channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null, message.getBytes());
 				System.out.println("生产者发送消息:"+message);
 			}
 		}
 	}
}

C1 消费者代码(启动之后关闭该消费者 模拟其接收不到消息)

public class Consumer01 {
 	//普通交换机名称
 	private static final String NORMAL_EXCHANGE = "normal_exchange";
 	//死信交换机名称
 	private static final String DEAD_EXCHANGE = "dead_exchange";
 	public static void main(String[] argv) throws Exception {
 		Channel channel = RabbitUtils.getChannel();
 		//声明死信和普通交换机 类型为 direct
 		channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
 		channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
 		//声明死信队列
 		String deadQueue = "dead-queue";
 		channel.queueDeclare(deadQueue, false, false, false, null);
 		//死信队列绑定死信交换机与 routingkey
 		channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
 		//正常队列绑定死信队列信息
 		Map<String, Object> params = new HashMap<>();
 		//正常队列设置死信交换机 参数 key 是固定值
 		params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
 		//正常队列设置死信 routing-key 参数 key 是固定值
 		params.put("x-dead-letter-routing-key", "lisi");
 		String normalQueue = "normal-queue";
 		channel.queueDeclare(normalQueue, false, false, false, params);
 		channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
 		System.out.println("等待接收消息.....");
 		DeliverCallback deliverCallback = (consumerTag, delivery) -> {
 			String message = new String(delivery.getBody(), "UTF-8");
 			if(message.equals("info5")){
 				System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
 				//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
 				channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
 			}else {
 				System.out.println("Consumer01 接收到消息"+message);
 				channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
 			}
 		};
 		boolean autoAck = false;
 		channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {
 		});
 	}
}

在这里插入图片描述
C2 消费者代码

public class Consumer02 {
 	private static final String DEAD_EXCHANGE = "dead_exchange";
 	public static void main(String[] argv) throws Exception {
 		Channel channel = RabbitUtils.getChannel();
 		channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
 		String deadQueue = "dead-queue";
 		channel.queueDeclare(deadQueue, false, false, false, null);
 		channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
 		System.out.println("等待接收死信队列消息.....");
 		DeliverCallback deliverCallback = (consumerTag, delivery) -> {
 			String message = new String(delivery.getBody(), "UTF-8");
 			System.out.println("Consumer02 接收死信队列的消息" + message);
	 	};
 		channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
 		});
 	}
}

启动消费者 1 然后再启动消费者 2

在这里插入图片描述


总结

以上就是RabbitMQ之死信队列的相关知识点,希望对你有所帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/151447.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

转载:YOLOv8改进全新Inner-IoU损失函数:扩展到其他SIoU、CIoU等主流损失函数,带辅助边界框的损失

0、摘要 随着检测器的快速发展&#xff0c;边界框回归&#xff08;BBR&#xff09;损失函数不断进行更新和优化。然而&#xff0c;现有的 IoU 基于 BBR 仍然集中在通过添加新损失项来加速收敛&#xff0c;忽略了 IoU 损失项本身的局限性。尽管从理论上讲&#xff0c;IoU 损失可…

Android10 手势导航

种类 Android10 默认的系统导航有三种&#xff1a; 1.两个按钮的 2.三个按钮的 3.手势 它们分别对应三个包名 frameworks/base/packages/overlays/NavigationBarMode2ButtonOverlay frameworks/base/packages/overlays/NavigationBarMode3ButtonOverlay frameworks/base/packa…

怎么恢复删除的数据? 8个有效的数据恢复方法

无论您在保存备份方面多么小心&#xff0c;灾难有时还是会发生。有时您的备份软件无法按预期运行。 如果您的外部驱动器靠近您的设备&#xff0c;发生火灾/洪水/故障时&#xff0c;有时备份会与原始文件一起丢失。即使是云存储也不能避免故障。 还有一个事实是&#xff0c;我…

C#检查服务状态,以及进行服务启停

1. linux环境 linux环境通过执行bash命令直接执行&#xff1a; public string RunCmdLinux(string cmd){var proc new Process();System.Console.Write($"Run Linux cmd > [{cmd}] START!");proc.StartInfo.CreateNoWindow true;proc.StartInfo.FileName &…

TOUGH系列软件教程

查看原文>>>全流程TOUGH系列软件实践技术应用 TOUGH系列软件是由美国劳伦斯伯克利实验室开发的&#xff0c;旨在解决非饱和带中地下水、热运移的通用模拟软件。和传统地下水模拟软件Feflow和Modflow不同&#xff0c;TOUGH系列软件采用模块化设计和有限积分差网格剖分…

『C++成长记』C++入门——内联函数

&#x1f525;博客主页&#xff1a;小王又困了 &#x1f4da;系列专栏&#xff1a;C &#x1f31f;人之为学&#xff0c;不日近则日退 ❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 目录 一、内联函数 &#x1f4d2;1.1内联函数的概念 &#x1f4d2;1.2内联函数的特征 …

js中的instance,isPrototype和getPrototypeOf的使用,来判断类的关系

&#x1f601; 作者简介&#xff1a;一名大四的学生&#xff0c;致力学习前端开发技术 ⭐️个人主页&#xff1a;夜宵饽饽的主页 ❔ 系列专栏&#xff1a;JavaScript小贴士 &#x1f450;学习格言&#xff1a;成功不是终点&#xff0c;失败也并非末日&#xff0c;最重要的是继续…

网络类型及数据链路层的协议

网络类型 --- 根据数据链路层使用的协议来进行划分的。 MA网络 --- 多点接入网络 BMA --- 广播型多点接入网络---以太网协议 NBMA --- 非广播型多点接入网络 以太网协议 --- 需要使用mac地址对不同的主机设备进行区分和标识 --- 以太网之所以需要使用mac地址进行数据寻址&…

基于SSM的校园停车场管理系统设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…

Adobe Illustrator——原创设计的宝藏软件

今天&#xff0c;我们来谈谈一款在Adobe系列中曾经多次给大家都提到的原创性极强的设计理念丰富的矢量图形编辑软件——Adobe Illustrator。 Adobe Illustrator&#xff0c;其定位是一款与Photoshop相类似对矢量图形进行编辑的软件。 Adobe Illustrator&#xff0c;作为全球最著…

未来10年,NAND 与DRAM依然是存储主角

根据Yole Group调查机构的数据显示&#xff0c;在2022年独立记忆体&#xff08;Stand-alone Memory&#xff09;整体市场达到了1440亿美元。其中DRAM占比55.4%&#xff0c;NAND占比40.8%。剩下的NOR、(NV)SRAM/FRAM、EEPROM、新型非易失存储(PCM, ReRAM and STT-MRAM)等占比3.8…

2023.11.14 关于 Spring Boot 创建和使用

目录 Spring Boot Spring Boot 项目的创建 网页版创建 Spring Boot 项目 Spring Boot 目录说明 项目运行 Spring Boot Spring Boot 是基于 Spring 设计的一个全新的框架&#xff0c;其目的是用来简化 Spring 的应用、初始搭建、开发的整个过程Spring Boot 就是一个整合了…

C语言入门这一篇就够了(入门篇2)

接上篇C语言入门这一篇就够了&#xff08;入门篇1&#xff09; 今天主要讲解基本语法&#xff0c;话不多说&#xff0c;直接上干货。 C语言语法有哪些 数据类型&#xff1a;C语言有多种数据类型&#xff0c;包括整数类型&#xff08;如 int、long&#xff09;、浮点类型&#x…

移动机器人路径规划(二)--- 图搜索基础,Dijkstra,A*,JPS

目录 1 图搜索基础 1.1 机器人规划的配置空间 Configuration Space 1.2 图搜索算法的基本概念 1.3 启发式的搜索算法 Heuristic search 2 A* Dijkstra算法 2.1 Dijkstra算法 2.2 A*&&Weighted A*算法 2.3 A* 算法的工程实践中的应用 3 JPS 1 图搜索基础 1.1…

Mysql中的JDBC编程

JDBC编程 1.JDBC的数据库编程2.JDBC工作原理3.JDBC使用3.1JDBC开发案例3.2JDBC使用步骤总结 4.JDBC API4.1数据库连接Connection4.2 Statement对象4.3 ResultSet对象4.4 释放 5.Java代码操作数据库 1.JDBC的数据库编程 JDBC&#xff0c;即Java Database Connectivity&#xff0…

20 - 欲知JVM调优先了解JVM内存模型

从今天开始&#xff0c;我将和你一起探讨 Java 虚拟机&#xff08;JVM&#xff09;的性能调优。JVM 算是面试中的高频问题了&#xff0c;通常情况下总会有人问到&#xff1a;请你讲解下 JVM 的内存模型&#xff0c;JVM 的性能调优做过吗&#xff1f; 1、为什么 JVM 在 Java 中…

springboot--单元测试

单元测试 前言1、写测试要用的类2、写测试要用的类3、运行测试类4、spring-boot-starter-test默认提供了以下库4.1 junit54.1.1 DisplayName:为测试类或者测试方法设置展示名称4.1.2 BeforeAll&#xff1a;所有测试方法运行之前先运行这个4.1.3 BeforeEach&#xff1a;每个测试…

编码自动化:使用MybatisX初体验,太爽了!

使用Mybatis当前最火的插件&#xff1a;MybatisX。 在IDEA中安装MyBatisX插件。 该插件主要功能如下&#xff1a; 生成mapper xml文件 快速从代码跳转到mapper及从mapper返回代码 mybatis自动补全及语法错误提示 集成mybatis Generate GUI界面 根据数据库注解&#xff0c;…

win11无损关闭系统更新

1、窗口键R&#xff0c;打开运行窗口&#xff0c;输入regedit。 2、打开地址&#xff1a;计算机\HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\WindowsUpdate\UX\Settings 3、新建DWORD&#xff08;32位&#xff09;值(D)&#xff0c;重命名“FlightSettingsMaxPauseDays” 4、…

MacBook投屏到安卓电视的操作步骤,用网页浏览器也能投屏

如果你想将苹果电脑投屏到家里的安卓大电视上&#xff0c;AirDroid Cast给你提供两种方法&#xff0c;其中一种就是大家都熟悉的AirPlay&#xff0c;AirPlay有个限制&#xff0c;需要连接同一个网络才可以投屏&#xff0c;所以AirPlay适应本地投屏。如果你需要远程投屏&#xf…