RabbitMQ基于Java实现消息应答

RabbitMQ

概念

RabbitMQ是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑RabbitMQ是一个快递站, 一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据、|

四大核心概念

生产者

产生数据发送消息的程序时生产者

交换机

交换机是RabbitMQ非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定。

队列

队列是RabbitMQ内部使用的一种数据结构,尽管消息流经RabbitMQ和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。 许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式

消费者

消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。 请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

工作原理

Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker 一个交换机(Exchange)对应多个队列(Queue) 每一个生产者(Producer)与RabbitMQ Server建立连接,每一个连接(Connection)有多个信道(channel),在连接中通过信道发送信息给Broker。

Connection:publisher/consumer 和 broker 之间的 TCP 连接

Channel:如果每一次访问 RabbitMQ 都建立一个 Connection ,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效力也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别channel,所以 channel 之间是完全隔离的。Channel 作为轻量级 Connection 极大减少了操作系统建立TCP connection的开销。

Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct(point-to-point 点到点),topic(publish-subscribe)和 fanout(multicast)

Virrual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个RabbitMQ提供的服务时,可以划分出多个 virtual host(简称vhost),每个用户在自己的vhost创建 exchange/queue等

多租户:一个Broker中可以有多个用户,一个用户有自己的Exchange

Queue:消息最终被送到这里等待 consumer 取走

Binding:exchange 和 queue 之间的虚拟连接,binding 中 可以包含routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

Producer 发出消息 -> Connection(Channel) -> Broker -> Exchange -> 匹配查询表中的 routing key -> 分发到对应的queue中 -> Connection(Channel) -> 给对应的Consumer

为了让各个用户可以互不干扰的工作,RabbitMQ添加了虚拟主机(Virtual Hosts)的概念。其实就是一个独立的访问路径,不同用户使用不同路径,各自有自己的队列、交换机,互相不会影响。

Java实现

1.导入依赖

2.生产者代码

3.消费者代码

工作队列

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。(异步处理

一个消息只能被处理一次,不可以被处理多次。

轮询分发消息

工作队列轮流接收消息

消息应答

概念

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理了一个长的任务并仅完成了部分突然它挂掉了,会发生什么情况。RabbitMQ一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费者的消息,因为消费者无法接收。

为了保证消息在发送过程中不丢失,RabbitMQ引入消息应答机制,消息应答就是:消费者在收到消息并且处理该消息之后,告诉RabbitMQ它已经处理了,RabbitMQ可以把该消息删除了。

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者端出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制。当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用于在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

手动应答

消息在手动应答时是不丢失的,失败将会放回队列中重新消费。

方法
  • Channel.basicAck(DeliverTag deliverTag, multiple)(用于肯定确认)

    RabbitMQ 已知到该消息并且成功的处理消息,可以将其丢弃了

  • Channel.basicNack(CancelTag cancelTag , multiple)(用于否定确认)

  • Channel.basicReject(CancelTag cancelTag )(用于否定确认)

    不处理该消息了,直接拒绝,将消息丢弃

Multipart

手动应答的好处是可以批量应答并且减少网络拥堵

multiple的true和false 代表不同意思

true代表批量应答 channel 上未应答的消息

比如说 channel 上有传送 tag 的消息 5,6,7,8,当前 tag 是 8,那么此时5-8这些还未应答的消息将会被确认到消息应答

true 会应答前面未确认的消息

false同上面相比

只会应答 tag=8 的消息,5,6,7这三个消息依然不会被确认收到消息应答

false 只会应答当前成功处理的消息

消息确认

单个确认发布

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认成功发布后,后续的消息才能继续发布,waitForConfirmsOrDie(long) 这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能以及足够了。

// 单个确认
    public static void publishMessageIndividual()throws Exception{
        Channel channel = RabbitMQUtils.getChannel();
        // 队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, false, false, false, null);
        // 开启发布确认
        channel.confirmSelect();
        // 开启时间
        long begin = System.currentTimeMillis();
​
        // 批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());
            // 单个消息就马上进行发布确认
            boolean flag = channel.waitForConfirms();
            if (flag){
                System.out.println("消息发送成功");
            }else {
                System.out.println("发布失败");
            }
        }
        // 结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "条单独确认的消息,耗时:" + (end - begin) + "ms");
    }

