阿里云AnalyticDB基于Flink CDC+Hudi实现多表全增量入湖实践
湖倉一體(LakeHouse)是大數據領域的重要發(fā)展方向,提供了流批一體和湖倉結合的新場景。阿里云AnalyticDB?for?MySQL基于?Apache?Hudi?構建了新一代的湖倉平臺,提供日志、CDC等多種數據源一鍵入湖,在離線計算引擎融合分析等能力。本文將主要介紹AnalyticDB?for?MySQL基于Apache?Hudi實現多表CDC全增量入湖的經驗與實踐。
1.?背景簡介
1.1.?多表CDC入湖背景介紹
客戶在使用數據湖、傳統數據倉庫的過程中,常常會遇到以下業(yè)務痛點:
- 
全量建倉或直連分析對源庫壓力較大,需要卸載線上壓力規(guī)避故障 
- 
建倉延遲較長(T+1天),需要T+10m的低延遲入湖 
- 
海量數據在事務庫或傳統數倉中存儲成本高,需要低成本歸檔 
- 
傳統數據湖存在不支持更新/小文件較多等缺點 
- 
自建大數據數據平臺運維成本高,需要產品化、云原生、一體化的方案 
- 
常見數倉的存儲不開放,需要自建能力、開源可控 
- 
其他痛點和需求…… 
針對這些業(yè)務痛點,AnalyticDB?MySQL?數據管道組件(AnalyticDB?Pipeline?Service)?基于Apache?Hudi?實現了多表CDC全增量入湖,提供入湖和分析過程中高效的全量數據導入,增量數據實時寫入、ACID事務和多版本、小文件自動合并優(yōu)化、元信息校驗和自動進化、高效的列式分析格式、高效的索引優(yōu)化、超大分區(qū)表存儲等等能力,很好地解決了上述提到的客戶痛點。
1.2. Apache Hudi簡介
AnalyticDB?MySQL選擇了Apache?Hudi作為CDC入湖以及日志入湖的存儲底座。回顧?Hudi?的出現主要針對性解決Uber大數據系統中存在的以下痛點:
- 
HDFS的可擴展性限制。大量的小文件會使得HDFS的Name?Node壓力很大,NameNode節(jié)點成為HDFS的瓶頸。 
- 
HDFS上更快的數據處理。Uber不再滿足于T+1的數據延遲。 
- 
支持Hadoop?+?Parquet的更新與刪除。Uber的數據大多按天分區(qū),舊數據不再修改,T+1?Snapshot讀源端的方式不夠高效,需要支持更新于刪除提高導入效率。 
- 
更快的ETL和數據建模。原本模式下,下游的數據處理任務也必須全量地讀取數據湖的數據,Uber希望提供能力使得下游可以只讀取感興趣的增量數據。 
基于以上的設計目標,Uber公司構建了Hudi(Hadoop?Upserts?Deletes?and?Incrementals)并將其捐贈給Apache基金會。從名字可以看出,Hudi最初的核心能力是高效的更新刪除,以及增量讀取Api。Hudi和“數據湖三劍客”中的其他兩位(Iceberg,DeltaLake)整體功能和架構類似,都大體由以下三個部分組成:
- 
需要存儲的原始數據(Data?Objects) 
- 
用于提供upsert功能的索引數據?(Auxiliary?Data) 
- 
以及用于管理數據集的元數據(Metadata) 
在存儲的原始數據層面,Lakehouse一般采用開源的列存格式(Parquet,ORC等),這方面沒有太大的差異。?在輔助數據層面,Hudi提供了比較高效的寫入索引(Bloomfilter,?Bucket?Index)?,使得其更加適合CDC大量更新的場景。
1.3.?業(yè)界方案簡介
阿里云AnalyticDB團隊在基于Hudi構建多表CDC入湖之前,也調研了業(yè)界的一些實現作為參考,這里簡單介紹一下一些業(yè)界的解決方案。
1.3.1.?Spark/Flink?+?Hudi?單表入湖
使用Hudi實現單表端到端CDC數據入湖的整體架構如圖所示:
圖中的第一個組件是Debezium?deployment,它由?Kafka?集群、Schema?Registry(Confluence?或?Apicurio)和?Debezium?連接器組成。會源源不斷讀取數據庫的binlog數據并將其寫入到Kafka中。
圖中的下游則是Hudi的消費端,這里我們選用Hudi提供的DeltaStreamer組件,他可以消費Kafka中的數據并寫入到Hudi數據湖中。業(yè)界實現類似單表CDC入湖,可以將上述方案中的binlog源從Debezium?+?Kafka替換成Flink?CDC?+?Kafka等等,入湖使用的計算引擎也可以根據實際情況使用Spark/Flink。
這種方式可以很好地同步CDC的數據,但是存在一個問題就是每一張表都需要創(chuàng)建一個單獨的入湖鏈路,如果想要同步數據庫中的多張表,則需要創(chuàng)建多個同步鏈路。這樣的實現存在幾個問題:
- 
同時存在多條入湖鏈路提高了運維難度 
- 
動態(tài)增加刪除庫表比較麻煩 
- 
對于數據量小/更新不頻繁的表,也需要單獨創(chuàng)建一條同步鏈路,造成了資源浪費。 
目前,Hudi也支持一條鏈路多表入湖,但還不夠成熟,不足以應用于生產,具體的使用可以參考這篇文檔。
1.3.2.?Flink?VVP?多表入湖
阿里云實時計算Flink版(即Flink?VVP)?是一種全托管Serverless的Flink云服務,開箱即用,計費靈活。具備一站式開發(fā)運維管理平臺,支持作業(yè)開發(fā)、數據調試、運行與監(jiān)控、自動調優(yōu)、智能診斷等全生命周期能力。
阿里云Flink產品提供了多表入湖的能力(binlog?->?flink?cdc?->?下游消費端),支持在一個Flink任務中同時消費多張表的binlog并寫入下游消費端:
- 
Flink?SQL執(zhí)行create?table?as?table,可以把MySQL庫下所有匹配正則表達式的表同步到Hudi單表,是多對一的映射關系,會做分庫分表的合并。 
- 
Flink?SQL執(zhí)行create?database?as?database,可以把?MySQL庫下所有的表結構和表數據一鍵同步到下游數據庫,暫時不支持hudi表,計劃支持中。 
啟動任務后的拓撲如下,一個源端binlog?source算子將數據分發(fā)到下游所有Hudi?Sink算子上。
通過Flink?VVP可以比較簡單地實現多表CDC入湖,然而,這個方案仍然存在以下的一些問題:
- 
沒有成熟的產品化的入湖管理界面,如增刪庫表,修改配置等需要直接操作Flink作業(yè),添加統一的庫表名前綴需要寫sql?hint。(VVP更多的還是一個全托管Flink平臺而不是一個數據湖產品) 
- 
只提供了Flink的部署形態(tài),在不進行額外比較復雜的配置的情況下,Compaction/Clean等TableService必須運行在鏈路內,影響寫入的性能和穩(wěn)定性。 
綜合考慮后,我們決定采用類似Flink?VVP多表CDC入湖的方案,在AnalyticDB?MySQL上提*品化的多表CDC全增量入湖的功能。
2.?基于Flink?CDC?+?Hudi?實現多表CDC入湖
2.1.?整體架構
AnalyticDB?MySQL多表CDC入湖的主要設計目標如下:
- 
支持一鍵啟動入湖任務消費多表數據寫入Hudi,降低客戶管理成本。 
- 
提*品化管理界面,用戶可以通過界面啟停編輯入湖任務,提供庫表名統一前綴,主鍵映射等產品化功能。 
- 
盡可能降低入湖成本,減少入湖過程中需要部署的組件。 
基于這樣的設計目標,我們初步選擇了以Flink?CDC作為binlog和全量數據源,并且不經過任何中間緩存,直接寫入Hudi的技術方案。
Flink?CDC?是?Apache?Flink?的一個Source?Connector,可以從?MySQL等數據庫讀取快照數據和增量數據。在Flink?CDC?2.0?中,實現了全程無鎖讀取,全量階段并發(fā)讀取以及斷點續(xù)傳的優(yōu)化,更好地達到了“流批一體”。
使用了Flink?CDC的情況下,我們不需要擔心全量增量的切換,可以使用統一的Hudi?Upsert接口進行數據消費,Flink?CDC會負責多表全增量切換和位點管理,降低了任務管理的負擔。而Hudi并不支持原生消費多表數據,所以需要開發(fā)一套代碼,將Flink?CDC的數據寫入到下游多個Hudi表。
這樣實現的好處是:
- 
鏈路短,需要維護的組件少,成本低(不需要依賴獨立部署的binlog源組件如kafka,阿里云DTS等) 
- 
業(yè)界有方案可參考,Flink?CDC?+?Hudi?單表入湖是一個比較成熟的解決方案,阿里云VVP也已經支持了Flink多表寫入Hudi。 
下面詳細介紹一下?AnalyticDB?MySQL?基于這樣架構選型的一些實踐經驗。
2.2.?Flink?CDC+?Hudi?支持動態(tài)Schema變更
目前通過Flink將CDC數據寫入Hudi的流程為
- 
數據消費:源端使用CDC?Client消費binlog數據,并進行反序列化,過濾等操作。 
- 
數據轉換:將CDC格式根據特定Schema數據轉換為Hudi支持的格式,比如Avro格式、Parquet格式、Json格式。 
- 
數據寫入:將數據寫入Hudi,部署在TM的多個Hudi?Write?Client,使用相同的Schema將數據寫入目標表。 
- 
數據提交:由部署在Flink?Job?Manager的Hudi?Coordinator進行單點提交,Commit元數據包括本次提交的文件、寫入Schema等信息。 
其中,步驟2-4都要用到使用寫入Schema,在目前的實現中都是在任務部署前確定好的。同時在任務運行時沒有提供動態(tài)變更Schema的能力。
針對這個問題,我們設計實現了一套可以動態(tài)無干預更新Flink?Hudi入湖鏈路Schema的方案。整體思路為在Flink?CDC中識別DDL?binlog事件,遇到DDL事件時,停止消費增量數據,等待savepoint完成后以新的schema重新啟動任務。
這樣實現的好處是可以動態(tài)更新鏈路中的Schema,不需要人工干預。缺點是需要停止所有庫表的消費再重啟,DDL頻繁的情況下對鏈路性能的影響很大。
2.3.?Flink多表讀寫性能調優(yōu)
2.3.1.?Flink?CDC?+?Hudi?Bucket?Index?全量導入調優(yōu)
這里首先簡單介紹一下Flink?CDC?2.0?全量讀取?+?全增量切換的流程。在全量階段,Flink?CDC會將單表根據并行度劃分為多個chunk并分發(fā)到TaskManager并行讀取,全量讀取完成后可以在保證一致性的情況下,實現無鎖切換到增量,真正做到“流批一體”。
在導入的過程中,我們發(fā)現了兩個問題:
1)全量階段寫入的數據為log文件,但為加速查詢,需要compact成Parquet,帶來寫放大
由于全量和增量的切換Hudi是沒有感知的,所以為了實現去重,在全量階段我們也必須使用Hudi的Upsert接口,而Hudi?Bucket?Index的Uspert會產生log文件,需要進行一次Compaction才能得到parquet文件,造成一定的寫放大。并且如果全量導入的過程中compaction多次,寫放大會更加嚴重。
那么能不能犧牲讀取性能,只寫入log文件呢??答案也是否定的,log文件增多不僅會降低讀取性能,也會降低oss?file?listing的性能,使得寫入也變慢(寫入的時候會list當前file?slice中的log和base文件)
解決方法:調大Ckp間隔或者全量增量使用不同的compaction策略解決(全量階段不做compaction)
2)Flink?全量導入表之間為串行,而寫Hudi的最大并發(fā)為Bucket數,有時無法充分利用集群并發(fā)資源
Flink?CDC全量導入的是表內并行,表之間串行。導入單表的時候,如果讀+寫的并發(fā)小于集群的并發(fā)數,會造成資源浪費,在集群可用資源較多的時候,可能需要適當調高Hudi的Bucket數以提高寫入并發(fā)?。而小表并不需要很大的并發(fā)即可導入完成,在串行導入多個小表的時候一般會有資源浪費情況。如果可以支持小表并發(fā)導入,全量導入的性能會有比較好的提升。
解決辦法:適當的調大Hudi?bucket數來提高導入性能。
2.3.2.?Flink?CDC?+?Hudi?Bucket?Index?增量調優(yōu)
1)?Checkpoint?反壓調優(yōu)
在全增量導入的過程中,我們發(fā)現鏈路Hudi?Ckp經常反壓引起寫入抖動:
可以發(fā)現寫入流量的波動非常大。
我們詳細排查了寫入鏈路,發(fā)現反壓主要是因為Hudi?Ckp時會flush數據,在流量比較大時候,可能需要在一個ckp間隔內flush?3G數據,造成寫入停頓。
解決這個問題的思路就是調小Hudi?Stream?Write的buffer大小(即write.task.max.size)將Checkpoint窗口期間flush數據的壓力平攤到平時。
從上圖可以看到,調整了buffer?size后,因checkpoint造成了反壓引起的寫入流量變化得到了很好的緩解。
為了緩解Ckp的反壓,我們還做了其他的一些優(yōu)化:
- 
調小Hudi?bucket?number,減少Ckp期間需要flush的文件個數(這個和全量階段調大bucket數是沖突的,需要權衡選擇) 
- 
使用鏈路外Spark作業(yè)及時運行Compaction,避免積累log文件過多導致寫log時list?files的開銷過大 
2)?提供合適的寫入Metrics幫助排查性能問題
在調優(yōu)flink鏈路的過程中,我們發(fā)現了flink?hudi寫入相關的metrics缺失的比較嚴重,排查時需要通過比較麻煩的手段分析性能(如觀察現場日志,dump內存、做cpu?profiling等)。于是,我們在內部開發(fā)了一套Flink?Stream?Write的?Metrics?指標幫助我們可以快速的定位性能問題。
指標主要包括:
- 
當前Stream?Write算子占據的buffer大小 
- 
Flush?Buffer耗時 
- 
請求OSS創(chuàng)建文件耗時 
- 
當前活躍的寫入文件數 
- 
.... 
Stream?Write/Append?Write?占據的堆內內存Buffer大小統計:
Parquet/Avro?log?Flush到磁盤耗時:
通過指標值的變化可以幫助快速定位問題,比如上圖Hudi?flush的耗時有一個上揚的趨勢,我們很快定位發(fā)現了因為Compaction做得不及時,導致log文件積壓,使得file?listing速度減慢。在調大Compaction資源后,Flush耗時可以保持平穩(wěn)。
Flink-Hudi?Metrics相關的代碼我們也在持續(xù)貢獻到社區(qū),具體可以參考HUDI-2141。
3)?Compaction調優(yōu)
為了簡化配置,我們一開始采用了在鏈路內Compaction的方案,但是我們很快就發(fā)現了Compaction對寫入資源的搶占非常嚴重,并且負載不穩(wěn)定,很大影響了寫入鏈路的性能和穩(wěn)定性。如下圖,Compaction和GC幾乎吃滿了Task?Manager的Cpu資源。
于是,我們采用了TableService和寫入鏈路分離部署的策略,使用Spark離線任務運行TableService,使得TableService和寫入鏈路相互不影響。并且,Table?Service的消耗的是Serverless資源,按需收費。寫入鏈路因為不用做Compaction,可以保持一個比較小的資源,整體來看資源利用率和性能穩(wěn)定性都得到了很好的提升。
為了方便管理數據庫內多表的TableService,我們開發(fā)了一個可以在單個Spark任務內運行多表的多個TableService的實用工具,目前已經貢獻到社區(qū),可以參見PR。
3.?Flink?CDC?Hudi?多表入湖總結
經過我們多輪的開發(fā)和調優(yōu),Flink?CDC?多表寫入?Hudi?達到了一個基本可用的狀態(tài)。其中,我們認為比較關鍵的穩(wěn)定性/性能優(yōu)化是
- 
將Compaction從寫入鏈路獨立出去,提高寫入和Compaction的資源利用率 
- 
開發(fā)了一套Flink?Hudi?Metrics系統,結合源碼和日志精細化調優(yōu)Hoodie?Stream?Write。 
但是,這套架構方案仍然存在以下的一些無法簡單解決的問題:
- 
Flink?Hudi不支持schema?evolution。Hudi轉換Flink?Row到HoodieRecord所用的schema在拓撲被創(chuàng)建時固定,這意味著每次DDL都需要重啟Flink鏈路,影響增量消費。而支持不停止任務動態(tài)變更Schema在Flink?Hudi場景經POC,改造難度比較大。 
- 
多表同步需要較大的資源開銷,對于沒有數據的表,仍然需要維護他們的算子,造成不必要的開銷。 
- 
新增同步表和摘除同步表需要重啟鏈路。Flink任務拓撲在任務啟動時固定,新增表/刪除表都需要更改拓撲重啟鏈路,影響增量消費。 
- 
直接讀取源庫/binlog對源庫壓力大,多并發(fā)讀取binlog容易打掛源庫,也使得binlog?client不穩(wěn)定。并且由于沒有中間緩存,一旦binlog位點過期,數據需要重新導入。 
- 
全量同步同一時刻只能并發(fā)同步一張表,對于小表的導入不夠高效,大表也有可能因為并發(fā)設置較小而利用不滿資源。 
- 
Hudi的Bucket數對全量導入和增量Upsert寫入的性能影響很大,但是使用Flink?CDC?+?Hudi的框架目前沒辦法為數據庫里不同的表決定不同的Bucket數,使得這個值難以權衡。 
如果繼續(xù)基于這套方案實現多表CDC入湖,我們也可以嘗試從下面的一些方向著手:
- 
優(yōu)化Flink?CDC全量導入,支持多表并發(fā)導入,支持導入時對源表數據量進行sample以動態(tài)決定Hudi的Bucket?Index?Number。解決上述問題5,問題6。 
- 
引入Hudi的Consistent?Hashing?Bucket?Index,從Hudi端解決bucket?index數無法動態(tài)變更的問題,參考HUDI-6329。解決上述問題5,問題6。 
- 
引入一個新的binlog緩存組件(自己搭建或者使用云上成熟產品),下游多個鏈路從緩存隊列中讀取binlog,而不是直接訪問源庫。解決上述問題4。 
- 
Flink支持動態(tài)拓撲,或者Hudi支持動態(tài)變更Schema。解決上述問題1,2,3。 
不過,基于經過內部討論和驗證,我們認為繼續(xù)基于Flink?+?Hudi框架實現多表CDC全增量入湖難度較大,針對這個場景,應該更換為Spark引擎。主要的一些考慮如下。
- 
上述討論的Flink-Hudi優(yōu)化方向,工程量和難度都比較大,有些涉及到了核心機制的變動。 
- 
團隊內部對Spark全增量多表入湖有一定的積累,線上已經有了長期穩(wěn)定運行的客戶案例。 
- 
基于Spark引擎的功能豐富度更好,如Spark微批語義可以支持隱式的動態(tài)Schema變更,Table?Service也更適合使用Spark批作業(yè)運行。 
在我們后續(xù)的實踐中,也證實了我們的判斷是正確的。引擎更換為Spark后,多表CDC全增量入湖的功能豐富程度,擴展性,性能和穩(wěn)定性都得到了很好的提升。我們將在之后的文章中介紹我們基于Spark+Hudi實現多表CDC全增量的實踐,也歡迎讀者們關注。
4.?參考資料
- 
Flink?CDC?+?Hudi?海量數據入湖在順豐的實踐 
- 
Change?Data?Capture?with?Debezium?and?Apache?Hudi 
- 
使用?Flink?Hudi?構建流式數據湖平臺 
- 
基于?Apache?Hudi?的湖倉一體技術在?Shopee?的實踐 
- 
深入解讀?Flink?CDC?增量快照框架 
- 
CDC一鍵入湖:當?Apache?Hudi?DeltaStreamer?遇見?Serverless?Spark 
總結
以上是生活随笔為你收集整理的阿里云AnalyticDB基于Flink CDC+Hudi实现多表全增量入湖实践的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: 孔武有力硬如刚是什么意思
- 下一篇: iTunes升级iOS出现未知错误300
