RabbitMQ 发布确认机制

发布确认模式是避免消息由生产者到RabbitMQ消息丢失的一种手段

发布确认模式

  • 原理说明
  • 实现方式
    • 开启confirm(确认)模式
    • 阻塞确认
    • 异步确认
  • 总结

原理说明

  生产者通过调用channel.confirmSelect方法将信道设置为confirm模式,之后RabbitMQ会返回Confirm.Select-OK命令表示同意生产者将当前信道设置为confirm模式。
  confirm模式下的信道所发送的消息都将被应带ack或者nack一次,不会出现一条消息即被ack又被nack的情况,并且RabbitMQ也并没有对消息被confirm的快慢做出保证,消息被confirm是异步进行。
在这里插入图片描述

  如上图所示为confirm模式下的消息发送过程,其中4和6为异步应答,也就是说4过程并不一定在5之前,也有可能是在下一条消息发送后才会进行上一条消息的应答。
  RabbitMQ 事务和发送确认机制确保的是消息能够正确的发送至RabbitMQ的交换机,如果交换机没有匹配的队列,那么消息也会被丢失。和事务不同的是,发布确认机制是异步进行的,因此在性能上发布确认模式将更加优秀,需要注意的是:事务和确认机制是互斥的,不能共存
  事务机制和发布确认机制都存在以下注意点:

  • 如果消息需要持久化并且存在队列,则在消息入队并且持久化后进行返回事务提交成功或者应答消息。
  • 如果消息不需要持久化但是存在队列,则在消息入队后返回事务提交成功或者应答消息。
  • 如果消息不可路由到队列中,则在路由失败后返回事务提交成功或者应答消息。

  上文中一直强调的时发布确认针对发布发送到RabbitMQ中的交换机进行保证,但消息实际是否能入队发布确认机制并不能提供保证,因此还需要和mandatory参数配合使用。

实现方式

  RabbitMQ的发布确认机制可以分为三种实现方式:阻塞等待确认、批量阻塞等待确认、异步确认。
阻塞等待确认:每当消息发送后,发送者都阻塞的等待应答消息。这种实现方式将无法体现发布确认模式的异步性能优势。
批量阻塞确认:批量阻塞确认类似于阻塞等待确认,区别在于批量阻塞确认并不会针对每条消息进行阻塞等待,他会针对一些消息进行统一阻塞等待应答消息。这种实现方式将同步和异步结合起来进行使用,对应答性能有一定的提升。
异步应答:实现一个监听器的方式接收应答消息,应答消息的处理逻辑不会影响消息的发送,消息的应答和消息发送是异步进行的,他们并不直接相互干扰。
上面对三种确认方式进行简单说明,下面将分别介绍发布确认机制的实现方式。

开启confirm(确认)模式

  确认模式的开启是针对信道设置的,一旦信道进入了confirm模式,所有在该信道上面发布的消息都会被指派唯一的ID,RabbitMQ也将针对该信道发送的所有消息都进行应答。
  RabbitMQ回传给生产者的确认消息中的deliverryTag包含了确认消息的序号,但在使用(批量)阻塞确认方式进行实现的时候该消息序号无意义。开启confirm模式仅需要以下代码进行实现即可:

channel.confirmSelect();

阻塞确认

  阻塞确认的方式依赖于channel.waitForConfirms()方法,该方法如下所示:

    /**
     * Wait until all messages published since the last call have been
     * either ack'd or nack'd by the broker.  Note, when called on a
     * non-Confirm channel, waitForConfirms throws an IllegalStateException.
     * @return whether all the messages were ack'd (and none were nack'd)
     * @throws java.lang.IllegalStateException
     */
    boolean waitForConfirms() throws InterruptedException;

  自从上次调用该方法后直到所有发送的消息都被应答后返回所有消息的应答结果,如果所有发送的消息应答结果都是成功则返回true,一旦存在任何一条消息应答失败则返回false。
  根据该方法的描述可知,可以通过该方法实现阻塞等待确认和批量阻塞确认两种方案,区别仅在于是发送一条消息调用一次该方法还是发送一批消息后调用一次这个方法。
  阻塞等待确认的方式如下代码所示:

