C# .NET 中的反应式系统

概述:反应式系统已成为构建健壮、可扩展和响应迅速的应用程序的强大范式。这些系统被设计为更具弹性、弹性和消息驱动性,确保它们在各种条件下保持响应,包括高负载、网络延迟和故障。在本文中,我们将探讨 .NET 生态系统中反应式系统的概念,利用 (Rx.NET) 和 来说明一个复杂的用例:用于实时监控和分析股票市场数据的实时仪表板。Reactive ExtensionsActor model (Akka.NET)了解反应式系统反应式系统旨在对事件、负载、故障甚至用户做出反应。反应性宣言概述了四个关键特征:响应式:系统及时响应。弹性:系统在面对故障时保持响应。弹性:系统在不同的工作负载下保持响应。消息驱动:系反应式系统已成为构建健壮、可扩展和响应迅速的应用程序的强大范式。

这些系统被设计为更具弹性、弹性和消息驱动性,确保它们在各种条件下保持响应,包括高负载、网络延迟和故障。

在本文中,我们将探讨 .NET 生态系统中反应式系统的概念,利用 (Rx.NET) 和 来说明一个复杂的用例:用于实时监控和分析股票市场数据的实时仪表板。Reactive ExtensionsActor model (Akka.NET)

了解反应式系统

反应式系统旨在对事件、负载、故障甚至用户做出反应。反应性宣言概述了四个关键特征:

  • 响应式:系统及时响应。

  • 弹性:系统在面对故障时保持响应。

  • 弹性:系统在不同的工作负载下保持响应。

  • 消息驱动:系统依靠异步消息传递来确保松耦合、隔离和位置透明。

实时股市仪表板

想象一下这样的场景:金融机构希望为其用户提供一个实时仪表板,该仪表板显示股票市场趋势、重大股票变动警报并提供实时分析。

此应用程序需要处理大量数据,有效地处理数据,并立即更新用户界面。它是反应式系统的完美候选者。

工具:Rx.NET 和 Akka.NET

为了应对这一挑战,我们将使用两个强大的库:

  • **Rx.NET (Reactive Extensions for .NET):**一个库,用于使用可观察序列和 LINQ 样式的查询运算符编写异步和基于事件的程序。

  • Akka.NET:一个开源工具包和运行时,用于在 .NET 上构建高度并发、分布式和容错的事件驱动应用程序。

在 .NET 中构建反应式系统 — 图片来源:由 Author 创建

构建我们的应用程序

我们的应用程序由几个组件组成:

  1. 数据摄取服务:连接到模拟股票市场数据流。

  2. 处理引擎:分析重大事件(例如,价格急剧上涨)的数据。

  3. 仪表盘服务:实时更新实时仪表盘。

先决条件

  • 安装 Rx.NET 软件包 (System.Reactive)

  • 安装 Akka.NET 软件包 (Akka)

项目结构

此示例的结构如下,全部位于单个控制台应用程序项目中:

  • StockTick类来表示股票数据。

  • StockMarketSimulator类来模拟股票数据流。

  • Akka.NET actors:用于更新仪表板和处理重大动作。DashboardActorSignificantMovementActor

  • 将所有内容连接在一起的主类。Program

步骤 1:设置数据引入服务

我们需要模拟股票市场价格的实时数据馈送。为简单起见,假设我们有一个返回流的函数,其中是一个表示股票符号、价格和时间戳的类。GetStockStream()IObservable<StockTick>StockTick

我们将使用 Rx.NET 定期生成对象流。 方法每秒(或您喜欢的任何其他合理间隔)创建一个价格变动,然后将这些变动映射到具有随机生成的价格和交易品种的对象。StockTickObservable.IntervalStockTick

using System;  
using System.Reactive.Linq;  
using System.Reactive.Threading.Tasks;  
using System.Threading.Tasks;  
  
public class StockTick  
{  
    public string Symbol { get; set; }  
  
    public double Price { get; set; }  
  
    public DateTime Timestamp { get; set; }  
}  
  
