RabbitMQ---消息确认和持久化

(一)消息确认

1.概念

生产者发送消息后,到达消费端会有以下情况:

1.消息处理成功

2.消息处理异常

   如果RabbitMQ把消息发送给消费者后就把消息删除,那么就可能会导致,消息处理异常想要再获取这条消息的时候,造成消息丢失,但如果消息处理成功,不需要这条消息了,就可以进行删除。此时就不会有问题

 所以如何保证消费者成功接收并正确处理?我们RabbitMQ就给我们提供了消息确认机制

 消费者在订阅队列的时候,我们可以指定autoAck参数,根据这个参数,我们可以把确认机制大致分为两种(Spring boot有三种,但是本质也是这两种)

  自动确认:当autoAck为true时,RabbitMQ只要发送了这个消息,就会从内存中删除,不会管消费者是否收到了这些消息,所以自动确认的可靠性不高

  手动确认:当autoAck为false时,RabbitMQ会等待消费者调用Basic.Ack,回复确认信号后,RabbitMQ才会从队列中删除消息,所以手动确认的可靠性是比较高的

当我们设置为手动确认后,队列中的消息就分为了两个部分:

1.等待投递给消费者的消息

2.已经投递给消费者但是没有收到确认信号的消息

也就是Ready和Unacked

如果RabbitMQ一致没有收到消费者的确认信号,并且消息对应的消费者断开连接,RabbitMQ就会安排消息重新入队列,等待投递给下一个消费者

 

 

 

2.Spring Boot的三种策略和代码演示

Springboot给我们提供了三种策略(本质上还是自动确认和手动确认)

1)AcknowledgeMode.NONE

 这种模式下,就是标准的自动确认,消息一旦投递给消费者,不管消费者是否正确处理了消息,RabbitMQ就会自动确认消息,并且从队列中移除,所以消息是很有可能丢失的

 2)AcknowledgeMode.AUTO(默认)

这种模式下,消息成功处理时会自动确认消息,如果消息过程中抛出了异常就不会确认消息

3)AckniwledgeMode.MANUAL

这种模式下,就是标准的手动确认模式,消费者必须在成功处理消息后调用basicAck方法来确认消息,如果消息没有被确认,RabbitMQ就会认为消息没有被成功处理,会重新投递该消息,这种模式会提高消息的可靠性,因为消息不会丢失,而是重新入队

代码演示

配置文件代码

spring:
  rabbitmq:
    addresses: amqp://student:student@62.234.46.219:5672/test
    listener:
      simple:
        acknowledge-mode: NONE

声明交换机和队列代码

@Component
public class Config {
    @Bean("ackExchange")
    public Exchange ackExchange(){
        return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).durable(true).build();
    }
    @Bean("ackQueue")
    public Queue ackQueue(){
        return QueueBuilder.durable(Constants.ACK_QUEUE).build();
    }
    @Bean
    public Binding ackBind(@Qualifier("ackExchange") Exchange ackExchange,@Qualifier("ackQueue") Queue queue){
        return BindingBuilder.bind(queue).to(ackExchange).with("ack").noargs();
    }
}

生产者代码


@RequestMapping("producer")
@RestController
public class AckProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @RequestMapping("ack")
    public String ackPro(){
        rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack","ack test");
        return "发送成功";
    }
}

然后我们发送一条消息看现象(此时我们还没写消费者代码,所以没有自动确认) 

消费者代码

@Component
public class AckConsumer {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void ListenerQueue(Message message, Channel channel){
        System.out.println("接收到消息: "+ new String(message.getBody())+" TagID: "
                +message.getMessageProperties().getDeliveryTag());
//        int num=3/0;     //模拟失败
        System.out.println("处理完成");
    }
}

此时我们没有出错,我们看能否正确接收消息,已经队列中消息是否存在 

 

我们发现能够准确的接收消息

如果我们此时抛出异常呢? 

 

 

  我们发现抛出了异常后,我们后端并没有接收到消息,但是队列的消息也消失了,所以就会导致我们的消息不见了,这也就是自动确认的弊端 

此时我们把确认策略改成AUTO

我们这里只需要改配置文件即可

