1.15.Flink state(状态)管理与恢复、什么是state、Keyed State、Operator State、状态容错(生成快照,恢复快照),checkPoint简介,重启策略等
1.15.Flink state(狀態)管理與恢復
1.15.1.什么是state
1.15.2.狀態(State)
1.15.3.Keyed State
1.15.4.Operator State
1.15.4.1.Snapshotting Operator State
1.15.5.狀態容錯
1.15.6.狀態容錯-生成快照
1.15.7.狀態容錯–恢復快照
1.15.8.checkPoint簡介
1.15.8.1.Barriers
1.15.8.2.Recovery
1.15.9.CheckPoint的配置
1.15.10.狀態的持久性
1.15.11.State Backend(狀態的后端存儲)
1.15.11.1.修改State Backend的兩種方式
1.15.12.Restart Strategies(重啟策略)
1.15.12.1.重啟策略值固定間隔(Fixed delay)
1.15.12.2.重啟策略之失敗率(Failure rate)
1.15.12.3.重啟策略之無重啟 (No restart)
1.15.12.4.保存多個Checkpoint
1.15.12.5.從Checkpoint進行恢復
1.15.12.6.savePoint
1.15.12.7.checkPoint vs savePoint
1.15.12.8.savePoint的使用
1.15.Flink state(狀態)管理與恢復
1.15.1.什么是state
雖然數據流中的許多操作一次只查看單個事件(例如an event parser),但有些操作記住跨多個事件的信息(例如window operators)。這些操作稱為有狀態操作。
一些有狀態操作的例子:
?當應用程序搜索某些事件模式時,狀態將存儲到目前為止遇到的事件序列。
?當按分鐘/小時/天聚合事件時,狀態將保存掛起的聚合。
?當通過stream of data points訓練機器學習的模型時,state保留著當前版本的模型參數。
?當歷史數據需要被管理的時候,該狀態允許對過去發生的事件進行有效訪問。
使用checkpoint和savepoints保證容錯的時候需要知道state。
state的狀態,還有助于對Flink應用程序進行調整,這意味著Flink可以在并行實例間重新分布狀態。
queryable state允許您在運行時從Flink外部訪問狀態。
在使用狀態時,讀取有關Flink的state backends也可能很有用。Flink提供了不同的state backend,用于指定狀態存儲的方式和位置。
1.15.2.狀態(State)
?我們前面寫的word count的例子,沒有包含狀態管理。如果一個task在處理過程中掛掉了,那么它在內存中的狀態都會丟失,所有的數據都需要重新計算。從容錯和消息處理的語義上(at least once,exactly once),Flink引入了state和checkpoint。
?首先區分一下兩個概念
- ?state一般指一個具體的task/operator的狀態【state數據默認保存在java的堆內存中】
- ?而checkpoint【可以理解為checkpoint是把state數據持久化存儲了】,則表示了一個Flink Job在一個特定時刻的一份全局狀態快照,即包含了所有task/operator的狀態。
- ?注意:task是Flink中執行的基本單位。Operator指算子(transformation)
?State可以被記錄,在失敗的情況下數據還可以恢復
?Flink中有兩種基本類型的State - ?Keyed State
- ?Operator State
?Keyed State和Operator State,可以以兩種形式存在: - ?原始狀態(raw state)
- ?托管狀態(managed state)
?托管狀態是由Flink框架管理的狀態
?而原始狀態,由用戶自行管理狀態具體的數據結構,框架在做checkpoint的時候,使用byte[]來讀寫狀態內容,對其內部數據結構一無所知。
?通常在DataStream上的狀態推薦使用托管的狀態,當實現一個用戶自定義的operator時,會使用到原始狀態。
1.15.3.Keyed State
?顧名思義,就是基于KeyedStream上的狀態。這個狀態是跟特定的key綁定的,對KeyedStream流上的每一個key,都對應一個state。
stream.keyBy(…)
?保存state的數據結構
- ?ValueState:即類型為T的單值狀態。這個狀態與對應的key綁定,是最簡單的狀態了。它可以通過update方法更新狀態值,通過value()方法獲取狀態值。
- ?ListState:即key上的狀態值為一個列表??梢酝ㄟ^add方法往列表中附加值;也可以通過get()方法返回一個Iterable來遍歷狀態值。
- ?ReducingState:這種狀態通過用戶傳入的reduceFunction,每次調用add方法添加值的時候,會調用reduceFunction,最后合并到一個單一的狀態值。
- ?MapState<UK,UV>:即狀態值為一個map。用戶通過put或putAll方法添加元素。
?需要注意的是,以上所述的State對象,僅僅用于與狀態進行交互(更新、刪除、清空等),而真正的狀態值,有可能是存在內存、磁盤、或者其他分布式存儲系統中。相當于我們只是持有了這個狀態的句柄。
Keyed state在嵌入式的key/value存儲中進行維護。該狀態是與有狀態操作符(stateful operators)讀取的流一起嚴格分區和分發的。因此只有在keyed Stream流中訪問key/value的state,對齊流和狀態的鍵可以確保所有的狀態更新都是本地操作,從而保證沒有事務開銷的一致性。這種對齊還允許Flink透明地重新分配狀態和調整流分區。
Keyed State被進一步組織成所謂的Key Groups。Key Groups是Flink能夠重新分配keyed State的原子單元(atomic unit),key Group的數量和定義的最大的并行度的數量是一致的。在執行每個并行的keyed operator的實例,在執行期間,key operator 的每個并行實例與一個或多個key Group的鍵一起執行。
1.15.4.Operator State
?與Key無關的State,與Operator綁定的state,整個operator只對應一個state
?保存state的數據結構
ListState
?舉例來說,Flink中的Kafka Connector,就使用了operator state。它會在每個connector實例中,保存該實例中消費topic的所有(partition, offset)映射。
1.15.4.1.Snapshotting Operator State
當operators包含任何形式的狀態,該狀態也必須是快照的一部分。
operators在從輸入流接收到所有快照barriers的時間點,以及向輸出流發出barriers 之前,對其狀態進行快照。屆時,將在進行barriers 之前從記錄對狀態進行所有更新,并且沒有依賴于應用barriers之后的記錄來進行狀態更新。由于快照的狀態可能很大,因此將其存儲在可配置狀態backend中。默認情況下,這是JobManager的內存,但對于生產用途,應配置分布式可靠存儲(例如HDFS)。存儲狀態后,operator 確認檢查點,將快照operator 發送到輸出流中,然后繼續。
現在生成的快照包含:
?對于每個并行流數據源,啟動快照時流中的偏移量/位置
?對于每個運算符,一個指向作為快照一部分存儲的狀態的指針
?為了保證state的容錯性,Flink需要對state進行checkpoint。
?Checkpoint是Flink實現容錯機制最核心的功能,它能夠根據配置周期性地基于Stream中各個Operator/task的狀態來生成快照,從而將這些狀態數據定期持久化存儲下來,當Flink程序一旦意外崩潰時,重新運行程序時可以有選擇地從這些快照進行恢復,從而修正因為故障帶來的程序數據異常。
?Flink的checkpoint機制可以與(stream和state)的持久化存儲交互的前提:
- ?持久化的source,它需要支持在一定時間內重放事件。這種sources的典型例子是持久化的消息隊列(比如Apache Kafka,RabbitMQ等)或文件系統(比如HDFS,S3,GFS等)
- ?用于state的持久化存儲,例如分布式文件系統(比如HDFS,S3,GFS等)
1.15.5.狀態容錯
?依靠checkPoint機制
?保證exectly-once
- ?只能保證Flink系統內的exactly-once
- ?對于source和sink需要依賴外部的組件一同保證
1.15.6.狀態容錯-生成快照
1.15.7.狀態容錯–恢復快照
1.15.8.checkPoint簡介
Flink容錯機制的核心部分是繪制分布式數據流和operator state的一致快照。這些快照充當一致的檢查點,如果發生故障,系統可以回退到這些檢查點。Flink繪制這些快照的機制在“Lightweight Asynchronous Snapshots for Distributed Dataflows”(https://arxiv.org/abs/1506.08603)中進行了描述。它受用于分布式快照的標準Chandy-Lamport算法的啟發, 并且專門針對Flink的執行模型進行了量身定制。
請記住,與檢查點有關的所有操作都可以異步完成。檢查點barriers 不會在鎖定步驟中傳播,并且操作可以異步快照其狀態。
1.15.8.1.Barriers
stream barriers是Flink分布式快照中的核心元素。這些barriers將注入到數據流中,并與記錄一起作為數據流的一部分流動。Barriers 從不overtake 記錄,它們嚴格按照順序進行。barrier將數據流中的記錄分為進入當前快照的記錄集和進入下一個快照的記錄集。每個barrier都帶有快照的ID,快照的記錄已推送到快照的前面。Barriers不會中斷流的流動,因此非常輕便。來自不同快照的多個barriers可以同時出現在流中,這意味著各種快照可能同時發生。
Stream barriers在 stream sources處注入并行數據流中。快照n的barriers 被注入的點(我們稱其為 S n)是快照覆蓋數據的source stream中的位置。例如,在Apache Kafka中,此位置將是分區中最后一條記錄的offset 。該位置S n 被報告給 checkpoint coordinator (Flink的JobManager)。
barriers 然后順流而下。當中間operator 從其所有輸入流中收到快照n的barrier時,它會將快照n的barrier 發射到其所有輸出流中。sink operator(streaming dag的末尾)從其所有輸入流接收到barrier n之后,便將快照n確認給checkpoint coordinator。所有接收器都確認快照后,就認為快照已完成。
一旦快照n完成,作業將不再向源請求來自Sn之前的記錄,因為此時這些記錄(及其后代記錄)將通過整個數據流拓撲。
接收到多個輸入流的操作員需要在快照barriers上對齊輸入流。上圖說明了這一點:
?operator一旦從傳入流接收到快照barrier n,就無法處理該流中的任何其他記錄,直到它也從其他輸入接收到barrier n為止。否則,它將混合屬于快照n的記錄和屬于快照n + 1的記錄。
?報告barrier n的流被暫時擱置。從這些流接收的記錄不會被處理,而是放入輸入緩沖區中。
?一旦最后一個流接收到barriers n,operator 將發出所有未決的傳出記錄,然后自身發出快照n barriers 。
?最后,operator 將狀態異步寫入到state backend。
1.15.8.2.Recovery
這種機制下的恢復非常簡單:當出現故障時,Flink選擇最新完成的檢查點k,然后系統重新部署整個分布式數據流,并給每個operator 狀態快照作為檢查點k的一部分。sources 被設置為從位置Sk開始讀取流。例如,在Apache Kafka中,這意味著告訴consumer開始從offset Sk進行獲取數據。
如果state是增量快照,operators 從最新的完整快照的狀態開始,然后對該狀態應用一系列增量快照更新。
1.15.9.CheckPoint的配置
默認checkpoint功能是disabled的,想要使用的時候需要先啟用。
checkpoint開啟之后,默認的checkPointMode是Exactly-once
checkpoint的checkPointMode有兩種,Exactly-once和At-least-once
Exactly-once對于大多數應用來說是最合適的。At-least-once可能用在某些延遲超低的應用程序(始終延遲為幾毫秒)
1.15.10.狀態的持久性
Flink通過結合流重播(stream replay)和 檢查點(checkpointing)來實現容錯。檢查點標記每個輸入流中的特定點以及每個operators的對應狀態。通過恢復operators 的狀態并從檢查點開始重播記錄,可以在保持一致性(一次處理語義)的同時從檢查點恢復流式數據流。
檢查點間隔(checkpoint interval)是在執行過程中權衡容錯開銷與恢復時間(需要重播的記錄數)的一種手段。
容錯機制連續繪制分布式流數據流的快照。對于狀態較小的流應用程序,這些快照非常輕量級(light-weight),可以在不影響性能的情況下頻繁繪制。流應用程序(streaming applications)的狀態通常存儲在分布式文件系統中的可配置位置。
如果發生程序故障(由于機器,網絡或軟件故障),Flink將停止分布式流數據流。然后,系統重新啟動operators ,并將其重置為最新的成功檢查點。輸入流將重置為狀態快照的點。確保作為重新啟動的并行數據流的一部分處理的任何記錄都不會影響以前的檢查點狀態。
**[注意]**默認情況下,檢查點是禁用的。有關如何啟用和配置檢查點的詳細信息,請參見檢查點。
**[注意]**為了使該機制實現其全部保證,數據流源(例如消息隊列或代理)必須能夠將流后退到定義的最近點。Apache Kafka具有此功能,Flink與Kafka的連接器利用了這一功能。有關Flink連接器提供的保證的更多信息,請參見數據源和接收器的容錯保證。
**[注意]**由于Flink的檢查點是通過分布式快照實現的,因此我們可以交替使用快照和檢查點一詞。通常,我們也使用快照一詞來表示檢查點或保存點。
1.15.11.State Backend(狀態的后端存儲)
存儲鍵/值索引的確切數據結構取決于所選擇的state backend。一個state backend存儲數據在內存中的hash map,another state backend使用RocksDB作為鍵/值存儲。除了定義保存狀態的數據結構外,state backends還實現了獲取鍵/值狀態的時間點快照并將該快照存儲為檢查點的一部分的邏輯。可以在不更改應用程序邏輯的情況下配置State backends。
?默認情況下,state會保存在taskmanager的內存中,checkpoint會存儲在JobManager的內存中。
?state 的store和checkpoint的位置取決于State Backend的配置。
env.setStateBackend(…)
?一共有三種State Backend
?MemoryStateBackend
- ?state數據保存在java堆內存中,執行checkpoint的時候,會把state的快照數據保存到jobmanager的內存中。
- ?基于內存的state backend在生產環境下不建議使用
?FsStateBackend
- ?state數據保存在taskmanager的內存中,執行checkpoint的時候,會把state的快照數據保存到配置的文件系統中。
- ?可以使用hdfs等分布式文件系統。
?RocksDBStateBackend (RocksDB是一個為更快速存儲而生的,可嵌入的持久型的key-value存儲)
- ?RocksDB跟上面的都略有不同,它會在本地文件系統中維護狀態,state會直接寫入本地rocksdb中。同時它需要配置一個遠端的filesystem uri(一般是HDFS),在做checkpoint的時候,會把本地的數據直接復制到filesystem中。fail over的時候從filesystem中恢復到本地。
- ?RocksDB克服了state受內存限制的缺點,同時又能夠持久化到遠端文件系統中,比較適合在生產中使用。
1.15.11.1.修改State Backend的兩種方式
?第一種:單任務調整
- ?修改當前任務代碼
- ?env.setStateBackend(new FsStateBackend(“hdfs://namenode:9000/flink/checkpointsk”))
- ?或者new MemoryStateBackend()
- ?或者new RocksDBStateBackend(filebackend,true);需要添加第三方依賴
?第二種:全局調整 - ?修改flink-conf.yaml
- ?state.backend: filesystem
- ?state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
- ?注意:state.backend的值可以是下面幾種:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)
注意:
如果想使用RocksDBStateBackend使用,需要引入以下:
1.15.12.Restart Strategies(重啟策略)
?Flink支持不同的重啟策略,以在故障發生時控制作業如何重啟
?集群在啟動時會伴隨一個默認的重啟策略,在沒有定義具體重啟策略時會使用該默認策略。如果在工作提交時指定了一個重啟策略,該策略會覆蓋集群的默認策略。
?常用的重啟策略
- ?固定間隔 (Fixed delay)
- ?失敗率 (Failure rate)
- ?無重啟 (No restart)
?如果沒有啟用 checkpointing,則使用無重啟 (no restart) 策略。
?如果啟用了 checkpointing,但沒有配置重啟策略,則使用固定間隔 (fixed-delay) 策略,其中 Integer.MAX_VALUE 參數是嘗試重啟次數。
?重啟策略可以在flink-conf.yaml中配置,表示全局的配置。也可以在應用代碼中動態指定,會覆蓋全局配置。
1.15.12.1.重啟策略值固定間隔(Fixed delay)
?第一種:全局配置flink-conf.yaml
restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 3 restart-strategy.fixed-delay.delay: 10 s?第二種:應用代碼設置
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, // 嘗試重啟的次數Time.of(10, TimeUnit.SECONDS) // 間隔 ));1.15.12.2.重啟策略之失敗率(Failure rate)
?第一種:全局配置 flink-conf.yaml
restart-strategy: failure-rate restart-strategy.failure-rate.max-failures-per-interval: 3 restart-strategy.failure-rate.failure-rate-interval: 5 min restart-strategy.failure-rate.delay: 10 s?第二種:應用代碼設置
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, // 一個時間段內的最大失敗次數Time.of(5, TimeUnit.MINUTES), // 衡量失敗次數的是時間段Time.of(10, TimeUnit.SECONDS) // 間隔 ));1.15.12.3.重啟策略之無重啟 (No restart)
?第一種:全局配置 flink-conf.yaml
restart-strategy: none
?第二種:應用代碼設置
env.setRestartStrategy(RestartStrategies.noRestart());
1.15.12.4.保存多個Checkpoint
?默認情況下,如果設置了Checkpoint選項,則Flink只保留最近成功生成的1個Checkpoint,而當Flink程序失敗時,可以從最近的這個Checkpoint來進行恢復。但是,如果我們希望保留多個Checkpoint,并能夠根據實際需要選擇其中一個進行恢復,這樣會更加靈活,比如,我們發現最近4個小時數據記錄處理有問題,希望將整個狀態還原到4小時之前。
?Flink可以支持保留多個Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的個數。
- ?state.checkpoints.num-retained: 20
?這樣設置以后就查看對應的Checkpoint在HDFS上存儲的文件目錄
- ?hdfs dfs -ls hdfs://namenode:9000/flink/checkpoints
- ?如果希望回退到某個Checkpoints點,只需要指定對應的某個Checkpoint路徑即可實現。
1.15.12.5.從Checkpoint進行恢復
?如果Flink程序異常失敗,或者最近一段時間內數據處理錯誤,我們可以將程序從某一個Checkpoint點進行恢復
?bin/flink run -s hdfs://namenode:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk-56/_metadata flink-job.jar
?程序正常運行后,還會按照Checkpoint配置進行運行,繼續生成Checkpoint數據。
1.15.12.6.savePoint
所有使用檢查點的程序都可以從保存點恢復執行。保存點允許在不丟失任何狀態的情況下更新程序和Flink集群。保存點是手動觸發的檢查點,以程序的快照,寫出來state backend。它們依賴于常規的檢查點機制。
保存點與檢查點類似,除了它們是由用戶觸發的,并且不會在完成新的檢查點時自動過期。
Flink通過Savepoint功能可以做到程序升級后,繼續從升級前的那個點開始執行計算,保證數據不中斷。
全局,一致性快照??梢员4鏀祿磑ffset,operator操作狀態等信息。
可以從應用在過去任意做了savepoint的時刻開始繼續消費。
1.15.12.7.checkPoint vs savePoint
?checkpoint
- ?應用定時觸發,用于保存狀態,會過期
- ?內部應用失敗重啟的時候使用
?savePoint
?用戶手動執行,是指向Checkpoint的指針,不會過期
?在升級的情況下使用
?注意:為了能夠在作業的不同版本之間以及 Flink 的不同版本之間順利升級,強烈推薦程序員通過 uid(String) 方法手動的給算子賦予 ID,這些 ID 將用于確定每一個算子的狀態范圍。如果不手動給各算子指定 ID,則會由 Flink 自動給每個算子生成一個 ID。只要這些 ID 沒有改變就能從保存點(savepoint)將程序恢復回來。而這些自動生成的 ID 依賴于程序的結構,并且對代碼的更改是很敏感的。因此,強烈建議用戶手動的設置 ID。
1.15.12.8.savePoint的使用
?1:在flink-conf.yaml中配置Savepoint存儲位置
- ?不是必須設置,但是設置后,后面創建指定Job的Savepoint時,可以不用在手動執行命令時指定Savepoint的位置。
- ?state.savepoints.dir: hdfs://namenode:9000/flink/savepoints
?2:觸發一個savepoint【直接觸發或者在cancel的時候觸發】 - ?bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId]【針對on yarn模式需要指定-yid參數】
- ?bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]【針對on yarn模式需要指定-yid參數】
?從指定的savepoint啟動job - ?bin/flink run -s savepointPath [runArgs]
總結
以上是生活随笔為你收集整理的1.15.Flink state(状态)管理与恢复、什么是state、Keyed State、Operator State、状态容错(生成快照,恢复快照),checkPoint简介,重启策略等的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 北邮国安是国企吗
- 下一篇: 开公司独资还是合资好 还是要看个人的判