public static class StockMarketSimulator  
{  
    private static readonly Random rand = new Random();  
    private static readonly string[] symbols = new[] {   
      "AAPL",   
      "MSFT",   
      "GOOGL",   
      "AMZN",   
      "FB" };  
  
    public static IObservable<StockTick> GetStockStream()  
    {  
        return Observable.Interval(TimeSpan.FromSeconds(1))  
            .Select(_ => new StockTick  
            {  
                Symbol = symbols[rand.Next(symbols.Length)],  
                Price = Math.Round(100 + (rand.NextDouble() * 1000), 2),  
                Timestamp = DateTime.Now  
            });  
    }  
}

第 2 步:使用 Rx.NET 分析数据

Rx.NET 在处理数据流方面大放异彩。我们可以订阅并使用 LINQ 查询来实时处理和分析股票即时报价。IObservable<StockTick>

让我们假设,一个重大的变动是在 5 秒缓冲窗口内价格变化超过 30%。

using System;  
using System.Linq;  
using System.Reactive.Linq;  
  
public class StockAnalysis  
{  
    public static void AnalyzeStockTicks(IActorRef significantMovementActor)  
    {  
        IObservable<StockTick> stockStream = StockMarketSimulator.GetStockStream();  
  
        return stockStream  
            .GroupBy(tick => tick.Symbol)  
            .SelectMany(group => group.Buffer(TimeSpan.FromSeconds(30)))  
            .Select(buffer =>  
            {  
                var firstTick = buffer.First();  
                var lastTick = buffer.Last();  
  
                // Avoid potential issues if the buffer is empty  
                if (firstTick == null || lastTick == null) return null;  
  
                var priceChange = Math.Abs((lastTick.Price - firstTick.Price) / firstTick.Price);  
  
                return new  
                {  
                    Symbol = firstTick.Symbol,  
                    PriceChange = priceChange,  
                    StartPrice = firstTick.Price,  
                    EndPrice = lastTick.Price,  
                    Timestamp = DateTime.Now  
                };  
            })  
            .Where(x => x != null && x.PriceChange > 0.05); // Filter for more than 5% price change   
    }  
}

对每个股票品种的报价进行分组,缓冲 30 秒,然后进行分析以发现重大的价格变化。如果价格变化超过 5%,则认为价格变动显著,并且有关变动的信息将打印到控制台。

调用是 Rx.NET 模式的重要组成部分,其中订阅了处理的结果(重大变动)。该方法是触发可观察序列的执行,并定义如何处理每个发出的项目(重大的股价变动)。.Subscribe(movement => { ... })Subscribe

步骤 3:集成仪表板更新的 Akka.NET

我们将创建一个 Akka.NET 参与者系统,以处理我们的 Rx.NET 分析检测到的重大股票变动。此执行组件系统将由两个主要执行组件组成:

  • DashboardActor:负责接收重要的移动消息并更新仪表板。

  • SignificantMovementActor:订阅可观察对象,并在检测到重大移动时向 发送消息。significantMovementsDashboardActor

public class SignificantMovement  
{  
    public string Symbol { get; set; }  
  
    public double PriceChange { get; set; }  
  
    public double StartPrice { get; set; }  
  
    public double EndPrice { get; set; }  
  
    public DateTime Timestamp { get; set; }  
}

using Akka.Actor;  
  
// Akka.NET Actor for dashboard updates  
public class DashboardActor : ReceiveActor  
{  
    public DashboardActor()  
    {  
        Receive\<SignificantMovement>(movement =>  
        {  
            // Logic to update the dashboard with the significant movement  
            Console.WriteLine($"Dashboard updated for {movement.Symbol}: {movement.PriceChange \* 100:F2}% change, from {movement.StartPrice} to {movement.EndPrice}");  
        });  
    }  
}

using Akka.Actor;  
using System;  
using System.Reactive.Linq;  
  
// Akka.NET Actor to handle significant movements  
public class SignificantMovementActor : ReceiveActor  
{  
    private readonly IActorRef _dashboardActor;  
  
