RabbitMQ教程:工作队列(Work Queues)(二)

RabbitMQ教程:工作队列(Work Queues)(二)

一、引言

在快节奏的软件开发世界中,我们经常面临需要异步处理任务的场景,比如在Web应用中处理耗时的图片处理或数据分析任务。这些任务如果直接在用户的HTTP请求中同步处理,会导致用户体验不佳,因为用户需要等待任务完成才能继续。这时,工作队列(Work Queues)就显得尤为重要。工作队列允许我们将任务排队,然后在后台异步处理,这样可以释放Web服务器来处理更多的用户请求,提高应用的响应速度和吞吐量。

在本教程中,我们将通过一个简单的例子来学习如何使用RabbitMQ实现工作队列。我们将创建一个模拟的任务分发系统,它能够在多个工作进程之间分配任务,确保任务的均衡执行和持久存储,即使在RabbitMQ服务器重启的情况下也不会丢失任务。
在这里插入图片描述

二、简介

在上一篇教程中,我们学习了如何使用RabbitMQ发送和接收消息。今天,我们将探索工作队列(Work Queues),这是一种在多个工作进程(workers)之间分配耗时任务的机制。工作队列也被称为任务队列(Task Queues),它的核心思想是避免立即执行资源密集型任务,而是将任务安排到以后执行。通过这种方式,我们可以将任务封装成消息并发送到队列中,然后由后台运行的工作进程来处理这些任务。

三、准备工作

3.1 说明

在之前的教程中,我们发送了包含“Hello World!”的消息。现在,我们将发送代表复杂任务的字符串。由于我们没有实际的任务(比如需要调整大小的图片或需要渲染的PDF文件),我们将使用Task.Delay()函数来模拟工作负载。

3.2 生成项目

首先,我们需要生成两个项目(也可直接vs创建):

dotnet new console --name NewTask
mv NewTask/Program.cs NewTask/NewTask.cs
dotnet new console --name Worker
mv Worker/Program.cs Worker/Worker.cs
cd NewTask
dotnet add package RabbitMQ.Client
cd ../Worker
dotnet add package RabbitMQ.Client

这些命令创建了两个新的控制台应用程序,一个用于发送任务(NewTask),另一个用于接收并处理任务(Worker)。

四、实战

4.1 修改发送程序

我们将更新NewTask程序,以便从命令行发送任意消息。这个程序将任务安排到我们的工作队列中,因此我们将其命名为NewTask

using RabbitMQ.Client;
using System.Text;

await PublishMessagesAsync(20);
/// <summary>
/// 发布指定次数的消息到RabbitMQ队列
/// </summary>
/// <param name="loopCount">消息发送的次数</param>
/// <returns>Task对象,表示异步操作</returns>
async Task PublishMessagesAsync(int loopCount)
{
    // 循环发送指定次数的消息
    for (int i = 1; i <= loopCount; i++)
    {
        // 调用SendMessageToQueue方法发送消息,并包含当前迭代次数
        await SendMessageToQueue($"Iteration {i} - Hello World");
        // 这里可以添加延迟,如果需要的话
        // await Task.Delay(1000);
    }
    Console.ReadLine();
}
/// <summary>
/// 向RabbitMQ队列发送一条消息
/// </summary>
/// <param name="message">要发送的消息内容</param>
/// <returns>Task对象,表示异步操作</returns>
async Task SendMessageToQueue(string message)
{
    // 创建连接工厂,并设置RabbitMQ服务器地址为localhost
    var factory = new ConnectionFactory { HostName = "localhost" };
    // 使用异步方式创建连接
    using var connection = await factory.CreateConnectionAsync();
    // 使用异步方式创建通道
    using var channel = await connection.CreateChannelAsync();

    // 异步声明名为"task_queue"的持久队列
    await channel.QueueDeclareAsync(queue: "task_queue", durable: true, exclusive: false,
        autoDelete: false, arguments: null);

    // 将消息内容编码为字节数组
    var body = Encoding.UTF8.GetBytes(message);

    // 创建消息属性,并设置为持久化
    var properties = new BasicProperties
    {
        Persistent = true
    };

    // 异步发布消息到队列
    await channel.BasicPublishAsync(exchange: string.Empty, routingKey: "task_queue", mandatory: true,
        basicProperties: properties, body: body);
    // 打印消息发送确认信息
    Console.WriteLine($" [x] Sent {message}");
}