批量确认发布

单个确认发布方式非常慢,与其相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

// 批量发布确认
    public static void publicMessageBatch() throws Exception{
        // 获取信道
        Channel channel = RabbitMQUtils.getChannel();
        // 创建队列
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, false, false, false, null);
        // 开启发布确认
        channel.confirmSelect();
        // 开始时间
        long begin = System.currentTimeMillis();
​
        // 批量确认消息的大小
        int basicSize = 100;
​
        // 未确认消息个数
​
        // 发布消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());
            if (i % 100 == 0){
                channel.waitForConfirms();
            }
        }
        // 最后确认发布
        channel.waitForConfirms();
​
        // 结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "条批量确认的消息,耗时:" + (end - begin) + "ms");
    }
异步确认发布

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,它是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。

如何处理异步未确认消息

最好的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如所用 ConcurrentLinkedQueue 这个队列在从firm callbacks 与发布线程之间的消息传递。

// 异步发布确认
    public static void publishMessageAsync() throws Exception {
        // 创建信道
        Channel channel = RabbitMQUtils.getChannel();
        // 创建队列
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, false, false, false, null);
        // 开启发布确认
        channel.confirmSelect();
​
        /**
         * 线程安全有序的一个哈希表 适用于高并发的情况下
         * 1. 轻松的将序号与消息进行管理那
         * 2. 轻松批量删除条目 只需要序号
         * 3. 支持高并发
         */
        ConcurrentSkipListMap<Long, Object> outstandingConfirms = new ConcurrentSkipListMap<>();
​
        // 准备消息的监听器 监听哪些消息成功了 哪些消息失败了
        // 消息确认成功 回调函数
        /**
         * 1、 deliveryTag 消息的标记 默认从1开始
         * 2、 multiple 是否为批量确认
         */
        ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
            // 删除已经确认的消息 剩下的就是未确认的消息
            if (multiple){
                // 批量发消息 批量删除
                ConcurrentNavigableMap<Long, Object> confirmed =
                        outstandingConfirms.headMap(deliveryTag);
                confirmed.clear();
            }else {
                outstandingConfirms.remove(deliveryTag);
            }
            System.out.println("确认的消息" + deliveryTag);
        };
        // 消息确认失败 回调函数
        ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
            // 有哪些未确认的消息
            String message = (String) outstandingConfirms.get(deliveryTag);
            System.out.println("未确认的消息是" + message);
            System.out.println("未确认的消息" + deliveryTag);
        };
        /**
         * 1、监听哪些消息成功了
         * 2、监听哪些消息失败了
         */
        channel.addConfirmListener(ackCallback, nackCallback);
​
        // 开始时间
        long begin = System.currentTimeMillis();
        // 批量发送消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = "消息" + i;
            // 在此处记录下所有要发送的消息
            outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
            channel.basicPublish("", queueName, null, message.getBytes());
        }
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "条异步发布确认的消息,耗时:" + (end - begin) + "ms");
    }
总结

单独发布消息:

同步等待确认,简单,但吞吐量非常有限

批量发布消息:

批量同步等待确认,简单,合理的吞吐量,一旦出现问题很难知道具体是哪条消息出现了问题

异步处理:

最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现较复杂

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/518024.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Golang Gin框架

1、这篇文章我们简要讨论一些Gin框架 主要是给大家一个基本概念 1、Gin主要是分为路由和中间件部分。 Gin底层使用的是net/http的逻辑&#xff0c;net/http主要是说&#xff0c;当来一个网络请求时&#xff0c;go func开启另一个协程去处理后续(类似epoll)。 然后主协程持续…

