RabbitMQ(六)消息的持久化

目录

    • 一、简介
      • 1.1 定义
      • 1.2 消息丢失的场景
    • 二、交换机的持久化
      • 方式一:直接 new
      • 方式二:channel.exchangeDeclare()
      • 方式三:ExchangeBuilder【推荐】
    • 三、队列的持久化
      • 方式一:直接 new
      • 方式二:channel.queueDeclare()
      • 方式三:QueueBuilder【推荐】
    • 四、消息的持久化
      • 方式一:channel.basicPublish()
      • 方式二:rabbitTemplate.convertAndSend()【推荐】
    • 五、持久化问题

在这里插入图片描述

一、简介

1.1 定义

  • 持久化: 是为了提高 RabbitMQ 消息的可靠性,防止在异常情况(重启宕机)下数据的丢失。
  • RabbitMQ 持久化分为三部分:交换机的持久化队列的持久化消息的持久化

1.2 消息丢失的场景

出现消息丢失的场景有:

  • 生产者发送消息丢失:发送消息到 RabbitMQ Server 异常。 可能因为网络问题造成 RabbitMQ 服务端无法收到消息。——解决方案:ConfirmCallback
  • 生产者发送消息丢失:消息无法路由到指定队列。 可能由于代码层面或配置层面错误导致消息路由到指定队列失败。——解决方案:ReturnCallback
  • RabbitMQ Server 存储消息丢失:消息未完全持久化到磁盘。 可能因为 RabbitMQ 宕机导致消息未完全持久化,或队列丢失从而导致消息丢失等持久化问题。——解决方案:集群部署,实现高可用
  • 消费者消费消息丢失:消费者消费消息异常。 可能在消费者接收消息后,还没来得及消费消息,消费者就发生宕机、故障等问题。——解决方案:消费端手动确认消息

二、交换机的持久化

方式一:直接 new

直接实例化对应的 Exchange 实现类即可,默认:持久化的、非自动删除的

@Bean
public DirectExchange directExchange() {
    return new DirectExchange(directExchange);
}

Exchange 源码:

package org.springframework.amqp.core;

public abstract class AbstractExchange extends AbstractDeclarable implements Exchange {
    ...
    public AbstractExchange(String name) {
        // 默认是持久化的、非自动删除的。
        this(name, true, false);
    }
    ...
}

方式二:channel.exchangeDeclare()

在 RabbitMQ 的原生 API 中,例如:Java 客户端 API,声明持久化交换机时,需要将 durable 参数设置为 true

// 声明交换机:(交换机名称,交换机类型,持久化)
channel.exchangeDeclare("myExchange", "direct", true);

方式三:ExchangeBuilder【推荐】

在 Spring AMQP 中,我们可以使用 ExchangeBuilder 来创建持久化的交换机:

import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;

Exchange exchange = ExchangeBuilder.directExchange("myExchange")
    							.durable(true) // 交换机持久化
        						.build();

三、队列的持久化

方式一:直接 new

直接实例化 Queue 类即可,默认:持久化的、非独占的、非自动删除的

@Bean
public Queue myQueue() {
    return new Queue("myQueue");
}

Queue 源码:

package org.springframework.amqp.core;

public Queue(String name) {
    // 默认是持久化的、非独占的、非自动删除的。
   this(name, true, false, false);
}

方式二:channel.queueDeclare()

在原生 RabbitMQ API 中,例如 Java 客户端 API,声明持久化队列时,需要将 durable 参数设置为 true

// 声明队列:(队列名称,持久化,非独占,非自动删除,可选队列参数)
channel.queueDeclare("myQueue", true, false, false, null);

方式三:QueueBuilder【推荐】

在 Spring AMQP 中,可以使用 QueueBuilder 来创建持久化的队列:

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;

Queue queue = QueueBuilder.durable("myQueue").build();

四、消息的持久化

即使队列时持久化的,发送到队列中的消息默认情况下 可能仍然是非持久化的。要使消息持久化,需要在发送消息时设置消息属性为持久化。

方式一:channel.basicPublish()

在原生 RabbitMQ API 中,例如 Java 客户端 API,声明持久化消息时,需要将 deliveryMode 参数设置为 2

import com.rabbitmq.client.AMQP;

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .deliveryMode(2) // 这里2对应MessageDeliveryMode.PERSISTENT
    .build();

// 参数:(交换机,路由键,消息的其他参数,消息体)
channel.basicPublish("myExchange", "myRoutingKey", props. messageBytes);

