RabbitMQ5-死信队列

目录

死信的概念

死信的来源

死信实战

死信之TTl

死信之最大长度

死信之消息被拒


死信的概念

死信,顾名思义就是无法被消费的消息,一般来说,producer 将消息投递到 broker 或直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中;还有用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

死信的来源

  • 消息 TTL 过期

    TTL是Time To Live的缩写, 也就是生存时间

  • 队列达到最大长度

    队列满了,无法再添加数据到 mq 中

  • 消息被拒绝

    (basic.reject 或 basic.nack) 并且 requeue=false

死信实战

死信之TTl

消费者 C1:

package com.qcby.sixin;

import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.util.HashMap;
import java.util.Map;

/**
 * 死信队列 - 消费者1
 *
 */
public class Consumer01 {

    //普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //声明死信和普通交换机 类型为 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        //声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //死信队列绑定:队列、交换机、路由键(routingKey)
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");


        //正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
        //正常队列设置死信交换机,参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常队列设置死信 routing-key,参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        //正常队列
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");


        System.out.println("等待接收消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer01 接收到消息" + message);
        };
        channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
        });
    }

}

生产者:

package com.qcby.sixin;

import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        //设置消息的 TTL 时间 10s
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
        //该信息是用作演示队列个数限制
        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
            System.out.println("生产者发送消息:" + message);
        }

    }
}

消费者 C2:

package com.qcby.sixin;

import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class Consumer02 {
    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //声明交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

        System.out.println("等待接收死信消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer02 接收到消息" + message);
        };
        channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
        });
    }
}

启动 C1 ,之后关闭消费者,模拟其接收不到消息,再启动 Producer:

启动 C2 消费者,它消费死信队列里面的消息:

死信之最大长度

消费者 C1:

package com.qcby.sixin.two;

import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.util.HashMap;
import java.util.Map;

/**
 * 死信队列 - 消费者1
 *
 */
public class Consumer01 {

    //普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //声明死信和普通交换机 类型为 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        //声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //死信队列绑定:队列、交换机、路由键(routingKey)
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");


        //正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
        //正常队列设置死信交换机,参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常队列设置死信 routing-key,参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        //设置正常队列的长度限制
        params.put("x-max-length",6);
        
        //正常队列
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");


        System.out.println("等待接收消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer01 接收到消息" + message);
        };
        channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
        });
    }

}

生产者:

package com.qcby.sixin.two;

import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //该信息是用作演示队列个数限制
        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
            System.out.println("生产者发送消息:" + message);
        }
    }
}

消费者 C2:

package com.qcby.sixin.two;

import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class Consumer02 {
    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //声明交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

        System.out.println("等待接收死信消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer02 接收到消息" + message);
        };
        channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
        });
    }
}

死信之消息被拒

拒收消息 "info5"

消费者 C1:

package com.qcby.sixin.three;

import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.util.HashMap;
import java.util.Map;

/**
 * 死信队列 - 消费者1
 *
 */
public class Consumer01 {

    //普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //声明死信和普通交换机 类型为 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        //声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //死信队列绑定:队列、交换机、路由键(routingKey)
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");


        //正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
        //正常队列设置死信交换机,参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常队列设置死信 routing-key,参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");


        //正常队列
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");


        System.out.println("等待接收消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            if (message.equals("info5")) {
                System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
                //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
                channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
            } else {
                System.out.println("Consumer01 接收到消息" + message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        //开启手动应答
        channel.basicConsume(normalQueue, false, deliverCallback, consumerTag -> {
        });
    }

}

生产者:

package com.qcby.sixin.three;

import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.Channel;

public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //该信息是用作演示队列个数限制
        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
            System.out.println("生产者发送消息:" + message);
        }
    }
}

消费者 C2:

package com.qcby.sixin.three;

import com.qcby.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class Consumer02 {
    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //声明交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

        System.out.println("等待接收死信消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer02 接收到消息" + message);
        };
        channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
        });
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/959838.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

