RabbitMQ——死信队列和延迟队列

文章目录

  • RabbitMQ——死信队列和延迟队列
    • 1、死信队列
    • 2、基于插件的延迟队列
      • 2.1、安装延迟队列插件
      • 2.2、代码实例

RabbitMQ——死信队列和延迟队列

1、死信队列

死信队列(Dead Letter Queue,DLQ)是 RabbitMQ 中的一种重要特性,用于处理无法被消费的消息,防止消息丢失。

死信的来源

在消息队列中,当消息满足一定条件而无法被正常消费时,这些消息会被发送到死信队列。满足条件的情况包括但不限于:

  • 消息被拒绝(basic.rejectbasic.nack)且不重新入队(requeue 参数为 false)。
  • 消息过期(TTL,Time-To-Live)。
  • 队列长度超过限制,无法再添加数据到mq中。

生产者

package com.weipch.rabbitmq.dlq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import com.weipch.util.RabbitMqUtils;

/**
 * @Author 方唐镜
 * @Create 2024-03-03 14:08
 * @Description
 */
public class Produce {


	private static final String NORMAL_EXCHANGE = "normal_exchange";


	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();
        //模拟消息过期 10s
		//AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
		for (int i = 0; i < 10; i++) {
			String message = "hello world" + i;
			channel.basicPublish(NORMAL_EXCHANGE, "normal-routing-key", null, message.getBytes());
		}
	}
}

消费者

正常队列:

package com.weipch.rabbitmq.dlq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.weipch.util.RabbitMqUtils;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * @Author 方唐镜
 * @Create 2024-03-03 13:50
 * @Description
 */
public class Consumer01 {


	private static final String NORMAL_EXCHANGE = "normal_exchange";
	private static final String DEAD_EXCHANGE = "dead_exchange";

	private static final String NORMAL_QUEUE = "normal_queue";
	private static final String DEAD_QUEUE = "dead_queue";


	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();
		//声明死信交换机和队列
		channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
		channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
		//绑定
		channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead-routing-key");


		//声明普通交换机和队列
		channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
		//死信配制 指定死信交换机和死信路由键
		Map<String, Object> map = new HashMap<>();
		map.put("x-dead-letter-exchange", DEAD_EXCHANGE);
		map.put("x-dead-letter-routing-key", "dead-routing-key");
		//最大长度
		//map.put("x-max-length", 6);
		channel.queueDeclare(NORMAL_QUEUE, false, false, false, map);
		channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal-routing-key");
		DeliverCallback deliverCallback = (consumerTag, delivery) -> {
			String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
			if (message.contains("5")){
				System.out.println("Consumer01接收消息:" + message + ",此消息被拒绝");
				//拒绝消息并把消息丢入死信队列
				channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
			}else {
				System.out.println("Consumer01接收消息:" + message);
				channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
			}
		};
		channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, (consumerTag, e) -> {});
	}
}

死信队列:

package com.weipch.rabbitmq.dlq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.weipch.util.RabbitMqUtils;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * @Author 方唐镜
 * @Create 2024-03-03 13:50
 * @Description
 */
public class Consumer02 {
	
	private static final String DEAD_QUEUE = "dead_queue";

	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();
		channel.basicConsume(DEAD_QUEUE, true,
			(consumerTag, delivery) -> System.out.println("Consumer02:" + new String(delivery.getBody(), StandardCharsets.UTF_8)),
			(consumerTag, e) -> {});
	}
}

生产者发送消息到正常队列,而消费者负责消费正常队列的消息。当消息被消费者拒绝并不再重新投递时,消息会被发送到死信队列。

2、基于插件的延迟队列

延迟队列是一种消息队列中的一种特殊类型,它允许消息在一定的延迟时间后再被消费。延迟队列的元素是希望在指定时间到了以后或之前取出处理。在实际应用中,延迟队列通常用于处理需要延时执行的任务或事件。

使用场景

  1. 定时任务执行: 在需要定时执行任务的应用中,可以使用延迟队列来实现。将任务消息发送到延迟队列,设置消息的过期时间为任务执行的时间,当消息过期时,消费者即可执行相应的任务。
  2. 消息重试机制: 当某个操作失败时,可以将操作消息发送到延迟队列,并设置合适的重试时间。在消息重试的过程中,如果操作成功,消息将正常被消费;如果一直失败,可以选择在一定时间后放弃重试,将消息发送到死信队列或进行其他处理。
  3. 订单超时处理: 在电商等场景中,对于长时间未支付的订单,可以将订单消息发送到延迟队列,并设置订单的过期时间。当订单过期时,系统可以取消订单、释放库存等操作。
  4. 限流与流控: 通过使用延迟队列,可以实现消息的有序处理和限流,确保系统在高峰期不会因为瞬时大量请求而过载。
  5. 系统通知与提醒: 在需要发送系统通知或提醒的场景中,可以使用延迟队列来实现消息的定时推送。
  6. 缓解数据库压力: 对于一些需要定期清理的数据,可以使用延迟队列来触发数据清理操作,减轻数据库压力。

