【Rabbitmq篇】RabbitMQ⾼级特性----消息确认

目录

前言:

一.消息确认机制 

• ⾃动确认

• ⼿动确认

 手动确认方法又分为三种:

二. 代码实现(spring环境)

配置相关信息:

1). AcknowledgeMode.NONE 

2 )AcknowledgeMode.AUTO

3)AcknowledgeMode.MANUAL

总结:


前言:

前期讲了RabbitMQ的概念和应⽤,RabbitMQ实现了AMQP0-9-1规范的许多扩展,在RabbitMQ官⽹上,也给⼤家介绍了RabbitMQ的⼀些特性,我们挑⼀些重要的且常⽤的给⼤家讲⼀下

Rabbitmq官网


一.消息确认机制 

⽣产者发送消息之后,到达消费端之后,可能会有以下情况:
a. 消息处理成功
b. 消息处理异常

RabbitMQ向消费者发送消息之后,就会把这条消息删掉,那么第两种情况,就会造成消息丢失.
那么如何确保消费端已经成功接收了,并正确处理了呢
为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消息确认机制(messageacknowledgement)。

消费者在订阅队列时,可以指定autoAck参数,根据这个参数设置,消息确认机制分为以下两种: 

• ⾃动确认

当autoAck等于true时, RabbitMQ 会⾃动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,⽽不管消费者是否真正地消费到了这些消息.⾃动确认模式适合对于消息可靠性要求不⾼的场景.


• ⼿动确认

当autoAck等于false时,RabbitMQ会等待消费者显式地调⽤Basic.Ack命令,回复确认信号后才从内存(或者磁盘)中移去消息.这种模式适合对消息可靠性要求⽐较⾼的场景. 

 手动确认方法又分为三种:

  • 肯定确认:Channel.basicAck(long deliveryTag, boolean multiple)                  RabbitMQ已知道该消息并且成功的处理消息,可以将其丢弃了.
  • 否定确认: Channel.basicReject(long deliveryTag, boolean requeue) 
    消费者客⼾端可以调⽤channel.basicReject⽅法来告诉RabbitMQ拒绝这个消息.
  • 否定批量确认: Channel.basicNack(long deliveryTag, boolean multiple,boolean requeue)
    Basic.Reject命令⼀次只能拒绝⼀条消息,如果想要批量拒绝消息,则可以使⽤Basic.Nack这个命令.消费者客⼾端可以调⽤channel.basicNack⽅法来实现.

参数说明:

1)deliveryTag :
消息的唯⼀标识,它是⼀个单调递增的64位的⻓整型值. deliveryTag 是每个通道
(Channel)独⽴维护的,所以在每个通道上都是唯⼀的.当消费者确认(ack)⼀条消息时,必须使⽤对应的通道上进⾏确认.

2)multiple 

是否批量确认.在某些情况下,为了减少⽹络流量,可以对⼀系列连续的 deliveryTag 进
⾏批量确认.值为true则会⼀次性ack所有⼩于或等于指定deliveryTag的消息.值为false,则只确认当前指定deliveryTag的消息.

 

 3)requeue

表⽰拒绝后,这条消息如何处理.如果requeue参数设置为true,则RabbitMQ会重新将这条
消息存⼊队列,以便可以发送给下⼀个订阅的消费者.如果requeue参数设置为false,则RabbitMQ会把消息从队列中移除,⽽不会把它发送给新的消费者.


二. 代码实现(spring环境)

1.可以直接使用RabbitMQ Java Client 库

2.使用spring集成的amqp

 主要介绍第二种,在spring环境下实现

Spring-AMQP 对消息确认机制提供了三种策略.

public enum AcknowledgeMode { 
    NONE //确认,
    MANUAL//手动 ,
    AUTO //默认;
}

配置相关信息:

基本信息以及确认机制

队列,交换机,以及它们之间的绑定关系 

package com.bite.extensions.config;

import com.bite.extensions.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    @Bean("ackQueue")
    public Queue ackQueue() {
        return QueueBuilder.durable(Constants.ACK_QUEUE).build();
    }
    @Bean("directExchange")
    public DirectExchange directExchange() {
        return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build();
    }
    @Bean("ackBinding")
    public Binding ackBinding(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("ackQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(directExchange).with("ack");
    }
}

生产者:

主要解释消费者在不同确认机制的状态

package com.bite.extensions.controller;

import com.bite.extensions.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RequestMapping("/producer")
@RestController
public class ProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @RequestMapping("/ack")
    public String ack() {
        rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack","consumer ack mode test...");
        return "消息发送成功!";
    }
}

1)AcknowledgeMode.NONE 

