基于Flink的高可靠实时ETL系统
GIAC(GLOBAL INTERNET ARCHITECTURE CONFERENCE)是長(zhǎng)期關(guān)注互聯(lián)網(wǎng)技術(shù)與架構(gòu)的高可用架構(gòu)技術(shù)社區(qū)和msup推出的,面向架構(gòu)師、技術(shù)負(fù)責(zé)人及高端技術(shù)從業(yè)人員的年度技術(shù)架構(gòu)大會(huì),是中國(guó)地區(qū)規(guī)模最大的技術(shù)會(huì)議之一。
今年的第六屆GIAC大會(huì)上,在大數(shù)據(jù)架構(gòu)專(zhuān)題,騰訊數(shù)據(jù)平臺(tái)部實(shí)時(shí)計(jì)算負(fù)責(zé)人施曉罡發(fā)表了《基于Flink的高可靠實(shí)時(shí)ETL系統(tǒng)》的主題演講。以下為嘉賓演講實(shí)錄:
施曉罡畢業(yè)于北京大學(xué),獲得博士學(xué)位,是Apache Flink項(xiàng)目Committer。在SIGMOD, TODS和IPDPS等國(guó)際頂級(jí)會(huì)議和期刊上發(fā)表過(guò)多篇論文,并擔(dān)任KDD,DASFAA等國(guó)際頂級(jí)會(huì)議的程序委員會(huì)委員。
實(shí)時(shí)計(jì)算平臺(tái)Oceanus
近年來(lái),實(shí)時(shí)計(jì)算在騰訊得到了越來(lái)越廣泛的應(yīng)用。為了提高用戶(hù)流計(jì)算任務(wù)持續(xù)集成和持續(xù)發(fā)布的效率,騰訊大數(shù)據(jù)團(tuán)隊(duì)從2017年開(kāi)始圍繞Flink打造了Oceanus,一個(gè)集開(kāi)發(fā)、測(cè)試、部署和運(yùn)維于一體的一站式可視化實(shí)時(shí)計(jì)算平臺(tái)。
Oceanus提供了三種不同的應(yīng)用開(kāi)發(fā)方式,包括畫(huà)布,SQL和Jar,來(lái)滿足不同用戶(hù)的開(kāi)發(fā)需求。通過(guò)這三種方式,不同應(yīng)用場(chǎng)景的用戶(hù)不需要了解底層框架的技術(shù)細(xì)節(jié),可以很快的進(jìn)行實(shí)時(shí)計(jì)算任務(wù)的開(kāi)發(fā),降低了用戶(hù)開(kāi)發(fā)的門(mén)檻。
在完成作業(yè)開(kāi)發(fā)之后,用戶(hù)可以通過(guò)Oceanus對(duì)作業(yè)進(jìn)行測(cè)試、配置和部署。Oceanus為用戶(hù)程序提供了一系列的工具來(lái)協(xié)助作業(yè)測(cè)試。用戶(hù)既可以使用Oceanus提供的一鍵生成功能產(chǎn)生測(cè)試數(shù)據(jù),也可以自己向Oceanus上傳自己的的測(cè)試數(shù)據(jù),通過(guò)對(duì)比預(yù)期結(jié)果和實(shí)際結(jié)果來(lái)驗(yàn)證應(yīng)用邏輯的正確性。Oceanus依托騰訊內(nèi)部的資源調(diào)度系統(tǒng)Gaia來(lái)進(jìn)行資源管理和作業(yè)部署。用戶(hù)可以通過(guò)Oceanus配置作業(yè)所需要的CPU和內(nèi)存資源,并指定作業(yè)需要部署的集群。當(dāng)用戶(hù)完成配置之后,Oceanus會(huì)向Gaia申請(qǐng)對(duì)應(yīng)的資源并將作業(yè)提交到Gaia上運(yùn)行。
Oceanus對(duì)Flink作業(yè)運(yùn)行時(shí)的多個(gè)運(yùn)行指標(biāo)進(jìn)行采集,包括Task Manger的內(nèi)存,I/O和GC等。通過(guò)這些豐富的運(yùn)行指標(biāo),用戶(hù)能夠很好的了解應(yīng)用運(yùn)行的情況,并在出現(xiàn)異常時(shí)能協(xié)助用戶(hù)及時(shí)的定位問(wèn)題。運(yùn)維人員則可以通過(guò)這些采集到的指標(biāo),設(shè)置報(bào)警策略并實(shí)現(xiàn)精細(xì)化的運(yùn)營(yíng)。
而在Oceanus之上,騰訊大數(shù)據(jù)還對(duì)ETL,監(jiān)控告警和在線學(xué)習(xí)等常見(jiàn)的實(shí)時(shí)計(jì)算任務(wù)提供了場(chǎng)景化的支持。例如Oceanus-ML提供端到端的在線機(jī)器學(xué)習(xí),涵蓋數(shù)據(jù)接入,數(shù)據(jù)處理,特征工程,算法訓(xùn)練,模型評(píng)估,模型部署整個(gè)機(jī)器學(xué)習(xí)流程。通過(guò)Oceanus-ML,用戶(hù)可以方便地利用完備的數(shù)據(jù)處理函數(shù),豐富的在線學(xué)習(xí)算法來(lái)構(gòu)建自己的在線學(xué)習(xí)任務(wù),輕松地完成模型訓(xùn)練和評(píng)估,進(jìn)行一鍵部署模型。
而對(duì)ETL場(chǎng)景,Oceanus也提供了Oceanus-ETL產(chǎn)品來(lái)幫助用戶(hù)將應(yīng)用和產(chǎn)品中采集的數(shù)據(jù)實(shí)時(shí)地導(dǎo)入到數(shù)據(jù)倉(cāng)庫(kù)中。目前騰訊大數(shù)據(jù)團(tuán)隊(duì)為騰訊內(nèi)部包括微信、QQ音樂(lè)、騰訊游戲在內(nèi)的多個(gè)業(yè)務(wù)提供了數(shù)據(jù)接入服務(wù),每天處理的消息數(shù)超過(guò)了40萬(wàn)億條,每秒接入的峰值超過(guò)了4億條。
實(shí)時(shí)數(shù)據(jù)接入平臺(tái)Oceanus-ETL
騰訊大數(shù)據(jù)早在2012年起就開(kāi)始了進(jìn)行數(shù)據(jù)接入的工作,并基于Storm構(gòu)建了第一代的騰訊數(shù)據(jù)銀行(TDBank),成為了騰訊大數(shù)據(jù)平臺(tái)的第一線,提供了文件、消息和數(shù)據(jù)庫(kù)等多種接入方式,統(tǒng)一了數(shù)據(jù)接入入口,提供了高效實(shí)時(shí)的分布式數(shù)據(jù)分發(fā)。
而在2017年,騰訊大數(shù)據(jù)基于Flink在易用性、可靠性和性能上的優(yōu)勢(shì),通過(guò)Flink對(duì)TDBank的數(shù)據(jù)接入進(jìn)行了重構(gòu)。相比于Storm,Flink對(duì)state提供了更多的支持。一方面Flink將程序的狀態(tài)保存在本地的內(nèi)存或者RocksDB中,用戶(hù)不需要通過(guò)網(wǎng)絡(luò)遠(yuǎn)程訪問(wèn)狀態(tài)數(shù)據(jù),因此可以獲得較好的作業(yè)性能。而另一方面,Flink通過(guò)Chandy-Lamport算法提供了高效和輕量的檢查點(diǎn)機(jī)制,可以保證在發(fā)生故障時(shí)仍能實(shí)現(xiàn)Exactly Once和At-Least Once的數(shù)據(jù)處理語(yǔ)義。
而隨著騰訊業(yè)務(wù)規(guī)模的不斷增加,對(duì)數(shù)據(jù)接入也提出了更高的要求,需要能夠
保證端到端的“有且僅有一次”和“強(qiáng)一致”的語(yǔ)義
保證ACID事務(wù)和讀寫(xiě)分離,避免下游出現(xiàn)臟讀等錯(cuò)誤
支持對(duì)數(shù)據(jù)進(jìn)行修正和格式變更
為了能夠滿足上述要求,我們今年引入了Iceberg,通過(guò)Iceberg提供的ACID事務(wù)機(jī)制和增量更新能力提供更可靠和更強(qiáng)大的數(shù)據(jù)接入服務(wù)。
基于Flink實(shí)現(xiàn)端到端Exactly Once傳輸
Flink通過(guò)檢查點(diǎn)(Checkpoint)機(jī)制來(lái)進(jìn)行任務(wù)狀態(tài)的備份和恢復(fù)。在任務(wù)發(fā)生故障時(shí),任務(wù)可以從上次備份的狀態(tài)恢復(fù),而不必從頭開(kāi)始重新執(zhí)行。通過(guò)檢查點(diǎn)機(jī)制,Flink可以保證在發(fā)生故障時(shí),仍然可以實(shí)現(xiàn)Exactly Once的數(shù)據(jù)傳輸。
但在整個(gè)數(shù)據(jù)接入的鏈路中,除了Flink之外還包括了上游的中間件和下游的數(shù)據(jù)倉(cāng)庫(kù)等多個(gè)組件。僅僅依靠Flink的檢查點(diǎn)機(jī)制只能夠保證在Flink作業(yè)內(nèi)部的Exactly Once的數(shù)據(jù)傳輸,而并不能保證在整個(gè)數(shù)據(jù)接入鏈路中端到端的Exactly Once的傳輸語(yǔ)義。如果我們將Flink收到的數(shù)據(jù)直接寫(xiě)到下游的存儲(chǔ)系統(tǒng),那么當(dāng)Flink發(fā)生故障并從故障中恢復(fù)時(shí),從上次檢查點(diǎn)之后被寫(xiě)到下游存儲(chǔ)系統(tǒng)中的數(shù)據(jù)將被重復(fù),導(dǎo)致后續(xù)數(shù)據(jù)分析發(fā)生誤差。
而為了保證端到端的Exactly Once數(shù)據(jù)傳輸,TDBank利用了Flink的檢查點(diǎn)機(jī)制實(shí)現(xiàn)了一個(gè)兩階段提交的協(xié)議,并會(huì)對(duì)數(shù)據(jù)接入各個(gè)環(huán)節(jié)產(chǎn)生的指標(biāo)進(jìn)行聚合和對(duì)賬,確保端到端數(shù)據(jù)傳輸?shù)目煽啃浴?/p>
為了保證數(shù)據(jù)鏈路的Exactly Once,我們將Flink收到的數(shù)據(jù)會(huì)先寫(xiě)入到一個(gè)臨時(shí)目錄中,并將寫(xiě)出的文件列表保存起來(lái)。執(zhí)行checkpoint的時(shí)候,我們會(huì)將這些文件列表保存到checkpoint中并記錄下來(lái)。而當(dāng)checkpoint完成時(shí),Flink會(huì)通知所有的節(jié)點(diǎn)。此時(shí)這些節(jié)點(diǎn)就會(huì)將checkpoint中保存的文件移動(dòng)到正式目錄中。
在這種實(shí)現(xiàn)方式中,Flink利用已有的checkpoint機(jī)制實(shí)現(xiàn)了一個(gè)兩階段提交的機(jī)制。所有節(jié)點(diǎn)在執(zhí)行checkpoint時(shí)執(zhí)行了預(yù)提交的操作,將所有數(shù)據(jù)都先寫(xiě)入到一個(gè)可靠的分布式存儲(chǔ)中。當(dāng)checkpoint在JobManager上完成時(shí),即認(rèn)為這個(gè)事務(wù)被提交了。所有節(jié)點(diǎn)在收到checkpoint成功的消息后會(huì)完成最后的事務(wù)提交操作。
如果有節(jié)點(diǎn)在執(zhí)行最后文件移動(dòng)的時(shí)候出現(xiàn)故障,那么Flink作業(yè)將從上次完成的checkpoint中恢復(fù),并從上次完成的checkpoint中獲得完整的文件列表。Flink作業(yè)會(huì)檢查這個(gè)文件列表中的文件,并將所有還未移動(dòng)的文件移動(dòng)到最終的目錄中。
而為了確保數(shù)據(jù)在整個(gè)接入過(guò)程在不會(huì)發(fā)生數(shù)據(jù)丟失和重復(fù),我們會(huì)對(duì)整個(gè)數(shù)據(jù)鏈路中的每個(gè)組件發(fā)送和接收到的數(shù)據(jù)數(shù)目進(jìn)行了采集和對(duì)賬。由于一般的指標(biāo)系統(tǒng)并不能保證指標(biāo)的時(shí)效性和正確性,因此我們也基于Flink實(shí)現(xiàn)了高可靠和強(qiáng)一致性的指標(biāo)聚合。
類(lèi)似于數(shù)據(jù)鏈路,我們也采用Flink的checkpoint機(jī)制來(lái)保證指標(biāo)數(shù)據(jù)的一致性。我們通過(guò)Flink將采集到的指標(biāo)按照分鐘粒度進(jìn)行聚合,并在執(zhí)行checkpoint時(shí)將這些聚合指標(biāo)保存到外部存儲(chǔ)中。在保存聚合指標(biāo)時(shí),除了一般的標(biāo)簽之外,我們還會(huì)帶上寫(xiě)出這些指標(biāo)時(shí)的checkpoint編號(hào)。而當(dāng)checkpoint完成時(shí),每個(gè)節(jié)點(diǎn)還會(huì)將完成的checkpoint編號(hào)也記錄到外部存儲(chǔ)中。當(dāng)我們需要查詢(xún)指標(biāo)時(shí),我們只需要將已完成的checkpoint編號(hào)和聚合指標(biāo)進(jìn)行連接就可以獲得一致性的指標(biāo)結(jié)果。
通過(guò)Flink的checkpoint機(jī)制,我們可以保證數(shù)據(jù)鏈路和指標(biāo)鏈路中數(shù)據(jù)傳輸和指標(biāo)聚合的一致性,確保在整個(gè)數(shù)據(jù)接入鏈路實(shí)現(xiàn)端到端的Exactly Once數(shù)據(jù)傳輸。
基于Iceberg實(shí)現(xiàn)ACID的實(shí)時(shí)數(shù)據(jù)接入
Apache Iceberg是一個(gè)通用的表格式(數(shù)據(jù)組織格式),它可以適配Presto,Spark等引擎提供高性能的讀寫(xiě)和元數(shù)據(jù)管理功能。Iceberg的定位是在計(jì)算引擎之下存儲(chǔ)之上。它是一種數(shù)據(jù)存儲(chǔ)格式,Iceberg稱(chēng)其為"table format"。準(zhǔn)確的說(shuō),它是介于計(jì)算引擎和數(shù)據(jù)存儲(chǔ)格式之間的數(shù)據(jù)組織格式?- 通過(guò)特定的方式將數(shù)據(jù)和元數(shù)據(jù)組織起來(lái),因此稱(chēng)之為數(shù)據(jù)組織格式更為合理。
Iceberg通過(guò)鎖機(jī)制實(shí)現(xiàn)了ACID的能力。在每次元數(shù)據(jù)更新時(shí)它會(huì)從metastore中獲取鎖并進(jìn)行更新。同時(shí)Iceberg保證了線性一致性(Serializable isolation),確保表的修改操作是原子性的,讀操作永遠(yuǎn)不會(huì)讀到部分或是沒(méi)有commit的數(shù)據(jù)。Iceberg提供了樂(lè)觀鎖的機(jī)制降低鎖的影響,并且使用沖突回退和重試機(jī)制來(lái)解決并發(fā)寫(xiě)所造成的沖突問(wèn)題。
基于ACID的能力,Iceberg提供了類(lèi)似于MVCC的讀寫(xiě)分離能力。首先,每次寫(xiě)操作都會(huì)產(chǎn)生一個(gè)新的快照(snapshot),快照始終是往后線性遞增,確保了線性一致性。而讀操作只會(huì)讀取已經(jīng)存在了的快照,對(duì)于正在生成的快照讀操作是不可見(jiàn)的。每一個(gè)快照擁有表在那一時(shí)刻所有的數(shù)據(jù)和元數(shù)據(jù),因此提供了用戶(hù)回溯(time travel)表數(shù)據(jù)的能力。利用Iceberg的time travel能力,用戶(hù)可以讀取那一時(shí)刻的數(shù)據(jù),同時(shí)也提供了用戶(hù)快照回滾和數(shù)據(jù)重放的能力。
相比于Hudi,Delta Lake,Iceberg提供了更為完整的表格式的能力、類(lèi)型的定義和操作的抽象,并與上層數(shù)據(jù)處理引擎和底層數(shù)據(jù)存儲(chǔ)格式的解耦。此外,Iceberg在設(shè)計(jì)之初并沒(méi)有綁定某種特定的存儲(chǔ)引擎,同時(shí)避免了與上層引擎之間的相互調(diào)用,使得Iceberg可以非常容易地?cái)U(kuò)展到對(duì)于不同引擎的支持。
而在數(shù)據(jù)接入中,通過(guò)Iceberg可以保證ACID事務(wù)和強(qiáng)一致性,實(shí)現(xiàn)“有且僅有一次”的寫(xiě)入;讀寫(xiě)分離使交互式查詢(xún)引擎(如Hive和Presto等)可以第一時(shí)間讀到正確的數(shù)據(jù);Row-level update和delete支持通過(guò)計(jì)算引擎進(jìn)行數(shù)據(jù)修正;增量消費(fèi)使得已落地的數(shù)據(jù)可以進(jìn)一步的返回流式處理引擎,并只處理和向后傳遞變化的部分;Iceberg高效的查詢(xún)能力也能省去導(dǎo)入MySQL或ClickHouse等環(huán)節(jié),直接被報(bào)表和BI系統(tǒng)消費(fèi)。
為了能夠使用Iceberg,騰訊大數(shù)據(jù)實(shí)現(xiàn)了支持Iceberg的Flink連接器,允許Flink將數(shù)據(jù)寫(xiě)入到Iceberg中。Flink的Iceberg Sink由兩部分組成,一個(gè)稱(chēng)為Writer,而另一個(gè)是Committer。Writer負(fù)責(zé)將收到的數(shù)據(jù)寫(xiě)到外部的存儲(chǔ)中,形成一系列的DataFile。目前為了簡(jiǎn)化適配并最大限度利用已有邏輯,騰訊內(nèi)部使用Avro作為數(shù)據(jù)的中間格式。后續(xù)社區(qū)將引入一個(gè)Flink內(nèi)建類(lèi)型的轉(zhuǎn)換器,使用Iceberg內(nèi)建的數(shù)據(jù)類(lèi)型作為輸入。當(dāng)Writer執(zhí)行checkpoint時(shí),Writer會(huì)關(guān)閉自己的文件,將構(gòu)建的DataFile發(fā)送給下游的Committer。
Committer在一個(gè)Flink作業(yè)中是全局唯一的。在收到上游所有Writer發(fā)送的DataFile后,Committer會(huì)將這些DataFile寫(xiě)到一個(gè)ManifestFile中,并將ManifestFile保存到checkpoint中。當(dāng)checkpoint完成之后,Committer會(huì)將ManifestFile通過(guò)merge append提交給Iceberg。Iceberg內(nèi)部會(huì)通過(guò)一系列操作完成commit操作,最終讓新加入的數(shù)據(jù)對(duì)下游的數(shù)據(jù)倉(cāng)庫(kù)可見(jiàn)。
騰訊對(duì)Iceberg進(jìn)行了大量的改進(jìn)和優(yōu)化。除了支持了Flink的讀寫(xiě)之外,騰訊還完成了行級(jí)的刪除和更新操作,極大的節(jié)約了數(shù)據(jù)批改和刪除所帶來(lái)的開(kāi)銷(xiāo)。同時(shí),騰訊還對(duì)Spark 3.0中的Data Source V2進(jìn)行了適配,利用Spark 3.0中的SQL和DataFrame可以無(wú)縫的對(duì)接Iceberg。
而在后面的工作中,騰訊會(huì)繼續(xù)增強(qiáng)Iceberg的核心能力,主要包括:
為Flink sink增加update和delete的語(yǔ)義,使延遲到達(dá)的數(shù)據(jù)可以得到正確的處理,以支持CDC的場(chǎng)景;
增加對(duì)Hive的支持;
增加以Merge-On-Read方式進(jìn)行row-level update和delete操作等。
后臺(tái)回復(fù)關(guān)鍵詞【GIAC】可獲取嘉賓分享PPT。
總結(jié)
以上是生活随笔為你收集整理的基于Flink的高可靠实时ETL系统的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 微信「看一看」 朋友在看的增强推荐系统
- 下一篇: 基于Kubeflow建立的星辰算力训练平