RabbitMQ 死信交换机的详述➕应用

🥳🥳Welcome 的Huihui's Code World ! !🥳🥳

接下来看看由辉辉所写的关于RabbitMQ的相关操作吧

目录

🥳🥳Welcome 的Huihui's Code World ! !🥳🥳

一.什么是死信交换机

二. 死信队列的应用场景

三.死信队列【TTL】

1.创建主交换机和主队列

2.创建死信交换机和死信队列

3.设置主队列的死信参数

4.将主队列绑定到主交换机

5.将死信队列绑定到死信交换机

6.消费者

7.测试

四. 死信队列【Reject】

1.配置消息确认模式

⭐⭐消息消费者如何通知 Rabbit 消息消费成功?

2.消费者


一.什么是死信交换机

这个在上篇博文已经讲到了,想看的可以点击一下

简单来说,就是生产者将消息投递到 queue 里了,consumer 从 queue 取出消息进行消费,如果它一直无法消费某条数据,那么可以把这条消息放入死信队列里面。等待 条件满足了再从死信队列中取出来再次消费,从而避免消息丢失。

⭐⭐注意:死信交换机本质上就是一个普通的交换机,只是因为队列设置了参数指定了死信交换机,这个普通的交换机才成为了死信的接收者

二. 死信队列的应用场景

  1. 错误处理:当消息无法被成功处理时,可以将其发送到死信队列,以便后续进行错误处理、日志记录或告警。
  2. 延迟消息:通过设置消息的过期时间,可以实现延迟消息的功能。当消息过期时,将被发送到死信队列,可以用于实现定时任务或延迟任务。
  3. 重试机制:当消息处理失败时,可以将消息发送到死信队列,并设置适当的重试策略。例如,可以使用指数退避算法对消息进行重试,以提高消息处理的成功率。
  4. 消息分析:通过监听死信队列,可以对无法被正常消费的消息进行分析和统计,以了解系统中存在的问题或异常情况

三.死信队列【TTL】

这里死信消息是设置了过期的时间【TTL】

1.创建主交换机和主队列

首先,需要创建一个主交换机和一个主队列。这些是正常消息传递的目标,当消息无法被正常消费时,它们将成为死信的来源。

// 创建一个名为queueA的队列
@Bean
public Queue queueA() {
    return new Queue("queueA");
}

// 创建一个名为ExchangeA的直接交换机
@Bean
public DirectExchange ExchangeA() {
    return new DirectExchange("ExchangeA");
}


2.创建死信交换机和死信队列

接下来,需要创建一个死信交换机和一个死信队列。这些将作为死信消息的目标。

// 创建一个名为queueB的队列
@Bean
public Queue queueB() {
    return new Queue("queueB");
}

// 创建一个名为ExchangeB的直接交换机
@Bean
public DirectExchange ExchangeB() {
    return new DirectExchange("ExchangeB");
}


3.设置主队列的死信参数

在创建主队列时,需要为其设置一些参数来定义死信的行为。具体而言,需要设置x-dead-letter-exchange参数为死信交换机的名称,以及x-dead-letter-routing-key参数为死信队列的路由键。

 // 创建一个名为queueA的队列
    @Bean
    public Queue queueA() {
        Map<String, Object> config = new HashMap<>();
        //message在该队列queue的存活时间最大为10秒
        config.put("x-message-ttl", 10000);
        //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
        config.put("x-dead-letter-exchange", "ExchangeB");
        //x-dead-letter-routing-key参数是给这个DLX指定路由键
        config.put("x-dead-letter-routing-key", "BB");
        return new Queue("queueA",true,true,false,config);
    }

关于其中的参数:

  • name:队列的唯一标识符,用于在消息中间件中识别特定的队列。每个队列都有一个名称,发送方将消息发送到指定的队列,接收方从队列中获取消息进行处理。
  • durable:指定队列是否需要持久化存储。如果队列被标记为持久化,那么即使在消息中间件重启之后,队列中的消192息也不会丢失。这对于重要的消息和应用场景非常关键。
  • exclusive:用于指定队列是否为独占队列。当一个队列被标记为独占时【exclusive=true】,只有创建该队列的连接或通道可以访问或使用这个队列。
  • autoDelete:用于指定队列是否在没有任何消费者订阅或连接时自动删除。当一个队列中的"autodelete"属性为true时,如果没有消费者订阅该队列或者没有生产者向该队列发送消息,那么该队列将自动被删除。