这种模式下,消息⼀旦投递给消费者,不管消费者是否成功处理了消息,RabbitMQ就会⾃动确认
消息,从RabbitMQ队列中移除消息.如果消费者处理消息失败,消息可能会丢失.

 

1)消费者 正常消费情况下

package com.bite.extensions.listener;

import com.bite.extensions.constant.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

@Component
public class AckListener {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void handleMessage(Message message, Channel channel) throws UnsupportedEncodingException {
        //消费者逻辑
        long deliverTag = message.getMessageProperties().getDeliveryTag();
        System.out.printf("接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);
        //业务逻辑处理
        System.out.println("业务逻辑处理!");

        System.out.println("业务逻辑完成!");
    }
}

消费者正确处理,MQ删除相应信息

2)消费者 异常消费情况下

@Component
public class AckListener {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void handleMessage(Message message, Channel channel) throws UnsupportedEncodingException {
        //消费者逻辑
        long deliverTag = message.getMessageProperties().getDeliveryTag();
        System.out.printf("接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);
        //业务逻辑处理
        System.out.println("业务逻辑处理!");
        int num = 3/0; //异常
        System.out.println("业务逻辑完成!");
    }
}

 可以看到,消费者处理失败,但是消息已经从RabbitMQ中移除.

2 )AcknowledgeMode.AUTO

这种模式下,消费者在消息处理成功时会⾃动确认消息,但如果处理过程中抛出了异常,则不会确认消息. 

    listener:
      simple:
        acknowledge-mode: auto  #消息接收确认

1)消费者 正常消费情况下 

@Component
public class AckListener {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void handleMessage(Message message, Channel channel) throws UnsupportedEncodingException {
        //消费者逻辑
        long deliverTag = message.getMessageProperties().getDeliveryTag();
        System.out.printf("接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);
        //业务逻辑处理
        System.out.println("业务逻辑处理!");
        //int num = 3/0;
        System.out.println("业务逻辑完成!");
    }
}

 

2)消费者 异常消费情况下 

@Component
public class AckListener {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void handleMessage(Message message, Channel channel) throws UnsupportedEncodingException {
        //消费者逻辑
        long deliverTag = message.getMessageProperties().getDeliveryTag();
        System.out.printf("接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);
        //业务逻辑处理
        System.out.println("业务逻辑处理!");
        int num = 3/0;
        System.out.println("业务逻辑完成!");
    }
}
..........
接收到信息: consumer ack mode test..., deliveryTag: 88
业务逻辑处理!
2024-11-17T15:19:11.420+08:00  WARN 22936 --- [rabbitmq-extensions-demo] [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
接收到信息: consumer ack mode test..., deliveryTag: 89
业务逻辑处理!
2024-11-17T15:19:11.477+08:00  INFO 22936 --- [rabbitmq-extensions-demo] [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2024-11-17T15:19:11.477+08:00  INFO 22936 --- [rabbitmq-extensions-demo] [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.

消费者处理异常,会一直重试发送,所有仍然保留在mq中

3)AcknowledgeMode.MANUAL

⼿动确认模式下,消费者必须在成功处理消息后显式调⽤ basicAck ⽅法来确认消息.如果消
息未被确认,RabbitMQ会认为消息尚未被成功处理,并且会在消费者可⽤时重新投递该消息,这
种模式提⾼了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不会丢失,⽽是可以被重新处理.

    listener:
      simple:
        acknowledge-mode: manual#消息接收确认

1)消费者 正常消费情况下 

@Component
public class AckListener {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void handleMessage(Message message, Channel channel) throws Exception {
        //消费者逻辑
        long deliverTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.printf("接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);
            //业务逻辑处理
            System.out.println("业务逻辑处理!");
            //int  num = 3/0;
            System.out.println("业务逻辑完成!");
            //肯定确认
            channel.basicAck(deliverTag,false);
        } catch (Exception e) {
            //否定确认
           channel.basicNack(deliverTag,false,true);
        }
    }
}

如果不进行确认 又会发送什么?

 当我们使用手动确认(manual)的时候,一定要手动添加上肯定确认,不然即使消费者处理成功,也不会进行确认!

 2)消费者 异常消费情况下 

@Component
public class AckListener {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void handleMessage(Message message, Channel channel) throws Exception {
        //消费者逻辑
        long deliverTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.printf("接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);
            //业务逻辑处理
            System.out.println("业务逻辑处理!");
            int  num = 3/0;
            System.out.println("业务逻辑完成!");
            //肯定确认
            channel.basicAck(deliverTag,false);
        } catch (Exception e) {
            //否定确认
           channel.basicNack(deliverTag,false,true);
        }
    }
}