2.1、安装延迟队列插件

下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

以docker方式安装

1、把下载好的插件从服务器拷贝到 RabbitMQ 容器内plugins目录

docker cp rabbitmq_delayed_message_exchange-3.13.0.ez 7c8726620871:/plugins

插件版本和rabbitmq版本一致

2、进入容器查看插件

在这里插入图片描述

3、启动插件

root@my-rabbit:/plugins# rabbitmq-plugins enable rabbitmq_delayed_message_exchange

4、重启容器

docker restart 7c8726620871

5、安装成功

在这里插入图片描述

2.2、代码实例

配置类

package springbootrabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;


@Configuration
public class DelayedQueueConfig {
    //    队列
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    //    交换机
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    //    routingKey
    public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";

    //    声明队列
    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    //    声明交换机 基于插件的交换机
    @Bean
    public CustomExchange delayedExchange() {

        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");
        /*
         * 1.交换机名称
         * 2.交换机类型
         * 3.是否需要持久化
         * 4.是否需要自动删除
         * 5.其他参数
         * */
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
    }

    //    绑定
    @Bean
    public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

生产者

@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
    log.info("当前时间:{},发送一条时长{}毫秒消息给延迟队列delayed.queue:{}", new Date(), delayTime, message);
    rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg -> {
        //            发送消息的时候 延迟时长
        msg.getMessageProperties().setDelay(delayTime);
        return msg;
    });
}

消费者

