消息队列-RabbitMQ:workQueues—工作队列、消息应答机制、RabbitMQ 持久化、不公平分发(能者多劳)

4、Work Queues

Work Queues— 工作队列 (又称任务队列) 的主要思想是避免立即执行资源密集型任务,而不得不等待它完成我们把任务封装为消息并将其发送到队列,在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务

轮训分发消息

在这个案例中我们会启动两个工作线程,一个消息发送线程,我们来看看他们两个工作线程是如何工作的。

在这里插入图片描述

1、抽取工具类

在这里插入图片描述

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMqUtils {
    //得到一个连接的 channel
    public static Channel getChannel() throws Exception {
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("42.192.149.23");
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}
2、启动两个工作线程来接受消息
import com.oddfar.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;


public class Worker01 {

    //定义队列
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {

        //利用工具类创建信道
        Channel channel = RabbitMqUtils.getChannel();

        //消息接受,实现接口函数
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收到消息:" + receivedMessage);
        };
        //消息被取消
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");

        };

        //消费者消费
        System.out.println("C1 消费者启动等待消费.................. ");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);

    }
}

(1)开启多线程

在这里插入图片描述

选中 Allow multiple instances:

在这里插入图片描述

(2)启动后

在这里插入图片描述

3、启动一个发送消息线程

在这里插入图片描述

public class Task01 {
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMqUtils.getChannel();

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {  //hasNext,有下一个,就发送
            String message = scanner.next();
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  //生产者发送消息,理解各参数意思
            System.out.println("消息发送完成:" + message);
        }

    }
}

启动后

在这里插入图片描述

C1、C2,轮询接受到消息

在这里插入图片描述

5、消息应答

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。

**RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。**在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息,以及后续发送给该消费者的消息,因为它无法接收到。

为了保证消息在发送过程中不丢失,引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

1、自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了。当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

2、手动消息应答的方法

Channel.basicAck (用于肯定确认):RabbitMQ 已知道该消息成功被处理,可以将其丢弃了。

Channel.basicNack (用于否定确认)

Channel.basicReject (用于否定确认):与 Channel.basicNack 相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了。

Multiple 的解释:

手动应答的好处是可以批量应答并且减少网络拥堵 。

true 代表批量应答 channel 上未应答的消息

false 同上面相比只会应答 tag=8 的消息

3、消息自动重新入队

如果消费者由于某些原因失去连接 (其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

4、消息手动应答代码

默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

在这里插入图片描述

(1)消息生产者

在这里插入图片描述

import com.rabbitmq.client.Channel;

import java.util.Scanner;

/**
 * 消息生产者,消息在手动应答时是不丢失的,放回队列重新消费。
 *
 * 
 */
public class Task02 {
    private static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //声明队列
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        Scanner sc = new Scanner(System.in);
        System.out.println("请输入信息");
        while (sc.hasNext()) {
            String message = sc.nextLine();
            //发布消息
            channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("生产者发出消息" + message);
        }
    }

}

(2)睡眠工具类

在这里插入图片描述

(3)消费者 01

在这里插入图片描述

在这里插入图片描述

(4)消费者02:把睡眠时间改成 30 秒

在这里插入图片描述

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;


