RabbitMQ七种工作模式之 RPC通信模式, 发布确认模式

文章目录

  • 六. RPC(RPC通信模式)
    • 客户端
    • 服务端
  • 七. Publisher Confirms(发布确认模式)
    • 1. Publishing Messages Individually(单独确认)
    • 2. Publishing Messages in Batches(批量确认)
    • 3. Handling Publisher Confirms Asynchronously(异步确认)

六. RPC(RPC通信模式)

在这里插入图片描述
在这里插入图片描述

  1. 客⼾端发送消息到⼀个指定的队列, 并在消息属性中设置replyTo字段, 这个字段指定了⼀个回调队列, ⽤于接收服务端的响应.
  2. 服务端接收到请求后, 处理请求并发送响应消息到replyTo指定的回调队列
  3. 客⼾端在回调队列上等待响应消息. ⼀旦收到响应,客⼾端会检查消息的correlationId属性,以确保它是所期望的响应

公共代码:

 	public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";
    public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";

客户端

客户端需要完成两件事, 发送请求, 接收响应
发送请求:

//发送请求:
        //声明队列
        channel.queueDeclare(Common.RPC_REQUEST_QUEUE, true, false, false, null);
        channel.queueDeclare(Common.RPC_RESPONSE_QUEUE, true, false, false, null);
        //定义回调队列
        String replyQueueName = Common.RPC_RESPONSE_QUEUE;
        //本次请求的唯一标识
        String corrId = UUID.randomUUID().toString();
        //生成发送消息的属性
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();
        //通过内置交换机, 发送消息
        String message = "hello, rpc......";
        channel.basicPublish("", Common.RPC_REQUEST_QUEUE, props, message.getBytes(StandardCharsets.UTF_8));

接收响应:

 //接收响应:
        //使用阻塞队列来存储回调结果, 避免了客户端反复访问队列
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        
        //接收服务器的响应
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息: " + new String(body));
                //如果唯一标识正确, 放在阻塞队列中
                if(properties.getCorrelationId().equals(corrId)){
                    response.offer(new String(body));
                }
            }
        };
        channel.basicConsume(replyQueueName, true, consumer);
        //获取回调的结果
        String result = response.take();
        System.out.println("[RPCClient] Result: " + result);
       

服务端

在这里插入图片描述

		//设置同时最多只能获取一个消息
        channel.basicQos(1);
        //接收消息, 并对消息进行应答
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                BasicProperties replyProps = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(properties.getCorrelationId())
                        .build();
                String message = new String(body);
                String response = "接收到消息 request: " + message;
                channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));
                //对消息进行应答
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(Common.RPC_REQUEST_QUEUE, false, consumer);//自动应答设置成false, 在成功回调后, 再进行手动应答

在这里插入图片描述

七. Publisher Confirms(发布确认模式)

在这里插入图片描述

Publisher Confirms模式是RabbitMQ提供的⼀种确保消息可靠发送到RabbitMQ服务器的机制。在这种模式下,⽣产者可以等待RabbitMQ服务器的确认,以确保消息已经被服务器接收并处理.

  1. ⽣产者将Channel设置为confirm模式(通过调⽤channel.confirmSelect()完成)后, 发布的每⼀条消息都会获得⼀个唯⼀的ID, ⽣产者可以将这些序列号与消息关联起来,以便跟踪消息的状态.
  2. 当消息被RabbitMQ服务器接收并处理后,服务器会异步地向⽣产者发送⼀个确认(ACK)给⽣产者(包含消息的唯⼀ID),表明消息已经送达.
    通过Publisher Confirms模式,⽣产者可以确保消息被RabbitMQ服务器成功接收, 从⽽避免消息丢失的问题.
    适⽤场景: 对数据安全性要求较⾼的场景. ⽐如⾦融交易, 订单处理

⽣产者将信道设置成confirm(确认)模式, ⼀旦信道进⼊confirm模式, 所有在该信道上⾯发布的消息都会被指派⼀个唯⼀的ID(从1开始), ⼀旦消息被投递到所有匹配的队列之后, RabbitMQ就会发送⼀个确认给⽣产者(包含消息的唯⼀ID), 这就使得⽣产者知道消息已经正确到达⽬的队列了, 如果消息和队列是可持久化的, 那么确认消息会在将消息写⼊磁盘之后发出. broker回传给⽣产者的确认消息中deliveryTag 包含了确认消息的序号, 此外 broker 也可以设置channel.basicAck⽅法中的multiple参数, 表⽰到这个序号之前的所有消息都已经得到了处理.
在这里插入图片描述

