(七)「消息队列」之 RabbitMQ 发布者确认(使用 .NET 客户端)

发布者确认(Publisher Confirms)

发布者确认是一个 RabbitMQ 扩展,用于实现可靠的发布。当在通道上启用发布者确认时,客户端发布的消息将由代理异步确认,这意味着它们已在服务器端得到处理。

0、引言

先决条件

本教程假设 RabbitMQ 已安装并且正在 本地主机 的标准端口(5672)上运行。如果您使用了不同的主机、端口或凭证,则要求调整连接设置。

获取帮助

如果您在阅读本教程时遇到问题,可以通过邮件列表或者 RabbitMQ 社区 Slack 与 RabbitMQ 官方取得联系。

在本教程中,我们将使用发布者确认来确保已发布的消息已安全到达代理。我们将介绍几种使用发布者确认的策略,并解释它们的利弊。

原文链接:https://www.rabbitmq.com/tutorials/tutorial-seven-dotnet.html

1、在通道上启用发布者确认

发布者确认是 RabbitMQ 对 AMQP 0.9.1 协议的扩展,所以默认情况下它们是不启用的。使用 ConfirmSelect 方法可以在通道层级启用发布者确认:

var channel = connection.CreateModel();
channel.ConfirmSelect();

您必须在期望启用发布者确认的每个通道上调用该方法。确认只需要启用一次,而不是对每条发布的消息都启用。

策略 #1:单独发布消息

让我们从实现带确认的发布的最简单途径开始吧,那就是,发布一条消息并同步等待它确认

while (ThereAreMessagesToPublish())
{
    byte[] body = ...;
    IBasicProperties properties = ...;
    channel.BasicPublish(exchange, queue, properties, body);
    // uses a 5 second timeout
    channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
}

在前面的示例中我们像往常一样发布消息并使用 Channel#WaitForConfirmsOrDie(TimeSpan) 方法等待它确认。该方法在消息确认后立即返回。如果在超时时间内消息未得到确认或者如果消息已 nack(Negative-Acknowledgement) 了(意味着代理由于某些原因无法处理它),方法会抛出一个异常。异常的处理通常包括记录一个错误消息日志 并/或 重新尝试发送消息。

不同的客户端库有不同的方式去同步处理发布者确认,所以确保仔细阅读您正在使用的客户端的文档。

这个技术非常简单但也有一个巨大的缺点:它会显著降低发布速度,因为某条消息的确认会堵塞后续消息的发布。这种方法提供的吞吐量不会超过每秒几百条已发布的消息。不过,这对于某些应用程序来说已经足够好了。

发布者确认是异步的吗

在开头我们提到代理是异步确认已发布的消息的,但在第一个例子中,代码是同步等待直至消息确认的。客户端实际上异步接收确认,并相应地解除对 WaitForConfirmsOrDie 的调用阻塞。可以将 WaitForConfirmsOrDie 看作是一个同步 helper,它依赖于底层的异步通知。

策略 #2:批量发布消息

为了改进上面的例子,我们可以发布一批消息并等待这一批消息全部得到确认。如下是一个使用 100 一批次的示例:

var batchSize = 100;
var outstandingMessageCount = 0;
while (ThereAreMessagesToPublish())
{
    byte[] body = ...;
    IBasicProperties properties = ...;
    channel.BasicPublish(exchange, queue, properties, body);
    outstandingMessageCount++;
    if (outstandingMessageCount == batchSize)
    {
        channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
        outstandingMessageCount = 0;
    }
}
if (outstandingMessageCount > 0)
{
    channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
}

等待一批消息的确认比等待单个消息的确认大大提高了吞吐量(在远程 RabbitMQ 节点上最多可提高 20-30 倍)。一个缺点是,如果出现故障,我们不知道究竟是哪里出了问题,因此我们可能不得不在内存中保存整个批处理,以记录一些有意义的内容或重新发布消息。这个解决方案仍然是同步的,因此它阻止消息的发布。

策略 #3:异步处理发布者确认

代理异步确认已发布的消息,只需要在客户端上注册一个回调就可以收到这些确认的通知:

var channel = connection.CreateModel();
channel.ConfirmSelect();
channel.BasicAcks += (sender, ea) =>
{
  // code when message is confirmed
};
channel.BasicNacks += (sender, ea) =>
{
  //code when message is nack-ed
};

这儿有两个回调:一个用于已确认的消息,一个用于已 nack 的消息(可以认为是代理丢失的消息)。两个回调都有相应的 EventArgs 参数(ea)包含:

delivery tag
标识已确认或已 nack 消息的序列号。我们将很快看到如何将其与发布的消息关联起来。
multiple
这是一个布尔值。如果为 false,则仅有一条消息确认/nack-ed;如果为 true,所有序列号 小于等于该序列号的消息都确认/nack-ed。