    public SignificantMovementActor(IActorRef dashboardActor, IObservable<dynamic> significantMovements)  
    {  
        this._dashboardActor = dashboardActor;  
  
        significantMovements.Subscribe(movement =>  
        {  
            var significantMovement = new SignificantMovement  
            {  
                Symbol = movement.Symbol,  
                PriceChange = movement.PriceChange,  
                StartPrice = movement.StartPrice,  
                EndPrice = movement.EndPrice,  
                Timestamp = movement.Timestamp  
            };  
  
            _dashboardActor.Tell(significantMovement);  
        });  
    }  
}

创建执行组件系统,并在主应用程序逻辑中将所有内容绑定在一起。

using Akka.Actor;  
using System;  
  
class Program  
{  
    static void Main(string[] args)  
    {  
        var system = ActorSystem.Create("StockMonitorSystem");  
        var dashboardActor = system.ActorOf<DashboardActor>("dashboardActor");  
  
        var significantMovements = StockAnalysis.AnalyzeStockTicks();  
        var props = Props.Create(() => new SignificantMovementActor(dashboardActor, significantMovements));  
        system.ActorOf(props, "significantMovementActor");  
  
        Console.WriteLine("System is running. Press any key to exit...");  
        Console.ReadLine();  
        system.Terminate().Wait();  
    }  
}

该方法设置并运行一个 Akka.NET actor 系统,该系统与 Rx.NET 集成,以处理和显示显着的库存变动。Main

让我们分解该方法的每个部分,以了解它是如何工作的,以及它如何实现对股票市场数据模拟和处理的持续监听。Main

var system = ActorSystem.Create("StockMonitorSystem");

此行初始化名为 的 Akka.NET 执行组件系统的新实例。执行组件系统是一个分层的执行组件组,它为创建执行组件、调度消息和管理执行组件生命周期提供基础结构。这是使用 Akka.NET 的入口点。StockMonitorSystem

var dashboardActor = system.ActorOf<DashboardActor>("dashboardActor");

这将在执行组件系统中创建类型的执行组件。 是用于实例化 actor 的方法,返回对新创建的 actor 的引用。 是为此执行组件实例提供的名称,可用于在系统中查找它。DashboardActorActorOfdashboardActor

var props = Props.Create(() => new SignificantMovementActor(dashboardActor, significantMovements));

Props 是一个配置类,用于描述如何创建 actor 的实例。通过使用 ,您可以指定应如何构造 ,包括其依赖项。在这里,它采用一个 lambda 表达式,该表达式构造一个新的 ,传入引用和可观察序列。Props.CreateSignificantMovementActorSignificantMovementActordashboardActorsignificantMovements

system.ActorOf(props, "significantMovementActor");

此行创建使用前面定义的 props 的实例。它类似于 的创建方式,但使用 用于更复杂的初始化。SignificantMovementActordashboardActorprops

现在,从 Rx.NET 创建的参与者和可观察序列被设置为连续处理和显示重要的库存变动。侦听可观察对象并将其收到的任何消息转发到 .反过来,将处理这些消息(在本例中,将信息打印到控制台)。SignificantMovementActorsignificantMovementsdashboardActordashboardActor

Console.ReadLine();

此行至关重要,因为它可以防止应用程序立即退出。它等待用户按下一个键,然后再继续,有效地保持应用程序(以及参与者系统)运行并能够处理传入的模拟股票数据。

system.Terminate().Wait();

system.Terminate()启动执行组件系统的关闭,停止所有执行组件并释放资源。这是一个异步操作,返回 .Task

.Wait()在终止任务上,确保应用程序在执行组件系统完全关闭之前不会退出。这对于干净和优雅的退出非常重要,确保完成所有正在进行的处理并正确释放资源。

因此,该方法建立了一个连续的、反应性的处理管道,使用 Akka.NET 参与者来处理和显示重大的股票市场走势。Main

问:我能否通过发布到队列并从队列中侦听而不是使用 Rx.NET 和 Akka.NET 来实现相同的功能?

是的,队列可用于类似的数据流和处理,但 Rx.NET 提供了更具表现力和简洁的流处理能力,而 Akka.NET 提供了一个强大的框架,用于使用基于参与者的模型构建并发和分布式系统,从而增强容错能力和系统响应能力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/550138.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Django中的定时任务与后台任务队列的实践【第164篇—Django】