@Slf4j
@Component
public class DelayedQueueConsumer {
    //监听消息
    @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
    public void receiveDelayedQueue(Message message) {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到延迟队列的消息:{}", new Date(), msg);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/465102.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ChatGPT编程实现简易聊天工具

ChatGPT编程实现简易聊天工具 今天借助[[小蜜蜂]][https://zglg.work]网站的ChatGPT练习socket编程&#xff0c;实现一个简易聊天工具软件。 环境&#xff1a;Pycharm 2021 系统&#xff1a;Mac OS 向ChatGPT输入如下内容&#xff1a; ChatGPT收到后&#xff0c;根据返回结…

企业内部培训考试系统培训计划功能说明

培训计划是预设好的一套课程系列&#xff0c;包含课程和考试&#xff0c;分多个阶段&#xff0c;每完成一个阶段就会在学习地图上留下标记&#xff0c;让用户看到自己的努力成果&#xff0c;增强成就感&#xff0c;从而坚持完成课程。 企业内部培训考试系统中如何设置培训计划…

动态代理原理- JDK动态代理、CGLIB动态代理

概述&#xff1a;在不改变原有功能代码的前提下&#xff0c;能动态的实现方法的增强 JDK动态代理原理&#xff1a; 通过实现接口&#xff0c;获取到接口里面的所有方法通过Proxy创建代理实例通过反射机制&#xff0c;获取到一个一个的方法对象调用InvocationHandler接口中的in…

Python之Web开发中级教程----ubuntu中下载安装Postman

Python之Web开发中级教程----ubuntu中下载安装Postman PostMan 是一款功能强大的网页调试与发送网页 HTTP 请求的 Chrome 插件&#xff0c;可以直接去对我们写出来的路由和视图函数进行调试&#xff0c;作为后端程序员是必须要知道的一个工具。 查看ubuntu系统中是否已经安装了…

Java BIO (同步阻塞型IO) 内容上集

IO简介 一、前言 在java软件设计开发中&#xff0c;通信框架是不可避免的&#xff0c;我们在不同的系统或者这不同的进程之间进行数据交互&#xff0c;或者在高并发的场景下需要用到网络通信相关的技术&#xff0c;从上节课的例子当中我们看出同步阻塞式的IO通信(BIO)效率过于…

【计算机网络】基本概念

基本概念 IP 地址端口号协议协议分层封装分用客户端服务器请求和响应两台主机之间的网络通信流程 IP 地址 概念&#xff1a;IP 地址主要是用于唯一标识网络主机、其他网络设备&#xff08;如路由器&#xff09;的网络地址。简单来说&#xff0c;IP地址用来唯一定位主机。格式&…

cartographer学习与使用

记录一下在配置和使用cartographer建图时遇到的各种问题吧。 我的数据 配置文件&#xff1a; my_rslidar.launch <launch> <param name"/use_sim_time" value"false" /> <!--启动建图节点--> <node name"cartographer_n…

【JACS】:用于稳定单原子分散的催化剂架构可对吸附到 Pt 原子、氧化 Pt 簇和 TiO2上金属 Pt 簇的 CO 进行特定位点光谱和反应性测量

摘要&#xff1a;氧化物负载的贵金属纳米粒子是广泛使用的工业催化剂。由于费用和稀有性&#xff0c;开发降低贵金属纳米颗粒尺寸并稳定分散物质的合成方案至关重要。负载型原子分散的单贵金属原子代表了最有效的金属利用几何结构&#xff0c;尽管由于合成均匀且稳定的单原子分…

关于MySQL数据库的学习3

目录 前言: 1.DQL&#xff08;数据查询语言): 1..1基本查询&#xff1a; 1.2条件查询&#xff1a; 1.3排序查询&#xff1a; 1.3.1使用ORDER BY子句对查询结果进行排序。 1.3.2可以按一个或多个列进行排序&#xff0c;并指定排序方向&#xff08;升序ASC或降序DESC&#…

(十八)【Jmeter】取样器(Sampler)之BeanShell 取样器

简述 操作路径如下: 作用:通过Beanshell脚本来编写自定义请求。配置:编写Beanshell脚本代码,实现请求逻辑。使用场景:在JMeter中利用Beanshell脚本语言的特性进行自定义请求。优点:可以利用Beanshell脚本语言的丰富功能。缺点:脚本语言的性能可能不如其他编译语言,且…

Day67:WEB攻防-Java安全JNDIRMILDAP五大不安全组件RCE执行不出网

知识点&#xff1a; 1、Java安全-RCE执行-5大类函数调用 2、Java安全-JNDI注入-RMI&LDAP&高版本 3、Java安全-不安全组件-Shiro&FastJson&JackJson&XStream&Log4j Java安全-RCE执行-5大类函数调用 Java中代码执行的类&#xff1a; GroovyRuntimeExecPr…

<el-tab>样式自定义——一个可以触类旁通的小例子

首先在网页的检查确定想要自定义的部分叫什么 例如&#xff1a; 我想修改的组件是el-tabs__header.is-top 的margin-bottom 则在相应vue文件的<style>里面增加这一属性 其中&#xff0c;::v-deep可以帮助覆盖子组件内部元素的样式。 ::v-deep .el-tabs__header.is-to…

Ubuntu18.04 中编译 TI 官方的ros驱动包中 autonomous_robotics_ros 包所存在的问题及解决方案

环境&#xff1a; 安装有 ROS 系统的 Ubuntu18.04 环境&#xff0c;并且已将 TI 官方的毫米波雷达 ROS 驱动下载到Ubuntu18.04系统中&#xff0c;如需获取此代码请点击此链接根据教程下载即可。 代码下载链接&#xff1a;TI IWR6843ISK ROS驱动程序搭建-CSDN博客 问题1&…

计算机设计大赛 题目:基于深度学习卷积神经网络的花卉识别 - 深度学习 机器视觉

文章目录 0 前言1 项目背景2 花卉识别的基本原理3 算法实现3.1 预处理3.2 特征提取和选择3.3 分类器设计和决策3.4 卷积神经网络基本原理 4 算法实现4.1 花卉图像数据4.2 模块组成 5 项目执行结果6 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 基…

基于Spark的气象数据处理与分析

文章目录 一、实验环境二、实验数据介绍三、数据获取1.观察数据获取方式2.数据爬取3.数据存储4.数据读取5.数据结构6.爬虫过程截图 四、数据分析1.计算各个城市过去24小时累积雨量2.计算各个城市当日平均气温3.计算各个城市当日平均湿度4.计算各个城市当日平均风速 五、数据可视…

WebRTC:真正了解 RTP 和 RTCP

介绍 近年来&#xff0c;通过互联网进行实时通信变得越来越流行&#xff0c;而 WebRTC 已成为通过网络实现实时通信的领先技术之一。WebRTC 使用多种协议&#xff0c;包括实时传输协议 (RTP) 和实时控制协议 (RTCP)。 RTP负责通过网络传输音频和视频数据&#xff0c;而RTCP负责…

Uibot (RPA设计软件)RPA基础培训-财务会计Web应用自动化(批量开票机器人)

Uibot (RPA设计软件&#xff09;Mage AI智能识别&#xff08;发票识别&#xff09;———机器人的小项目友友们可以参考小北的课前材料五博客~ (本博客中会有部分课程ppt截屏,如有侵权请及请及时与小北我取得联系~&#xff09; 紧接着小北的前两篇博客&#xff0c;友友们我们…

【全面了解自然语言处理三大特征提取器】RNN(LSTM)、transformer(注意力机制)、CNN

目录 一 、RNN1.RNN单个cell的结构2.RNN工作原理3.RNN优缺点 二、LSTM1.LSTM单个cell的结构2. LSTM工作原理 三、transformer1 Encoder&#xff08;1&#xff09;position encoding&#xff08;2&#xff09;multi-head-attention&#xff08;3&#xff09;add&norm 残差链…

PyCharm实现一个简单的注册登录Django项目

之前已经实现了一个简单的Django项目&#xff0c;今天我们j基于之前的项目来实现注册、登录以及登录成功之后跳转到StuList页面。 1、连接数据库 1.1 配置数据库信息&#xff1a; 首先在myweb的settings.py 文件中设置MySQL数据库连接信息&#xff1a; DATABASES {default…

在线疫苗预约小程序|基于微信小程序的在线疫苗预约小程序设计与实现(源码+数据库+文档)

在线疫苗预约小程序目录 目录 基于微信小程序的在线疫苗预约小程序设计与实现 一、前言 二、系统设计 三、系统功能设计 1、疫苗管理 2、疫苗订单管理 3、论坛管理 4、公告管理 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源…