2 Hello, Rx
本章节涵盖的内容:
- 不使用Rx的工作方式
- 向项目中添加Rx
- 创建你的第一个Rx应用程序
Rx 的目标是协调和统筹来自社交网络、传感器、用户界面事件等不同来源的基于事件的异步计算。例如,建筑物周围的监控摄像头和移动传感器会在有人靠近建筑物时触发,并从最近的摄像头向我们发送照片。Rx 还能统计包含候选人姓名的推文,以估算候选人的受欢迎程度。这是通过异步方式调用外部网络服务完成的。对于这些场景和其他类似场景,协调往往会导致复杂的程序,而 Rx 绝对可以减轻这种工作量,你会看到这一点。
在本章中,你将通过一个示例来了解用 Rx 和不用 Rx 对程序的不同,主要从结构、可读性、扩展和演进的便利性方面进行对比。想象一下,你收到了股票反斗城公司著名的首席技术官彭尼先生的来信。股票反斗城是一家股票交易公司,为客户提供投资建议,并从收益中收取利息。 因此,对股票市场的变化做出快速反应对公司来说非常重要。最近,股票反斗城公司发现,通过使用一个系统,可以对股票的剧烈变化发出警报,正符合生彭尼先生的需求,从而节省资金。 彭尼先生对 剧烈波动 的定义是价格变动超过 10%。当发生此种波动时,"股票反斗城 "希望尽快知道,以便做出反应,卖出或买入股票。
彭尼先生来找你,是因为他知道他可以依靠你,快速提供高质量的应用程序。你的工作(也是本章的目标)是创建一个应用程序,通知用户有关发生剧烈变化的股票的信息。当股价两次报价之间以一定幅度增加或减少时(本例中为 10%),就算是发生剧烈变化。遇到这种情况时,你需要向用户的手机发送推送通知,或在应用程序屏幕上显示警报,例如显示红色闪烁条。
在本章的第一部分,你将了解到,使用传统 .NET 事件方法创建程序时,通常采用的步骤。然后,我们将分析解决方案,并讨论其不足之处。
本章的第二部分将在程序中引入 Rx。首先,你将把库添加到项目中,然后一步一步地为股票反斗城公司创建 Rx 风格的应用程序。
2.1 处理传统的 .NET 事件
股票信息来自股票交易源,许多服务商都提供此类信息。每种服务商都有自己的 API 和数据格式,其中有几种免费来源,如 Yahoo Finance (http://finance.yahoo.com) 和 Google Finance (www.google.com/finance)。对于你的程序,最重要的信息是股票的代码和股价。股票的代码是一种唯一标识股票交易或股票的字符串(例如,MSFT 是 Microsoft 股票代码)。
图 2.1 中的流程图描述了应用程序的逻辑流程。
图 2.1 "Stock R Us "应用程序逻辑流程图。我们会通知用户剧烈变动–价格变动超过 10%。
对于程序接收到的每一个股票信息,它都计算新股价与上一股价之间的价格差异,并换算成变化率。假如你收到一条更新,MSFT 的价格从 50 美元变为 40 美元,变化了 20%。这将视为是 剧烈波动,会在应用程序中显示警报。
在现实生活中,股票成交价的产生速率并不固定。为了避免混淆,目前先假定股票成交价以恒定的速率产生;稍后再处理时间方面的问题。
这里我们抽象地表示一下能发送出股票成交价的信号源,就用 StockTicker 类来表示。该类只公开了一个关于 StockTick 的事件,此事件会在有新股票成交价时被触发。
清单 2.1 StockTicker 类
class StockTicker { public event EventHandler<StockTick> StockTick; }
译者注: 我想讨论一下StockTick这个单词,我个人感觉作者应该用两个单词,分别是StockTickEvent和StockTickInfo,其中StockTickEvent表示她是StockTicker的event,在这上面挂监听。StockTickInfo是事件对象。所以我认为合理的代码应该如下:
class StockTicker { //public event EventHandler<StockTick> StockTick; public event EventHandler<StockTickInfo> StockTickEvent; }
然后再说说StockTicker类,作者在描述StockTicker上,用的笔墨很少。但这个类其实非常重要,他是股价信号的产生源。这里笔者没有给出她的任何工作原理,仅仅是说这种类型的对象,会以恒定速率发送StockTickEvent事件(后面我就简称tick信号了)。其实她工作时,就是不停的产生tick信号,每个tick信号都是一个StockTickInfo对象。tick信号中包含股票的代码和最新股价。更直观的想象就是,StockTicker就是豌豆射手,tick就是射出的子弹。
其实在使用这个类的时候需要考虑很多问题,比如StockTicker可能本身就是利用多个线程一起发送tick信号,作者在后文中特意以这个问题分析了程序可能产生的bug。再比如,后面讲解当以tick信号为元素,建立Observable(可观察序列)时,需要给Observable对象定义泛型,其实就是依据StockTicker中的事件中EventHandler包含的泛型。所以当读到后面的时候需要知道,之所以这样都是因为StockTicker类的定义造成的。
StockTick 类保存股票的信息,如股票代码和价格。
清单 2.2 StockTick 类
class StockTick { //股票代码 public string QuoteSymbol { get; set; } //成交价 public decimal Price { get; set; } //其他属性 }
一般情况下,会用到传统的 .NET 事件。在.NET中,当需要向程序提供通知时,都会如此,算是一种标准方式。为了处理股票价格变动,你将创建一个 StockMonitor 类,通过使用 += 运算符连接到 StockTick 事件,以便监听股票变动。
清单 2.3 StockMonitor 类
class StockMonitor { public StockMonitor(StockTicker ticker) { ticker.StockTick += OnStockTick; //每次事件发生时,都会调用 OnStockTick 方法。 } //其余代码... }
示例的核心功能是 OnStockTick 方法。在该方法中,你将检查每一次股价变动是否存在上次交易价格,以便比较新旧价格。为此,你需要一个容器,来容纳关于前一个刻的信息。由于每个 tick(最新股票成交信息) 都包含 QuoteSymbol(股票代码),因此可以使用字典来保存这些信息,并将 QuoteSymbol(股票代码) 作为键值。为了保存前一次的信息,你需要定义一个名为 StockInfo 的新类(清单 2.4),然后在 StockMonitor 类中声明字典成员(清单 2.5)。
清单 2.4 StockInfo class
class StockInfo { public StockInfo(string symbol, decimal price) { Symbol = symbol; PrevPrice = price; } public string Symbol { get; set; } public decimal PrevPrice { get; set; } }
每次调用 OnStockTick 时,程序需要检查,是否已将旧成交价保存到字典中。你可以使用 TryGetValue 方法,如果从字典中找到键存在,该方法就会返回 true,然后就可以用该键值进行查询,并使用 out 关键字将字典中的值对象返回。
清单 2.5 OnStockTick 事件处理程序,判断股票的StockInfo是否存在
Dictionary<string,StockInfo> _stockInfos=new Dictionary<string, StockInfo>(); void OnStockTick(object sender, StockTick stockTick) { StockInfo stockInfo ; var quoteSymbol = stockTick.QuoteSymbol; var stockInfoExists = _stockInfos.TryGetValue(quoteSymbol, out stockInfo); ... }
如果存在股票的StockInfo,你可以检查股票的当前价格和之前价格,如下表所示,检测变化率是否大于剧烈波动的阈值。
清单 2.6 OnStockTick 事件处理程序,处理价格剧烈波动
const decimal maxChangeRatio = 0.1m; ... var quoteSymbol = stockTick.QuoteSymbol; var stockInfoExists = _stockInfos.TryGetValue(quoteSymbol, out stockInfo); if (stockInfoExists) { //由于 stockInfoExists 为 true,因此可以确定 stockInfo 非空。 var priceDiff = stockTick.Price-stockInfo.PrevPrice;//stockInfo 变量中包含了股票的相关信息; var changeRatio = Math.Abs(priceDiff/stockInfo.PrevPrice);//计算变化率 if (changeRatio > maxChangeRatio) { //将股票的相关信息推送给用户或者显示在屏幕上 Console.WriteLine("Stock:{0} has changed with {1} ratio, Old Price:{2} New Price:{3}", quoteSymbol, changeRatio, stockInfo.PrevPrice, stockTick.Price); } _stockInfos[quoteSymbol].PrevPrice = stockTick.Price;//保存价格给下次使用. }
如果股票信息不在字典中(例如第一次收到该股票信息),你需要将其添加到字典中,方法是
_stockInfos[quoteSymbol]=new StockInfo(quoteSymbol,stockTick.Price);
当不再需要更新时(例如,当用户决定停止接收通知或关闭页面时),你需要使用 -= 操作符从事件中取消注册。但应该在哪里取消注册呢?一种方法是在 StockMonitor 类中创建一个方法,当你想停止时可以调用该方法。不过幸运的是,.NET 提供了一种处理此类 "清理 "的机制,只要实现 IDisposable 接口就行,该接口包含用于释放资源的 Dispose 方法。在 StockMonitor 中是这样的:
public void Dispose()
{
_ticker.StockTick -= OnStockTick;
_stockInfos.Clear();
}
完整代码如清单 2.7 所示。用以下输入信息运行程序:
Symbol: "MSFT" Price: 100
Symbol: "INTC" Price: 150
Symbol: "MSFT" Price: 170
Symbol: "MSFT" Price: 195
会得到如下结果:
Stock:MSFT has changed with 0.7 ratio, Old Price:100 New Price:170
Stock:MSFT has changed with 0.15 ratio, Old Price:170 New Price:195.5
清单 2.7 StockMonitor 完整代码
class StockMonitor : IDisposable { private readonly StockTicker _ticker; Dictionary<string, StockInfo> _stockInfos = new Dictionary<string, StockInfo>(); public StockMonitor(StockTicker ticker) { _ticker = ticker; ticker.StockTick += OnStockTick;//注册监听,监听股票价格的变化 } void OnStockTick(object sender, StockTick stockTick) { const decimal maxChangeRatio = 0.1m; StockInfo stockInfo; var quoteSymbol = stockTick.QuoteSymbol; //检查应用程序中是否已有股票价格信息 var stockInfoExists = _stockInfos.TryGetValue(quoteSymbol, out stockInfo); if (stockInfoExists) { //计算价格变化的百分比,看是否超过 10% var priceDiff = stockTick.Price - stockInfo.PrevPrice; var changeRatio = Math.Abs(priceDiff / stockInfo.PrevPrice); if (changeRatio > maxChangeRatio) { Debug.WriteLine("Stock:{0} has changed with {1} ratio OldPrice:{2} newPrice:{3}", quoteSymbol, changeRatio, stockInfo.PrevPrice, stockTick.Price); } //存储新的股票价格 _stockInfos[quoteSymbol].PrevPrice = stockTick.Price; } else { //如果这是第一次获得有关该股票的信息,直接将其存储起来。 _stockInfos[quoteSymbol] = new StockInfo(quoteSymbol, stockTick.Price); } } //释放资源并取消注册事件监听。此后,你将不会再收到任何通知。 public void Dispose() { _ticker.StockTick -= OnStockTick; _stockInfos.Clear(); } }
彭尼先生很满意,股票反斗城的员工正在使用该应用程序,效果已经在他们的报告中显示出来。该应用程序接收股票更新信息,可以计算新旧价格之间的差额比率,并向用户发出警报。
就像生活中的事物一样,变化是不可避免的,股票反斗城决定改变其股票信息源。幸运的是,你使用 StockTicker 类对信息源进行了抽象,因此 StockTicker 是唯一需要更改的类。
源更改后,你开始收到关于崩溃和其他错误的投诉,如警报丢失或不必要的警报。于是你开始调查这个问题,发现它与并发性有关。
2.1.1 处理并发问题
代码中隐藏着一个问题:并发性。在StockTicker中没有明确指明事件将在哪个线程中触发,也没有类似的机制,去保证在 StockMonitor 处理选事件时不会触发另一事件,如图 2.2 所示。
图 2.2 多个线程同时执行事件处理程序代码。每个方框代表一只股票的执行时间。当第一个线程为 MSFT 运行代码时,第二个线程开始为 GOOG 股票执行代码。然后第三个线程开始执行与第一个线程相同的股票代码。
线程安全
编程中的线程安全是指,当一个以上的线程调用代码时,无论这些线程执行代码的顺序如何,代码都能正确运行,而且无需同步调用代码。
如果一个类的任何一个方法都是线程安全的,即使不同的方法,同时被不同的线程调用,那么这个类也被称为线程安全的类。这通常意味着,类内部的数据结构受到保护,不会被同时修改。
字典对象支持被多个读取器同时读取,但读取的同时还要去更改,就会出现异常。这种情况如表 2.1 所示。线程 1(左侧)到达标记代码(代码行号2),试图获取股票号为 symbol1 的股票的 StockInfo。与此同时,线程 2(在右侧)到达了向字典添加新股票信息(股票号为 symbol2)的代码行(代码行号6)。字典的读取和更改同时进行,导致了异常。
线程 1 | 线程 2 | ||
---|---|---|---|
1 | : | ||
2 | var stockInfoExists = _stockInfos.TryGetValue(symbol2,out stockInfo); | ||
3 | if (stockInfoExists) | ||
4 | {…} | ||
1 | : | 5 | else{ |
2 | var stockInfoExists = _stockInfos.TryGetValue(symbol1, out stockInfo); | 6 | _stockInfos[symbol2] = new StockInfo(symbol2, price); |
3 | if (stockInfoExists) | 7 | } |
4 | {…} | ||
5 | else{ | ||
6 | _stockInfos[symbol1] = new StockInfo(symbol1, price); | ||
7 | } |
使用 .NET ConcurrentDictionary 可以解决这个问题。这种免锁(lock-free)集合在内部同步读取器和写入器,因此不会抛出异常。
不幸的是,仅仅使用 ConcurrentDictionary 还不够,因为 StockTicker 并没有同步tick(最新股票成交信息) 。如果同时处理同一股票的两个(或多个)tick,那么 PrevPrice 属性的值是多少?这个问题的答案是:以最后一个为准。但有有一个不确定性,这里的最后一个,并不一定是理论上的最后一个股票成交价,因为线程运行的顺序是由操作系统决定的,并不具有确定性。这就使得你的代码不可靠,最终用户可能因此收到你代码计算出的错误结论和通知。其实OnStockTick 事件处理程序中,包含了一个 关键部分(critical section),保护这关键部分 的方法是使用锁。
清单 2.8 带锁版的OnStockTick
object _stockTickLocker = new object();//互斥锁对象,可以用在需要锁处理的代码部分 void OnStockTick(object sender, StockTick stockTick) { const decimal maxChangeRatio = 0.1m; StockInfo stockInfo; var quoteSymbol = stockTick.QuoteSymbol; //本线程运行时,保证另一线程不能进入本线程的 *关键部分*。 //如果另一个线程试图进入锁定的代码,它将阻塞直到本线程将对象释放。 lock (_stockTickLocker) { var stockInfoExists = _stockInfos.TryGetValue(quoteSymbol, out stockInfo); if (stockInfoExists) { var priceDiff = stockTick.Price - stockInfo.PrevPrice; var changeRatio = Math.Abs(priceDiff/stockInfo.PrevPrice); if (changeRatio > maxChangeRatio) { Debug.WriteLine("Stock:{0} has changed with {1} ratio OldPrice:{2} newPrice:{3}", quoteSymbol, changeRatio, stockInfo.PrevPrice, stockTick.Price); } _stockInfos[quoteSymbol].PrevPrice = stockTick.Price; } else { _stockInfos[quoteSymbol] = new StockInfo(quoteSymbol, stockTick.Price); } } }
在很多情况下,使用锁是一种完美的解决方案。但是,当你开始在应用程序中的不同位置添加锁时,你可能会最终导致性能下降,因为锁会增加执行时间以及线程等待关键部分可用的时间。更严重的问题是,锁可能导致应用程序陷入死锁,如图 2.3 所示。每个线程都持有另一个线程需要的资源,而与此同时,每个线程都在等待另一个线程持有的资源。
处理多线程程序是非常困难的,也没有万能的解决方案。唯一合理的做法是让运行多线程的代码更容易理解,借此,让产生并发错误的事情更少发生。
Rx 提供了运行并发代码的处理器,你将在本章后面看到。现在,让我们退后一步,看看你已经创建的代码,并对其进行分析,看看是否可以做得更好。
2.1.2 解决方案回顾与未来展望
至此,我们的代码实现了彭尼先生在本章开头描述的要求。从功能上看,代码完成了它需要做的一切。但你对它的感觉如何?可读性强吗?可维护性如何?是否易于扩展?我想提醒你注意以下几点。
代码分散
让我们从代码分散开始说起。众所周知,分散的代码会增加程序的维护、审查和测试难度。在我们的示例中,程序的主要逻辑在 OnStockTick 事件处理程序中,而该处理程序的代码块与事件注册的代码块 “相距甚远”:
Class StockMonitor
{
//注册事件监听
public StockMonitor(StockTicker ticker)
{
...
ticker.StockTick += OnStockTick;
}
//处理事件
void OnStockTick(object sender, StockTick stockTick)
{
...
}
//取消注册和清理.
public void Dispose()
{
...
}
}
处理多个事件(甚至更多事件)的类很常见,每个类都有自己的事件处理程序,你可能会开始分不清哪些事件与哪些事件相关:
what:
class SomeClass
{
public SomeClass(StockTicker ticker)
{
...
//监听事件,初始化对象.
eventSource.event1 += OnEvent1;
...
eventSource.event2 += OnEvent2;
...
eventSource.event3 += OnEvent3;
...
}
//每个事件都有一个事件处理程序,比如 OnEvent1, OnEvent2, OnEvent3;
//其中一些处理程序可能还与其他事件相关联。
//同时,这里还可能出现类本身提供功能的成员函数或方法的代码,且不一定和事件处理有关。
void OnEvent1(object sender, EventArgs args)
{
...
}
//Other methods
void OnEvent2(object sender, EventArgs args)
{
...
}
void OnEvent3(object sender, EventArgs args)
{
...
}
public void Dispose()
{
...
}
}
很多时候,开发人员会选择将事件处理程序注册更改为 lambda 表达式,例如
anObject.SomeEvent += (sender, eventArgs)=>{...};
虽然你将事件处理程序逻辑移到了注册监听的代码中,但却给资源清理添加了一个错误。如何取消注册?-= 操作符希望你取消注册的是同一个委托。lambda 表达式只能按以下方式取消注册:
eventHandler = (sender, eventArgs)=>{...};
anObject.SomeEvent += eventHandler;//注册监听
:
anObject.SomeEvent -= eventHandler;//取消监听
这看起来不干净,所以如果需要取消注册,需要将 eventHandler 保存为成员,这就引出了下一个问题。
资源处理
从事件中取消注册,以及其他为资源清理(如字典)而添加的代码,都是在 Dispose 方法中完成的。这是一种很好用的模式,但开发人员经常忘记释放代码使用的资源。尽管 C# 和 .NET 作为一个整体是可管理的并使用垃圾回收,但很多时候你仍然需要正确地释放资源,以避免内存泄漏和其他类型的错误。事件经常被注册,这是造成内存泄漏的主要原因之一。原因是(至少对某些人来说),对于许多开发人员来说,我们取消注册的方式并不自然,而且决定取消注册的正确位置和时间并不总是很直接,尤其是因为许多开发人员更喜欢使用 lambda 风格来注册事件,正如我之前所说的那样。除了事件本身,你还添加了代码和状态管理(如我们的字典)来支持你的逻辑。还有更多类型的应用程序会处理相同的场景,例如过滤、分组、缓冲,当然还有清理它们带来的内容。这就引出了下一点。
可重复性和可嵌套性
在我看来,程序的逻辑也会显露出重复性。我敢说我以前在某个应用程序中写过这段代码(或类似的代码),通过关键字保存先前的状态,每当有新的更新时就对其进行更新。我敢打赌你也会有相同的感受。此外,我还觉得这种代码不太容易压缩,条件越多,内部的 if 语句就越多,从而降低了代码的可读性。这样的代码在应用程序中非常常见,而且由于其类似箭头的结构,让人难以理解和跟踪它的功能:
if (some condition)
{
if (another condition)
{
if (another inner condition)
{
//some code
}
}
}
else
{
if (one more condition)
{
//some code
}
else
{
//some code
}
}
嵌套(Composition)
嵌套是指用较简单的结构组成复杂结构的能力。
这一定义与数学中的定义类似,即可以用一组其他函数组成一个复杂表达式:
f ( x ) = x 2 + s i n ( x ) f(x) = x2 + sin(x) f(x)=x2+sin(x)
嵌套允许我们使用一个函数作为另一个函数的参数:
g ( x ) = x + 1 g(x) = x + 1 g(x)=x+1f ( g ( x ) ) = ( x + 1 ) 2 + s i n ( x + 1 ) f(g(x)) = (x + 1)2 + sin(x + 1) f(g(x))=(x+1)2+sin(x+1)
在计算机科学中,我们使用嵌套简单函数的方式表达复杂的代码。这使我们能够进行更高的抽象,并将注意力集中在代码的目的上,而不是细节上,从而更容易掌握。
如果对你的代码提出新的要求,例如通过查看两个以上的连续事件来计算变化率,那么你的代码将不得不发生巨大的变化。如果新的要求是基于时间的,例如在一个时间间隔内查看变化率,那么变化会更大。
同步
同步是开发人员容易忘记的另一件事,它导致了与我们同样的问题:由于计算值不当而导致代码不可靠,以及在使用非线程安全类时可能发生的崩溃。同步就是确保多个线程在同一时间(实际上不一定是并行的,因为可能涉及上下文切换)访问相同的代码时,只有一个线程可以访问。锁是实现同步的一种方法,但也存在其他方法,而且需要知识和谨慎。
编写线程不安全的代码很容易,但编写带锁的代码更容易导致死锁或饥饿。这类错误的主要根源是很难发现。你的代码可能会运行很久(真的),直到你遇到崩溃或其他错误。
这么小的程序就会出现这么多问题,难怪人们会说编程很难。现在是时候看看 Rx 的伟大之处,看看它是如何让我们将之前讨论过的问题变得更加容易。让我们来看看 Rx 的方法,并开始将 Rx 添加到你的应用程序中。
2.2 创建第一个 Rx 应用程序
在本节中,Rx 示例使用了你在上一节中看到的 StockTicker,但这次你将不再使用传统的 .NET 事件。取而代之的是使用 IObservable,你将创建 IObservable,然后围绕它编写事件处理流程。你将慢慢地为解决方案添加一层又一层,直到你拥有一个易于阅读和扩展的完整运行应用程序。
万事开头难。你将通过创建一个新项目(控制台应用程序即可)并添加 Rx 库来开始这段旅程。
2.2.1 选择 Rx 软件包
使用 Reactive Extensions 的第一步是将库添加到项目中。无论你编写的是 Windows Presentation Foundation (WPF) 应用程序、ASP.NET 网站、Windows Communication Foundation (WCF) 服务,还是简单的控制台应用程序,都可以在代码中使用 Rx,使你受益匪浅。但你需要从项目中选择正确的引用库。
图 2.4 Rx 程序集是一组可移植的类库(中间和底部绿色)和平台专用库(左上角橘色)。PlatformServices(右侧蓝色竖着的) 程序集包含平台启动项,是两者之间的粘合剂。
Rx 库以一组可移植类库(PCL)2 和特定于平台的提供程序的形式部署,你可根据项目平台安装这些程序。如图 2.4 所示。
要在项目中添加必要的引用,需要从 NuGet 中选择适当的软件包。NuGet 是一个 .NET 软件包管理器,可以轻松搜索和安装软件包(通常包含库)。表 2.2 介绍了本文撰写时可供选择的 Rx 软件包,图 2.5 显示了 NuGet 软件包管理器。
图 2.5 反应扩展 的NuGet 软件包。许多软件包都在 Rx的基础上添加了一些内容,以标识特定于 Rx.NET 的库。请查找带有 System.Reactive 前缀的软件包 ID,并确保发布者是 Microsoft。
注意 Rx 3.0 于 2016 年 6 月发布,为 .NET Core 和通用 Windows 平台(UWP)添加了 Rx 支持。Rx.NET 还加入了 .NET Foundation (www.dotnetfoundation.org/projects)。为了与 .NET Core 使用的命名约定保持一致,Rx 包已被重新命名以匹配其库名称,并且之前的 Rx 包现在已在 NuGet 库中隐藏起来。
包名 | 描述 |
---|---|
System.Reactive.Interfaces (Rx-Interfaces prior to Rx 3.0) | 安装 System.Reactive.Interfaces 程序集,该程序集仅包含其他 Rx 软件包依赖的接口。 |
System.Reactive.Core (Rx-Core prior to Rx 3.0) | 安装 System.Reactive.Core 程序集,其中包括调度程序、可处置程序等的可移植实现。 |
System.Reactive.Linq (Rx-Linq prior to Rx 3.0) | 安装 System.Reactive.Linq 程序集。这是实现查询处理器的地方。 |
System.Reactive.PlatformServices (Rx-PlatformServices prior to Rx 3.0) | 安装 System.Reactive.PlatformServices 程序集。它是可移植和非可移植 Rx 软件包之间的粘合剂。 |
System.Reactive (Rx-Main prior to Rx 3.0) | 这是 Rx 的主要软件包,大多数情况下都要安装它。它包括 System.Reactive.Interfaces、System.Reactive.Core、System.Reactive.Linq 和 System.Reactive.PlatformServices(将使用的特定启迪提供者取决于项目平台)。 |
System.Reactive.Providers (Rx-Providers prior to Rx 3.0) | 将 System.Reactive.Providers 与 System.Reactive.Core 软件包一起安装。该包添加了 IQbservable LINQ API 处理器,以便在事件树上创建表达式树,从而使查询提供程序能转换为目标查询语言。这是 Rx IQueryable 的对应程序。 |
System.Reactive.Windows.Threading (Rx-Xaml prior to Rx 3.0) | 与 System.Reactive.Core 包一起安装 System.Reactive.Windows .Threading 程序集。当你需要为任何支持 XAML 的平台(WPF、Silverlight、Windows Phone 和 Windows Store 应用程序)添加 UI 同步类时,请使用此包。 |
System.Reactive.Runtime.Remoting (Rx-Remoting prior to Rx 3.0) | 将 System.Reactive.Runtime.Remoting 与 System.Reactive.Core 软件包一起安装。使用此软件包可为 .NET Remoting 添加扩展,并将其作为 observable 序列公开。 |
System.Reactive.Windows.Forms / System.Reactive.WindowsRuntime (Rx-WPF/Rx-Silverlight/ Rx-WindowsStore/ Rx-WinForms prior to Rx 3.0) | 特定于平台的软件包子集。为平台类型添加用户界面同步类和 Rx 实用程序(如 WinRT 中的 IAsyncAction 和 IAsyncOperationWithProgress)。 |
Microsoft.Reactive.Testing (Rx-Testing prior to Rx 3.0) | Rx 测试库可以编写反应式单元测试。附录 C 包括反应式单元测试的解释和示例。 |
System.Reactive.Observable.Aliases (Rx-Aliases prior to Rx 3.0) | 为一些查询处理器提供别名(如 Map、FlatMap 和 Filter)。 |
大多数情况下,你会将 System.Reactive 包添加到项目中,因为它包含了最常用的类型。当你为特定平台或技术编写代码时,你会添加补充软件包3。
2.2.2 从 NuGet 安装
决定需要哪个软件包后,你可以通过软件包管理器对话框,或软件包管理器控制台进行安装。要使用软件包管理器控制台,请选择工具 > NuGet 软件包管理器 > 软件包管理器控制台。在控制台中,从默认项目下拉列表中选择目标项目,如图 2.6 所示。
在控制台中,编写所需包的安装命令:
Install-Package [Package Name]
图 2.6 通过软件包管理器控制台安装 Rx 库。确保从默认项目下拉列表中选择正确的项目进行安装。也可以通过键入 -ProjectName [项目名称] 来定义项目。
图 2.7 VS 2015 中的 NuGet 软件包管理器。输入软件包名称 来搜索想要的软件包,然后选择软件包并点击安装 。
安装软件包的另一个选择是通过软件包管理器对话框,如图 2.7 所示。这个用户界面能让你以更友好的方式搜索软件包并查看其信息。右键单击项目,选择 “管理 NuGet 软件包”。键入软件包名称,从下拉列表中选择要安装的软件包,然后单击 “安装”。
安装 NuGet 软件包后,你就可以编写 Rx 版本的 Stock-Monitor。你可以在 GitHub 存储库中找到本书的源代码:http://mng.bz/18Pr。
使用 .NET Core 运行示例的说明
微软最近宣布,我在这里描述的格式已被弃用(但在过渡期间将得到支持)。微软建议使用普通的 csproj 文件和新的 MSBuild 附加功能(例如 PackageReference)。要使用 .NET Core,首先需要从 www.microsoft .com/net/core 安装最新版本。然后,在你最喜欢的工具中创建一个新项目,如 Visual Studio 2015 或 Visual Studio Code (https://code.visualstudio.com/docs/runtimes/dotnet)。
更新 project.json 文件中的 depen-dencies 部分,添加对 System.Reactive NuGet 包的引用,如图所示:
{ "version": "1.0.0-*", "buildOptions": { "debugType": "portable", "emitEntryPoint": true }, "dependencies": { "System.Reactive": "3.0.0" }, "frameworks": { "netcoreapp1.0": { "dependencies": { "Microsoft.NETCore.App": { "type": "platform", "version": "1.0.0" }, }, "imports": "dnxcore50" } } }
最后,在命令提示符下运行 dotnet restore 命令。现在你就拥有了一个配置好的 Rx 项目。
2.3 编写事件处理流
安装 Rx 软件包并将所需引用添加到 Rx 库后,就可以开始构建应用程序了。要开始创建事件处理流程,你需要事件源。在 Rx 中,事件源(如果你愿意,也可以称为 “发布者”)是实现 IObservable 接口的对象。
概括地说,IObservable 接口定义了给Observer(观察者)订阅通知的方法—— Subscribe 。Observer(观察者)由 IObserver 接口实现,该接口定义了产生通知时Observer(观察者)被调用的方法。
Rx 提供了将各种类型的源转换为 IObservable 的工具,其中最基本的工具就是将标准 .NET 事件转换为 Observable(可观察序列)的工具。
在我们之前创建的示例中(即检测股票大幅波动预警的程序),你将继续使用 StockTick 事件。并将看到如何将其转换为Observable(可观察序列),并用它来创造奇迹。
2.3.1 订阅事件
StockTicker 公开了事件 StockTick,每次股票更新时都会触发该事件。但要使用 Rx,需要将此事件转换为 Observable(可观察序列)。幸运的是,Rx 提供了 FromEventPattern 方法,使你能够做到这一点:
IObservable<EventPattern<StockTick>> ticks =
Observable.FromEventPattern<EventHandler<StockTick>, StockTick>
(
h => ticker.StockTick += h,//lambda 表达式注册事件监听
h => ticker.StockTick -= h //lambda 表达式取消事件监听
)
图 2.8 FromEventPattern 函数解释
/* TDelegate 可注册事件的委托类型,在我们的例子中是 EventHandler<StockTick> TEventArgs 事件携带的 EventArgs 的类型 - 在我们的例子中为 StockTick */ FromEventPattern<TDelegate, TEventArgs> ( //委托(在我们的示例中是一个 lambda 表达式),用于从事件中注册 Rx 事件处理程序。 Action<TDelegate> addHandler, //委托(在我们的示例中是 lambda 表达式),用于从事件中取消注册 Rx 事件处理程序。 Action<TDelegate> removeHandler )
FromEventPattern 方法有几个重载。这里使用的方法需要两个泛型参数和两个方法参数。图 2.8 显示了方法签名说明。
addHandler 和 removeHandler 参数注册和取消注册事件的 Rx 处理程序;Rx 处理程序将被事件调用,然后将调用 observable 的 OnNext 方法。
解包事件
ticks 变量是一个 IObservable<EventPattern > 类型的 Observable(可观察序列)。每当事件发生时,Rx的处理程序都会被调用,并将事件和事件源封装为一个 EventPattern 对象,并把EventPattern 对象传递给Observer(观察者)的OnNext方法。由于我们只关心每个通知中的 StockTick类型对象(也就是EventArgs变量,它是EventPattern类中的一个成员 ),因此可以添加select处理器来转换通知,并从通知中分离出EventArgs(也就是StockTick ),最终只有 StockTick 会被推送到数据流中:
var ticks = Observable.FromEventPattern<EventHandler<StockTick>, StockTick>(
h => ticker.StockTick += h、
h => ticker.StockTick -= h)
.Select(tickEvent => tickEvent.EventArgs) //给Select传参一个delegate(委托)(如 lambda 表达式),该delegate(委托)接收输入通知并返回你感兴趣的值--本例中为StockTick对象。
译者注:这里解释一下,作者其实想说ticks本身是一个Observable(可观察序列),在程序中的类型是IObservable,所以ticks是一个集合,之前章节中详细说过。
那ticks这个集合中的每一个元素是什么呢?是EventPattern对象。再次强调,是EventPattern对象,不是StockTick对象。区别在哪呢?
EventPattern对象包含事件和事件源两个东西。在本例中,事件就是StockTick对象。事件源这里没说,应该是前文说的StockTicker类的某个实例。EventPattern把事件存储在成员EventArgs中,在本例中,EventArgs就是StockTick类型对象,这从代码EventPattern 中可以看出来出来。
然后作者说,后面的Observer(观察者)或者叫监听者。他们非常挑剔,只接受StockTick类型的通知,不接受乱七八糟的EventPattern。所以为了让Observer能得StockTick类型的对象。就借用Select函数,也叫Select处理器。用它对ticks中的元素进行转换。转换过程就是把每个EventPattern对象中的EventArgs拿出来,并以EventArgs作为通知元素传递给Observer。
读到这里,我有个比较形象的比喻,ticks就是一条工作中的传送带,随着StockTicker 类不停的触发StockTick事件(注意这里的StockTick单词是事件event,不是StockTick类,千万別搞混了),流水线上便出现了一个个玉米,但是这些玉米带叶子(类似EventPattern),当玉米经过一个叫select处理器的黑盒子后,玉米叶子就没了,传送带上就变成了一个个干净的玉米(类似StockTick)。而继续顺着传送带往后观察,你会发现,后面会有人拿起干净的玉米再进行处理,至于他们是要煎炒烹炸咱就不知道了。反正这些厨子都叫Observer。他们只愿意接受和处理干净的玉米。
2.3.2 按股票号分组
现在,你已经有了一个携带 ticks(股价变更信息)的 Observable(可观察序列),可以开始围绕它编写查询了。首先要做的是按股票代码(QuoteSymbol)对ticks进行分组,这样就可以分别处理每个组(即每只股票)。如图 2.9 所示,使用 Rx 可以轻松完成这项任务。
图 2.9 按QuoteSymbol对股票ticks进行简单分组
from tick in ticks group tick by tick.QuoteSymbol into company
该表达式创建了一个 Observable(可观察序列),其中存储了多个组。每个组代表一家公司的股票,也就是只推送该公司股票的 Observable(可观察序列)。来自 ticks数据源的每个 tick 都会根据其股票代码(QuoteSymbol)转到对应的组。如图 2.10 所示。
图 2.10 将 ticks Observable(可观察序列)分成两个公司组,每个公司组对应一个股票代码。当通知推送到 ticks 观察对象时,它们会被转发到各自的组。如果股票代码是首次出现,则会为该组创建一个新的 Observable(可观察序列)。
这种分组是用Select表达式编写的。Select表达式是用声明式写法,本身是一种糖语法,编译器会将其转化为真正的方法调用链。这是用方法语法编写的相同表达式:
ticks.GroupBy(tick => tick.QuoteSymbol);
2.3.3 找出tick之间的差异
为了发现剧烈波动,下一步是比较两个连续的tick,看它们之间的差值是否大于特定的比率。为此,你需要一种方法来分批次(Batch)处理组内的tick,以便将两个tick放在一起。分批处理的方式应该是,两个连续的批次将包含一个共享的tick;一个批次的尾tick,将是下一批次的首tick。图 2.11 显示了这种批处理的一个示例。
图 2.11 Ticks 被分批。每个批次有两个tick;连续两个批次有一个共享tick。
要在 Observable序列上创建批次,需要使用Buffer处理器。Buffer 处理器的两个参数分别表达,一个批次包含的tick数(本例中为两个),以及在创建新批次前,需要的位移量。你需要在创建新批次之前位移一个tick,从而使两个批次共享一个tick。你需要对每组都应用 Buffer 方法,具体步骤如下:
company.Buffer(2, 1)
如图 2.12 所示,Buffer 方法输出一个数组,该数组保存两个连续的tick。这样,你就可以计算两个tick之间的差值,以确定是否在允许的阈值范围内。
图 2.12 对每个组应用 Buffer(…) 方法后,你将获得一种新的通知类型,该通知类型是一个tick类型数组,其中保存了两个连续的tick。
通过使用 Let 关键字,Rx 可以将变化率计算结果保存在一个变量中,该变量将在 Observable(可观察序列)中进行运算:
from tick in ticks
group tick by tick.QuoteSymbol into company
from tickPair in company.Buffer(2, 1)
let changeRatio = Math.Abs((tickPair[1].Price - tickPair[0].Price) /
tickPair[0].Price)
这个代码片段包含了到目前为止的所有步骤。在company上应用Buffer创建了一个新的Observable(可观察序列),该Observable(可观察序列)每次缓冲两个 tick。可以使用 from … in … 语句观察其通知。每个通知都由 tickPair 变量表示。
然后引入 changeRatio 变量,该变量表示两个tick之间的变化比率;如图 2.13 所示,该变量将在 Observable(可观察序列)中向后传递。
图 2.13 根据每个company组的每对连tick,计算差值比。
现在你已经知道了变化率,剩下的就是通过应用 Where(…) 处理器过滤掉不感兴趣的通知(非剧烈波动),只保留那些大于设定变化率的通知:
var drasticChanges =
from tick in ticks
group tick by tick.QuoteSymbol
into company
from tickPair in company.Buffer(2, 1)
let changeRatio = Math.Abs((tickPair[1].Price - tickPair[0].Price)/
tickPair[0].Price)
where changeRatio > maxChangeRatio//在 where 处理器中写入通知的判定条件。
select new DrasticChange()
{//根据每个剧烈波动的通知创建一个对象。对象中应包括屏幕提示的属性。
Symbol = company.Key,
ChangeRatio = changeRatio,
OldPrice = tickPair[0].Price,
NewPrice = tickPair[1].Price
};
drasticChanges 变量是一个 Observable(可观察序列),只对股价变化超过 maxChangeRatio 的tick推送通知。在图 2.14 中,最大变化率为 10%。
图 2.14 使用 Where 处理器过滤通知后,你会发现只有一条通知属于剧烈波动。
想要获取剧烈波动通知,你需要订阅 drasticChange Observable(可观察序列)。然后,便可将其打印到屏幕通知用户。
_subscription =//订阅 observable 后产生的订阅对象,允许你取消注册
drasticChanges.Subscribe(change =>
//每个剧烈波动的通知,都会发送到 Subscribe 方法中的 lambda 表达式。参数change就是通知内容。
{
Console.WriteLine($"Stock:{change.Symbol} has changed with
{change.ChangeRatio} ratio,
Old Price:{change.OldPrice}
New Price:{change.NewPrice}");
},
//如果出现问题并抛出异常,或者序列完成时,就需要在下面处理这些情况。
ex => { /* 处理错误的代码 */},
() => {/* 处理observable结束的代码 */});
2.3.4 清理资源
如果用户不想再收到剧烈波动通知,则需要注销对drasticChanges的订阅。在订阅 Observable(可观察序列)时,会返回一个订阅对象,我们将其存储在 _subscription 类成员中。
与之前一样,StockMonitor的Dispose 方法非常适合进行注销(因为你实现了 IDisposable 接口,所以提供了该方法)。你在 Dispose 方法中唯一需要做的就是,调用订阅对象的 Dispose 方法:
public void Dispose()
{
_subscription.Dispose();
}
请注意,在处理你的查询过程中,你无需编写涉及委托的任何内容,也无需清理与存储先前 ticks 数据相关的任何数据结构。所有这些都由 Rx 内部处理器的实现中,当你dispose 订阅时,会产生一个dispose 链,导致所有内部数据结构也被dispose 。
2.3.5 处理并发性
在传统的事件编程中,你需要添加锁代码来处理应用程序中的关键部分代码。因为关键部分代码,允许两个线程同时执行事件处理程序,并同时读取和修改过去的tick集合,从而导致异常或者变化率计算错误。所以添加了一个锁来同步临界区段的访问,这是提供线程间同步的一种方法。
在 Rx 中,将同步添加到执行流程中的声明性更强。只需在开始同步的地方添加同步处理器,Rx 就会处理剩下的工作。在这种情况下,你可以从一开始就添加同步,因此你可以在创建 observable 本身时添加 Synchronize 处理器:
var ticks = Observable.FromEventPattern<EventHandler<StockTick>, StockTick>(
h => ticker.StockTick += h,
h => ticker.StockTick -= h)
.Select(tickEvent => tickEvent.EventArgs)
.Synchronize()//从现在起,执行将同步进行。只有在前一个执行完成后,才会推送通知。
没有比这更简单的了,但和以前一样,你需要记住,每当你添加任何类型的同步时,你都有可能增加一个死锁。Rx 并没有解决这个问题,因此开发人员仍需谨慎。Rx 只是提供了一些工具,让同步的引入变得更容易、更明显。当事情变得简单、明确和可读的时候,你就有更多机会把事情做对,但作为开发人员,确保事情做对仍然是你的工作。
2.3.6 总结
清单 2.9 显示了 Rx 版本的全部代码。与传统事件示例的主要区别在于,代码描述的是你试图实现的目标,而不是如何实现目标。这就是 Rx 所谓基于声明式编程的模型。
class RxStockMonitor : IDisposable
{
private IDisposable _subscription;
public RxStockMonitor(StockTicker ticker)
{
const decimal maxChangeRatio = 0.1m;
var ticks = //创建带有同步器的observable,从 StockTick 事件中推送股票tick。
Observable.FromEventPattern<EventHandler<StockTick>, StockTick>(
h => ticker.StockTick += h,
h => ticker.StockTick -= h)
.Select(tickEvent => tickEvent.EventArgs)
.Synchronize();
//对tick进行分组,并检查连续两个tick之间的差值是否超过阈值。
var drasticChanges =
from tick in ticks
group tick by tick.QuoteSymbol
into company
from tickPair in company.Buffer(2, 1)
let changeRatio = Math.Abs((tickPair[1].Price -tickPair[0].Price)
/tickPair[0].Price)
where changeRatio > maxChangeRatio
select new
{
Symbol = company.Key,
ChangeRatio = changeRatio,
OldPrice = tickPair[0].Price,
NewPrice = tickPair[1].Price
};
//订阅带有激烈波动通知的observable,在屏幕上显示警报。
//同时处理出错情况和 observable 序列完成时的情况。
_subscription =
drasticChanges.Subscribe(change =>
{
Console.WriteLine("Stock:{change.Symbol} has changed
with {change.ChangeRatio} ratio,
Old Price: {change.OldPrice}
New Price: {change.NewPrice}");
},
ex => { /* 错误处理代码 */},
() =>{/* observable完结处理代码 */});
}
public void Dispose()
{
_subscription.Dispose();
}
}
保持代码紧凑
在 Rx 示例中,所有与关于发现剧烈波动的逻辑的代码,都编写在同一个地方——从将事件转换至observable,再到订阅通知并打印信息到屏幕。所有代码都在同一个方法中,这使得浏览解决方案变得更加容易。这是一个很小的案例,尽管所有代码都放在一起,但方法体并没有多么庞大。相比之下,传统事件版本是将代码及其数据结构分散在类中的各个地方。
提供更好、更少的资源处理
Rx 版本几乎不需要处理任何资源,那些需要释放的资源会通过调用 Dispose 释放。你并不知道 Rx 流水线创建了哪些真正的资源,因为这些资源被很好地封装在了处理器的实现中。需要管理的资源越少,代码管理资源的能力就越强。这与传统的事件版本恰恰相反,在传统版本中,你需要添加所涉及的每一个资源,并对其生命周期进行管理,这使得代码容易出错。
使用可嵌套的处理器
计算机科学中最难的问题之一就是命名(比如方法、类等等)。但是,当你给某样东西起了一个好名字后,以后使用它的过程就会变得简单流畅。这正是 Rx 处理器的作用。Rx 处理器是一种重复命名的代码模式,可以减少代码中的重复性,否则你就不得不自己编写代码,也就是说,现在你可以编写更少的代码,并重复使用现有的代码。在对 observable 构建查询的每一步中,你都在之前构建的表达式上添加了一个新的处理器;这就是可嵌套性的最佳体现。可嵌套性使将来扩展查询和在构建查询时进行调整变得容易。这与传统的事件版本相反,在传统的事件版本中,构建整个过程时处理每个步骤的代码片段之间没有明确的分隔,因此无法发现剧烈波动的变化。
执行同步
Rx 有几个专门用于并发管理的处理器。在本例中,你只使用了 Synchronize处理器,正如之前关于 Rx 处理器的简单说明,它让你在使用锁时更不容易犯错。默认情况下,Rx 不会在线程间执行任何同步操作(这一点与常规事件编程相同)。但当需要采取行动时,Rx 可以让开发人员简单地添加同步,而无需使用底层同步原语,从而使代码更具吸引力。
2.4 小结
本章介绍了一个简单但功能强大的示例,这功能你过去可能做过(或将来可能会发现自己做过),并用两种方法解决了这个问题:传统的事件风格和 Rx 风格的事件处理流程。
-
在 .NET 中编写事件驱动程序非常直观,但在资源清理和代码可读性方面需要注意。
-
要使用 Rx 库,你需要安装 Rx 软件包。最常见的是安装 System.Reactive 软件包。
-
你可以在任何类型的程序: WPF 桌面客户端、ASP.NET 网站或简单的控制台应用程序等中使用 Rx。
-
传统的 .NET 事件可以转换为 Observable。
-
Rx 允许你在 Observable的基础上编写查询表达式。
-
Rx 提供了许多查询处理器,如使用 Where 处理器进行过滤、使用 Select 处理器进行转换等。
当然,这还没有结束。这只是旅程的开始。要在应用程序中正确使用 Rx并使用所有丰富的处理器,你需要了解它们以及将它们组合在一起的技巧,这就是本书的全部内容。在下一章中,你将了解函数式思维方式,这种思维方式与 .NET 内部的核心概念相结合,使 Rx 得以发展。