TPL Dataflow .Net 数据流组件,了解一下?
作為單體程序,依賴的第三方服務雖不多,但是2C的程序還是有不少內容可講;作為一個常規互聯網系統,無外乎就是接受請求、處理請求,輸出響應。
由于業務漸漸增長,數據處理的過程會越來越復雜和冗長,【連貫高效的處理數據】 越來越被看重,? .Net 提供了TPL? Dataflow組件使我們更高效的實現基于數據流和 流水線操作的代碼。
? ? 下圖是單體程序中 數據處理的用例圖。
?
?程序中用到的TPL Dataflow 組件,Dataflow是微軟前幾年給出的數據處理庫,?是由不同的處理塊組成,可將這些塊組裝成一個處理管道,"塊"對應處理管道中的"階段", 可類比AspNetCore 中Middleware 和pipeline.。
TPL Dataflow庫為消息傳遞和并行化CPU密集型和I / O密集型應用程序提供了編程基礎,這些應用程序具有高吞吐量和低延遲。它還可以讓您明確控制數據的緩沖方式并在系統中移動。
為了更好地理解數據流編程模型,請考慮從磁盤異步加載圖像并創建這些圖像的應用程序。
? 傳統的編程模型通常使用回調和同步對象(如鎖)來協調任務和訪問共享數據,?從宏觀看傳統模型:任務是一步步緊接著完成的。
? 通過使用數據流編程模型,您可以創建在從磁盤讀取圖像時處理圖像的數據流對象。在數據流模型下,您可以聲明數據在可用時的處理方式以及數據之間的依賴關系。由于運行時管理數據之間的依賴關系,因此通常可以避免同步訪問共享數據的要求。此外,由于運行時調度基于數據的異步到達而工作,因此數據流可以通過有效地管理底層線程來提高響應性和吞吐量。 ? 也就是說:?你定義的是任務內容和任務之間的依賴,不關注數據什么時候流到這個任務?。
? ?需要注意的是:TPL Dataflow 非分布式數據流,消息在進程內傳遞, ??使用nuget引用 System.Threading.Tasks.Dataflow 包。
TPL Dataflow 核心概念
?1.? Buffer & Block
TPL Dataflow 內置的Block覆蓋了常見的應用場景,當然如果內置塊不能滿足你的要求,你也可以自定“塊”。
Block可以劃分為下面3類:
Buffering Only? ? 【Buffer不是緩存Cache的概念, 而是一個緩沖區的概念】
Execution
Grouping?
使用以上塊混搭處理管道, 大多數的塊都會執行一個操作,有些時候需要將消息分發到不同Block,這時可使用特殊類型的緩沖塊給管道“”分叉”。
2. Execution Block
可執行的塊有兩個核心組件:
輸入、輸出消息的緩沖區(一般稱為Input,Output隊列)
在消息上執行動作的委托
消息在輸入和輸出時能夠被緩沖:當Func委托的運行速度比輸入的消息速度慢時,后續消息將在到達時進行緩沖;當下一個塊的輸入緩沖區中沒有容量時,將在輸出時緩沖。
每個塊我們可以配置:
緩沖區的總容量, 默認無上限
執行操作委托的并發度, 默認情況下塊按照順序處理消息,一次一個。
我們將塊鏈接在一起形成一個處理管道,生產者將消息推向管道。
TPL Dataflow有一個基于pull的機制(使用Receive和TryReceive方法),但我們將在管道中使用塊連接和推送機制。
TransformBlock(Execution category)-- 由輸入輸出緩沖區和一個Func<TInput, TOutput>委托組成,消費的每個消息,都會輸出另外一個,你可以使用這個Block去執行輸入消息的轉換,或者轉發輸出的消息到另外一個Block。
TransformManyBlock (Execution category) -- 由輸入輸出緩沖區和一個Func<TInput, IEnumerable<TOutput>>委托組成, 它為輸入的每個消息輸出一個 IEnumerable<TOutput>
BroadcastBlock (Buffering category)-- 由只容納1個消息的緩沖區和Func<T, T>委托組成。緩沖區被每個新傳入的消息所覆蓋,委托僅僅為了讓你控制怎樣克隆這個消息,不做消息轉換。
該塊可以鏈接到多個塊(管道的分叉),雖然它一次只緩沖一條消息,但它一定會在該消息被覆蓋之前將該消息轉發到鏈接塊(鏈接塊還有緩沖區)。
ActionBlock (Execution category)-- 由緩沖區和Action<T>委托組成,他們一般是管道的結尾,他們不再給其他塊轉發消息,他們只會處理輸入的消息。
BatchBlock (Grouping category)-- 告訴它你想要的每個批處理的大小,它將累積消息,直到它達到那個大小,然后將它作為一組消息轉發到下一個塊。
還有一下其他的Block類型:BufferBlock、WriteOnceBlock、JoinBlock、BatchedJoinBlock,我們暫時不會深入。
3. Pipeline Chain React
當輸入緩沖區達到上限容量,為其供貨的上游塊的輸出緩沖區將開始填充,當輸出緩沖區已滿時,該塊必須暫停處理,直到緩沖區有空間,這意味著一個Block的處理瓶頸可能導致所有前面的塊的緩沖區被填滿。
但是不是所有的塊變滿時,都會暫停,BroadcastBlock 有允許1個消息的緩沖區,每個消息都會被覆蓋, 因此如果這個廣播塊不能將消息轉發到下游,則在下個消息到達的時候消息將丟失,這在某種意義上是一種限流(比較生硬).
編程實踐
? 將按照上圖實現TPL Dataflow?
①? 定義Dataflow? pipeline
上述程序在部署時就遇到相關的坑位,在測試環境_eqid2ModelTransformBlock 內Func委托穩定執行,程序并未出現異樣;
部署到生產之后, 該Pipeline會運行一段時間就停止工作,一直很困惑, 后來通過監測_eqid2ModelTransformBlock.Completion 屬性,該塊提前進入“完成態”? ?:???程序在執行某次Func委托時報錯,Block提前進入完成態
TransfomrBlock.Completion 一個Task對象,當TPL Dataflow不再處理消息并且能保證不再處理消息的時候,就被定義為完成態, Task對象的TaskStatus枚舉值將標記此Block進入完成態的真實原因
- TaskStatus.RanToCompletion? ? ? ?根據Block定義的任務成功完成
- TaskStatus.Fault? ? ? ? ? ? ? ? ? ? ? ? ? ? 因為未處理的異常?導致"過早的完成"
- TaskStatus.Cancled? ? ? ? ? ? ? ? ? ? ?? 因為取消操作?導致 "過早的完成"
我們需要小心處理異常, 一般情況下我們使用try、catch包含所有的執行代碼以確保所有的異常都被處理。
??? 可將TPL Dataflow 做為進程內消息隊列,本文只是一個入門參考,更多復雜用法還是看官網, 你需要記住的是, 這是一個.Net 進程內數據流組件, 能讓你專注于流程。
原文地址:https://www.cnblogs.com/JulianHuang/p/11177766.html
.NET社區新聞,深度好文,歡迎訪問公眾號文章匯總?http://www.csharpkit.com?
總結
以上是生活随笔為你收集整理的TPL Dataflow .Net 数据流组件,了解一下?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: dotnet 新项目格式与对应框架预定义
- 下一篇: windows container 踩坑