4.2 修改接收程序

我们的旧Receive.cs脚本也需要修改,以便模拟消息体中每个点号一秒的工作量。以下是修改后的代码:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
// 创建连接工厂,并设置RabbitMQ服务器地址为localhost
var factory = new ConnectionFactory { HostName = "localhost" };
// 使用异步方式创建连接
using var connection = await factory.CreateConnectionAsync();
// 使用异步方式创建通道
using var channel = await connection.CreateChannelAsync();
// 异步声明名为"task_queue"的持久队列
await channel.QueueDeclareAsync(queue: "task_queue", durable: true, exclusive: false,
    autoDelete: false, arguments: null);

// 设置QoS参数,确保每次只有一个消息被分发给同一个消费者
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);

Console.WriteLine(" [*] Waiting for messages.");

// 创建一个异步事件驱动的消费者
var consumer = new AsyncEventingBasicConsumer(channel);

// 设置接收到消息时的事件处理程序
consumer.ReceivedAsync += async (model, ea) =>
{
    // 获取消息体并转换为字符串
    byte[] body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine($" [x] Received {message}");

    // 模拟工作负载,延时1.5秒
    await Task.Delay(1500);

    // 手动确认消息,确保消息被正确处理
    await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
};

// 开始消费指定队列的消息
// 这个调用会告诉RabbitMQ服务器,我们有一个消费者准备好接收"task_queue"队列中的消息
await channel.BasicConsumeAsync("task_queue", // 队列名称
    autoAck: false, // 是否自动确认消息,默认为false,需要手动确认
    consumer: consumer); // 指定消费者对象,用于接收消息
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

在这里插入图片描述

4.3 消息确认(Message Acknowledgment)

在处理任务时,可能会出现消费者处理任务时崩溃的情况。如果消费者在处理任务时崩溃,RabbitMQ会立即将消息标记为已删除,这样我们就会丢失正在处理的消息以及所有未处理的消息。

为了确保消息不会丢失,RabbitMQ支持消息确认(acknowledgment)。消费者会向RabbitMQ发送确认,告知它特定的消息已经被接收和处理,RabbitMQ可以安全地删除该消息。

如果消费者在未发送确认的情况下崩溃(例如,通道关闭、连接关闭或TCP连接丢失),RabbitMQ会将该消息重新排队。如果有其他消费者在线,RabbitMQ会迅速将该消息重新分发给其他消费者。这样,我们可以确保即使工作进程偶尔崩溃,也不会丢失任何消息。

默认情况下,消费者的确认超时时间为30分钟。这有助于检测未确认的消费者。如果需要,可以根据需要增加此超时时间。

在我们的代码中,我们已经将autoAck参数设置为false,并在处理完任务后手动发送确认。以下是确认消息的代码:

await channel.BasicConsumeAsync("task_queue", autoAck: false, consumer: consumer);

4.4 公平分发(Fair Dispatch)

你可能已经注意到,分发仍然不是我们想要的方式。例如,在有两个工作进程的情况下,如果所有奇数消息都很重,而偶数消息都很轻,一个工作进程将始终忙碌,而另一个工作进程几乎不工作。这是因为RabbitMQ在消息进入队列时就分发消息,它不会查看消费者未确认的消息数量。它只是简单地将每第n个消息分发给第n个消费者。

为了改变这种行为,我们可以使用BasicQos方法,并设置prefetchCount为1。这告诉RabbitMQ一次不要给工作进程超过一个消息。换句话说,直到工作进程处理并确认前一个消息之前,不要分发新消息给它。相反,它将分发给下一个不忙的工作进程。在这里插入图片描述

五、结论

在本教程中,我们深入探讨了RabbitMQ工作队列的概念和实现。通过构建一个模拟的任务分发系统,我们学习了如何在多个工作进程之间分配任务,以及如何确保任务的均衡执行和持久存储。以下是我们从本教程中获得的关键要点:

  1. 异步任务处理:通过使用工作队列,可以将耗时的任务异步处理,从而提高Web应用的响应速度和用户体验。

  2. 任务封装:将复杂的任务封装成消息,发送到队列中,由后台的工作进程来处理这些任务。

  3. 消息确认:实现了消息确认机制,确保了即使在消费者处理任务时崩溃,消息也不会丢失,并且可以被重新分发给其他消费者处理。

  4. 公平分发:通过设置prefetchCount为1,实现了公平分发,确保了工作负载在所有消费者之间均匀分配,避免了某些消费者过载而其他消费者空闲的情况。

  5. 持久化:将队列和消息设置为持久化,以确保即使RabbitMQ服务器重启,任务也不会丢失。