4.将主队列绑定到主交换机

将主队列与主交换机进行绑定,以确保正常消息能够被正确路由到主队列。

// 将queueA与ExchangeA进行绑定,并设置路由键为"AA"
@Bean
public Binding bindingAA() {
    return BindingBuilder
            .bind(queueA()) // 将queueA与ExchangeA进行绑定
            .to(ExchangeA()) // 指定绑定的目标交换机为ExchangeA
            .with("AA"); // 设置路由键为"AA"
}

5.将死信队列绑定到死信交换机

将死信队列与死信交换机进行绑定,以确保死信消息能够被正确路由到死信队列。

// 将queueB与ExchangeB进行绑定,并设置路由键为"BB"
@Bean
public Binding bindingBB() {
    return BindingBuilder
            .bind(queueB()) // 将queueB与ExchangeB进行绑定
            .to(ExchangeB()) // 指定绑定的目标交换机为ExchangeB
            .with("BB"); // 设置路由键为"BB"
}

6.消费者

这个消费者监听的是queueB,当queueA的消息过期之后就会交到死信交换机中的queueB进行处理

package com.example.consumer.exchange;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component // 声明这是一个Spring组件
@RabbitListener(queues = {"queueB"}) // 监听名为"queueB"的队列
public class DeadLetterReceive {

    @RabbitHandler // 当接收到消息时,调用此方法处理
    public void handler(String msg) { // 参数为接收到的消息,类型为Map<String, Object>
        System.out.println("QB接到消息"+msg); // 打印接收到的消息
    }
}

7.测试

先疯狂的访问queueA队列的方法

现在可以看到queueA这里有六条消息

queueA的消息过期了之后,queueB消费者中就接收到消息了

现在把queueB的消费者停掉

现在再去疯狂的访问queueA的方法,此时就可以发现,queueA的消息都到queueB那里去了

四. 死信队列【Reject】

这里死信消息是设置成了拒绝,【requeue 参数为 false】,还是用的上面所创建的交换机以及队列...

1.配置消息确认模式

  • 消息确认模式有:

    • AcknowledgeMode.NONE:自动确认

    • AcknowledgeMode.AUTO:根据情况确认

    • AcknowledgeMode.MANUAL:手动确认

server:
  port: 9999
spring:
  rabbitmq:
    host: 192.168.101.129
    password: 123456
    port: 5672
    username: wh
    virtual-host: my_vhost
    listener:
      simple:
        acknowledge-mode: manual

消息接收确认

⭐⭐消息消费者如何通知 Rabbit 消息消费成功?

  • 消息通过 ACK 确认是否被正确接收,每个 Message 都要被确认(acknowledged),可以手动去 ACK 或自动 ACK

  • 自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息

  • 如果消息已经被处理,但后续代码抛出异常,使用 Spring 进行管理的话消费端业务逻辑会进行回滚,这也同样造成了实际意义的消息丢失

  • 如果手动确认则当消费者调用 ack、nack、reject 几种方法进行确认,手动确认可以在业务失败后进行一些操作,如果消息未被 ACK 则会发送到下一个消费者

  • 如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限

  • ACK 机制还可以起到限流作用,比如在接收到某条消息时休眠几秒钟

2.消费者

这个消费者是queueA的消费者

package com.example.consumer.exchange;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component // 声明这是一个Spring组件
@RabbitListener(queues = "queueA") // 监听名为"queueA"的队列
public class DeadLetterReceiveA {

    @RabbitHandler // 当接收到消息时,调用此方法处理
    public void handler(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        System.out.println("QA接到消息"+msg); // 打印接收到的消息
        channel.basicAck(tag,true); // 确认消息已被消费
    }
}

此时来访问一下queueA

queueA的消费者也已经将消息消费掉了

上面是queueA正常消费了,现在我直接将消息拒绝掉,并且不让它再重新入队了

package com.example.consumer.exchange;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component // 声明这是一个Spring组件
@RabbitListener(queues = "queueA") // 监听名为"queueA"的队列
public class DeadLetterReceiveA {

    @RabbitHandler // 当接收到消息时,调用此方法处理
    public void handler(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        System.out.println("QA接到消息"+msg); // 打印接收到的消息
        channel.basicReject(tag,false); // 拒绝消息,不重新入队
        Thread.sleep(1000); // 等待1秒
    }
}

这时,再访问一下方法