备战蓝桥杯---递归与DFS刷题2

1. 数据范围允许直接暴力把所有组合都写一遍&#xff0c;我们用Pair来存&#xff0c;在sort中分式比较只要把自己的分子与对方的分母乘比较即可&#xff0c;下面介绍一下st树的写法&#xff0c;具体原理就不说了&#xff0c;它是先[0/1,1/1]然后取分子分母的平均化成两个区间&a…

【C++】学习多态原理

目录 一、虚函数表二、多态原理三、关于动态绑定与静态绑定 一、虚函数表 先来看一段代码&#xff1a;sizeof(Base)是多少&#xff1f; class Base { public:virtual void Func1(){cout << "Func1()" << endl;} private:int _b 1; };int main() {cout…

【Linux】make 工具和 Makefile 文件的引入

前面提到了 gcc 编译器&#xff0c;那么使用 gcc 编译器肯定就会接触到 Makefile 。当源码文件比较多的时候就不适合通过直接输入 gcc 命令来编译&#xff0c;这时候就需要一个自动化的编译工具 make 。 举例&#xff1a;通过键盘输入两个整形数字&#xff0c;然后计算他们的和…

elasticSearch原理浅尝

终于等到你 马上就要放弃 开个玩笑 &#xff0c;进入正题 on fire 基础的咱不说了&#xff0c;一搜一麻袋 读 全文检索&#xff1a; 协调节点广播查询请求到相关分片 并 将其响应 整合 全局排序 返回结果集合 带路由&#xff1a;具体文档 shard hash(document_id) % (…

国外服务器托管需要了解哪些信息

国外服务器托管服务提供了一种在国外租用并管理服务器的方式&#xff0c;适用于需要特定地域服务或对本地法规有特殊要求的企业和个人。那么想要进行国外服务器托管需要了解哪些信息呢?Rak部落小编为您整理发布国外服务器托管相关内容。 以下是一些关于国外服务器托管服务的详…

YoloV8改进策略:BackBone改进|ELA

文章目录 摘要1、引言2、相关工作3、方法3.1、重新审视坐标注意力3.1.1、坐标注意力3.1.2、坐标注意力的不足 3.2、高效局部注意力3.3、多个ELA版本设置3.4、可视化3.5、实现 4、实验4.1、实验细节4.2、ImageNet上的图像分类4.3、目标检测4.4、语义分割 5、结论 摘要 https://…

如何搭建自己的百度网盘目录树搜索系统

许多做虚拟资源的小伙伴都有好几个百度网盘账号&#xff0c;大部分也都是扩容盘&#xff0c;但是扩容盘不能搜索&#xff0c;这个就很难受&#xff0c;不能让用户搜索自己的资源&#xff0c;这无意是是对成交概率是致命的&#xff0c;几百万条的数据&#xff0c;不能让用户一个…

【Java】jdk1.8 Java代理模式,Jdk动态代理讲解(非常详细,附带class文件)

&#x1f4dd;个人主页&#xff1a;哈__ 期待您的关注 一、什么是代理模式 想要学代理模式&#xff0c;我们就要先弄清一个概念“什么是代理”&#xff1f; 在我们的现实生活中&#xff0c;你或许不少听过关于代理的名词&#xff0c;如&#xff1a;代理商。那什么又叫做代理…

Linux课程____LVM(逻辑卷管理器)

LVM 技术是在硬盘分区和文件系统之间添加了一个逻辑层&#xff0c;它提供了一个抽象的卷组&#xff0c;可以把多块硬盘进行卷组合并。 这样一来&#xff0c;用户不必关心物理硬盘设备的底层架构和布局&#xff0c;就可以实现对硬盘分区的动态调整。 动态调整磁盘容量&#xff…

设计模式——工厂模式01

