Spring AMQP-保证发送者消息的可靠性

1. 消息发送者的可靠性

保证消息的可靠性可以通过发送者重连发送者确认来实现


发送者重连

发送者重连机制就是在发送信息的时候如果连接不上mq不会立即结束,而是会在一定的时间间隔之类进行重新连接,连接的次数和时间都是由我们在配置文件中指定的,具体的就是通过retry属性来

spring: 
  rabbitmq: # rabbitmq配置 
    host: localhost # rabbitmq地址
    port: 5672 # rabbitmq端口
    virtual-host: /hmall # 虚拟主机
    username: hmall # 用户名
    password: 123 # 密码
    template: # 消息发送相关配置
      retry: # 重试相关配置
        enabled: true # 启用重试
        max-attempts: 3 # 最大重试次数
        initial-interval: 1000 # 初始重试间隔
        multiplier: 2 # 重试间隔倍数
        max-interval: 10000 # 最大重试间隔

测试

将MQ关闭,然后随便写一个消息发送案例,就能够看见效果

package com.itheima.publisher;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.HashMap;
import java.util.Map;

@SpringBootTest
public class PublisherApplicationTest {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        String exchangeName = "fanout.hamll";
        Map map=new HashMap();
        map.put("name","hamll");
        map.put("age",18);
        map.put("sex","男");
        rabbitTemplate.convertAndSend("fanout.hamll.query2", map);
    }

}

发送者确认

在一般的情况下,消息很少会出现问题,但是还是有出现问题的可能性,比如:

1. 消息发送后无法路由键找不到相关队列

2. 绑定的交换机不存在

3. 消息发送出现异常

针对这一情况,MQ为我们提供了多种消息确认机制,比如:Publisher Return、Publisher Confirm

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制

Publisher Return

着重于绑定的队列、交换机、路由是否成功,并且能够监听到相关的信息,比如交换机、路由、提示等

在使用的过程总需要一个全局的配置类

package com.itheima.publisher.config;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

/**
 * RabbitMQ 配置类,用于配置 RabbitTemplate 的回调函数。
 */
@Slf4j // 使用 Lombok 注解引入日志记录器
@AllArgsConstructor // 使用 Lombok 注解生成全参构造函数
@Configuration // 标记为 Spring 配置类
public class MqConfig {
    private final RabbitTemplate rabbitTemplate; // 注入 RabbitTemplate 实例

    /**
     * 初始化方法,在 Bean 创建后立即执行。
     * 设置 RabbitTemplate 的返回消息回调函数。
     */
    @PostConstruct // 标记为初始化方法
    public void init(){
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            /**
             * 当消息被 broker 返回时触发的回调函数。
             * @param returned 返回的消息对象
             */
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.error("触发return callback,"); // 记录错误日志,表示触发了返回回调
                log.debug("exchange: {}", returned.getExchange()); // 记录交换机名称
                log.debug("routingKey: {}", returned.getRoutingKey()); // 记录路由键
                log.debug("message: {}", returned.getMessage()); // 记录消息内容
                log.debug("replyCode: {}", returned.getReplyCode()); // 记录回复代码
                log.debug("replyText: {}", returned.getReplyText()); // 记录回复文本
            }
        });
    }
}


Publisher Confirm

适用于更加复杂复杂的业务,MQ通过方法回调来告诉发送者消息是否发送成功,提供了两个方法的回调:

1. onFailure 在发送消息出现异常的时候会被捕获、并且接收了一个异常对象来返回异常信息。

2. onSuccess 在发送的时候如果成功被MQ接收到就会触发、onSuccess通常会接收两个参数作为参数(CorrelationData.Confirm )、Confirm有一个IsAck()方法来表示是否被确认:

  • true:表示消息被成功确认(ack),即消息已经被 RabbitMQ 正确接收并处理。
  • false:表示消息未被确认(nack),可能是因为 RabbitMQ 内部错误或其他原因导致消息无法被正确处理
