如何在 C# 8 中使用 Channels
在面對 ?生產者-消費者 的場景下, netcore 提供了一個新的命名空間 System.Threading.Channels 來幫助我們更高效的處理此類問題,有了這個 Channels 存在, 生產者 和 消費者 可以各自處理自己的任務而不相互干擾,有利于兩方的并發處理,這篇文章我們就來討論下如何使用 System.Threading.Channels。
Dataflow vs Channel
在 System.Threading.Tasks.Dataflow 命名空間下提供了一個數據流庫,主要封裝了 存儲 和 處理 兩大塊,該庫專注于 pipeline 處理,而 System.Threading.Tasks.Channels 主要專注于 存儲 這塊,從單一職責上來說,在 生產者-消費者 場景下,Channels 比 Dataflow 性能要高得多。
為什么要使用 Channels
可以利用 Channels 來實現 生產者和消費者 之間的解耦,大體上有兩個好處:
- 生產者 和 消費者 是相互獨立的,兩者可以并行執行。 
- 如果生產者不給力,可以創建多個的生產者,如果消費者不給力,可以創建更多的消費者。 
總的來說,在 生產者-消費者 模式下可以幫助我們提高應用程序的吞吐率。
安裝 System.Threading.Channels
要想使用 Channel,需要用 nuget 引用 System.Threading.Channels 包,還可以通過 Visual Studio 2019 的 NuGet package manager 可視化界面安裝 或者 通過 NuGet package manager 命令行工具輸入以下命令:
dotnet?add?package?System.Threading.Channels創建 channel
本質上來說,你可以創建兩種類型的 channel,一種是有限容量的 bound channel,一種是無限容量的 unbound channel,接下來的問題是,如何創建呢?Channels 提供了兩種 工廠方法 用于創建,如下代碼所示:
- CreateBounded<T> 創建的 channel 是一個有消息上限的通道。 
- CreateUnbounded<T> 創建的 channel 是一個無消息上限的通道。 
下面的代碼片段展示了如何創建 unbounded channel,并且只能存放 string 類型。
static?void?Main(string[]?args){var?channel?=?Channel.CreateUnbounded<string>();}對了,Bounded channel 還提供了一個 FullMode 屬性,用于指定當 channel 已滿時該如何對插入的 message 進行處理,通常有四種做法。
- Wait 
- DropWrite 
- DropNewest 
- DropOldest 
下面的代碼片段展示了如何在 Bounded channel 上使用 FullMode。
static?void?Main(string[]?args){var?channel?=?Channel.CreateBounded<string>(new?BoundedChannelOptions(1000){FullMode?=?BoundedChannelFullMode.Wait});}將消息寫入到 channel
要想將 message 寫入到 channel,可以使用 WriteAsync() 方法,如下代碼所示:
static?async?Task?Main(string[]?args){var?channel?=?Channel.CreateBounded<string>(new?BoundedChannelOptions(1000){FullMode?=?BoundedChannelFullMode.Wait});await?channel.Writer.WriteAsync("Hello?World!");}從 channel 中讀取消息
要想從 channel 中讀取 message,可以使用 ReadAsync(),如下代碼所示:
static?async?Task?Main(string[]?args){var?channel?=?Channel.CreateBounded<string>(new?BoundedChannelOptions(1000){FullMode?=?BoundedChannelFullMode.Wait});while?(await?channel.Reader.WaitToReadAsync()){if?(channel.Reader.TryRead(out?var?message)){Console.WriteLine(message);}}}System.Threading.Channels 例子
下面是完整的代碼清單,展示了如何從 channel 中讀寫 message。
class?Program{static?async?Task?Main(string[]?args){await?SingleProducerSingleConsumer();Console.ReadKey();}public?static?async?Task?SingleProducerSingleConsumer(){var?channel?=?Channel.CreateUnbounded<int>();var?reader?=?channel.Reader;for?(int?i?=?0;?i?<?10;?i++){await?channel.Writer.WriteAsync(i?+?1);}while?(await?reader.WaitToReadAsync()){if?(reader.TryRead(out?var?number)){Console.WriteLine(number);}}}}可以看到,控制臺中輸出了數字 1-10,這些數字正是 Writer 寫入到 channel 中的,對吧。
總的來說,要想使用 生產者-消費者 場景,有幾種實現途徑,比如:BlockingCollection 和 TPL Dataflow,但本篇介紹的 Channels 要比前面的兩種性能更高,關于 Channels 更多的細節,我會在未來的文章中進行討論,如果您現在想急于了解的話,可以參考MSDN:https://docs.microsoft.com/en-us/dotnet/api/system.threading.channels?view=netcore-3.0
譯文鏈接:https://www.infoworld.com/article/3445156/how-to-use-systemthreadingchannels-in-net-core.html
總結
以上是生活随笔為你收集整理的如何在 C# 8 中使用 Channels的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: 巧用Dictionary实现日志数据批量
- 下一篇: 当 .NET 5 遇上OpenTelem
