一文說通異步 LINQ
LINQ 這個東西,出來很早了,寫過幾年代碼的兄弟們,或多或少都用過一些。
早期的 LINQ,主要是同步的,直到 C# 8.0 加入 IAsyncEnumerable,LINQ 才真正轉(zhuǎn)向異步。這本來是個非常好的改變,配合 System.Linq.Async 庫提供的擴展,可以在諸如 Where、Select、GroupBy 等各種地方用到異步。
但事實上,在我 Review 代碼時,見了很多人的代碼,并沒有按異步的規(guī)則去使用,出現(xiàn)了很多的坑。
舉個簡單的例子:
- static async Task<List<T>> Where<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate)
- {
- var filteredItems = new List<T>();
- await foreach (var item in source)
- {
- if (predicate(item))
- {
- filteredItems.Add(item);
- }
- }
- return filteredItems;
- }
這樣的寫法,看著是用到了 async / await 對,但實際上并沒有實現(xiàn)異步,程序依然是按照同步在運行。換句話說,這只是一個樣子上的異步,實際沒有任何延遲執(zhí)行的效果。
1. 延遲執(zhí)行
其實,這兒正確的寫法也挺簡單,用到的就是個異步的迭代器(關(guān)于異步迭代器,如果需要了解,可以看我的另一篇推文):
- static async IAsyncEnumerable<T> Where<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate)
- {
- await foreach (var item in source)
- {
- if (predicate(item))
- {
- yield return item;
- }
- }
- }
這種寫法下,編譯器會將方法轉(zhuǎn)了狀態(tài)機,并在實際調(diào)用時,才通過枚舉器返回異步枚舉項。
看看調(diào)用過程:
- IAsyncEnumerable<User> users = ...
- IAsyncEnumerable<User> filteredUsers = users.Where(User => User.Name == "WangPlus");
- await foreach (User user in filteredUsers)
- {
- Console.WriteLine(user.Age);
- }
在這個調(diào)用的例子中,在 Where 時,實際方法并不會馬上開始。只有在下面 foreach 時,才真正開始執(zhí)行 Where 方法。
延遲執(zhí)行,這是異步 LINQ 的第一個優(yōu)勢。
2. 流執(zhí)行
流執(zhí)行,依托的也是異步迭代器。
所謂流執(zhí)行,其實就是根據(jù)調(diào)用的要求,一次返回一個對象。通過使用異步迭代器,可以不用一次返回所有的對象,而是一個一個地返回單個的對象,直到枚舉完所有的對象。
流執(zhí)行需要做個技巧性的代碼,需要用到一個 C# 8.0 的新特性:局部方法。
看代碼:
- static IAsyncEnumerable<T> Where<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate)
- {
- return Core();
- async IAsyncEnumerable<T> Core()
- {
- await foreach (var item in source)
- {
- if (predicate(item))
- {
- yield return item;
- }
- }
- }
- }
3. 取消異步 LINQ
前面兩個小節(jié),寫的是異步 LINQ 的執(zhí)行。
通常使用異步 LINQ 的原因,就是因為執(zhí)行時間長,一般需要一段時間來完成。因此,取消異步 LINQ 就很重要。想象一下,一個長的 DB 查詢已經(jīng)超時了的情況,該怎么處理?
為了支持取消,IAsyncEnumerable.GetEnumerator 本身接受一個 CancellationToken 參數(shù)來中止任務(wù),并用一個擴展方法掛接到 foreach 調(diào)用:
- CancellationToken cancellationToken = ...
- IAsyncEnumerable<User> users = ...
- IAsyncEnumerable<User> filteredUsers = users.Where(User => User.Name == "WangPlus");
- await foreach (var User in filteredUsers.WithCancellation(cancellationToken))
- {
- Console.WriteLine(User.Age);
- }
同時,在上面的 Where 定義中,也要響應(yīng) CancellationToken 參數(shù):
- static IAsyncEnumerable<T> Where<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate)
- {
- return Core();
- async IAsyncEnumerable<T> Core([EnumeratorCancellation] CancellationToken cancellationToken = default)
- {
- await foreach (var item in source.WithCancellation(cancellationToken))
- {
- if (predicate(item))
- {
- yield return item;
- }
- }
- }
- }
多解釋一下:在 Where 方法中,CancellationToken 只能加到局部函數(shù) Core 中,一個簡單的原因是 Where 本身并不是異步方法,而且,我們也不希望從 Where 往里傳遞。想象一下:
- Users.Where(xxx, cancellationToken).Select(xxx, cancellationToken).OrderBy(xxx, cancellationToken);
這樣的代碼會讓人暈死。
所以,我們會采用上面的方式,允許消費者在枚舉數(shù)據(jù)時傳遞 CancellationToken 來達到取消異步操作的目的。
4. 處理ConfigureAwait(false)
這是另一個異步必須要注意的部分,其實就是上下文。
通常大多數(shù)的方法,我們不需要關(guān)注上下文,但總有一些需要,在等待的異步操作恢復(fù)后,需要返回到某個上下文的情況。這種情況在 UI 線程編碼時通常都需要考慮。很多人提到的異步死鎖,就是這個原因。
處理也很簡單:
- static IAsyncEnumerable<T> Where<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate)
- {
- return Core();
- async IAsyncEnumerable<T> Core([EnumeratorCancellation] CancellationToken cancellationToken = default)
- {
- await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
- {
- if (predicate(item))
- {
- yield return item;
- }
- }
- }
- }
這兒也多說兩句:按微軟的說法,await foreach 本身是基于模式的,WithCancellation 和 ConfigureAwait 返回同樣的結(jié)構(gòu)體 ConfiguredCancelableAsyncEnumerable。這個結(jié)構(gòu)體沒有實現(xiàn) IAsyncEnumerable 接口,而是做了一個 GetAsyncEnumerator 方法,返回一個具有 MoveNextAsync、Current、DisposeAsync 的枚舉器,因此可以 await foreach 。
5. 方法擴展
上面 4 個小節(jié),我們完成了一個 Where 異步 LINQ 的全部內(nèi)容。
不過,這個方法有一些限制和不足。熟悉異步的兄弟們應(yīng)該已經(jīng)看出來了,里面用了一個委托 predicate 來做數(shù)據(jù)過濾,而這個委托,是個同步的方法。
事實上,根據(jù)微軟對異步 LINQ 的約定,每個操作符應(yīng)該是三種重載:
- 同步委托的實現(xiàn),就是上面的 Where 方法;
- 異步委托的實現(xiàn),這個是指具有異步返回類型的實現(xiàn),通常這種方法名稱會用一個 Await 做后綴,例如:WhereAwait;
- 可以接受取消的異步委托的實現(xiàn),通常這種方法會用 AwaitWithCancellation 做后綴,例如:WhereAwaitWithCancellation。
參考微軟的異步方法,基本上都是以這種結(jié)構(gòu)來命名方法名稱的。
下面,我們也按這個方式,來做一個 Where 方法的幾個重載。
WhereAwait 方法
上面說了,這會是一個異步實現(xiàn)。所以,條件部分就不能用 Func
代碼是這樣:
- static IAsyncEnumerable<T> WhereAwait<T>(this IAsyncEnumerable<T> source, Func<T, ValueTask<bool>> predicate)
- {
- return Core();
- async IAsyncEnumerable<T> Core([EnumeratorCancellation] CancellationToken cancellationToken = default)
- {
- await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
- {
- if (await predicate(item).ConfigureAwait(false))
- {
- yield return item;
- }
- }
- }
- }
調(diào)用時是這樣:
- IAsyncEnumerable<User> filteredUsers = users.WhereAwait(async user => await someIfFunction());
在上面的基礎(chǔ)上,又加了一個取消操作。
看代碼:
- static IAsyncEnumerable<T> WhereAwaitWithCancellation<T>(this IAsyncEnumerable<T> source, Func<T, CancellationToken, ValueTask<bool>> predicate)
- {
- return Core();
- async IAsyncEnumerable<T> Core([EnumeratorCancellation] CancellationToken cancellationToken = default)
- {
- await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
- {
- if (await predicate(item, cancellationToken).ConfigureAwait(false))
- {
- yield return item;
- }
- }
- }
- }
調(diào)用時是這樣:
IAsyncEnumerable
6. 總結(jié)
異步 LINQ,多數(shù)是在 LINQ 的擴展方法中使用,而不是我們通常習(xí)慣的 LINQ 直寫。
事實上,異步 LINQ 的擴展,對 LINQ 本身是有比較大的強化作用的,不管從性能,還是可讀性上,用多了,只會更爽。