&#x1f47d;发现宝藏 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。【点击进入巨牛的人工智能学习网站】。 在Web开发中&#xff0c;处理定时任务和后台任务队列是很常见的需求。Django作为一个功能强…

arm64-v8a、armeabi-v7a、x86、x86_64

当我们去GitHub下载应用的时候是不是经常很懵逼&#xff0c;就像下图一样&#xff0c;粗看一下如此多安装包到底要选择下载哪个且每种安装包到底有哪差别&#xff1f;毕竟因为自己一无所知&#xff0c;有时便随意下载一个后&#xff0c;安装时却报『此版本与你的系统不兼容』的…

Qt---状态栏、工具栏、浮动窗口

文章目录 工具栏创建工具栏停靠位置浮动性与移动 状态栏创建状态栏并显示消息 浮动窗口创建浮动窗口设置停靠位置往浮动窗口添加内容 工具栏 工具栏是应用程序中集成各种功能实现快捷键使用的⼀个区域。 创建工具栏 工具栏与菜单栏不同&#xff0c;并不会自动创建&#xff0…

【网络编程】如何创建一个自己的并发服务器?

hello &#xff01;大家好呀&#xff01; 欢迎大家来到我的网络编程系列之如何创建一个自己的并发服务器&#xff0c;在这篇文章中&#xff0c;你将会学习到在Linux内核中如何创建一个自己的并发服务器&#xff0c;并且我会给出源码进行剖析&#xff0c;以及手绘UML图来帮助大家…

UE 录屏自动化上传阿里云OSS

前言 最近在做一个功能&#xff0c;然后就发现了一个很有趣的东西&#xff0c;虽然在一定程度上属于偷懒&#xff0c;但是在一些短频快的应用中还是很适用的&#xff0c;下面我就针对于这个测试做一些简单的分享&#xff0c;希望帮助到大家&#xff0c;在实际的开发中获得一些灵…

Java-常用API-1

Math类 public static int abs(int a) 获取绝对值public static double ceil(double a)向上取整&#xff08;向数轴右边取整&#xff09;public static double floor(double a)向下取整&#xff08;向数轴左边取整&#xff09;public static int round(float a)四舍五入public…

SQL SERVER的安装

目录 1.百度SQL SERVER找到图下的所显示的&#xff0c;点击进去 2.找到图下红色框起来的&#xff0c;点击立即下载 3.下载好之后点开&#xff0c;选择下载介质 4.SQLSERVER下载成功之后选择打开文件夹 6.双击后缀名是.iso的镜像文件 7.双击setup.exe进行安装 8.安装成功…

Web3D智慧医院平台(HTML5+Threejs)

智慧医院的建设将借助物联网、云计算、大数据、数字孪生等技术&#xff0c;以轻量化渲染、极简架构、三维可视化“一张屏”的形式&#xff0c;让医院各大子系统管理既独立又链接&#xff0c;数据相互融合及联动。 建设医院物联网应用的目标对象&#xff08;人、物&#xff09;都…

Java复习第二十天学习笔记(过滤器Filter),附有道云笔记链接

【有道云笔记】二十 4.8 过滤器Filter https://note.youdao.com/s/dSofip3f 一、为什么要使用过滤器 项目开发中&#xff0c;经常会用到重复代码的实现。 1、请求每个servlet都要设置编码 2、判断用户是否登录&#xff0c;只有登录了才有操作权限。 二、过滤器相关Api int…

【机器学习300问】71、神经网络中前向传播和反向传播是什么?

我之前写了一篇有关计算图如何帮助人们理解反向传播的文章&#xff0c;那为什么我还要写这篇文章呢&#xff1f;是因为我又学习了一个新的方法来可视化前向传播和反向传播&#xff0c;我想把两种方法总结在一起&#xff0c;方便我自己后续的复习。对了顺便附上往期文章的链接方…

论文笔记:(INTHE)WILDCHAT:570K CHATGPT INTERACTION LOGS IN THE WILD