git常用命令学习

目录 文章目录 目录第一章 git简介1.Git 与SVN2.Git 工作区、暂存区和版本库 第二章 git常用命令学习1.ssh设置2.设置用户信息3.常用命令设置1.初始化本地仓库init2.克隆clone3.查看状态 git status4.添加add命令5.添加评论6.分支操作1.创建分支2.查看分支3.切换分支4.删除分支…

私有包上传maven私有仓库nexus-2.9.2

一、上传 二、获取相应文件 三、最后修改自己的pom文件

汽车定速巡航

配备定速巡航功能的车型&#xff0c;一般在方向盘附近设有4~6个按键&#xff08;可能共用键位&#xff09;。 要设置定速巡航&#xff0c;不仅需要方向盘上的按键&#xff0c;还要油门配合。 设置的一般流程&#xff1a; 开关&#xff1a;类似步枪上的“保险”&#xff0c;按…

安宝特方案 | AR在供应链管理中的应用:提升效率与透明度

随着全球化的不断深入和市场需求的快速变化&#xff0c;企业对供应链管理的要求也日益提高。如何在复杂的供应链环境中提升效率、降低成本&#xff0c;并确保信息的透明度&#xff0c;成为了各大行业亟待解决的问题。而增强现实&#xff08;AR&#xff09;技术&#xff0c;特别…

JavaScript(8)-函数

一.什么是函数&#xff1a;执行特定任务的代码块 讲js中需要的公共部分抽取并封装&#xff0c;谁用谁调用&#xff0c;代码复用。 先声明&#xff0c;后调用 使用function函数名需要调用的内容 使用&#xff1a;函数名&#xff08;&#xff09; 二.使用方式 声明&a…

HTML<label>标签

例子 三个带标签的单选按钮&#xff1a; <form action"/action_page.php"> <input type"radio" id"html" name"fav_language" value"HTML"> <label for"html">HTML</label><br&…

4.flask-SQLAlchemy,表Model定义、增删查改操作

介绍 SQLAlchemy是对数据库的一个抽象 开发者不用直接与SQL语句打交道 Python对象来操作数据库 SQLAlchemy是一个关系型数据库 安装 flask中SQLAlchemy的配置 from flask import Flask from demo.user_oper import userdef create_app():app Flask(__name__)# 使用sessi…

【C++初阶】第11课—vector

文章目录 1. 认识vector2. vector的遍历3. vector的构造4. vector常用的接口5. vector的容量6. vector的元素访问7. vector的修改8. vector<vector\<int\>>的使用9. vector的使用10. 模拟实现vector11. 迭代器失效11.1 insert插入数据内部迭代器失效11.2 insert插入…

GPT 结束语设计 以nanogpt为例

GPT 结束语设计 以nanogpt为例 目录 GPT 结束语设计 以nanogpt为例 1、简述 2、分词设计 3、结束语断点 1、简述 在手搓gpt的时候&#xff0c;可能会遇到一些性能问题&#xff0c;即关于是否需要全部输出或者怎么节约资源。 在输出语句被max_new_tokens 限制&#xff0c…

PTMD2.0-疾病相关的翻译后修饰数据库

翻译后修饰&#xff08;PTMs&#xff0c;post-translational modifications&#xff09;通过调节蛋白质功能参与了几乎所有的生物学过程&#xff0c;而 PTMs 的异常状态常常与人类疾病相关。在此&#xff0c;PTMD 2.0展示与疾病相关的 PTMs 综合数据库&#xff0c;其中包含 93 …

ArcGIS10.2 许可License点击始终启动无响应的解决办法及正常启动的前提

1、问题描述 在ArcGIS License Administrator中&#xff0c;手动点击“启动”无响应&#xff1b;且在计算机管理-服务中&#xff0c;无ArcGIS License 或者License的启动、停止、禁止等均为灰色&#xff0c;无法操作。 2、解决方法 ①通过cmd对service.txt进行手动服务的启动…

