C# 多线程六之Task(任务)三之任务工厂
1、知識回顧,簡要概述
前面兩篇關于Task的隨筆,C# 多線程五之Task(任務)一?和?C# 多線程六之Task(任務)二,介紹了關于Task的一些基本的用法,以及一些使用的要點,如果都看懂了,本文將介紹另一個Task的特殊用法,前面介紹了,如何通過一個父任務創建多個子任務,且這些子任務都必須要支持取消的例子,常規做法是,通過new 一個Task數組對象,然后在該對象的內部創建多個Task任務,然后給這些任務指定TaskCreationOptions.AttachedToParent,這樣所有的子任務都關聯到了父任務,接著給這些子任務,綁定一個CancellationToken類實例,當其中一個子任務發生異常時,調用CancellationToken類實例的Cancel方法,將其余的子任務全都取消,大致代碼如下:
static void Main(string[] args){var parentTask = new Task<int[]>(() =>{var results = new int[3];var cancelTokenSource = new CancellationTokenSource();var childTasks = new Task[] {new Task(() => results[0] = ChildThreadOne(cancelTokenSource.Token),cancelTokenSource.Token, TaskCreationOptions.AttachedToParent),new Task(() => results[1] = ChildThreadTwo(cancelTokenSource.Token),cancelTokenSource.Token, TaskCreationOptions.AttachedToParent),new Task(() => results[2] = ChildThreadThree(cancelTokenSource.Token),cancelTokenSource.Token, TaskCreationOptions.AttachedToParent),};//開啟所有的子任務childTasks.ForEach(f => { f.Start(); });//如果有子任務發生異常,那么通過取消信號量終止所有的任務childTasks.ForEach(f =>{f.ContinueWith(task=> cancelTokenSource.Cancel(), TaskContinuationOptions.OnlyOnFaulted);});return results;});parentTask.Start();parentTask.ContinueWith(x =>{Console.WriteLine("當父任務執行完畢時,CLR會喚起一個新線程,將父任務的返回值(子任務的返回值)輸出,所以這里不會有任何的線程發生阻塞");foreach (var re in parentTask.Result){Console.WriteLine("子任務的返回值分別為:{0}", re);}});Console.WriteLine("主線程不會阻塞,它會繼續執行");Console.ReadKey();//必須加這行代碼,因為Task時線程池線程,屬于后臺線程 }/// <summary>/// 子任務一/// </summary>static int ChildThreadOne(CancellationToken token){Thread.Sleep(3000);//模擬長時間計算操作 token.ThrowIfCancellationRequested();Console.WriteLine("子任務一完成了計算任務,并返回值:{0}", 6);return 6;}/// <summary>/// 子任務二/// </summary>static int ChildThreadTwo(CancellationToken token){Thread.Sleep(2000);//模擬長時間計算操作 token.ThrowIfCancellationRequested();throw new Exception("模擬拋出異常");}/// <summary>/// 子任務三/// </summary>static int ChildThreadThree(CancellationToken token){Thread.Sleep(3000);//模擬長時間計算操作 token.ThrowIfCancellationRequested();Console.WriteLine("子任務三完成了計算任務,并返回值:{0}", 6);return 6;}}/// <summary>/// Linq擴展/// </summary>public static class LinqExtension{public static void ForEach<T>(this IEnumerable<T> enumerators, Action<T> action){foreach (var item in enumerators){action(item);}}}這里需要注意,這里給父任務parentTask開啟了三個子任務,并且通過TaskCreationOptions.AttachedToParent指定了所有的子任務不能獨立于父任務運行,并且給所有的子任務,傳遞了CancellationToken信號量,當其中一個子任務發生異常時,所有其余的子任務都終止,但是你必須知道的是,你沒有判斷哪個任務會被終止,因為如果不指定線程優先級,哪怕制定了優先級,你也無法確定的判斷某個計算任務在什么時候會調度完,所以我給正常的執行的任務,Sleep了三秒,拋出異常的任務Sleep了兩秒,所以所有的子線程都無法執行完畢.
?
2、代碼重構
?ok,雖然上面的代碼很好的完成了我們在代碼層面的需求,但是處于對代碼的重用性考慮,有沒有發現這個問題:
這塊操作,可以重構的,因為所有的參數都一樣,當然你可以去抽象一個共有的方法,里面放一個Func委托,當然把參數抽象出來,形成一個公共的方法,像下面這樣做:
class Program{private static CancellationTokenSource cancelTokenSource = new CancellationTokenSource();private static TaskCreationOptions taskCreationOptions = TaskCreationOptions.AttachedToParent;static void Main(string[] args){var parentTask = new Task<int[]>(() =>{var results = new int[3];var childTasks = new Task[] {ExecuteChildThread(task=> results[0]=ChildThreadOne(cancelTokenSource.Token)),ExecuteChildThread(task=> results[1]=ChildThreadTwo(cancelTokenSource.Token)),ExecuteChildThread(task=> results[2]=ChildThreadThree(cancelTokenSource.Token))};//開啟所有的子任務childTasks.ForEach(f => { f.Start(); });//如果有子任務發生異常,那么通過取消信號量終止所有的任務childTasks.ForEach(f =>{f.ContinueWith(task=> cancelTokenSource.Cancel(), TaskContinuationOptions.OnlyOnFaulted);});return results;});parentTask.Start();parentTask.ContinueWith(x =>{Console.WriteLine("當父任務執行完畢時,CLR會喚起一個新線程,將父任務的返回值(子任務的返回值)輸出,所以這里不會有任何的線程發生阻塞");foreach (var re in parentTask.Result){Console.WriteLine("子任務的返回值分別為:{0}", re);}});Console.WriteLine("主線程不會阻塞,它會繼續執行");Console.ReadKey();//必須加這行代碼,因為Task時線程池線程,屬于后臺線程 }/// <summary>/// 子任務一/// </summary>static int ChildThreadOne(CancellationToken token){Thread.Sleep(2000);//模擬長時間計算操作 token.ThrowIfCancellationRequested();Console.WriteLine("子任務一完成了計算任務,并返回值:{0}", 6);return 6;}/// <summary>/// 子任務二/// </summary>static int ChildThreadTwo(CancellationToken token){Thread.Sleep(2000);//模擬長時間計算操作 token.ThrowIfCancellationRequested();Console.WriteLine("子任務二完成了計算任務,并返回值:{0}", 6);return 6;}/// <summary>/// 子任務三/// </summary>static int ChildThreadThree(CancellationToken token){Thread.Sleep(2000);//模擬長時間計算操作 token.ThrowIfCancellationRequested();Console.WriteLine("子任務三完成了計算任務,并返回值:{0}", 6);return 6;}/// <summary>/// 創建一個通用的子線程方法,里面封裝了所有子線程的需要設置的公共參數/// </summary>/// <param name="func"></param>/// <returns></returns>static Task<int> ExecuteChildThread(Func<CancellationToken, int> func){var t=new Task<int>(()=>func.Invoke(cancelTokenSource.Token), cancelTokenSource.Token, taskCreationOptions);return t;}}/// <summary>/// Linq擴展/// </summary>public static class LinqExtension{public static void ForEach<T>(this IEnumerable<T> enumerators, Action<T> action){foreach (var item in enumerators){action(item);}}}ok,通過對子任務的抽象,你可以這么干,但是MS提供了更好的辦法,你又何必重復造輪子呢?而且這里存在著潛在的多線程爭用問題,
所有的線程都用到了這兩個全局變量,最好加個鎖,但是加了鎖之后,性能就會受到影響.
但是奇怪的是,我無法重現,如果你能重現那是最好的,下面就開始介紹Ms提供的任務工廠
?
3、任務工廠實戰
下面再次對上面的方法進行重構,用任務工廠的方式,首先使用TaskFactory任務工廠的前提你必須清楚,就是創建的子任務,必須是一組共享配置的子任務對象集,所以,如果當中如果某個子任務需要使用特殊的配置,那就不能使用任務工廠,也不是不能使用,就是那個子任務你必須獨立出來,不能放到任務工廠里面.ok,了解了前提條件后,開始實踐,代碼如下:
class Program{static void Main(string[] args){var parentTask=Task.Run(()=> {var cts = new CancellationTokenSource();//通過TaskFactory設置子任務的公共參數var tf = new TaskFactory<int>(cts.Token,TaskCreationOptions.AttachedToParent,TaskContinuationOptions.ExecuteSynchronously,TaskScheduler.Default);//通過TaskFactory設置所有的子任務,這些子任務共享上面公共參數var childTasks = new Task<int>[] {tf.StartNew(() => ChildThreadOne(cts.Token)),tf.StartNew(() => ChildThreadTwo(cts.Token)),tf.StartNew(() => ChildThreadThree(cts.Token))};//如果子任務發生異常,則向余下沒有執行完畢的子任務傳遞取消執行的信號,如果有子任務執行完畢了,那就沒有辦法了childTasks.ForEach(f =>{f.ContinueWith(childTask => cts.Cancel(), TaskContinuationOptions.OnlyOnFaulted);});//遍歷所有通過TaskFactory創建的子任務,然后篩選出沒有被取消和沒有發生異常的子任務,或者這些任務中的最大返回值//這個任務不阻塞線程,只有當所有的子任務執行完畢之后,CLR會喚起線程池中的一個新線程來執行這個操作//通過給喚起子線程設置CancellationToken.None,來達到這個線程不會被任何因素來取消該線程的目的var tfTask = tf.ContinueWhenAll(childTasks,completedTasks => completedTasks.Where(completedTask => !completedTask.IsCanceled && !completedTask.IsFaulted).Max(completedTask => completedTask.Result), CancellationToken.None);//輸出所有符合要求的子任務集合的返回值集合中的最大值,并指定該任務,在tfTask任務的基礎上同步執行的效果通過TaskContinuationOptions.ExecuteSynchronouslytfTask.ContinueWith(childTasksCompleteTask =>{Console.WriteLine("The Max Return Value is {0}", childTasksCompleteTask.Result);},TaskContinuationOptions.ExecuteSynchronously);});Console.WriteLine("主線程繼續做它的事情");Console.ReadKey();//必須加這行代碼,因為Task時線程池線程,屬于后臺線程 }/// <summary>/// 子任務一/// </summary>static int ChildThreadOne(CancellationToken token){var returnValue = 6;Thread.Sleep(3000);//模擬長時間計算操作 token.ThrowIfCancellationRequested();Console.WriteLine("子任務一完成了計算任務,并返回值:{0}", returnValue);return returnValue;}/// <summary>/// 子任務二/// </summary>static int ChildThreadTwo(CancellationToken token){var returnValue = 66;Thread.Sleep(3000);//模擬長時間計算操作 token.ThrowIfCancellationRequested();Console.WriteLine("子任務二完成了計算任務,并返回值:{0}", returnValue);return returnValue;}/// <summary>/// 子任務三/// </summary>static int ChildThreadThree(CancellationToken token){Thread.Sleep(2000);//模擬長時間計算操作throw new Exception("模擬拋出異常");}}/// <summary>/// Linq擴展/// </summary>public static class LinqExtension{public static void ForEach<T>(this IEnumerable<T> enumerators, Action<T> action){foreach (var item in enumerators){action(item);}}}因為我給異常線程設置了2秒的休眠時間,正常子線程設置了3秒的休眠時間,所以所有的線程都沒有執行完畢,就被取消掉了.如果修改下正常線程的休眠時間為1秒,將會得到以下的輸出:
so,TaskFactory完美的完成了它的任務,且不會有任務線程發生阻塞的情況。
?
4、如何解決任務工廠拋出的異常
我發現一個很奇怪的問題,就是當當外部通過一個Task.Run創建的父任務,無法獲取TaskFactory下子任務集群拋出的異常,代碼如下:
class Program{static void Main(string[] args){var pTask = Task.Run(() =>{var cts = new CancellationTokenSource();//通過TaskFactory設置子任務的公共參數var tf = new TaskFactory<int>(cts.Token, TaskCreationOptions.AttachedToParent, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);//通過TaskFactory設置所有的子任務,這些子任務共享上面公共參數var childTasks = new Task<int>[] {tf.StartNew(() => ChildThreadOne(cts.Token)),tf.StartNew(() => ChildThreadTwo(cts.Token)),tf.StartNew(() => ChildThreadThree(cts.Token))};});pTask.ContinueWith(tasks =>{var exceptions = tasks.Exception;foreach (var ex in exceptions.InnerExceptions){Console.WriteLine(ex);}},TaskContinuationOptions.OnlyOnFaulted);Console.WriteLine("主線程繼續做它的事情");Console.ReadKey();//必須加這行代碼,因為Task時線程池線程,屬于后臺線程 }/// <summary>/// 子任務一/// </summary>static int ChildThreadOne(CancellationToken token){var returnValue = 6;Thread.Sleep(3000);//模擬長時間計算操作 token.ThrowIfCancellationRequested();Console.WriteLine("子任務一完成了計算任務,并返回值:{0}", returnValue);return returnValue;}/// <summary>/// 子任務二/// </summary>static int ChildThreadTwo(CancellationToken token){var returnValue = 66;Thread.Sleep(3000);//模擬長時間計算操作 token.ThrowIfCancellationRequested();Console.WriteLine("子任務二完成了計算任務,并返回值:{0}", returnValue);return returnValue;}/// <summary>/// 子任務三/// </summary>static int ChildThreadThree(CancellationToken token){Thread.Sleep(2000);//模擬長時間計算操作throw new Exception("模擬拋出異常");}}/// <summary>/// Linq擴展/// </summary>public static class LinqExtension{public static void ForEach<T>(this IEnumerable<T> enumerators, Action<T> action){foreach (var item in enumerators){action(item);}}}很其怪,不過這說明,外部的父任務,無法和TaskFactory建立關聯,如果你們能找到方法,歡迎在下面評論區評論,因為這個所以,要處理子任務拋出的異常.只能通過過濾異常子任務,然后在子任務里單獨記錄日志的方式,去處理:
暫時沒有想到更好的辦法.
?
轉載于:https://www.cnblogs.com/GreenLeaves/p/10088635.html
總結
以上是生活随笔為你收集整理的C# 多线程六之Task(任务)三之任务工厂的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 第四课:PHP 变量
- 下一篇: 进程初识和multiprocessing