Flink 容错机制:Checkpoints、Savepoints
文章目錄
- Checkpoints(檢查點)
 - 恢復流程
 - 生成策略
 
- Savepoints(保存點)
 
Checkpoints(檢查點)
Flink中基于異步輕量級的分布式快照技術提供了Checkpoints容錯機制,Checkpoints可以將同一時間點作業/算子的狀態數據全局統一快照處理,包括前面提到的算子狀態和鍵值分區狀態。當發生了故障后,Flink會將所有任務的狀態恢復至最后一次Checkpoint中的狀態,并從那里重新開始執行。
那么Checkpoints的生成策略是什么樣的呢?它會在什么時候進行快照的生成呢?
其實就是在所有任務都處理完同一個輸入數據流的時候,這時就會對當前全部任務的狀態進行一個拷貝,生成Checkpoints。
為了方便理解,這里先簡單的用一個樸素算法來解釋這一生成過程(Flink的Checkpoints算法實際要更加復雜,在下面會詳細講解)
恢復流程
為了方便進行實例的講解,假設當前有一個Source任務,負責從一個遞增的數字流(1、2、3、4……)中讀取數據,讀取到的數據會分為奇數流和偶數流,求和算子的兩個任務會分別對它們進行求和。在當前任務中,數據源算子的任務會將輸入流的當前偏移量存為狀態,求和算子的任務會將當前和存為狀態。
某流式應用的一致性檢查點如上圖,在當前生成的Checkpoints中保存的輸入偏移為5,偶數求和為6,奇數求和為9。
故障:任務sum_odd失敗假設在下一輪計算中,任務sum_odd計算出現了問題,任務sum_odd的時候產生了問題,導致結果出現錯誤。由于出現問題,為了防止從頭開始重復計算,此時會通過Checkpoints來進行快照的恢復。
Checkpoints恢復應用需要以下三個步驟
- 第一步我們需要先重啟整個應用,恢復到最原始的狀態。
 
- 緊接著從檢查點的快照信息中讀取出輸入源的偏移量以及算子計算的結果,進行狀態的恢復
 
- 狀態恢復完成后,繼續Checkpoints恢復的位置開始繼續處理。
 
從檢查點恢復后,它的內部狀態會和生成檢查點的時候完全一致,并且會緊接著重新處理那些從之前檢查點完成開始,到發生系統故障之間已經處理過的數據。雖然這意味著Flink會重復處理部分消息,但上述機制仍然可以實現精確一次的狀態一致性,因為所有的算子都會恢復到那些數據處理之前的時間點。
但這個機制仍然面臨一些問題,因為Checkpoints和恢復機制僅能重置應用內部的狀態,而應用所使用的Sink可能在恢復期間將結果向下游系統(如事件日志系統、文件系統或數據庫)重復發送多次。為了解決這個問題,對于某些存儲系統,Flink提供的Sink函數支持精確一次輸出(在檢查點完成后才會把寫出的記錄正式提交)。另一種方法則是適用于大多數存儲系統的冪等更新。
生成策略
Flink中的Checkpoints是基于Chandy-Lamport分布式快照算法實現的,該算法不會暫停整個應用,而是會將生成Checkpoints的過程和處理過程分離,這樣在部分任務持久化狀態的過程中,其他任務還可以繼續執行。
在介紹生成策略之前,首先需要介紹一下**Checkpoints barrier(屏障)**這一種特殊記錄。
barrir劃分Checkpoints如上圖,與水位線相同,Flink會在Source中間隔性地生成barrier,通過barrier把一條流上的數據劃分到不同的Checkpoints中,在barrier之前到來的數據導致的狀態更改,都會被包含在當前所屬的Checkpoints中;而基于barrier之后的數據導致的所有更改,就會被包含在之后的Checkpoints中。
擁有兩個有狀態的Source,兩個有狀態的任務,以及兩個無狀態Sink的流式應用- 假設當前有兩個Source任務,各自消費一個遞增的數字流(1、2、3、4……),讀取到的數據會分為奇數流和偶數流,求和算子的兩個任務會分別對它們進行求和,并將結果值更新至下游Sink。
 
- 此時JobManager向每一個Source任務發送一個新的Checkpoints編號,以此啟動Checkpoints生成流程。
 
- 在Source任務收到消息后,會暫停發出記錄,緊接著利用狀態后端生成本地狀態的Checkpoints,并把barrier連同編號廣播給所有傳出的數據流分區。
 - 狀態后端在狀態存入Checkpoints后通知Source任務,并向JobManager發送確認消息。
 - 在所有barrier發出后,Source將恢復正常工作。
 
- Source任務會廣播barrier至所有與之相連的任務,確保這些任務能從它們的每個輸入都收到一個barrier
 - 在等待過程中,對于barrier未到達的分區,數據會繼續正常處理。而barrier已經到達的分區,它們新到來的記錄會被緩沖起來,不能處理。這個等待所有barrier到來的過程被稱為barrier對齊
 
- 任務中收齊全部輸入分區發送的barrier后,就會通知狀態后端開始生成Checkpoints,同時繼續把Checkpoints barrier廣播轉發到下游相連的任務。
 
- 任務在發出所有的Checkpoints barrier后就會開始處理緩沖的記錄。等到所有緩沖記錄處理完后,任務就會繼續處理Source。
 
- Sink任務在收到分隔符后會依次進行barrier對齊,然后將自身狀態寫入Checkpoints,最終向JobManager發送確認信息。
 - JobManager在接收到所有任務返回的Checkpoints確認信息后,就說明此次Checkpoints生成結束。
 
Savepoints(保存點)
- 由于Cheakpoints是周期性自動生成的,但有些時候我們需要手動的去進行鏡像保存功能,于是Flink同時還為我們提供了Savepoints來完成這個功能,Savepoints不僅可以做到故障恢復,還可以用于手動備份、版本遷移、暫停或重啟應用等。
 - Savepoints是Checkpoints的一種特殊實現,底層也是使用Checkpoint機制,因此Savepoints可以認為是具有一些額外元數據的Checkpoints。
 - Savepoints的生成和清理都無法由Flink自動進行,因此都需要用戶自己來顯式觸發。
 
總結
以上是生活随笔為你收集整理的Flink 容错机制:Checkpoints、Savepoints的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: Flink 状态管理:算子状态、键值分区
 - 下一篇: 分布式系统概念 | 分布式锁:数据库、R