RabbitMQ系列学习笔记(四)--消息应答机制

文章目录

  • 一、消息应答详解
    • 1、基本概念
    • 2、自动应答
    • 3、手动应答
    • 4、自动重新入队
    • 5、手动应答代码
    • 6、手动应答演示
  • 二、不公平分发
  • 三、预取值机制

本文参考:
尚硅谷RabbitMQ教程丨快速掌握MQ消息中间件rabbitmq
RabbitMQ 详解
Centos7环境安装Erlang、RabbitMQ详细过程(配图)

一、消息应答详解

1、基本概念

在RabbitMQ系列学习笔记(三)–工作队列模式的案例中进行消息发送时,应答机制参数我们设置的是true,即自动应答。此时RabbitMQ 一旦向消费者传递了一条消息,便立即认为已经发送成功,并将该消息标记为删除。在通常情况下消费者完成一个消息处理任务可能需要一段时间,如果其中一个消费者在处理一个长的任务过程中突然挂掉了,将丢失正在处理的消息。以及后续发送给该消费者的消息,因为它无法接收到。
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是消费者在接收到消息并且处理该消息成功之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

2、自动应答

前文案例我们一直使用的是自动应答方式,消息在发送后立即被认为已经传送成功,这种模式下如果消息在接收到之前,消费者那边出现连接关闭或者 channel 关闭,或者是在消息处理过程中消费者宕机,那么消息就丢失了,使用这种模式需要在高吞吐量和数据传输安全性方面做权衡,速度快,但是会失去数据安全性。

3、手动应答

消费者在接收到消息以后使用手动应答的方式回复RabbitMQ已经接收成功,在java中,可以使用以下三种函数进行手动应答:

  • Channel.basicAck(用于肯定确认):告诉RabbitMQ已成功处理该消息,可以将其丢弃了。
  • Channel.basicNack(用于否定确认):告诉RabbitMQ没有成功接收该消息。
  • Channel.basicReject(用于否定确认):告诉RabbitMQ不处理该消息而是直接拒绝,可以将其丢弃,与Channel.basicNack相比少一个Multiple参数。

image.png

multiple取值为true和false代表不同意思:

  • true:代表批量应答channel上未应答的消息。比如说channel上有传送tag的消息 5,6,7,8 当前tag是8,那么此时5-8的这些还未应答的消息都会被确认收到消息应答,即批量应答。但此时后边的5-7实际上还未接收处理,此时进行批量应答仍然会有数据丢失的风险。
  • false:同上面相比只会应答tag=8的消息 5,6,7 这三个消息依然不会被确认收到消息应答,相当于是逐条应答,此时可靠性较高,相应的速度会降低。

image.png

4、自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔下线,也可以确保不会丢失任何消息。image.png

5、手动应答代码

默认情况下在进行消息接收 channel.basicConsume() 的第二个函数为true即自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答,并在消费者实现代码中手动实现ack函数。
消息生产者代码:

public class Task {
    //队列名称
    private static final String QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws Exception {
        // 1.通过工具类建立信道
        Channel channel = RabbitMqUtil.getChannel();
        // 2.声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        // 3.从控制台获取输入,并将其作为消息发送消息队列
        Scanner scanner = new Scanner(System.in);
        System.out.println("请输入信息");
		while (scanner.hasNext()){
			String message = scanner.next(); 
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8")); 
			System.out.println("生产者发出消息:"+message);
		}
        // 4.关闭资源
        channel.close();
    }
}

消息消费者01代码:

public class Consumer01 {
    //队列名称
    private static final String QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        System.out.println("C1等待接收消息,处理时间较短.....");
        // 采用手动应答
        channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                SleepUtils.sleep(1);
                System.out.println("接收到消息: " + message);
                /*
                 * 确认消息一般放在最后,等程序执行完无异常后,在进行确认消息
                 *
                 * 第一个参数:添加手动确认消息的唯一标识
                 * 第二个参数:确认方式。是否一次抓取并确认多个消息
                 *   true:设置为true,即表示可以批量确认消息
                 *   false:设置为false,即表示无论多少个消息,都需要一次一次的确认
                 *   实际中可根据不同场景进行设置...
                 */
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

消息消费者02代码:

public class Consumer02 {
    //队列名称
    private static final String QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        System.out.println("C2等待接收消息,处理时间较长.....");
        // 采用手动应答
        channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                SleepUtils.sleep(30);
                System.out.println("接收到消息: " + message);
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

睡眠工具类代码:

public class SleepUtils {
    public static void sleep(int second){
        try {
        	Thread.sleep(1000*second);
        } catch (InterruptedException _ignored) {
        	Thread.currentThread().interrupt();
        }
    }
}

6、手动应答演示

正常情况下消息发送方发送两个消息 C1 和 C2 分别接收到消息并进行处理。
image.png
在发送消息dd时,dd本应交给c2处理,由于它处理时间比较长,在还未处理完时,模拟c2出现故障,将c2停掉,也就是说C2还没有执行ack代码的时候,C2宕机了,此时会看到消息被C1接收到了,说明消息dd被重新入队,然后分配给能处理消息的C1处理了。
image.png
image.png
image.png

二、不公平分发

上述例子的运行结果其实属于公平分发,即消息被均匀分发到各个消费者,每个消费者各占N条消息,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者1处理任务的速度非常快,而另外一个消费者处理速度却很慢,这个时候我们还是采用轮训分发的话就会导致处理速度快的消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,显然在这种场景下公平分发机制并不适合。
为了避免这种情况,我们可以设置参数channel.basicQos(1);
RabbitMQ 提供了一种 Qos(Quality Of Service,服务质量)服务质量保证功能。即在非自动确认消息的前提下,如果一定数目的消息未被确认之前,不再消费新的消息。
image.png
这个数值称之为预先抓取消息的数量值,当设置为1时,c1和c2每次都会从队列中抓取一个消息进行处理,当执行完处理流程发送确认完毕以后才会继续抓取下一个,如果c2速度慢,这个任务还没有处理完,rabbitmq就会把下一个任务分配给没有那么忙的那个空闲消费者c1,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的worker或者改变其他存储任务的策略。
消费者每次最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发。

注意:使用这种机制一定是在手动应答情况下的,不能是自动应答。


通过测试发现,c1的速度快,因此c1处理的消息就多,属于能者多劳。

三、预取值机制

不公平分发其实就是使用了预取值机制,通过将prefetch设置为1,消费端每次拉取一条消息进行处理,这样谁的速度快谁拉取的频率就高,自然处理的消息就多。下面是预取值(prefetch)的详细介绍。

ChatGPT
在 RabbitMQ 中,预取值(Prefetch Count)是指消费者从队列中预取的消息数量。当一个消费者连接到一个队列并开始消费消息时,它可以通过设置预取值来控制一次从队列中获取的消息数量。预取值可以在消费者创建时进行设置,也可以在运行时进行更改。
预取值的主要作用是控制消费者的负载,避免一个消费者在处理消息时占用过多的资源,导致其他消费者无法获得足够的资源。通过限制每次预取的消息数量,可以控制消费者的处理速度,避免过度消费队列中的消息。
预取值的设置方式有两种:

  • 全局设置:通过 channel.basicQos(prefetchCount) 方法设置全局预取值。在这种情况下,所有的消费者都将使用相同的预取值。
  • 单独设置:通过 channel.basicConsume(queue, consumer) 方法的 prefetchCount 参数设置单独的预取值。在这种情况下,每个消费者都可以使用不同的预取值。

假想我们 RabbtiMQ 服务器积压了成千上万条未处理的消息,然后随便打开一个消费者客户端,巨量的消息瞬间全部喷涌推动过来,但是单个客户端无法同时处理这么多条数据,就会被压垮崩溃。此时可以通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/897115.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

如何去掉歌曲的人声只剩伴奏?伴奏独享的方法

在音乐制作、后期处理或是个人娱乐中,我们经常遇到需要将歌曲中的人声去除,仅保留伴奏的情况。虽然这一过程可能听起来颇为复杂,但实际上,借助现代音乐技术和软件,我们可以较为轻松地达成这一目标。本文将介绍三种常见…

[AWS]RDS数据库版本升级

背景:由于AWS上mysql5.7版本不再支持,需要进行版本升级。 吐槽:每年都要来那么几次,真的有病一样,很烦。 步骤一、升级检查 AWS提供了一个python的升级检测脚本,可以按照一下脚本下载测试: [r…

机器视觉基础系列2—简单了解用神经网络进行深度估计

机器视觉基础系列2—简单了解深度估计 深度估计 深度估计通俗的来讲就是要得到一张图像当中,哪些区域离得比较近,哪些区域离得比较远。 输入一张彩色得图像,我们输出深度估计得图像,深浅即为远近(从而完成了离相机距离…

Git安装与配置(2.47.0版本超详细)

一、背景 1.什么是gitt?(官网引用) Git 是一个快速、可扩展的分布式版本控制系统,它拥有异常丰富的命令集,可以提供高级操作和对内部的完全访问。 参阅 gittutorial[7] 开始使用,然后查看 giteveryday[7] …

【2022统考真题】计算时间复杂度

目录 一、题目描述 二、思路分析 三、易错提醒 四、同级和嵌套的关系 一、题目描述 下列程序段的时间复杂度是&#xff08;&#xff09; int sum 0; for (int i 1; i < n; i * 2) for (int j 0; j < i; j) sum; A. O(logn) B. O(n) C. O(nlogn) D…

使用Radzen Blazor组件库开发的基于ABP框架炫酷UI主题

一、项目简介 使用过ABP框架的童鞋应该知道它也自带了一款免费的Blazor UI主题&#xff0c;它的页面是长这样的&#xff1a; 个人感觉不太美观&#xff0c;于是网上搜了很多Blazor开源组件库&#xff0c;发现有一款样式非常不错的组件库&#xff0c;名叫&#xff1a;Radzen&am…

iEnglish「速成」板块上线,快速提升英语能力

10月17日&#xff0c;iEnglish智能升级版正式推出了「速成」板块&#xff0c;这一创新举措不仅是AI教育深度融合的体现&#xff0c;还为用户提供了更为高效的个性化学习体验。 据悉&#xff0c;「速成」板块旨在通过个性化的学习模式和多元化的练习方式&#xff0c;帮助用户实…

SSD |(九)ECC原理 | LDPC

文章目录 &#x1f4da;信号和噪声&#x1f4da;通信系统模型&#x1f4da;纠错编码的基本思想&#x1f407;编码距离&#x1f407;线性纠错码的基石——奇偶校验&#x1f407;校验矩阵H和生成矩阵G &#x1f4da;LDPC原理简介&#x1f407;LDPC是什么&#x1f407;Tanner图 &a…

scrapy案例——当当网的爬取一

项目名称&#xff1a;当当网的爬取一——爬取青春文学的书籍数据 案例需求&#xff1a; 1.使用scrapy爬虫技术爬取当当网中青春文学的书籍数据&#xff0c;包括&#xff08;标题、现价、定价、作者、出版日期、出版社、书本详情和书本图片url&#xff09; 2.将获取到的数据保…

免费开源的微信开发框架

近年来&#xff0c;随着人工智能技术的快速发展&#xff0c;聊天机器人在各个领域得到了广泛的应用。在社交媒体中&#xff0c;自动回复成为了一个流行的功能&#xff0c;让用户可以方便地与机器人进行互动。gewe框架&#xff0c;一个开源的微信聊天机器人框架&#xff0c;实现…

高刚性重切削数控走心机

高刚性重切削数控走心机&#xff0c;作为现代精密加工领域的佼佼者&#xff0c;以其卓越的性能和广泛的应用领域&#xff0c;赢得了众多行业的青睐。下面&#xff0c;我将从多个方面为您详细解析这种数控走心机。 ‌一、定义与特点‌ ‌定义‌&#xff1a;高刚性重切削数控走心…

【Java 并发编程】单例模式

前言 单例模式是一种十分常用但却相对而言比较简单的单例模式。虽然它简单但是包含了关于线程安全、内存模型、类加载机制等一些比较核心的知识点。本章会介绍单例模式的设计思想&#xff0c;会去讲解了几种常见的单例实现方式&#xff0c;如饿汉式、懒汉式、双重检锁、静态内部…

C++和OpenGL实现3D游戏编程【连载16】——详解三维坐标转二维屏幕坐标(向量和矩阵操作实战)

&#x1f525;C和OpenGL实现3D游戏编程【目录】 1、本节课要实现的内容 在上一课我们了解了着色器&#xff0c;了解了部分核心模式编程内容&#xff0c;从中接触到了线性代数中向量和矩阵相关知识&#xff0c;我们已经能够感受到向量和矩阵在OpenGL编程中的重要性。特别是后期…

Linux——传输层协议

目录 一再谈端口号 1端口号范围划分 2两个问题 3理解进程与端口号的关系 二UDP协议 1格式 2特点 3进一步理解 3.1关于UDP报头 3.2关于报文 4基于UDP的应用层协议 三TCP协议 1格式 2TCP基本通信 2.1关于可靠性 2.2TCP通信模式 3超时重传 4连接管理 4.1建立…

MySQL数据库的高可用

一、MHA工作原理 1、MHA的工作原理 1、MHA利用 select 1 as value 指令判断master服务器的健康性&#xff0c;一旦master宕机&#xff0c;MHA从宕机崩溃idmaster保存二进制日志事件&#xff08;binlog events&#xff09; 2、识别含有最新更新的slave 3、应用差异的中继日志&a…

bcprov-jdk15on-1.52.0.jar has unsigned entries - org/bouncycastle/LICENSE

报错界面如上图 解决办法&#xff1a; 1.修改引用jar包&#xff0c;将build.gradle里面的依赖为 implementation org.bouncycastle:bcprov-jdk15on:1.52 2.到maven上下载最新的bcprov-jdk15on-1.52.0.jar,替换文件夹中原有的jar包

C/C++每日一练:实现一个环形队列

队列&#xff08;queue&#xff09; 队列是一种先进先出&#xff08;FIFO&#xff0c;First In First Out&#xff09; 的数据结构&#xff0c;类似于排队的场景。最先进入队列的元素最先被处理&#xff0c;而后加入的元素则排在队列的末尾。 常见的队列操作&#xff1a; 入队…

第二届中国楚域品牌文化创新发展大会暨楚域尚品发布会在汉圆满落幕

10 月 19 日&#xff0c;“第二届中国楚域品牌文化创新发展大会暨楚域尚品发布会”在武汉市光谷九通海源大酒店隆重举行。本次大会由中国商业文化研究会传承创新工作委员会、楚域品牌文化传承创新工作委员会、华夏品牌文化创新发展大会组委会主办&#xff0c;湖北省企业文化促进…

python爬虫简易入门示例

版本环境 win11python 3.12.4 目标&#xff1a;爬取https://gitee.com/explore的列表内容&#xff0c;并写入txt文本 效果 开始 1.安装依赖 pip install requests beautifulsoup42.编写代码&#xff0c;如下&#xff0c;详见注释 import requests from bs4 import Beauti…

【PFGA】二选一数选器

文章目录 前言一、实验原理二、实验过程三、实验结果参考文献 前言 进行 verilog FPGA 实验 一、实验原理 二、实验过程 三、实验结果 代码 module mux21(input s,input a,input b,output reg y); always(s or a or b) beginif (~s) beginy<a;end else beginy<…