一文说通异步 LINQ
用不好異步 LINQ,基本上就等于用不好 LINQ 了。
LINQ 這個東西,出來很早了,寫過幾年代碼的兄弟們,或多或少都用過一些。
早期的 LINQ,主要是同步的,直到 C# 8.0 加入 IAsyncEnumerable,LINQ 才真正轉向異步。這本來是個非常好的改變,配合 System.Linq.Async 庫提供的擴展,可以在諸如 Where、Select、GroupBy 等各種地方用到異步。
但事實上,在我 Review 代碼時,見了很多人的代碼,并沒有按異步的規則去使用,出現了很多的坑。
舉個簡單的例子:
static?async?Task<List<T>>?Where<T>(this?IAsyncEnumerable<T>?source,?Func<T,?bool>?predicate) {var?filteredItems?=?new?List<T>();await?foreach?(var?item?in?source){if?(predicate(item)){filteredItems.Add(item);}}return?filteredItems; }這樣的寫法,看著是用到了 async / await 對,但實際上并沒有實現異步,程序依然是按照同步在運行。換句話說,這只是一個樣子上的異步,實際沒有任何延遲執行的效果。
1. 延遲執行
其實,這兒正確的寫法也挺簡單,用到的就是個異步的迭代器(關于異步迭代器,如果需要了解,可以看我的另一篇推文):
static?async?IAsyncEnumerable<T>?Where<T>(this?IAsyncEnumerable<T>?source,?Func<T,?bool>?predicate) {await?foreach?(var?item?in?source){if?(predicate(item)){yield?return?item;}} }這種寫法下,編譯器會將方法轉了狀態機,并在實際調用時,才通過枚舉器返回異步枚舉項。
看看調用過程:
IAsyncEnumerable<User>?users?=?... IAsyncEnumerable<User>?filteredUsers?=?users.Where(User?=>?User.Name?==?"WangPlus");await?foreach?(User?user?in?filteredUsers) {Console.WriteLine(user.Age); }在這個調用的例子中,在 Where 時,實際方法并不會馬上開始。只有在下面 foreach 時,才真正開始執行 Where 方法。
延遲執行,這是異步 LINQ 的第一個優勢。
2. 流執行
流執行,依托的也是異步迭代器。
所謂流執行,其實就是根據調用的要求,一次返回一個對象。通過使用異步迭代器,可以不用一次返回所有的對象,而是一個一個地返回單個的對象,直到枚舉完所有的對象。
流執行需要做個技巧性的代碼,需要用到一個 C# 8.0 的新特性:局部方法。
看代碼:
static?IAsyncEnumerable<T>?Where<T>(this?IAsyncEnumerable<T>?source,?Func<T,?bool>?predicate) {return?Core();async?IAsyncEnumerable<T>?Core(){await?foreach?(var?item?in?source){if?(predicate(item)){yield?return?item;}}} }3. 取消異步 LINQ
前面兩個小節,寫的是異步 LINQ 的執行。
通常使用異步 LINQ 的原因,就是因為執行時間長,一般需要一段時間來完成。因此,取消異步 LINQ 就很重要。想象一下,一個長的 DB 查詢已經超時了的情況,該怎么處理?
為了支持取消,IAsyncEnumerable.GetEnumerator 本身接受一個 CancellationToken 參數來中止任務,并用一個擴展方法掛接到 foreach 調用:
CancellationToken?cancellationToken?=?... IAsyncEnumerable<User>?users?=?... IAsyncEnumerable<User>?filteredUsers?=?users.Where(User?=>?User.Name?==?"WangPlus");await?foreach?(var?User?in?filteredUsers.WithCancellation(cancellationToken)) {Console.WriteLine(User.Age); }同時,在上面的 Where 定義中,也要響應 CancellationToken 參數:
static?IAsyncEnumerable<T>?Where<T>(this?IAsyncEnumerable<T>?source,?Func<T,?bool>?predicate) {return?Core();async?IAsyncEnumerable<T>?Core([EnumeratorCancellation]?CancellationToken?cancellationToken?=?default){await?foreach?(var?item?in?source.WithCancellation(cancellationToken)){if?(predicate(item)){yield?return?item;}}} }多解釋一下:在 Where 方法中,CancellationToken 只能加到局部函數 Core 中,一個簡單的原因是 Where 本身并不是異步方法,而且,我們也不希望從 Where 往里傳遞。想象一下:
Users.Where(xxx,?cancellationToken).Select(xxx,?cancellationToken).OrderBy(xxx,?cancellationToken);這樣的代碼會讓人暈死。
所以,我們會采用上面的方式,允許消費者在枚舉數據時傳遞 CancellationToken 來達到取消異步操作的目的。
4. 處理ConfigureAwait(false)
這是另一個異步必須要注意的部分,其實就是上下文。
通常大多數的方法,我們不需要關注上下文,但總有一些需要,在等待的異步操作恢復后,需要返回到某個上下文的情況。這種情況在 UI 線程編碼時通常都需要考慮。很多人提到的異步死鎖,就是這個原因。
處理也很簡單:
static?IAsyncEnumerable<T>?Where<T>(this?IAsyncEnumerable<T>?source,?Func<T,?bool>?predicate) {return?Core();async?IAsyncEnumerable<T>?Core([EnumeratorCancellation]?CancellationToken?cancellationToken?=?default){await?foreach?(var?item?in?source.WithCancellation(cancellationToken).ConfigureAwait(false)){if?(predicate(item)){yield?return?item;}}} }這兒也多說兩句:按微軟的說法,await foreach 本身是基于模式的,WithCancellation 和 ConfigureAwait 返回同樣的結構體 ConfiguredCancelableAsyncEnumerable。這個結構體沒有實現 IAsyncEnumerable 接口,而是做了一個 GetAsyncEnumerator 方法,返回一個具有 MoveNextAsync、Current、DisposeAsync 的枚舉器,因此可以 await foreach 。
5. 方法擴展
上面 4 個小節,我們完成了一個 Where 異步 LINQ 的全部內容。
不過,這個方法有一些限制和不足。熟悉異步的兄弟們應該已經看出來了,里面用了一個委托 predicate 來做數據過濾,而這個委托,是個同步的方法。
事實上,根據微軟對異步 LINQ 的約定,每個操作符應該是三種重載:
同步委托的實現,就是上面的 Where 方法;
異步委托的實現,這個是指具有異步返回類型的實現,通常這種方法名稱會用一個 Await 做后綴,例如:WhereAwait;
可以接受取消的異步委托的實現,通常這種方法會用 AwaitWithCancellation 做后綴,例如:WhereAwaitWithCancellation。
參考微軟的異步方法,基本上都是以這種結構來命名方法名稱的。
下面,我們也按這個方式,來做一個 Where 方法的幾個重載。
WhereAwait 方法
上面說了,這會是一個異步實現。所以,條件部分就不能用?Func<T, bool>?這樣的同步委托了,而需要改為?Func<T, ValueTask<bool>>。這里的 ValueTask 倒不是必須,用 Task 也可以,只不過我更習慣用 ValueTask。兩個的區別:Task 是類,有上下文,而 ValueTask 是結構。
代碼是這樣:
static?IAsyncEnumerable<T>?WhereAwait<T>(this?IAsyncEnumerable<T>?source,?Func<T,?ValueTask<bool>>?predicate) {return?Core();async?IAsyncEnumerable<T>?Core([EnumeratorCancellation]?CancellationToken?cancellationToken?=?default){await?foreach?(var?item?in?source.WithCancellation(cancellationToken).ConfigureAwait(false)){if?(await?predicate(item).ConfigureAwait(false)){yield?return?item;}}} }調用時是這樣:
IAsyncEnumerable<User>?filteredUsers?=?users.WhereAwait(async?user?=>?await?someIfFunction());WhereAwaitWithCancellation方法
在上面的基礎上,又加了一個取消操作。
看代碼:
static?IAsyncEnumerable<T>?WhereAwaitWithCancellation<T>(this?IAsyncEnumerable<T>?source,?Func<T,?CancellationToken,?ValueTask<bool>>?predicate) {return?Core();async?IAsyncEnumerable<T>?Core([EnumeratorCancellation]?CancellationToken?cancellationToken?=?default){await?foreach?(var?item?in?source.WithCancellation(cancellationToken).ConfigureAwait(false)){if?(await?predicate(item,?cancellationToken).ConfigureAwait(false)){yield?return?item;}}} }調用時是這樣:
IAsyncEnumerable<User>?filteredUsers?=?users.WhereAwaitWithCancellation(async?(user,?token)?=>?await?someIfFunction(user,?token));6. 總結
異步 LINQ,多數是在 LINQ 的擴展方法中使用,而不是我們通常習慣的 LINQ 直寫。
事實上,異步 LINQ 的擴展,對 LINQ 本身是有比較大的強化作用的,不管從性能,還是可讀性上,用多了,只會更爽。
喜歡就來個三連,讓更多人因你而受益
總結
以上是生活随笔為你收集整理的一文说通异步 LINQ的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Dapr牵手.NET学习笔记:想入非非的
- 下一篇: 12 个问题搞懂 Redis