flink checkpoint 恢复_Flink断点恢复机制
作為流式計算,Flink通過checkpoint機制和kafka的可回溯性來保證作業在failover時不丟失狀態。
作為生產環境的flink,我們期待做到快速failover、彈性擴縮容和平滑遷移,盡量做到用戶無感知和變更方便,從而讓用戶將更多精力放在功能實現上。
本文先介紹checkpoint機制,接著介紹flink如何rescale,最后再介紹下代碼流程。
注:下文中統一以task表示算子的一個并發,比如并發度為3的map算子包含3個并行的task,下文中出現的subtask語義上等同于task。
Checkpoint機制
flink快照從源頭開始觸發(記錄消費的offset),通過barrier來標記本次快照,如圖1的offset。
barrier流過的地方都會將state保存到共享存儲中,如圖2中的sum。
當barrier流到Sink時,所有算子都完成快照,本次作業快照也就完成,如圖3(Sink算子無狀態)。
一旦作業失敗重啟,會將state都恢復到各個算子中,同時從記錄的offset開始消費,確保從上一次快照的地方恢復作業。我們可以看到,Task中的狀態為21(1+2+3+4+5+6),下一次累加的數據為7,保證了flink內部狀態的一致性。
以上為最簡單的情況,實際情況可能包含多個快照、多個算子、迭代等復雜場景。
當多個快照時,flink通過barrier將數據分段,每個barrier都標記著一個checkpointID,如下圖所示:
當一個task有多個輸入時,必須等待上游所有的barrier都到達后,才能做快照。如果一個上游的barrier已經達到,想要做到exactly-once,需要先把之后到達的數據緩存下來,等做完快照再處理。
具體的實現原理可以參考論文:State Management in Apache Flink
Rescale原理
當作業的并發度改變時,flink會重新分配狀態。這里采取的partition策略是固定總partition個數,當task并發改變時,重新計算并將partition分配到每個task上。除了這種partition策略,還存在根據partition大小自動合并和拆分的策略,比如Hbase所使用的。
在flink中,一個key group就是一個partition。之所以選擇以key group為基本單位來操作狀態,是為了減少磁盤訪問IO和隨機讀寫(如果以key為單位就會出現這種情況,比如恢復時每個task都需要讀取全部的state來決定每個key是否屬于自己)。
flink中有兩種狀態,包括operator state和key state。operator state是以task為單位的,一般采用list的形式存儲,當重新rescale時每個task可以選擇接受全部的operator state或者按照list平分。key state時以key為單位的,必須在keyby時候才能使用這種狀態。
下面以kafka offset作為operator state作為介紹。每個task都會以list的形式記錄自己負責的<partitionId, offset>,當做快照的時候,將狀態保存在共享存儲,所有task的list state會拼接成一個大的list。當重新rescale的時候,flink將list中的元素平分給每個task。(實際的flink kafka consumer是通過union方式獲取所有list,然后再選擇屬于自己的)
下面簡單介紹下key state的rescale。key group的個數等于作業的最大并發(一旦設置不可改變,即key group的個數必須大于等于task的并發度),每個key通過hash映射屬于其中一個key group。比如下圖,共有10個KG,KG-1包含1-11的key。
當作業rescale的時候,會將list形式的KG平分到每個task。
上圖中最下面給出了key->KG→task的映射過程:
- 計算key的哈希值。
- 根據哈希值和最大并發確定key所屬KG。
- 根據key所屬KG來確定發到下游哪個并發的task。
對應KeyGroupRangeAssignment代碼如下:
public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {Preconditions.checkNotNull(key, "Assigned key must not be null!");return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));}public static int assignToKeyGroup(Object key, int maxParallelism) {Preconditions.checkNotNull(key, "Assigned key must not be null!");return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);}public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {return keyGroupId * parallelism / maxParallelism;}上游的計算在KeyGroupStreamPartitioner類里,下游的計算在KeyGroupPartitioner類里。
代碼流程
這里的代碼以flink-1.10.0作為參考。
關鍵類
CheckpointCoordinator
這個是位于Master節點的快照控制中心,負責定期的觸發checkpoint和手動觸發savepoint,維護在做和已完成的快照。
StateAssignmentOperation
這個是位于Master節點的作業恢復時負責rescale的類,主要是根據新作業的并發重新分配狀態。針對operator state,主要采用broadcast的方式使得每個task都能接觸算子全部的狀態;針對key state,采用均分KG的方法來重新劃分state的歸屬。
TaskStateManagerImpl
具體Task的狀態管理中心,包括和JobMaster做checkpoint的交互,管理本地狀態。
StreamTaskStateInitializerImpl
具體task的狀態恢復,這里也是各個statebackend開始創建的地方。
RocksDBKeyedStateBackend
具體task通過rocksdb做key state的地方。使用這種backend,每個狀態是一個cf,主鍵的組織形式為<KG, key, namespace>。支持增量快照和全量快照。
HeapKeyedStateBackend
具體task通過rocksdb做key state的地方。使用這種backend,底層通過使用CopyOnWriteStateMap來存儲,主鍵的組織形式為<NS, K, SV>。相比rocksdb,內存的存取速度都非常快,但是狀態大小受制于內存。
如何制作快照
- CheckpointCoordinator::triggerCheckpoint()。這個是checkpoint和savepoint共同的入口函數,checkpoint是通過定時調度來做的,savepoint則需要人工觸發。這里頭會做一些控制檢查,沒有問題的話就會向source task發送制作快照通知。
- Execution::triggerCheckpointHelper()。通知source task對應的節點做快照。
- TaskExecutor::triggerCheckpoint()。快照通知到TaskManager。
- Task::triggerCheckpointBarrier()。進到具體task里。
- StreamTask::performCheckpoint()。新版本采用了mailBox模型來解決持鎖競爭問題。這里會首先下發barrier,然后開始本地快照。
- CheckpointingOperation::executeCheckpointing()。進行同步快照(checkpointStreamOperator方法)和異步快照(AsyncCheckpointRunnable類)。
- StreamOperator::snapshotState() → AbstractStreamOperator::snapshotState()。同步快照的制作,主要保存KeyedStateRaw、OperatorStateRaw、OperatorStateManaged、KeyedStateManaged等狀態。
- AsyncCheckpointRunnable::run()。異步快照的制作。
- CheckpointResponder::acknowledgeCheckpoint()??煺兆鐾旰髤R報給主節點。
具體的快照制作,取決于所選statebackend,這里不再詳述,可以參考RocksDBKeyedStateBackend和HeapKeyedStateBackend。
針對非source節點,需要上游的barrier對齊后才能觸發快照,這點跟source task略有不同,如下所示:
- CheckpointedInputGate::pollNext()。從輸入里頭獲取barrier。
- CheckpointBarrierAligner::processBarrier()。處理barrier,負責exactly-once快照的處理。另一個類似類CheckpointBarrierTracker則負責at-least-once快照的處理。
- CheckpointBarrierAligner::notifyCheckpoint()。如果barrier都到齊了,那么開始制作快照。
- StreamTask::triggerCheckpointOnBarrier()。進到task里,之后的流程就如上面所述了。
如何恢復快照
Master端的分配
主要的狀態分配邏輯都在類StateAssignmentOperation里。這里先明確幾個概念:
- ExecutionJobVertex,表示一個邏輯上的執行節點,可能是好幾個operator通過chain連到一起的。
- Execution,對應ExecutionJobVertex的一個并發執行,也是由好幾個operator通過chain連到一起的。
- OperatorState,表示一個operator的所有并發的狀態。
- OperatorID,一個operator的唯一標識。
- KeyGroupRange,表示一個subtask所負責的KG范圍。
- 狀態。包括ManagedOperatorStates、RawOperatorStates、ManagedKeyedState和RawKeyedState。
- TaskStateSnapshot,一個Execution所有operator的狀態。
這里的處理邏輯是,對ExecutionJobVertex的所有operator做狀態分配,對operator的所有subtask做狀態分配。基本流程如下:
- 檢查并發是否符合要求,主要是確保設置并發不要超過最大并發等。
- 計算每個subtask負責的KeyGroupRange,下面根據這個標準來分配KG。
- 重新分配operatorState,主要在reDistributePartitionableStates里實現,這里頭對unionState進行了合并,按照round-bin的方式來分配list的state。
- 重新分配keyedState,主要在reDistributeKeyedStates里實現,這里頭會具體到subtask里,從之前的快照里找到所有屬于它的stateHandler。
- 將分配好的狀態賦值給ExecutionJobVertices。這里會以Execution為基本單位,設置它的JobManagerTaskRestore(由多個operator的狀態組成)。
Task端的恢復
當狀態都分配好了之后,在Task端就可以進行狀態恢復了。大概流程如下:
- TaskStateManagerImpl::prioritizedOperatorState() ,將對應operator的狀態(OperatorSubtaskState)拿出來,最后存到PrioritizedOperatorSubtaskState里。
- StreamTaskStateInitializerImpl::streamOperatorStateContext,初始化keyedStatedBackend、operatorStateBackend、timeServiceManager等過程。
- BackendRestorerProcedure::createAndRestore(),這個是在初始化keyedStatedBackend的時候調用的,將狀態保存到了keyedStatedBackend中。
- RestoreOperation::restore(),針對歷史狀態進行恢復。這是個抽象函數,比如一個具體的實現是RocksDBFullRestoreOperation::restore()。
- 之后根據具體的stateHandle進行恢復。
總結
以上是生活随笔為你收集整理的flink checkpoint 恢复_Flink断点恢复机制的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 启动资金什么意思
- 下一篇: bootstrap 空行不显示横杠_电脑