RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)

文章目录

  • RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)
    • 一、引言
    • 二、简介
    • 三、准备工作
      • 3.1 说明
      • 3.2 生成项目
    • 四、实战
      • 4.1 交换机(Exchanges)
      • 4.2 临时队列(Temporary Queues)
      • 4.3 绑定(Bindings)
      • 4.4 整合代码
        • 发布程序
        • 订阅程序
      • 4.5 验证一:先广播后订阅
      • 4.6 验证二:先订阅后广播
    • 五、结论

RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)

一、引言

在快节奏的软件开发世界中,我们经常面临需要将消息发送给多个接收者的场景。例如,在构建日志监控系统、实时通知系统等场景时,我们希望一个事件的发生能够被多个服务同时感知和处理。这时,发布/订阅模式(Publish/Subscribe)就显得尤为重要。在本教程中,我们将通过一个简单的例子来学习如何使用RabbitMQ实现发布/订阅模式。

二、简介

在上一篇教程中,我们学习了如何使用RabbitMQ实现工作队列(Work Queues)。今天,我们将探索工作队列的进阶应用——发布/订阅模式,这是一种允许多个接收者(Subscribers)监听同一个消息通道,并在消息发布时接收通知的机制。发布/订阅模式的核心在于解耦消息的发送者(Publisher)和接收者(Subscribers),发送者不需要知道有哪些接收者,只需要将消息发送到一个交换机(Exchange),而接收者则订阅这个交换机来接收消息。

三、准备工作

3.1 说明

在本教程中,我们将使用RabbitMQ的.NET客户端来创建一个简单的发布/订阅系统。我们将创建一个名为logs的fanout类型的交换机,并将所有日志消息广播给所有订阅了该交换机的队列。

3.2 生成项目

首先,我们需要生成两个项目:

EmitLogApp:用于模拟日志消息的发布者。
ReceiveLogsApp:用于接收并打印日志消息的订阅者。
我们可以使用以下命令来创建这两个项目:

dotnet new console --name EmitLog
cd EmitLog
dotnet add package RabbitMQ.Client
cd ..
dotnet new console --name ReceiveLogs
cd ReceiveLogs
dotnet add package RabbitMQ.Client

这些命令创建了两个新的控制台应用程序,一个用于发送日志消息,另一个用于接收并打印日志消息。

四、实战

4.1 交换机(Exchanges)

在之前的教程中,我们直接将消息发送到队列。现在,我们需要引入交换机(Exchange)的概念。在RabbitMQ中,生产者从不直接向队列发送消息,而是发送到交换机,然后由交换机将消息推送到一个或多个队列。交换机的行为由交换机类型定义。

我们将创建一个名为logsfanout类型的交换机,它将广播所有接收到的消息给所有绑定到它的队列。
在这里插入图片描述

channel.ExchangeDeclare("logs", ExchangeType.Fanout);

4.2 临时队列(Temporary Queues)

在我们的日志系统中,我们希望每个运行的接收者程序都能接收到所有日志消息。因此,我们不需要为队列指定名称,而是让服务器为我们生成一个随机名称。同时,我们希望在消费者断开连接后队列能自动删除。

在.NET客户端中,我们可以创建一个非持久性、独占的、自动删除的队列,并让服务器为我们生成一个名称:

var queueName = channel.QueueDeclare().QueueName;

4.3 绑定(Bindings)

我们已经创建了一个fanout交换机和一个队列。现在,我们需要告诉交换机将消息发送到我们的队列。交换机和队列之间的关系称为绑定(Binding)。

channel.QueueBind(queue: queueName, exchange: "logs", routingKey: string.Empty);

在这里插入图片描述

4.4 整合代码

发布程序
using RabbitMQ.Client;
using System.Text;

