RabbitMQ学习04

文章目录

    • 发布确认
      • 1. 发布确认的原理
      • 2. 发布确认的策略
        • 2.1.开启发布确认的方法
        • 2.2.单个确认
        • 2.3.批量确认发布
        • 2.4.异步确认发布
        • 2.5.如何处理异步未确认消息
        • 2.6 总结:

发布确认

1. 发布确认的原理

    生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
    confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。

2. 发布确认的策略

2.1.开启发布确认的方法

    发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法

Channel channel  = connection.createChannel();
channel.confirmSelect();
2.2.单个确认

    这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
    这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。

    /**
     * 1.单个发布确认
     * @throws Exception
     */
    public static void publishMessageIndividually() throws Exception{

        Channel channel = RabbitMQUtils.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false,false,null);
        //开启发布确认
        channel.confirmSelect();
        //开启时间
        long begin = System.currentTimeMillis();

        //批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",queueName,null,message.getBytes());
            boolean flag = channel.waitForConfirms();
            if (flag){
                System.out.println("消息发送成功");
            }
        }

        //结束时间
        long end = System.currentTimeMillis();

        System.out.println("发布" + MESSAGE_COUNT + "条消息,用时:" + (end - begin) + "ms");
    }

测试结果:
发布1000条消息,用时:455ms

2.3.批量确认发布

    上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

/**
     * 批量发布确认
     */
    public static void publishMessageBatch() throws Exception{

        Channel channel = RabbitMQUtils.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false,false,null);
        //开启发布确认
        channel.confirmSelect();
        //开启时间
        long begin = System.currentTimeMillis();

        //批量确认消息大小
        int batchSize = 100;


        //批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",queueName,null,message.getBytes());
            if (i % batchSize == 99){
                boolean flag = channel.waitForConfirms();
                if (flag){
                    System.out.println("消息发送成功");
                }
            }
        }
        
        //结束时间
        long end = System.currentTimeMillis();

        System.out.println("发布" + MESSAGE_COUNT + "条消息,用时:" + (end - begin) + "ms");
    }

测试结果:
发布1000条消息,用时:62ms

2.4.异步确认发布

    异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,下面就让我们来详细讲解异步确认是怎么实现的。
在这里插入图片描述

/**
     * 异步发布确认
     */

    public static void publishMessageAsync() throws Exception{

        Channel channel = RabbitMQUtils.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false,false,null);
        //开启发布确认
        channel.confirmSelect();
        //开启时间
        long begin = System.currentTimeMillis();

        ConfirmCallback ackCallback = (deliveryTag,multiple) -> {
            System.out.println("确认消息" + deliveryTag);
        };
        /**
         * 1.消息的表示
         * 2.是否为批量
         */
        ConfirmCallback nackCallback = (deliveryTag,multiple) -> {
            System.out.println("未确认消息:" + deliveryTag);
        };
        //准备消息监听器
        /**
         * 1.成功的消息
         * 2.失败的消息
         */
        channel.addConfirmListener(ackCallback,nackCallback);


        //批量发布消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",queueName,null,message.getBytes());

        }

        //结束时间
        long end = System.currentTimeMillis();

        System.out.println("发布" + MESSAGE_COUNT + "条消息,用时:" + (end - begin) + "ms");
    }

测试结果:
发布1000条消息,用时:28ms

2.5.如何处理异步未确认消息

    最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

    /**
    * 异步发布确认
    */

   public static void publishMessageAsync() throws Exception{

       Channel channel = RabbitMQUtils.getChannel();
       //队列的声明
       String queueName = UUID.randomUUID().toString();
       channel.queueDeclare(queueName,true,false,false,null);
       //开启发布确认
       channel.confirmSelect();
       //开启时间
       long begin = System.currentTimeMillis();
       /**
        * 线程安全有序的一个哈希表  适用于高并发场景
        * 1.他能轻松的将序号与消息关联
        * 2.批量删除条数  只要给到序号
        * 3.支持高并发(多线程)
        *
        */
       ConcurrentSkipListMap<Long,String> outStandingConfirms =
               new ConcurrentSkipListMap<>();
       
       ConfirmCallback ackCallback = (deliveryTag,multiple) -> {
           if (multiple){
               //2.删除到已经确认的消息,剩下的就是未确认的消息
               ConcurrentNavigableMap<Long, String> confirmed =
                       outStandingConfirms.headMap(deliveryTag);
               confirmed.clear();
           }else {
               outStandingConfirms.remove(deliveryTag);
           }
           System.out.println("确认消息" + deliveryTag);
       };
       /**
        * 1.消息的表示
        * 2.是否为批量
        */
       ConfirmCallback nackCallback = (deliveryTag,multiple) -> {
           //3.打印未确认的消息
           String message = outStandingConfirms.get(deliveryTag);
           System.out.println("未确认消息:" + message);
       };
       //准备消息监听器
       /**
        * 1.成功的消息
        * 2.失败的消息
        */
       channel.addConfirmListener(ackCallback,nackCallback);


       //批量发布消息
       for (int i = 0; i < MESSAGE_COUNT; i++) {
           String message = i + "";
           channel.basicPublish("",queueName,null,message.getBytes());
           // TODO: 1.记录下所有要发送的消息,消息的总和
           outStandingConfirms.put(channel.getNextPublishSeqNo(),message);

       }

       //结束时间
       long end = System.currentTimeMillis();

       System.out.println("发布" + MESSAGE_COUNT + "条消息,用时:" + (end - begin) + "ms");
   }