//发送消息
channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
// 如果发送失败则进行该条消息的重新发送
if(!channel.waitForConfirms()){
    channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
}

 阻塞批量确认的方式如下代码所示:

        // 存储未应带消息队列
        List<String> messages = new ArrayList<>();
        for (int i = 1; i < 20000 ;  i++){
            String msg = String.valueOf(i);
            messages.add(msg);
            channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
            // 每发送十条消息进行一次确认
            if(i > 0 && i % 10 == 0 ){
                // 如果确认不通过则将消息重新发送
                if(!channel.waitForConfirms()){
                    for (String e : messages) {
                        channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,e.getBytes());
                    }
                }else{
                    // 如果确认成功则将这些消息从未应答队列中移除
                    messages.clear();
                }
            }
        }

异步确认

  客户端Channel提供了addConfirmListener方法,该可以添加ConfirmListener这个回调接口,该接口包含两个方法:handleAck和handleNack,分别用来处理饭hi的Ack和Nack,这两个方法都将返回一个参数deliveryTag(消息的唯一有序序号)和一个boolean型参数multiple,如果该参数为true表示自该消息之前的所有消息RabbitMQ服务都已经做出了应答。我们可以通过该值实现具体业务的发布确认。

/**
* Implement this interface in order to be notified of Confirm events.
* Acks represent messages handled successfully; Nacks represent
* messages lost by the broker.  Note, the lost messages could still
* have been delivered to consumers, but the broker cannot guarantee
* this.
* For a lambda-oriented syntax, use {@link ConfirmCallback}.
*/
public interface ConfirmListener {
  void handleAck(long deliveryTag, boolean multiple)
      throws IOException;

  void handleNack(long deliveryTag, boolean multiple)
      throws IOException;
}

  异步确认的方式实现起来比较复杂,在生产者端需要维护一个消息队列,如果消息应答成功则将该消息从队列中移除,如果消息应答失败则将该消息再重新发送或进行其他业务处理。该逻辑伪码如下所示:

        // 存储未确认消息,其中key为消息序号,value为消息实体
        HashMap<Long,String> msgMap = new HashMap<>();
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                msgMap.remove(deliveryTag);
            }

            /**
             * 如果消息应带结果为nack则重新发送该消息
             * @param deliveryTag
             * @param multiple
             * @throws IOException
             */
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                String msg = msgMap.get(deliveryTag);
                if(msg != null){
                    channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
                }
            }
        });
        for (int i = 1; i < 20000 ;  i++){
            String msg = String.valueOf(i);
            // 将消息序号和消息存储map中
            msgMap.put(channel.getNextPublishSeqNo(),msg);
            channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
        }

  上述代码使用了map存储消息序号和消息实体,这种存储方式应该会存在风险,由于监听器和消息发送过程是异步进行了,因此可能会存在线程安全的问题,HashMap是非线程安全的。

总结

  发布确认模式是为我们解决消息自生产者发送到RabbitMQ交换机过程中消息丢失的问题的,这一场景需求我们也可以通过事务机制实现。发布确认模式和事务机制比较如下表所示:

比较事务机制发布确认机制
实现方式通过AMQP协议层面实现轻量级实现,采用RabbitMQ应答机制
命令详解Tx.Select
Basic.Publish
Tx.Commit
Commit.OK
Basic.Publish
Basic.Ack
性能同步,性能较慢可异步实现也可同步实现,性能快,AMQP命令交互少
消息到达队列时机事务提交后消息才会进入队列,消息入队存在滞后性消息发送后就进入队列,发布确认模式不影响消息进入队列时机
事务提交成功或消息应答时机消息被交换机处理完成后,或消息不可达同事务
实现复杂度简单相对复杂
适合场景批量发送消息,实现批量消息的原子性和一致性确保消息发送到交换机

  发布确认模式的具体实现可以划分为三种:阻塞等待、批量确认、异步确认,这三者的比较如下表所示:

