RabbitMQ详解-06RabbitMQ高级

1. 过期时间TTL

可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息自动被删除。RabbitMQ可以对消息和队列设置TTL。有以下两种设置方法:

  • 通过队列属性设置,队列中所有消息都有相同的过期时间。
  • 对消息进行单独设置,每条消息TTL可以不同。

若两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列, 消费者将无法再收到该消息。

1.1. 设置队列TTL

配置类中设置:

args.put("x-message-ttl",5000);
return QueueBuilder.durable(ITEM_QUEUE).withArguments(args).build();

参数 x-message-ttl 的值必须是非负 32 位整数 (0 <= n <= 2^32-1) ,以毫秒为单位表示 TTL 的值。这样,值 6000 表示存在于 队列 中的当前消息将最多只存活 6 秒钟。

如果不设置TTL,则表示此消息不会过期。如果将TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃。
在这里插入图片描述

1.2. 设置消息TTL

在发送消息(可以发送到任何队列,不管该队列是否属于某个交换机)的时候设置过期时间即可。在测试类中编写如下方法发送消息并设置过期时间到队列:

/**
     * 过期消息
     * 该消息投递任何交换机或队列中的时候;如果到了过期时间则将从该队列中删除
     */
    @Test
    public void ttlMessageTest(){
        MessageProperties messageProperties = new MessageProperties();
        //设置消息的过期时间,5秒
        messageProperties.setExpiration("5000");Message message = new Message("测试过期消息,5秒钟过期".getBytes(), messageProperties);
        //路由键与队列同名
        rabbitTemplate.convertAndSend("my_ttl_queue", message);
    }

expiration 字段以毫秒为单位表示 TTL 值。且与 x-message-ttl 具有相同的约束条件。因为 expiration 字段必须为字符串类型,broker 将只会接受以字符串形式表达的数字。
当同时指定了 queue 和 message 的 TTL 值,则两者中较小的那个才会起作用。

2. 死信队列

DLX,全称为Dead-Letter-Exchange , 可以称之为死信交换机,也称为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。

消息变成死信,可能是由于以下的原因:

  • 消息被拒绝
  • 消息过期
  • 队列达到最大长度

DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。

要想使用死信队列,只需要在定义队列的时候设置队列参数 x-dead-letter-exchange 指定交换机即可。

2.1消息TTL过期

2.1.1演示

生产者:

public class Producer {
	private static final String NORMAL_EXCHANGE = "normal_exchange";
		public static void main(String[] argv) throws Exception {
			try (Channel channel = RabbitMqUtils.getChannel()) {
				channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
				//设置消息的 TTL 时间
				AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
				//该信息是用作演示队列个数限制
				for (int i = 1; i <11 ; i++) {
					String message="info"+i;
					channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties, message.getBytes());
					System.out.println("生产者发送消息:"+message);
				}
			}
		} 
	}
}

消费者1代码(启动之后关闭该消费者 模拟其接收不到消息)

public class Consumer01 {
	//普通交换机名称
	private static final String NORMAL_EXCHANGE = "normal_exchange";
	//死信交换机名称
	private static final String DEAD_EXCHANGE = "dead_exchange";
	public static void main(String[] argv) throws Exception {
		Channel channel = RabbitUtils.getChannel();
		//声明死信和普通交换机 类型为 direct
		channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
		channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
		//声明死信队列
		String deadQueue = "dead-queue";
		channel.queueDeclare(deadQueue, false, false, false, null);
		//死信队列绑定死信交换机与 routingkey
		channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
		//正常队列绑定死信队列信息
		Map<String, Object> params = new HashMap<>();
		//正常队列设置死信交换机 参数 key 是固定值
		params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
		//正常队列设置死信 routing-key 参数 key 是固定值
		params.put("x-dead-letter-routing-key", "lisi");

		String normalQueue = "normal-queue";
		channel.queueDeclare(normalQueue, false, false, false, params);
		channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
		System.out.println("等待接收消息.....");
		DeliverCallback deliverCallback = (consumerTag, delivery) -> {
			String message = new String(delivery.getBody(), "UTF-8");
			System.out.println("Consumer01 接收到消息"+message);
		};
		channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
		});
	}
}

在这里插入图片描述
消费者2代码(以上步骤完成后 启动 C2 消费者 它消费死信队列里面的消息)

public class Consumer02 {
	private static final String DEAD_EXCHANGE = "dead_exchange";
	public static void main(String[] argv) throws Exception {
		Channel channel = RabbitUtils.getChannel();
		channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
		String deadQueue = "dead-queue";
		channel.queueDeclare(deadQueue, false, false, false, null);
		channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
		System.out.println("等待接收死信队列消息.....");
		DeliverCallback deliverCallback = (consumerTag, delivery) -> {
			String message = new String(delivery.getBody(), "UTF-8");
			System.out.println("Consumer02 接收死信队列的消息" + message);
		};
		channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
		});
	}
}