使⽤发送确认机制, 必须要信道设置成confirm(确认)模式

发布确认有3种策略:

1. Publishing Messages Individually(单独确认)

public static void publishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {
        try(Connection connection = createConnection()){
            //创建channel
            Channel channel = connection.createChannel();
            //开启信道确认模式
            channel.confirmSelect();
            channel.queueDeclare(Common.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);
            Long start = System.currentTimeMillis();
            for(int i = 0; i < MESSAGE_COUNT; i++){
                String body = "消息" + i;
                channel.basicPublish("", Common.PUBLISHER_CONFIRMS_QUEUE1, null, body.getBytes(StandardCharsets.UTF_8));
                //等待确认消息, 只要消息被确认, 这个方法就会被返回
                //如果超时过期, 则抛出TimeoutException, 如果任何消息被nack(丢失), waitForConfirmsOrDie将抛出IOException
                channel.waitForConfirmsOrDie();

            }
            Long end = System.currentTimeMillis(5000);
            System.out.printf("Published %d message individually in %d ms", MESSAGE_COUNT, end - start);
        }

    }

在这里插入图片描述

2. Publishing Messages in Batches(批量确认)

    private static void publishingMessagesInBatches() throws IOException, TimeoutException, InterruptedException {
        try(Connection connection = createConnection()){
            Channel channel = connection.createChannel();
            channel.confirmSelect();
            channel.queueDeclare(Common.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);
            //批量个数
            int batchSize = 100;
            int outstandingMessageCount = 0;
            long start = System.currentTimeMillis();
            for(int i = 0; i < MESSAGE_COUNT; i++){
                String body = "消息" + i;
                channel.basicPublish("", Common.PUBLISHER_CONFIRMS_QUEUE2, null, body.getBytes(StandardCharsets.UTF_8));
                outstandingMessageCount++;
                if(outstandingMessageCount == batchSize){
                    channel.waitForConfirmsOrDie(5000);
                    outstandingMessageCount = 0;
                }
            }
            if(outstandingMessageCount > 0){
                channel.waitForConfirms(5000);
            }
            long end = System.currentTimeMillis();
            System.out.printf("Published %d message batch in %d ms", MESSAGE_COUNT, end - start);
        }
    }

在这里插入图片描述
相⽐于单独确认策略, 批量确认极⼤地提升了confirm的效率, 缺点是出现Basic.Nack或者超时时, 我们不清楚具体哪条消息出了问题. 客⼾端需要将这⼀批次的消息全部重发, 这会带来明显的重复消息数量, 当消息经常丢失时,批量确认的性能应该是不升反降的

3. Handling Publisher Confirms Asynchronously(异步确认)

异步confirm⽅法的编程实现最为复杂. Channel 接⼝提供了⼀个⽅法addConfirmListener. 这个⽅法可以添加ConfirmListener 回调接⼝.
ConfirmListener 接⼝中包含两个⽅法: handleAck(long deliveryTag, boolean multiple) 和 handleNack(long deliveryTag, boolean multiple) , 分别对应处理RabbitMQ发送给⽣产者的ack和nack.
deliveryTag 表⽰发送消息的序号. multiple 表⽰是否批量确认.
我们需要为每⼀个Channel 维护⼀个已发送消息的序号集合. 当收到RabbitMQ的confirm 回调时, 从集合中删除对应的消息. 当Channel开启confirm模式后, channel上发送消息都会附带⼀个从1开始递增的deliveryTag序号. 我们可以使⽤SortedSet 的有序性来维护这个已发消息的集合.

  1. 当收到ack时, 从序列中删除该消息的序号. 如果为批量确认消息, 表⽰⼩于等于当前序号deliveryTag的消息都收到了, 则清除对应集合
  2. 当收到nack时, 处理逻辑类似, 不过需要结合具体的业务情况, 进⾏消息重发等操作
