RabbitMQ消息的应答

消息的应答机制

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费者的消息,因为它无法接收到。

为了保证消息在发送过程中不丢失,RabbitMQ 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 RabbitMQ 它已经处理了, RabbitMQ 可以把该消息删除了。

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

手动应答

手动应答的好处是可以批量应答并且减少网络拥堵,批量应答的批量范围是channel 上未应答的消息。比如说 channel 上有传送 tag 的消息5,6,7,8 当前 tag 是8 那么此时5-8 的这些还未应答的消息都会被确认收到消息应答。但是实际上还是不建议开启批量应答的。

image-20220530100237882

Channel.basicAck(用于肯定确认)

RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了。

Channel.basicNack(用于否定确认)

不处理该消息了直接拒绝,可以将其丢弃了。

Channel.basicReject(用于否定确认)

与 Channel.basicNack 相比少一个参数。

不处理该消息了直接拒绝,可以将其丢弃了。

消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

代码实现

生产者
public class MyProducer {

    @Test
    public void test() throws Exception {
        // 队列名称
        String queue = "xw_queue";
        String message = "Hello World -> ";

        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        for (int i = 0; i < 20; i++) {
            // 发布消息
            channel.basicPublish("xw_exchange", queue, null, (message + i).getBytes());
        }
    }
}
消费者1

开启手动确认后,消费者1如果在处理消息的回调中不确认消息,那么队列中的消息会处于unacked的状态,如果消费者1突然挂掉,那么这些未确认的消费会重新发送给其他消费者。

public class MyConsumer1 {

    public static void main(String[] args) throws Exception {
        // 队列名称
        String queue = "xw_queue";

        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(queue, true, false, false, null);
        channel.queueBind("", "xw_exchange", queue);
        // 配置开启手动应答
        channel.basicConsume(queue, false, new DefaultConsumer(channel) {
            @SneakyThrows
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消费完成后手动应答
                // channel.basicAck(envelope.getDeliveryTag(), false);
                Thread.sleep(5000);
                System.out.println("消费者1:接收到消息: " + new String(body));
            }
        });
    }
}
消费者2

消费者2正常消费消息,收到消息后立刻确认。

public class MyConsumer2 {

    public static void main(String[] args) throws Exception {
        // 队列名称
        String queue = "xw_queue";

        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(queue, true, false, false, null);
        channel.queueBind("", "xw_exchange", queue);
        // 配置开启手动应答
        channel.basicConsume(queue, false, new DefaultConsumer(channel) {
            @SneakyThrows
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消费完成后手动应答
                channel.basicAck(envelope.getDeliveryTag(), false);
                System.out.println("消费者2:接收到消息: " + new String(body));
            }
        });
    }
}

效果展示

image-20220530102415347

image-20220530102445700

此时把消费者1停掉,那么上面这10条为确认的消费会重新入队,发送给另外的消费者。

image-20220530102642150

image-20220530102708467

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/201289.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【MySQL】常用内置函数:数值函数 / 字符串函数 / 日期函数 / 其他函数

文章目录 数值函数round()&#xff1a;四舍五入ceiling()&#xff1a;上限函数floor()&#xff1a;地板函数abs()&#xff1a;计算绝对值rand()&#xff1a;生成0-1的随机浮点数 字符串函数length()&#xff1a;获取字符串中的字符数upper() / lower()&#xff1a;将字符串转化…

基于springboot+vue的在线考试系统(前后端分离)

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容&#xff1a;毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 项目介绍…

水果编曲软件FL Studio21最新中文版本2023年最新FL 21中文版如何快速入门教程

水果编曲软件FL Studio介绍 各位&#xff0c;大家晚上好&#xff0c;今天给大家带来最新最新2023水果编曲软件FL Studio 21中文版下载安装激活图文教程。我们一起先了解一些FL Studio 。FL Studio21是目前流行广泛使用人数最多音乐编曲宿主制作DAW软件&#xff0c;这款软件相信…

git stash save untracked not staged

git stash save untracked not staged 如图 解决方案&#xff1a; git stash save "tag标记信息" --include-untracked或者&#xff1a; git stash save -u "tag标记信息" git stash clear清空本地暂存代码_zhangphil的博客-CSDN博客文章浏览阅读486次。…

QDoubleSpinBox的使用示例

QDoubleSpinBox即可以做为数值型输入框使用&#xff0c;也可以使用只读型数据显示框&#xff0c;在作为输入框使用时比QLineEdit有以下几个方面的优势 1.可以设置范围&#xff0c;并且范围精确&#xff0c; 2.输入数据精确&#xff0c;自动屏幕非数值以外的字符。 3.设置步长后…

LeetCode(40)同构字符串【哈希表】【简单】

目录 1.题目2.答案3.提交结果截图 链接&#xff1a; 同构字符串 1.题目 给定两个字符串 s 和 t &#xff0c;判断它们是否是同构的。 如果 s 中的字符可以按某种映射关系替换得到 t &#xff0c;那么这两个字符串是同构的。 每个出现的字符都应当映射到另一个字符&#xff0…

java开发需要用到的软件,必备软件工具一览

