C#中通道(Channels)的应用之(生产者-消费者模式)

一.生产者-消费者模式概述

生产者-消费者模式是一种经典的设计模式,它将数据的生成(生产者)和处理(消费者)分离到不同的模块或线程中。这种模式的核心在于一个共享的缓冲区,生产者将数据放入缓冲区,而消费者从缓冲区中取出数据进行处理。这种模式有助于提高系统的响应性和吞吐量,因为它允许生产者和消费者并行工作,互不干扰。

二.Channels 概念

Channels提供了一种通信机制,允许生产者和消费者之间安全、可靠地交换信息,即使它们在不同的执行线程上运行。自.NET Core 3.0引入以来,System.Threading.Channels命名空间为我们处理生产者-消费者模式等复杂场景提供了强大的支持。Channels已经完全集成到.NET的异步模型中,支持async/await关键字,提供了一种异步的消息传递机制。通道本质上是一个线程安全的队列,支持在生产者和消费者之间安全、可靠地传递数据。通道有两种类型:有限容量的bound Channel无限容量的unbound Channel。有限容量的通道在达到容量上限时会根据指定的策略处理新消息,而无限容量的通道则没有容量限制。

三.Channels 生产者-消费者模式实现

创建通道来作为生产者和消费者之间的共享缓冲区
  1. 无界通道
  • 无界容量的通道(即没有明确限制可以存储的项目数量的通道),使用 Channel.CreateUnbounded<T>() 方法,如:
// 创建一个无界通道
var unboundedChannel = Channel.CreateUnbounded<string>();
  1. 有界通道
  • 创建有界通道则需要指定通道的容量上限,对于有限容量的通道,当通道满时,生产者可能需要等待或丢弃新数据。同样,当通道空时,消费者可能需要等待新数据的到来。通道提供了多种策略BoundedChannelFullMode 枚举处理方式:Wait:当通道已满时,写操作会等待直到队列中有空间来写入新的数据。这种情况下如果 TryWrite 操作会返回 false。DropOldest:如果通道已满,会删除最旧的数据(也就是最早进入通道但还未被读取的数据),以便给新的数据腾出空间。DropNewest:与 DropOldest 相反,会删除最新写入但还未被读取的数据来让新数据容纳进来。DropWrite:直接删除当前正在尝试写入的数据。
    使用 Channel.CreateBounded<T>(int capacity) 方法。例如:
// 创建一个 有界通道
var boundedChannel = Channel.CreateBounded<string>(100);
实现生产者
  • 生产者负责生成数据并将其写入通道。通常使用循环,在该循环中,生产者生成数据并使用WriteAsync方法将其写入通道。
async Task ProducerAsync(ChannelWriter<string> writer)
{
    for (int i = 0; i < 100; i++)
    {
        await writer.WriteAsync(i.ToString());
        await Task.Delay(100); // 模拟数据生成的时间间隔
    }
    writer.Complete(); // 标记通道为完成写入,不再接受新数据
}
实现消费者
  • 消费者负责从通道中读取数据并进行处理。通常使用循环,在该循环中,消费者使用ReadAsync或ReadAllAsync方法从通道中读取数据,并对其进行处理。

async Task ConsumerAsync(ChannelReader<string> reader)
{
   while (await reader.WaitToReadAsync())
   {
      if (reader.TryRead(out var msgstring))
      {
         Console.WriteLine($"Consumed: {msgstring}");
        // 在这里处理数据
      }
   }
}

下面展示一个完整的生产者和消费者示例

  1. 启动 Program
// See https://aka.ms/new-console-template for more information

using System.Threading.Channels;
using System.Threading.Tasks;
using TestChannels;

Console.WriteLine("选择运行的模式?例如:1");
Console.WriteLine("1. 单生产单消费");
Console.WriteLine("2. 多生产单消费");
Console.WriteLine("3. 单生产多消费");
Console.WriteLine("4. 多生产多消费");
Console.WriteLine("请输入编号:");
var key = Console.ReadKey();

switch (key.KeyChar)
{
   case '1':
       await SingleProducerSingleConsumer();
       break;

   case '2':
       await MultiProducerSingleConsumer();
       break;

   case '3':
       await SingleProduceMultipleConsumers();
       break;

   case '4':
       await MultiProducerMultipleConsumers();
       break;
   default:
       Console.WriteLine("请先选择运行模式!");
       break;
}