在这里插入图片描述

2.1.2流程

具体因为队列消息过期而被投递到死信队列的流程:
在这里插入图片描述

2.2 队列达到最大长度

2.2.1演示

生产者:

public class Producer {
	private static final String NORMAL_EXCHANGE = "normal_exchange";
		public static void main(String[] argv) throws Exception {
			try (Channel channel = RabbitMqUtils.getChannel()) {
				channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
				//该信息是用作演示队列个数限制
				for (int i = 1; i <11 ; i++) {
					String message="info"+i;
					channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null, message.getBytes());
					System.out.println("生产者发送消息:"+message);
				}
			}
		} 
	}
}

消费者1修改以下代码(启动之后关闭该消费者 模拟其接收不到消息)
在这里插入图片描述
注意此时需要把原先队列删除 因为参数改变了
消费者2代码不变(启动 C2 消费者)
在这里插入图片描述

2.2.2流程

消息超过队列最大消息长度而被投递到死信队列的流程在前面的图中已包含。

2.3消息被拒

消息生产者代码同上生产者一致

消费者1代码(启动之后关闭该消费者 模拟其接收不到消息)

public class Consumer01 {
	//普通交换机名称
	private static final String NORMAL_EXCHANGE = "normal_exchange";
	//死信交换机名称
	private static final String DEAD_EXCHANGE = "dead_exchange";
	public static void main(String[] argv) throws Exception {
		Channel channel = RabbitUtils.getChannel();
		//声明死信和普通交换机 类型为 direct
		channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
		channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
		//声明死信队列
		String deadQueue = "dead-queue";
		channel.queueDeclare(deadQueue, false, false, false, null);
		//死信队列绑定死信交换机与 routingkey
		channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
		//正常队列绑定死信队列信息
		Map<String, Object> params = new HashMap<>();
		//正常队列设置死信交换机 参数 key 是固定值
		params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
		//正常队列设置死信 routing-key 参数 key 是固定值
		params.put("x-dead-letter-routing-key", "lisi");
		String normalQueue = "normal-queue";
		channel.queueDeclare(normalQueue, false, false, false, params);
		channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
		System.out.println("等待接收消息.....");
		DeliverCallback deliverCallback = (consumerTag, delivery) -> {
			String message = new String(delivery.getBody(), "UTF-8");
			if(message.equals("info5")){
				System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
				//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
				channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
			}else {
				System.out.println("Consumer01 接收到消息"+message);
				channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
			}
		};
		boolean autoAck = false;
		channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {
		});
	}
}

在这里插入图片描述
消费者代码不变
启动消费者 1 然后再启动消费者 2
在这里插入图片描述

3. 延迟队列

延迟队列存储的对象是对应的延迟消息;所谓“延迟消息” 是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

在RabbitMQ中延迟队列可以通过 过期时间 + 死信队列 来实现;具体如下流程图所示:
在这里插入图片描述
在上图中;分别设置了两个5秒、10秒的过期队列,然后等到时间到了则会自动将这些消息转移投递到对应的死信队列中,然后消费者再从这些死信队列接收消息就可以实现消息的延迟接收。

延迟队列的应用场景;如:

  • 在电商项目中的支付场景;如果在用户下单之后的几十分钟内没有支付成功;那么这个支付的订单算是支付失败,要进行支付失败的异常处理(将库存加回去),这时候可以通过使用延迟队列来处理。
  • 在系统中如有需要在指定的某个时间之后执行的任务都可以通过延迟队列处理。

4. 消息确认机制

确认并且保证消息被送达,提供了两种方式:发布确认和事务。(两者不可同时使用)在channel为事务时,不可引入确认模式;同样channel为确认模式下,不可使用事务。

4.1 发布确认

有两种方式:消息发送成功确认和消息发送失败回调。

4.1.1消息发送成功确认

在配置文件当中需要添加:

spring.rabbitmq.publisher-confirm-type=correlated

⚫ NONE
禁用发布确认模式,是默认值
⚫ CORRELATED
发布消息成功到交换器后会触发回调方法
⚫ SIMPLE
经测试有两种效果:
其一效果和 CORRELATED 值一样会触发回调方法,
其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法
等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。

消息确认回调方法:

public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息确认成功....");
        } else {
            //处理丢失的消息
            System.out.println("消息确认失败," + cause);
        }
    }
}

发送消息:

@Test
    public void queueTest(){
        //路由键与队列同名
        rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消息。");
    }

管理界面确认消息发送成功:
在这里插入图片描述
消息确认回调:
在这里插入图片描述

4.1.2消息发送失败回调

消息失败回调方法:

@Component
public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback {
	@Autowired
	private RabbitTemplate rabbitTemplate;
	//rabbitTemplate 注入之后就设置该值
	@PostConstruct
	private void init() {
		rabbitTemplate.setConfirmCallback(this);
		/**
		* true:
		* 交换机无法将消息进行路由时,会将该消息返回给生产者
		* false:
		* 如果发现消息无法进行路由,则直接丢弃
		*/
		rabbitTemplate.setMandatory(true);
		//设置回退消息交给谁处理
		rabbitTemplate.setReturnCallback(this);
	}

    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        String msgJson  = new String(message.getBody());
        System.out.println("Returned Message:"+msgJson);
    }
}

模拟消息发送失败:

@Test
public void testFailQueueTest() throws InterruptedException {
    //exchange 正确,queue 错误 ,confirm被回调, ack=true; return被回调 replyText:NO_ROUTE
    amqpTemplate.convertAndSend("test_fail_exchange", "", "测试消息发送失败进行确认应答。");
}

在这里插入图片描述

4.2 事务支持

场景:业务处理伴随消息的发送,业务处理失败(事务回滚)后要求消息不发送。rabbitmq 使用调用者的外部事务,通常是首选,因为它是非侵入性的(低耦合)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/725974.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

省市区下拉选择:3个el-select(附完整代码+json)

目录 直接上做出的效果&#xff1a; 页面代码&#xff1a; 使用click.native&#xff1a; data及引入&#xff1a; 初始化&#xff1a; methods&#xff1a; JSON: 示例结构&#xff1a; 1.code.json 2.pca-code.json 回显&#xff1a; 视频效果&#xff1a; 直接上做出…

5个好用的中文AI大语言模型_中文大语言模型

AI大语言模型&#xff08;Large Language Models, LLMs&#xff09;是近1-2年来人工智能领域的重要发展&#xff0c;它们通过深度学习技术&#xff0c;特别是基于Transformer的架构&#xff08;如GPT、BERT等&#xff09;&#xff0c;实现了对自然语言处理的巨大突破。 AI大语…

Vulkan入门系列2- 绘制三角形

概述&#xff1a; Vulkan的学习曲线是比较陡峭的&#xff0c;学习Vulkan刚开始像是在爬一个陡坡&#xff0c;等上了这个陡坡之后&#xff0c;后面学习曲线就相对比较平缓了。那么在Vulkan中绘制一个三角形&#xff0c;就相当于是在爬这样一个陡坡&#xff0c;因为绘制三角形需…

「51媒体」时尚类媒体邀约宣发资源

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 时尚类媒体邀约宣发资源可以多样化且针对性地满足品牌或活动的推广需求。以下是一些主要的资源及其特点&#xff1a; 时尚杂志&#xff1a;国内外知名时尚杂志&#xff0c;如《Vogue》、…

SparkSQL的分布式执行引擎-Thrift服务:学习总结(第七天)

系列文章目录 SparkSQL的分布式执行引擎 1、启动Thrift服务 2、beeline连接Thrift服务 3、开发工具连接Thrift服务 4、控制台编写SQL代码 文章目录 系列文章目录前言一、SparkSQL的分布式执行引擎(了解)1、启动Thrift服务2、beeline连接Thrift服务3、开发工具连接Thrift服务4、…

MS3121地隔离放大器

MS3121 是一款应用于车载音频系统的地隔离放大 器。芯片可以很好地解决汽车音频系统中的绕线电阻问 题&#xff0c;以及由车载电子设备带来的噪声问题。另外&#xff0c;芯片 所需要的外围电容小&#xff0c;便于系统的集成。注意&#xff0c;芯片的 地电位需要和后级音频功…

Flutter第十四弹 抽屉菜单效果

目标&#xff1a; 1.怎么构建抽屉菜单效果&#xff1f; 2.抽屉菜单怎么定制&#xff1f; 一、抽屉菜单 侧滑抽屉菜单效果 1.1 抽屉菜单入口 Flutter 的脚手架Scaffold&#xff0c;默认提供了抽屉菜单效果入口。 主页面采用一个简单的页面&#xff0c;侧滑菜单首先使用一个I…

ARP地址解析协议详解:

ARP&#xff1a;地址解析协议 – 以下3种ARP正常均只能在同一个广播域内使用 AARP 正向ARP 已知对端IP地址&#xff0c;通过广播来获取对端的MAC地址 RARP 反向ARP 已知对端的MAC地址&#xff0c;通过二层单播、三层广播来获取对端的IP地址 FARP 无故ARP 在设备刚获取…

电商API接口详述:涵盖订单、库存等多功能接口介绍

电商商家自研管理系统&#xff0c;线下ERP系统或WMS系统想要接入电商平台订单打单发货&#xff0c;通过点三电商API可以一键对接多个电商平台&#xff0c;帮助商家、ERP/WMS服务商快速开发电商模块&#xff0c;实现电商业务管理功能&#xff0c;那么点三电商API接口有哪些可用接…