await PublishMessagesAsync(10);
/// <summary>
/// 发布指定次数的消息到RabbitMQ队列
/// </summary>
/// <param name="loopCount">消息发送的次数</param>
/// <returns>Task对象,表示异步操作</returns>
async Task PublishMessagesAsync(int loopCount)
{
    // 循环发送指定次数的消息
    for (int i = 1; i <= loopCount; i++)
    {
        // 调用SendMessageToQueue方法发送消息,并包含当前迭代次数
        await SendMessageToQueue($"Iteration {i} - Hello World");
        // 这里可以添加延迟,如果需要的话
        // await Task.Delay(1000);
    }
    Console.ReadLine();
}
/// <summary>
/// 向RabbitMQ队列发送一条消息
/// </summary>
/// <param name="message">要发送的消息内容</param>
/// <returns>Task对象,表示异步操作</returns>
async Task SendMessageToQueue(string message)
{
    // 创建连接工厂,并设置RabbitMQ服务器地址为localhost
    var factory = new ConnectionFactory { HostName = "localhost" };
    // 使用异步方式创建连接
    using var connection = await factory.CreateConnectionAsync();
    // 使用异步方式创建通道
    using var channel = await connection.CreateChannelAsync();
    //声明名为"logs"的fanout类型的交换机
    await channel.ExchangeDeclareAsync(exchange: "logs", type: ExchangeType.Fanout);
    // 将消息内容编码为字节数组
    var body = Encoding.UTF8.GetBytes(message);

    // 异步发布消息到队列
    await channel.BasicPublishAsync(exchange: "logs", routingKey: string.Empty, body: body);
    Console.WriteLine($" [x] Sent {message}");
}
订阅程序
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
// 创建连接工厂,并设置RabbitMQ服务器地址为localhos
var factory = new ConnectionFactory { HostName = "localhost" };
// 使用异步方式创建连接
using var connection = await factory.CreateConnectionAsync();
// 使用异步方式创建通道
using var channel = await connection.CreateChannelAsync();

// 异步声明一个名为"logs"的fanout类型的交换机
// 交换机会将所有接收到的消息广播给所有绑定到它的队列
await channel.ExchangeDeclareAsync(exchange: "logs",
    type: ExchangeType.Fanout);

// 声明一个由服务器命名的队列,这样每个消费者都会有一个唯一的队列
// 这使得我们可以有多个消费者同时接收消息,而不会相互干扰
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;

// 将由服务器创建的队列绑定到"logs"交换机
// 这样,交换机就会将消息发送到这个队列
await channel.QueueBindAsync(queue: queueName, exchange: "logs", routingKey: string.Empty);
// 输出提示信息,表示消费者正在等待日志消息
Console.WriteLine(" [*] Waiting for logs.");

// 创建一个异步事件驱动的消费者
var consumer = new AsyncEventingBasicConsumer(channel);

// 设置当消费者接收到消息时的事件处理程序
consumer.ReceivedAsync += (model, ea) =>
{
    // 从接收到的消息中提取消息体并转换为字符串
    byte[] body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    // 打印接收到的消息
    Console.WriteLine($" [x] {message}");
    // 返回Task.CompletedTask以满足异步事件处理的签名要求
    return Task.CompletedTask;
};
// 开始消费指定队列的消息
// 这个调用会告诉RabbitMQ服务器,我们有一个消费者准备好接收"logs"交换机绑定的队列中的消息
await channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

4.5 验证一:先广播后订阅

运行EmitLogApp\bin\Debug\net8.0\EmitLogApp.exe,即先发布广播;再运行ReceiveLogsApp\bin\Debug\net8.0\ReceiveLogsApp.exe进行订阅广播。可以发现,没有接收到任何内容。原因是需要先启动订阅者(消费者),再启动广播(发布者/生产者)才可以接收到消息
在这里插入图片描述

4.6 验证二:先订阅后广播

先运行ReceiveLogsApp\bin\Debug\net8.0\ReceiveLogsApp.exe
再运行EmitLogApp\bin\Debug\net8.0\EmitLogApp.exe;可以发现每个消费者(订阅者)都收到了相同信息。
在这里插入图片描述

五、结论

在本教程中,我们深入探讨了RabbitMQ发布/订阅模式的概念和实现。通过构建一个简单的日志系统,我们学习了如何创建fanout类型的交换机,以及如何发送和接收消息。以下是我们从本教程中获得的关键要点:

  1. 解耦发送者和接收者:发布/订阅模式允许发送者和接收者之间没有直接的联系,发送者只需要将消息发送到交换机,而接收者则订阅交换机来接收消息。

  2. 消息广播fanout类型的交换机会将所有接收到的消息广播给所有绑定到它的队列,这对于日志系统、事件通知等场景非常有用。

  3. 临时队列:我们使用了临时队列来接收消息,这样每个订阅者都会有自己的队列,并且在订阅者断开连接后,队列会自动删除。

  4. 动态订阅:订阅者可以随时订阅或取消订阅交换机,这使得系统具有很高的灵活性和动态性。