// 单生产单消费
static async Task SingleProducerSingleConsumer()
{
   var channel = Channel.CreateUnbounded<string>();

   var producer1 = new Producer(channel.Writer, 1, 2000);
   var consumer1 = new Consumer(channel.Reader, 1, 1500);

   Task consumerTask1 = consumer1.ConsumerAsync(); // 开始消费
   Task producerTask1 = producer1.ProducerAsync(); // 开始生产

   await producerTask1.ContinueWith(_ => channel.Writer.Complete());

   await consumerTask1;
}

// 多生产单消费
static async Task MultiProducerSingleConsumer()
{
   var channel = Channel.CreateUnbounded<string>();
   List<Task> producerTasks = new List<Task>();
   for (int i = 1; i <= 3; i++)
   {
       producerTasks.Add(Task.Run(async () => {
           var producer = new Producer(channel.Writer, i, 2000);
           await producer.ProducerAsync();
       }));

       await Task.Delay(500); // 暂停500毫秒,启动另外一个生产
   }
   var consumer1 = new Consumer(channel.Reader, 1, 250);
   Task consumerTask1 = consumer1.ConsumerAsync(); // 开始消费

   await Task.WhenAll(producerTasks.ToArray()).ContinueWith(_ => channel.Writer.Complete());
   await consumerTask1;
}

// 单生产多消费
static async Task SingleProduceMultipleConsumers()
{
   var channel = Channel.CreateUnbounded<string>();

   var producer1 = new Producer(channel.Writer, 1, 100);
   List<Task> consumerTasks = new List<Task>();
   for (int i = 1; i <= 3; i++)
   {
       consumerTasks.Add(Task.Run(async () => {
           var consumer = new Consumer(channel.Reader, 1, 1500);
           await consumer.ConsumerAsync();
       }));
   }

   Task producerTask1 = producer1.ProducerAsync();

   await producerTask1.ContinueWith(_ => channel.Writer.Complete());

   await Task.WhenAll(consumerTasks.ToArray());
}


// 多生产多消费
static async Task MultiProducerMultipleConsumers()
{
   var channel = Channel.CreateUnbounded<string>();
   List<Task> producerTasks = new List<Task>();
   for (int i = 1; i <=3; i++)
   {
       Console.WriteLine("线程"+i.ToString());
       producerTasks.Add(Task.Run(async () => {
           var producer = new Producer(channel.Writer, i, 100);
           await producer.ProducerAsync();
       }));
       await Task.Delay(500); // 暂停500毫秒,启动另外一个生产
   }
   List<Task> consumerTasks = new List<Task>();
   for (int i = 1; i < 3; i++)
   {
       consumerTasks.Add(Task.Run(async () => {
           var consumer = new Consumer(channel.Reader, 1, 1500);
           await consumer.ConsumerAsync();
       }));
   }

   await Task.WhenAll(producerTasks.ToArray()).ContinueWith(_ => channel.Writer.Complete());

   await Task.WhenAll(consumerTasks.ToArray());
}



  1. 生产者Producer
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace TestChannels
{
   internal class Producer
   {
       private readonly ChannelWriter<string> _writer;
       private readonly int _identifier;
       private readonly int _delay;

       public Producer(ChannelWriter<string> writer, int identifier, int delay)
       {
           _writer = writer;
           _identifier = identifier;
           _delay = delay;
       }

       public async Task ProducerAsync()
       {
           Console.WriteLine($"开始 ({_identifier}): 发布消息");

           for (var i = 0; i < 10; i++)
           {
               await Task.Delay(_delay); // 停顿一下,方便观察数据

               var msg = $"P{_identifier} - {DateTime.Now:G}-{i}";

               Console.WriteLine($"发布 ({_identifier}): 消息成功 {msg}");

               await _writer.WriteAsync(msg);
           }

           Console.WriteLine($"发布 ({_identifier}): 完成");
       }
   }
}

  1. 消费者Consumer
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace TestChannels
{
   /// <summary>
   /// 消费
   /// </summary>
   internal class Consumer
   {
       private readonly ChannelReader<string> _reader;
       private readonly int _identifier;
       private readonly int _delay;

       public Consumer(ChannelReader<string> reader, int identifier, int delay)
       {
           _reader = reader;
           _identifier = identifier;
           _delay = delay;
       }

       public async Task ConsumerAsync()
       {
           Console.WriteLine($" 开始({_identifier}):消费 ");

           while (await _reader.WaitToReadAsync())
           {
               if (_reader.TryRead(out var timeString))
               {
                   await Task.Delay(_delay); // 停顿一下,方便观察数据

                   Console.WriteLine($"消费 ({_identifier}): 成功 {timeString}");
               }
           }

           Console.WriteLine($"消费 ({_identifier}): 完成");
       }
   }
}