通过这些机制,我们能够建立一个健壮的工作队列系统,它不仅能够提高应用的性能,还能够在面对各种异常情况时保持任务的可靠性和持久性。这些知识为我们在实际开发中实现复杂的异步任务处理提供了坚实的基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/916830.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Node.js下载安装及环境配置教程

一、进入官网地址下载安装包 Node.js 中文网 选择对应你系统的Node.js版本&#xff0c;这里我选择的是Windows系统、64位 二、安装程序 &#xff08;1&#xff09;下载完成后&#xff0c;双击安装包&#xff0c;开始安装Node.js (2)直接点【Next】按钮&#xff0c;此处可根据…

免费,WPS Office教育考试专用版

WPS Office教育考试专用版&#xff0c;不仅满足了考试需求&#xff0c;更为教育信息化注入新动力。 https://pan.quark.cn/s/609ef85ae6d4

将 HTML 转换为 JSX:JSX 和 JSX 规则

JSX 是 JavaScript 的语法扩展。您可以在 JavaScript 文件中编写 HTML 格式。 它基于 Web、Html、Css 和 JavaScript。Web 开发人员将页面内容分别编写为 Html 文件&#xff0c;将设计编写为 Css 文件&#xff0c;将逻辑编写为 JavaScript 文件。 须知 &#xff1a; JSX 是一个…

数据结构-二叉树及其遍历

🚀欢迎来到我的【数据结构】专栏🚀 🙋我是小蜗,一名在职牛马。🐒我的博客主页​​​​​​ ➡️ ➡️ 小蜗向前冲的主页🙏🙏欢迎大家的关注,你们的关注是我创作的最大动力🙏🙏🌍前言 本篇文章咱们聊聊数据结构中的树,准确的说因该是只说一说二叉树以及相…

活动|华院计算作为联盟理事单位出席进博会全球人工智能合作论坛

第七届中国国际进口博览会&#xff08;进博会&#xff09;于11月5日至10日在上海举行&#xff0c;作为本次进博会的重要配套活动&#xff0c;首届人工智能全球合作论坛也于9日圆满落幕。本次论坛由全球招商中心委员会、人工智能全球合作论坛组委会主办&#xff0c;中国国际科技…

Selective attention improves transformer详细解读

Selective attention improves transformer Google 2024.10.3 一句话&#xff1a;简单且无需额外参数的选择性注意力机制&#xff0c;通过选择性忽略不相关信息并进行上下文剪枝&#xff0c;在不增加计算复杂度的情况下显著提升了Transformer模型的语言建模性能和推理效率。 论…

shell脚本(1)

声明&#xff1a;学习视频来自b站up主 泷羽sec&#xff0c;如涉及侵权马上删除文章 感谢泷羽sec 团队的教学 视频地址&#xff1a;shell脚本&#xff08;1&#xff09;脚本创建执行与变量使用_哔哩哔哩_bilibili 本文主要讲解shell脚本的创建、执行和变量的使用。 一、脚本执行…

本地 / 网络多绑定用例总结

原文连接&#xff1a;AUTOSAR_EXP_ARAComAPI的7章笔记&#xff08;4&#xff09; 情景设定 在前一节的基础上&#xff0c;假设有类似情景&#xff0c;区别在于服务实例 2 位于与 AP 产品相同以太网的不同 ECU 上&#xff0c;服务消费者及其代理驻留在 AP 产品 ECU 上。因以太网…

通用定时器---输出比较功能

目录 一、概念 二、输出比较的8种模式 三、输出比较输出PWM波形的基本结构 配置步骤 四、示例代码 一、概念 OC&#xff08;OutPut Compare&#xff09;输出比较。输出比较可以通过比较CNT与CCR寄存器的关系&#xff0c;来对输出电平进行置1/置0/翻转的操作&#xff0c;可…

