2021年大数据Flink(四十四):扩展阅读 End-to-End Exactly-Once
目錄
擴展閱讀 End-to-End Exactly-Once
流處理的數據處理語義
At-most-once-最多一次
At-least-once-至少一次
?Exactly-once-精確一次
End-to-End Exactly-Once-端到端的精確一次
注意:精確一次? 有效一次!
補充:流計算系統如何支持一致性語義
???????End-to-End Exactly-Once的實現
???????Source
???????Transformation
???????Sink
???????Flink+Kafka的End-to-End Exactly-Once
???????版本說明
???????兩階段提交-API
???????兩階段提交-簡單流程
???????兩階段提交-詳細流程
代碼示例
Flink+Kafka實現End-to-End Exactly-Once
???????Flink+MySQL實現End-to-End Exactly-Once
深度總結
Exactly-Once
End-to-End Exactly-Once
Flink如何支持End-to-End Exactly-Once的?
兩階段事務提交協議
擴展閱讀 End-to-End Exactly-Once
Flink 在1.4.0 版本引入『exactly-once』并號稱支持『End-to-End Exactly-Once』“端到端的精確一次”語義。
流處理的數據處理語義
對于批處理,fault-tolerant(容錯性)很容易做,失敗只需要replay,就可以完美做到容錯。
對于流處理,數據流本身是動態,沒有所謂的開始或結束,雖然可以replay buffer的部分數據,但fault-tolerant做起來會復雜的多
流處理(有時稱為事件處理)可以簡單地描述為是對無界數據或事件的連續處理。流或事件處理應用程序可以或多或少地被描述為有向圖,并且通常被描述為有向無環圖(DAG)。在這樣的圖中,每個邊表示數據或事件流,每個頂點表示運算符,會使用程序中定義的邏輯處理來自相鄰邊的數據或事件。有兩種特殊類型的頂點,通常稱為 sources 和 sinks。sources讀取外部數據/事件到應用程序中,而 sinks 通常會收集應用程序生成的結果。下圖是流式應用程序的示例。有如下特點:
分布式情況下是由多個Source(讀取數據)節點、多個Operator(數據處理)節點、多個Sink(輸出)節點構成
每個節點的并行數可以有差異,且每個節點都有可能發生故障
對于數據正確性最重要的一點,就是當發生故障時,是怎樣容錯與恢復的。
?
流處理引擎通常為應用程序提供了三種數據處理語義:最多一次、至少一次和精確一次。
如下是對這些不同處理語義的寬松定義(一致性由弱到強):
At most noce < At least once < Exactly once < End to End Exactly once
At-most-once-最多一次
有可能會有數據丟失
這本質上是簡單的恢復方式,也就是直接從失敗處的下個數據開始恢復程序,之前的失敗數據處理就不管了。可以保證數據或事件最多由應用程序中的所有算子處理一次。 這意味著如果數據在被流應用程序完全處理之前發生丟失,則不會進行其他重試或者重新發送。
?
?
?
?
At-least-once-至少一次
有可能重復處理數據
應用程序中的所有算子都保證數據或事件至少被處理一次。這通常意味著如果事件在流應用程序完全處理之前丟失,則將從源頭重放或重新傳輸事件。然而,由于事件是可以被重傳的,因此一個事件有時會被處理多次(至少一次),至于有沒有重復數據,不會關心,所以這種場景需要人工干預自己處理重復數據
?Exactly-once-精確一次
?
Exactly-Once 是 Flink、Spark 等流處理系統的核心特性之一,這種語義會保證每一條消息只被流處理系統處理一次。即使是在各種故障的情況下,流應用程序中的所有算子都保證事件只會被『精確一次』的處理。(也有文章將 Exactly-once 翻譯為:完全一次,恰好一次)
Flink實現『精確一次』的分布式快照/狀態檢查點方法受到 Chandy-Lamport 分布式快照算法的啟發。通過這種機制,流應用程序中每個算子的所有狀態都會定期做 checkpoint。如果是在系統中的任何地方發生失敗,每個算子的所有狀態都回滾到最新的全局一致 checkpoint 點。在回滾期間,將暫停所有處理。源也會重置為與最近 checkpoint 相對應的正確偏移量。整個流應用程序基本上是回到最近一次的一致狀態,然后程序可以從該狀態重新啟動。
End-to-End Exactly-Once-端到端的精確一次
Flink 在1.4.0 版本引入『exactly-once』并號稱支持『End-to-End Exactly-Once』“端到端的精確一次”語義。
它指的是 Flink 應用從 Source 端開始到 Sink 端結束,數據必須經過的起始點和結束點。
注意:
『exactly-once』和『End-to-End Exactly-Once』的區別:
?
?
?
?
???????注意:精確一次? 有效一次!
有些人可能認為『精確一次』描述了事件處理的保證,其中流中的每個事件只被處理一次。實際上,沒有引擎能夠保證正好只處理一次。在面對任意故障時,不可能保證每個算子中的用戶定義邏輯在每個事件中只執行一次,因為用戶代碼被部分執行的可能性是永遠存在的。
那么,當引擎聲明『精確一次』處理語義時,它們能保證什么呢?如果不能保證用戶邏輯只執行一次,那么什么邏輯只執行一次?當引擎聲明『精確一次』處理語義時,它們實際上是在說,它們可以保證引擎管理的狀態更新只提交一次到持久的后端存儲。
事件的處理可以發生多次,但是該處理的效果只在持久后端狀態存儲中反映一次。因此,我們認為有效地描述這些處理語義最好的術語是『有效一次』(effectively once)
?
???????補充:流計算系統如何支持一致性語義
?
?
?
?
?
?
?
???????End-to-End Exactly-Once的實現
通過前面的學習,我們了解到,Flink內部借助分布式快照Checkpoint已經實現了內部的Exactly-Once,但是Flink 自身是無法保證外部其他系統“精確一次”語義的,所以 Flink 若要實現所謂“端到端(End to End)的精確一次”的要求,那么外部系統必須支持“精確一次”語義;然后借助一些其他手段才能實現。如下:
???????Source
發生故障時需要支持重設數據的讀取位置,如Kafka可以通過offset來實現(其他的沒有offset系統,我們可以自己實現累加器計數)
???????Transformation
也就是Flink內部,已經通過Checkpoint保證了,如果發生故障或出錯時,Flink應用重啟后會從最新成功完成的checkpoint中恢復——重置應用狀態并回滾狀態到checkpoint中輸入流的正確位置,之后再開始執行數據處理,就好像該故障或崩潰從未發生過一般。
- 分布式快照機制
我們在之前的課程中講解過 Flink 的容錯機制,Flink?提供了失敗恢復的容錯機制,而這個容錯機制的核心就是持續創建分布式數據流的快照來實現。
?
同 Spark 相比,Spark 僅僅是針對 Driver 的故障恢復 Checkpoint。而 Flink 的快照可以到算子級別,并且對全局數據也可以做快照。Flink 的分布式快照受到 ?Chandy-Lamport?分布式快照算法啟發,同時進行了量身定做。
?
- Barrier
Flink 分布式快照的核心元素之一是 Barrier(數據柵欄),我們也可以把 Barrier 簡單地理解成一個標記,該標記是嚴格有序的,并且隨著數據流往下流動。每個 Barrier 都帶有自己的 ID,Barrier 極其輕量,并不會干擾正常的數據處理。
?
如上圖所示,假如我們有一個從左向右流動的數據流,Flink 會依次生成 snapshot 1、 snapshot 2、snapshot 3……Flink 中有一個專門的“協調者”負責收集每個 snapshot 的位置信息,這個“協調者”也是高可用的。
?
Barrier 會隨著正常數據繼續往下流動,每當遇到一個算子,算子會插入一個標識,這個標識的插入時間是上游所有的輸入流都接收到 snapshot n。與此同時,當我們的 sink 算子接收到所有上游流發送的 Barrier 時,那么就表明這一批數據處理完畢,Flink 會向“協調者”發送確認消息,表明當前的 snapshot n 完成了。當所有的 sink 算子都確認這批數據成功處理后,那么本次的 snapshot 被標識為完成。
?
這里就會有一個問題,因為 Flink 運行在分布式環境中,一個 operator 的上游會有很多流,每個流的 barrier n 到達的時間不一致怎么辦?這里 Flink 采取的措施是:快流等慢流。
拿上圖的 barrier n 來說,其中一個流到的早,其他的流到的比較晚。當第一個 barrier n到來后,當前的 operator 會繼續等待其他流的 barrier n。直到所有的barrier n 到來后,operator 才會把所有的數據向下發送。
- 異步和增量
按照上面我們介紹的機制,每次在把快照存儲到我們的狀態后端時,如果是同步進行就會阻塞正常任務,從而引入延遲。因此 Flink 在做快照存儲時,可采用異步方式。
此外,由于 checkpoint 是一個全局狀態,用戶保存的狀態可能非常大,多數達 G 或者 T 級別。在這種情況下,checkpoint 的創建會非常慢,而且執行時占用的資源也比較多,因此 Flink 提出了增量快照的概念。也就是說,每次都是進行的全量 checkpoint,是基于上次進行更新的。
?
???????Sink
需要支持冪等寫入或事務寫入(Flink的兩階段提交需要事務支持)
?
???????冪等寫入(Idempotent Writes)
冪等寫操作是指:任意多次向一個系統寫入數據,只對目標系統產生一次結果影響。
例如,重復向一個HashMap里插入同一個Key-Value二元對,第一次插入時這個HashMap發生變化,后續的插入操作不會改變HashMap的結果,這就是一個冪等寫操作。
HBase、Redis和Cassandra這樣的KV數據庫一般經常用來作為Sink,用以實現端到端的Exactly-Once。
需要注意的是,并不是說一個KV數據庫就百分百支持冪等寫。冪等寫對KV對有要求,那就是Key-Value必須是可確定性(Deterministic)計算的。假如我們設計的Key是:name + curTimestamp,每次執行數據重發時,生成的Key都不相同,會產生多次結果,整個操作不是冪等的。因此,為了追求端到端的Exactly-Once,我們設計業務邏輯時要盡量使用確定性的計算邏輯和數據模型。
?
???????事務寫入(Transactional Writes)
Flink借鑒了數據庫中的事務處理技術,同時結合自身的Checkpoint機制來保證Sink只對外部輸出產生一次影響。大致的流程如下:
Flink先將待輸出的數據保存下來暫時不向外部系統提交,等到Checkpoint結束時,Flink上下游所有算子的數據都是一致的時候,Flink將之前保存的數據全部提交(Commit)到外部系統。換句話說,只有經過Checkpoint確認的數據才向外部系統寫入。
如下圖所示,如果使用事務寫,那只把時間戳3之前的輸出提交到外部系統,時間戳3以后的數據(例如時間戳5和8生成的數據)暫時保存下來,等待下次Checkpoint時一起寫入到外部系統。這就避免了時間戳5這個數據產生多次結果,多次寫入到外部系統。
?
在事務寫的具體實現上,Flink目前提供了兩種方式:
1.預寫日志(Write-Ahead-Log,WAL)
2.兩階段提交(Two-Phase-Commit,2PC)
這兩種方式區別主要在于:
1.WAL方式通用性更強,適合幾乎所有外部系統,但也不能提供百分百端到端的Exactly-Once,因為WAL預習日志會先寫內存,而內存是易失介質。
2.如果外部系統自身就支持事務(比如MySQL、Kafka),可以使用2PC方式,可以提供百分百端到端的Exactly-Once。
事務寫的方式能提供端到端的Exactly-Once一致性,它的代價也是非常明顯的,就是犧牲了延遲。輸出數據不再是實時寫入到外部系統,而是分批次地提交。目前來說,沒有完美的故障恢復和Exactly-Once保障機制,對于開發者來說,需要在不同需求之間權衡。
?
???????Flink+Kafka的End-to-End Exactly-Once
在上一小節我們了解到Flink的 End-to-End Exactly-Once需要Checkpoint+事務的提交/回滾操作,在分布式系統中協調提交和回滾的一個常見方法就是使用兩階段提交協議。接下來我們了解下Flink的TwoPhaseCommitSinkFunction是如何支持End-to-End Exactly-Once的
?
???????版本說明
Flink 1.4版本之前,支持Exactly Once語義,僅限于應用內部。
Flink 1.4版本之后,通過兩階段提交(TwoPhaseCommitSinkFunction)支持End-To-End Exactly Once,而且要求Kafka 0.11+。
利用TwoPhaseCommitSinkFunction是通用的管理方案,只要實現對應的接口,而且Sink的存儲支持變亂提交,即可實現端到端的劃一性語義。
?
???????兩階段提交-API
在 Flink 中的Two-Phase-Commit-2PC兩階段提交的實現方法被封裝到了 TwoPhaseCommitSinkFunction 這個抽象類中,只需要實現其中的beginTransaction、preCommit、commit、abort 四個方法就可以實現“精確一次”的處理語義,如FlinkKafkaProducer就實現了該類并實現了這些方法
?
1.beginTransaction,在開啟事務之前,我們在目標文件系統的臨時目錄中創建一個臨時文件,后面在處理數據時將數據寫入此文件;
2.preCommit,在預提交階段,刷寫(flush)文件,然后關閉文件,之后就不能寫入到文件了,我們還將為屬于下一個檢查點的任何后續寫入啟動新事務;
3.commit,在提交階段,我們將預提交的文件原子性移動到真正的目標目錄中,請注意,這會增加輸出數據可見性的延遲;
4.abort,在中止階段,我們刪除臨時文件。
?
???????兩階段提交-簡單流程
?
整個過程可以總結為下面四個階段:
1.一旦 Flink 開始做 checkpoint 操作,那么就會進入 pre-commit “預提交”階段,同時JobManager的Coordinator?會將?Barrier 注入數據流中 ;
2.當所有的 barrier 在算子中成功進行一遍傳遞(就是Checkpoint完成),并完成快照后,則“預提交”階段完成;
3.等所有的算子完成“預提交”,就會發起一個commit“提交”動作,但是任何一個“預提交”失敗都會導致 Flink 回滾到最近的 checkpoint;
?
???????兩階段提交-詳細流程
- 需求
接下來將介紹兩階段提交協議,以及它如何在一個讀寫Kafka的Flink程序中實現端到端的Exactly-Once語義。Kafka經常與Flink一起使用,且Kafka在最近的0.11版本中添加了對事務的支持。這意味著現在通過Flink讀寫Kafaka,并提供端到端的Exactly-Once語義有了必要的支持。
?
在上圖中,我們有:
– 從Kafka讀取的數據源(Flink內置的KafkaConsumer)
– 窗口聚合
– 將數據寫回Kafka的數據輸出端(Flink內置的KafkaProducer)
要使數據輸出端提供Exactly-Once保證,它必須將所有數據通過一個事務提交給Kafka。提交捆綁了兩個checkpoint之間的所有要寫入的數據。這可確保在發生故障時能回滾寫入的數據。
但是在分布式系統中,通常會有多個并發運行的寫入任務的,簡單的提交或回滾是不夠的,因為所有組件必須在提交或回滾時“一致”才能確保一致的結果。
Flink使用兩階段提交協議及預提交階段來解決這個問題。
?
- 預提交-內部狀態
在checkpoint開始的時候,即兩階段提交協議的“預提交”階段。當checkpoint開始時,Flink的JobManager會將checkpoint barrier(將數據流中的記錄分為進入當前checkpoint與進入下一個checkpoint)注入數據流。
brarrier在operator之間傳遞。對于每一個operator,它觸發operator的狀態快照寫入到state backend。
?
數據源保存了消費Kafka的偏移量(offset),之后將checkpoint barrier傳遞給下一個operator。
這種方式僅適用于operator具有『內部』狀態。所謂內部狀態,是指Flink state backend保存和管理的 -例如,第二個operator中window聚合算出來的sum值。當一個進程有它的內部狀態的時候,除了在checkpoint之前需要將數據變更寫入到state backend,不需要在預提交階段執行任何其他操作。Flink負責在checkpoint成功的情況下正確提交這些寫入,或者在出現故障時中止這些寫入。
?
- 預提交-外部狀態
但是,當進程具有『外部』狀態時,需要作些額外的處理。外部狀態通常以寫入外部系統(如Kafka)的形式出現。在這種情況下,為了提供Exactly-Once保證,外部系統必須支持事務,這樣才能和兩階段提交協議集成。
在該示例中的數據需要寫入Kafka,因此數據輸出端(Data Sink)有外部狀態。在這種情況下,在預提交階段,除了將其狀態寫入state backend之外,數據輸出端還必須預先提交其外部事務。
?
當checkpoint barrier在所有operator都傳遞了一遍,并且觸發的checkpoint回調成功完成時,預提交階段就結束了。所有觸發的狀態快照都被視為該checkpoint的一部分。checkpoint是整個應用程序狀態的快照,包括預先提交的外部狀態。如果發生故障,我們可以回滾到上次成功完成快照的時間點。
- 提交階段
下一步是通知所有operator,checkpoint已經成功了。這是兩階段提交協議的提交階段,JobManager為應用程序中的每個operator發出checkpoint已完成的回調。
數據源和widnow operator沒有外部狀態,因此在提交階段,這些operator不必執行任何操作。但是,數據輸出端(Data Sink)擁有外部狀態,此時應該提交外部事務。
?
- 總結
我們對上述知識點總結下:
1.一旦所有operator完成預提交,就提交一個commit。
2.如果只要有一個預提交失敗,則所有其他提交都將中止,我們將回滾到上一個成功完成的checkpoint。
3.在預提交成功之后,提交的commit需要保證最終成功 – operator和外部系統都需要保障這點。如果commit失敗(例如,由于間歇性網絡問題),整個Flink應用程序將失敗,應用程序將根據用戶的重啟策略重新啟動,還會嘗試再提交。這個過程至關重要,因為如果commit最終沒有成功,將會導致數據丟失。
4.完整的實現兩階段提交協議可能有點復雜,這就是為什么Flink將它的通用邏輯提取到抽象類TwoPhaseCommitSinkFunction中的原因。
?
代碼示例
Flink+Kafka實現End-to-End Exactly-Once
https://ververica.cn/developers/flink-kafka-end-to-end-exactly-once-analysis/
package cn.lanson.extend;import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;/*** Author lanson* Desc* Kafka --> Flink-->Kafka ?的End-To-End-Exactly-once* 直接使用* FlinkKafkaConsumer ?+ ?Flink的Checkpoint ?+ ?FlinkKafkaProducer*/
public class Kafka_Flink_Kafka_EndToEnd_ExactlyOnce {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//===========Checkpoint參數設置====//===========類型1:必須參數=============//設置Checkpoint的時間間隔為1000ms做一次Checkpoint/其實就是每隔1000ms發一次Barrier!env.enableCheckpointing(1000);//設置State狀態存儲介質if (SystemUtils.IS_OS_WINDOWS) {env.setStateBackend(new FsStateBackend("file:///D:/ckp"));} else {env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink-checkpoint/checkpoint"));}//===========類型2:建議參數===========//設置兩個Checkpoint 之間最少等待時間,如設置Checkpoint之間最少是要等?500ms(為了避免每隔1000ms做一次Checkpoint的時候,前一次太慢和后一次重疊到一起去了)//如:高速公路上,每隔1s關口放行一輛車,但是規定了兩車之前的最小車距為500menv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//默認是0//設置如果在做Checkpoint過程中出現錯誤,是否讓整體任務失敗:true是??false不是//env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//默認是trueenv.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//默認值為0,表示不容忍任何檢查點失敗//設置是否清理檢查點,表示?Cancel 時是否需要保留當前的?Checkpoint,默認?Checkpoint會在作業被Cancel時被刪除//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:true,當作業被取消時,刪除外部的checkpoint(默認值)//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:false,當作業被取消時,保留外部的checkpointenv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//===========類型3:直接使用默認的即可===============//設置checkpoint的執行模式為EXACTLY_ONCE(默認)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//設置checkpoint的超時時間,如果?Checkpoint在?60s內尚未完成說明該次Checkpoint失敗,則丟棄。env.getCheckpointConfig().setCheckpointTimeout(60000);//默認10分鐘//設置同一時間有多少個checkpoint可以同時執行env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//默認為1//=============重啟策略===========env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));//2.SourceProperties props_source = new Properties();props_source.setProperty("bootstrap.servers", "node1:9092");props_source.setProperty("group.id", "flink");props_source.setProperty("auto.offset.reset", "latest");props_source.setProperty("flink.partition-discovery.interval-millis", "5000");//會開啟一個后臺線程每隔5s檢測一下Kafka的分區情況//props_source.setProperty("enable.auto.commit", "true");//沒有Checkpoint的時候使用自動提交偏移量到默認主題:__consumer_offsets中//props_source.setProperty("auto.commit.interval.ms", "2000");//kafkaSource就是KafkaConsumerFlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), props_source);kafkaSource.setStartFromLatest();//kafkaSource.setStartFromGroupOffsets();//設置從記錄的offset開始消費,如果沒有記錄從auto.offset.reset配置開始消費//kafkaSource.setStartFromEarliest();//設置直接從Earliest消費,和auto.offset.reset配置無關kafkaSource.setCommitOffsetsOnCheckpoints(true);//執行Checkpoint的時候提交offset到Checkpoint(Flink用),并且提交一份到默認主題:__consumer_offsets(外部其他系統想用的話也可以獲取到)DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);//3.Transformation//3.1切割出每個單詞并直接記為1SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {//value就是每一行String[] words = value.split(" ");for (String word : words) {Random random = new Random();int i = random.nextInt(5);if (i > 3) {System.out.println("出bug了...");throw new RuntimeException("出bug了...");}out.collect(Tuple2.of(word, 1));}}});//3.2分組//注意:批處理的分組是groupBy,流處理的分組是keyByKeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);//3.3聚合SingleOutputStreamOperator<Tuple2<String, Integer>> aggResult = groupedDS.sum(1);//3.4將聚合結果轉為自定義的字符串格式SingleOutputStreamOperator<String> result = (SingleOutputStreamOperator<String>) aggResult.map(new RichMapFunction<Tuple2<String, Integer>, String>() {@Overridepublic String map(Tuple2<String, Integer> value) throws Exception {return value.f0 + ":::" + value.f1;}});//4.sink//result.print();Properties props_sink = new Properties();props_sink.setProperty("bootstrap.servers", "node1:9092");props_sink.setProperty("transaction.timeout.ms", 1000 * 5 + "");//設置事務超時時間,也可在kafka配置中設置/*FlinkKafkaProducer<String> kafkaSink0 = new FlinkKafkaProducer<>("flink_kafka",new SimpleStringSchema(),props_sink);*/FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka2",new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()),props_sink,FlinkKafkaProducer.Semantic.EXACTLY_ONCE);result.addSink(kafkaSink);//5.executeenv.execute();//測試://1.創建主題?/export/server/kafka/bin/kafka-topics.sh --zookeeper node1:2181 --create --replication-factor 2 --partitions 3 --topic flink_kafka2//2.開啟控制臺生產者?/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka//3.開啟控制臺消費者?/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka2}
}
?
???????Flink+MySQL實現End-to-End Exactly-Once
https://www.jianshu.com/p/5bdd9a0d7d02
- 需求
1.checkpoint每10s進行一次,此時用FlinkKafkaConsumer實時消費kafka中的消息
2.消費并處理完消息后,進行一次預提交數據庫的操作
3.如果預提交沒有問題,10s后進行真正的插入數據庫操作,如果插入成功,進行一次checkpoint,flink會自動記錄消費的offset,可以將checkpoint保存的數據放到hdfs中
4.如果預提交出錯,比如在5s的時候出錯了,此時Flink程序就會進入不斷的重啟中,重啟的策略可以在配置中設置,checkpoint記錄的還是上一次成功消費的offset,因為本次消費的數據在checkpoint期間,消費成功,但是預提交過程中失敗了
5.注意此時數據并沒有真正的執行插入操作,因為預提交(preCommit)失敗,提交(commit)過程也不會發生。等將異常數據處理完成之后,再重新啟動這個Flink程序,它會自動從上一次成功的checkpoint中繼續消費數據,以此來達到Kafka到Mysql的Exactly-Once。
?
- 代碼1
package cn.lanson.extend;import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.base.VoidSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema; import org.apache.kafka.clients.CommonClientConfigs;import java.sql.*; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Properties;public class Kafka_Flink_MySQL_EndToEnd_ExactlyOnce {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//方便測試env.enableCheckpointing(10000);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);//env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.setStateBackend(new FsStateBackend("file:///D:/ckp"));//2.SourceString topic = "flink_kafka";Properties props = new Properties();props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,"node1:9092");props.setProperty("group.id","flink");props.setProperty("auto.offset.reset","latest");//如果有記錄偏移量從記錄的位置開始消費,如果沒有從最新的數據開始消費props.setProperty("flink.partition-discovery.interval-millis","5000");//開一個后臺線程每隔5s檢查Kafka的分區狀態FlinkKafkaConsumer<ObjectNode> kafkaSource = new FlinkKafkaConsumer<>("topic_in", new JSONKeyValueDeserializationSchema(true), props);kafkaSource.setStartFromGroupOffsets();//從group offset記錄的位置位置開始消費,如果kafka broker 端沒有該group信息,會根據"auto.offset.reset"的設置來決定從哪開始消費kafkaSource.setCommitOffsetsOnCheckpoints(true);//Flink執行Checkpoint的時候提交偏移量(一份在Checkpoint中,一份在Kafka的默認主題中__comsumer_offsets(方便外部監控工具去看))DataStreamSource<ObjectNode> kafkaDS = env.addSource(kafkaSource);//3.transformation//4.SinkkafkaDS.addSink(new MySqlTwoPhaseCommitSink()).name("MySqlTwoPhaseCommitSink");//5.executeenv.execute();} }/**自定義kafka to mysql,繼承TwoPhaseCommitSinkFunction,實現兩階段提交。功能:保證kafak to mysql 的Exactly-OnceCREATE TABLE `t_test` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`value` varchar(255) DEFAULT NULL,`insert_time` datetime DEFAULT NULL,PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4*/ class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<ObjectNode, Connection, Void> {public MySqlTwoPhaseCommitSink() {super(new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE);}/*** 執行數據入庫操作*/@Overrideprotected void invoke(Connection connection, ObjectNode objectNode, Context context) throws Exception {System.err.println("start invoke.......");String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());System.err.println("===>date:" + date + " " + objectNode);String value = objectNode.get("value").toString();String sql = "insert into `t_test` (`value`,`insert_time`) values (?,?)";PreparedStatement ps = connection.prepareStatement(sql);ps.setString(1, value);ps.setTimestamp(2, new Timestamp(System.currentTimeMillis()));//執行insert語句ps.execute();//手動制造異常if(Integer.parseInt(value) == 15) System.out.println(1/0);}/*** 獲取連接,開啟手動提交事務(getConnection方法中)*/@Overrideprotected Connection beginTransaction() throws Exception {String url = "jdbc:mysql://localhost:3306/bigdata?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true";Connection connection = DBConnectUtil.getConnection(url, "root", "root");System.err.println("start beginTransaction......."+connection);return connection;}/*** 預提交,這里預提交的邏輯在invoke方法中*/@Overrideprotected void preCommit(Connection connection) throws Exception {System.err.println("start preCommit......."+connection);}/*** 如果invoke執行正常則提交事務*/@Overrideprotected void commit(Connection connection) {System.err.println("start commit......."+connection);DBConnectUtil.commit(connection);}@Overrideprotected void recoverAndCommit(Connection connection) {System.err.println("start recoverAndCommit......."+connection);}@Overrideprotected void recoverAndAbort(Connection connection) {System.err.println("start abort recoverAndAbort......."+connection);}/*** 如果invoke執行異常則回滾事務,下一次的checkpoint操作也不會執行*/@Overrideprotected void abort(Connection connection) {System.err.println("start abort rollback......."+connection);DBConnectUtil.rollback(connection);} }class DBConnectUtil {/*** 獲取連接*/public static Connection getConnection(String url, String user, String password) throws SQLException {Connection conn = null;conn = DriverManager.getConnection(url, user, password);//設置手動提交conn.setAutoCommit(false);return conn;}/*** 提交事務*/public static void commit(Connection conn) {if (conn != null) {try {conn.commit();} catch (SQLException e) {e.printStackTrace();} finally {close(conn);}}}/*** 事務回滾*/public static void rollback(Connection conn) {if (conn != null) {try {conn.rollback();} catch (SQLException e) {e.printStackTrace();} finally {close(conn);}}}/*** 關閉連接*/public static void close(Connection conn) {if (conn != null) {try {conn.close();} catch (SQLException e) {e.printStackTrace();}}} }?
?
- 代碼2
package cn.lanson.extend;import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class DataProducer {public static void main(String[] args) throws InterruptedException {Properties props = new Properties();props.put("bootstrap.servers", "node1:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);try {for (int i = 1; i <= 20; i++) {DataBean data = new DataBean(String.valueOf(i));ProducerRecord record = new ProducerRecord<String, String>("flink_kafka", null, null, JSON.toJSONString(data));producer.send(record);System.out.println("發送數據: " + JSON.toJSONString(data));Thread.sleep(1000);}}catch (Exception e){System.out.println(e);}producer.flush();}
}@Data
@NoArgsConstructor
@AllArgsConstructor
class DataBean {private String value;
}
深度總結
Exactly-Once
流數據處理的容錯語義:
At most once --最多一次, 也就是說數據最多被處理一次,有可能會丟失
At least once --至少一次, 也就是說數據至少會被處理一次,有可能會重復
Exactly-Once --精準一次, 也就是說數據只會被處理一次,不會丟也不會重復,注意: ==更準確的理解應該是只會被正確處理一次而不是僅一次==
End-to-End Exactly-Once
Flink不僅僅支持Exactly-Once,而且還支持End-to-End Exactly-Once
End-to-End Exactly-Once : 端到端的Exactly-Once, 也就是說, Flink不光光內部處理數據的時候支持Exactly-Once, 在從Source消費, 到Transformation處理, 再到Sink,整個流數據處理,從一端到另一端 整個流程都支持Exactly-Once !
Flink如何支持End-to-End Exactly-Once的?
-
Source: offset+Checkpoint: 需要數據源支持offset維護,如Kafka(offset) + Flink(使用Checkpoint維護offset) 就是支持
FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase FlinkKafkaConsumerBase<T> implements CheckpointedFunction 源碼中就記錄了主題分區和offset信息 ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates initializeState方法和snapshotState方法 -
Transformation:Checkpoint
Flink官方介紹說的就是支持數據流上的有狀態計算! Flink中的有狀態的Transformation-API都是自動維護狀態到的(到Checkpoint中),如sum/reduce/maxBy..... -
Sink:兩階段事務提交+Checkpoint
Flink+Kafka, Kafka是支持事務的,所以可以使用兩階段事務提交來實現 FlinkKafkaProducer<IN> extends TwoPhaseCommitSinkFunction beginTransaction preCommit commit abort
兩階段事務提交協議
?
整個過程可以總結為下面階段:
1.一旦 Flink 開始做 checkpoint 操作,那么就beginTransaction開啟事務會進入 pre-commit “預提交”階段,同時JobManager的Coordinator 會將 Barrier 注入數據流中 ;
2.當所有的 barrier 在算子中成功進行一遍傳遞(就是Checkpoint完成),并完成快照后,則“預提交”階段完成;
3.等所有的算子完成“預提交”,就會發起一個commit“提交”動作,但是任何一個“預提交”失敗都會導致 Flink 回滾到最近的 checkpoint(abort終止事務);
注意: 兩階段事務提交本身的實現流程較為固定(主要就是4個方法的實現beginTransaction/preCommit/commit/abort), 但是代碼實現細節較為復雜,所以Flink提供了abstract class TwoPhaseCommitSinkFunction抽象類,并提供了Sink到Kafka的具體實現:FlinkKafkaProducer extends TwoPhaseCommitSinkFunction里面已經實現好了相應的方法
注意: 小技巧:
面試時被問到. 需要說出幾個核心名詞: FlinkKafkaProducer 兩階段事務提交/TwoPhaseCommitSinkFunction/beginTransaction/preCommit/commit/abort
總結
以上是生活随笔為你收集整理的2021年大数据Flink(四十四):扩展阅读 End-to-End Exactly-Once的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据常用语言Scala(三十
- 下一篇: 2021年大数据Flink(四十五):