如何在 C# 8 中使用 Channels
本文轉(zhuǎn)載自微信公眾號(hào)「碼農(nóng)讀書」,作者碼農(nóng)讀書。轉(zhuǎn)載本文請(qǐng)聯(lián)系碼農(nóng)讀書公眾號(hào)。
在面對(duì) 生產(chǎn)者-消費(fèi)者 的場(chǎng)景下, netcore 提供了一個(gè)新的命名空間 System.Threading.Channels 來(lái)幫助我們更高效的處理此類問題,有了這個(gè) Channels 存在, 生產(chǎn)者 和 消費(fèi)者 可以各自處理自己的任務(wù)而不相互干擾,有利于兩方的并發(fā)處理,這篇文章我們就來(lái)討論下如何使用 System.Threading.Channels。
Dataflow vs Channel
在 System.Threading.Tasks.Dataflow 命名空間下提供了一個(gè)數(shù)據(jù)流庫(kù),主要封裝了 存儲(chǔ) 和 處理 兩大塊,該庫(kù)專注于 pipeline 處理,而 System.Threading.Tasks.Channels 主要專注于 存儲(chǔ) 這塊,從單一職責(zé)上來(lái)說(shuō),在 生產(chǎn)者-消費(fèi)者 場(chǎng)景下,Channels 比 Dataflow 性能要高得多。
為什么要使用 Channels
可以利用 Channels 來(lái)實(shí)現(xiàn) 生產(chǎn)者和消費(fèi)者 之間的解耦,大體上有兩個(gè)好處:
- 生產(chǎn)者 和 消費(fèi)者 是相互獨(dú)立的,兩者可以并行執(zhí)行。
- 如果生產(chǎn)者不給力,可以創(chuàng)建多個(gè)的生產(chǎn)者,如果消費(fèi)者不給力,可以創(chuàng)建更多的消費(fèi)者。
總的來(lái)說(shuō),在 生產(chǎn)者-消費(fèi)者 模式下可以幫助我們提高應(yīng)用程序的吞吐率。
安裝 System.Threading.Channels
要想使用 Channel,需要用 nuget 引用 System.Threading.Channels 包,還可以通過 Visual Studio 2019 的 NuGet package manager 可視化界面安裝 或者 通過 NuGet package manager 命令行工具輸入以下命令:
- dotnet add package System.Threading.Channels
創(chuàng)建 channel
本質(zhì)上來(lái)說(shuō),你可以創(chuàng)建兩種類型的 channel,一種是有限容量的 bound channel,一種是無(wú)限容量的 unbound channel,接下來(lái)的問題是,如何創(chuàng)建呢?Channels 提供了兩種 工廠方法 用于創(chuàng)建,如下代碼所示:
- CreateBounded
創(chuàng)建的 channel 是一個(gè)有消息上限的通道。 - CreateUnbounded
創(chuàng)建的 channel 是一個(gè)無(wú)消息上限的通道。
下面的代碼片段展示了如何創(chuàng)建 unbounded channel,并且只能存放 string 類型。
- static void Main(string[] args)
- {
- var channel = Channel.CreateUnbounded<string>();
- }
對(duì)了,Bounded channel 還提供了一個(gè) FullMode 屬性,用于指定當(dāng) channel 已滿時(shí)該如何對(duì)插入的 message 進(jìn)行處理,通常有四種做法。
- Wait
- DropWrite
- DropNewest
- DropOldest
下面的代碼片段展示了如何在 Bounded channel 上使用 FullMode。
- static void Main(string[] args)
- {
- var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1000)
- {
- FullMode = BoundedChannelFullMode.Wait
- });
- }
將消息寫入到 channel
要想將 message 寫入到 channel,可以使用 WriteAsync() 方法,如下代碼所示:
- static async Task Main(string[] args)
- {
- var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1000)
- {
- FullMode = BoundedChannelFullMode.Wait
- });
- await channel.Writer.WriteAsync("Hello World!");
- }
從 channel 中讀取消息
要想從 channel 中讀取 message,可以使用 ReadAsync(),如下代碼所示:
- static async Task Main(string[] args)
- {
- var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1000)
- {
- FullMode = BoundedChannelFullMode.Wait
- });
- while (await channel.Reader.WaitToReadAsync())
- {
- if (channel.Reader.TryRead(out var message))
- {
- Console.WriteLine(message);
- }
- }
- }
System.Threading.Channels 例子
下面是完整的代碼清單,展示了如何從 channel 中讀寫 message。
- class Program
- {
- static async Task Main(string[] args)
- {
- await SingleProducerSingleConsumer();
- Console.ReadKey();
- }
- public static async Task SingleProducerSingleConsumer()
- {
- var channel = Channel.CreateUnbounded<int>();
- var reader = channel.Reader;
- for (int i = 0; i < 10; i++)
- {
- await channel.Writer.WriteAsync(i + 1);
- }
- while (await reader.WaitToReadAsync())
- {
- if (reader.TryRead(out var number))
- {
- Console.WriteLine(number);
- }
- }
- }
- }
可以看到,控制臺(tái)中輸出了數(shù)字 1-10,這些數(shù)字正是 Writer 寫入到 channel 中的,對(duì)吧。
總的來(lái)說(shuō),要想使用 生產(chǎn)者-消費(fèi)者 場(chǎng)景,有幾種實(shí)現(xiàn)途徑,比如:BlockingCollection 和 TPL Dataflow,但本篇介紹的 Channels 要比前面的兩種性能更高,關(guān)于 Channels 更多的細(xì)節(jié),我會(huì)在未來(lái)的文章中進(jìn)行討論,如果您現(xiàn)在想急于了解的話,可以參考MSDN:https://docs.microsoft.com/en-us/dotnet/api/system.threading.channels?view=netcore-3.0
譯文鏈接:https://www.infoworld.com/article/3445156/how-to-use-systemthreadingchannels-in-net-core.html