package com.itheima.publisher;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * 测试类,用于验证消息发布功能。
 */
@SpringBootTest
@Slf4j // 使用 Lombok 注解引入日志记录器
public class PublisherApplicationTest {

    @Autowired
    private RabbitTemplate rabbitTemplate; // 注入 RabbitTemplate 实例

    /**
     * 测试方法,验证消息发布功能。
     * @throws InterruptedException 可能抛出的中断异常
     */
    @Test
    public void test() throws InterruptedException {
        // 创建 CorrelationData 对象,用于唯一标识消息
        CorrelationData correlationData = new CorrelationData();

        // 设置回调函数,处理消息发送的结果
        correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            /**
             * 消息发送失败时的回调函数。
             * @param ex 异常信息
             */
            @Override
            public void onFailure(Throwable ex) {
                // 记录消息发送失败的异常信息
                log.info("消息发送失败、异常!{}", ex.getMessage());
            }

            /**
             * 消息发送成功时的回调函数。
             * @param result 确认结果
             */
            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                // 检查消息是否被确认
                if (result.isAck()) {
                    // 记录消息发送成功的日志
                    log.info("消息发送成功");
                } else {
                    // 记录消息发送失败的原因
                    log.info("消息发送失败!{}", result.getReason());
                }
            }
        });

        // 发送消息到指定的交换机和路由键
        rabbitTemplate.convertAndSend("pay.direct", "pay.success", "hello rabbitmq", correlationData);
    }
}

数据持久化

默认情况下MQ的数据都是临时数据,MQ故障重启后消息都会丢失,为了保证消息的可靠性就需要做持久化操作,MQ的持久化包括:

1. 交换机持久化

2. 队列持久化

3. 消息持久化


交换机持久化

可以在控制台创建的时候设置为Durable就是持久化模式,Transient就是临时模式。


如果是代码注解开发可以在参数列表通过durable为true指定持久化或者不持久化,交换机通过注解创建一般都是默认的持久化

 @RabbitListener(bindings =@QueueBinding(
            value = @Queue(name = "fanout.hamll.query1",durable = "true"),
            exchange = @Exchange(name = "fanout.hamll", type = ExchangeTypes.FANOUT,durable = "true")
    ))

队列持久化

可以在控制台创建的时候设置为Durable就是持久化模式,Transient就是临时模式。


如果是代码注解开发可以在参数列表通过durable为true指定持久化或者不持久化,队列一般都是默认不持久化,需要手动设置
 

@RabbitListener(bindings =@QueueBinding(
            value = @Queue(name = "fanout.hamll.query1",durable = "true"),
            exchange = @Exchange(name = "fanout.hamll", type = ExchangeTypes.FANOUT,durable = "true")
    ))

消息持久化

消费者发送的消息默认情况下都是临时的消息,在MQ重启的时候消息会丢失。而开启持久化之后消息会被永久保存在MQ,即使MQ服务器挂了也不会丢失。

在发送消息的时候会由java的api将我们传入的object转换成Message对象,默认是不会帮我们持久化的,MQ重启消息就没了

想要持久化也很简单,就是我们自己来创建Message对象然后开启持久化

 //消息持久化
        Message message = MessageBuilder.withBody("hello rabbitmq".getBytes(StandardCharsets.UTF_8)) // 消息内容
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 消息持久化
                .build();// 构建消息
        rabbitTemplate.convertAndSend("pay.direct", "pay.su1ccess", message, correlationData);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/950933.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

vs2022编译webrtc步骤

1、主要步骤说明 概述&#xff1a;基础环境必须有&#xff0c;比如git&#xff0c;Powershell这些&#xff0c;就不写到下面了。 1.1 安装vs2022 1、选择使用C的桌面开发 2、 Windows 10 SDK安装10.0.20348.0 3、勾选MFC及ATL这两项 4、 安装完VS2022后&#xff0c;必须安…

UnityWebGl:打包成webgl后UGUI不显示文字(中文)问题

