Flink状态管理与CheckPoint、Savepoint
轉載自:https://blog.csdn.net/hxcaifly/article/details/84673292
????https://blog.csdn.net/rlnLo2pNEfx9c/article/details/81517928
????https://blog.csdn.net/qq_26654727/article/details/83833517
????https://blog.csdn.net/zero__007/article/details/88201498
Flink提供了Exactly once特性,是依賴于帶有barrier的分布式快照+可部分重發的數據源功能實現的。而分布式快照中,就保存了operator的狀態信息。
Flink的失敗恢復依賴于 檢查點機制 + 可部分重發的數據源。
檢查點機制機制:checkpoint定期觸發,產生快照,快照中記錄了:
可部分重發的數據源:Flink選擇最近完成的檢查點K,然后系統重放整個分布式的數據流,然后給予每個operator他們在檢查點k快照中的狀態。數據源被設置為從位置Sk開始重新讀取流。例如在Apache Kafka中,那意味著告訴消費者從偏移量Sk開始重新消費。
Checkpoint是Flink實現容錯機制最核心的功能,它能夠根據配置周期性地基于Stream中各個Operator/task的狀態來生成快照,從而將這些狀態數據定期持久化存儲下來,當Flink程序一旦意外崩潰時,重新運行程序時可以有選擇地從這些快照進行恢復,從而修正因為故障帶來的程序數據異常。
快照的核心概念之一是barrier。 這些barrier被注入數據流并與記錄一起作為數據流的一部分向下流動。 barriers永遠不會超過記錄,數據流嚴格有序,barrier將數據流中的記錄隔離成一系列的記錄集合,并將一些集合中的數據加入到當前的快照中,而另一些數據加入到下一個快照中。
每個barrier都帶有快照的ID,并且barrier之前的記錄都進入了該快照。 barriers不會中斷流處理,非常輕量級。 來自不同快照的多個barrier可以同時在流中出現,這意味著多個快照可能并發地發生。
單流的barrier:
barrier在數據流源處被注入并行數據流中。快照n的barriers被插入的位置(記之為Sn)是快照所包含的數據在數據源中最大位置。例如,在Apache Kafka中,此位置將是分區中最后一條記錄的偏移量。 將該位置Sn報告給checkpoint協調器(Flink的JobManager)。
然后barriers向下游流動。當一個中間操作算子從其所有輸入流中收到快照n的barriers時,它會為快照n發出barriers進入其所有輸出流中。 一旦sink操作算子(流式DAG的末端)從其所有輸入流接收到barriers n,它就向checkpoint協調器確認快照n完成。在所有sink確認快照后,意味快照著已完成。
一旦完成快照n,job將永遠不再向數據源請求Sn之前的記錄,因為此時這些記錄(及其后續記錄)將已經通過整個數據流拓撲,也即是已經被處理結束。
多流的barrier:
接收多個輸入流的運算符需要基于快照barriers上對齊(align)輸入流。 上圖說明了這一點:
- 一旦操作算子從一個輸入流接收到快照barriers n,它就不能處理來自該流的任何記錄,直到它從其他輸入接收到barriers n為止。 否則,它會搞混屬于快照n的記錄和屬于快照n + 1的記錄。
- barriers n所屬的流暫時會被擱置。 從這些流接收的記錄不會被處理,而是放入輸入緩沖區。可以看到1,2,3會一直放在Input buffer,直到另一個輸入流的快照到達Operator。
- 一旦從最后一個流接收到barriers n,操作算子就會發出所有掛起的向后傳送的記錄,然后自己發出快照n的barriers。
之后,它恢復處理來自所有輸入流的記錄,在處理來自流的記錄之前優先處理來自輸入緩沖區的記錄。
state一般指一個具體的task/operator的狀態。Flink中包含兩種基礎的狀態:Keyed State和Operator State。
Keyed State,就是基于KeyedStream上的狀態。這個狀態是跟特定的key綁定的,對KeyedStream流上的每一個key,可能都對應一個state。
Operator State與Keyed State不同,Operator State跟一個特定operator的一個并發實例綁定,整個operator只對應一個state。相比較而言,在一個operator上,可能會有很多個key,從而對應多個keyed state。
舉例來說,Flink中的Kafka Connector,就使用了operator state。它會在每個connector實例中,保存該實例中消費topic的所有(partition, offset)映射。
Keyed State和Operator State,可以以兩種形式存在:原始狀態和托管狀態(Raw and Managed State)。托管狀態是由Flink框架管理的狀態,如ValueState, ListState, MapState等。而raw state即原始狀態,由用戶自行管理狀態具體的數據結構,框架在做checkpoint的時候,使用byte[]來讀寫狀態內容。通常在DataStream上的狀態推薦使用托管的狀態,當實現一個用戶自定義operator時,會使用到原始狀態。
這里重點說說State-Keyed State,基于key/value的狀態接口,這些狀態只能用于keyedStream之上。keyedStream上的operator操作可以包含window或者map等算子操作。這個狀態是跟特定的key綁定的,對KeyedStream流上的每一個key,都對應一個state。
key/value下可用的狀態接口:
ValueState: 狀態保存的是一個值,可以通過update()來更新,value()獲取。 ListState: 狀態保存的是一個列表,通過add()添加數據,通過get()方法返回一個Iterable來遍歷狀態值。 ReducingState: 這種狀態通過用戶傳入的reduceFunction,每次調用add方法添加值的時候,會調用reduceFunction,最后合并到一個單一的狀態值。 MapState:即狀態值為一個map。用戶通過put或putAll方法添加元素。以上所述的State對象,僅僅用于與狀態進行交互(更新、刪除、清空等),而真正的狀態值,有可能是存在內存、磁盤、或者其他分布式存儲系統中。實際上,這些狀態有三種存儲方式: HeapStateBackend、MemoryStateBackend、FsStateBackend、RockDBStateBackend。
- MemoryStateBackend: state數據保存在java堆內存中,執行checkpoint的時候,會把state的快照數據保存到jobmanager的內存中。
- FsStateBackend: state數據保存在taskmanager的內存中,執行checkpoint的時候,會把state的快照數據保存到配置的文件系統中,可以使用hdfs等分布式文件系統。
- RocksDBStateBackend: RocksDB跟上面的都略有不同,它會在本地文件系統中維護狀態,state會直接寫入本地rocksdb中。同時RocksDB需要配置一個遠端的filesystem。RocksDB克服了state受內存限制的缺點,同時又能夠持久化到遠端文件系統中,比較適合在生產中使用。
通過創建一個StateDescriptor,可以得到一個包含特定名稱的狀態句柄,可以分別創建ValueStateDescriptor、 ListStateDescriptor或ReducingStateDescriptor狀態句柄。狀態是通過RuntimeContext來訪問的,因此只能在RichFunction中訪問狀態。這就要求UDF時要繼承Rich函數,例如RichMapFunction、RichFlatMapFunction等。
Checkpoint的簡單設置
默認情況下,checkpoint不會被保留,取消程序時即會刪除它們,但是可以通過配置保留定期檢查點。開啟Checkpoint功能,有兩種方式。其一是在conf/flink_conf.yaml中做系統設置;其二是針對任務再代碼里靈活配置。推薦第二種方式,針對當前任務設置,設置代碼如下所示:
上面調用enableExternalizedCheckpoints設置為ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION,表示一旦Flink處理程序被cancel后,會保留Checkpoint數據,以便根據實際需要恢復到指定的Checkpoint處理。
ExternalizedCheckpointCleanup 可選項如下:
- ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: 取消作業時保留檢查點。請注意,在這種情況下,您必須在取消后手動清理檢查點狀態。
- ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 取消作業時刪除檢查點。只有在作業失敗時,檢查點狀態才可用。
默認情況下,如果設置了Checkpoint選項,則Flink只保留最近成功生成的1個Checkpoint,而當Flink程序失敗時,可以從最近的這個Checkpoint來進行恢復。但是,如果希望保留多個Checkpoint,并能夠根據實際需要選擇其中一個進行恢復,這樣會更加靈活,比如,發現最近4個小時數據記錄處理有問題,希望將整個狀態還原到4小時之前。
Flink可以支持保留多個Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的個數:
state.checkpoints.num-retained: 20Flink checkpoint目錄分別對應的是 jobId,flink提供了在啟動之時通過設置 -s 參數指定checkpoint目錄, 讓新的jobId 讀取該checkpoint元文件信息和狀態信息,從而達到指定時間節點啟動job。
Savepoint
說到Checkpoint不得不介紹Savepoint。Savepoint是通過Flink的檢查點機制創建的流作業執行狀態的一致圖像。可以使用Savepoints來停止和恢復,分叉或更新Flink作業。保存點由兩部分組成:穩定存儲(例如HDFS,S3,…)上的(通常是大的)二進制文件和(相對較小的)元數據文件的目錄。穩定存儲上的文件表示作業執行狀態圖像的凈數據。Savepoint的元數據文件以(絕對路徑)的形式包含(主要)指向作為Savepoint一部分的穩定存儲上的所有文件的指針。
從概念上講,Flink的Savepoints與Checkpoints的不同之處在于備份與傳統數據庫系統中的恢復日志不同。檢查點的主要目的是在意外的作業失敗時提供恢復機制。
Checkpoint的生命周期由Flink管理,即Flink創建,擁有和發布Checkpoint,無需用戶交互。作為一種恢復和定期觸發的方法,Checkpoint實現的兩個主要設計目標是:i)being as lightweight to create (輕量級),ii)fast restore (快速恢復)。針對這些目標的優化可以利用某些屬性,例如,JobCode在執行嘗試之間不會改變。
與此相反,Savepoints由用戶創建,擁有和刪除。它們的用例是planned (計劃) 的,manual backup( 手動備份 ) 和 resume(恢復)。例如,這可能是Flink版本的更新,更改Job graph ,更改 parallelism ,分配第二個作業,如紅色/藍色部署,等等。當然,Savepoints必須在終止工作后繼續存在。從概念上講,保存點的生成和恢復成本可能更高,并且更多地關注可移植性和對前面提到的作業更改的支持。
為了能夠在將來升級程序,主要的必要更改是通過uid(String)方法手動指定operator ID 。這些ID用于確定每個運算符的狀態。
DataStream<String> stream = env.// Stateful source (e.g. Kafka) with ID.addSource(new StatefulSource()).uid("source-id") // ID for the source operator.shuffle()// Stateful mapper with ID.map(new StatefulMapper()).uid("mapper-id") // ID for the mapper// Stateless printing sink.print(); // Auto-generated ID如果未手動指定ID,則會自動生成這些ID。只要這些ID不變,就可以從保存點自動恢復。生成的ID取決于程序的結構,并且對程序更改很敏感。因此,強烈建議手動分配這些ID。
觸發保存點時,會創建一個新的保存點目錄,其中將存儲數據和元數據。可以通過配置默認目標目錄或使用觸發器命令指定自定義目標目錄來控制此目錄的位置。
注意,checkpoint時的對齊步驟可能增加流式程序的等待時間。通常,這種額外的延遲大約為幾毫秒,但也會見到一些延遲顯著增加的情況。 對于要求所有記錄始終具有超低延遲(幾毫秒)的應用程序,Flink可以在checkpoint期間跳過流對齊。一旦操作算子看到每個輸入流的checkpoint barriers,就會寫 checkpoint 快照。
當跳過對齊時,即使在 checkpoint n的某些 checkpoint barriers 到達之后,操作算子仍繼續處理所有輸入。這樣,操作算子還可以在創建 checkpoint n 的狀態快照之前,繼續處理屬于checkpoint n + 1的數據。 在還原時,這些記錄將作為重復記錄出現,因為它們都包含在 checkpoint n 的狀態快照中,并將作為 checkpoint n 之后數據的一部分進行重復處理。
總結
以上是生活随笔為你收集整理的Flink状态管理与CheckPoint、Savepoint的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 怎么查询信用卡账单
- 下一篇: 信用卡额度被降低了是什么原因