比较内容阻塞等待批量等待异步确认
性能
实现复杂度
确认范围每条消息批量消息每条消息
是否可以精准确认每条消息

  根据上述内容,我们在实现避免消息自生产者到交换机丢失的机制时建议使用发布确认模式的异步确认,因为异步确认性能最高,并且可以准确的得到被应答的消息的序号,有助于我们进行后续逻辑处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/66754.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

vuejs 设计与实现 - 双端diff算法

我们介绍了简单 Diff 算法的实现原理。简单 Diff 算法利用虚拟节点的 key 属性&#xff0c;尽可能地复用 DOM元素&#xff0c;并通过移动 DOM的方式来完成更新&#xff0c;从而减少不断地创建和销毁 DOM 元素带来的性能开销。但是&#xff0c;简单 Diff 算法仍然存在很多缺陷&a…

并发三大特性和JMM

一、并发三大特性 1、原子性 一个或多个操作&#xff0c;要么全部执行且在执行过程中不被任何因素打断&#xff0c;要么全部不执行。在Java中&#xff0c;对基本数据类型的读取和赋值操作是原子性操作&#xff08;64位处理器&#xff09;。不采取任何的原子性保障措施的自增操…

c++11 标准模板(STL)(std::basic_fstream)(三)

定义于头文件 <fstream> template< class CharT, class Traits std::char_traits<CharT> > class basic_fstream : public std::basic_iostream<CharT, Traits> 类模板 basic_fstream 实现基于文件的流上的高层输入/输出。它将 std::basic_i…

Cadvisor+InfluxDB+Grafan+Prometheus(详解)

目录 一、CadvisorInfluxDBGrafan案例概述 &#xff08;一&#xff09;Cadvisor Cadvisor 产品特点&#xff1a; &#xff08;二&#xff09;InfluxDB InfluxDB应用场景&#xff1a; InfluxDB主要功能&#xff1a; InfluxDB主要特点&#xff1a; &#xff08;三&#…

MyCat配置文件schema.xml讲解

1.MyCat配置 1.1 schema标签 如果checkSQLschema配置的为false&#xff0c;那么执行DB01.TB_ORDER时就会报错&#xff0c;必须用use切换逻辑库以后才能进行查询。 sqlMaxLimit如果未指定limit进行查询&#xff0c;列表查询模式默认为100,最多只查询100条。因为用mycat后默认数…

linux自定义网络访问规则

1.更改防火墙默认区域为trusted firewall-cmd --set-default-zonetrusted 2.新建一个zone&#xff0c;将想要访问本机80端口的ip&#xff0c;如&#xff1a;192.168.3.99 &#xff0c;添加的这个zone中&#xff0c;同时在这个zone中放行80端口。 firewall-cmd --permanent --ne…

SEO搜索引擎优化

