flink checkpoint 恢复_Flink解析 | Apache Flink结合Kafka构建端到端的ExactlyOnce处理
周凱波(寶牛)
阿里巴巴技術(shù)專家,四川大學(xué)碩士,2010年畢業(yè)后加入阿里搜索事業(yè)部,從事搜索離線平臺(tái)的研發(fā)工作,參與將搜索后臺(tái)數(shù)據(jù)處理架構(gòu)從MapReduce到Flink的重構(gòu)。目前在阿里計(jì)算平臺(tái)事業(yè)部,專注于基于Flink的一站式計(jì)算平臺(tái)的建設(shè)。
文章目錄:?
1.?Apache Flink 應(yīng)用程序中的 Exactly-Once 語(yǔ)義
2.?Flink 應(yīng)用程序端到端的 Exactly-Once 語(yǔ)義
3.?示例 Flink 應(yīng)用程序啟動(dòng)預(yù)提交階段
4.?在 Flink 中實(shí)現(xiàn)兩階段提交 Operator
5.?總結(jié)
Apache Flink 自2017年12月發(fā)布的1.4.0版本開始,為流計(jì)算引入了一個(gè)重要的里程碑特性:TwoPhaseCommitSinkFunction:
https://issues.apache.org/jira/browse/FLINK-7210
它提取了兩階段提交協(xié)議的通用邏輯,使得通過(guò) Flink 來(lái)構(gòu)建端到端的 Exactly-Once 程序成為可能。同時(shí)支持一些數(shù)據(jù)源(source)和輸出端(sink),包括 Apache Kafka? 0.11及更高版本。它提供了一個(gè)抽象層,用戶只需要實(shí)現(xiàn)少數(shù)方法就能實(shí)現(xiàn)端到端的 Exactly-Once 語(yǔ)義。
有關(guān) TwoPhaseCommitSinkFunction 的使用詳見文檔:?TwoPhaseCommitSinkFunction。或者可以直接閱讀 Kafka 0.11 sink 的文檔:?kafka。
https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html
接下來(lái)會(huì)詳細(xì)分析這個(gè)新功能以及Flink的實(shí)現(xiàn)邏輯,分為如下幾點(diǎn):
描述 Flink checkpoint?機(jī)制是如何保證 Flink 程序結(jié)果的 Exactly-Once 的;
顯示 Flink 如何通過(guò)兩階段提交協(xié)議與數(shù)據(jù)源和數(shù)據(jù)輸出端交互,以提供端到端的 Exactly-Once 保證;
通過(guò)一個(gè)簡(jiǎn)單的示例,了解如何使用 TwoPhaseCommitSinkFunction 實(shí)現(xiàn) Exactly-Once 的文件輸出。
Flink 應(yīng)用程序中的 Exactly-Once 語(yǔ)義
當(dāng)我們說(shuō)『Exactly-Once』時(shí),指的是每個(gè)輸入的事件只影響最終結(jié)果一次。即使機(jī)器或軟件出現(xiàn)故障,既沒(méi)有重復(fù)數(shù)據(jù),也不會(huì)丟數(shù)據(jù)。
Flink 很久之前就提供了 Exactly-Once 語(yǔ)義。在過(guò)去幾年中,我們對(duì) Flink 的?checkpoint?機(jī)制有過(guò)深入的描述,這是 Flink 有能力提供 Exactly-Once 語(yǔ)義的核心。Flink 文檔還提供了該功能的全面概述:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/checkpoints.html
在繼續(xù)之前,先看下對(duì) checkpoint 機(jī)制的簡(jiǎn)要介紹,這對(duì)理解后面的主題至關(guān)重要。
一次 checkpoint 是以下內(nèi)容的一致性快照:
應(yīng)用程序的當(dāng)前狀態(tài)
輸入流的位置
Flink 可以配置一個(gè)固定的時(shí)間點(diǎn),定期產(chǎn)生 checkpoint,將 checkpoint 的數(shù)據(jù)寫入持久存儲(chǔ)系統(tǒng),例如 S3 或 HDFS 。將 checkpoint 數(shù)據(jù)寫入持久存儲(chǔ)是異步發(fā)生的,這意味著 Flink 應(yīng)用程序在 checkpoint 過(guò)程中可以繼續(xù)處理數(shù)據(jù)。
如果發(fā)生機(jī)器或軟件故障,重新啟動(dòng)后,Flink 應(yīng)用程序?qū)淖钚碌?checkpoint 點(diǎn)恢復(fù)處理; Flink 會(huì)恢復(fù)應(yīng)用程序狀態(tài),將輸入流回滾到上次 checkpoint 保存的位置,然后重新開始運(yùn)行。這意味著 Flink 可以像從未發(fā)生過(guò)故障一樣計(jì)算結(jié)果。
在 Flink 1.4.0 之前,Exactly-Once 語(yǔ)義僅限于 Flink 應(yīng)用程序內(nèi)部,并沒(méi)有擴(kuò)展到 Flink 數(shù)據(jù)處理完后發(fā)送的大多數(shù)外部系統(tǒng)。Flink 應(yīng)用程序與各種數(shù)據(jù)輸出端進(jìn)行交互,開發(fā)人員需要有能力自己維護(hù)組件的上下文來(lái)保證 Exactly-Once 語(yǔ)義。
為了提供端到端的 Exactly-Once 語(yǔ)義 - 也就是說(shuō),除了 Flink 應(yīng)用程序內(nèi)部, Flink 寫入的外部系統(tǒng)也需要能滿足 Exactly-Once 語(yǔ)義 - 這些外部系統(tǒng)必須提供提交或回滾的方法,然后通過(guò) Flink 的 checkpoint 機(jī)制來(lái)協(xié)調(diào)。
分布式系統(tǒng)中,協(xié)調(diào)提交和回滾的常用方法是兩階段提交協(xié)議。在下一節(jié)中,我們將討論 Flink 的 TwoPhaseCommitSinkFunction 是如何利用兩階段提交協(xié)議來(lái)提供端到端的 Exactly-Once 語(yǔ)義。
Flink 應(yīng)用程序端到端的 Exactly-Once 語(yǔ)義
我們將介紹兩階段提交協(xié)議,以及它如何在一個(gè)讀寫 Kafka 的 Flink 程序中實(shí)現(xiàn)端到端的 Exactly-Once 語(yǔ)義。Kafka 是一個(gè)流行的消息中間件,經(jīng)常與 Flink 一起使用。Kafka 在最近的 0.11 版本中添加了對(duì)事務(wù)的支持。這意味著現(xiàn)在通過(guò) Flink 讀寫 Kafka,并提供端到端的 Exactly-Once 語(yǔ)義有了必要的支持:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-011
Flink 對(duì)端到端的 Exactly-Once 語(yǔ)義的支持不僅局限于 Kafka ,您可以將它與任何一個(gè)提供了必要的協(xié)調(diào)機(jī)制的源/輸出端一起使用。例如?Pravega,來(lái)自 DELL/EMC 的開源流媒體存儲(chǔ)系統(tǒng),通過(guò) Flink 的 TwoPhaseCommitSinkFunction 也能支持端到端的 Exactly-Once 語(yǔ)義。
exactly-once-two-phase-commit-1
在今天討論的這個(gè)示例程序中,我們有:
從 Kafka 讀取的數(shù)據(jù)源(Flink 內(nèi)置的?KafkaConsumer)
窗口聚合
將數(shù)據(jù)寫回 Kafka 的數(shù)據(jù)輸出端(Flink 內(nèi)置的?KafkaProducer)
要使數(shù)據(jù)輸出端提供 Exactly-Once 保證,它必須將所有數(shù)據(jù)通過(guò)一個(gè)事務(wù)提交給 Kafka。提交捆綁了兩個(gè) checkpoint 之間的所有要寫入的數(shù)據(jù)。這可確保在發(fā)生故障時(shí)能回滾寫入的數(shù)據(jù)。但是在分布式系統(tǒng)中,通常會(huì)有多個(gè)并發(fā)運(yùn)行的寫入任務(wù)的,簡(jiǎn)單的提交或回滾是不夠的,因?yàn)樗薪M件必須在提交或回滾時(shí)“一致”才能確保一致的結(jié)果。Flink 使用兩階段提交協(xié)議及預(yù)提交階段來(lái)解決這個(gè)問(wèn)題。
在 checkpoint 開始的時(shí)候,即兩階段提交協(xié)議的“預(yù)提交”階段。當(dāng) checkpoint 開始時(shí),Flink 的 JobManager 會(huì)將 checkpoint barrier(將數(shù)據(jù)流中的記錄分為進(jìn)入當(dāng)前 checkpoint 與進(jìn)入下一個(gè) checkpoint)注入數(shù)據(jù)流。
brarrier 在 operator 之間傳遞。對(duì)于每一個(gè) operator,它觸發(fā) operator 的狀態(tài)快照寫入到 state backend。
exactly-once-two-phase-commit-2
數(shù)據(jù)源保存消費(fèi) Kafka 的偏移量(offset),之后將 checkpoint barrier 傳遞給下一個(gè) operator。
這種方式僅適用于 operator 具有『內(nèi)部』狀態(tài)。所謂內(nèi)部狀態(tài),是指 Flink statebackend 保存和管理的 -例如,第二個(gè) operator 中 window 聚合算出來(lái)的 sum 值。當(dāng)一個(gè)進(jìn)程有它的內(nèi)部狀態(tài)的時(shí)候,除了在 checkpoint 之前需要將數(shù)據(jù)變更寫入到 state backend ,不需要在預(yù)提交階段執(zhí)行任何其他操作。Flink 負(fù)責(zé)在 checkpoint 成功的情況下正確提交這些寫入,或者在出現(xiàn)故障時(shí)中止這些寫入。
exactly-once-two-phase-commit-3
示例 Flink 應(yīng)用程序啟動(dòng)預(yù)提交階段
但是,當(dāng)進(jìn)程具有『外部』狀態(tài)時(shí),需要作些額外的處理。外部狀態(tài)通常以寫入外部系統(tǒng)(如 Kafka)的形式出現(xiàn)。在這種情況下,為了提供 Exactly-Once 保證,外部系統(tǒng)必須支持事務(wù),這樣才能和兩階段提交協(xié)議集成。
在本文示例中的數(shù)據(jù)需要寫入 Kafka,因此數(shù)據(jù)輸出端(Data Sink)有外部狀態(tài)。在這種情況下,在預(yù)提交階段,除了將其狀態(tài)寫入 state backend 之外,數(shù)據(jù)輸出端還必須預(yù)先提交其外部事務(wù)。
exactly-once-two-phase-commit-4
當(dāng) checkpoint barrier 在所有 operator 都傳遞了一遍,并且觸發(fā)的 checkpoint 回調(diào)成功完成時(shí),預(yù)提交階段就結(jié)束了。所有觸發(fā)的狀態(tài)快照都被視為該 checkpoint 的一部分。checkpoint 是整個(gè)應(yīng)用程序狀態(tài)的快照,包括預(yù)先提交的外部狀態(tài)。如果發(fā)生故障,我們可以回滾到上次成功完成快照的時(shí)間點(diǎn)。
下一步是通知所有 operator,checkpoint 已經(jīng)成功了。這是兩階段提交協(xié)議的提交階段,JobManager 為應(yīng)用程序中的每個(gè) operator 發(fā)出 checkpoint 已完成的回調(diào)。
數(shù)據(jù)源和 window operator 沒(méi)有外部狀態(tài),因此在提交階段,這些 operator 不必執(zhí)行任何操作。但是,數(shù)據(jù)輸出端(Data Sink)擁有外部狀態(tài),此時(shí)應(yīng)該提交外部事務(wù)。
exactly-once-two-phase-commit-5
我們對(duì)上述知識(shí)點(diǎn)總結(jié)下:
一旦所有 operator 完成預(yù)提交,就提交一個(gè) commit。
如果至少有一個(gè)預(yù)提交失敗,則所有其他提交都將中止,我們將回滾到上一個(gè)成功完成的 checkpoint 。
在預(yù)提交成功之后,提交的 commit 需要保證最終成功 - operator 和外部系統(tǒng)都需要保障這點(diǎn)。如果 commit 失敗(例如,由于間歇性網(wǎng)絡(luò)問(wèn)題),整個(gè) Flink 應(yīng)用程序?qū)⑹?#xff0c;應(yīng)用程序?qū)⒏鶕?jù)用戶的重啟策略重新啟動(dòng),還會(huì)嘗試再提交。這個(gè)過(guò)程至關(guān)重要,因?yàn)槿绻?commit 最終沒(méi)有成功,將會(huì)導(dǎo)致數(shù)據(jù)丟失。
因此,我們可以確定所有 operator 都同意 checkpoint 的最終結(jié)果:所有 operator 都同意數(shù)據(jù)已提交,或提交被中止并回滾。
在 Flink 中實(shí)現(xiàn)兩階段提交 Operator
完整的實(shí)現(xiàn)兩階段提交協(xié)議可能有點(diǎn)復(fù)雜,這就是為什么 Flink 將它的通用邏輯提取到抽象類 TwoPhaseCommitSinkFunction 中的原因。
接下來(lái)基于輸出到文件的簡(jiǎn)單示例,說(shuō)明如何使用 TwoPhaseCommitSinkFunction 。用戶只需要實(shí)現(xiàn)四個(gè)函數(shù),就能為數(shù)據(jù)輸出端實(shí)現(xiàn) Exactly-Once 語(yǔ)義:
beginTransaction?- 在事務(wù)開始前,我們?cè)谀繕?biāo)文件系統(tǒng)的臨時(shí)目錄中創(chuàng)建一個(gè)臨時(shí)文件。隨后,我們可以在處理數(shù)據(jù)時(shí)將數(shù)據(jù)寫入此文件。
preCommit?- 在預(yù)提交階段,我們刷新文件到存儲(chǔ),關(guān)閉文件,不再重新寫入。我們還將為屬于下一個(gè) checkpoint 的任何后續(xù)文件寫入啟動(dòng)一個(gè)新的事務(wù)。
commit?- 在提交階段,我們將預(yù)提交階段的文件原子地移動(dòng)到真正的目標(biāo)目錄。需要注意的是,這會(huì)增加輸出數(shù)據(jù)可見性的延遲。
abort?- 在中止階段,我們刪除臨時(shí)文件。
我們知道,如果發(fā)生任何故障,Flink 會(huì)將應(yīng)用程序的狀態(tài)恢復(fù)到最新的一次 checkpoint 點(diǎn)。一種極端的情況是,預(yù)提交成功了,但在這次 commit 的通知到達(dá) operator 之前發(fā)生了故障。在這種情況下,Flink 會(huì)將 operator 的狀態(tài)恢復(fù)到已經(jīng)預(yù)提交,但尚未真正提交的狀態(tài)。
我們需要在預(yù)提交階段保存足夠多的信息到 checkpoint 狀態(tài)中,以便在重啟后能正確的中止或提交事務(wù)。在這個(gè)例子中,這些信息是臨時(shí)文件和目標(biāo)目錄的路徑。
TwoPhaseCommitSinkFunction 已經(jīng)把這種情況考慮在內(nèi)了,并且在從 checkpoint 點(diǎn)恢復(fù)狀態(tài)時(shí),會(huì)優(yōu)先發(fā)出一個(gè) commit 。我們需要以冪等方式實(shí)現(xiàn)提交,一般來(lái)說(shuō),這并不難。在這個(gè)示例中,我們可以識(shí)別出這樣的情況:臨時(shí)文件不在臨時(shí)目錄中,但已經(jīng)移動(dòng)到目標(biāo)目錄了。
在 TwoPhaseCommitSinkFunction 中,還有一些其他邊界情況也會(huì)考慮在內(nèi),請(qǐng)參考?Flink 文檔了解更多信息:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html
總結(jié)
總結(jié)下本文涉及的一些要點(diǎn):
Flink 的 checkpoint 機(jī)制是支持兩階段提交協(xié)議并提供端到端的 Exactly-Once 語(yǔ)義的基礎(chǔ)。
這個(gè)方案的優(yōu)點(diǎn)是: Flink 不像其他一些系統(tǒng)那樣,通過(guò)網(wǎng)絡(luò)傳輸存儲(chǔ)數(shù)據(jù) - 不需要像大多數(shù)批處理程序那樣將計(jì)算的每個(gè)階段寫入磁盤。
Flink 的 TwoPhaseCommitSinkFunction 提取了兩階段提交協(xié)議的通用邏輯,基于此將 Flink 和支持事務(wù)的外部系統(tǒng)結(jié)合,構(gòu)建端到端的 Exactly-Once 成為可能。
從?Flink 1.4.0?開始,Pravega 和 Kafka 0.11 producer 都提供了 Exactly-Once 語(yǔ)義;Kafka 在0.11版本首次引入了事務(wù),為在 Flink 程序中使用 Kafka producer 提供 Exactly-Once 語(yǔ)義提供了可能性。
Kafaka 0.11 producer?的事務(wù)是在 TwoPhaseCommitSinkFunction 基礎(chǔ)上實(shí)現(xiàn)的,和 at-least-once producer 相比只增加了非常低的開銷。
這是個(gè)令人興奮的功能,期待 Flink TwoPhaseCommitSinkFunction 在未來(lái)支持更多的數(shù)據(jù)接收端。
via:
https://www.ververica.com/blog/end-to-end-exactly-once-processing-apache-flink-apache-kafka
作者:Piotr Nowojski
翻譯:周凱波
大家工作學(xué)習(xí)遇到HBase技術(shù)問(wèn)題,把問(wèn)題發(fā)布到HBase技術(shù)社區(qū)論壇http://hbase.group,歡迎大家論壇上面提問(wèn)留言討論。想了解更多HBase技術(shù)關(guān)注HBase技術(shù)社區(qū)公眾號(hào)(微信號(hào):hbasegroup),非常歡迎大家積極投稿。
本群為HBase+Spark技術(shù)交流討論,整合最優(yōu)質(zhì)的專家資源和技術(shù)資料會(huì)定期開展線下技術(shù)沙龍,專家技術(shù)直播,專家答疑活動(dòng)
點(diǎn)擊鏈接釘釘入群:https://dwz.cn/Fvqv066s或掃碼進(jìn)群
本群為Cassandra技術(shù)交流討論,整合最優(yōu)質(zhì)的專家資源和技術(shù)資料會(huì)定期開展線下技術(shù)沙龍,專家技術(shù)直播,專家答疑活動(dòng)
Cassandra 社區(qū)釘釘大群:https://c.tb.cn/F3.ZRTY0o
Cassandra 技術(shù)社區(qū)微信公眾號(hào):
總結(jié)
以上是生活随笔為你收集整理的flink checkpoint 恢复_Flink解析 | Apache Flink结合Kafka构建端到端的ExactlyOnce处理的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: android 闪退解决方案,Andro
- 下一篇: linux 微信 开源,Makefile