本教程对应学习工程:魔术师Dix / HandsOnParallelProgramming · GitCode
本章继续介绍任务并行性,因篇幅所限,本章为下篇。
6、处理任务异常
所有优秀的程序员都擅长高效地处理异常,这也是并行编程最重要的方面之一。任务并行库(TPL)提供了一种高效的设计来处理异常:任务中发生的任何未处理异常都将被延迟,然后传播到使用 Join 方法加入的线程,后者负责观察任务中的异常。
下面我们通过代码实例来学习:
6.1、处理来自单个任务的异常
首先,我们需要写一个会出异常的程序:
/// <summary>
/// 一个“可能”错误的程序;
/// 会抛出异常错误
/// </summary>
public async static Task ErrorFunction()
{
var random = new System.Random();
int div = random.Next(-2, 2);
float ret = 1;
for (int i = 0; i < 10; i++)
{
if (div == 0)
{
//这里我们只打印,但是并不中断运行;
Debug.LogError("开始除0了!");
}
//直接除法,抛出除0的移除
ret += i / div;
await Task.Yield();
div = random.Next(-2, 2);
}
Debug.Log($"ErrorFunction 居然成功完成了!结果为:{ret} | {div}");
}
之后我们直接运行这段程序,就按照最简单的 Task.Run 来运行。结果很有意思啊:
发现没有,已经出现除0的警告了,但是并没有跑错误出来,Unity 一点反应没有!这说明在子线程里的异常是不会直接抛给主线程的。
下面我们换一个写法:
private void RunWithErrorTask()
{
try
{
Debug.Log("RunWithErrorTask 开始!");
var task=Task.Run(TestFunction.ErrorFunction);
task.Wait();//不用 task.Wait() 则不会抛出异常
}
catch (System.Exception ex)
{
Debug.LogError(ex.Message);
Debug.LogError(ex.StackTrace);
Debug.LogError(ex.InnerException);
}
}
我们调用 task.Wait(),用 try catch 语句进行包裹,结果如下:
其实没啥好说的,就是因为 task.Wait 调回了主线程,所以能接收到异常。上面2张截图,其实就是为了说明 Exception 的 StackTrace 和 InnerException 的区别:可以看到 StackTrace 是没有行号的,但是 InnerException 是可以定位到具体的方法。
6.2、处理来自多个任务的异常
类似于 5.3 那种,子任务有多个的情况,异常处理也类似。把 catch 的类型换成 AggregateException 就能拿到所有的异常了。
这里就不贴代码了,只要一张贴图就能明白所有:
6.3、使用回调函数处理任务异常
这里指的就是 AggregateException 运行使用回调来处理异常:
.......
catch (System.AggregateException ex)
{
ex.Handle(exception =>
{
Debug.LogError(exception.InnerException);
return true;
});
}
这里就是 Handle 提供一个方法,返回 true 表示此异常已经正确处理,返回 false 则系统会再次抛出此异常。
这些都是通用的 C# 函数异常处理方法了,就不必要再多说了。
AggregateException.Handle(FuncInvokes a handler on each Exception contained by this AggregateException.https://learn.microsoft.com/en-us/dotnet/api/system.aggregateexception.handle?view=netstandard-2.1
7、将APM模式转化为任务
传统的异步编程模型(Asynchronous Programming Model,APM)使用了 IAsyncResult 接口来创建异步方法,其设计模式采用了两个方法,即 BeginMethodName 和 EndMethodName。接下来,我们尝试从同步到APM,再到任务的过程:
7.1、同步
我们先整一个写入文件的测试代码:
public static string FilePath = "";
/// <summary>
/// 写文件的方法
/// </summary>
public static void WriteFile()
{
Debug.Log("WriteFile Start !");
using (FileStream fs = File.OpenWrite(FilePath))
{
UTF8Encoding encoding = new UTF8Encoding(true);
System.Random random = new System.Random();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 100000; i++)
{
for (int j = 0; j < 1000; j++)
{
char c = (char)random.Next('A', 'Z' + 1);
sb.Append(c);
}
sb.Append('\n');
}
byte[] b = encoding.GetBytes(sb.ToString());
fs.Write(b, 0, b.Length);
fs.Close();
}
Debug.Log("WriteFile End !");
}
这里文件路径选择这些我就不贴代码了,这个比较简单。上述这个 10万行,每行1000个字符的数据,总计约100 MB,写入开销大约为 6S :
当然这么写会卡死 Unity 主线程,毕竟是同步方法。
7.2、APM 模式
采用APM模式,直接看代码,如下:
/// <summary>
/// 写文件的方法:APM 异步模式
/// </summary>
public static void WriteFileAsync()
{
Debug.Log("WriteFileAsync Start !");
using (FileStream fs = new FileStream(FilePath, FileMode.Create, FileAccess.ReadWrite, FileShare.ReadWrite, 1024, FileOptions.Asynchronous))
{
UTF8Encoding encoding = new UTF8Encoding(true);
System.Random random = new System.Random();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 100000; i++)
{
for (int j = 0; j < 1000; j++)
{
char c = (char)random.Next('A', 'Z' + 1);
sb.Append(c);
}
sb.Append('\n');
}
byte[] b = encoding.GetBytes(sb.ToString());
Debug.Log("WriteFileAsync BeginWrite !");
IAsyncResult result = fs.BeginWrite(b, 0, b.Length, null, null);
Debug.Log("WriteFileAsync EndWrite !");
fs.EndWrite(result);
Debug.Log("WriteFileAsync Close !");
fs.Close();
}
Debug.Log("WriteFileAsync End !");
}
可以看到,程序是分为两次同步的,第一次是创建 10万 * 1000 个字符串,第二次则是写入文件的部分。在 BeginWrite 的时候没有阻塞,而是在 EndWrite 的时候阻塞了程序。
7.3、任务模式
接下来我们把 APM 模式转换为任务模式:
var task = Task.Factory.FromAsync(fs.BeginWrite(b, 0, b.Length, null, null), fs.EndWrite);
Debug.Log("WriteFileUsingTask FromAsync !");
task.Wait();
Debug.Log("WriteFileUsingTask Wait !");
//fs.Close();
在上述代码中,我们用 Task.Factory.FromAsync 替换了 BeginRead 方法,这是基于任务的异步模式(Task-based Asynchronous Pattern ,TAP)的一种方式。我这里使用了 Wait 方法来阻塞线程来获取结果,打印结果如下:
可以看到线程阻塞和之前是相同的,5s 创建字符串,1s 写入文件。当然,有时我们并不想等待这个写入文件完成,也可以不等待:
var task = Task.Factory.FromAsync(fs.BeginWrite(b, 0, b.Length, null, null), result =>
{
fs.EndWrite(result);
Debug.Log("最后写入文件完成 !");
});
Debug.Log("WriteFileUsingTask FromAsync !");
//task.Wait();
//fs.Close();
其结果如下:
8、将EAP模式转换为任务
基于事件的异步模式(Event-based Asynchronous Parrerns,EAP)常用于创建组件以包装那些成本很高的且很费时的操作。.NET Framework 中已使用此模式来创建组件,比如 BackgroundWorker 和 WebClient 。实现此模式的方法将在后台异步执行长时间运行的任务,但会通过事件不断向用户通知其进度和状态,所以称之为基于事件的异步模式。
这里我们写了方法,将上文 7、将APM模式转化为任务 生成的文件读取出来,然后统计每个字符(A~Z)的数量,最后将统计结果写入到另一个文件中。这个任务耗时还是很严重的,跑下来需要 26 秒。这个方法确实有点长了,就不贴上了,具体看示例工程的源码,毕竟和要讲的东西不很相关。
BackgroundWorker 的创建在上一章已经讲过了,不赘述,这是按照书上的示例改造成 EAP 模式:
private void RunWithBackgorundReadEAP()
{
//创建 TaskCompletionSource
var taskCompletionSource = new TaskCompletionSource<string>();
//省略,创建 BackgroundWorker
//设定工作方法
backgroundWorker.DoWork += TestFunction.ReadFile;
//将 TaskCompletionSource 当做参数传进去
backgroundWorker.RunWorkerAsync(taskCompletionSource);
RunningStringTask = taskCompletionSource.Task;
}
private Task<string> RunningStringTask;//用于轮询
//轮询,查询 RunningStringTask 状态
private void UpdateRnningStringTask()
{
if (RunningStringTask == null)
return;
switch (RunningStringTask.Status)
{
case TaskStatus.RanToCompletion:
Debug.LogError($"RunningStringTask 完成! {RunningStringTask.Result}");
RunningStringTask = null;
break;
}
}
之后在 工作方法(TestFunction.ReadFile)中做如下修改:
public static void ReadFile(object sender, DoWorkEventArgs e)
{
TaskCompletionSource<string> taskCompletionSource = e.Argument as TaskCompletionSource<string>;
……省略代码……
……读取并统计文件……
……省略代码……
taskCompletionSource?.SetResult($"统计完成:{StatisticsResultFilePath}");
}
运行结果如下:
可见创建 TaskCompletionSource 之后,线程就标记为后台等待了,之后完成时设置结果后,RunningStringTask 的状态就会切换为 TaskStatus.RanToCompletion。
与异步编程模型(APM)相比,将基于事件的异步模式(EAP)转换为基于任务的异步模式(TAP)非常棘手;因为它需要对 EAP 组件内部的性质有充分的了解,并且需要将新代码插入正确的事件中才能使其正常工作。
9、后续任务
当需要链接多个任务时,可以使用后续任务。第二个任务在第一个任务完成并将第一个任务的结果传递给子任务时开始。我们可以将多个任务连接在一起,创建一长串任务,或者也可以使用 TPL 提供的方法来创建选择性的延续链。
9.1、Task.ContinueWith
如下代码:
public static async Task<int> StartTask()
{
Debug.Log("任务链开始~");
await Task.Delay(1000);
Debug.Log("任务链完成 : 1");
return 1;
}
public static async Task<int> ContinueTask(Task<int> t)
{
int lastRet = Convert.ToInt32(t.Result);
lastRet++;
Debug.Log($"后续任务 {lastRet} 开始");
await Task.Delay(1000);
Debug.Log($"后续任务 {lastRet} 完成");
return lastRet;
}
现在需要第二个任务读取第一个任务的返回值,然后继续执行,调用代码如下:
private void RunWithContinueTask()
{
var task = Task.Run(TestFunction.StartTask);
task.ContinueWith(TestFunction.ContinueTask);
}
运行结果如下:
当然,也是可以链式串行多个后续任务的,代码如下:
private void RunWithContinueTask()
{
var task = Task.Run<int>(TestFunction.StartTask)
.ContinueWith(ret => TestFunction.ContinueTask(ret))
.ContinueWith(ret => TestFunction.ContinueTask(ret.Result))
.ContinueWith(ret => TestFunction.ContinueTask(ret.Result))
.ContinueWith(ret => TestFunction.ContinueTask(ret.Result))
.ContinueWith(ret => TestFunction.ContinueTask(ret.Result))
.ContinueWith(ret => TestFunction.ContinueTask(ret.Result))
.ContinueWith(ret => TestFunction.ContinueTask(ret.Result));
}
执行打印如下:
在执行后续任务时,可以增加参数 System.Threading.Tasks.TaskContinuationOptions 来控制后续任务如何执行。枚举值意义可参考以下链接:
TaskContinuationOptions Enum (System.Threading.Tasks) | Microsoft LearnSpecifies the behavior for a task that is created by using the ContinueWith(Action<Task>, CancellationToken, TaskContinuationOptions, TaskScheduler) or ContinueWith(Action<Task<TResult>>, TaskContinuationOptions) method.https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.taskcontinuationoptions?view=netstandard-2.1
9.2、Task.Factory.ContinueWhenAll
使用 Task.Factory.ContinueWhenAll 可以等待多个任务,并且链接一个仅有所有任务完成后才能运行的后续任务。测试代码如下:
public static async Task<int> StartTaskWithDelayTime(int delayTime)
{
Debug.Log($"等待{delayTime} 开始!");
await Task.Delay(delayTime);
Debug.Log($"等待{delayTime} 结束!");
return delayTime;
}
public static void ContinueTaskWithAll(Task[] tasks)
{
int length = tasks.Length;
int ret = 0;
for (int i = 0; i < length; i++)
{
var task = tasks[i] as Task<int>;
if (task.IsCompletedSuccessfully)
ret += task.Result;
}
Debug.Log($"后续任务执行,之前已完成任务结果为:{ret}");
}
调用方法如下:
private void RunWithContinueWhenAll()
{
var task1 = Task.Run(() => TestFunction.StartTaskWithDelayTime(1000));
var task2 = Task.Run(() => TestFunction.StartTaskWithDelayTime(2000));
var task3 = Task.Run(() => TestFunction.StartTaskWithDelayTime(3000));
Task.Factory.ContinueWhenAll(new Task[] { task1, task2, task3 }, TestFunction.ContinueTaskWithAll);
}
运行结果如下:
9.3、Task.Factory.ContinueWhenAny
这个逻辑大家想必都能猜出来了,就是前置任务完成任何一个,都可以执行后续任务。这里直接上调用代码,只需要做少量修改:
private void RunWithContinueWhenAny()
{
var task1 = Task.Run(() => TestFunction.StartTaskWithDelayTime(1000));
var task2 = Task.Run(() => TestFunction.StartTaskWithDelayTime(2000));
var task3 = Task.Run(() => TestFunction.StartTaskWithDelayTime(3000));
Task.Factory.ContinueWhenAny(new Task[] { task1, task2, task3 }, TestFunction.ContinueTaskWithAny);
}
由于 ContinueWhenAny 只支持传入1个Task参数,所以 TestFunction 中代码如下:
/// <summary>
/// ContinueWhenAny 只支持传入一个任务参数
/// </summary>
/// <param name="t"></param>
public static void ContinueTaskWithAny(Task t)
{
var task = t as Task<int>;
Debug.Log($"后续任务执行,之前已完成任务结果为:{task.Result}");
}
运行结果如下所示:
10、父任务和子任务
线程之间可能发生的另一种类型的关系是父子关系,子任务被创建为父任务(Parent Task)主体内的嵌套任务。子任务可以被创建为附加(Attached)或分离的(Detached)。两种类型的任务都在父任务内部创建,并且在默认情况下,创建的子任务是分离的。要将子任务指定为附加任务,可以将任务的 AttachedToParent 属性为 true。
在以下情况下可以考虑创建附加任务:
-
子任务中引发的所有异常都必须传播到父任务。
-
父任务的状态取决于子任务。
-
父任务需要等待子任务完成。
10.1、创建分离的子任务
TestFunction 中任务代码如下:
public static void ParentTask()
{
Debug.Log("父任务开始!");
Task.Run(ChildTask);
Debug.Log("父任务完成!");
}
public static async Task ChildTask()
{
Debug.Log("子任务开始!");
await Task.Delay(2000);
Debug.Log("子任务完成!");
}
调用方法如下:
private void CreateSplitChildTaks()
{
var task = Task.Run(TestFunction.ParentTask);
task.Wait();
Debug.Log("所有任务完成!");
}
运行结果如下:
可以看到,父任务并没有等待子任务完成,而是自己先完成了。虽然是嵌套的子任务,但是各自的生命周期是分离的。而且,值得注意的是:父任务是先完成了,子任务才开始。
10.2、创建一个附加任务
按照书上的写法,只需要做如下修改:
public static void ParetTaskAttacahed()
{
Debug.Log("父任务开始!");
Task.Factory.StartNew(ChildTask, TaskCreationOptions.AttachedToParent);
Debug.Log("父任务完成!");
}
结果如下:
可以看到就一个时序发生了变化,那就是子任务的开始在所有任务完成前。也就是在 Task.Wait 的过程中,子任务就已经开始了,而在分离子任务模式中则并不是。
11、工作窃取队列
工作窃取(Work-Stealing)是一种针对线程池的性能优化技术,每个线程池维护一个进程内部创建的单个全局任务队列。
ThreadPool 还维护着一个线程的全局队列 (Global Queue)。在该队列中,所有工作项目将进行排队,然后分配到可用线程。由于这是单个队列,并且我们在多线程场景中工作,因此需要使用同步原语(Synchronization Primitive)来实现线程安全。在使用单个全局队列的情况下,同步会导致性能下降。
.NET Framework 可通过引入由线程管理的本地队列(Local Queue)的概念来解决此性能损失问题。每个线程都可以访问全局队列,并且还维护其自己的线程本地队列以存储工作项。父任务可以在全局队列内调度任务。
当任务执行并需要创建子任务时,可以在线程完成执行后立即将他们堆叠在本地队列中,稍后将使用先进先出(First In First Out,FIFO)算法进行处理。
而子任务创建了新任务,则将进入本地队列而不是全局队列。并且一旦线程完成了一项任务,他会采用后进先出原则(Last In Firts Out,LIFO)算法(最近一个任务很有可能仍然在高速缓存中,因此不需要重新加载,可以提高性能)。
而如果线程 T1 耗尽了本地队列,就会先全局搜索(FIFO)。如果全局队列中没有任务,就会从其他线程的本地队列查找可执行任务(FIFO,注意:不是LIFO),这就是所谓的工作窃取。如下图所示:
12、本章总结
这一章节的实用性就很强了,基本上 Task 就是已经可以在工程里用了。对于 Unity 来说,有一个封装好的开源库 UniTask :
GitHub - Cysharp/UniTask: Provides an efficient allocation free async/await integration for Unity.
他做了一些优化,也可以选择使用 UniTask。
通过 11、工作窃取队列 的学习,其实我们就知道任务在多线程中是如何调度的。所以在写代码的时候,尽量将工作拆分成多个可独立工作的子任务,以最大化提高并行效率。