Iceberg 在基于 Flink 的流式数据入库场景中的应用
本文以流式數(shù)據(jù)入庫的場(chǎng)景為基礎(chǔ),介紹引入 Iceberg 作為落地格式和嵌入 Flink sink 的收益,并分析了當(dāng)前可實(shí)現(xiàn)的框架及要點(diǎn)。
應(yīng)用場(chǎng)景
流式數(shù)據(jù)入庫,是大數(shù)據(jù)和數(shù)據(jù)湖的典型應(yīng)用場(chǎng)景。上游的流式數(shù)據(jù),如日志,或增量修改,通過數(shù)據(jù)總線,經(jīng)過必要的處理后,匯聚并存儲(chǔ)于數(shù)據(jù)湖,供下游的應(yīng)用(如報(bào)表或者商業(yè)智能分析)使用。
上述的應(yīng)用場(chǎng)景通常有如下的痛點(diǎn),需要整個(gè)流程不斷的優(yōu)化:
- 支持流式數(shù)據(jù)寫入,并保證端到端的不重不丟(即 exactly-once);
- 盡量減少中間環(huán)節(jié),能支持更實(shí)時(shí)(甚至是 T+0)的讀取或?qū)С?#xff0c;給下游提供更實(shí)時(shí)更準(zhǔn)確的基礎(chǔ)數(shù)據(jù);
- 支持 ACID,避免臟讀等錯(cuò)誤發(fā)生;
- 支持修改已落地的數(shù)據(jù),雖然大數(shù)據(jù)和數(shù)據(jù)湖長(zhǎng)于處理靜態(tài)的或者緩慢變化的數(shù)據(jù),即讀多寫少的場(chǎng)景,但方便的修改功能可以提升用戶體驗(yàn),避免用戶因?yàn)闃O少的修改,手動(dòng)更換整個(gè)數(shù)據(jù)文件,甚至是重新導(dǎo)出;
- 支持修改表結(jié)構(gòu),如增加或者變更列;而且變更不要引起數(shù)據(jù)的重新組織。
引入 Iceberg 作為 Flink sink
為了解決上述痛點(diǎn),我們引入了 Iceberg 作為數(shù)據(jù)落地的格式。Iceberg 支持 ACID 事務(wù)、修改和刪除、獨(dú)立于計(jì)算引擎、支持表結(jié)構(gòu)和分區(qū)方式動(dòng)態(tài)變更等特性,很好的滿足我們的需求。
同時(shí),為了支持流式數(shù)據(jù)的寫入,我們引入 Flink 作為流式處理框架,并將 Iceberg 作為 Flink sink。
下文主要介紹 Flink Iceberg sink 的實(shí)現(xiàn)框架和要點(diǎn)。但在這之前,需要先介紹一些實(shí)現(xiàn)中用到的 Flink 基本概念。
Flink 基本概念
從 Flink 的角度如何理解"流"和"批"
Flink 使用 DataFrame API 來統(tǒng)一的處理流和批數(shù)據(jù)。
Stream, Transformation 和 Operator
一個(gè) Flink 程序由 stream 和 transformation 組成:
- Stream: Transformation 之間的中間結(jié)果數(shù)據(jù);
- Transformation:對(duì)(一個(gè)或多個(gè))輸入 stream 進(jìn)行操作,輸出(一個(gè)或多個(gè))結(jié)果 stream。
當(dāng) Flink 程序執(zhí)行時(shí),其被映射成 Streaming Dataflow,由如下的部分組成:
- Source (operator):接收外部輸入給 Flink;
- Transformation (operator):中間對(duì) stream 做的任何操作;
- Sink (operator):Flink 輸出給外部。
下圖為 Flink 官網(wǎng)的示例,展示了一個(gè)以 Kafka 作為輸入 Source,經(jīng)過中間兩個(gè) transformation,最終通過 sink 輸出到 Flink 之外的過程。
State, Checkpoint and Snapshot
Flink 依靠 checkpoint 和基于 snapshot 的恢復(fù)機(jī)制,保證程序 state 的一致性,實(shí)現(xiàn)容錯(cuò)。
Checkpoint 是對(duì)分布式的數(shù)據(jù)流,以及所有 operator 的 state,打 snapshot 的過程。
■ State
一個(gè) operator 的 state,即它包含的所有用于恢復(fù)當(dāng)前狀態(tài)的信息,可分為兩類:
- 系統(tǒng) state:如 operator 中對(duì)數(shù)據(jù)的緩存。
- 用戶自定義 state:和用戶邏輯相關(guān),可以利用 Flink 提供的 managed state,如 ValueState、ListState,來存儲(chǔ)。
State 的存儲(chǔ)位置,可以分為:
- Local:內(nèi)存,或者本地磁盤
- State backend:遠(yuǎn)端的持久化存儲(chǔ),如 HDFS。
如下圖所示:
■ Checkpoint
Flink 做 checkpoint 的過程如下:
如下圖所示:
■ Barrier
Barrier 是 Flink 做分布式 snapshot 的重要概念。它作為一個(gè)系統(tǒng)標(biāo)記,被插入到數(shù)據(jù)流中,隨真實(shí)數(shù)據(jù)一起,按照數(shù)據(jù)流的方向,從上游向下游傳遞。
由于每個(gè) barrier 唯一對(duì)應(yīng) checkpoint id,所以數(shù)據(jù)流中的 record 實(shí)際被 barrier 分組,如下圖所示,barrier n 和 barrier n-1 之間的 record,屬于 checkpoint n。
Barrier 的作用是在分布式的數(shù)據(jù)流中,將 operator 的多個(gè)輸入流按照 checkpoint對(duì)齊(align),如下圖所示:
Flink Iceberg sink
了解了上述 Flink 的基本概念,這些概念又是如何被應(yīng)用和映射到 Flink Iceberg sink 當(dāng)中的呢?
總體框架
如圖,Flink Iceberg sink 有兩個(gè)主要模塊和兩個(gè)輔助模塊組成:
實(shí)現(xiàn)要點(diǎn)
■ Writer
■ Committer
試用 Flink Iceberg sink
社區(qū)上?https://github.com/apache/incubator-iceberg/pull/856?提供了可以試用的原型代碼。下載該 patch 放入 master 分支,編譯并構(gòu)建即可。如下的程序展示了如何將該 sink 嵌入到 Flink 數(shù)據(jù)流中:
// Configurate catalog org.apache.hadoop.conf.Configuration hadoopConf =new org.apache.hadoop.conf.Configuration(); hadoopConf.set(org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS.varname,META_STORE_URIS); hadoopConf.set(org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,META_STORE_WAREHOUSE);Catalog icebergCatalog = new HiveCatalog(hadoopConf);// Create Iceberg table Schema schema = new Schema(... ); PartitionSpec partitionSpec = builderFor(schema)... TableIdentifier tableIdentifier =TableIdentifier.of(DATABASE_NAME, TABLE_NAME); // If needed, check the existence of table by loadTable() and drop it // before creating it icebergCatalog.createTable(tableIdentifier, schema, partitionSpec);// Obtain an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// Enable checkpointing env.enableCheckpointing(...);// Add Source DataStream<Map<String, Object>> dataStream =env.addSource(source, typeInformation);// Configure Ieberg sink Configuration conf = new Configuration(); conf.setString(org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,META_STORE_URIS); conf.setString(IcebergConnectorConstant.DATABASE, DATABASE_NAME); conf.setString(IcebergConnectorConstant.TABLE, TABLE_NAME);// Append Iceberg sink to data stream IcebergSinkAppender<Map<String, Object>> appender =new IcebergSinkAppender<Map<String, Object>>(conf, "test").withSerializer(MapAvroSerializer.getInstance()).withWriterParallelism(1); appender.append(dataStream);// Trigger the execution env.execute("Sink Test");后續(xù)規(guī)劃
Flink Iceberg sink 有很多需要完善的地方,例如:上文中提到的去掉 Avro 作為中間格式;以及在各種失敗的情況下是否仍能保證端到端的 exactly-once;按固定時(shí)長(zhǎng)做 checkpoint,在高低峰時(shí)生成不同大小的 DataFile,是否對(duì)后續(xù)讀不友好等。這些問題都在我們的后續(xù)規(guī)劃中,也會(huì)全數(shù)貢獻(xiàn)給社區(qū)。
參考資料:
[1] Iceberg 官網(wǎng):
https://iceberg.apache.org/
[2] Flink 1.10文 檔:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/
[3] Neflix 提供的 Flink Iceberg connector 原型:
https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg
[4] Flink Iceberg sink 設(shè)計(jì)文檔:
https://docs.google.com/document/d/19M-sP6FlTVm7BV7MM4Om1n_MVo1xCy7GyDl_9ZAjVNQ/edit?usp=sharing
[5] Flink 容錯(cuò)機(jī)制(checkpoint) :
https://www.cnblogs.com/starzy/p/11439988.html
?
?
# 社區(qū)活動(dòng)推薦 #
普惠全球開發(fā)者,這一次,格外與眾不同!首個(gè) Apache 頂級(jí)項(xiàng)目在線會(huì)議 Flink Forward 全球直播中文精華版來啦,聚焦 Alibaba、Google、AWS、Uber、Netflix、新浪微博等海內(nèi)外一線廠商,經(jīng)典 Flink 應(yīng)用場(chǎng)景,最新功能、未來規(guī)劃一覽無余。點(diǎn)擊下方鏈接可了解更多大會(huì)詳情:https://developer.aliyun.com/live/2594?spm=a2c6h.14242504.J_6074706160.2.3fca361f4cYyQx
原文鏈接
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的Iceberg 在基于 Flink 的流式数据入库场景中的应用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 阿里云峰会|数据库也能自动驾驶?DAS全
- 下一篇: 菜鸟网络宣布推出物流加速上云行动“鲲鹏计