是由于unity默认使用的是Arial,导致打包成webgl时中文不显示 解决方案&#xff1a; 可在电脑C盘下&#xff0c;路径为C:\Windows\Fonts 找个中文简体的字体文件放到unity里面&#xff0c;格式必须为. ttf

ffmpeg-avio实战:打开本地文件或者网络直播流dome

使用ffmpeg打开打开本地文件或者网络直播流的一个小dome。流程产靠ffmpeg4.x系列的解码流程-CSDN博客 #include <libavcodec/avcodec.h> #include <libavformat/avformat.h> #include <libavformat/avio.h> #include <libavutil/file.h> #include &l…

英伟达打造个人 AI 超级计算机:Project DIGITS 震撼登场

手掌大小的超级计算机 Nvidia 在 CES 2025 上为桌面用户推出了 一款大小和手掌差不多的超级计算机——Project DIGITS AI 超级计算机。虽然它的大小和一个手掌差不多&#xff0c;但性能方面可以说是强到惊人。 Project DIGITS Project DIGITS Project DIGITS 搭载全新的 GB10 G…

SAP SD学习笔记26 - 贩卖契约(框架协议)的概要,基本契约 - 数量契约

上一章讲了品目阶层&#xff08;产品层次结构&#xff09;&#xff0c;品揃Module(分类模块) 。 SAP SD学习笔记25 - 品目阶层&#xff08;产品层次结构&#xff09;、品揃Module&#xff08;分类模块&#xff09;-CSDN博客 本章继续讲SAP的知识&#xff1a;贩卖契约&#xff…

ESP32 IDF VScode出现头文件“无法打开 源 文件 ”,并有红色下划线警告

问题背景&#xff1a; ESP32 IDF VScode出现头文件“无法打开 源 文件 ”&#xff0c;并有红色下划线警告&#xff1a; 解决办法&#xff1a; 在工程里面的.vscode文件夹下&#xff0c;检查是否存在c_cpp_properties.json文件&#xff0c;如果没有可以手动创建添加。如图…

GaussDB事务和并发控制机制

目录 一、并发控制机制 二、MVCC实现方式 三、快照实现方式 四、GaussDB的并发控制机制 五、GaussDB基于事务提交时间戳的MVCC和快照机制 六、GaussDB分布式事务 七、总结与展望 事务是数据库的核心功能之一&#xff0c;其主要目的是保障数据库系统在并发处理、系统故障…

【YOLOv8老鼠检测】

YOLOv8老鼠检测 yolo老鼠检测数据集和模型YOLOv8老鼠检测步骤YOLOv8算法说明 yolo老鼠检测数据集和模型 数据集类别信息 train: E:\python_code\dataset_1\yolo_mouse_data_5000\train/images val: E:\python_code\dataset_1\yolo_mouse_data_5000\valid/images test: E:\pyt…

2025最新解决方案:新买的mac鼠标和这个触控板反向

solution1 &#xff1a;1.打开设置&#xff0c;搜索 触控 点击 自然滚动 ----->解决的是 触控板 但是还是解决不了鼠标反向的问题 solution1 ultra&#xff1a; 下载一个免费 且纯净的 软件 Scroll Reverser for macOS 这是给出的链接&#xff0c;非常简单&#xff0c;…

【C++习题】20. 两个数组的交集

题目&#xff1a;349. 两个数组的交集 - 力扣&#xff08;LeetCode&#xff09; 链接&#x1f517;&#xff1a;349. 两个数组的交集 - 力扣&#xff08;LeetCode&#xff09; 题目&#xff1a; 代码&#xff1a; class Solution { public:// 函数功能&#xff1a;求两个数组…

从零开始:使用VSCode搭建Python数据科学开发环境

引言 在数据科学领域&#xff0c;一个高效、稳定的开发环境是成功的关键。本文将详细介绍如何使用Visual Studio Code搭建一个完整的Python数据科学开发环境。通过本指南&#xff0c;您将学会&#xff1a; 安装和配置VSCode&#xff0c;包括基本设置和快捷键配置设置Python开…

