Flink 必知必会:Flink Runtime Architecture
作者:朱翥(長耕)
本文由 Apache Flink PMC 及 Committer 朱翥分享,主要介紹 Flink Runtime 的底層架構。本篇文章包含四部分:
一、Runtime 總覽
眾所周知 Flink 是分布式的數據處理框架,用戶的業務邏輯會以Job的形式提交給 Flink 集群。Flink Runtime作為 Flink 引擎,負責讓這些作業能夠跑起來并正常完結。這些作業既可以是流計算作業,也可以是批處理作業,既可以跑在裸機上,也可以在Flink集群上跑,Flink Runtime必須支持所有類型的作業,以及不同條件下運行的作業。
1.作業的表達
要執行作業,首先要理解作業是如何在 Flink 中進行表達的。
用戶通過API的方式寫一個作業,例如上圖左側StreamWordInput的示例,它可以不斷的輸出一個個單詞;下面的Map操作負責把單詞映射成一個二元組;再接一個keyBy,使相同的word的二元組都被分配在一起,然后sum將它們計數,最后打印出來。
左側的作業對應著右邊的邏輯拓撲(StreamGraph)。這個拓撲中有4個節點,分別是source、map、sum和print。這些是數據處理邏輯,又稱之為算子;節點之間的線條對應著數據的分發方式,影響著數據以什么樣的方式分發給下游。舉例來說,map到sum之間是keyBy,意味著map產出的數據,同一個key的數據都必須分發到同一個下游。
有了StreamGraph后, Flink Runtime會進一步的把它翻譯成JobGraph。JobGraph和StreamGraph的區別是,JobGraph會把一些節點 chain 起來,形成Operator chain。Chain條件是需要兩個算子的并發度是一樣的,并且它們的數據交換方式是一對一的。 形成的Operator chain,又稱為JobVertex。
Operator chain的意義是能夠減少一些不必要的數據交換,這樣chain的operator都是在同一個地方進行執行。在作業實際執行過程中,邏輯圖會進一步被翻譯成執行圖 — ExecutionGraph。執行圖是邏輯圖并發層面的視圖,如上圖所示,下面的執行圖就是上面邏輯圖所有算子并發都為2的表達。
為什么上圖中的map和sum不能嵌起來?因為它們的數據是涉及到多個下游算子的,并非一對一的數據交換方式。邏輯圖JobVertex中的一個節點,會對應著并發數個執行節點ExecutionVertex,節點對應著一個個任務,這些任務最后會作為實體部署到Worker節點上,并執行實際的數據處理業務邏輯。
2.分布式架構
Flink 作為分布式數據處理框架,它有一套分布式的架構,主要分為三塊:Client、Master和Worker節點。
Master是 Flink 集群的主控中心,它可以有一個到多個JobMaster,每個JobMaster對應一個作業,而這些JobMaster由一個叫Dispatcher的控件統一管理。Master節點中還有一個ResourceManager進行資源管理。ResourceManager管理著所有Worker節點,它同時服務于所有作業。此外Master節點中還有一個Rest Server,它會用于響應各種Client端來的Rest請求,Client端包括Web端以及命令行的客戶端,它可以發起的請求包括提交作業、查詢作業的狀態和停止作業等等。作業會通過執行圖被劃分成一個個的任務,這些任務最后都會在Worker節點中進行執行。Worker就是TaskExecutor,它們是任務執行的容器。
作業執行的核心組件有三個,分別是JobMaster、TaskExecutor和ResourceManager:JobMaster用于管理作業;TaskExecutor用于執行各個任務;ResourceManager用于管理資源,并服務于JobMaster的資源請求。
二、JobMaster:作業的控制中心
JobMaster的主要職責包括作業生命周期的管理、任務的調度、出錯恢復、狀態查詢和分布式狀態快照。
分布式狀態快照包括Checkpoint和Savepoint,其中Checkpoint主要是為出錯恢復服務的,而Savepoint主要是用于作業的維護,包括升級和遷移等等。分布式快照是由CheckpointCoordinator組件來進行觸發和管理的。
JobMaster中的核心組件是Scheduler,無論是作業的生命周期管理、作業的狀態維護,還是任務的調度以及出錯恢復,都是由Schedule來負責的。
1.作業的生命周期管理
作業的生命周期的狀態,作業所有可能的狀態遷移都在下圖展示出來了。
正常流程下作業會有三種狀態,分別是Created、Running和Finished。一個作業開始是處于Created的狀態,當這個作業被開始調度就開始進入Running狀態并開始調度任務,等到所有的任務都成功結束了,這個作業就走到Finished的狀態,并匯報最終結果,然后退出。
然而,一個作業在執行過程中可能會遇到一些問題,因此作業也會有異常處理的狀態。作業執行過程中如果出現作業級別錯誤,整個作業會進到Failing狀態,然后Cancel所有任務。等到所有任務都進入最終狀態后,包括 Failed、Canceled、Finished,再去check出錯的異常。如果異常是不可恢復的,那么整個作業會走到Failed狀態并退出。如果異常是可恢復的,那么會走到Restarting狀態,來嘗試重啟。如果重啟的次數沒有超過上限,作業會從Created狀態重新進行調度;如果達到上限,作業會走到Failed狀態并退出。 (注:在 Flink 1.10 之后的版本中,當發生錯誤時,如果可以恢復,作業不會進入 Failing 狀態而會直接進入 Restarting 狀態,當所有任務都恢復正常后,作業會回到 Running 狀態。如果作業無法恢復,則作業會經由 Failing 狀態最終進入 Failed 狀態并結束。)
Cancelling和Canceled兩種狀態只會在用戶手動去Cancel作業的時候走到。當用戶手動的在Web UI或通過 Flink command探索作業的時候, Flink 會首先把狀態轉到Cancel里,然后Cancel所有任務,等所有任務都進入最終狀態后,整個作業就會進入Canceled狀態并退出。
Suspended狀態只會在配置了high availability,并且當JobMaster丟掉leadership才會走到。這個狀態只意味著這個JobMaster出現問題終止了。一般來說等到JobMaster重新拿到leadership之后,或是另外有一個standby Master拿到leadership之后,會在拿到leadership的節點上重新啟動起來。
2.任務調度
任務調度是JobMaster的核心職責之一。要調度任務,一個首要的問題就是決定什么時候去調度任務。任務調度時機是由調度策略(SchedulingStrategy)來控制的。這個策略是一個事件驅動的組件,它監聽的事件包括:作業開始調度、任務的狀態發生變化、任務產出的數據變成可消費以及失敗的任務需要重啟,通過監聽這些事件,它能夠比較靈活地來決定任務啟動的時機。
目前我們有多種不同的調度策略,分別是Eager和Lazy from sources。EagerSchedulingStrategy主要是服務于流式作業,它的策略是在作業開始調度時,直接啟動所有的任務,這樣做的好處是可以降低調度時間。Lazy from sources主要服務于批處理作業。它的策略是作業一開始只調度Source節點,等到有任意節點的輸入數據可以被消費后,它才會被調起來。如下圖所示,source節點的數據開始產出后,agg節點才能被調起來,agg節點結束后,sink節點才能被調起來。
為什么Batch作業和Streaming作業會有不同的調度策略呢?是因為Batch作業里邊存在blocking shuffle數據交換模式。在這種模式下,需要等上游完全產出所有數據后,下游才能去消費這部分數據集,如果預先把下游調起來的話,它只會在那空轉浪費資源。相比Eager策略而言,對于批處理作業它能夠節省一定量的資源。
目前還有一個正在開發中的叫Pipelined region based調度策略,這個策略比較類似于Lazy from source策略,差異在于前者是以Pipelined region為粒度調度任務的。
Pipelined region 是以 pipelined 相連的任務集合。Pipelined邊意味著上下游節點會流式的進行數據交換,即上游邊寫,下游就邊讀邊消費。Pipelined region調度的好處是可以一定程度上繼承了Eager調度好處,能夠節省調度花費的時間,且讓上下游任務并行起來。同時也保留了Lazy from sources避免不必要資源的浪費。通過把一部分Task整體調度,就能知道這部分需要同時運行的作業所需的資源量是多少,能夠以此進行一些更深度的優化。
(注:從 Flink 1.11 開始, Pipelined region strategy成為默認調度策略,同時服務于流和批作業。)
3.任務調度的過程
任務具有很多種不同狀態,最初任務處在Created狀態。當調度策略認為這個任務可以開始被調的時候,它會轉到Scheduled狀態,并開始申請資源,即Slot。申請到Slot之后,它就轉到Deploying狀態來生成Task的描述,并部署到worker節點上,再之后Task就會在worker節點上啟動起來。成功啟動后,它會在worker節點上轉到running狀態并通知JobMaster,然后在JobMaster端把任務的狀態轉到running。
對于無限流的作業來說,轉到running狀態就是最終狀態了;對于有限流的作業,一旦所有數據處理完了,任務還會轉到finished狀態,標志任務完成。當有異常發生時,任務也會轉到Failed的狀態,同時其它受到影響的任務可能會被Cancel掉并走到Canceled狀態。
4.出錯恢復
當有任務出現錯誤時,JobMaster的策略或基本思路是,通過重啟出錯失敗的任務以及可能受到影響的任務,來恢復作業的數據處理。這包含三個步驟:
- 第一步,停止相關任務,包括出錯失敗任務和可能受其影響任務,失敗任務可能已經是FAILED的,然后其它受影響任務會被Cancel最終進到Canceled狀態;
- 第二步,重置任務回Created狀態;
- 第三步,通知調度策略重新調度這些任務。
上文提及了可能受到影響的任務,那么什么樣的任務可能受影響呢?這是由出錯恢復策略(FailoverStrategy)來決定的。
目前 Flink 默認的 FailoverStrategy是RestartPipelinedRegionFailoverStrategy。采用了這個策略后,如果一個Task失敗了就會重啟它所在的region。這其實跟上文提及的Pipelined數據交換有關系。在Pipelined數據交換的節點之間,如果任意一個節點失敗了,其相關聯的其它節點也會跟著失敗。這是為了防止出現數據的不一致。因此為了避免單個Task導致多次Failover,一般的操作是在收到第一個Task failed時,就把其他的一起cancel掉,再一起重啟。
RestartPipelinedRegion策略除了重啟失敗任務所在的Region外,還會重啟它的下游Region。原因是任務的產出很多時候是非確定性的,比如說一個record,分發到下游的第一個并發,重跑一次;分發到下游的第二個并發時,一旦這兩個下游在不同region中,就可能會導致 record丟失,甚至產生不一樣的數據。為了避免這種情況,采用PipelinedRegionFailoverStrategy會重啟失敗任務所在的Region以及它的所有的下游Region。
另外,還有一個RestartAllFailoverStrategy策略,它會在任意Task fail的時候,重啟作業中的所有任務。一般情況,這個策略并不被經常用到,但是在一些特殊情況下,比如當任務失敗,用戶不希望局部運行而是希望所有任務都結束并整體進行恢復,可以用這個策略。
三、TaskExecutor:任務的運行器
TaskExecutor是任務的運行器,為了運行任務它具有各種各樣的資源。如下圖所示,這里主要介紹memory的資源。
所有內存資源都是可以單獨配置的。TaskManager也對它們的配置進行了分層的管理,最外層是Process Memory,對應的是整個TaskExecutor JVM 的總資源。這份內存又包含了JVM自身占有的內存以及 Flink 占有內存。而 Flink 占用內存又包含了框架占有的內存和任務的內存。
任務占用內存包括了Task Heap Memory,即任務的Java對象占有的內存;Task Off-Heap Memory一般用于 native的第三方庫;Network Memory是用來創建Network Buffer用來服務于任務的輸入和輸出; Managed Memory則是受管控的 Off-Heap Memory,它會被一些組件用到,比如算子和StateBackend。這些Task資源會被它分成一個一個的Slot,Slot是任務運行的邏輯容器。當前,Slot大小是直接把 整個TasExecutor的資源,按照 Slot的數量進行均分得到的。
一個Slot里可以運行一個到多個任務,但是有一定約束,即同一個共享組中的不同類型的任務才可以同時在一個Slot中運行。一般來說,同一個PipelinedRegion中的任務都是在一個共享組中,流式作業的所有任務也都是在一個共享組中。不同類型指的是它們需要屬于不同的JobVertex。
如上圖右側示例,這是一個source、map、sink的作業??梢钥吹讲渴鸷?#xff0c;有三個Slot中都有三個任務,分別是source、map、sum各一份。而有一個slot中只有兩個任務,就是因為source只有三個并發,沒有更多并發可以部署進來。
進行SlotSharing第一個好處是,可以降低數據交換的開銷。像map、sink之間是一對一的數據交換,實際上有物理數據交換的這些節點都被共享在了一塊,這樣可以使得它們的數據交換在內存中進行,比在網絡中進行的開銷更低。
第二個好處是,方便用戶配置資源。通過SlotSharing,用戶只需要配置 n個Slot就可以保證一個sum作業總能跑起來。n是最大算子的并發度。
第三個好處是,在各個算子并發度差異不大的情況下,提高負載均衡。這是因為每個Slot里邊會有各種不同類型的算子各一份,這就避免某些負載重的算子全擠在同一個TaskExecutor中。
1.任務的執行模型
上文提到每個任務對應著一個OperatorChain。一般來說每個OperatorChain都有自己的輸入和輸出,輸入是InputGate,輸出是ResultPartition。這些任務總體會在一個獨占的線程中執行,任務從InputGate中讀取數據,將它喂給OperatorChain,OperatorChain進行業務邏輯的處理,最后會將產出的數據輸出到ResultPartition中。
這地方有一個例外是Source task,它不從InputGate中讀取數據,而直接通過SourceFunction方式來產出數據。上游的ResultPartition和下游的InputGate 會通過 Flink 的ShuffleService進行數據交換。ShuffleService是一個插件,目前 Flink 默認是NettyShuffleService,下游的InputGate會通過 Netty來從上游的ResultPartition中獲取數據。
ResultPartition是由一個個的SubPartition組成的,每個SubPartition都對應著一個下游消費者并發。InputGate也是由一個個的InputChannel組成的,每個不同的InputChannel都對應著一個上游并發。
四、ResouceManager:資源的管理中心
ResourceManager是 Flink 的資源管理中心。在前面我們有提到過TaskExecutor包含了各種各樣的資源。而ResourceManager,就管理著這些TaskExecutor。新啟動的TaskExecutor,需要向ResourceManager進行注冊,之后它里邊的資源才能服務于作業的請求。
ResourceManager里邊有個關鍵組件叫做SlotManager,它管理著Slot的狀態。這些Slot狀態是通過TaskExecutor到ResourceManager之間的心跳跳來進行更新的,在心跳信息中包含了TaskExecutor中的所有Slot的狀態。有了當前所有的Slot狀態之后,ResourceManager就可以服務于作業的資源申請。當JobMaster調度一個任務的時候,會向ResourceManager發起Slot請求。收到請求的ResourceManager會轉交給SlotManager,SlotManager會去檢查它里邊的可用的Slot有沒有符合請求條件的。如果有的話,它就會向相應的TaskExecutor發起Slot申請。如果請求成功,TaskExecutor會主動的向JobMaster offer這個 Slot。
之所以要這么繞一圈,是為了避免分布式帶來的不一致的問題。像剛才我們有提到,SlotManager中的Slot狀態是通過心跳來進行更新的,所以存在一定的延遲。此外在整個Slot申請過程中,Slot狀態也是可能發生變化的。所以最終我們需要以Slot offer,以及它的ACK來作為所有的申請的最終結果。
ResourceManager有多種不同的實現,Standalone模式下采用的ResourceManager,是StandaloneResourceManager,需要用戶手動拉起Worker節點,這樣就要求用戶要先了解這個作業會需要多少總資源。
除此之外,還有一些會去自動申請資源的ResourceManager,包括YarnResourceManager,MesosResourceManager和KubernetesResourceManager。采用這些ResourceManager后,在不能滿足的情況下,ResourceManager會在Slot的請求過程自動拉起Worker節點。
拿YarnResourceManager舉個例子,JobMaster去為某個任務請求一個Slot。YarnResourceManager將這個請求交給SlotManager,SlotManager發覺沒有Slot符合申請的話會告知YarnResourceManager,YarnResourceManager就會向真正的外部的YarnResourceManager去請求一個container,拿到container之后,它會啟動一個TaskExecutor,當TaskExecutor起來之后,它會注冊到 ResourceManager中,并去告知它可用的Slot信息。SlotManager拿到這個信息之后,就會嘗試去滿足當前pending的那些SlotRequest。如果能夠滿足,JobMaster就會去向TaskExecutor發起Slot請求,請求成功的話,TaskExecutor就會向JobMaster去offer這個Slot。這樣用戶就不需要在一開始去計算它作業的資源需求量是多少,而只需要保證單個Slot的大小,能夠滿足任務的執行了。
活動推薦:
僅需99元即可體驗阿里云基于 Apache Flink 構建的企業級產品-實時計算 Flink 版!點擊下方鏈接了解活動詳情:https://www.aliyun.com/product/bigdata/sc?utm_content=g_1000250506
原文鏈接:https://developer.aliyun.com/article/782819?
版權聲明:本文內容由阿里云實名注冊用戶自發貢獻,版權歸原作者所有,阿里云開發者社區不擁有其著作權,亦不承擔相應法律責任。具體規則請查看《阿里云開發者社區用戶服務協議》和《阿里云開發者社區知識產權保護指引》。如果您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將立刻刪除涉嫌侵權內容。 與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的Flink 必知必会:Flink Runtime Architecture的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 10倍性能提升!DLA SQL推出基于A
- 下一篇: DataWorks数据建模公开课上线啦!