2.6 总结:

单独发布消息:
同步等待确认,简单,但吞吐量非常有限。
批量发布消息:
批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是哪条消息出现了问题。
异步处理:
最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/108532.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数据结构上机实验——二叉树的实现、二叉树遍历、求二叉树的深度/节点数目/叶节点数目、计算二叉树度为1或2的节点数、判断二叉树是否相似

文章目录 数据结构上机实验1.要求2.二叉树的实现2.1创建一颗二叉树2.2对这棵二叉树进行遍历2.3求二叉树的深度/节点数目/叶节点数目2.4计算二叉树中度为 1 或 2 的结点数2.5判断2棵二叉树是否相似&#xff0c;若相似返回1&#xff0c;否则返回0 3.全部源码测试&#xff1a;Bina…

在3分钟内使用AI-Chat生成精美PPT(附AI工具)

前言 在人工智能的大趋势下&#xff0c;AI-Chat是一款令人惊叹的技术。它用强大的自然语言处理技术帮助我们快速生成PPT&#xff0c;提高工作效率。本文将介绍使用ChatAI-Chat生成PPT的方法&#xff0c;以及使用Mindshow转换为炫酷的演示文稿。让技术为我们节省时间&#xff0c…

【数据结构】数组和字符串(十):稀疏矩阵的链接存储:十字链表的矩阵操作(加法、乘法、转置)

文章目录 4.2.1 矩阵的数组表示4.2.2 特殊矩阵的压缩存储a. 对角矩阵的压缩存储b~c. 三角、对称矩阵的压缩存储d. 稀疏矩阵的压缩存储——三元组表4.2.3三元组表的转置、加法、乘法、操作4.2.4十字链表0. 十字链表的基本操作1. 矩阵加法2. 矩阵乘法3. 矩阵转置4. 主函数 5. 代码…

Binder机制总结笔记

Binder机制总结笔记 什么是Binder&#xff1f; Binder的Android特有的IPC通信机制。Android的四大组件Activity、Service、Broadcast、ContentProvider&#xff0c;不同的App等都运行在不同的进程内&#xff0c;他们之间的通信都需要依靠Binder完成。因此Binder在整个Android系…

ARM | 传感器必要总线IIC

IIC总线介绍 1.谈谈你对IIC总线理解&#xff1f; 1&#xff09;IIC总线是串行半双工同步总线,主要用于连接整体电路 2&#xff09;SCL/SDA作用:IIC是两线制,一根是时钟线SCK,用于控制什么时候进行进行数据传输,时钟信号由主机发出; 另一根是数据线SDA,用于进行数据传输,可以从…

7.多线程之单例模式

单例模式 文章目录 单例模式1. 什么是单例模式2. 饿汉模式3. 懒汉模式3.1 单线程版&#xff1a;3.2 多线程版 1. 什么是单例模式 单例模式是一种设计模式&#xff0c;常见的设计模式还有工厂模式、建造者模式等。 设计模式是一套被反复使用、多数人知晓的、经过分类编目的、代码…

“人类高质量数据”如何训练计算机视觉模型?

人类的视觉系统可以复制吗&#xff1f; 答案是肯定的。 计算机视觉 (Computer Vision) 技术的不断普及&#xff0c;让机器识别和处理图像就像人的大脑一样&#xff0c;且速度更快、更准确。 机器像人类一样去“思考” 计算机视觉 (Computer Vision) 是近年来人工智能增长最快…

玩转视图变量,轻松实现动态可视化数据分析

前言 在当今数据驱动的世界中&#xff0c;数据分析已经成为了企业和组织中不可或缺的一部分。传统的静态数据分析方法往往无法满足快速变化的业务需求和实时决策的要求。为了更好地应对这些挑战&#xff0c;观测云的动态可视化数据分析应运而生。 在动态可视化数据分析中&…

微机原理:汇编语言程序设计