在发布前,可以通过 Channel#NextPublishSeqNo 获得消息的序列号:

var sequenceNumber = channel.NextPublishSeqNo;
channel.BasicPublish(exchange, queue, properties, body);

将消息与序列号关联起来的一种简单方法是使用字典。让我们假设我们想要发送字符串,因为它们很容易转换为用于发布的字节数组。下面是一个代码示例,它使用字典将发布序列号与字符串消息体关联起来:

var outstandingConfirms = new ConcurrentDictionary<ulong, string>();
// ... code for confirm callbacks will come later
var body = "...";
outstandingConfirms.TryAdd(channel.NextPublishSeqNo, body);
channel.BasicPublish(exchange, queue, properties, Encoding.UTF8.GetBytes(body));

发布代码现在使用字典跟踪出站消息。我们需要在确认到达时清理字典,并在消息已 nack 时做一些类似于记录警告的事情:

var outstandingConfirms = new ConcurrentDictionary<ulong, string>();

void CleanOutstandingConfirms(ulong sequenceNumber, bool multiple)
{
    if (multiple)
    {
        var confirmed = outstandingConfirms.Where(k => k.Key <= sequenceNumber);
        foreach (var entry in confirmed)
        {
            outstandingConfirms.TryRemove(entry.Key, out _);
        }
    }
    else
    {
        outstandingConfirms.TryRemove(sequenceNumber, out _);
    }
}

channel.BasicAcks += (sender, ea) => CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
channel.BasicNacks += (sender, ea) =>
{
    outstandingConfirms.TryGetValue(ea.DeliveryTag, out string body);
    Console.WriteLine($"Message with body {body} has been nack-ed. Sequence number: {ea.DeliveryTag}, multiple: {ea.Multiple}");
    CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
};

// ... publishing code