JVM vs JDK vs JRE

JVM是Java虚拟机的缩写&#xff0c; 用于实现Java的一次编译&#xff0c;处处运行。 Java代码写成.class后&#xff0c;由本地的虚拟机运行。 JDK&#xff08;Java Development Kit&#xff09;是一个功能齐全的 Java 开发工具包&#xff0c;供开发者使用。 JDK包含了JRE。…

Redis Zset有序集合

个人主页&#xff1a;C忠实粉丝 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 C忠实粉丝 原创 Redis Zset有序集合 收录于专栏[redis] 本专栏旨在分享学习Redis的一点学习笔记&#xff0c;欢迎大家在评论区交流讨论&#x1f48c; 目录 概述 普通命令 ZAD…

【漫话机器学习系列】040.降采样(downsampling)

降采样&#xff08;Downsampling&#xff09; 降采样&#xff08;Downsampling&#xff09; 是一种在数据处理中常见的技术&#xff0c;目的是通过减少数据的数量来简化模型、加快计算速度&#xff0c;或减少存储空间的需求。降采样的核心思想是从原始数据中选取代表性的样本&…

国内使用博查SearchAPI进行智能搜索,通过API获取搜索引擎的天气、日历、百科、手机、火车票等信息

在现代开发中&#xff0c;网络资源搜索是关键且常见的需求。博查SearchAPI作为国内领先的智能搜索解决方案&#xff0c;已服务超过2000家企业和16000名开发者&#xff0c;获得腾讯元器、字节扣子、阿里钉钉等官方推荐。该API提供近百亿网页内容及多样的生态合作内容&#xff0c…

前端学习DAY33(外边距的折叠)

垂直外边距的重叠 在网页中相邻的垂直方向的外边距&#xff0c;会发生外边距的重叠 兄弟元素 兄弟元素之间的相邻外边距会取&#xff08;绝对值&#xff09;最大值&#xff0c;而不是取和&#xff0c;谁大取谁 特殊情况&#xff1a;如果相邻的外边距一正一负&#xff0c;则取两…

【蓝桥杯选拔赛真题60】C++寻宝石 第十四届蓝桥杯青少年创意编程大赛 算法思维 C++编程选拔赛真题解

目录 C++寻宝石 一、题目要求 1、编程实现 2、输入输出 二、算法分析 三、程序编写 五、运行结果 六、考点分析 七、推荐资料 C++寻宝石 第十四届蓝桥杯青少年创意编程大赛C++选拔赛真题 一、题目要求 1、编程实现 有N(1<N<100)个盒子排成一排,每个盒子都放…

自动化脚本本地可执行但是Jenkins上各种报错怎么解决

作者碎碎念&#xff1a; 测试环境 Jenkinsdockerpythonunittest&#xff0c; 测试问题&#xff1a;本人在写关于SAP4Me网站的自动化脚本时遇到一个问题 本地怎么都跑的通 但是一上Jenkins会出现各种各样的问题 因为在Jenkins里面脚本是放在docker环境里面跑的 所以环境的差异…

Nginx入门笔记

Nginx入门笔记 一、Nginx基本概念二、代理1、正向代理2、反向代理 三、准备工作1、CentOS 7安装nginx&#xff08;1&#xff09;. 安装必要的依赖&#xff08;2&#xff09;下载nginx&#xff08;3&#xff09;编译安装&#xff08;4&#xff09;编译并安装 Nginx(5)启动nginx …

优化提示词改善答疑机器人回答质量

1.通过优化提示词来调整大模型的回答 1.1使用场景 默认提示词无法满足业务要求。 回答的内容太简单/困难&#xff0c;输出内容/格式/语气达不到要求等 1.2llama-index 的提示词模版 1.2.1llama-index 的默认模板 from llama_index.llms.dashscope import DashScope from lla…