或者,我们可以直接使用 MessageProperties.PERSISTENT_TEXT_PLAIN 来进行指定消息的持久化:

import com.rabbitmq.client.MessageProperties;

// 参数:(交换机,路由键,消息的其他参数,消息体)
channel.basicPublish("myExchange", "myRoutingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

方式二:rabbitTemplate.convertAndSend()【推荐】

在 Spring AMQP 中,可以使用 rabbitTemplate.convertAndSend() 来创建持久化的消息:

rabbitTemplate.convertAndSend("myQueue", body, message -> {
    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
    return message;
});

通过上述步骤,消息会被持久化存储,只要队列也被正确配置为持久化,即使 RabbitMQ 服务器重启,消息也将被保留下来。

补充:

然而,即使队列和消息都是持久化的,也不能完全保证消息的 100% 不丢失。例如:在消息尚未被刷写到磁盘时,RabbitMQ 服务器突然崩溃,这种情况下的消息仍然可能会丢失。此外,RabbitMQ 不保证消息的立即持久化,而是尽可能快地将消息保存到磁盘


五、持久化问题

思考:将交换机、队列、消息都设置持久化之后就能保证数据不会丢失吗?

答:当然不能,需要从多方面考虑:

  • 场景1: 如果消费者订阅队列的时候将 autoAck(自动确认)设置为 true,虽然消费者接收到了消息,但是没有来得及处理就宕机了,那消息也会丢失。

    解决方案: 将消费者的消息确认机制改为手动确认,带处理完消息之后,手动删除消息。

  • 场景2: 在 RabbitMQ 服务器,如果消息正确被发送,但是 RabbitMQ 服务器中的消息还没来得及持久化,即没有将数据写入磁盘时,如果服务器发生异常,消息也会丢失。

    解决方案: 可以 通过 RabbitMQ 集群的方式实现消息中间件的高可用

因此,还需要做好生产者和消费者的 消息确认机制 以及通过 集群 的方式来实现 RabbitMQ 的高可用。

整理完毕,完结撒花~ 🌻





参考地址:

1.RabbitMQ保证消息可靠性,https://www.cnblogs.com/auguse/articles/17712620.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/309284.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

牛客网-JAVA(错题集)-1

1 Java的抽象类和接口不可以进行实例化 2 知识点: 1、不论如何 finally里面的代码是一定会执行的 2、finally里面的代码块比return早执行 3、多个return是按顺序执行的,只执行一次 public abstract class Test {public static void main(String[] ar…

深度解析-Java语言的未来

深度解析-Java语言的未来,文末有我耗时一个月,问遍了身边的大佬,零基础自学Java的路线,适用程序员入门&进阶,Java学习路线,2024新版最新版。 文章目录 Q1 - 能否自我介绍下? Q2 - Java语…

diffusers scheduler add_noise前向加噪可视化

参考: http://www.bryh.cn/a/604194.html 1、diffusers Pipeline使用 import torch from diffusers import PixArtAlphaPipelinepipe = PixArtAlphaPipeline.from_pretrained("PixArt-alpha/PixArt-XL-2-1024-MS", torch_dtype=torch.float16) pipe = pipe.to(cud…

嵌入式-Stm32-江科大基于寄存器点亮LED灯

文章目录 前言:一:搭建基于寄存器控制LED的工程二:用江科大的STM32板子实现基于寄存器点亮LED灯三:用非江科大stm32板子实现基于寄存器点亮LED灯道友:一星陨落,黯淡不了星空灿烂;一花凋零&#…

认识Linux指令之 “find grep” 命令

01.find指令: -name Linux下find命令在目录结构中搜索文件,并执行指定的操作。 Linux下find命令提供了相当多的查找条件,功能很强大。由于find具有强大的功能,所以它的选项也很多,其中大部分选项都值得我们花时间来…

Set和Map

一、Set的介绍 1.1、Set相关文档介绍 cplusplus.com/reference/set/set/?kwset 1. set是按照一定次序存储元素的容器 2. 在set中,元素的value也标识它(value就是key,类型为T),并且每个value必须是唯一的。 set中的元素不能在容器中修改…

iOS14 Widget 小组件调研

桌面小组件是iOS14推出的一种新的桌面内容展现形式。 根据苹果的统计数据,“一般用户每天进入主屏幕的次数超过90次”,如果有一个我们应用的小组件在桌面,每天都有超过90次曝光在用户眼前的机会,这绝对是一个顶级的流量入口。 “…

世邦通信 SPON IP网络对讲广播系统uploadjson.php任意文件上传漏洞

产品介绍 世邦通信SPON IP网络对讲广播系统采用领先的IPAudio™技术,将音频信号以数据包形式在局域网和广域网上进行传送,是一套纯数字传输系统。 漏洞描述 spon IP网络对讲广播系统uploadjson.php存在任意文件上传漏洞,攻击者可以通过构造特殊请求包上传恶意后门…

【NLP】多标签分类【上】

简介 《【NLP】多标签分类》主要介绍利用三种机器学习方法和一种序列生成方法来解决多标签分类问题(包含实验与对应代码)。共分为上下两篇,上篇聚焦三种机器学习方法,分别是:Binary Relevance (BR)、Classifier Chain…

python 爬虫 request get或post传参

爬虫传参 import requestsurl http://www.xxx# get 或 post 传参数据 data {"pageNo": 1652,"pageSize": 10, }headers {Cookie: ,Host: ,Origin: ,Referer: ,User-Agent: , }# get 请求 # res requests.get( # url, # paramsdata, # hea…

PINN物理信息网络 | 泊松方程的物理信息神经网络PINN解法

基本介绍 泊松方程是一种常见的偏微分方程,它在物理学和工程学中具有广泛的应用。它描述了在某个区域内的标量场的分布与该场在该区域边界上的值之间的关系。 物理信息神经网络(PINN)是一种结合了物理定律和神经网络的方法,用于…

机器学习---流形学习

1. 流形学习 作为机器学习研究的热点问题之一,流形学习是要从高维数据集中发现内在的低维流形,并基于低 维流形来实现随后的各种机器学习任务,如模式识别,聚类分析。与欧氏空间不同,流形学习主要 处理的是非欧空间里…

spark基础--学习笔记

1 spark 介绍 1.1 spark概念 Apache Spark是专为大规模数据处理而设计的快速通用的分布式计算引擎,是开源的类Hadoop MapReduce的通用分布式计算框架。和MapReduce一样,都是完成大规模数据的计算处理。 简而言之,Spark 借鉴了 MapReduce思…

Mysql是怎么运行的(上)

文章目录 Mysql是怎么运行的Mysql处理一条语句的流程连接管理解析与优化存储引擎 基本配置配置文件系统变量状态变量字符集四种重要的字符集MySQL中的utf8和utf8mb4各级别的字符集和比较规则MySQL中字符集的转换排序规则产生的不同的排序结果 InnoDB存储引擎介绍COMPACT行格式介…

06.构建大型语言模型步骤

在本章中,我们为理解LLMs奠定了基础。在本书的其余部分,我们将从头开始编写一个代码。我们将以 GPT 背后的基本思想为蓝图,分三个阶段解决这个问题,如图 1.9 所示。 图 1.9 本书中介绍的构建LLMs阶段包括实现LLM架构和数据准备过程、预训练以创建基础模型,以及微调基础模…

HarmonyOS应用开发学习笔记 UI布局学习 List(){}创建列表 列表形式显示 简单使用

List 创建列表 列表形式显示 官方文档:创建列表(List) 关键代码 List(){} 列表控件ListItem() {} 子元素 例如 1、简单使用代码 List(){} List() {ListItem() {Row() {Image($r(app.media.iconE)).width(40).height(40).margin(10)Tex…

继承详细说明

概述 Java中提供一个关键字extends,用这个关键字,我们可以让一个类和另一个类建立起父子关系。 例如:public class Student extends People {} Student称为子类(派生类),People称为父类(基类 或超类)。 …

分布式系统架构设计之分布式消息队列 VS 分布式事务

1、分布式事务的挑战 在分布式系统中,事务的处理变得尤为复杂,传统的数据库事务(ACID)在单一数据库中可以确保数据的完整性和一致性,但在多个分布式节点间保证事务的原子性、一致性、隔离性和持久性变得极具挑战性。 …

【Go】excelize库实现excel导入导出封装(三),基于excel模板导出excel

前言 大家好,这里是符华~ 关于excelize库实现excel导入导出封装,我已经写了两篇了,我想要的功能基本已经实现了,现在还差一个模板导出,这篇文章就来讲讲如何实现用模板导出excel。 前两篇: 【Go】excel…

作业:通过两台linux主机配置ssh实现互相免密登陆

做题步骤: 一.开启两个Linux主机,并且用ssh连接,要能够ping通 我这里是server:192.168.81.129 client:192.168.81.130 举例 步骤: 1.安装服务软件 2.运行软件程序 3.根据自定配置提供对应的服务/etc/chr…