HTTP 抓包工具——Fiddler项目实战

网络爬虫实质上是模拟浏览器向 Web 服务器发送请求。对于一些简单的网络请求&#xff0c;我们 可以通过查看 URL 地址来构造请求&#xff0c;但对于一些稍复杂的网络请求&#xff0c;仍然通过观察 URL 地 址将无法构造正确。因此我们需要对这些复杂的网络请求进行捕获分…

一文带你理清同源和跨域

1、概述 前后端数据交互经常会碰到请求跨域&#xff0c;什么是跨域&#xff0c;为什么需要跨域&#xff0c;以及常用有哪几种跨域方式&#xff0c;这是本文要探讨的内容。 同源策略(英文全称 Same origin policy)是浏览器提供的一个安全功能。同源策略限制了从同一个源加载的…

协同编辑:只是在线协作这么简单吗?揭秘协同编辑的深层价值

经常很多朋友咨询&#xff0c;无忧企业文档是否支持协同编辑&#xff0c;首先肯定是支持的。但是&#xff0c;我发现很多人对于“协同编辑”的理解可能比较表面&#xff0c;仅仅停留在多人同时编辑一份文档的层面。实际上&#xff0c;协同编辑的功能远不止于此&#xff0c;它更…

Stable Diffusion 设计 Logo 成品惊艳,比起人类手工设计的有什么不足之处?

Stable Diffusion不仅可以创作出精美的绘画作品&#xff0c;还能通过简单的prompt生成logo图案&#xff0c;并进一步衍生出更多的视觉海报和banner。 checkpoint ReV Animated ReV Animated - v1.2.2-EOL | Stable Diffusion Checkpoint | Civitai 这是我个人最喜欢的 2.5/3…

云平台DNS故障导致网站访问卡顿异常排查过程,wireshark、strace等工具在实际问题排查过程中的应用方法

一、问题现象 项目上使用华为私有云&#xff0c;前段时间华为升级云平台后&#xff0c;云上用户反馈业务系统出现卡顿&#xff0c;之前几秒可以刷新出来的页面现在需要几十秒。提供了一个比较明显的url和curl调用方法。 10.213.x.xxx:8082/files/login curl -H "Content-…

【Java学习笔记】异常处理

生活中我们在使用一些产品的时候&#xff0c;经常会碰到一些异常情况。例如&#xff0c;使用ATM机取钱的时&#xff0c;机器会突然出现故障导致无法完成正常的取钱业务&#xff0c;甚至吞卡&#xff1b;在乘坐地铁时&#xff0c;地铁出现异常无法按时启动和运行&#xff1b;使用…

电脑怎么卸载软件?多个方法合集(2024年新版)

在电脑的日常使用中&#xff0c;我们经常需要安装各种软件来满足不同的需求&#xff0c;但随着时间的推移&#xff0c;可能会出现一些软件不再需要或需要更换的情况。此时&#xff0c;及时从电脑上卸载这些不必要的软件是非常重要的。它不仅可以释放硬盘空间&#xff0c;还可以…

第二证券股市资讯:股票中什么叫龙头?

龙头&#xff0c;也就是龙头股&#xff0c;指的是某一职业中有必定影响力和号召力的股票&#xff0c;龙头股的涨跌通常对其他同职业板块股票的涨跌有必定演示和引导作用&#xff0c;是一种风向标一般的存在。龙头股的技能面表现和成交量都会比同时刻的大盘和地块要强。 具体分…

【尚庭公寓SpringBoot + Vue 项目实战】移动端项目初始化(十九)

【尚庭公寓SpringBoot Vue 项目实战】移动端项目初始化&#xff08;十九&#xff09; 文章目录 【尚庭公寓SpringBoot Vue 项目实战】移动端项目初始化&#xff08;十九&#xff09;1、 SpringBoot配置2、Mybatis-Plus配置3、Knife4j配置4、导入基础代码5、导入接口定义代码6…

Python语言修改控制台输出文字的颜色和背景颜色

Python语言修改控制台输出文字的颜色和背景颜色 格式显示模式字体颜色背景颜色文字加效果显示类 格式 \033[显示模式;字体颜色;背景颜色m 显示模式 显示模式格式将文本颜色和背景颜色重置为默认值&#xff0c;取消所有其他文本属性\033[0m高亮&#xff08;加粗&#xff09;\03…

华为OD机试 - 部门人力分配 - 二分查找(Java 2024 D卷 200分)

华为OD机试 2024D卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题&#xff08;D卷C卷A卷B卷&#xff09;》。 刷的越多&#xff0c;抽中的概率越大&#xff0c;每一题都有详细的答题思路、详细的代码注释、样例测…