一、方案对比:不同线程安全集合的适用场景
二、推荐方案及示例代码
方案 1:使用 BlockingCollection(同步模型)
/// <summary>
/// Synchronous producer/consumer sample built on <see cref="BlockingCollection{T}"/>.
/// A background producer adds random integers to the collection, a background
/// consumer enumerates them, and <see cref="Stop"/> cancels both through a shared
/// <see cref="CancellationTokenSource"/>.
/// </summary>
public class QueueDemo
{
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly BlockingCollection<int> _blockingCollection = new BlockingCollection<int>();

    /// <summary>
    /// Starts the producer loop on a thread-pool task and returns immediately.
    /// </summary>
    public void ProduceData()
    {
        Task.Run(() => RunProducer());
    }

    /// <summary>
    /// Starts the consumer loop on a thread-pool task and returns immediately.
    /// </summary>
    public void ConsumeData()
    {
        Task.Run(() => RunConsumer());
    }

    /// <summary>
    /// Requests cancellation of the producer and consumer loops.
    /// </summary>
    public void Stop()
    {
        _cts.Cancel();
    }

    // Adds one random value in [1, 100) roughly every 500 ms until cancellation
    // is requested, then marks the collection as complete for adding.
    private void RunProducer()
    {
        var generator = new Random();

        while (true)
        {
            if (_cts.IsCancellationRequested)
            {
                break;
            }

            var value = generator.Next(1, 100);
            _blockingCollection.Add(value);
            Console.WriteLine($"Produced1: {value}");
            Thread.Sleep(500);
        }

        _blockingCollection.CompleteAdding();
    }

    // Waits one second, then prints every item taken from the collection.
    // Cancelling the token ends the enumeration with OperationCanceledException,
    // which is reported on the console instead of being propagated.
    private void RunConsumer()
    {
        try
        {
            Thread.Sleep(1000);

            var pending = _blockingCollection.GetConsumingEnumerable(_cts.Token);
            foreach (var value in pending)
            {
                Console.WriteLine($"Consumed from BlockingCollection: {value}, 当前个数:{_blockingCollection.Count}");
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Consumption canceled");
        }
    }
}
// Driver for the BlockingCollection sample: start the background producer and
// consumer, keep them running until a key is pressed, then request cancellation.
var queueDemo = new QueueDemo();
queueDemo.ProduceData();
queueDemo.ConsumeData();

Console.WriteLine("Press any key to stop...");
Console.ReadKey();

queueDemo.Stop();
方案 2:使用 Channel(异步模型 - 推荐)
/// <summary>
/// Asynchronous producer/consumer sample built on an unbounded <see cref="Channel{T}"/>.
/// <see cref="ProduceAsync"/> writes random integers, <see cref="ConsumeAsync"/> processes
/// them, and <see cref="Stop"/> ends both: the producer observes cancellation and the
/// consumer drains whatever is still buffered before returning.
/// </summary>
public class ChannelDemo
{
    private readonly Channel<int> _channel = Channel.CreateUnbounded<int>(
        new UnboundedChannelOptions { SingleWriter = false, SingleReader = false }
    );
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();

    // One generator for the whole instance instead of a new Random per item.
    // Random is not thread-safe and the channel allows several writers
    // (SingleWriter = false), so access goes through _randomGate.
    private readonly Random _random = new Random();
    private readonly object _randomGate = new object();

    /// <summary>
    /// Writes a random value in [1, 100) to the channel about every 20 ms until
    /// <see cref="Stop"/> is called.
    /// </summary>
    /// <returns>
    /// A task that completes once cancellation has been observed. Cancellation is
    /// treated as a normal shutdown, so the task does not fault or become canceled.
    /// </returns>
    public async Task ProduceAsync()
    {
        var token = _cts.Token;

        try
        {
            while (!token.IsCancellationRequested)
            {
                var item = GenerateItem();
                await _channel.Writer.WriteAsync(item, token);
                Console.WriteLine($"Produce: {item}");

                // Token-aware delay so Stop() takes effect without waiting out the interval.
                await Task.Delay(20, token);
            }
        }
        catch (OperationCanceledException)
        {
            // Stop() was called while writing or waiting; exit quietly.
        }
        catch (ChannelClosedException)
        {
            // Stop() completed the writer between the loop check and the write.
        }
    }

    /// <summary>
    /// Processes items until the channel is completed and empty.
    /// </summary>
    /// <returns>
    /// A task that completes after <see cref="Stop"/> has been called and every
    /// item still buffered in the channel has been processed.
    /// </returns>
    public async Task ConsumeAsync()
    {
        // No token here on purpose: WaitToReadAsync returns false once the writer
        // is completed and the buffer is empty, which lets buffered items drain.
        while (await _channel.Reader.WaitToReadAsync())
        {
            // Take everything currently available before waiting again.
            while (_channel.Reader.TryRead(out var item))
            {
                await ProcessItemAsync(item);
            }
        }
    }

    // Returns a random value in [1, 100) from the shared generator.
    private int GenerateItem()
    {
        lock (_randomGate)
        {
            return _random.Next(1, 100);
        }
    }

    // Simulates 100 ms of work per item, then reports it. A single consumer is
    // slower than a producer (100 ms vs 20 ms), so the unbounded buffer grows
    // while both are running.
    private async Task ProcessItemAsync(int item)
    {
        await Task.Delay(100);
        Console.WriteLine($"Processed: {item}");
    }

    /// <summary>
    /// Stops production and lets consumers finish: cancels the shared token and
    /// marks the channel writer as complete. Safe to call more than once.
    /// </summary>
    public void Stop()
    {
        _cts.Cancel();

        // Without completing the writer, ConsumeAsync would wait forever.
        // TryComplete returns false instead of throwing when already completed.
        _channel.Writer.TryComplete();
    }
}
三、选型建议