java开发需要用到的软件&#xff0c;必备软件工具一览 如果你对Java编程感兴趣或已经是一名Java开发者&#xff0c;你需要一些必备的软件工具来提高你的生产力和简化开发过程。在本文中&#xff0c;我们将探讨Java开发所需的关键软件工具&#xff0c;并通过具体示例来解释它们的…

Elk:filebeat 日志收集工具和logstash

Elk:filebeat 日志收集工具和logstash Filebeat是一个轻量级的日志手机工具,所使用的系统资源比logstash部署和启动时使用的资源要小得多 Filebeat可以在非java环境使用&#xff0c;他可以代理logstash在非java环境上收集日志 缺点 Filebeat无法实现数据的过滤,一般是结合l…

使用 graph_tool 绘制网络关系图

目标 使用python的graph_tool包&#xff0c;根据以下表格&#xff0c;生成网络关系图。 采样方法大类小类低空遥感解译地面裸土地,人工地面地面影像解译水生植物水葫芦,荷叶,苦草,黑藻,水华,水白菜RTK测量禾本植物狗牙根,华克拉莎,斑茅,苔草,芦苇,芦竹,杂茅RTK测量竹风箱树,马…

注解(概念、分类、自定义注解)

注解基本概念 注解(元数据)为我们在代码中添加信息提供一种形式化的方法&#xff0c;我们可以在某个时刻非常方便的使用这些数据。将的通俗一点&#xff0c;就是为这个方法增加的说明或功能。 作用&#xff1a; 编写文档&#xff1a;通过代码里标识的注解生成文档【生成doc文…

bugku题解记录2

文章目录 哥哥的秘密黄道十二官where is flag一段新闻 哥哥的秘密 给出了一个qq&#xff0c;那就去看看呗 hint里面说 收集空间信息——相册——收集微博信息——相册——解题——相册——提交flag 那看看空间先 盲文&#xff1a; hint&#xff1a;密码时地人 旗帜存在相册里…

聚焦 6G 无线技术——目标和需求

从 3G 到 5G 乃至之后的每一种无线标准&#xff0c;都在设计时加入了推动行业发展的具体目标。例如&#xff0c;4G 专注于以 IP 为中心的灵活语音、数据和视频通信&#xff0c;而 5G 则在此基础上进行了改进。6G 的目标是提供更加无处不在、更高效、更身临其境的无线连接。6G 系…

想要更快的文件传输?看看这些aspera的替代方案吧

随着数据量的增大&#xff0c;文件传输已经成为许多公司和组织日常工作中必不可少的环节之一。而对于大容量、海量数据的传输&#xff0c;普通的传输方式可能甚至无法胜任。Aspera作为一种高效的文件传输协议应运而生&#xff0c;其能够在处理大容量、高速传输方面表现出色。然…

Verilog 入门(一)(Verilog 简介)

文章目录 什么是 Verilog HDL&#xff1f;Verilog 主要能力模块时延数据流描述方式 什么是 Verilog HDL&#xff1f; Verilog HDL是一种硬件描述语言&#xff0c;用于从算法级、门级到开关级的多种抽象设计层次的数字系统建模。被建模的数字系统对象的复杂性可以介于简单的门和…

MATLAB | 官方举办的动图绘制大赛 | 第三周赛情回顾

MATHWORKS官方举办的迷你黑客大赛第三期(MATLAB Flipbook Mini Hack)的最新进展&#xff01;&#xff01; 很荣幸前三周都成为了阶段性获奖者~&#xff1a; https://ww2.mathworks.cn/matlabcentral/communitycontests/contests/6/entries/13382 https://ww2.mathworks.cn/mat…

97.STL-查找算法 find

目录 STL-查找算法find 1.基本用法&#xff1a; 2.查找自定义类型&#xff1a; 3.查找范围&#xff1a; STL-查找算法find 在C的STL&#xff08;标准模板库&#xff09;中&#xff0c;find 算法用于在指定范围内查找指定值的元素。 功能描述&#xff1a; 查找指定元素&…

git-4

1.在GitHub上创建个人仓库 现在仓库中有LICENSE文件&#xff0c;但本地没有这个文件&#xff0c;该怎么办呢&#xff1f;往下看 2.把本地仓库同步到GitHub 3.不同人修改了不同文件如何处理&#xff1f; 两个人在同一个分支上&#xff0c;两个人修改了不同文件 其中一人&…

Django路由分发

首先明白一点&#xff0c;Django的每一个应用下都可以有自己的templates文件夹&#xff0c;urls.py文件夹&#xff0c;static文件夹&#xff0c;基于这个特点&#xff0c;Django能够很好的做到分组开发&#xff08;每个人只写自己的app&#xff09;&#xff0c;作为老大&#x…

uniapp 打包的 IOS打开白屏 uniapp打包页面空白

uniapp的路由跟vue一样,有hash模式和history模式, 使用 URL 的 hash 来模拟一个完整的 URL,于是当 URL 改变时,页面不会重新加载。 如果不想要很丑的 hash,我们可以用路由的 history 模式,这种模式充分利用 history.pushState API 来完成 URL 跳转而无须重新加载页面。…