截图体现不出来,但是结果是一直在循环,因为我们如果设置确认策略为AUTO,在运行正常时,会自动确认,并且从队列中删除,如果抛出异常,就不会确认消息,造成循环

 

我们再来演示下正确情况 

 

此时我们再来说一下手动确认策略

首先更改配置文件

  其次和我们刚刚演示AUTO和NONE时,代码是没有改动的,但是手动确认,因为要我们手动做一些处理,所以代码也要有一定的改变 

  那我们先来讲一下手动确认方法

手动确认方法:

1.肯定确认

RabbitMQ已经知道该消息并且成功的处理消息,可以丢弃了 

我们来解释下这个方法的参数

deliveryTag:消息的唯一标识,是一个单调递增的64位长整型,每个channel之间是独立的,所以在每个channel上是唯一的,当消费者确认一条消息时,需要用对应的信道上进行确认

multiple:是否批量确认,如果为false,就只确认当前值,如果为true就会确定小于等于的值

2.否定确认

有两个方法

他们两个的区别不大,唯一区别就是是否支持一次性批量拒绝消息

我们来看一些这个requeue这个参数,这个参数表示拒绝后,消息要如何处理,如果为true,RabbitMQ就会把他重新入队,发给下一个消费者,如果为false,RabbitMQ就会把他从队列中删除,不会把他发送给信道消费者

消费者代码

@Component
public class AckConsumer {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void ListenerQueue(Message message, Channel channel) throws IOException {
        long Tag=message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("接收到消息: "+ new String(message.getBody())+" TagID: "
                    +Tag);
            int num=3/0;     //模拟失败
            channel.basicAck(Tag,false);
            System.out.println("处理完成");
        }catch (Exception e){
            channel.basicNack(Tag,false,true);
            channel.basicReject(Tag,true);
        }
    }
}

出现异常时的现象 

因为我们是设置出现异常重新入队给下一个消费者,所以会循环

如果消息成功接收

 

(二)持久化

  我们刚刚做的消息确认和之前说过七大工作模式中的发送确认都是为了保证消息不丢失,能够正确的接收,但是我们如何保证RabbitMQ服务停掉以后,生产者发送过的消息不丢失(存储在RabbitMQ中的消息)?

1.RabbitMQ的持久化

  RabbitMQ的持久化分为三个部分:交换机持久化,队列持久化和消息持久化

1)交换机持久化

  交换机持久化是通过在声明交换机的时候,将durable参数设置为true实现的,相当于将交换机的属性在服务器中存储,M服务关闭后再次重新,RabbitMQ会自动重新建立交换机,此时就相当于一直存在

上面是我们把交换机设置为持久化的代码

其实我们不手动设置,默认交换机也是持久的,我们来看源码 

 

 

我们发现就算不设置,他默认也是true,在我们创建的时候也是持久化的

2)队列持久化

队列持久化也是在声明队列时调用方法来实现的

如果队列不设置持久化,MQ重启时,队列就会被删掉,与此同时消息是存在队列中的,所以队列上的消息也会小时

所以我们要设置消息持久化,那么一定要设置队列持久化否则没用

这是设置为持久化的代码

这是设置为非持久化的代码

那我们还是来看一眼源码

然后我们点进这个QueueBuilder看看

 

我们发现这个就是给名字赋值用的

再来看看durable中的setDurable

我们发现是设置持久化的,因为是boolean类型,默认都是false所以非持久化就不需要做处理了

3)消息持久化

消息持久化

消息实现持久化就需要把消息的投递模式进行更改

设置了队列和消息的持久化,RabbitMQ服务器重启后,消息才会存在,其他情况下,重新MQ都武器消息都会丢失

消息持久化代码

@RequestMapping("producer")
@RestController
public class AckProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @RequestMapping("ack")
    public String ackPro(){
        String s1="ack test";
        Message message=new Message(s1.getBytes(StandardCharsets.UTF_8),new MessageProperties());
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack",message);
        return "发送成功";
    }
}

注意我们这里传的不是一个单独字符串了,而是一个消息 

  RabbitMQ会默认将消息视为持久化的,除非队列被声明为非持久化,或者发送消息时被标记为非持久化 

