本章继续学习实现数据并行,本文主要介绍并行循环中的线程存储。这也是本章节的最后一篇。
本教程对应学习工程:魔术师Dix / HandsOnParallelProgramming · GitCode
5、了解并行循环中的线程存储
默认情况下,所有并行循环都可以访问全局变量。但是,访问全局变量是有同步开销的,因此,尽可能使用线程内局部变量是有意义的。在并行循环中,可以创建和使用线程局部变量(Thread Local Varable,也称为线程本地变量)或分区局部变量(Partition Local Variable,也称分区本地变量)。
5.1、线程局部变量(Thread Local Varable)
这里用到了一个 Parallel.For 的一个重载:
Parallel.For 方法 (System.Threading.Tasks) | Microsoft Learn执行 for 循环,其中可能会并行运行迭代。 https://learn.microsoft.com/zh-cn/dotnet/api/system.threading.tasks.parallel.for?view=netstandard-2.1#system-threading-tasks-parallel-for-1(system-int64-system-int64-system-threading-tasks-paralleloptions-system-func((-0))-system-func((system-int64-system-threading-tasks-parallelloopstate-0-0))-system-action((-0))) 示例代码如下:
private void RunByThreadLocalVarable()
{
var options = new ParallelOptions { MaxDegreeOfParallelism = 3 };//最大并行度3
Parallel.For(0, 11, options,//从0开始循环,循环11次(到10结束)
() => 100,//第一次循环时的初始值
(i, loop, ret) =>
{
ret += i;
return ret;//返回给下一次循环的值
}, TestFunction.OnTaskFinish);//最后运行完成的回调
}
迭代里面的方法很简单,就是单纯地累加。从0加到10,显然就是55。但是我们给每次循环的初始值是100,也就是每次迭代开始,就会从100开始计数。然后最大并行度为3,也就是最多会有 3 次线程并行。根据上述代码,我们人脑编译一下,运行结果应该是 100+100+100+55=355 (如果并行了3次)。
跑一下代码:
结果为 :134+100+121=355,与理论相同。
5.2、分区局部变量(Partition Local Variable)
分区局部变量与线程局部变量类似,区别在于其可以用于分区,这次用到的重载是这个:
Parallel.ForEach 方法 (System.Threading.Tasks) | Microsoft Learn执行 foreach(在 Visual Basic 中为 For Each )操作,其中可能会并行运行迭代。 https://learn.microsoft.com/zh-cn/dotnet/api/system.threading.tasks.parallel.foreach?view=netstandard-2.1#system-threading-tasks-parallel-foreach-2(system-collections-concurrent-partitioner((-0))-system-threading-tasks-paralleloptions-system-func((-1))-system-func((-0-system-threading-tasks-parallelloopstate-1-1))-system-action((-1))) 这次需要加一个 Partitioner 的测试代码:
using System.Collections.Concurrent;
using System.Collections.Generic;
using UnityEngine;
namespace HOPP.Ch03
{
/// <summary>
/// 测试用分区局部变量
/// </summary>
public class PartitionerTest : Partitioner<List<int>>
{
//这里共享一个列表,以观察线程竞争
private List<int> L = new List<int>();
public override bool SupportsDynamicPartitions => true;
//这个方法在此示例中没有被调用过
public override IList<IEnumerator<List<int>>> GetPartitions(int partitionCount)
{
Debug.Log($" GetPartitions {L.Count} | {partitionCount}");
for (int i = 0; i < 10; i++)
L.Add(i);
return (IList<IEnumerator<List<int>>>)L;
}
//迭代:每次调用就添加一个值;
public override IEnumerable<List<int>> GetDynamicPartitions()
{
Debug.Log($"GetDynamicPartitions Start: {L.Count}");
for (int i = 0; i < 10; i++)
{
L.Add(i);
Debug.Log("Add Value : " + i);
//Debug.Log($" GetDynamicPartitions {L.Count} ");
yield return L;
}
Debug.Log($" GetDynamicPartitions Last: {L.Count} ");
yield return L;
}
}
}
Partitioner表示将数据源拆分为多个分区的特定方式。 https://learn.microsoft.com/zh-cn/dotnet/api/system.collections.concurrent.partitioner-1?view=netstandard-2.1 之后我们开始调用:
private void RunByPartitionLocalVarable()
{
var partitioner = new PartitionerTest();
var options = new ParallelOptions { MaxDegreeOfParallelism = 3 };//最大并行度3
Parallel.ForEach(partitioner,
options,
() => 0,//第一次迭代时的初始值
(L, loop, ret) =>
{
//每次都将列表中的值汇总
ret = 0;
foreach (var item in L)
ret += item;
return ret;//返回给下一次循环的值
}, TestFunction.OnTaskFinish);//最后运行完成的回调
}
可以想象的是,因为线程竞争,所以3次并行的结果都会有所差别,实际也是如此。我们直接看最后的结果打印:
2个线程同步执行完成,打印出了结果105,最后一个线程结果135,最后一个完成。
本章总结
本章详细介绍了使用任务并行库(TPL)实现数据并行的方式。
这一章的实用性也很强了,而且已经开始涉及到数据的处理,但是这里还没有讲如何处理线程竞争的问题。不过通过这一章的学习,相信大家对多线程的运行模式已经有了更为深刻的理解。
虽然本章提到了怎么分区,但是并没有很深入。要用好各种分区、局部变量策略,还需要继续深入学习。
本教程对应学习工程:魔术师Dix / HandsOnParallelProgramming · GitCode