0、引言
先决条件
本教程假设 RabbitMQ 已安装并且正在
本地主机
的标准端口(5672
)上运行。如果您使用了不同的主机、端口或凭证,则要求调整连接设置。获取帮助
如果您在阅读本教程时遇到问题,可以通过邮件列表或者 RabbitMQ 社区 Slack 与 RabbitMQ 官方取得联系。
在上一篇教程中我们构建了一个简单的日志系统,得以向许多接收者(receiver)广播日志消息。
在本教程中我们将会为该系统添加一个特性 —— 我们将使“仅订阅消息的一个子集”成为可能。例如,我们将能够只将关键错误消息定向到日志文件(以节省磁盘空间),与此同时仍然能够在控制台上打印所有的日志消息。
原文链接:https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html
1、绑定
在上一篇例程中我们已经建立过绑定了。您也许能够回想起,代码类似于:
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: string.Empty);
绑定 即表征 交换机 与 队列 之间的关系。这可以简单地理解为:队列对来自此交换机的消息感兴趣。
绑定可以接收一个额外的 routingKey
(路由键)参数。为了避免与 BasicPublish
方法的一个参数混淆,我们现在将其称之为 binding key
(绑定键)。下面演示了我们如何创建一个带有键的绑定:
channel.QueueBind(queue: queueName,
exchange:"direct_logs",
routingKey: "black");
binding key
的含义取决于交换机的类型。比如我们之前使用的 fanout
交换机会简单地忽略它的值。
2、直连交换机(Direct exchange)
在上一篇教程中,我们的日志系统向所有的消费者广播所有消息。我们希望对其进行拓展,以允许根据消息的严重程度过滤消息。比如,我们也许想要向磁盘写入日志消息的脚本文件只接收关键错误,而不是在警告或者信息日志消息上浪费磁盘空间。
但我们在上一篇教程使用的 fanout
扇出交换机并没有给我们如此大的灵活性 —— 它只能够进行无意识的广播。
相反,我们将会使用一个 direct
直连交换机。其背后的路由算法很简单 —— 将消息路由到其 routing key
与队列的 binding key
完全匹配的队列。
为了说明这一点,请考虑如下配置:
在这个配置中,我们可以看到绑定了两个队列的 direct
直连交换机 X
。第一个队列使用 orange
绑定键绑定;而第二个队列拥有两个绑定,一个绑定键为 black
,另一个绑定键为 green
。
在这样的配置下,被发布到交换机中带有 orange
路由键的消息将会被路由到 Q1
队列。而带有 black
或者 green
路由键的消息将会去往 Q2
队列。其他所有消息将会被丢弃。
3、多个绑定
使用相同的绑定键绑定多个队列是完全合法的。在我们的示例中,我们可以使用 black
绑定键在 X
与 Q1
之间添加绑定。在那样的情况下,direct
直连交换机表现得像是 fanout
扇出交换机:向所有匹配的队列广播消息。带有 black
路由键的消息将同时传递给 Q1
和 Q2
双方。
4、发送日志
我们将在日志系统中使用这个模型,我们将消息发送至 direct
直连交换机而不是 fanout
扇出交换机。我们提供日志严重程序作为 routing key
路由键。这样接收脚本就能够选择其想要接收的严重程度。让我们首先关注发送日志:
如常,首先我们需要创建一个交换机:
channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);
我们已经准备好发送一条消息:
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",
routingKey: severity,
basicProperties: null,
body: body);
为了简化问题我们假定 ‘严重程度’ 可以是 info
、warning
、error
中的一个。
5、订阅
接收消息的方式工作与上一篇教程类似,但有一点例外 —— 我们将会为我们感兴趣的每个 ‘严重程度’ 创建新的绑定。
var queueName = channel.QueueDeclare().QueueName;
foreach(var severity in args)
{
channel.QueueBind(queue: queueName,
exchange: "direct_logs",
routingKey: severity);
}
6、将所有的东西放到一起
EmitLogDirect.cs
类的代码:
using System.Text;
using RabbitMQ.Client;
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);
var severity = (args.Length > 0) ? args[0] : "info";
var message = (args.Length > 1)
? string.Join(" ", args.Skip(1).ToArray())
: "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",
routingKey: severity,
basicProperties: null,
body: body);
Console.WriteLine($" [x] Sent '{severity}':'{message}'");
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
ReceiveLogsDirect.cs
的代码:
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);
// declare a server-named queue
var queueName = channel.QueueDeclare().QueueName;
if (args.Length < 1)
{
Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
Environment.GetCommandLineArgs()[0]);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
Environment.ExitCode = 1;
return;
}
foreach (var severity in args)
{
channel.QueueBind(queue: queueName,
exchange: "direct_logs",
routingKey: severity);
}
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine($" [x] Received '{routingKey}':'{message}'");
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
像往常一样创建项目(建议参考教程一)
如果您只想要保存 warning
和 error
(没有 info
)日志消息到文件中,只需要打开一个控制台并输入:
cd ReceiveLogsDirect
dotnet run warning error > logs_from_rabbit.log
如果您想要在屏幕上看见所有的日志消息,打开一个新的终端并尝试:
cd ReceiveLogsDirect
dotnet run info warning error
# => [*] Waiting for logs. To exit press CTRL+C
例如,要发送一个 error
日志消息,只需要输入:
cd EmitLogDirect
dotnet run error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'
(EmitLogDirect.cs 和 ReceiveLogsDirect.cs 的完整源码)
运行效果:
要了解如何基于特定模式侦听消息,请移步至教程五。
5、生产[非]适用性免责声明
请记住,本教程和其他教程都是教程。他们一次展示一个新概念,可能会有意地过度简化一些东西,而忽略其他东西。例如,为了简洁起见,连接管理、错误处理、连接恢复、并发性和指标收集等主题在很大程度上被省略了。这种简化的代码不应该被认为可以用于生产。
在发布您的应用之前,请先查看其他文档。我们特别推荐以下指南:发布者确认和消费者确认,生产清单和监控。