TPL 和传统 .NET Framework 异步编程
.NET Framework 提供了以下兩種標準模式,用于執行 I/O 密集型和計算密集型異步操作:
-
異步編程模型 (APM),其中異步操作由一對 Begin/End 方法(如?FileStream.BeginRead?和Stream.EndRead)表示。
-
基于事件的異步模式 (EAP),其中異步操作由 OperationName Async 和 OperationName Completed 方法/事件對(如?WebClient.DownloadStringAsync?和?WebClient.DownloadStringCompleted)表示。?(EAP 是在 .NET Framework 2.0 版本中引入的。)
任務并行庫 (TPL) 可采用各種方法與任一異步模式協同使用。?可將 APM 和 EAP 操作作為任務向庫使用者公開,也可以公開 APM 模式但用 Task 對象在內部實現它們。?在這兩種情況下,可通過使用 Task 對象簡化代碼和利用以下有用的功能:
-
在任務開始后隨時以任務延續形式注冊回調。
-
通過使用?ContinueWhenAll?和?ContinueWhenAny?方法,或者?WaitAll?方法或?WaitAny?方法并列為響應?Begin_?方法而執行的多個操作。
-
封裝同一 Task 對象中的異步 I/O 密集型和計算密集型操作。
-
監視 Task 對象的狀態。
-
使用?TaskCompletionSource<TResult>?將操作狀態封送處理至 Task 對象。
在任務中包裝 APM 操作
System.Threading.Tasks.TaskFactory?和?System.Threading.Tasks.TaskFactory<TResult>?類都提供了幾個?TaskFactory.FromAsync?和?TaskFactory<TResult>.FromAsync?方法的重載,可以將 APM Begin/End 方法對封裝在?Task?或?Task<TResult>?實例中。?各種重載都可容納任何具有零至三個輸入參數的 Begin/End 方法對。
對于具有返回值(在 Visual Basic 中為?Function)的?End?方法的對,使用?TaskFactory<TResult>?中創建?Task<TResult>?的方法。?對于具有返回 void(在 Visual Basic 中為?Sub)的?End?方法,使用?TaskFactory?中創建?Task?的方法。
在極少情況下,如果?Begin?方法具有三個以上參數或包含?ref?或?out?參數,則提供僅封裝?End?方法的其他?FromAsync?重載。
下面的示例顯示了匹配?FileStream.BeginRead?和?FileStream.EndRead?方法的?FromAsync?重載的簽名。?此重載采用三個輸入參數,如下所示。
public Task<TResult> FromAsync<TArg1, TArg2, TArg3>(Func<TArg1, TArg2, TArg3, AsyncCallback, object, IAsyncResult> beginMethod, //BeginReadFunc<IAsyncResult, TResult> endMethod, //EndReadTArg1 arg1, // the byte[] bufferTArg2 arg2, // the offset in arg1 at which to start writing dataTArg3 arg3, // the maximum number of bytes to readobject state // optional state information) Public Function FromAsync(Of TArg1, TArg2, TArg3)(ByVal beginMethod As Func(Of TArg1, TArg2, TArg3, AsyncCallback, Object, IAsyncResult),ByVal endMethod As Func(Of IAsyncResult, TResult),ByVal dataBuffer As TArg1,ByVal byteOffsetToStartAt As TArg2,ByVal maxBytesToRead As TArg3,ByVal stateInfo As Object)第一個參數是匹配?FileStream.BeginRead?方法簽名的?Func<T1,T2,T3,T4,T5,TResult>?委托。?第二個參數使用?IAsyncResult?并返回?TResult?的?Func<T,TResult>?委托。?由于?EndRead?返回一個整數,因此編譯器會將?TResult?類型推斷為?Int32?并將任務類型推斷為?Task。?最后第四個參數與?FileStream.BeginRead?方法中的參數相同:
-
存儲文件數據的緩沖區。
-
開始寫入數據的緩沖區的偏移量。
-
要從文件中讀取的最大數據量。
-
存儲要傳遞至回調的用戶定義狀態數據的可選對象。
使用 ContinueWith 執行回調功能
如果需要訪問文件中的數據,而不僅僅訪問字節數,則?FromAsync?方法不能滿足此操作。?請改用?Task,其?Result?屬性包含文件數據。?可以通過向原始任務添加延續來實現這種操作。?延續執行通常由?AsyncCallback?委托執行的任務。?先前任務完成且填充了數據緩沖區后調用此操作。?(FileStream?對象應在返回前關閉。)
下面的示例演示如何返回封裝?FileStream?類的 BeginRead/EndRead 對的?Task。
const int MAX_FILE_SIZE = 14000000; public static Task<string> GetFileStringAsync(string path) {FileInfo fi = new FileInfo(path);byte[] data = null;data = new byte[fi.Length];FileStream fs = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, data.Length, true);//Task<int> returns the number of bytes readTask<int> task = Task<int>.Factory.FromAsync(fs.BeginRead, fs.EndRead, data, 0, data.Length, null);// It is possible to do other work here while waiting// for the antecedent task to complete.// ...// Add the continuation, which returns a Task<string>. return task.ContinueWith((antecedent) =>{fs.Close();// Result = "number of bytes read" (if we need it.)if (antecedent.Result < 100){return "Data is too small to bother with.";}else{// If we did not receive the entire file, the end of the// data buffer will contain garbage.if (antecedent.Result < data.Length)Array.Resize(ref data, antecedent.Result);// Will be returned in the Result property of the Task<string>// at some future point after the asynchronous file I/O operation completes.return new UTF8Encoding().GetString(data);}}); } Const MAX_FILE_SIZE As Integer = 14000000 Shared Function GetFileStringAsync(ByVal path As String) As Task(Of String)Dim fi As New FileInfo(path)Dim data(fi.Length) As ByteDim fs As FileStream = New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, data.Length, True)' Task(Of Integer) returns the number of bytes readDim myTask As Task(Of Integer) = Task(Of Integer).Factory.FromAsync(AddressOf fs.BeginRead, AddressOf fs.EndRead, data, 0, data.Length, Nothing)' It is possible to do other work here while waiting' for the antecedent task to complete.' ...' Add the continuation, which returns a Task<string>. Return myTask.ContinueWith(Function(antecedent)fs.Close()If (antecedent.Result < 100) ThenReturn "Data is too small to bother with."End If' If we did not receive the entire file, the end of the' data buffer will contain garbage.If (antecedent.Result < data.Length) ThenArray.Resize(data, antecedent.Result)End If' Will be returned in the Result property of the Task<string>' at some future point after the asynchronous file I/O operation completes.Return New UTF8Encoding().GetString(data)End Function)End Function然后可調用此方法,如下所示。
Task<string> t = GetFileStringAsync(path); // Do some other work:// ...try{Console.WriteLine(t.Result.Substring(0, 500));}catch (AggregateException ae){Console.WriteLine(ae.InnerException.Message);} Dim myTask As Task(Of String) = GetFileStringAsync(path)' Do some other work ' ...TryConsole.WriteLine(myTask.Result.Substring(0, 500)) Catch ex As AggregateExceptionConsole.WriteLine(ex.InnerException.Message) End Try提供自定義狀態數據
在通常的?IAsyncResult?操作中,如果AsyncCallback?委托需要一些自定義狀態數據,則必須通過?Begin?方法中的最后一個參數將它傳入,以便可將數據打包到最終要傳遞至回調方法的?IAsyncResult?對象中。?當使用?FromAsync?方法時,通常無需此操作。?如果延續知道自定義數據,可直接在延續委托中捕獲它。?下面的示例與以前的示例類似,但延續檢查此延續的用戶委托可直接訪問的自定義狀態數據,而不是檢查歷史任務的?Result?屬性。
public Task<string> GetFileStringAsync2(string path) { FileInfo fi = new FileInfo(path);byte[] data = new byte[fi.Length]; MyCustomState state = GetCustomState();FileStream fs = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, data.Length, true);// We still pass null for the last parameter because// the state variable is visible to the continuation delegate.Task<int> task = Task<int>.Factory.FromAsync(fs.BeginRead, fs.EndRead, data, 0, data.Length, null);return task.ContinueWith((antecedent) =>{// It is safe to close the filestream now.fs.Close();// Capture custom state data directly in the user delegate.// No need to pass it through the FromAsync method.if (state.StateData.Contains("New York, New York")){return "Start spreading the news!";}else{// If we did not receive the entire file, the end of the// data buffer will contain garbage.if (antecedent.Result < data.Length)Array.Resize(ref data, antecedent.Result);// Will be returned in the Result property of the Task<string>// at some future point after the asynchronous file I/O operation completes.return new UTF8Encoding().GetString(data);}}); } Public Function GetFileStringAsync2(ByVal path As String) As Task(Of String)Dim fi = New FileInfo(path)Dim data(fi.Length) As ByteDim state As New MyCustomState()Dim fs As New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, data.Length, True)' We still pass null for the last parameter because' the state variable is visible to the continuation delegate.Dim myTask As Task(Of Integer) = Task(Of Integer).Factory.FromAsync(AddressOf fs.BeginRead, AddressOf fs.EndRead, data, 0, data.Length, Nothing)Return myTask.ContinueWith(Function(antecedent)fs.Close()' Capture custom state data directly in the user delegate.' No need to pass it through the FromAsync method.If (state.StateData.Contains("New York, New York")) ThenReturn "Start spreading the news!"End If' If we did not receive the entire file, the end of the' data buffer will contain garbage.If (antecedent.Result < data.Length) ThenArray.Resize(data, antecedent.Result)End If'/ Will be returned in the Result property of the Task<string>'/ at some future point after the asynchronous file I/O operation completes.Return New UTF8Encoding().GetString(data)End Function)End Function同步多個 FromAsync 任務
當結合使用?FromAsync?方法時,靜態?ContinueWhenAll?和?ContinueWhenAny?方法具有更大的靈活性。?下面的示例顯示如何啟動多個異步 I/O 操作,然后等待所有這些操作都完成后再執行延續。
public Task<string> GetMultiFileData(string[] filesToRead) {FileStream fs;Task<string>[] tasks = new Task<string>[filesToRead.Length];byte[] fileData = null;for (int i = 0; i < filesToRead.Length; i++){fileData = new byte[0x1000];fs = new FileStream(filesToRead[i], FileMode.Open, FileAccess.Read, FileShare.Read, fileData.Length, true);// By adding the continuation here, the // Result of each task will be a string.tasks[i] = Task<int>.Factory.FromAsync(fs.BeginRead, fs.EndRead, fileData, 0, fileData.Length, null).ContinueWith((antecedent) =>{fs.Close();// If we did not receive the entire file, the end of the// data buffer will contain garbage.if (antecedent.Result < fileData.Length)Array.Resize(ref fileData, antecedent.Result);// Will be returned in the Result property of the Task<string>// at some future point after the asynchronous file I/O operation completes.return new UTF8Encoding().GetString(fileData);});}// Wait for all tasks to complete. return Task<string>.Factory.ContinueWhenAll(tasks, (data) =>{// Propagate all exceptions and mark all faulted tasks as observed.Task.WaitAll(data);// Combine the results from all tasks.StringBuilder sb = new StringBuilder();foreach (var t in data){sb.Append(t.Result);}// Final result to be returned eventually on the calling thread.return sb.ToString();}); } Public Function GetMultiFileData(ByVal filesToRead As String()) As Task(Of String)Dim fs As FileStreamDim tasks(filesToRead.Length) As Task(Of String)Dim fileData() As Byte = NothingFor i As Integer = 0 To filesToRead.LengthfileData(&H1000) = New Byte()fs = New FileStream(filesToRead(i), FileMode.Open, FileAccess.Read, FileShare.Read, fileData.Length, True)' By adding the continuation here, the ' Result of each task will be a string.tasks(i) = Task(Of Integer).Factory.FromAsync(AddressOf fs.BeginRead,AddressOf fs.EndRead,fileData,0,fileData.Length,Nothing).ContinueWith(Function(antecedent)fs.Close()'If we did not receive the entire file, the end of the' data buffer will contain garbage.If (antecedent.Result < fileData.Length) ThenReDim Preserve fileData(antecedent.Result)End If'Will be returned in the Result property of the Task<string>' at some future point after the asynchronous file I/O operation completes.Return New UTF8Encoding().GetString(fileData)End Function)NextReturn Task(Of String).Factory.ContinueWhenAll(tasks, Function(data)' Propagate all exceptions and mark all faulted tasks as observed.Task.WaitAll(data)' Combine the results from all tasks.Dim sb As New StringBuilder()For Each t As Task(Of String) In datasb.Append(t.Result)Next' Final result to be returned eventually on the calling thread.Return sb.ToString()End Function) End Function僅用于 End 方法的 FromAsync 任務
在極少情況下,如果?Begin?方法需要三個以上的輸入參數,或具有?ref?或?out?參數,可以使用僅表示?End?方法的?FromAsync?重載,例如,TaskFactory<TResult>.FromAsync(IAsyncResult, Func<IAsyncResult,TResult>)。?這些方法還可用于傳遞?IAsyncResult?并將其封裝到 Task 的任何方案中。
static Task<String> ReturnTaskFromAsyncResult() {IAsyncResult ar = DoSomethingAsynchronously();Task<String> t = Task<string>.Factory.FromAsync(ar, _ =>{return (string)ar.AsyncState;});return t; } Shared Function ReturnTaskFromAsyncResult() As Task(Of String)Dim ar As IAsyncResult = DoSomethingAsynchronously()Dim t As Task(Of String) = Task(Of String).Factory.FromAsync(ar, Function(res) CStr(res.AsyncState))Return t End Function啟動和取消 FromAsync 任務
FromAsync?方法返回的任務具有 WaitingForActivation 狀態,并在創建任務后在某個時刻由操作系統啟動。?如果嘗試調用此類任務上的“啟動”,將引發異常。
無法取消?FromAsync?任務,因為基礎 .NET Framework API 目前不支持取消正在進行中的文件或網絡 I/O。?可以將取消功能添加到封裝?FromAsync?調用的方法中,但只能在調用?FromAsync?之前或在調用完成之后響應取消(例如,在延續任務中)。
一些支持 EAP 的類(如?WebClient)不支持取消,但可以通過使用取消標記集成該本機取消功能。
將復雜的 EAP 操作公開為任務
TPL 不提供任何專用于以?FromAsync?系列方法包裝?IAsyncResult?模式相同的方式封裝基于事件的異步操作的方法。?但是,TPL 會提供?System.Threading.Tasks.TaskCompletionSource<TResult>?類,此類可用于將任意一組操作表示為?Task<TResult>。?這些操作可能同步、可能異步,可能是 I/O 密集型、也可能是計算密集型,還可能兩者都是。
下面的示例顯示如何使用?TaskCompletionSource<TResult>?將一組異步?WebClient?操作作為基礎?Task<TResult>?向客戶端代碼公開。此方法允許輸入 Web URL 數組和術語或名稱來進行搜索,然后返回每個站點搜索字詞出現的次數。
using System; using System.Collections.Generic; using System.Linq; using System.Net; using System.Threading; using System.Threading.Tasks;public class SimpleWebExample {public Task<string[]> GetWordCountsSimplified(string[] urls, string name,CancellationToken token){TaskCompletionSource<string[]> tcs = new TaskCompletionSource<string[]>();WebClient[] webClients = new WebClient[urls.Length];object m_lock = new object();int count = 0;List<string> results = new List<string>();// If the user cancels the CancellationToken, then we can use the// WebClient's ability to cancel its own async operations.token.Register(() =>{foreach (var wc in webClients){if (wc != null)wc.CancelAsync();}});for (int i = 0; i < urls.Length; i++){webClients[i] = new WebClient();#region callback// Specify the callback for the DownloadStringCompleted// event that will be raised by this WebClient instance.webClients[i].DownloadStringCompleted += (obj, args) =>{// Argument validation and exception handling omitted for brevity.// Split the string into an array of words,// then count the number of elements that match// the search term.string[] words = args.Result.Split(' ');string NAME = name.ToUpper();int nameCount = (from word in words.AsParallel()where word.ToUpper().Contains(NAME)select word).Count();// Associate the results with the url, and add new string to the array that// the underlying Task object will return in its Result property.lock (m_lock){results.Add(String.Format("{0} has {1} instances of {2}", args.UserState, nameCount, name));// If this is the last async operation to complete,// then set the Result property on the underlying Task.count++;if (count == urls.Length){tcs.TrySetResult(results.ToArray());}}};#endregion// Call DownloadStringAsync for each URL.Uri address = null;address = new Uri(urls[i]);webClients[i].DownloadStringAsync(address, address);} // end for// Return the underlying Task. The client code// waits on the Result property, and handles exceptions// in the try-catch block there.return tcs.Task;} } Imports System.Collections.Generic Imports System.Net Imports System.Threading Imports System.Threading.TasksPublic Class SimpleWebExampleDim tcs As New TaskCompletionSource(Of String())Dim token As CancellationTokenDim results As New List(Of String)Dim m_lock As New Object()Dim count As IntegerDim addresses() As StringDim nameToSearch As StringPublic Function GetWordCountsSimplified(ByVal urls() As String, ByVal str As String,ByVal token As CancellationToken) As Task(Of String())addresses = urlsnameToSearch = strDim webClients(urls.Length - 1) As WebClient' If the user cancels the CancellationToken, then we can use the' WebClient's ability to cancel its own async operations.token.Register(Sub()For Each wc As WebClient In webClientsIf wc IsNot Nothing Thenwc.CancelAsync()End IfNextEnd Sub)For i As Integer = 0 To urls.Length - 1webClients(i) = New WebClient()' Specify the callback for the DownloadStringCompleted' event that will be raised by this WebClient instance.AddHandler webClients(i).DownloadStringCompleted, AddressOf WebEventHandlerDim address As New Uri(urls(i))' Pass the address, and also use it for the userToken' to identify the page when the delegate is invoked.webClients(i).DownloadStringAsync(address, address)Next' Return the underlying Task. The client code' waits on the Result property, and handles exceptions' in the try-catch block there.Return tcs.TaskEnd FunctionPublic Sub WebEventHandler(ByVal sender As Object, ByVal args As DownloadStringCompletedEventArgs)If args.Cancelled = True Thentcs.TrySetCanceled()ReturnElseIf args.Error IsNot Nothing Thentcs.TrySetException(args.Error)ReturnElse' Split the string into an array of words,' then count the number of elements that match' the search term.Dim words() As String = args.Result.Split(" "c)Dim name As String = nameToSearch.ToUpper()Dim nameCount = (From word In words.AsParallel()Where word.ToUpper().Contains(name)Select word).Count()' Associate the results with the url, and add new string to the array that' the underlying Task object will return in its Result property.SyncLock (m_lock)results.Add(String.Format("{0} has {1} instances of {2}", args.UserState, nameCount, nameToSearch))count = count + 1If (count = addresses.Length) Thentcs.TrySetResult(results.ToArray())End IfEnd SyncLockEnd IfEnd Sub End Class有關包括其他異常處理且展示了如何通過客戶端代碼調用方法的更完整示例,請參閱如何:在任務中包裝 EAP 模式。
請記住,通過?TaskCompletionSource<TResult>?創建的任何任務均由 TaskCompletionSource 啟動,因此用戶代碼不應在此任務中調用 Start 方法。
使用任務實現 APM 模式
在某些情況下,可能需要通過使用 API 中 Begin/End 方法對直接公開?IAsyncResult?模式。?例如,可能想要與現有的 API 保持一致,或者可能具有需要這種模式的自動化工具。?在這種情況下,可使用任務來簡化在內部實現 APM 模式的方式。
下面的示例顯示如何使用任務實現長時間運行計算密集型方法的 APM Begin/End 方法對。
class Calculator {public IAsyncResult BeginCalculate(int decimalPlaces, AsyncCallback ac, object state){Console.WriteLine("Calling BeginCalculate on thread {0}", Thread.CurrentThread.ManagedThreadId);Task<string> f = Task<string>.Factory.StartNew(_ => Compute(decimalPlaces), state);if (ac != null) f.ContinueWith((res) => ac(f));return f;}public string Compute(int numPlaces){Console.WriteLine("Calling compute on thread {0}", Thread.CurrentThread.ManagedThreadId);// Simulating some heavy work.Thread.SpinWait(500000000);// Actual implemenation left as exercise for the reader.// Several examples are available on the Web.return "3.14159265358979323846264338327950288";}public string EndCalculate(IAsyncResult ar){Console.WriteLine("Calling EndCalculate on thread {0}", Thread.CurrentThread.ManagedThreadId);return ((Task<string>)ar).Result;} }public class CalculatorClient {static int decimalPlaces = 12;public static void Main(){Calculator calc = new Calculator();int places = 35;AsyncCallback callBack = new AsyncCallback(PrintResult);IAsyncResult ar = calc.BeginCalculate(places, callBack, calc);// Do some work on this thread while the calulator is busy.Console.WriteLine("Working...");Thread.SpinWait(500000);Console.ReadLine();}public static void PrintResult(IAsyncResult result){Calculator c = (Calculator)result.AsyncState;string piString = c.EndCalculate(result);Console.WriteLine("Calling PrintResult on thread {0}; result = {1}",Thread.CurrentThread.ManagedThreadId, piString);} } Class CalculatorPublic Function BeginCalculate(ByVal decimalPlaces As Integer, ByVal ac As AsyncCallback, ByVal state As Object) As IAsyncResultConsole.WriteLine("Calling BeginCalculate on thread {0}", Thread.CurrentThread.ManagedThreadId)Dim myTask = Task(Of String).Factory.StartNew(Function(obj) Compute(decimalPlaces), state)myTask.ContinueWith(Sub(antedecent) ac(myTask))End FunctionPrivate Function Compute(ByVal decimalPlaces As Integer)Console.WriteLine("Calling compute on thread {0}", Thread.CurrentThread.ManagedThreadId)' Simulating some heavy work.Thread.SpinWait(500000000)' Actual implemenation left as exercise for the reader.' Several examples are available on the Web.Return "3.14159265358979323846264338327950288"End FunctionPublic Function EndCalculate(ByVal ar As IAsyncResult) As StringConsole.WriteLine("Calling EndCalculate on thread {0}", Thread.CurrentThread.ManagedThreadId)Return CType(ar, Task(Of String)).ResultEnd Function End ClassClass CalculatorClientShared decimalPlaces As IntegerShared Sub Main()Dim calc As New CalculatorDim places As Integer = 35Dim callback As New AsyncCallback(AddressOf PrintResult)Dim ar As IAsyncResult = calc.BeginCalculate(places, callback, calc)' Do some work on this thread while the calulator is busy.Console.WriteLine("Working...")Thread.SpinWait(500000)Console.ReadLine()End SubPublic Shared Sub PrintResult(ByVal result As IAsyncResult)Dim c As Calculator = CType(result.AsyncState, Calculator)Dim piString As String = c.EndCalculate(result)Console.WriteLine("Calling PrintResult on thread {0}; result = {1}",Thread.CurrentThread.ManagedThreadId, piString)End SubEnd Class使用 StreamExtensions 示例代碼
在使用 .NET Framework 4 的并行編程示例中,Streamextensions.cs 文件包含多個引用實現,以將 Task 對象用于異步文件和網絡 I/O。
請參閱
- 任務并行庫 (TPL)
總結
以上是生活随笔為你收集整理的TPL 和传统 .NET Framework 异步编程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 疯狂印钞后,美联储资产负债突破7.1万亿
- 下一篇: 12代酷睿封杀的AVX-512被AMD