运行

  • [ 参考] : https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels?view=netcore-3.0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/954272.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ArcSegment绘制及计算

ArcSegment绘制及计算 给定起始点、终止点和 bulge 值计算弧线中心点和半径&#xff0c;绘制ArcSegment。 import math def calculate_arc_center_and_radius(x1, y1, x2, y2, bulge):angle4*math.atan(bulge)# 计算弦中点mid_x (x1 x2) / 2mid_y (y1 y2) / 2# 计算弦长的…

【高可用自动化体系】自动化体系

架构设计的愿景就是高可用、高性能、高扩展、高效率。为了实现架构设计四高愿景&#xff0c;需要实现自动化系统目标&#xff1a; 标准化。 流程自助化。 可视化&#xff1a;可观测系统各项指标、包括全链路跟踪。 自动化&#xff1a;ci/cd 自动化部署。 精细化&#xff1a…

FakeLocation 1599 | 内部旧版

前言:FakeLocation又更新了,在某安上面看见一些&#xff0c;大概问题就是地图没了&#xff0c;然后有更难搞了 任务一 我们先去看看地图是怎么个事情 这里用的是百度地图就没有了哈 高德地图是有的 任务二 null 选择成功了&#xff0c;虽然是null 任务三 地图位置 虽然不显示了…

初识算法和数据结构P1:保姆级图文详解

文章目录 前言1、算法例子1.1、查字典&#xff08;二分查找算法&#xff09;1.2、整理扑克&#xff08;插入排序算法&#xff09;1.3、货币找零&#xff08;贪心算法&#xff09; 2、算法与数据结构2.1、算法定义2.2、数据结构定义2.3、数据结构与算法的关系2.4、独立于编程语言…

2025年华数杯国际赛B题论文首发+代码开源 数据分享+代码运行教学

176项指标数据库 任意组合 千种组合方式 14页纯图 无水印可视化 63页无附录正文 3万字 1、为了方便大家阅读&#xff0c;全文使用中文进行描述&#xff0c;最终版本需自行翻译为英文。 2、文中图形、结论文字描述均为ai写作&#xff0c;可自行将自己的结果发给ai&#xff0c…

CSS的小知识

一、子选择器 (>) 让 CSS 样式只作用于子级和孙级元素&#xff0c;而不影响其他元素 有>是只对其子级有效&#xff0c;子选择器只会影响直接的子级元素&#xff0c;而不会影响更深层次的孙级元素 无>时是对子级、孙级、曾孙级等所有后代都有效

【经管数据】ZF数字采购采购明细数据(2015.3-2024.3)

一、数据来源&#xff1a; 原始数据来源为ZF采购网。数据涵盖了自2015年3月至2024年3月的ZF数字采购合同明细&#xff0c;反映了数字化转型在政府采购中的应用情况。 二、参考文献&#xff1a; [1] 申志轩, 祝树金, 文茜, 等. ZF数字采购与企业数字化转型[J]. 数量经济技术经济…

【Linux】Mysql部署步骤

一、JDK安装配置 在home目录下执行命令&#xff1a;mkdir Jdk 1.将JDK 上传至该文件夹&#xff0c;有些终端工具可以直接上传文件&#xff0c;比如&#xff1a;MobaXterm 可以看到安装包已经上传上来了 2.直接安装 命令&#xff1a;rpm -ivh jdk-8u311-linux-x64.rpm 3.安装成…

虚拟同步机(VSG)Matlab/Simulink仿真模型

虚拟同步机控制作为原先博文更新的重点内容&#xff0c;我将在原博客的基础上&#xff0c;再结合近几年的研究热点对其内容进行更新。Ps&#xff1a;VSG相关控制方向的simulink仿真模型基本上都搭建出来了&#xff0c;一些重要的控制算法也完成了实验验证。 现在搭建出来的虚拟…

二分查找算法——点名

一.题目描述 LCR 173. 点名 - 力扣&#xff08;LeetCode&#xff09; 二.题目解析 有0~n-1这n个数&#xff0c;但是数组中只有n-1个数&#xff0c;我们要找到消失的那个数。 三.算法原理 1.哈希表 我们先创建一个n个数的哈希表并初始化为0&#xff0c;然后将数组中的数存放…

