RabbitMQ 入门到应用 ( 六 ) 消息可靠性

7.RabbitMQ可靠性投递

为了保证信息不丢失, 可靠抵达,引入确认机制

在这里插入图片描述

消息从生产者传递到消费者的过程中, 不同的阶段使用不同的确认方式.

7.0.准备请求

一次性发送10 个消息 通过 new.exchange.direct交换机 接收消息, 使用 new.admin路由键new.admin队列 发送消息.

@Autowired
private RabbitTemplate rabbitTemplate;

@RequestMapping("/sender/test2")
public String test2(){
    String msg = "Mode Batch Confirm, Rabbit MQ";
    for (int i = 0; i < 10; i++) {
        String cd = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData(cd);
        //   交换机, 路由键, 消息, 消息id
        rabbitTemplate.convertAndSend("new.exchange.direct","new.admin", msg +":" + i , correlationData);
    }
    return msg;
}

7.1.消息发送到borker消息代理

从 Producer 生产者到 borker消息代理时, 有两种方式: Transaction(事务)模式Confirm(确认)模式

其中 Transaction(事务)模式是使用阻塞模式, 效率低, 官方说法是性能下降 270 倍. 所以通常使用的是 Confirm(确认)模式

7.1.1.配置启动代理borker确认

# 启动 代理borker 确认
##spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated

spring.rabbitmq.publisher-confirms=true 是旧版的写法

spring.rabbitmq.publisher-confirm-type

NONE : 禁用发布确认模式,是默认值

CORRELATED : 发布消息成功到交换器后会触发回调方法

SIMPLE : 经测试有两种效果,

​ 其一效果和CORRELATED值一样会触发回调方法,

​ 其二在发布消息成功后使用rabbitTemplate调用waitForConfirms()或waitForConfirmsOrDie()方法等待broker节点返回发送结果,

​ 根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie()方法如果返回false则会关闭channel,则接下来无法发送消息到broker;

7.1.2.增加 回调函数

建立配置类, 并为 RabbitTemplate增加回调函数 ConfirmCallback()

RabbitTemplate只允许设置一个callback方法,可以将RabbitTemplate的bean设为单例然后设置回调,但是这样有个缺点是使用RabbitTemplate的地方都会执行这个回调,如果直接在别的地方设置,会报如下错误

only one ConfirmCallback is supported by each RabbitTemplate

可以通过将RabbitTemplate的作用域设为@Scope,每次bean都是新的,来解决这个问题

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

@Configuration
public class RabbitMessageConfig {

    @Bean
    @Scope("prototype")
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        // 设置代理borker接收确认回调函数
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * @param correlationData   消息id
             * @param ack     消息是否成功收到, 投递到代理borker 返回 true , 失败返回 false
             * @param cause   失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("ack = " + ack);
                if (!ack) {
                    System.out.println("发送消息到 [代理borker] 失败{ correlationData : " + correlationData + ", cause : " + cause + "}");
                }else{
                    System.out.println("发送消息到 [代理borker] 成功{ correlationData : " + correlationData + "}");
                }
            }
        });
        
        
        return rabbitTemplate;
    }
}

这样当 代理borker接收到消息时, 会自动调用方法

7.1.3.发请求测试

在 控制台输出

ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=fc39288a-86e3-4122-9c68-50d084e483f5]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=09d765f9-8317-4c87-8e7f-4e66ef382bca]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=0d9a414c-b5ae-4cf5-a825-6c14f4030643]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=55c0fe51-18d3-4d4c-b221-114341a69ad3]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=1d45b2e0-6bec-40d4-90ab-da52a6252d28]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=fb4142a4-7f98-4517-bc74-b8ed0d048741]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=97ee4871-735d-4264-be62-bc7749804504]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=637e57a7-a440-45a9-bd90-87963e4108f9]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=e6260609-a118-4ad1-a864-a6777583826a]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=b9e2db4f-c259-42dd-81a9-3dbfc33755d7]}

rabbit控制台, 收到10条消息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4wtFbj8g-1679806873943)(RabbitMQ.assets/image-20230323104854513.png)]

7.2.消息发送到queue队列

第二个环节就是从交换机Exchange路由到队列queue。

消息无法路由到正确的队列的原因有 1)、路由键错误 2)、队列不存在。
有两种方式处理无法路由的消息,一种是让服务器重发给生产者,一种是让交换机路由到另一个备份的交换机。

7.2.1.配置启动队列queue确认

# 启动 队列queue 确认
spring.rabbitmq.publisher-returns=true
# 只要抵达队列, 以异步发送优先回调我们这个returnconfirm
spring.rabbitmq.template.mandatory=true