 否定确认完,又会进行重新入队,会变成Ready状态

此时修改为false,不让它入队,会发生什么? 

消费者处理异常,会不停的重试 

使用manual,一定要进行手动确认


总结:

模式确认方式可靠性性能使用场景
None无确认低,可能丢失消息不关心消息是否成功消费,丢失消息可容忍的场景
Auto自动确认较低,可能丢失消息较高对丢失消息容忍度较高的场景
Manual手动确认高,消息只有成功处理才会确认较低需要确保每条消息被成功消费的场景
  • None 适用于性能要求高,但对消息丢失不敏感的场景。
  • Auto 适合那些不需要太高消息可靠性的应用,但仍然需要自动化确认机制。
  • Manual 最适合那些对消息处理的可靠性要求较高,尤其是在出现异常时需要精细控制消息是否重新入队或丢弃的场景。

选择哪种模式取决于你的具体需求,尤其是对于消息可靠性的要求以及系统的性能考虑。 


结语: 写博客不仅仅是为了分享学习经历,同时这也有利于我巩固知识点,总结该知识点,由于作者水平有限,对文章有任何问题的还请指出,接受大家的批评,让我改进。同时也希望读者们不吝啬你们的点赞+收藏+关注,你们的鼓励是我创作的最大动力!  

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/917970.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

QT入门之下载、工程创建、学习方法

1.QT下载链接 因为我的是下载在LINUX上面,所以这里提供LINUX平台下的下载方式: wget http://download.qt.io/archive/qt/5.12/5.12.9/qt-opensource-linux-x64-5.12.9.run 赋予可执行权限,加上 sudo 权限进入安装,这样会安装在…

初识Linux—— 基本指令(上)

前言 Linux简述 ​ Linux是一种开源、自由、类UNIX的操作系统,由著名的芬兰程序员林纳斯托瓦兹(Linus Torvalds)于1991年首次发布。Linux的内核在GNU通用公共许可证(GPL)下发布,这意味着任何人都可以自由…

劳动力市场

1.劳动力市场概述 (1)劳动力:所有有工作能力且愿意工作的人的总称,由那些正在工作(就业)和正在寻找工作(失业)的人组成,表示为:L(劳动力&#xf…

PHP代码审计 --MVC模型开发框架rce示例

MVC模型开发框架 控制器Controller:负责响应用户请求、准备数据,及决定如何展示数据 模块Model:管理业务逻辑和数据库逻辑,提供链接和操作数据库的抽象层 视图View:负责前端模板渲染数据,通过html呈现给用户…

Dify 通过导入 DSL 文件创建 Workflow 过程及实现

本文使用 Dify v0.9.2 版本,主要介绍 Dify 通过导入 DSL(或 URL)文件创建(或导出)Workflow 的操作过程及源码分析实现过程。Dify通过导入DSL文件创建Workflow过程及实现:https://z0yrmerhgi8.feishu.cn/wik…

Redis五大基本类型——List列表命令详解(命令用法详解+思维导图详解)

目录 一、List列表类型介绍 二、常见命令 1、LPUSH 2、LPUSHX 3、RPUSH 4、RPUSHX 5、LRANGE 6、LPOP 7、RPOP 8、LREM 9、LSET 10、LINDEX 11、LINSERT 12、LLEN 13、阻塞版本命令 BLPOP BRPOP 三、命令小结 相关内容: Redis五大基本类型——Ha…

有序数组的平方(leetcode 977)

一个数组&#xff0c;返回一个所有元素的平方之后依然是一个有序数组。&#xff08;数组中含负数&#xff09; 解法一&#xff1a;暴力解法 所有元素平方后再使用快速排序法重新排序&#xff0c;时间复杂度为O(nlogn)。 class Solution { public:vector<int> sortedSqu…

调用门提权

在我写的2.保护模式&#xff0b;段探测这篇文章中&#xff0c;我们提到了S位对于段描述符的控制&#xff0c;之前我们已经介绍了代码段和数据段&#xff0c;现在我们来把目光转到系统段 在这么多中结构里面&#xff0c;我们今天要介绍的就是编号为12的&#xff0c;32位调用门 结…

Web Service 学习笔记

Web Service 学习笔记 Web Service 基本概念 Web Service 即 web 服务&#xff0c;它是一种跨编程语言和跨操作系统平台的远程调用技术。 Java 中共有三种 Web Service 规范&#xff1a; JAX-WS(JAX-RPC): 基于 xml 数据JAXM&SAAJJAX-RS&#xff1a;基于 xml 或 json 数…

爬虫——JSON数据处理

第三节&#xff1a;JSON数据处理 在爬虫开发中&#xff0c;JSON&#xff08;JavaScript Object Notation&#xff09;是最常见的数据格式之一&#xff0c;特别是在从API或动态网页中抓取数据时。JSON格式因其结构简单、可读性强、易于与其他系统交互而广泛应用于前端与后端的数…

计算机编程中的设计模式及其在简化复杂系统设计中的应用

&#x1f493; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4dd; Gitee主页&#xff1a;瑕疵的gitee主页 ⏩ 文章专栏&#xff1a;《热点资讯》 计算机编程中的设计模式及其在简化复杂系统设计中的应用 计算机编程中的设计模式及其在简化复杂系统设计中的应用 计算机编程中的…

【Tealscale + Headscale + 自建服务器】异地组网笔记

文章目录 效果为什么要用 Headscale云服务器安装 Headscale配置 config.yaml创建反向代理搭建管理 UI授权管理 UI添加互联设备参考 效果 首先是连接情况&#xff0c;双端都连接上自建的 Headscale&#xff0c; 手机使用移动流量&#xff0c;测试一下 ping 值 再试试进入游戏 可…

单片机学习笔记 2. LED灯闪烁

目录 0、实现的功能 1、Keil工程 2、代码实现 0、实现的功能 LED灯闪烁 1、Keil工程 闪烁原理&#xff1a;需要进行软件延时达到人眼能分辨出来的效果。常用的延时方法有软件延时和定时器延时。此次先进行软件延时 具体操作步骤和之前的笔记一致。此次主要利用无符号整型的范…

【Cesium】自定义材质,添加带有方向的滚动路线

【Cesium】自定义材质&#xff0c;添加带有方向的滚动路线 &#x1f356; 前言&#x1f3b6;一、实现过程✨二、代码展示&#x1f3c0;三、运行结果&#x1f3c6;四、知识点提示 &#x1f356; 前言 【Cesium】自定义材质&#xff0c;添加带有方向的滚动路线 &#x1f3b6;一、…

Vue之插槽(slot)

插槽是vue中的一个非常强大且灵活的功能&#xff0c;在写组件时&#xff0c;可以为组件的使用者预留一些可以自定义内容的占位符。通过插槽&#xff0c;可以极大提高组件的客服用和灵活性。 插槽大体可以分为三类&#xff1a;默认插槽&#xff0c;具名插槽和作用域插槽。 下面…

从零开始深度学习:全连接层、损失函数与梯度下降的详尽指南

引言 在深度学习的领域&#xff0c;全连接层、损失函数与梯度下降是三块重要的基石。如果你正在踏上深度学习的旅程&#xff0c;理解它们是迈向成功的第一步。这篇文章将从概念到代码、从基础到进阶&#xff0c;详细剖析这三个主题&#xff0c;帮助你从小白成长为能够解决实际…

Python 绘图工具详解:使用 Matplotlib、Seaborn 和 Pyecharts 绘制散点图

目录 数据可视化1.使用 matplotlib 库matplotlib 库 2 .使用 seaborn 库seaborn 库 3 .使用 pyecharts库pyecharts库 注意1. 确保安装了所有必要的库2. 检查Jupyter Notebook的版本3. 使用render()方法保存为HTML文件4. 使用IFrame在Notebook中显示HTML文件5. 检查是否有其他输…

【C++】vector 类模拟实现:探索动态数组的奥秘

&#x1f31f; 快来参与讨论&#x1f4ac;&#xff0c;点赞&#x1f44d;、收藏⭐、分享&#x1f4e4;&#xff0c;共创活力社区。&#x1f31f; 如果你对string&#xff0c;vector还存在疑惑&#xff0c;欢迎阅读我之前的作品 &#xff1a; 之前文章&#x1f525;&#x1f52…

【ubuntu18.04】vm虚拟机复制粘贴键不能用-最后无奈换版本

我是ubuntu16版本的 之前费老大劲安装的vmware tools结果不能用 我又卸载掉&#xff0c;安装了open-vm-tools 首先删除VMware tools sudo vmware-uninstall-tools.pl sudo rm -rf /usr/lib/vmware-tools sudo apt-get autoremove open-vm-tools --purge再下载open-vm-tools s…

使用原生 OpenTelemetry 解锁各种可能性:优先考虑可靠性,而不是专有限制

作者&#xff1a;来自 Elastic Bahubali Shetti•Miguel Luna Elastic 现在支持使用 OTel Operator 在 Kubernetes 上部署和管理 Elastic Distributions of OpenTelemetry (EDOT)。SRE 现在可以访问开箱即用的配置和仪表板&#xff0c;这些配置和仪表板旨在通过 Elastic Observ…