spring框架之IoC学习与梳理(1)

目录 一、spring-IoC的基本解释。 二、spring-IoC的简单demo&#xff08;案例&#xff09;。 &#xff08;1&#xff09;maven-repository官网中找依赖坐标。 &#xff08;2&#xff09;.pom文件中通过标签引入。 &#xff08;3&#xff09;使用lombok帮助快速开发。 &#xff…

系统架构设计师教材:信息系统及信息安全

信息系统 信息系统的5个基本功能&#xff1a;输入、存储、处理、输出和控制。信息系统的生命周期分为4个阶段&#xff0c;即产生阶段、开发阶段、运行阶段和消亡阶段。 信息系统建设原则 1. 高层管理人员介入原则&#xff1a;只有高层管理人员才能知道企业究竟需要什么样的信…

基于STM32单片机设计的宠物喂食监控系统

1. 项目开发背景 随着宠物数量的增加&#xff0c;尤其是人们对宠物的养护需求日益增多&#xff0c;传统的人工喂养和管理方式难以满足现代养宠生活的需求。人们越来越希望通过智能化手段提高宠物养护的质量和效率&#xff0c;特别是对于宠物喂食、饮水、温湿度控制等方面的智能…

帕金森患者:科学锻炼,提升生活质量

帕金森病&#xff0c;作为一种常见的神经系统退行性疾病&#xff0c;给患者的日常生活带来了诸多挑战。然而&#xff0c;通过科学的锻炼&#xff0c;患者不仅可以在一定程度上缓解症状&#xff0c;还能提升生活质量。本文将详细介绍帕金森患者应该进行的几种关键锻炼&#xff0…

GA-CNN-LSTM-Attention、CNN-LSTM-Attention、GA-CNN-LSTM、CNN-LSTM四模型多变量时序预测一键对比

GA-CNN-LSTM-Attention、CNN-LSTM-Attention、GA-CNN-LSTM、CNN-LSTM四模型多变量时序预测一键对比 目录 GA-CNN-LSTM-Attention、CNN-LSTM-Attention、GA-CNN-LSTM、CNN-LSTM四模型多变量时序预测一键对比预测效果基本介绍程序设计参考资料 预测效果 基本介绍 基于GA-CNN-LST…

redis 实践与扩展

文章目录 前言一、springboot整合redis1、jedis1.1、单点模式1.2、哨兵模式1.3 集群模式1.4、关于jedis线程不安全的验证 2、lettuce(推荐)2.1、单点模式2.2、哨兵模式2.3、集群模式 3、RedisTemplate config 二、redis常用知识点1、缓存数据一致性2、缓存雪崩3、缓存击穿4、缓…

積分方程與簡單的泛函分析7.希爾伯特-施密特定理

1)def函數叫作"由核生成的(有源的)" 定义: 设 是定义在区域上的核函数。 对于函数,若存在函数使得, 则称函数是“由核生成的(有源的)”。 这里的直观理解是: 函数的“来源”可以通过核函数 与另一个函数的积分运算得到。 在积分方程理论中,这种表述常…

RH850F1KM-S4-100Pin_ R7F7016453AFP MCAL PWM 测试

1、PortPin配置 2、PwmChannel配置 4、PwmComplexDriverInit 配置注意事项 此参数指定通道是否初始化并由复杂驱动程序使用。 True:通道将被初始化并由复杂驱动程序使用。False:通道不会被初始化并被复杂驱动程序使用。

Spring Boot Starter介绍

前言 大概10来年以前&#xff0c;当时springboot刚刚出现并没有流行&#xff0c;当时的Java开发者们开发Web应用主要是使用spring整合springmvc或者struts、iBatis、hibernate等开发框架来进行开发。项目里一般有许多xml文件配置&#xff0c;其中配置了很多项目中需要用到的Be…