CSS盒子的定位> (下篇)#固定定位#笔记

一、固定定位 1.概念 固定定位其实是绝对定位的子类别&#xff0c;一个设置了position&#xff1a;fixed的元素是相对于视窗固定的&#xff0c;就算页面文档发生了滚动&#xff0c;它也会一直待在相同的地方。 2.代码属性 CSS代码添加 position&#xff1a;fixed 水平方…

leetcode100:相同的树

给你两棵二叉树的根节点 p 和 q &#xff0c;编写一个函数来检验这两棵树是否相同。 如果两个树在结构上相同&#xff0c;并且节点具有相同的值&#xff0c;则认为它们是相同的。 示例 1&#xff1a; 输入&#xff1a;p [1,2,3], q [1,2,3] 输出&#xff1a;true示例 2&…

我谈二值形态学基本运算——腐蚀、膨胀、开运算、闭运算

Gonzalez从集合角度定义膨胀和腐蚀&#xff0c;不易理解。 Through these definitions, you can interpret dilation and erosion as sliding neighborhood operations analogous to convolution (or spatial filtering). 禹晶、肖创柏、廖庆敏《数字图像处理&#xff08;面向…

【数据结构 | C++】整型关键字的平方探测法散列

整型关键字的平方探测法散列 将给定的无重复正整数序列插入一个散列表&#xff0c;输出每个输入的数字在表中的位置。所用的散列函数是 H(key)key%TSize&#xff0c;其中 TSize 是散列表的表长。要求用平方探测法&#xff08;只增不减&#xff0c;即H(Key)i^2&#xff09;解决冲…

24.11.15 Vue3

let newJson new Proxy(myJson,{get(target,prop){console.log(在读取${prop}属性);return target[prop];},set(target,prop,val){console.log(在设置${prop}属性值为${val});if(prop"name"){document.getElementById("myTitle").innerHTML val;}if(prop…

413: Quick Sort

解法&#xff1a; #include <bits/stdc.h> using namespace std; const int N1e55; int a[N]; int n;int main(int argc, char** argv) {cin>>n;for (int i0;i<n;i) cin>>a[i];sort(a,an);for (int i0;i<n;i) cout<<a[i]<<" "…

麒麟kysec安全

一、kysec安全框架管理 开启kysec getstatus Copy security-switch --set default Copy 重启系统 reboot Copy 刷新页面&#xff0c;等待几分钟&#xff0c;即可完成文件的扫描。 查看kysec状态 getstatus Copy 切换到管理员身份&#xff08;密码&#xff1a;devuser…

在qml里如何使用C++ Qt数据模型QAbstractListModel

本篇博客用qml GridView来显示视频矩阵,然后加载本地的视频,需要用到C++ Qt的model, 代码环境Qt6.5.3 qml, 对应的视频讲解:https://edu.csdn.net/learn/40003/653975?spm=3001.4143 先看一下界面效果: 上图是用qml ScrollView和GridView做了一个可以滚动显示的视频矩阵列…

Java项目实战II基于微信小程序的实习记录(开发文档+数据库+源码)

目录 一、前言 二、技术介绍 三、系统实现 四、文档参考 五、核心代码 六、源码获取 全栈码农以及毕业设计实战开发&#xff0c;CSDN平台Java领域新星创作者&#xff0c;专注于大学生项目实战开发、讲解和毕业答疑辅导。 一、前言 在当今竞争激烈的就业市场中&#xff0…

【C++笔记】vector使用详解及模拟实现

前言 各位读者朋友们&#xff0c;大家好&#xff01;上期我们讲了string类的模拟实现&#xff0c;这期我们开启vector的讲解。 一.vector的介绍及使用 1.1 vector的介绍 vector的文档 使用STL的三个境界&#xff1a;能用、明理、能扩展&#xff0c;下面学习vector&#xff…

【环境配置】macOS配置jdk与maven

配置jdk与maven 配置jdk与切换java版本命令 maven安装与配置国内镜像源 用到的命令 # 进入 JDK 安装目录 cd /Library/Java/JavaVirtualMachines# 查看文件 ls ➜ jdk-1.8.jdk jdk-11.jdk# 查看路径 pwd ➜ /Library/Java/JavaVirtualMachines# 打开环境变量配置文件 vi &…