文章目录 一、汇编格式1、文字简述2、代码表述 二、汇编语言结构说明1、方式选择伪指令2、段定义语句3、段约定语句4、汇编结束语句5、返回DOS语句 三、实例1、例子2、汇编语言程序开发过程 四、功能调用DOS功能调用1、功能号01H2、功能号02H3、功能号09H4、功能号0AH5、举例 B…

Ubuntu编译 PCL 1.13.1 详细流程

Ubuntu编译 PCL 1.13. 详细流程 一、编译环境二、虚拟机准备1. 虚拟机扩容2. 配置交换分区 三、Cmake - gui 生成 MakeFile1. 解决 flann 依赖问题2. 配置 Cmake 四、编译安装1.编译&#xff1a;2. 安装 一、编译环境 Ubuntu&#xff1a;Ubuntu 20.04 VMware&#xff1a;VMwar…

统计学习方法 支持向量机(下)

文章目录 统计学习方法 支持向量机&#xff08;下&#xff09;非线性支持向量机与和核函数核技巧正定核常用核函数非线性 SVM 序列最小最优化算法两个变量二次规划的求解方法变量的选择方法SMO 算法 统计学习方法 支持向量机&#xff08;下&#xff09; 学习李航的《统计学习方…

VS工程的“多dll与exe文件合并”

运行环境 ILMerge插件 1、打开 VS的“工具 - NuGet包管理器 - 管理解决方案的NuGet程序包” 2、在浏览中搜索“ILMerge”&#xff0c;在官方源中&#xff0c;3.0.41版本的插件已不支持使用了 3、下拉列表其他版本可以安装&#xff0c;使用3.0.40 4、下载封装好的“ILMerge”任…

React-快速搭建开发环境

1.安装 说明&#xff1a;react-excise-01是创建的文件名 npx create-react-app react-excise-01 2. 打开文件 说明:we suggest that you begin by typing:下面即是步骤。 cd react-excise-01 npm start 3.显示

B. Qingshan Loves Strings(贪心规律)

Problem - B - Codeforces 解析&#xff1a; 首先判断 t 字符串是不是相邻不同并且两端不同。 然后遍历 s 并且判断每一个相邻的相同字符&#xff0c;必须 t 字符符合并且两侧不同。 #include<bits/stdc.h> using namespace std; #define int long long const int N2e55…

计算机视觉注意力机制小盘一波 (学习笔记)

将注意力的阶段大改分成了4个阶段 1.将深度神经网络与注意力机制相结合&#xff0c;代表性方法为RAM ⒉.明确预测判别性输入特征&#xff0c;代表性方法为STN 3.隐性且自适应地预测潜在的关键特征&#xff0c;代表方法为SENet 4.自注意力机制 通道注意力 在深度神经网络中…

状态机的设计与实现

写作目的 好久没有写博客进行输出了&#xff0c;是时候需要水一篇了&#xff0c;嘻嘻。 正好项目中使用了状态机&#xff0c;也借此分享一下系统中状态机的项目落地经验。 什么是状态机 以在某宝下单为例&#xff0c;在点击下单后&#xff0c;此时订单就已经创建了&#xff…

基于Python+pyecharts 实现国内上映电影票房评分可视化分析项目源码

基于Pythonpyecharts 实现国内上映电影票房评分可视化分析项目源码 项目内容 统计2018年在国内上映的所有电影&#xff0c;分别获取上映电影的票房、评分&#xff08;豆瓣、猫眼、时光、imdb&#xff09;、类型、上映日期、演员、导演等数据。利用所获数据绘图&#xff0c;对…

人工智能基础_机器学习006_有监督机器学习_正规方程的公式推导_最小二乘法_凸函数的判定---人工智能工作笔记0046

我们来看一下公式的推导这部分比较难一些, 首先要记住公式,这个公式,不用自己理解,知道怎么用就行, 比如这个(mA)T 这个转置的关系要知道 然后我们看这个符号就是求X的导数,X导数的转置除以X的导数,就得到单位矩阵, 可以看到下面也是,各种X的导数,然后计算,得到对应的矩阵结…

php之 角色的权限管理(RBAC)详解

RBAC&#xff08;Role-based access control&#xff09;是一种常见的权限管理模型&#xff0c;通过将用户分配至特定的角色&#xff0c;以及为角色分配访问权限&#xff0c;实现了权限管理的目的。以下是关于RBAC的详细解释&#xff1a; 角色&#xff1a;RBAC模型的核心是角色…

Java电商平台 - API 接口设计之 token、timestamp、sign 具体架构与实现|电商API接口接入

一&#xff1a;token 简介 Token&#xff1a;访问令牌access token, 用于接口中, 用于标识接口调用者的身份、凭证&#xff0c;减少用户名和密码的传输次数。一般情况下客户端(接口调用方)需要先向服务器端申请一个接口调用的账号&#xff0c;服务器会给出一个appId和一个key, …