RabbitMQ---work消息模型

1、work消息模型

工作队列或者竞争消费者模式
在这里插入图片描述

在第一篇教程中,我们编写了一个程序,从一个命名队列中发送并接受消息。在这里,我们将创建一个工作队列,在多个工作者之间分配耗时任务。
工作队列,又称任务队列。主要思想就是避免执行资源密集型任务时,必须等待它执行完成。相反我们稍后完成任务,我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。当你运行许多消费者时,任务将在他们之间共享,但是一个消息只能被一个消费者获取。
这个概念在Web应用程序中特别有用,因为在短的HTTP请求窗口中无法处理复杂的任务。
接下来我们来模拟这个流程:

o P:生产者:任务的发布者
o C1:消费者,领取任务并且完成任务,假设完成速度较快
o C2:消费者2:领取任务并完成任务,假设完成速度慢

面试题:避免消息堆积?

1)采用workqueue,多个消费者监听同一队列。
2)接收到消息以后,而是通过线程池,异步消费。

1.1、生产者

生产者与案例1中的几乎一样:

public class Send {
   private final static String QUEUE_NAME = "test_work_queue";
   public static void main(String[] argv) throws Exception {
       // 获取到连接
       Connection connection = ConnectionUtil.getConnection();
       // 获取通道
       Channel channel = connection.createChannel();
       // 声明队列
       channel.queueDeclare(QUEUE_NAME, false, false, false, null);
       // 循环发布任务
       for (int i = 0; i < 50; i++) {
           // 消息内容
           String message = "task .. " + i;
           channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
           System.out.println(" [x] Sent '" + message + "'");
       }
       // 关闭通道和连接
       channel.close();
       connection.close();
   }
}

不过这里我们是循环发送50条消息。

1.2、消费者1