问题:

 注:如果我们将所有消息都设置为持久化会严重影响RabbitMQ的性能,写入磁盘的速度比写入内存的速度慢很多,所以我们在选择消息持久化的时候,要做一个衡量

如果我们将交换机,队列,消息都设置了持久化后,就能保证百分百不丢失数据了吗?

答案是错误的

1.在持久化消息存入RabbitMQ时,还有一段时间,消息是在缓存上的还没有写入硬盘,因为RabbitMQ并不会为每条消息都同步存盘,因为这样会严重影响性能,所以会有缓存,等待数据一起写入硬盘。如果在这是MQ重启了,消息还没有写入硬盘,那这些消息就会丢失

2.消费者设置自动确认,然后消费者接收消息后宕机,此时消息也会丢失,此时我们只需要设置为手动确认即可

第一个问题的解决方案我们下一篇博客在讲

  

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/953842.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【C++】反向迭代器

反向迭代器 一.源码及框架分析二.反向迭代器实现代码1.ReverseIterator.h2.Vector.h3.List.h4.Test.cpp 一.源码及框架分析 SGI-STL30版本源代码,反向迭代器实现的核心源码在stl_iterator.h中,反向迭代器是一个适配器,各个容器中再适配出自己…

浅谈云计算02 | 云计算模式的演进

云计算计算模式的演进 一、云计算计算模式的起源追溯1.2 个人计算机与桌面计算 二、云计算计算模式的发展阶段2.1 效用计算的出现2.2 客户机/服务器模式2.3 集群计算2.4 服务计算2.5 分布式计算2.6 网格计算 三、云计算计算模式的成熟与多元化3.1 主流云计算服务模式的确立3.1.…

WEB 攻防-通用漏-XSS 跨站脚本攻击-反射型/存储型/DOMBEEF-XSS

XSS跨站脚本攻击技术(一) XSS的定义 XSS攻击,全称为跨站脚本攻击,是指攻击者通过在网页中插入恶意脚本代码,当用户浏览该网页时,恶意脚本会被执行,从而达到攻击目的的一种安全漏洞。这些恶意脚…

Vue3组件设计模式:高可复用性组件开发实战

Vue3组件设计模式:高可复用性组件开发实战 一、前言 在Vue3中,组件设计和开发是非常重要的,它直接影响到应用的可维护性和可复用性。本文将介绍如何利用Vue3组件设计模式来开发高可复用性的组件,让你的组件更加灵活和易于维护。 二、单一职责…

深度剖析RabbitMQ:从基础组件到管理页面详解

文章目录 一、简介二、Overview2.1 Overview->Totals2.2 Overview->Nodesbroker的属性2.3 Overview->Churn statistics2.4 Overview->Ports and contexts2.5 Overview->Export definitions2.6 Overview->Import definitions 三、Connections连接的属性 四、C…

Unity 语音转文字 Vosk 离线库

市场有很多语音库,这里介绍Vosk SDK 除了支持untiy外还有原生开发服务器等 目录 安装unity示例demo下载语音训练文件运行demo结尾一键三联 注意事项 有可能debug出来的文本是空的,(确保麦克风正常,且索引正确)分大…

播放音频文件同步音频文本

