本章将介绍并行编程模式,重点是理解并行代码问题场景并使用并行编程/异步技术解决他们。本章会介绍几种最重要的编程模式。
本教程学习工程:魔术师Dix / HandsOnParallelProgramming · GitCode
1、MapReduce 模式
引入 MapReduce 是为了解决处理大数据的问题,例如跨服务器的大规模计算需求。该模式可以在单核计算机上使用。
1.1、映射和归约
MapReduce 程序顾名思义,即 Map(映射) + Reduce(归约)。MapReduce 程序的输入作为键值对被传递,输出也是同样形式。
书上讲的听起来很抽象,画张图来辅助理解:
输入一个列表,然后通过某种方式先进行筛选(返回列表),然后进行分组(返回键值),最后返回各个分组的键值对作为结果:
1.2、使用 LINQ 实现 MapReduce
其示例中,扩展方法如下:
public static ParallelQuery<TResult> MapReduce<TSource, TMapped, TKey, TResult>(
this ParallelQuery<TSource> source,
Func<TSource, IEnumerable<TMapped>> map,
Func<TMapped, TKey> keySelector,
Func<IGrouping<TKey, TMapped>, IEnumerable<TResult>> reduce)
{
return source.SelectMany(map)
.GroupBy(keySelector)
.SelectMany(reduce);
}
我们用一个需求来理解这段函数:
-
源数据为 1000 个 -100~100 的随机数;
-
筛选出其中的正数;
-
将其按照10位进行分组(0~9一组、10~19一组,以此类推);
-
统计每个分组的个数。
那么,使用上述 MapReduce 模板进行处理,示例代码如下:
private void RunMapReduce()
{
//初始化原始数据
int length = 1000;
List<int> L = new List<int>(length);
for (int i = 0; i < length; i++)
{
L.Add(Random.Range(-100, 100));
}
var ret = L.AsParallel().MapReduce(
mapPositiveNumbers,//筛选正数
groupNumbers,//映射分组
reduceNumbers);//归约合并结果
foreach (var item in ret)
{
Debug.Log($"{item.Key * 10} ~ {(item.Key + 1) * 10} 出现了:{item.Value} 次 !");
}
}
public static IEnumerable<int> mapPositiveNumbers(int number)
{
IList<int> PositiveNumbers = new List<int>();
if (number > 0)
PositiveNumbers.Add(number);
return PositiveNumbers;
}
public static int groupNumbers(int number)
{
return number / 10;
}
public static IEnumerable<KeyValuePair<int, int>> reduceNumbers(IGrouping<int, int> grouping)
{
return new[]
{
new KeyValuePair<int, int>(grouping.Key,grouping.Count())
};
}
运行结果如下所示:
通过上述示例,这个映射与归约就容易理解多了:实际上就是某一种特定的业务模板写法:筛选 → 分组 → 合并。在并行编程中,类似这样的写法都可以通过同样的模板代码实现。
2、聚合
聚合(Aggregation)是并行应用程序中使用的另一种常见的设计模式。在并行程序中,数据被划分为多个单元,以便可以通过多个线程在内核之间进行处理。在某个时候,需要将所有相关来源数据组合起来,然后才能呈现给用户。
书上的例子只讨论了使用 PLINQ 代码的示例,我们也照着写一个:
private void RunAggregation()
{
var L = Utils.GetOrderList(10);
var L2 = L.AsParallel()
.Select(TestFunction.IntToString)//并行处理
.ToList();//合并
foreach (var item in L2)
Debug.Log(item);
}
public static string IntToString(int x)
{
return $"ToString_{x}";
}
上述代码运行结果如下:
可以看到,这个运行模式是保证顺序的(源数据是List)。
一般来讲,我们为了避免锁、同步等额外处理,要么使用 PLINQ 这样的语法,要么使用并发集合。这样可以减少我们需要手动处理锁、同步等工作。
3、分叉/合并模式
在分叉/合并(Fork/Join)模式中,工作被分叉(拆分)为一组可以异步执行的任务,然后根据并行化的要求和范围,以相同(或不同)的顺序合并分叉的任务。
分叉/合并模式常见的一些实现如下:
-
Parallel.For
-
Parallel.ForEach
-
Paralle.Invoke
-
System.Threading.CountdownEvent
利用这些同步框架开发人员能快速实现开发,而不必担心同步开销(系统已经内部处理同步了,实际上如如果额外开销不可接受,用这些 API 也没办法优化)。
我们将之前的代码通过 分叉/合并模式 再改一版:
private void RunForkJoin()
{
var L = Utils.GetOrderList(10);
ConcurrentQueue<string> queue = new ConcurrentQueue<string>();
Parallel.For(0, L.Count, x =>
{
var ret = IntToString(x);
queue.Enqueue(ret);
});
while (queue.Count > 0)
{
string str;
if (queue.TryDequeue(out str))
Debug.Log(str);
}
}
这次我们看运行结果:
很显然,已经乱序了,这种模式就没有按照原来数据顺序进行数据处理。这也是这个模式的特点之一,我们可以选择是否要按照顺序进行合并。
4、推测处理模式
推测处理模式(Speculative Processing Pattern)是依赖高吞吐量以减少等待时间的另一种并行编程模式。
推测处理模式(Speculative Processing Pattern):
如果同时存在多种处理任务,但并不知道哪一种方式速度最快。因此第一个执行的完成的任务将被输出,其他任务处理结果将会忽略。
以下是一种推测处理模式的常见写法:
//选择一个最快执行方法的结果并返回
public static TResut SpeculativeForEach<TSource, TResut>(TSource source, IEnumerable<Func<TSource, TResut>> funcs)
{
TResut result = default;
Parallel.ForEach(funcs, (func, loopState) =>
{
result = func(source);
loopState.Stop();
});
return result;
}
//返回特定方法的最快执行结果并返回
public static TResut SpeculativeForEach<TSource, TResut>(IEnumerable<TSource> source, Func<TSource, TResut> func)
{
TResut result = default;
Parallel.ForEach(source, (item, loopState) =>
{
result = func(item);
loopState.Stop();
});
return result;
}
这种写法只会返回一个结果,首先完成的任务将被返回。但是其他任务仍然有可能执行完成,只是结果将不会被返回。
这里我们选择方法一进行示例,调用代码如下:
private void RunSpeculativeMethod_1()
{
Debug.Log($">===== RunSpeculativeMethod_1 开始 =====<");
var L1 = new List<Func<int, string>>
{
IntToString,
IntToString2
};
string result = SpeculativeForEach(4, L1);
Debug.Log($"运行结果:{result}");
}
连续运行2次,其结果如下:
第一次是使用了 IntToString2 的结果,而第二次使用的 IntToString 的结果。
5、延迟模式
也就是在使用时才创建,也就是懒加载。这个在之前的章节中已经有详细介绍了,这里就不重复了。
详见:使用延迟初始化提高性能
【C#】并行编程实战:使用延迟初始化提高性能_魔术师Dix的博客-CSDN博客在前面的章节中讨论了 C# 中线程安全并发集合,有助于提高代码性能、降低同步开销。本章将讨论更多有助于提高性能的概念,包括使用自定义实现的内置构造。本章主要内容为通过延迟初始化提高性能,相对比较简单。https://blog.csdn.net/cyf649669121/article/details/131780600
6、共享状态模式
这个主要在 【C#】并行编程实战:同步原语(1)_魔术师Dix的博客-CSDN博客 中已经介绍过共享状态(Shared State Pattern)的实现(其实就是各种加锁,搞的好像很高级)。
不过上锁不能上太多,不然性能很差;而且我们也应该尽可能实现无锁代码。
7、本章小结
本章介绍了各种并行编程模式,其实就是各种模板的示例。当然,这里讲的不可能包罗所有,只是给大家提供一些参考。至此,多线程编程的学习告一段落,书上的内容已经讲完了。后续如果有补充会加到这个系列里。
多线程的实践还是需要在项目中多多练习。
本教程学习工程:魔术师Dix / HandsOnParallelProgramming · GitCode