京东开源asyncTool之线程编排
一,并行常見的場景
1, 客戶端請求服務端接口,該接口需要調用其他N個微服務的接口
譬如 請求我的購物車,那么就需要去調用用戶的rpc、商品詳情的rpc、庫存rpc、優惠券等等好多個服務。同時,這些服務還有相互依賴關系,譬如必須先拿到商品id后,才能去庫存rpc服務請求庫存信息。 最終全部獲取完畢后,或超時了,就匯總結果,返回給客戶端。
2 ,并行執行N個任務,后續根據這1-N個任務的執行結果來決定是否繼續執行下一個任務
如用戶可以通過郵箱、手機號、用戶名登錄,登錄接口只有一個,那么當用戶發起登錄請求后,我們需要并行根據郵箱、手機號、用戶名來同時查數據庫,只要有一個成功了,都算成功,就可以繼續執行下一步。而不是先試郵箱能否成功、再試手機號……
再如某接口限制了每個批次的傳參數量,每次最多查詢10個商品的信息,我有45個商品需要查詢,就可以分5堆并行去查詢,后續就是統計這5堆的查詢結果。就看你是否強制要求全部查成功,還是不管有幾堆查成功都給客戶做返回
再如某個接口,有5個前置任務需要處理。其中有3個是必須要執行完畢才能執行后續的,另外2個是非強制的,只要這3個執行完就可以進行下一步,到時另外2個如果成功了就有值,如果還沒執行完,就是默認值。
3, 需要進行線程隔離的多批次任務
如多組任務, 各組任務之間彼此不相關,每組都需要一個獨立的線程池,每組都是獨立的一套執行單元的組合。有點類似于hystrix的線程池隔離策略。
4 ,單機工作流任務編排
5 ,其他有順序編排的需求
二,并行場景之核心任意編排
1 多個執行單元的串行請求
2 多個執行單元的并行請求
3 阻塞等待,串行的后面跟多個并行
4 阻塞等待,多個并行的執行完畢后才執行某個
5 串并行相互依賴
6 復雜場景
三,并行場景可能存在的需求
1,每個執行結果的回調
傳統的Future、CompleteableFuture一定程度上可以完成任務編排,并可以把結果傳遞到下一個任務。如CompletableFuture有then方法,但是卻無法做到對每一個執行單元的回調。譬如A執行完畢成功了,后面是B,我希望A在執行完后就有個回調結果,方便我監控當前的執行狀況,或者打個日志什么的。失敗了,我也可以記錄個異常信息什么的。
此時,CompleteableFuture就無能為力了。
我的框架提供了這樣的回調功能。并且,如果執行異常、超時,可以在定義這個執行單元時就設定默認值。
2,執行順序的強依賴和弱依賴
如上圖的3,A和B并發執行,最后是C。
有些場景下,我們希望A和B都執行完畢后,才能執行C,CompletableFuture里有個allOf(futures...).then()方法可以做到。
有些場景下,我們希望A或者B任何一個執行完畢,就執行C,CompletableFuture里有個anyOf(futures...).then()方法可以做到。
我的框架同樣提供了類似的功能,通過設定wrapper里的addDepend依賴時,可以指定依賴的任務是否must執行完畢。如果依賴的是must要執行的,那么就一定會等待所有的must依賴項全執行完畢,才執行自己。
如果依賴的都不是must,那么就可以任意一個依賴項執行完畢,就可以執行自己了。
注意:這個依賴關系是有必須和非必須之分的,還有一個重要的東西是執行單元不能重復執行。譬如圖4,如果B執行完畢,然后執行了A,此時C終于執行完了,然后也到了A,此時就會發現A已經在執行,或者已經完畢(失敗),那么就不應該再重復執行A。
還有一種場景,如下圖,A和D并行開始,D先執行完了,開始執行Result任務,此時B和C都還沒開始,然后Result執行完了,雖然B和C都還沒執行,但是已經沒必要執行了。B和C這些任務是可以被跳過的,跳過的原則是他們的NextWrapper已經有結果了或者已經在執行了。我提供了checkNextWrapperResult方法來控制,當后面的任務已經執行了,自己還要不要執行的邏輯控制。當然,這個控制僅限于nextWrapper只有一個時才成立。
3,依賴上游的執行結果作為入參
譬如A-B-C三個執行單元,A的入參是String,出參是int,B呢它需要用A的結果作為自己的入參。也就是說A、B并不是獨立的,而是有結果依賴關系的。
在A執行完畢之前,B是取不到結果的,只是知道A的結果類型。
那么,我的框架也支持這樣的場景。可以在編排時,就取A的結果包裝類,作為B的入參。雖然此時尚未執行,必然是空,但可以保證A執行完畢后,B的入參會被賦值。
在V1.3后,框架支持在worker的action的入參Map<String, WorkerWrapper>中獲取任意一個執行單元的執行結果,當然,可以取其中的1個、多個執行結果作為自己的入參。Key就是在定義wrapper時通過id傳進來的唯一id標識。詳情demo可以查看test包下dependnew包案例。
4,全組任務的超時
一組任務,雖然內部的各個執行單元的時間不可控,但是我可以控制全組的執行時間不超過某個值。通過設置timeOut,來控制全組的執行閾值。
5,高性能、低線程數
該框架全程無鎖,不依靠線程鎖來保證順序。
創建線程量少。?如這樣的,A會運行在B、C執行更慢的那個單元的線程上,而不會額外創建線程。
四,asyncTool特點
解決任意的多線程并行、串行、阻塞、依賴、回調的并發框架,可以任意組合各線程的執行順序,帶全鏈路回調和超時控制。
其中的A、B、C分別是一個最小執行單元(worker),可以是一段耗時代碼、一次Rpc調用等,不局限于你做什么。
該框架可以將這些worker,按照你想要的各種執行順序,加以組合編排。最終得到結果。
并且,該框架?為每一個worker都提供了執行結果的回調和執行失敗后自定義默認值?。譬如A執行完畢后,A的監聽器會收到回調,帶著A的執行結果(成功、超時、異常)。
根據你的需求,將各個執行單元組合完畢后,開始在主線程執行并阻塞,直到最后一個執行完畢。并且?可以設置全組的超時時間?。
該框架支持后面的執行單元以前面的執行單元的結果為自己的入參?。譬如你的執行單元B的入參是ResultA,ResultA就是A的執行結果,那也可以支持。在編排時,就可以預先設定B或C的入參為A的result,即便此時A尚未開始執行。當A執行完畢后,自然會把結果傳遞到B的入參去。
更多結果查看以下地址
開源地址:https://gitee.com/jd-platform-opensource/asyncTool
同步地址:https://gitee.com/g7go/asyncTool
總結
以上是生活随笔為你收集整理的京东开源asyncTool之线程编排的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: FlexRay总线原理及应用
- 下一篇: HDU2604