工厂模式 定义&#xff1a;工厂模式是创建子类实例化对象的一种方式&#xff0c;屏蔽了创造工厂的内部细节。把创建对象与使用对象进行拆分&#xff0c;满足单一职责。如果需要向工厂中添加新商品&#xff0c; 只需要扩展子类再重写其工厂方法&#xff0c;满足开闭原则。 设计…

数据分析(三)线性回归模型实现

1. 惩罚线性回归模型概述 线性回归在实际应用时需要对普通最小二乘法进行一些修改。普通最小二乘法只在训练数据上最小化错误&#xff0c;难以顾及所有数据。 惩罚线性回归方法是一族用于克服最小二乘法&#xff08; OLS&#xff09;过拟合问题的方法。岭回归是惩罚线性回归的…

新型智慧城市大数据解决方案(附下载)

随着云计算、大数据、移动互联网等技术的发展&#xff0c;由城市运行产生的交通、环境、市政、商业等各领域数据量巨大&#xff0c;这些数据经过合理的分析挖掘可产生大量传统数据不能反映的城市运行信息&#xff0c;已成为智慧城市的重要资产。 在大数据时代&#xff0c;数据信…

Unity入门

Unity入门学习 知识概述&#xff1a; Unity环境搭建 1.Unity引擎是什么 2.软件下载安装 下载最新的长期支持版即可 3.新工程和工程文件夹 Unity界面基础 1.Scene场景和Hierachy层级窗口 练习&#xff1a; 2.Game游戏和Project工程 3.Inspector检查和Console控制台 练习&#…

【快速上手ESP32(基于ESP-IDFVSCode)】03-定时器

ESP32中的通用定时器 通用定时器是 ESP32 定时器组外设的驱动程序。ESP32 硬件定时器分辨率高&#xff0c;具有灵活的报警功能。定时器内部计数器达到特定目标数值的行为被称为定时器报警。定时器报警时将调用用户注册的不同定时器回调函数。 在ESP32-S3中&#xff0c;一共有…

HTML:框架

案例&#xff1a; <frameset cols"5%,*" ><frame src"left_frame.html"><frame src"right_frame.html"> </frameset> 一、<frameset>标签 <frameset>标签&#xff1a;称为框架标记&#xff0c;将一个HTML…

全网最强JavaWeb笔记 | 万字长文爆肝JavaWeb开发——day06_数据库-MySQL-02

万字长文爆肝黑马程序员2023最新版JavaWeb教程。这套教程打破常规&#xff0c;不再局限于过时的老套JavaWeb技术&#xff0c;而是与时俱进&#xff0c;运用的都是企业中流行的前沿技术。笔者认真跟着这个教程&#xff0c;再一次认真学习一遍JavaWeb教程&#xff0c;温故而知新&…

vue-cli打包 nodejs内存溢出 vue2.x Last few GCs

遇到这种情况百度各种博客&#xff0c;什么改package.json里的配置&#xff0c;什么安装increase-memory-limit &#xff0c;都尝试了并没什么用处&#xff0c;最后解决方案为执行下方名单&#xff0c;再次打包就成功了&#xff1a; export NODE_OPTIONS--max_old_space_size4…

spring事务那些事

实际工作中还会面临千奇百怪的问题&#xff0c;看下面返个例子&#xff08;注意MySql数据库测试&#xff09;&#xff1a; //1.hello1Service 调用 hello2Service Transactional(propagation Propagation.REQUIRED,rollbackFor Exception.class) public void doUpdate() {//…

重建大师地物实体shp该怎样获取?(如下图)

问题如图 一般是基于自己的模型去提取的&#xff0c;可以使用地物外轮廓功能生成&#xff0c;或者这边有DLG也可以实现。同时&#xff0c;用重建大师可以提取地物外轮廓。 重建大师是一款专为超大规模实景三维数据生产而设计的集群并行处理软件&#xff0c;输入倾斜照片&#x…