先前的示例中包含一个在确认到达时清理字典的回调。注意,这个回调处理单次和多次确认。这个回调会在确认到达(Channel#BasicAcks)时被使用。用于已 nack 消息的回调将检索消息体并发出警告。然后,它重用之前的回调来清除字典中未完成的确认(无论消息是已确认还是已 nack,都必须删除字典中对应的条目)。

如何跟踪未完成的确认?

我们的示例使用一个 ConcurrentDictionary 跟踪未完成的确认。由于几种原因,这个数据结构十分方便。它允许我们能够轻易地将序列号与消息关联起来(无论消息数据是什么)并允许我们能够通过一个给出的序列 id 轻易地清理条目(以处理多次确认/nack)。最后,它支持并发访问,因为确认回调是在客户端库拥有的线程中被调用的,该线程应该与发布线程保持不同。

除了使用复杂的字典实现外,还有其他方法可以跟踪未完成的确认,比如使用简单的并发哈希表和变量来跟踪发布序列的下界,但它们通常更复杂,不属于“教程”的范畴。

总而言之,异步处理发布者确认通常需要以下步骤:

  • 提供一个方法去关联发布序列号和消息。
  • 在通道上注册确认侦听器,以便在发布者 acks/nacks 到达时得到通知,并执行适当的操作,例如记录或者重新发布已 nack 的消息。在此步骤中,序列号到消息的关联机制也可能需要进行一些清理。
  • 在发布消息之前跟踪发布序列号。

重新发布已 nack 的消息?

在相应的回调中重新发布已 nack 的消息可能很诱人,但应该避免这样,因为确认回调是在(通道不应该执行操作的)I/O 线程中分配的。更好的方案是在内存队列中对消息进行排队,该队列由发布线程轮询。像 ConcurrentQueue 这样的类可以很好地在确认回调和发布线程之间传递消息。

总结

在某些应用程序中,确保已发布的消息到达代理是必要的。发布者确认(Publisher Confirms)是 RabbitMQ 的一个特性,可以帮忙满足这个需求。发布者确认本质上是异步的,但也可以同步处理它们。没有说只有一个绝对的方法来实现发布者确认,这通常取决于应用程序和整个系统中的约束。典型的技术有:

  • 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
  • 批量发布消息,同步等待批处理的确认:简单、合理的吞吐量,但在一些东西出现问题时很难推断。
  • 异步处理:最佳性能和资源使用,错误情况下的良好控制,但还需要参与正确实现的过程(无法直接用现成的)。

2、将所有的东西放到一起

PublisherConfirms.cs 类包含了我们所介绍的技术的代码。我们可以编译它,按原样执行它并看看每项技术的表现如何:

dotnet run

输出会看起来像下面这样:

Published 50,000 messages individually in 5,549 ms
Published 50,000 messages in batch in 2,331 ms
Published 50,000 messages and handled confirms asynchronously in 4,054 ms

运行效果:
在这里插入图片描述

如果客户端和服务器位于同一台机器上,那么您的计算机的输出应当与之类似。不出所料单独发布消息表现十分糟糕;但出乎意料的是:与批量发布相比,异步处理的表现有些令人失望。

发布者确认十分依赖于网络,所以我们最好不要在远端节点上尝试,而在生产中,客户端和服务器通常不在同一台机器上却又是更现实的情况。PublisherConfirms.cs 可以很容易地更改为使用非本地节点:

private static IConnection CreateConnection()
{
    var factory = new ConnectionFactory { HostName = "remote-host", UserName = "remote-host", Password = "remote-password" };
    return factory.CreateConnection();
}

重新编译类,再次执行并等待结果:

Published 50,000 messages individually in 231,541 ms
Published 50,000 messages in batch in 7,232 ms
Published 50,000 messages and handled confirms asynchronously in 6,332 ms

我们看到单独发布现在的表现仍然非常糟糕。但是有了客户端和服务器之间的网络,批量发布和异步处理现在表现得差不多,同时异步处理在发布者确认方面还有一点小优势。

请记住,批量发布很容易实现,但是在 negative publisher acknowledgement 的情况下,不容易知道哪些消息不能发送到代理。异步处理发布者确认需要更多的参与实现,但提供了更好的粒度和对发布消息已 nack 时执行的操作的更好控制。

5、生产[非]适用性免责声明

请记住,本教程和其他教程都是教程。他们一次展示一个新概念,可能会有意地过度简化一些东西,而忽略其他东西。例如,为了简洁起见,连接管理、错误处理、连接恢复、并发性和指标收集等主题在很大程度上被省略了。这种简化的代码不应该被认为可以用于生产。

在发布您的应用之前,请先查看其他文档。我们特别推荐以下指南:发布者确认和消费者确认,生产清单和监控。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/45893.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

uni-app个人中心

一. 介绍uni-app&#xff1a; uni-app 是基于Vue.js框架开发的一个跨平台移动应用开发框架&#xff0c;可以同时支持多个平台&#xff08;如iOS、Android、Web等&#xff09;的应用开发。采用了统一的语法和组件规范&#xff0c;可以大大简化跨平台开发的工作&#xff0c;提高…

SZ:zip/内部函数外部函数/VGG模型/nn

zip&#xff1a; -r recursion-d delete-m move (move隐藏的意思是&#xff0c;原文件会消失&#xff09;想增加文件&#xff0c;不需要加参数什么参数也没有。如果zip压缩文件不存在&#xff0c;执行以上命令将会创建一个新的zip文件并将指定的文件添加进去。如果zip压缩文件…

需求管理中最易忽视的6大重点

需求管理是产品经理的重点工作&#xff0c;如果无法有效进行需求管理&#xff0c;往往会引起需求变更、项目延期以及成本增加等问题。那么如何对需求进行高效管理&#xff0c;我们在需求管理中&#xff0c;往往最容易忽视的重点都有哪些&#xff1f; 1、重视项目整体管理计划 首…

【前后端数据交互】原生JS的Fetch请求封装

一、 AJAX 和 Fetch 对比 1.1 AJAX 概述 AJAX 是最早出现请求数据的方式&#xff0c;它不需要不需要刷新整个页面即可更新部分数据。 属于原生 JS 范畴 &#xff0c;技术核心是 XMLHttpRequest 对象。 AJAX 请求过程&#xff1a;创建 XMLHttpRequest 对象、连接服务器、发送请…

Qt Core学习日记——第六天QMetaMethod

Qt子类会将每一个函数封装成QMetaMethod存储在对应的QMetaObject中&#xff0c;包括信号、槽函数、普通函数、构造函数、析构函数 函数解析 QMetaMethod::methodSignature 获取方法的签名 比如函数slot2&#xff0c;对应签名是“slot2(int*)” QMetaMethod::name 方法名称。…

你能说说“淘宝购物车”怎么测试么?

前言 今天我给大家整理一篇面试高频问到的问题“淘宝购物车”怎么测试。 测试思维 依然附上测试任何事物的测试思路&#xff1a; 第一步&#xff1a;梳理产品的核心业务流程&#xff1a;明白这是个什么项目&#xff0c;实现了什么业务&#xff0c;以及是怎么实现的&#xf…

从Vue层面 - 解析发布订阅模式和观察者模式区别

目录 前言一、发布订阅模式什么是发布订阅模式&#xff1f;应用场景 二、观察者模式1&#xff09;什么是观察者模式&#xff1f;2&#xff09;应用场景3&#xff09;vue中的观察者模式观察者&#xff08;订阅者&#xff09; - Watcher目标者&#xff08;发布者&#xff09; - D…

STM32 串口实验(学习一)

本章将实现如下功能&#xff1a;STM32通过串口和上位机对话&#xff0c;STM32在收到上位机发过来的字符串后&#xff0c;原原本本返回给上位机。 STM32 串口简介 串口作为MCU的重要外部接口&#xff0c;同时也是软件开发重要的调试手段&#xff0c;其重要性不言而喻。现在基本…

Fiddler使用说明及中文绿化版

Fiddler是最常用的Web调试工具之一。 对于开发来说&#xff0c;前端可以通过fiddler代理来调试JS、CSS、HTML样式。后端可以通过fiddler查看请求和相应&#xff0c;定位问题。 对于测试来说&#xff0c;可以通过抓包方式修改前端请求参数和模拟后端返回&#xff0c;快速定位缺…

如何创建vue2,vue3项目

前提需安装node.js和Vue CLI node.js:https://nodejs.org/zh-cn Vue CLI&#xff1a; npm install -g vue/cli 如何创建一个vue2项目 &#xff08;1&#xff09; 使用cmd终端直接创建 进入到vue项目所创建的目录里&#xff08;我是直接创建在桌面上&#xff09; 选择vue2 …

STL中的string类的模拟实现【C++】

文章目录 默认成员函数构造函数拷贝构造函数 赋值运算符重载函数析构函数beginendsizecapacityreserveeraseresizepush_backappendoperatorinsertswapsubstrc_stroperator[ ]findcleargetline>>运算符的重载<<运算符的重载 默认成员函数 构造函数 构造函数设置为…

现代控制理论

B站学习视频https://space.bilibili.com/230105574/channel/seriesdetail?sid1569601 一.引入状态-空间表达 &#xff08;本质上是使用一组向量的线性组合来表示整个系统任意物理量&#xff0c;也就是一个特征分解的过程&#xff09; 现代控制理论的基础是 状态-空间表达方…

IDEA以服务列表的形式展示

IDEA以服务列表的形式展示 要是没有要显示的服务列表的话就右键将启动的全部添加进去。正常是懒加载的形式&#xff0c;正常启动了就会添加进去。

Toyota Programming Contest 2023#4(AtCoder Beginner Contest 311)(A-G)

Contest Duration: 2023-07-22(Sat) 20:00 - 2023-07-22(Sat) 21:40 (local time) (100 minutes) 头文件和宏 #include<iostream> #include<string> #include<vector> using namespace std; #define int long long #define fer(i,a,b) for(int ia;i<b;i…

【时间复杂度】

旋转数组 题目 给定一个整数数组 nums&#xff0c;将数组中的元素向右轮转 k 个位置&#xff0c;其中 k 是非负数。 /* 解题思路&#xff1a;使用三次逆转法&#xff0c;让数组旋转k次 1. 先整体逆转 // 1,2,3,4,5,6,7 // 7 6 5 4 3 2 1 2. 逆转子数组[0, k - 1] // 5 6 7 4 3…

Pytorch个人学习记录总结 03

目录 Transeforms的使用 常见的transforms Transeforms的使用 torchvision中的transeforms&#xff0c;主要是对图像进行变换&#xff08;预处理&#xff09;。from torchvision import transforms transeforms中常用的就是以下几种方法&#xff1a;&#xff08;Alt7可唤出…

多源BFS-- 矩阵距离

关于多源BFS&#xff0c;基本上就是单源BFS的简单升级了一下&#xff0c;比如在queue中队头开始时只有一个&#xff0c;我们通过这一个队头去推导其他的东西。而多源最短路就是队头一开始有1-n个可能的数&#xff0c;一个一个去BFS。 题目思路&#xff1a; 这个题就直接把所有的…

0成本搭建自己的云数据库

第一步&#xff0c;租免费的云服务器 www.aliyun.com 阿里云的&#xff0c;可以免费租三个月 进入主页后选择云服务器ESC 选择这款&#xff0c;点击试用就行 第二步&#xff0c;配置服务器 在配置服务器系统的时候选择centos&#xff0c;省事&#xff0c;别选ubuntu&#x…

[Spring] 三级缓存解决循环依赖详解

什么是循环依赖 注册一个bean对象的过程&#xff1a; Spring扫描class得到BeanDefinition – 根据得到的BeanDefinition去生成bean – 现根据class推断构造方法 – 根据推断出来的构造方法&#xff0c;反射&#xff0c;得到一个对象 – 填充初始对象中的属性(依赖注入) – 如果…

服务器中了360后缀勒索病毒,360后缀勒索病毒介绍解密数据恢复

360后缀勒索病毒&#xff0c;是BeijingCrypt勒索家族中的一种勒索软件病毒&#xff0c;这种恶意软件一旦攻击了企业的服务器就会利用自身独特的加密技术来全盘扫描系统文件&#xff0c;并对用户的全部文件进行加密&#xff0c;并要求用户支付赎金以解锁文件。近期&#xff0c;我…