百信银行基于 Apache Hudi 实时数据湖演进方案
本文介紹了百信銀行實時計算平臺的建設(shè)情況,實時數(shù)據(jù)湖構(gòu)建在 Hudi 上的方案和實踐方法,以及實時計算平臺集成 Hudi 和使用 Hudi 的方式。內(nèi)容包括:
GitHub 地址
https://github.com/apache/flink
歡迎大家給 Flink 點贊送 star~
一、背景
百信銀行,全稱為 “中信百信銀行股份有限公司”,是首家獲批獨立法人形式的直銷銀行。作為首家國有控股的互聯(lián)網(wǎng)銀行,相比于傳統(tǒng)金融行業(yè),百信銀行對數(shù)據(jù)敏捷性有更高的要求。
數(shù)據(jù)敏捷,不僅要求數(shù)據(jù)的準確性,還要求數(shù)據(jù)到達的實時性,和數(shù)據(jù)傳輸?shù)陌踩浴榱藵M足我行數(shù)據(jù)敏捷性的需求,百信銀行大數(shù)據(jù)部承擔(dān)起了建設(shè)實時計算平臺的職責(zé),保證了數(shù)據(jù)快速,安全且標準得在線送達。
受益于大數(shù)據(jù)技術(shù)的發(fā)展和更新迭代,目前廣為人知的批流一體的兩大支柱分別是:“統(tǒng)一計算引擎” 與 “統(tǒng)一存儲引擎”。
- Flink,作為大數(shù)據(jù)實時計算領(lǐng)域的佼佼者,1.12 版本的發(fā)布讓它進一步提升了統(tǒng)一計算引擎的能力;
- 同時隨著數(shù)據(jù)湖技術(shù) Hudi 的發(fā)展,統(tǒng)一存儲引擎也迎來了新一代技術(shù)變革。
在 Flink 和 Hudi 社區(qū)發(fā)展的基礎(chǔ)上,百信銀行構(gòu)建了實時計算平臺,同時將實時數(shù)據(jù)湖 Hudi 集成到實時計算平臺之上。結(jié)合行內(nèi)數(shù)據(jù)治理的思路,實現(xiàn)了數(shù)據(jù)實時在線、安全可靠、標準統(tǒng)一,且有敏捷數(shù)據(jù)湖的目標。
二、百信銀行基于 Flink 的實時計算平臺設(shè)計與實踐
1. 實時計算平臺的定位
實時計算平臺作為行級實時計算平臺,由大數(shù)據(jù) IaaS 團隊自主研發(fā),是一款實現(xiàn)了實時數(shù)據(jù) ”端到端“ 的在線數(shù)據(jù)加工處理的企業(yè)級產(chǎn)品。
- 其核心功能具備了實時采集、實時計算、實時入庫、復(fù)雜時間處理、規(guī)則引擎、可視化管理、一鍵配置、自主上線,和實時監(jiān)控預(yù)警等。
- 目前其支持的場景有實時數(shù)倉、斷點召回、智能風(fēng)控、統(tǒng)一資產(chǎn)視圖、反欺詐,和實時特征變量加工等。
- 并且,它服務(wù)著行內(nèi)小微、信貸、反欺詐、消金、財務(wù),和風(fēng)險等眾多業(yè)務(wù)線。
截止目前,在線穩(wěn)定運行的有 320+ 的實時任務(wù),且在線運行的任務(wù) QPS 日均達到 170W 左右。
2. 實時計算平臺的架構(gòu)
按照功能來劃分的話,實時計算平臺的架構(gòu)主要分為三層:
■ 1)數(shù)據(jù)采集層
采集層目前主要分為兩個場景:
- 第一個場景是采集 MySQL 備庫的 Binlog 日志到 Kafka 中。我行所使用的數(shù)據(jù)采集方案并沒有采用業(yè)界普遍用的如 Canal,Debezium 等現(xiàn)有的 CDC 方案。
1、因為我們的 MySQL 版本為百信銀行內(nèi)部的版本,Binlog 協(xié)議有所不同,所以現(xiàn)有的技術(shù)方案不能很好的支持兼容我們獲取 Binlog 日志。
2、同時,為了解決我們數(shù)據(jù)源 MySQL 的備庫隨時可能因為多機房切換,而造成采集數(shù)據(jù)丟失的情況。我們自研了讀取 MySQL Binlog 的 Databus 項目,我們也將 Databus 邏輯轉(zhuǎn)化成了 Flink 應(yīng)用程序,并將其部署到了 Yarn 資源框架中,使 Databus 數(shù)據(jù)抽取可以做到高可用,且資源可控。
- 第二個場景是,我們對接了第三方的應(yīng)用,這個第三方應(yīng)用會將數(shù)據(jù)寫入 Kafka,而寫入 Kafka 有兩種方式:
1、一種方式是依據(jù)我們定義的 Json shcema 協(xié)議。
(UMF協(xié)議:{col_name:””,umf_id":"","umf_ts":,"umf_op_":"i/u/d"})
協(xié)議定義了 ”唯一 id”,”時間戳“ 和 ”操作類型“。根據(jù)此協(xié)議,用戶可以指定對該消息的操作類型,分別是 "insert","update" 和 "delete",以便下游對消息進行針對性處理。
■ 2)數(shù)據(jù)計算轉(zhuǎn)換層
消費 Kafka 數(shù)據(jù)進行一層轉(zhuǎn)換邏輯,支持用戶自定義函數(shù),將數(shù)據(jù)標準化,做敏感數(shù)據(jù)的脫敏加密等。
■ 3)數(shù)據(jù)存儲層
數(shù)據(jù)存儲到 HDFS,Kudu,TiDB,Kafka,Hudi,MySQL 等儲存介質(zhì)中。
在上圖所示的架構(gòu)圖中,我們可以看到整體實時計算平臺支持的主要功能有:
- 開發(fā)層面:
1、支持標準化的 DataBus 采集功能,該功能對于支持 MySQL Binglog 同步到 Kafka 做了同步適配,不需要用戶干預(yù)配置過多。用戶只需要指定數(shù)據(jù)源 MySQL 的實例就可以完成到 Kafka 的標準化同步。
2、支持用戶可視化編輯 FlinkSQL。
3、支持用戶自定義 Flink UDF 函數(shù)。
4、支持復(fù)雜事件處理(CEP)。
5、支持用戶上傳打包編譯好 Flink 應(yīng)用程序。 - 運維層面:
1、支持不同類型任務(wù)的狀態(tài)管理,支持savepoint。
2、支持端到端的延遲監(jiān)控,告警。
在實時計算平臺升級迭代的過程中,社區(qū) Flink 版本之間存在一些向下不兼容的情況。為了平滑的升級 Flink 版本,我們對計算引擎的多版本模塊進行統(tǒng)一的抽象,將多版本之間做了嚴格的 JVM 級別隔離,使版本之間不會產(chǎn)生 Jar 包沖突,Flink Api 不兼容的情況。
如上圖所示,我們將不同的 Flink 版本封裝到一個獨立的虛擬機中,使用 Thrift Server 啟動一個獨立的 JVM 虛擬機,每個版本的 Flink 都會有一個獨立的 Thrift Server。在使用的過程中,只要用戶顯示指定的 Flink 版本,Flink 應(yīng)用程序就會被指定的 Thrift Server 啟動。同時,我們也將實時計算的后端服務(wù)嵌入一個常用的 Flink 版本,避免因為啟動 Thrift Server 而占用過多的啟動時間。
同時為了滿足金融系統(tǒng)高可用和多備的需求,實時計算平臺也開發(fā)了多 Hadoop 集群的支持,支持實時計算任務(wù)在失敗后可以遷移到備集群上去。整體的方案是,支持多集群 checkpoint,savepoint,支持任務(wù)失敗后,可以在備機房重啟實時任務(wù)。
三、百信銀行實時計算平臺與實時數(shù)據(jù)湖集成實踐
在介紹本內(nèi)容之前,我們先來了解一些我行目前在數(shù)據(jù)湖的現(xiàn)狀。目前的實時數(shù)據(jù)湖,我行依然采用主流的 Lambda 架構(gòu)來構(gòu)建數(shù)據(jù)倉庫。
1. Lambda
Lambda 架構(gòu)下,數(shù)倉的缺點:
- 同樣的需求,開發(fā)和維護兩套代碼邏輯:批和流兩套邏輯代碼都需要開發(fā)和維護,并且需要維護合并的邏輯,且需同時上線;
- 計算和存儲資源占用多:同樣的計算邏輯計算兩次,整體資源占用會增多;
- 數(shù)據(jù)具有二義性:兩套計算邏輯,實時數(shù)據(jù)和批量數(shù)據(jù)經(jīng)常對不上,準確性難以分辨;
- 重用 Kafka 消息隊列:Kafka 保留往往按照天或者月保留,不能全量保留數(shù)據(jù),無法使用現(xiàn)有的 adhoc 查詢引擎分析。
2. Hudi
為了解決 Lambda 架構(gòu)的痛點,我行準備了新一代的數(shù)據(jù)湖技術(shù)架構(gòu),同時我們也花大量的時間調(diào)研了現(xiàn)有的數(shù)據(jù)湖技術(shù),最終選擇 Hudi 作為我們的存儲引擎。
- Update / Delete 記錄:Hudi 使用細粒度的文件/記錄級別索引,來支持 Update / Delete 記錄,同時還提供寫操作的事務(wù)保證,支持 ACID 語義。查詢會處理最后一個提交的快照,并基于此輸出結(jié)果;
- 變更流:Hudi 對獲取數(shù)據(jù)變更提供了流的支持,可以從給定的時間點獲取給定表中已 updated / inserted / deleted 的所有記錄的增量流,可以查詢不同時間的狀態(tài)數(shù)據(jù);
- 技術(shù)棧統(tǒng)一:可以兼容我們現(xiàn)有的 adhoc 查詢引擎 presto,spark。
- 社區(qū)更新迭代速度快:已經(jīng)支持 Flink 兩種不同方式的的讀寫操作,如 COW 和 MOR。
在新的架構(gòu)中可以看到,我們將實時和批處理貼源層的數(shù)據(jù)全部寫到 Hudi 存儲中,并重新寫入到新的數(shù)據(jù)湖層 datalake(Hive 的數(shù)據(jù)庫)。出于歷史的原因,為了兼容之前的數(shù)據(jù)倉庫的模型,我們依然保留之前的 ODS 層,歷史的數(shù)倉模型保持不變,只不過 ODS 貼源層的數(shù)據(jù)需要從 datalake 層獲取。
- 首先,我們可以看到,對于新的表的入倉邏輯,我們通過實時計算平臺使用 Flink 寫入到 datalake 中(新的貼源層,Hudi 格式存儲),數(shù)據(jù)分析師和數(shù)據(jù)科學(xué)家,可以直接使用 datalake 層的數(shù)據(jù)進行數(shù)據(jù)分析和機器學(xué)習(xí)建模。如果數(shù)據(jù)倉庫的模型需要使用 datalake 的數(shù)據(jù)源,需要一層轉(zhuǎn)換 ODS 的邏輯,這里的轉(zhuǎn)換邏輯分為兩種情況:
1、第一種,對于增量模型,用戶只需要將最新 datalake 的分區(qū)使用快照查詢放到 ODS 中即可。
2、第二種,對于全量模型,用戶需要把 ODS 前一天的快照和 datalake 最新的快照查詢的結(jié)果進行一次合并,形成最新的快照再放到 ODS 當前的分區(qū)中,以此類推。
我們這么做的原因是,對于現(xiàn)有的數(shù)倉模型不用改造,只是把 ODS 的數(shù)據(jù)來源換成 datalake,時效性強。同時滿足了數(shù)據(jù)分析和數(shù)據(jù)科學(xué)家準實時獲取數(shù)據(jù)的訴求。
- 另外,對于原始的 ODS 存在的數(shù)據(jù),我們開發(fā)了將 ODS 層的數(shù)據(jù)進行了一次初始化入 datalake 的腳本。
1、如果 ODS 層數(shù)據(jù)每天是全量的快照,我們只將最新的一次快照數(shù)據(jù)初始化到 datalake 的相同分區(qū),然后實時入 datalake 的鏈路接入;
2、如果 ODS 層的數(shù)據(jù)是增量的,我們暫時不做初始化,只在 datalake 中重新建一個實時入湖的鏈路,然后每天做一次增量日切到 ODS 中。 - 最后,如果是一次性入湖的數(shù)據(jù),我們使用批量入湖的工具導(dǎo)入到 datalake 中即可。
整體湖倉轉(zhuǎn)換的邏輯如圖:
3. 技術(shù)挑戰(zhàn)
- 在我們調(diào)研的初期,Hudi 對 Flink 的支持不是很成熟,我們對 Spark - StrunctStreaming 做了大量的開發(fā)和測試。從我們 PoC 測試結(jié)果上看,
1、如果使用無分區(qū)的 COW 寫入的方式,在千萬級寫入量的時候會發(fā)現(xiàn)寫入越來越慢;
2、后來我們將無分區(qū)的改為增量分區(qū)的方式寫入,速度提升了很多。
之所以會產(chǎn)生這個問題,是因為 spark 在寫入時會讀取 basefile 文件索引,文件越大越多,讀取文件索引就會越慢,因此會產(chǎn)生寫入越來越慢的情況。
- 同時,隨著 Flink 對 hudi 支持越來越好,我們的目標是打算將 Hudi 入湖的功能集成到實時計算平臺。因此,我們把實時計算平臺對 Hudi 做了集成和測試,期間也遇到一些問題,典型的問題有:
1、類沖突
2、不能找到 class 文件
3、rocksdb 沖突
為了解決這些不兼容的問題,我們將對 Hudi 的依賴,重新構(gòu)造了一個獨立的模塊,這個工程只是把 Hudi 的依賴打包成一個 shade package。
4、當有依賴沖突時,我們會把 Flink 模塊相關(guān)或者 Hudi 模塊相關(guān)的沖突依賴 exclude 掉。5、而如果有其他依賴包找不到的情況,我們會把相關(guān)的依賴通過 pom 文件引入進來。- 在使用 Hudi on Flink 的方案中,也遇到了相關(guān)的問題,比如,checkpoint 太大導(dǎo)致 checkpoint 時間過長而引起的失敗。這個問題,我們設(shè)置狀態(tài)的 TTL 時間,把全量 checkpoint 改為增量 checkpoint,且提高并行度來解決。
- COW 和 MOR 的選擇。目前我們使用的 Hudi 表以 COW 居多,之所以選擇 COW,
1、第一是因為我們目前歷史存量 ODS 的數(shù)據(jù)都是一次性導(dǎo)入到 datalake 數(shù)據(jù)表中,不存在寫放大的情況。
2、另外一個原因是,COW 的工作流比較簡單,不會涉及到 compaction 這樣的額外操作。
如果是新增的 datalake 數(shù)據(jù),并且存在大量的 update,并且實時性要求較高的情況下,我們更多的選擇 MOR 格式來寫,尤其寫 QPS 比較大的情況下,我們會采用異步 compaction 的操作,避免寫放大。除了這種情況外,我們還是會更傾向以 COW 的格式來寫。
四、百信銀行實時數(shù)據(jù)湖的未來
在我行實時數(shù)據(jù)湖的架構(gòu)中,我們的目標是將實時數(shù)倉的整個鏈路構(gòu)建在Hudi之上,架構(gòu)體系如圖:
我們整體的目標規(guī)劃是替代 kafka,把 Hudi 作為中間存儲,將數(shù)倉建設(shè)在 Hudi 之上,并以 Flink 作為流批一體計算引擎。這樣做的好處有:
- MQ 不再擔(dān)任實時數(shù)據(jù)倉庫存儲的中間存儲介質(zhì),而 Hudi 存儲在 HDFS 上,可以存儲海量數(shù)據(jù)集;
- 實時數(shù)據(jù)倉庫中間層可以使用 OLAP 分析引擎查詢中間結(jié)果數(shù)據(jù);
- 真正意義上的批流一體,數(shù)據(jù) T+1 延遲的問題得到解決;
- 讀時 Schema 不再需要嚴格定義 Schema 類型,支持 schema evolution;
- 支持主鍵索引,數(shù)據(jù)查詢效率數(shù)倍增加,并且支持 ACID 語義,保證數(shù)據(jù)不重復(fù)不丟失;
- Hudi 具有 Timeline 的功能,可以更多存儲數(shù)據(jù)中間的狀態(tài)數(shù)據(jù),數(shù)據(jù)完備性更強。
五、總結(jié)
本文介紹了百信銀行實時計算平臺的建設(shè)情況,實時數(shù)據(jù)湖構(gòu)建在 Hudi 上的方案和實踐方法,以及實時計算平臺集成 Hudi 和使用 Hudi 的方式。
在使用 Hudi 的過程中,也遇到一些問題,由衷感謝社區(qū)同學(xué)的幫助。特別感謝社區(qū) Danny chan,leesf 解疑答惑。在實時數(shù)據(jù)湖架構(gòu)體系下,構(gòu)建我們實時數(shù)倉,流批一體方案還是在摸索中。
僅以此篇,希望能給其他正在建設(shè)實時計算平臺,和使用 Hudi 構(gòu)建實時數(shù)據(jù)湖的同學(xué)提供一些參考。我們也誠懇邀請對實時計算平臺和實時數(shù)據(jù)湖有濃厚興趣的同學(xué)加入我們,投遞簡歷的方式如下。
原文鏈接:https://developer.aliyun.com/article/783950?
版權(quán)聲明:本文內(nèi)容由阿里云實名注冊用戶自發(fā)貢獻,版權(quán)歸原作者所有,阿里云開發(fā)者社區(qū)不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。具體規(guī)則請查看《阿里云開發(fā)者社區(qū)用戶服務(wù)協(xié)議》和《阿里云開發(fā)者社區(qū)知識產(chǎn)權(quán)保護指引》。如果您發(fā)現(xiàn)本社區(qū)中有涉嫌抄襲的內(nèi)容,填寫侵權(quán)投訴表單進行舉報,一經(jīng)查實,本社區(qū)將立刻刪除涉嫌侵權(quán)內(nèi)容。 與50位技術(shù)專家面對面20年技術(shù)見證,附贈技術(shù)全景圖總結(jié)
以上是生活随笔為你收集整理的百信银行基于 Apache Hudi 实时数据湖演进方案的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 宜搭数字工厂,让订单周期缩减三分之一
- 下一篇: Flink on Zeppelin 系列