// 消费者1
public class Recv {
    private final static String QUEUE_NAME = "test_work_queue";
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        final Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                System.out.println(" [消费者1] received : " + msg + "!");
                try {
                    // 模拟完成任务的耗时:1000ms
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
                // 手动ACK
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 监听队列。
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

1.3、消费者2

//消费者2
public class Recv2 {
    private final static String QUEUE_NAME = "test_work_queue";
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        final Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                System.out.println(" [消费者2] received : " + msg + "!");
                try {
                    // 模拟完成任务的耗时:200ms
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                }
                // 手动ACK
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 监听队列。
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

与消费者1基本类似,就是没有设置消费耗时时间。
这里是模拟有些消费者快,有些比较慢。
接下来,两个消费者一同启动,然后发送50条消息:
在这里插入图片描述
在这里插入图片描述

可以发现,两个消费者各自消费了25条消息,而且各不相同,这就实现了任务的分发。

1.4、能者多劳

• 刚才的实现有问题吗?
o 消费者1比消费者2的效率要低,一次任务的耗时较长
o 然而两人最终消费的消息数量是一样的
o 消费者2大量时间处于空闲状态,消费者1一直忙碌
• 现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。
• 怎么实现呢?
o 我们可以使用basicQos方法和prefetchCount = 1设置。
o 这告诉RabbitMQ一次不要向工作人员发送多于一条消息。
o 或者换句话说,不要向工作人员发送新消息,直到它处理并确认了前一个消息。
o 相反,它会将其分派给不是仍然忙碌的下一个工作人员。
在这里插入图片描述

再次测试:

  ![在这里插入图片描述](https://img-blog.csdnimg.cn/a25bdfcd50bf41f9a6e51076560cd15f.png)

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/86584.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

优化指南:带宽限制的可行策略

大家好&#xff01;作为一名专业的爬虫程序员&#xff0c;我们经常面临的一个挑战就是带宽限制。尤其是在需要快速采集大量数据时&#xff0c;带宽限制成为了我们提升爬虫速度的一大阻碍。今天&#xff0c;我将和大家分享一些解决带宽限制的可行策略&#xff0c;希望能帮助大家…

【算法系列篇】二分查找——这还是你所知道的二分查找算法吗?

文章目录 前言什么是二分查找算法1.二分查找1.1 题目要求1.2 做题思路1.3 Java代码实现 2.在排序数组中查找元素的第一个和最后一个位置2.1 题目要求2.2 做题思路2.3 Java代码实现 3.搜索插入位置3.1 题目要求3.2 做题思路3.3 Java代码实现 4.x的平方根4.1 题目要求4.2 做题思路…

element上传图片,调取接口传值,参数FormData为空

需求 输入完reason&#xff0c;选完文件后&#xff0c;点击提交按钮后 调取接口。 遇到的问题 上传文件orderFile 字段一直为空 打印了发现&#xff0c;上传文件也是有值得。但是传到接口中就为空 原因 json里边不能放file&#xff0c;但是formData里可以放 file 也可以放…

论文阅读——Imperceptible Adversarial Attack via Invertible Neural Networks

Imperceptible Adversarial Attack via Invertible Neural Networks 作者&#xff1a;Zihan Chen, Ziyue Wang, Junjie Huang*, Wentao Zhao, Xiao Liu, Dejian Guan 解决的问题&#xff1a;虽然视觉不可感知性是对抗性示例的理想特性&#xff0c;但传统的对抗性攻击仍然会产…

汽配企业MES管理系统如何追溯产品质量问题

随着汽车行业的快速发展&#xff0c;汽配行业也面临着越来越严格的质量要求。为了满足客户需求并提高产品质量&#xff0c;汽配企业需要实现生产过程的可追溯性。MES管理系统解决方案作为生产过程的核心管理系统&#xff0c;可以通过记录生产数据和流程&#xff0c;实现产品质量…

pdf转word最简单方法~

pdf转word最简单方法&#xff01;pdf转word最简单方法我们都知道&#xff0c;PDF文件是一种只读文件格式&#xff0c;无法按照需求对PDF文件进行更改与编辑&#xff0c;从而影响到了PDF文件的使用。所以&#xff0c;我们需要将PDF文件转换为word文档&#xff0c;以此来保证文件…

Linux系统之安装OneNav个人书签管理器

Linux系统之安装OneNav个人书签管理器 一、OneNav介绍1.OneNav简介2.OneNav特点 二、本地环境介绍2.1 本地环境规划2.2 本次实践介绍 三、检查本地环境3.1 检查本地操作系统版本3.2 检查系统内核版本3.3 检查本地yum仓库状态 四、安装httpd服务4.1 安装httpd4.2 启动httpd服务4…

Error: Flash Download failed - “Cortex-M7“

选择对应FLM文件加上即可。 具体可参考&#xff1a; https://www.sunev.cn/embedded/669.html https://zhuanlan.zhihu.com/p/487664063

Linux socket网络编程

一、主机字节序列和网络字节序列 主机字节序列分为大端字节序列和小端字节序列&#xff0c;不同的主机采用的字节序列可能不同。大端字节序列是指一个整数的高位字节存储在内存的低地址处&#xff0c;低位字节存储在内存的高地址处。小端字节序列是指整数的高位字节存储在内存…

8个值得一看的网页设计工具,不再死敲代码!

之前&#xff0c;如果想完成网页制作&#xff0c;往往需要设计师具有一定的编程基础&#xff0c;而随着新型网页制作工具的出现&#xff0c;不仅降低了网页制作的门槛&#xff0c;也减轻了设计师的工作负担。今天本文整理了8个好用的网页制作工具&#xff0c;一起来看看吧&…

炫我为北京轻工技师学院提供渲染私有云系统解决方案

北京轻工技师学院作始建于1964年&#xff0c;是国家级重点学校。学院开设有计算机动画制作、计算机网络应用、电气自动化设备安装与维修、电子技术应用、工业机器人应用与维护等16个专业&#xff0c;本次项目的交付实施涉及该学院的一个重要项目。 ▲北京轻工技师学院 图源网…

创建R包-2.1:在RStudio中使用Rcpp制作R-Package(更新于2023.8.23)

目录 0-前言 1-在RStudio中创建R包项目 2-创建R包 2.1通过R函数创建新包 2.2在RStudio通过菜单来创建一个新包 2.3关于R包创建的说明 3-添加R自定义函数 4-添加C函数 0-前言 目标&#xff1a;在RStudio中创建一个R包&#xff0c;这个R包中包含C函数&#xff0c;接口是Rc…

【李群李代数】李群控制器(lie-group-controllers)介绍——控制 SO(3) 空间中的系统的比例控制器Demo...

李群控制器SO(3)测试 测试代码是一个用于控制 SO(3) 空间中的系统的比例控制器。它通过计算控制策略来使当前状态逼近期望状态。该控制器使用比例增益 kp 进行参数化&#xff0c;然后进行一系列迭代以更新系统状态&#xff0c;最终检查状态误差是否小于给定的阈值。这个控制器用…

Microsoft Message Queuing Remote Code Execution Vulnerability

近期官方公布了一个MSMQ的远程代码执行漏洞&#xff0c;可能因为网络安全设备的更新&#xff0c;影响业务&#xff0c;值得大家关注。 Microsoft Message Queuing 概述 MicroSoft Message Queuing&#xff08;微软消息队列)是在多个不同的应用之间实现相互通信的一种异步传输…

Wireshark数据抓包分析之HTTP协议

一、实验目的&#xff1a; 主要时熟悉wireshark的使用 二、预备知识&#xff1a; HTTP协议的相关知识 what fk&#xff0c;原来只要在右页点击切换&#xff0c;就可以开启2台不同的机器欸&#xff01;nice 三、实验过程&#xff1a; 1.在机器1中通过管理员身份运行hfs之后&a…

基于LSTM深度学习网络的时间序列分析matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3.部分核心程序 % 随机打乱数据集并划分训练集和测试集 index_list randperm(size(wdata, 1)); ind …

智慧水务在供水行业的应用场景

什么是“智慧水务” 智慧水务指利用物联网、智能传感、云计算、大数据、人工智能等技术对供水、排水、节水、污水 处理、防洪等水务环节进行智慧化管理。智慧水务通过结合传感器、通信网络、水务信息系统提升水务信息化水平&#xff0c;实现水务管理协同化、水资源利用高效化、…

C语言:指针(超深度讲解)

目录 指针&#xff1a; 学习目标&#xff1a; 指针可以理解为&#xff1a; 字符指针&#xff1a; 定义&#xff1a;字符指针 char*。 字符指针的使用&#xff1a; 练习&#xff1a; 指针数组&#xff1a; 概念&#xff1a;指针数组是一个存放指针的数组。 实现模拟二维…

Linux 虚拟机Ubuntu22.04版本通过远程连接连接不上,输入ifconfig只能看到127.0.0.1的解决办法

之前给虚拟机配置静态IP之后&#xff0c;可以直接通过主机Vscode远程连接。但是前一段时间把主机的TCP/IPV4静态IP设置了一下之后&#xff0c;再连接虚拟机就连不上了&#xff0c;于是参考解决虚拟机不能上网ifconfig只显示127.0.0.1的问题&#xff0c;又可以连接上了&#xff…

Centos7查看磁盘和CUP统计信息iostat命令

Centos7查看磁盘和CUP统计信息iostat命令 Centos7内存高|查看占用内存命令 docker实战(一):centos7 yum安装docker docker实战(二):基础命令篇 docker实战(三):docker网络模式(超详细) docker实战(四):docker架构原理 docker实战(五):docker镜像及仓库配置 docker实战(六…