iclr 2024 spotlight reviewer 评分 5668 1 intro 由大型语言模型驱动的对话代理&#xff08;ChatGPT&#xff0c;Claude 2&#xff0c;Bard&#xff0c;Bing Chat&#xff09; 他们的开发流程通常包括三个主要阶段 预训练语言模型在被称为“指令调优”数据集上进行微调&…

通过腾讯云搭建跨境电商demo的详细操作过程(建站系统 保姆级指导,巨详细)

引言&#xff1a; 有许多做跨境电商的朋友&#xff0c;或者为跨境电商服务的小企业&#xff0c;都会面临搭建电商平台V1.0的问题 因此&#xff0c;花了点时间&#xff0c;找了一个开源的项目&#xff0c;让大家可以跑起来&#xff0c;一方面了解平台都有哪些模块&#xff0c;另…

护网行动 | 蓝队应急响应流程概述

了解蓝队应急响应的流程 应急响应通常是指为了应对各种意外事件发生前所做的准备&#xff0c;以及在意外事件发生后所采取的措施。 网络安全应急响应是指对已经发生或可能发送的安全事件进行监控、分析、协调、处理、保护资产安全。 网络安全应急响应主要是为了让人们对网络安全…

3D模型处理的并行化

今天我们将讨论如何使用 Python 多进程来处理大量3D数据。 我将讲述一些可能在手册中找到的一般信息&#xff0c;并分享我发现的一些小技巧&#xff0c;例如将 tqdm 与多处理 imap 结合使用以及并行处理存档。 那么我们为什么要诉诸并行计算呢&#xff1f; 使用数据有时会出现…

IAM 统一身份认证与访问管理服务

即统一身份认证与访问管理服务&#xff0c;是云服务商提供的一套云上身份管理解决方案&#xff0c;可帮助企业安全地管理云上资源的访问权限。 在当今云计算时代&#xff0c;企业越来越依赖云服务来存储和处理敏感数据。然而&#xff0c;这也带来了新的安全挑战&#xff0c;即…

ssm 体检预约管理系统开发mysql数据库web结构java编程计算机网页源码eclipse项目

一、源码特点 ssm 体检预约管理系统是一套完善的信息系统&#xff0c;结合springMVC框架完成本系统&#xff0c;对理解JSP java编程开发语言有帮助系统采用SSM框架&#xff08;MVC模式开发&#xff09;&#xff0c;系统具有完整的源代码和数据库&#xff0c; 系统主要采用B/S…

URL GET +号后台接收成空格

问题&#xff1a;参数spdmwhbs001 其中包含URL特殊符号 如果用GET请求方式不做任何不处理那么浏览器自动将转为%20 请求链接为 details?spdmwhbs%20001&limitKcysType1 后台接收到的参数为 whbs 001 &#xff0c;自动将号转成空格了。 尝试解决&#xff08;失败&#…

【Java】@RequestMapping注解在类上使用

RequestMapping 是 Spring Web 应用程序中最常被用到的注解之一。这个注解会将 HTTP 请求映射到控制器&#xff08;controller类&#xff09;的处理方法上。 Request Mapping 基础用法 在 Spring MVC 应用程序中&#xff0c;RequestDispatcher (在 Front Controller 之下) 这…

git clone遇到报错“SSL certificate problem: self signed certificate”

在git clone的时候发现遇到了一个问题&#xff1a; cristDESKTOP-JKRNKSH MINGW64 ~/Desktop $ git clone -b dev https://xxx.xxx.xxx.xxx:3001/crist/AVM_V9M.git Cloning into AVM_V9M... fatal: unable to access https://xxx.xxx.xxx.xxx3001/crist/AVM_V9M.git/: SSL ce…

PSCAD|应用于输电线路故障测距的行波波速仿真分析

1 主要内容 该程序参考文献《应用于输电线路故障测距的行波波速仿真分析》&#xff0c;利用线路内部故障产生的初始行波浪涌达线路两端测量点的绝对时间之差值计算故障点到两端测量点之间的距离&#xff0c;并利用小波变换得到初始行波波头准确到达时刻&#xff0c;从而精准定…