播放音频同步音频文本 对应单个文本高亮显示 使用audio音频文件对应音频文本资源 音频文本内容(Json) [{"end": 4875,"index": 0,"speaker": 0,"start": 30,"text": "70号二啊,","tex…

【React】新建React项目

目录 create-react-app基础运用React核心依赖React 核心思想:数据驱动React 采用 MVC体系package.jsonindex.html好书推荐 官方提供了快速构建React 项目的脚手架: create-react-app ,目前使用它安装默认是19版本,我们这里降为18…

mac homebrew配置使用

本文介绍mac上homebrew工具的安装、配置过程。homebrew功能类似于centos的yum,用于软件包的管理,使用上有命令的差异。 本次配置过程使用mac,看官方文档,在linux上也可以用,但我没试过,有兴趣的同学可以试试…

第一次作业三种方式安装mysql(Windows和linux下)作业

在Windows11上安装sever(服务)端和客户端 server端安装 打开官网MySQL 进入到主页 点击DOWMLOAD 进入下载界面 点击下方MySQL Community (GPL) Downloads 进入社区版mysql下载界面 点击 MySQL Community Server 进入server端下载 选择8.4.3LTS&…

基于Media+Unity的手部位姿三维位姿估计

使用mediapipe Unity 手部位姿三维位姿估计 参考文章 基于Mediapipe的姿势识别并同步到Unity人体模型中 MediapipeUnity3d实现虚拟手_unity mediapipe-CSDN博客 需求 我的需求就是快速、准确的跟踪手部位姿并实现一个三维显示。 主要思路 搭建mdeiapipe系统&#xff0c…

【C++】IO 流

文章目录 👉C 语言的输入与输出👈👉流是什么👈👉C IO 流👈C 标准 IO 流C 和 C 语言的输入格式问题C 的多次输入内置类型和自定义类型的转换日期的多次输入C 文件 IO 流文本文件和二进制文件的读写 &#x1…

成功案例分享 — 芯科科技助力涂鸦智能打造Matter over Thread模块,简化Matter设备开发

芯科科技(Silicon Labs)的愿景之一是让开发者每天都能够更轻松地开发无线物联网(IoT)。特别是在拥有相同愿景的合作伙伴的帮助下,我们每天都在取得进步。但是要想弥合知识水平和物联网开发之间的差距仍会面临一定的挑战…

MySQL主从部署(保姆版)

一、mysql 同步复制有关概述 一般数据库都是读取压力大于写数据压力,主从复制即为了实现数据库的负载均衡和读写分离。通过将Mysql的某一台主机的数据复制到其它主机(slaves)上,主服务器只负责写,而从服务器只负责读。…

浅谈计算机网络02 | SDN控制平面

计算机网络控制平面 一、现代计算机网络控制平面概述1.1 与数据平面、管理平面的关系1.2 控制平面的发展历程 二、控制平面的关键技术剖析2.1 网络层协议2.1.1 OSPF协议2.1.2 BGP协议 2.2 SDN控制平面技术2.2.1 SDN架构与原理2.2.2 OpenFlow协议2.2.3 SDN控制器 一、现代计算机…

基于网络爬虫技术的网络新闻分析【源码+文档+部署讲解】

目 录 1 绪论 1.1 论文研究背景与意义 1.2 论文研究内容 2 系统需求分析 2.1 系统需求概述 2.2 系统需求分析 2.2.1 系统功能要求 2.2.2 系统IPO图 2.2 系统非功能性需求分析 3系统概要设计 3.1 设计约束 3.1.1需求约束 3.1.2设计策略 3.1.3 技术实现 3.3 模块…

WeakAuras NES Script(lua)

WeakAuras NES Script 修星脚本字符串 脚本1:NES !WA:2!TMZFWXX1zDxVAs4siiRKiBN4eV(sTRKZ5Z6opYbhQQSoPtsxr(K8ENSJtS50(J3D7wV3UBF7E6hgmKOXdjKsgAvZFaPTtte0mD60XdCmmecDMKruyykDcplAZiGPfWtSsag6myGuOuq89EVDV9wPvKeGBM7U99EFVVVV33VFFB8Z2TJ8azYMlZj7Ur3QDR(…

网络安全学习81天(记录)

前言: 小迪安全,81天,开始了php代码审计 思路: 内容: #知识点: 1、审计漏洞-SQL 数据库注入挖掘 1、审计思路-正则搜索&功能追踪&辅助工具 3、审计类型-常规架构&MVC 架构&三方框架 #章…

1.14 互斥与同步

1.思维导图 2.有一个隧道&#xff0c;长1000m&#xff0c;有一辆高铁&#xff0c;每秒100米&#xff1b;有一辆快车&#xff0c;每秒50米&#xff1b;要求模拟这两列火车通过隧道的场景。 1>程序代码&#xff1a; #include <stdio.h> #include <string.h> #i…

研华 PCI-1751 驱动更新导LabVIEW致程序异常

问题描述&#xff1a; 某 LabVIEW 程序长期运行正常&#xff0c;但在使用研华 PCI-1751 数据采集卡运行一段时间后&#xff0c;程序开始出现不正常的行为。具体过程如下&#xff1a; 初始问题&#xff1a; 更换新的 PCI-1751 板卡后&#xff0c;驱动程序被更新&#xff0c;但程…