通过这些机制,我们能够建立一个高效的发布/订阅系统,它不仅能够提高系统的响应速度,还能够在面对各种异常情况时保持消息的可靠性和持久性。这些知识为我们在实际开发中实现复杂的事件驱动架构提供了坚实的基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/918775.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot(6)-Shiro

目录 一、是什么 二、准备工作 2.1 环境搭建 2.2 自定义Realm配置类 2.3 自定义shiro配置类 三、实践 3.1 请求拦截 3.2 用户认证 3.3 用户授权 3.4 shiro和thymeleaf整合 一、是什么 是java的一个安全框架 核心三大对象&#xff1a; 1、Subject【用户】 2、Secur…

Misc_01转二维码(不是二进制)

例题ctfhub/隐写v2.0 打开是一张图片 文件分离得到zip&#xff0c;爆破密码得到7878 打开得到0和1&#xff0c; !!!不是二进制转图片&#xff0c;直接是二维码 缩小能看到 000000000000000000000000000000000000000000000000000000000000000000000 000000000000000000000000…

使用 K-means 算法进行豆瓣读书数据的文本聚类分析

✅作者简介&#xff1a;2022年博客新星 第八。热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏…

STM32完全学习——F407ZGT6点亮LED

一、寄存器描述 我们想要点亮LED&#xff0c;无非就是对于寄存器的一些设置&#xff0c;主要分为两步&#xff0c;首先是需要打开相应GPIO的时钟&#xff0c;这是因为STM32在上电后&#xff0c;每个外设的时钟默认都是关闭的&#xff0c;需要我们手动打开。其次就是对GPIO的一…

Dubbo RPC线程模型

消费端线程模型&#xff0c;提供者端线程模型 消费端线程模型 对 2.7.5 版本之前的 Dubbo 应用&#xff0c;尤其是一些消费端应用&#xff0c;当面临需要消费大量服务且并发数比较大的大流量场景时&#xff08;典型如网关类场景&#xff09;&#xff0c;经常会出现消费端线程…

Python酷库之旅-第三方库Pandas(225)

目录 一、用法精讲 1056、pandas.PeriodIndex.dayofweek属性 1056-1、语法 1056-2、参数 1056-3、功能 1056-4、返回值 1056-5、说明 1056-6、用法 1056-6-1、数据准备 1056-6-2、代码示例 1056-6-3、结果输出 1057、pandas.PeriodIndex.day_of_week属性 1057-1、…

商业物联网详细指南:优势与挑战

物联网是信息技术行业最具前景的领域之一。为什么它如此热门呢&#xff1f;原因在于全球连接性。设备可以像人群一样相互协作。正如我们所知&#xff0c;协作能显著提高生产力。 物联网对普通用户和企业都有益处。许多日常流程可以通过传感器、扫描仪、摄像头和其他设备实现自…

Spring Boot汽车资讯:科技与汽车的新融合

摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了汽车资讯网站的开发全过程。通过分析汽车资讯网站管理的不足&#xff0c;创建了一个计算机管理汽车资讯网站的方案。文章介绍了汽车资讯网站的系统分析部分&…

vlan之间的通信(三层交换机)

拓补图&#xff1a; 【实验步骤】 LSW1配置&#xff1a; The device is running! <Huawei> <Huawei>sys Enter system view, return user view with CtrlZ. [Huawei]un in e Info: Information center is disabled. [Huawei]sys maluoying [maluoying]vla…

Redis作为分布式锁,得会避坑

日常开发中&#xff0c;经常会碰到秒杀抢购等业务场景。为了避免并发请求造成的库存超卖等问题&#xff0c;我们一般会用到Redis分布式锁。但是使用Redis分布式锁之前要知道有哪些坑是需要我们避过去的。 1. 非原子操作&#xff08;setnx expire&#xff09; 一说到实现Redis…