目录 场景 内部业务To B (Business-to-Business&#xff0c;B2B)需要降低SEO&#xff0c;反爬 客户业务To C (Business-to-Consumer&#xff0c;B2C)需要提高SEO TDK优化 Title&#xff08;标题&#xff09; Description&#xff08;描述&#xff09; Keywords&#xff…

windows 安装免费3用户ccproxy ubuntu 代理上网

Windows 上进行安装 ubuntu 上进行设置 方法一 (临时的手段) 如果仅仅是暂时需要通过http代理使用apt-get&#xff0c;您可以使用这种方式。 在使用apt-get之前&#xff0c;在终端中输入以下命令&#xff08;根据您的实际情况替换yourproxyaddress和proxyport&#xff09;。 终…

布谷鸟配音:一站式配音软件

这是一款智能语音合成软件&#xff0c;可以快速将文字转换成语音&#xff0c;拥有多种真人模拟发音&#xff0c;可以选择不同男声、女声、童声&#xff0c;以及四川话、粤语等中文方言和外语配音&#xff0c;并且可对语速、语调、节奏、数字读法、多音字、背景音等进行全方位设…

初识Container

1. 什么是Container&#xff08;容器&#xff09; 要有Container首先要有Image&#xff0c;也就是说Container是通过image创建的。 Container是在原先的Image之上新加的一层&#xff0c;称作Container layer&#xff0c;这一层是可读可写的&#xff08;Image是只读的&#xff0…

Mybatis-Plus使用updateById()、update()将字段更新为null

文章目录 一、问题背景二、问题原因三、解决方案1. 设置全局的field-strategy2. 对某个字段设置单独的field-strategy3. 使用UpdateWrapper方式更新&#xff08;推荐使用&#xff09; 本文主要介绍了Mybatis-Plus使用updateById()、update()将字段更新为null&#xff0c;文中通…

Redis 6.5 服务端开启多线程源码

redis支持开启多线程&#xff0c;只有从socket到读取缓冲区和从输出缓冲区到socket这两段过程是多线程&#xff0c;而命令的执行还是单线程&#xff0c;并且是由主线程执行 借鉴&#xff1a;【Redis】事件驱动框架源码分析&#xff08;多线程&#xff09; 一、main启动时初始化…

freeswitch的mod_xml_curl模块动态获取dialplan

概述 freeswitch是一款简单好用的VOIP开源软交换平台。 mod_xml_curl模块支持从web服务获取xml配置&#xff0c;本文介绍如何动态获取dialplan配置。 环境 centos&#xff1a;CentOS release 7.0 (Final)或以上版本 freeswitch&#xff1a;v1.6.20 GCC&#xff1a;4.8.5…

HTTPS安全通信

HTTPS,TLS/SSL Hyper Text Transfer Protocol over Secure Socket Layer,安全的超文本传输协议,网景公式设计了SSL(Secure Sockets Layer)协议用于对Http协议传输的数据进行加密,保证会话过程中的安全性。 使用TCP端口默认为443 TLS:(Transport Layer Security,传输层…

View绘制流程-Window创建

前言&#xff1a; View绘制流程中&#xff0c;主要流程是这样的&#xff1a; 1.用户进入页面&#xff0c;首先创建和绑定Window&#xff1b; 2.首次创建以及后续vsync信号来临时&#xff0c;会请求执行刷新流程&#xff1b; 3.刷新流程完成后&#xff0c;会通知SurfaceFlin…

(力扣)用两个栈实现队列

这里是栈的源代码&#xff1a;栈和队列的实现 当然&#xff0c;自己也可以写一个栈来用&#xff0c;对题目来说不影响&#xff0c;只要符合栈的特点就行。 题目&#xff1a; 请你仅使用两个栈实现先入先出队列。队列应当支持一般队列支持的所有操作&#xff08;push、pop、pe…

【HDFS】每天一个RPC系列----complete(二):客户端侧

上图给出了最终会调用到complete RPC的客户端侧方法链路(除去Router那条线了)。 org.apache.hadoop.hdfs.DFSOutputStream#completeFile(org.apache.hadoop.hdfs.protocol.ExtendedBlock): 下面这个方法在complete rpc返回true之前,会进行重试,直到超过最大重试次数抛异…

深度优先搜索与动态规划|543, 124, 687

深度优先搜索与动态规划|543. 二叉树的直径&#xff0c;124. 二叉树中的最大路径和&#xff0c;687. 最长同值路径 二叉树的直径二叉树中的最大路径和最长同值路径 二叉树的直径 好久没写二叉树了&#xff0c;主要还是看遍历的顺序是什么样的。 # Definition for a binary tr…

代码随想录算法训练营之JAVA|第二十五天| 491. 递增子序列

今天是第25天刷leetcode&#xff0c;立个flag&#xff0c;打卡60天。 算法挑战链接 491. 递增子序列https://leetcode.cn/problems/non-decreasing-subsequences/ 第一想法 题目理解&#xff1a;在给定的一个数组中&#xff0c;找出全部的递增列表。要求不能有重复。 这是一…

【mars3d - 报错】使用mars3d加载时的一些报错和不生效问题

在使用过程中遇到过很多报错&#xff0c;不管大的还是小的&#xff0c;在这里总结下&#xff0c;应该会持续更新&#xff1b; 1、设置贴地之后报错 可能是因为 arcType&#xff1a;Cesium.arcType.NONE 与 clampToGround&#xff1a;true 是相互冲突的&#xff0c;两个都设置就…