文章目录
- RabbitMQ教程:路由(Routing)(四)
- 一、引言
- 二、基本概念
- 2.1 路由与绑定
- 2.2 Direct交换机
- 2.3 多绑定
- 2.4 发送日志
- 2.5 订阅
- 三、整合代码
- 3.1 EmitLogDirectApp.cs
- 3.2 ReceiveLogsDirectApp.cs
- 3.3 推送所有和接收error、warning级别日志
- 3.4 推送和接收error级别日志
- 四、结论
RabbitMQ教程:路由(Routing)(四)
一、引言
在之前的教程中,我们构建了一个简单的日志系统,该系统能够将日志消息广播给多个接收者。在本教程中,我们将扩展这个系统,增加一个功能:只订阅消息的一个子集。例如,我们可能只想将关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
二、基本概念
2.1 路由与绑定
在之前的示例中,我们已经创建了绑定。绑定是交换机和队列之间的关系,可以简单地理解为:队列对来自这个交换机的消息感兴趣。
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: string.Empty);
绑定可以带一个额外的routingKey
参数,为了避免与BasicPublish
参数混淆,我们将其称为binding key
。下面是如何创建带有键的绑定:
channel.QueueBind(queue: queueName,
exchange: "direct_logs",
routingKey: "black");
绑定键的含义取决于交换机类型。我们之前使用的fanout
交换机,简单地忽略了它的值。
2.2 Direct交换机
我们的日志系统从上一个教程中广播所有消息给所有消费者。我们希望扩展这一点,允许根据消息的严重性过滤消息。例如,我们可能希望写入磁盘日志消息的脚本只接收关键错误,而不是浪费磁盘空间在警告或信息日志消息上。
我们之前使用的fanout
交换机没有给我们提供太多灵活性——它只能进行无脑广播。
我们将改用direct
交换机。direct
交换机背后的路由算法很简单——消息会被路由到binding key
与消息的routing key
完全匹配的队列。
2.3 多绑定
将多个队列绑定到同一个绑定键是完全合法的。在我们的示例中,我们可以在X
和Q1
之间添加一个绑定键为black
的绑定。在这种情况下,direct
交换机会表现得像fanout
一样,将消息广播给所有匹配的队列。带有路由键black
的消息将被传递给Q1
和Q2
。
2.4 发送日志
我们将为我们的日志系统使用这个模型。我们将消息发送到一个direct
交换机,并提供日志严重性作为routing key
。这样,接收脚本将能够选择它想要接收的严重性。
首先,我们需要创建一个交换机:
channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);
然后我们可以发送消息:
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",
routingKey: severity,
basicProperties: null,
body: body);
为了简化,我们假设’severity’可以是info
、warning
或error
中的一个。
2.5 订阅
接收消息将与上一个教程中的工作方式相同,唯一的区别是我们将为每个我们感兴趣的严重性创建一个新的绑定。
var queueName = channel.QueueDeclare().QueueName;
foreach(var severity in args)
{
channel.QueueBind(queue: queueName,
exchange: "direct_logs",
routingKey: severity);
}
三、整合代码
3.1 EmitLogDirectApp.cs
using RabbitMQ.Client;
using System.Text;
// 从外部传递循环次数,例如10或20
int loopCount = 10; // 可以根据需要修改循环次数
await SendLogsAsync(loopCount);
// 等待用户按下回车键退出程序
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
//发送日志消息到RabbitMQ的direct类型的交换机
async Task SendLogsAsync(int loopCount)
{
// 创建连接工厂,并设置RabbitMQ服务器地址为localhost
var factory = new ConnectionFactory { HostName = "localhost" };
// 使用异步方式创建连接
using var connection = await factory.CreateConnectionAsync();
// 使用异步方式创建通道
using var channel = await connection.CreateChannelAsync();
// 声明名为"direct_logs"的direct类型的交换机
string ExchangeName = "direct_logs";
await channel.ExchangeDeclareAsync(exchange: ExchangeName, type: ExchangeType.Direct);
// 定义可能的严重性级别
string[] severities = { "info", "warning", "error" };
// 创建Random实例用于生成随机数
Random random = new Random();
// 循环发送指定次数的消息
for (int i = 0; i < loopCount; i++)
{
// 随机选择一个严重性级别
string severity = severities[random.Next(severities.Length)];
// 构建消息内容,包含循环次数
string message = $"Iteration {i + 1} - Hello World!";
// 将消息内容编码为字节数组
var body = Encoding.UTF8.GetBytes(message);
// 异步发布消息到交换机,使用严重性级别作为路由键
await channel.BasicPublishAsync(exchange: ExchangeName, routingKey: severity, body: body);
// 打印消息发送成功的信息
Console.WriteLine($" [x] Sent '{severity}':'{message}'");
}
}
3.2 ReceiveLogsDirectApp.cs
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading.Tasks;
// 创建连接工厂,并设置RabbitMQ服务器地址为localhost
var factory = new ConnectionFactory { HostName = "localhost" };
// 使用异步方式创建连接
using var connection = await factory.CreateConnectionAsync();
// 使用异步方式创建通道
using var channel = await connection.CreateChannelAsync();
// 声明名为"direct_logs"的direct类型的交换机
string ExchangeName = "direct_logs";
await channel.ExchangeDeclareAsync(exchange: ExchangeName, type: ExchangeType.Direct);
// 声明一个由服务器命名的队列
var queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;
// 设定感兴趣的日志级别
//string[] severities = { "info","warning", "error" };
//string[] severities = { "warning", "error" };
string[] severities = { "error" };
// 为每个感兴趣的日志级别创建绑定
foreach (string severity in severities)
{
await channel.QueueBindAsync(queue: queueName, exchange: ExchangeName, routingKey: severity);
}
Console.WriteLine(" [*] Waiting for messages."); // 提示信息,表示消费者正在等待消息
// 创建一个异步事件驱动的消费者
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += (model, ea) =>
{
// 从接收到的消息中提取消息体并转换为字符串
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey; // 获取路由键,即消息的严重性级别
Console.WriteLine($" [x] Received '{routingKey}':'{message}'"); // 打印接收到的消息
return Task.CompletedTask; // 返回Task.CompletedTask以满足异步事件处理的签名要求
};
// 开始消费指定队列的消息
await channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit."); // 提示信息,等待用户按下回车键退出程序
Console.ReadLine();
3.3 推送所有和接收error、warning级别日志
配置EmitLogDirectApp.cs
推送所有日志级别数据。
配置ReceiveLogsDirectApp
接收error和warning级别数据。
3.4 推送和接收error级别日志
配置EmitLogDirectApp.cs
推送所有日志级别数据。
配置ReceiveLogsDirectApp
接收error级别数据。
四、结论
在本教程中,我们深入探讨了RabbitMQ路由模式的概念和实现。通过构建一个可以根据消息属性(如严重性级别)路由消息的日志系统,我们学习了如何创建direct
类型的交换机,以及如何根据路由键将消息发送到特定的队列。以下是我们从本教程中获得的关键要点:
-
路由灵活性:通过使用
direct
交换机,我们实现了基于路由键的消息路由,这允许我们灵活地控制消息的流向,而不是简单地广播给所有订阅者。 -
消息过滤:我们可以根据消息的属性(如严重性级别)来过滤消息,确保只有相关的消费者接收到消息,从而节省资源并提高效率。
-
绑定键与路由键:我们学习了如何使用绑定键和路由键来控制消息的路由,使得消息可以根据特定的键值被路由到对应的队列。
-
多绑定:我们了解了如何将多个队列绑定到同一个绑定键,从而实现消息的多播分发。
-
实际应用:通过这个教程,我们掌握了如何在实际应用中实现消息的精细化控制,这对于构建复杂的事件驱动架构和微服务架构至关重要。
通过这些机制,我们能够建立一个既高效又灵活的路由系统,它不仅能够提高系统的响应速度,还能够在面对各种异常情况时保持消息的可靠性和持久性。