ETH钱包地址如何获取 如何购买比特币

首先我们要先注册一个交易所 Gate.io&#xff08;推荐&#xff09;: 点我注册 1、注册很简单&#xff0c;通过手机号就可以进行注册了。 2、获取ETH钱包地址 注册好之后&#xff0c;如图所示&#xff0c;点击“统一账户” 3、通过搜索栏搜索ETH&#xff0c;如下图所示 4、点…

基于Python+Django的农业害虫识别系统设计和实现(源码+论文+部署讲解等)

博主介绍&#xff1a;CSDN毕设辅导第一人、全网粉丝50W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和学生毕业项目实战,高校老师/讲师/同行前辈交流✌ 技术范围&#xff1a;SpringB…

Python多线程爬虫入门:让你的爬虫跑得更快

一、前言 在互联网时代&#xff0c;数据是最有价值的资源之一。而网页爬虫是获取数据的一种非常重要的工具。在这篇文章中&#xff0c;我们将学习如何用 Python 编写一个多线程网页爬虫&#xff0c;适合小白快速上手&#xff01; 二、多线程进程 单线程串行&#xff1a;一步…

使用Web Speech API实现语音识别与合成技术

&#x1f493; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4dd; Gitee主页&#xff1a;瑕疵的gitee主页 ⏩ 文章专栏&#xff1a;《热点资讯》 使用Web Speech API实现语音识别与合成技术 使用Web Speech API实现语音识别与合成技术 使用Web Speech API实现语音识别与合成技…

NavVis VLX3的精度怎么去进行验证?【上海沪敖3D】

01、精度评价现状 三维捕捉行业还没有建立一个用于估算或验证移动激光扫描系统精度的统一标准。因此&#xff0c;需要高精度交付成果的专业人士很难相信设备所标注的精度规格&#xff0c;也就很难知道基于SLAM的移动激光扫描系统是否适合当前的项目。 NavVis将通过展示一种严格…

爬虫开发工具与环境搭建——环境配置

第二章&#xff1a;爬虫开发工具与环境搭建 第二节&#xff1a;环境配置 在进行爬虫开发之前&#xff0c;首先需要配置好开发环境。一个良好的开发环境不仅能提高开发效率&#xff0c;还能避免因环境不一致带来的问题。以下是环境配置的详细步骤&#xff0c;涵盖了Python开发…

如何用Excel批量提取文件夹内所有文件名?两种简单方法推荐

在日常办公中&#xff0c;我们有时需要将文件夹中的所有文件名整理在Excel表格中&#xff0c;方便管理和查阅。手动复制文件名既费时又易出错&#xff0c;因此本文将介绍两种利用Excel自动提取文件夹中所有文件名的方法&#xff0c;帮助你快速整理文件信息。 方法一&#xff1…

gvim添加至右键、永久修改配置、放大缩小快捷键、ctrl + c ctrl +v 直接复制粘贴、右键和还原以前版本(V)冲突

一、将 vim 添加至右键 进入安装目录找到 vim91\install.exe 管理员权限执行 Install will do for you:1 Install .bat files to use Vim at the command line:2 Overwrite C:\Windows\vim.bat3 Overwrite C:\Windows\gvim.bat4 Overwrite C:\Windows\evim.bat…

机器学习day5-随机森林和线性代数1最小二乘法

十 集成学习方法之随机森林 集成学习的基本思想就是将多个分类器组合&#xff0c;从而实现一个预测效果更好的集成分类器。大致可以分为&#xff1a;Bagging&#xff0c;Boosting 和 Stacking 三大类型。 &#xff08;1&#xff09;每次有放回地从训练集中取出 n 个训练样本&…

SpringCloud框架学习(第三部分:Resilience4j 与 Micrometer)

目录 九、CircuitBreaker断路器 1.前言&#xff08;Hystrix&#xff09; 2.服务雪崩 3.Circuit Breaker 4. Resilience4j 5.案例实战 &#xff08;1&#xff09;熔断&#xff08;服务熔断 服务降级&#xff09; Ⅰ. 按照 COUNT_BASED&#xff08;计数的滑动窗口&#xf…