7.2.2.增加 回调函数

依然在 RabbitTemplate增加回调函数 ReturnCallback()

这个回调只有在队列接收失败时才会被调用

同时注意要 加上 rabbitTemplate.setMandatory(true); 的设置

//将消息退回给 producer 。并执行回调函数returnedMessage。
rabbitTemplate.setMandatory(true);

// 设置Queue队列接收确认回调函数
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){
    /**
             * 这个方法在投递到队列queue [失败] 时才会执行
             * @param message  投递失败的消息的详细信息
             * @param replyCode  回复的状态码
             * @param replyText  回复的文本内容
             * @param exchange   交换机信息
             * @param routingKey   路由键
             */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey){
        System.out.print("发送消息到 [队列Queue] 失败, 回发的消息:{ ");
        System.out.print("replyCode: "+replyCode);
        System.out.print(", replyText: "+replyText);
        System.out.print(", exchange: "+exchange);
        System.out.print(", routingKey: "+routingKey);
        System.out.println( " }");
    }
});

7.2.3.测试

7.2.3.1.修改请求

加入一个不存在请求

@RequestMapping("/sender/test2")
public String test2(){
    String msg = "Mode Batch Confirm, Rabbit MQ";
    for (int i = 0; i < 10; i++) {
        String cd = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData(cd);
        if(i%2==0){
            //   交换机, 路由键, 消息, 消息id
            rabbitTemplate.convertAndSend("new.exchange.direct","new.admin", msg +":" + i , correlationData);
        }else{
            //   写了一个不存在的路由键
            rabbitTemplate.convertAndSend("new.exchange.direct","1234567890", msg +":" + i , correlationData);
        }
    }
    return msg;
}

7.2.3.2.控制台输出

其中一部分, 消息成功发到broker代理, 但由于错误的路由键, 所以不能发到队列里

ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=7310b457-a6b2-4c76-b28b-d854fcccaefd]}
发送消息到 [队列Queue] 失败, 回发的消息:{ replyCode: 312, replyText: NO_ROUTE, exchange: new.exchange.direct, routingKey: 1234567890 }
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=06478d12-9e5d-432b-ad8e-e244e543b0ee]}
发送消息到 [队列Queue] 失败, 回发的消息:{ replyCode: 312, replyText: NO_ROUTE, exchange: new.exchange.direct, routingKey: 1234567890 }
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=24c8fed0-617b-4a00-8dec-98d71dace4a2]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=bc565b4f-c98c-4412-ad49-7fce3b233521]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=fd87a581-931b-44e7-9bbf-d3879e26da20]}
发送消息到 [队列Queue] 失败, 回发的消息:{ replyCode: 312, replyText: NO_ROUTE, exchange: new.exchange.direct, routingKey: 1234567890 }
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=b45113af-1a04-4e2b-8d1b-b1bfd78e8404]}
发送消息到 [队列Queue] 失败, 回发的消息:{ replyCode: 312, replyText: NO_ROUTE, exchange: new.exchange.direct, routingKey: 1234567890 }
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=3ef73907-f217-4676-aac8-cb3dfe30eea0]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=1e4ce655-2f4c-43f3-8f2e-c7d2d7d46b68]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=5ebb5f26-6cb0-4952-a00b-de218b1a144a]}
发送消息到 [队列Queue] 失败, 回发的消息:{ replyCode: 312, replyText: NO_ROUTE, exchange: new.exchange.direct, routingKey: 1234567890 }
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=9a930da2-12a0-4911-b219-2c5ff9407fac]}

7.3.消费者接收消息

如果 消息不被消费会一直存储在MQ里 , 直到被消费

但自动消费模式下, 如果 多条消息只有一条被消费, 其它的消息也被从队列中清除, 所以要改为手动消费

7.3.1.配置启动手动消费

# 将 消息消费确认 修改为手动模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual

7.3.2.增加消息消费手动确认

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ReceiverServiceImpl {

	// 监听队列
    @RabbitListener(queues = "new.admin")
    @RabbitHandler
    public void goodsProcess(Message message, Channel channel) {
        System.out.println("new.admin 队列 接收消息 : " + message);
        // DeliveryTag 是 channel 内顺序号 , 自增形式
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("deliveryTag = " + deliveryTag);

        try {
            if (deliveryTag%2 == 0) {
                // 确认消费, 第二个参数是 是否批量
                channel.basicAck(deliveryTag, false);
                System.out.println("签收了消息>>> = " + deliveryTag);
            }else {
                // long deliveryTag(序号), boolean multiple(是否批量签收), boolean requeue(是否重新入队)
                // 重新入队的消息会 , 重新从队列投递给消费者
                channel.basicNack(deliveryTag, false, true);
                System.out.println("没有签收到消息<<< = " + deliveryTag);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

这里使用 deliveryTag%2==0 来 模拟 有的消息 确认消费, 有的消息不能确认消费

通过 channel.basicNack(deliveryTag, false, true); 来处理 不能确认消费的消息

其中第三个参数, 代表是否重新进入队列, true为重新进入, 这样消息会重新发送到消费者这里

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/3447.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【 构造 HTTP 请求 】

文章目录一、通过 form 表单构造 HTTP 请求1.1 form 发送 GET 请求1.2 form 发送 POST 请求二、通过 ajax 构造 HTTP 请求2.1 ajax 发送 GET 请求2.2 ajax 发送POST 请求2.3 关于 ajax三、通过 Java socket 构造 HTTP 请求(了解)一、通过 form 表单构造 HTTP 请求 form (表单)…

Element Plus 实例详解(六)___Progress 进度条

Element Plus 实例详解&#xff08;六&#xff09;___Progress 进度条 本文目录&#xff1a; 一、前言 二、搭建Element Plus试用环境 1、搭建Vue3项目&#xff08;基于Vite Vue&#xff09; 2、安装Element Plus 三、Element Plus Progress 进度条功能试用 1、直线进度条…

【数据结构】栈和队列

&#x1f680;write in front&#x1f680; &#x1f4dc;所属专栏&#xff1a;初阶数据结构 &#x1f6f0;️博客主页&#xff1a;睿睿的博客主页 &#x1f6f0;️代码仓库&#xff1a;&#x1f389;VS2022_C语言仓库 &#x1f3a1;您的点赞、关注、收藏、评论&#xff0c;是对…

血细胞智能检测与计数软件(Python+YOLOv5深度学习模型+清新界面版)

摘要&#xff1a;血细胞智能检测与计数软件应用深度学习技术智能检测血细胞图像中红细胞、镰状细胞等不同形态细胞并可视化计数&#xff0c;以辅助医学细胞检测。本文详细介绍血细胞智能检测与计数软件&#xff0c;在介绍算法原理的同时&#xff0c;给出Python的实现代码以及Py…

HTTP协议详解(上)

目录 前言&#xff1a; 认识URL HTTP协议方法 通过Fiddler抓包 GET和POST之间典型区别 header详解 HTTP响应状态码 常见状态码解释 状态码分类 HTTP协议报文格式 小结&#xff1a; 前言&#xff1a; HTTP协议属于应用层协议&#xff0c;称为超文本传输协议&#xff…

C++中的string类【详细分析及模拟实现】

string类 目录string类一、stirng的介绍及使用1.为什么学习string类&#xff1f;2.标准库中的string类2.1 引入&#xff1a;编码2.2 basic_string3.string类的使用3.1 构造函数3.2 遍历string方式1&#xff1a;for循环方式2&#xff1a;范围for4.迭代器4.1 正向迭代器4.2反向迭…

STM-32:按键控制LED灯 程序详解

目录一、基本原理二、接线图三、程序思路3.1库函数3.2程序代码注&#xff1a;一、基本原理 左边是STM322里电路每一个端口均可以配置的电路部分&#xff0c;右边部分是外接设备 电路图。 配置为 上拉输入模式的意思就是&#xff0c;VDD开关闭合&#xff0c;VSS开关断开。 浮空…

互联网数据挖掘与分析讲解

一、定义 数据挖掘&#xff08;英语&#xff1a;Data mining&#xff09;&#xff0c;又译为资料探勘、数据采矿。它是数据库知识发现&#xff08;英语&#xff1a;Knowledge-Discovery in Databases&#xff0c;简称&#xff1a;KDD)中的一个步骤。数据挖掘一般是指从大量的数…

多线程(四):线程安全

在开始讲解线程安全之前我们先来回顾一下我们学了那些东西了&#xff1a; 1. 线程和进程的认识 2. Thread 类的基本用法 3. 简单认识线程状态 4. 初见线程安全 上一章结束时看了一眼线程安全问题&#xff0c;本章将针对这个重点讲解。 一个代码在单线程中能够安全执行&am…

204. 计数质数 (埃式筛法详解)——【Leetcode每日一题】

素数最朴素判断思路&#xff1a;&#xff08;一般会超时&#xff09; 对正整数 n&#xff0c;如果用 2 到 n\sqrt{n}n​ 之间的所有整数去除&#xff0c;均无法整除&#xff0c;则 n 为素数又称为质数。 为什么到n\sqrt{n}n​ 就可以了&#xff0c;因为因数如果存在一定是成对…

【三】一起算法---栈:STL stack、手写栈、单调栈

纸上得来终觉浅&#xff0c;绝知此事要躬行。大家好&#xff01;我是霜淮子&#xff0c;欢迎订阅我的专栏《算法系列》。 学习经典算法和经典代码&#xff0c;建立算法思维&#xff1b;大量编码让代码成为我们大脑的一部分。 ⭐️已更系列 1、基础数据结构 1.1、链表➡传送门 1…

使用Node.js+Koa 从零开始写个人博客系统——后端部分(一)

使用Node.jsKoa 从零开始写个人博客系统系列 提示&#xff1a;在此文章中你可以学习到的内容如下&#xff1a; 1 如何使用Koa快速搭建项目 2 对Koa的核心组件Koa-Route的简单使用 3 3层架构思想 4 nodejs的ORM框架——sequelize的使用 5 sequelize-auto的使用 6 简单的增删查改…

【蓝桥杯嵌入式】第十三届蓝桥杯嵌入式国赛客观题以及详细题解

题1 概念题。 USRAT&#xff1a;异步串口通信&#xff0c;常用于数据传输&#xff1b;SW-DP&#xff1a;SWD 的全称应该是 The Serial Wire Debug Port (SW-DP),也就是串行调试端口&#xff0c;是 >ARM 目前支持的两种调试端口之一&#xff1b;JTAG-DP&#xff1a;另一个调试…

git基本用法教程(fork软件+git命令)

git基本用法教程1. git commit2. git branch3. git checkout4. git merge5. git rebase6. 在提交树中移动7. 撤销变更8. 整理提交记录9. 提交的技巧10. git clone11. git push12. git pull13. git fetch14. git flow15. git stash16. fork的使用当然除了环境和demo的运行和改写…

chartgpt 告诉我的,loss 函数的各种知识

一、libtorch中常见的损失函数及其使用场景的总结1. CrossEntropyLoss:CrossEntropyLoss&#xff08;交叉熵损失&#xff09;主要用于分类任务。它适用于多分类问题&#xff0c;其中每个样本只属于一个类别&#xff08;互斥&#xff09;。该损失函数将预测概率与真实标签的one-…

应届生投腾讯,被面试官问了8个和 ThreadLocal 相关的问题。

问&#xff1a;谈一谈ThreadLocal的结构。 ThreadLocal内部维护了一个ThreadLocalMap静态内部类&#xff0c;ThreadLocalMap中又维护了一个Entry静态内部类&#xff0c;和Entry数组。 Entry类继承弱引用类WeakReference&#xff0c;Entry类有一个有参构造函数&#xff0c;参数…

【数据结构】用队列实现栈

&#x1f4af;&#x1f4af;&#x1f4af; 本篇总结利用队列如何实现栈的相关操作&#xff0c;不难观察&#xff0c;栈和队列是可以相互转化的&#xff0c;需要好好总结它们的特性&#xff0c;构造出一个恰当的结构来实现即可&#xff0c;所以本篇难点不在代码思维&#xff0c;…

大数据应用——Hadoop运行模式(伪分布式运行)

4.2 伪分布式运行模式4.2.1 启动HDFS并运行MapReduce程序1. 分析 &#xff08;1&#xff09;配置集群&#xff08;2&#xff09;启动、测试集群增、删、查没有改&#xff08;多台机子麻烦&#xff09;&#xff08;3&#xff09;执行WordCount案例2. 执行步骤&#xff08;1&…

前端vue实现导出pdf文件报告组件

大屏项目有一个需求&#xff0c;需要对展示的内容进行文件导出&#xff0c;但是目前后台没有相关的逻辑&#xff0c;所以只能前端硬上&#xff0c;在参考了其他许多的逻辑之后&#xff0c;目前我自己这边做了一套比较笨的组件&#xff0c;通过拼接标签这种方法来实现对你想需要…

队列-我的基础算法刷题之路(六)

本篇博客旨在整理记录自已对队列的一些总结&#xff0c;以及刷题的解题思路&#xff0c;同时希望可给小伙伴一些帮助。本人也是算法小白&#xff0c;水平有限&#xff0c;如果文章中有什么错误之处&#xff0c;希望小伙伴们可以在评论区指出来&#xff0c;共勉 &#x1f4aa;。…