public class Work03 {
    private static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C1 等待接收消息处理时间较 短");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("接收到消息:" + message);
            /**
             * 1.消息标记 tag
             * 2.是否批量应答未应答消息
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };

        CancelCallback cancelCallback = (s) -> {
            System.out.println(s + "消费者取消消费接口回调逻辑");
        };

        //采用手动应答
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);


    }
}

启动后:

在这里插入图片描述

中途停掉消费者02

在这里插入图片描述

消息由消费者01接收

在这里插入图片描述
在这里插入图片描述

理解:在生产者轮询发送消息:cc和dd,给消费者01和消费者02的时候,消费者01处理的消息的时间较短,所以立马就接受到了,但是消费者02处理消息的时间较长,需要30s。如果在消费者02处理消息的时候,将消费者02关闭,那么生产者发送的消息dd,虽然消费者02 无法接收了,但是也不会中途丢失,消息dd会重新返回到队列中,然后再从队列中,发送给消费者01。这就是RabbitMQ的消息应答机制。

6、RabbitMQ 持久化(3步走)

当 RabbitMQ 服务停掉以后,消息生产者发送过来的消息不丢失要如何保障?默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化。

1、队列如何实现持久化(第1步)

之前创建的队列都是非持久化的,rabbitmq 如果重启的话,该队列就会被删除掉,如果要队列实现持久化需要在声明队列的时候把 durable 参数设置为持久化。

//让队列持久化
boolean durable = true;
//声明队列
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);

(1)直接修改声明会报错

在这里插入图片描述

(2)在RabbitMQ的Web界面,手动删除队列

在这里插入图片描述

在这里插入图片描述

(3)将队列持久化

在这里插入图片描述

2、消息持久化(第2步)

消息实现持久化需要在消息生产者修改代码

MessageProperties.PERSISTENT_TEXT_PLAIN 添加这个属性。

在这里插入图片描述

将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。

7、不公平分发(能者多劳)

问题

在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是 RabbitMQ 并不知道这种情况它依然很公平的进行分发。

为了避免这种情况,在消费者中消费之前,我们可以设置参数 channel.basicQos(1)

需要看一下basicQos的源码,参数设置的要求

//不公平分发
int prefetchCount = 1;
channel.basicQos(prefetchCount);
//采用手动应答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

测试效果:

生产者连续发多几条消息

在这里插入图片描述

让处理消息速度快的消费者01,多接收一点消息

在这里插入图片描述

消费者02接受的少

时间长,所以少

理解:意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完 成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 worker 或者改变其他存储任务的策略。

消息队列-RabbitMQ:workQueues—工作队列、消息应答机制、RabbitMQ 持久化、不公平分发(能者多劳) 到此完结,笔者归纳、创作不易,大佬们给个3连再起飞吧

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/398651.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【ArcGIS微课1000例】0105:三维模型转体模型(导入sketchup转多面体为例)

文章目录 一、实验概述二、三维模型转多面体三、加载多面体数据四、注意事项一、实验概述 ArcGIS可以借助【导入3D文件】工具支持主流的三维模型导入。支持 3D Studio Max (.3ds)、VRML and GeoVRML 2.0 (.wrl)、SketchUp 6.0 (.skp)、OpenFlight 15.8 (.flt)、Collaborative …

docker (八)-dockerfile制作镜像

一 dockerfile dockerfile通常包含以下几个常用命令: FROM ubuntu:18.04 WORKDIR /app COPY . . RUN make . CMD python app.py EXPOSE 80 FROM 打包使用的基础镜像WORKDIR 相当于cd命令,进入工作目录COPY 将宿主机的文件复制到容器内RUN 打包时执…

挑战杯 基于LSTM的天气预测 - 时间序列预测

0 前言 🔥 优质竞赛项目系列,今天要分享的是 机器学习大数据分析项目 该项目较为新颖,适合作为竞赛课题方向,学长非常推荐! 🧿 更多资料, 项目分享: https://gitee.com/dancheng-senior/po…

【LeetCode】递归精选8题——基础递归、链表递归

目录 基础递归问题: 1. 斐波那契数(简单) 1.1 递归求解 1.2 迭代求解 2. 爬楼梯(简单) 2.1 递归求解 2.2 迭代求解 3. 汉诺塔问题(简单) 3.1 递归求解 4. Pow(x, n)(中等&…

【linux】查看openssl程序的安装情况

【linux】查看openssl程序的安装情况 1、查看安装包信息 $ rpm -qa |grep openssl 2、安装路径 $ rpm -ql openssl $ rpm -ql openssl-libs $ rpm -ql openssl-devel 3、相关文件和目录 /usr/bin/openssl /usr/include/openssl /usr/lib64/libssl.so.* /usr/lib64/libcrypto…

直接查看电脑几核芯几线程的方法

之前查看电脑几核芯几线程时都是点击 此电脑->属性->设备管理器->处理器 但是这样并不能判断是否有多线程 譬如这里,是2核芯2线程还是4核芯? 实际上,打开任务管理器后点击性能查看核芯线程数即可 所以示例这台电脑是4核芯而不是2…

视频生成模型:构建虚拟世界的模拟器 [译]

原文:Video generation models as world simulators 我们致力于在视频数据上开展生成模型的大规模训练。具体来说,我们针对不同时长、分辨率和宽高比的视频及图像,联合训练了基于文本条件的扩散模型。我们采用了一种 Transformer 架构&#…

多维时序 | Matlab实现BiLSTM-MATT双向长短期记忆神经网络融合多头注意力多变量时间序列预测模型

多维时序 | Matlab实现BiLSTM-MATT双向长短期记忆神经网络融合多头注意力多变量时间序列预测模型 目录 多维时序 | Matlab实现BiLSTM-MATT双向长短期记忆神经网络融合多头注意力多变量时间序列预测模型预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.多维时序 | Matlab…

回显服务器

. 写一个应用程序,让这个程序可以使用网络通信,这里就需要调用传输层提供的api,传输层提供协议,主要是两个: UDP,TCP,它们分别提供了一套不同的api,socket api. UDP和TCP UDP:无连接,不可靠传输,面向数据报,全双工 TCP:有连接,可靠传输,面向字节流,全双工 一个客户端可以连接…

仪表板展示|DataEase看中国:历年研究生报考数据分析

背景介绍 在信息时代的浪潮中,研究生教育作为培养高层次专业人才的重要通道,不断吸引着广大毕业生和在职人士的关注。今天我们结合2018年~2024年的研究生报考数据,以数字为镜,深入了解近年来研究生培养态势。 本文将…

Redis篇----第七篇

系列文章目录 文章目录 系列文章目录前言一、Redis 的回收策略(淘汰策略)?二、为什么 edis 需要把所有数据放到内存中?三、Redis 的同步机制了解么?四、Pipeline 有什么好处,为什么要用 pipeline?前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍…

机器学习:逻辑回归原理

逻辑回归模型是一种广泛应用于分类问题的统计方法。尽管名为“回归”,但它实际上是一种分类算法,主要用于预测观察对象属于某个类别的概率。逻辑回归模型特别适用于二分类问题,但也可以通过一些策略扩展到多分类问题。 逻辑回归的应用与优化…

gazebo 中使用gmaping 建图

一、使用gmapping 建图 启动roscoreroslaunch wpr_simulation wpb_stage_robocup.launchsudo apt install ros-noetic-gmappingrosrun gmapping slam_gmapping启动rviz (rosrun rviz rviz)。添加RobotModel、LaserScan、Map后 显示如下: 6.rosrun wpr_simulation k…

网络爬虫基础(上)

1. 爬虫的基本原理 爬虫就是在网页上爬行的蜘蛛,每爬到一个节点就能够访问该网页的信息,所以又称为网络蜘蛛; 网络爬虫就是自动化从网页上获取信息、提取信息和保存信息的过程; 2. URL的组成部分 URL全称为Uniform Resource L…

UG NX二次开发(C#)-PMI-获取PMI尺寸数据

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 1、前言2、在UG NX的三维模型中添加PMI尺寸信息3、采用二次开发获取尺寸数据4、测试结果1、前言 PMI(Product and Manufacturing Information)是产品和制造信息的简称,主要用于将产品部件设计的…

python 与 neo4j 交互(py2neo 使用)

参考自:neo4j的python.py2neo操作入门 官方文档:The Py2neo Handbook — py2neo 2021.1 安装:pip install py2neo -i https://pypi.tuna.tsinghua.edu.cn/simple 1 节点 / 关系 / 属性 / 路径 节点(Node)和关系(relationship)是构成图的基础…

代码随想录算法训练营第二十三天 | 669. 修剪二叉搜索树,108.将有序数组转换为二叉搜索树,538.把二叉搜索树转换为累加树 [二叉树篇]

代码随想录算法训练营第二十三天 LeetCode 669. 修剪二叉搜索树题目描述思路递归参考代码 LeetCode 108.将有序数组转换为二叉搜索树题目描述思路参考代码 LeetCode 538.把二叉搜索树转换为累加树题目描述思路参考代码 LeetCode 669. 修剪二叉搜索树 题目链接:669. …

《Solidity 简易速速上手小册》第9章:DApp 开发与 Solidity 集成(2024 最新版)

文章目录 9.1 DApp 的架构和设计9.1.1 基础知识解析更深入的理解实际操作技巧 9.1.2 重点案例:去中心化社交媒体平台案例 Demo:创建去中心化社交媒体平台案例代码SocialMedia.sol - 智能合约前端界面 测试和验证拓展功能 9.1.3 拓展案例 1:去…

LabVIEW高速信号测量与存储

LabVIEW高速信号测量与存储 介绍了LabVIEW开发的高速信号测量与存储系统,解决实验研究中信号捕获的速度和准确性问题。通过高效的数据处理和存储解决方案,本系统为用户提供了一种快速、可靠的信号测量方法。 项目背景 在科学研究和工业应用中&#xf…

百度RT-DETR :基于视觉变换器的实时物体检测器

概述 实时检测转换器 (RT-DETR) 由百度开发,是一种尖端的端到端物体检测器,可在保持高精度的同时提供实时性能。它利用视觉转换器(ViT)的强大功能,通过解耦尺度内交互和跨尺度融合,高效处理多尺度特征。RT…