private static void handlingPublisherConfirmsAsynchronously() throws IOException, TimeoutException, InterruptedException{
        try(Connection connection = createConnection()){
            Channel channel = connection.createChannel();
            channel.queueDeclare(Common.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);
            channel.confirmSelect();

            //有序集合, 元素按照自然顺序进行排序, 存储未confirm消息序号
            SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    if(multiple){
                        //批量
                        confirmSet.headSet(deliveryTag + 1).clear();
                    }else{
                        confirmSet.remove(deliveryTag);
                    }
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    if(multiple){
                        //批量
                        confirmSet.headSet(deliveryTag + 1).clear();
                    }else{
                        confirmSet.remove(deliveryTag);
                    }
                    //如果处理失败, 需要有消息重发的环节, 此处省略
                }
            });
            long start = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = "消息" + i;
                long nextPublishSeqNo = channel.getNextPublishSeqNo();
                channel.basicPublish("", Common.PUBLISHER_CONFIRMS_QUEUE3, null, message.getBytes(StandardCharsets.UTF_8));
                confirmSet.add(nextPublishSeqNo);
            }
            while(!confirmSet.isEmpty()){
                Thread.sleep(10);
            }
            long end = System.currentTimeMillis();
            System.out.printf("Published %d message ConfirmsAsynchronously in %d ms", MESSAGE_COUNT, end - start);
        }
    }

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/933863.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

深入理解 SQL 注入:原理、攻击流程与防御措施

深入理解 SQL 注入&#xff1a;原理、攻击流程与防御措施 在当今数字化的时代&#xff0c;数据安全已成为每个企业和开发者必须面对的重要课题。SQL 注入&#xff08;SQL Injection&#xff09;作为一种常见的网络攻击方式&#xff0c;给无数企业带来了巨大的损失。本文将深入…

【项目实战】基于python+爬虫的电影数据分析及可视化系统

注意&#xff1a;该项目只展示部分功能&#xff0c;如需了解&#xff0c;文末咨询即可。 本文目录 1.开发环境2 系统设计 2.1 设计背景2.2 设计内容 3 系统页面展示 3.1 用户页面3.2 后台页面3.3 功能展示视频 4 更多推荐5 部分功能代码 5.1 爬虫代码5.2 电影信息代码 1.开发环…

JCR一区牛顿-拉夫逊优化算法+分解对比!VMD-NRBO-Transformer-BiLSTM多变量时序光伏功率预测

JCR一区牛顿-拉夫逊优化算法分解对比&#xff01;VMD-NRBO-Transformer-BiLSTM多变量时序光伏功率预测 目录 JCR一区牛顿-拉夫逊优化算法分解对比&#xff01;VMD-NRBO-Transformer-BiLSTM多变量时序光伏功率预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.中科院…

深入探索 Compose 渲染流程:从 UI 树到 Skia 绘制的实现解析

文章目录 前言Compose 渲染流程概述1. Compose 解析1.1 Compose 声明性 UI1.2 Compose 编译1.2.1 Compose 编译概述1.2.2 代码示例1.2.3 编译过程细节 1.3 组合与重组合1.3.1 组合&#xff08;Composition&#xff09;1.3.2 重组合1.3.3 组合与重组合的区别1.3.4 组合与重组合的…

数据结构排序算法详解

数据结构排序算法详解 1、冒泡排序&#xff08;Bubble Sort&#xff09;2、选择排序&#xff08;Selection Sort&#xff09;2、插入排序&#xff08;Insertion Sort&#xff09;4、快速排序&#xff08;Quick Sort&#xff09; 1、冒泡排序&#xff08;Bubble Sort&#xff09…

命令模式的理解和实践

在软件开发中&#xff0c;设计模式是开发者们经过长期实践总结出来的、可复用的解决方案&#xff0c;用于解决常见的设计问题。命令模式&#xff08;Command Pattern&#xff09;是行为型设计模式之一&#xff0c;它通过将一个请求封装成一个对象&#xff0c;从而允许用户用不同…

【C++】关系操作符的全面解析与高级应用

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 &#x1f4af;前言&#x1f4af;关系操作符1.关系操作符的分类与语义2.关系操作符的连用问题3.浮点数比较的精度问题问题示例解决方案 &#x1f4af;总结核心要点 &#x1f4af;小结 &#x1f4af;前言 在…

python爬虫--某房源网站验证码破解

文章目录 使用模块爬取目标验证码技术细节实现成果代码实现使用模块 requests请求模块 lxml数据解析模块 ddddocr光学识别 爬取目标 网站验证码破解思路是统一的,本文以城市列表为例 目标获取城市名以及城市连接,之后获取城市房源信息技术直接替换地址即可 验证码 技术…

java+ssm+mysql校园物品租赁网