FIDO2密码钥匙与无密码认证:打造安全便捷的数字世界

在数字化时代&#xff0c;密码曾被视为网络安全的基石&#xff0c;但随着网络攻击手段日益复杂&#xff0c;传统的密码认证方法越来越无法抵御这些挑战。对于用户来说&#xff0c;登录密码不仅繁琐易忘&#xff0c;而且一旦泄露&#xff0c;往往会导致数据泄露&#xff0c;造成…

Jmeter进行http接口并发测试

目录&#xff1a; 1、Jmeter设置&#xff08;1&#xff09;设置请求并发数&#xff08;2&#xff09;设置请求地址以及参数&#xff08;3&#xff09;添加结果数 2、启动看结果 1、Jmeter设置 &#xff08;1&#xff09;设置请求并发数 &#xff08;2&#xff09;设置请求地址…

osg中实现模型的大小、颜色、透明度的动态变化

以博饼状模型为对象,实现了模型大小、颜色、透明度的动态变化。 需要注意的是一点: // 创建材质对象osg::ref_ptr<osg::Material> material = new osg::Material;material->setDiffuse(osg::Material::FRONT_AND_BACK, osg::Vec4(0.0, 1.0, 0.0, 0.5));// 获取模型的…

VSCode使用纪要

1、常用快捷键 1&#xff09;注释 ctrl? 单行注释&#xff0c; altshifta 块注释&#xff0c; 个人测试&#xff0c;ctrl? 好像也能块注释 2&#xff09;开多个项目 可以先开一个新窗口&#xff0c;再新窗口打开另一个项目&#xff0c;这时就是同时打开多个项目了。 打开…

Jmeter 简单使用、生成测试报告(一)

一、下载Jmter 去官网下载&#xff0c;我下载的是apache-jmeter-5.6.3.zip&#xff0c;解压后就能用。 二、安装java环境 JMeter是基于Java开发的&#xff0c;运行JMeter需要Java环境。 1.下载JDK、安装Jdk 2.配置java环境变量 3.验证安装是否成功&#xff08;java -versio…

Linux 服务器挖矿木马防护实战:快速切断、清理与加固20250114

Linux 服务器挖矿木马防护实战&#xff1a;快速切断、清理与加固 引言 挖矿木马作为一种常见的恶意软件&#xff0c;对服务器资源和安全构成严重威胁。据安全机构统计&#xff0c;2023 年全球约 45%的 Linux 服务器遭受过挖矿木马攻击&#xff0c;平均每台被感染服务器每月造…

Linux Kernel 之十 详解 PREEMPT_RT、Xenomai 的架构、源码、构建及使用

概述 现在的 RTOS 基本可以分为 Linux 阵营和非 Linux 阵营这两大阵营。非 Linux 阵营的各大 RTOS 都是独立发展,使用上也相对独立;而 Linux 阵营则有多种不同的实现方法来改造 Linux 以实现实时性要求。本文我们重点关注 Linux 阵营的实时内核实现方法! 本文我们重点关注 …

计算机网络(四)——网络层

目录 一、功能 二、IP数据报分片 三、DHCP动态主机配置协议 四、网络地址转换&#xff08;NAT&#xff09;技术 五、无分类编址CIDR 六、ARP地址解析协议 七、ICMP网际控制报文协议 八、IPv4和IPv6的区别 九、IPv4向IPv6的两种过渡技术——双栈协议和隧道技术 十、路由…

apache-skywalking-apm-10.1.0使用

apache-skywalking-apm-10.1.0使用 本文主要介绍如何使用apache-skywalking-apm-10.1.0&#xff0c;同时配合elasticsearch-8.17.0-windows-x86_64来作为存储 es持久化数据使用。 步骤如下&#xff1a; 一、下载elasticsearch-8.17.0-windows-x86_64 1、下载ES(elasticsear…

CVE-2025-22777 (CVSS 9.8):WordPress | GiveWP 插件的严重漏洞

漏洞描述 GiveWP 插件中发现了一个严重漏洞&#xff0c;该插件是 WordPress 最广泛使用的在线捐赠和筹款工具之一。该漏洞的编号为 CVE-2025-22777&#xff0c;CVSS 评分为 9.8&#xff0c;表明其严重性。 GiveWP 插件拥有超过 100,000 个活跃安装&#xff0c;为全球无数捐赠平…