刚开始这个消息会到queueA中,但是queueA会把它拒绝掉,拒绝之后消息就会到queueB 中

此时消息全部都放在了queueB中,但是还没有消费,这是因为我们已经将消息确认的模式变成了手动,所以需要手动确认之后,消息才会被消费

package com.example.consumer.exchange;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Component // 声明这是一个Spring组件
@RabbitListener(queues = {"queueB"}) // 监听名为"queueB"的队列
public class DeadLetterReceive {

    @RabbitHandler // 当接收到消息时,调用此方法处理
    public void handler(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception { // 参数为接收到的消息,类型为Map<String, Object>
        System.out.println("QB接到消息"+msg); // 打印接收到的消息
        channel.basicAck(tag,true); // 确认消息已被消费
    }
}

好啦,今天的分享就到这了,希望能够帮到你呢!😊😊 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/355012.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C++1.0

思维导图 提示输入一个字符串&#xff0c;统计该字符中大写&#xff0c;小写字母个数&#xff0c;数字个数&#xff0c;空格个数以及特殊字符个数&#xff0c;要求使用C风格字符串完成 #include <iostream>using namespace std;int main() {cout << "请输入一…

路由懒加载(React和Vue)

1、为了提升性能&#xff0c;将懒加载的文件单独打包 在webpack.config.js配置打包成chunks // 打包到不同的chunks optimization: {// 将动态加载(懒加载)的文件(imort())单独打包splitChunks: {chunks: "all",},// 避免分割缓存失效runtimeChunk: {name: (entrypo…

数据结构(C语言版)代码实现(五)——双向循环链表的部分实现

目录 参考材料与格式 线性表的有关知识 头文件 库、宏定义、数据类型声明 线性表的双向链表存储结构 构造空链表 销毁链表 链表长度 按位查找 插入元素 删除元素 打印链表 完整头文件DuLinkList.h 测试函数&#xff08;主函数&#xff09; 测试结果 收获 参考材…

电磁兼容(EMC):产品如何做到可靠的防静电设计

工业产品所应用的电磁环境之恶劣。要想产品在如此恶劣的电磁环境下正常工作&#xff0c;需要具备强大的抗干扰能力方能胜任。其中以静电干扰最为常见且棘手。本文将手把手教你如何将工业产品做到可靠的防静电设计。 1 了解静电 你想要打倒对手&#xff0c;必须先深入地了解他…

Redis 学习笔记 2:Java 客户端

Redis 学习笔记 2&#xff1a;Java 客户端 常见的 Redis Java 客户端有三种&#xff1a; Jedis&#xff0c;优点是API 风格与 Redis 命令命名保持一致&#xff0c;容易上手&#xff0c;缺点是连接实例是线程不安全的&#xff0c;多线程场景需要用线程池来管理连接。Redisson&…

预训练语言模型transformer

预训练语言模型的学习方法有三类&#xff1a;自编码&#xff08;auto-encode, AE)、自回归&#xff08;auto regressive, AR&#xff09;&#xff0c;Encoder-Decoder结构。 决定PTM模型表现的真正原因主要有以下几点&#xff1a; 更高质量、更多数量的预训练数据增加模型容量…

双非本科准备秋招(8.2)——JVM1

第一天系统学习JVM&#xff01;今天学了JVM是什么&#xff0c;学习JVM的作用&#xff0c;运行时的数据区域&#xff08;重点&#xff09;&#xff0c;内存溢出。明天学GC。 运行时数据区域 整体认识 JDK1.7 JDK1.8 先写一下每个线程私有的三个数据区&#xff0c;分别是程序计…

Docker—入门及Centos7安装

1、Docker入门 1.1、Docker是什么&#xff1f; Docker是基于Go语言实现的云开源项目。 Docker的主要目标是“Build&#xff0c;Ship&#xff0c;and Run Any App,Anywhere”&#xff0c;也就是通过对应组件的封装、分发、部署、运行等生命周期的管理&#xff0c;使用户的APP&…

模板笔记 ST表 区间选数k

本题链接&#xff1a;用户登录 题目&#xff1a; 样例&#xff1a; 输入 5 3 1 1 2 2 3 1 2 3 3 1 5 输出 4 6 思路&#xff1a; . 根据题意&#xff0c;给出数组&#xff0c;以及多个区间&#xff0c;问这些区间中&#xff0c;最小值之和 和 最大值之和&#xff0c;…

CHS_02.2.3.2_1+进程互斥的软件实现方法

CHS_02.2.3.2_1进程互斥的软件实现方法 知识总览如果没有注意进程互斥&#xff1f;单标志法双标志先检查法双标志后检查法Peterson 算法 知识回顾 在这个小节中 我们会学习进程互斥的几种软件实现方法 知识总览 那我们会学习单标志法 双标志 先检查 双标志后检查和Peterson算法…

前端工程化基础(三):Webpack基础

Webpack和打包过程 学习webpack主要是为了了解目前前端开发的整体流程&#xff0c;实际开发中&#xff0c;我们并不需要去手动配置&#xff0c;因为框架的脚手架都已经帮助我们完成了配置 内置模块path 该模块在Webpack中会经常使用 从路径中获取信息 const path require(&qu…

前端Vue v-for 的使用

目录 ​编辑 简介 使用方式 基本使用 v-for"(item, index)中item和index作用 示例 迭代对象 示例 结果 前言-与正文无关 生活远不止眼前的苦劳与奔波&#xff0c;它还充满了无数值得我们去体验和珍惜的美好事物。在这个快节奏的世界中&#xff0c;我们往往容易陷入…

【Linux】第三十八站:信号处理

文章目录 一、信号处理二、再谈进程地址空间三、内核如何实现信号的捕捉四、sigaction 一、信号处理 我们知道&#xff0c;信号保存以后&#xff0c;会在合适的时候进行处理这个信号。 那么信号是如何被处理的&#xff1f;什么时候进行处理呢&#xff1f; 当我们的进程从内核…

精通Python第13篇—数据之光:Pyecharts旭日图的魔法舞台

文章目录 引言准备工作绘制基本旭日图调整颜色和样式添加交互功能定制标签和标签格式嵌套层级数据高级样式与自定义进阶主题&#xff1a;动态旭日图数据源扩展&#xff1a;外部JSON文件总结 引言 数据可视化在现代编程中扮演着重要的角色&#xff0c;而Pyecharts是Python中一个…

【深度学习每日小知识】Bias 偏差

计算机视觉是人工智能的一个分支&#xff0c;它使机器能够解释和分析视觉信息。然而&#xff0c;与任何人造技术一样&#xff0c;计算机视觉系统很容易受到训练数据产生的偏差的影响。计算机视觉中的偏见可能会导致不公平和歧视性的结果&#xff0c;从而使社会不平等长期存在。…

Python进阶(1) | 使用VScode写单元测试

Python进阶(1) | 单元测试 2024.01.28 VSCode: 1.85.1 Linux(ubuntu 22.04) 文章目录 Python进阶(1) | 单元测试1. 目的2. Python Profile3. 单元测试框架3.1 什么是单元测试3.2 选一个单元测试框架3.3 编写 Python 单元测试代码3.4 在 VSCode 里发现单元测试3.5 再写一个单元…

问题:github上不了,但是其他网页可以正常打开

问题&#xff1a; github上不了&#xff0c;但是其他网页可以正常打开&#xff0c;试了关闭防火墙&#xff0c;dns刷新&#xff0c;都没用后&#xff0c;参考以下文章成功打开Github 1.Github无法访问解决方法 2.github访问不了&#xff1f;详细解决方法 解决办法&#xff1a…

用Python编写的简单双人对战五子棋游戏

本文是使用python创建的一个基于tkinter库的GUI界面&#xff0c;用于实现五子棋游戏。编辑器使用的是spyder&#xff0c;该工具。既方便做数据分析&#xff0c;又可以做小工具开发&#xff0c; 首先&#xff0c;导入tkinter库&#xff1a;import tkinter as tk&#xff0c;这…

leetcode刷题日志-146LRU缓存

思路&#xff1a;使用hashmap储存key&#xff0c;vaule&#xff0c;使用双向链表以快速查到尾结点&#xff08;待逐出的节点&#xff09;&#xff0c;链表的题一定要在纸上画一下&#xff0c;不然连着连着就不知道连在哪里去了 class LRUCache {public class ListNode {int ke…

Java基础常见面试题总结(下)

常见的Exception有哪些&#xff1f; 常见的RuntimeException&#xff1a; ClassCastException //类型转换异常IndexOutOfBoundsException //数组越界异常NullPointerException //空指针ArrayStoreException //数组存储异常NumberFormatException //数字格式化异常ArithmeticE…