项目介绍&#xff1a; 使用javassmmysql开发的校园物品租赁网&#xff0c;系统包含管理员、用户角色&#xff0c;功能如下&#xff1a; 管理员&#xff1a;用户管理&#xff1b;物品管理&#xff08;物品种类、物品信息、评论信息&#xff09;&#xff1b;订单管理&#xff1…

【JS】简单CSS简单JS写的上传进度条

纯JS写的&#xff0c;简单的上传进度条&#xff0c;当上传的文件较大&#xff0c;加一个动态画面&#xff0c;就不会让人觉得出错了或网络卡了 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"v…

Oracle系统性能监控工具oswatcher演示

1、关于 OSW OSWatcher 的使用符合 Oracle 的标准许可条款&#xff0c;并且不需要额外的许可即可使用&#xff01;&#xff01;&#xff01;&#xff01; OSWatcher (oswbb) 是一种 UNIX shell 脚本的集合&#xff0c;主要用于收集和归档操作系统和网络的度量&#xff0c;以便…

Oracle EBS PAC 如何复修非标任务单生产生非常大的PAC成本?

系统环境 RDBMS : 12.1.0.2.0 Oracle Applications : 12.2.6 问题症状 非标准任务单组件和装配相同物料A,俗称投入A产A。该物料A的期初数量为0。 上期成本假设为20,而本期成本爆增至563.674234。关键问题点: 由于该物料没有期初数量,无法通过“更新定期成本”指定“新期本…

word实践:正文/标题/表图等的共用模板样式设置

说在前面 最近使用word新建文件很多&#xff0c;发现要给大毛病&#xff0c;每次新建一个word文件&#xff0c;标题/正文的字体、大小和间距都要重新设置一遍&#xff0c;而且每次设置这些样式都忘记了参数&#xff0c;今天记录一下&#xff0c;以便后续方便查看使用。现在就以…

java抽奖系统(一)2.0

1. 项⽬介绍 1.1 背景 随着数字营销的兴起&#xff0c;企业越来越重视通过在线活动来吸引和留住客⼾。抽奖活动作为⼀种有效的营 销⼿段&#xff0c;能够显著提升⽤⼾参与度和品牌曝光率。于是我们就开发了以抽奖活动作为背景的Spring Boot项⽬&#xff0c;通过这个项⽬提供⼀…

spring 源码分析

1 IOC 源码解析 BeanDefinition: bean的定义。里面会有beanClass、beanName、scope等属性 beanClass&#xff1a;通过Class.forName生成的Class 对象beanName&#xff1a;context.getBean(“account”)&#xff0c;acount就是beanNamescope: 作用区分单例bean、原型bean Bea…

Android水波纹效果

Android水波纹效果 需要到水波纹效果的场景还蛮少的。万一刚好你需要呢 一、思路&#xff1a; 自定义组件SpreadView 二、效果图&#xff1a; 看视频更直观点&#xff1a; Android轮子分享-水波纹效果 三、关键代码&#xff1a; public class SpreadView extends View {pr…

用 NotePad++ 运行 Java 程序

安装包 网盘链接 下载得到的安装包: 安装步骤 双击安装包开始安装. 安装完成: 配置编码 用 NotePad 写 Java 程序时, 需要设置编码. 在 设置, 首选项, 新建 中进行设置, 可以对每一个新建的文件起作用. 之前写的文件不起作用. 在文件名处右键, 可以快速打开 CMD 窗口, 且路…

TaskBuilder SQL执行工具

为了方便开发者连接当前任擎服务器上配置的各个数据源对应的数据库进行相关操作&#xff0c;TaskBuilder提供了一个SQL执行工具&#xff0c;点击系统侧边栏里的执行SQL图标 &#xff0c;即可打开该工具&#xff0c;界面如下图所示&#xff1a; 该工具从上至下分为三个区域&a…

学生信息管理系统(简化版)数据库部分

使用Mysql&#xff0c;与navicat工具 下面是mysql创建的代码&#xff0c;可做必要修改 -- 创建学生学籍信息表 CREATE TABLE StudentEnrollment (-- 学号&#xff0c;作为主键student_id VARCHAR(8) NOT NULL,-- 学生姓名stu_name VARCHAR(8) NOT NULL,-- 学生性别gender VARC…

计算机网络之传输层协议TCP

个人主页&#xff1a;C忠实粉丝 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 C忠实粉丝 原创 计算机网络之传输层协议TCP 收录于专栏【计算机网络】 本专栏旨在分享学习计算机网络的一点学习笔记&#xff0c;欢迎大家在评论区交流讨论&#x1f48c; 目…