Flink+Iceberg搭建实时数据湖实战
Part 1: Iceberg Core Features and Internals
Apache Iceberg
From the official site:

Apache Iceberg is an open table format for huge analytic datasets.

The founders position Iceberg as an efficient storage format for large-scale analytics. Much as Hive is a layer over HDFS, it essentially targets the pain points of data-warehouse workloads.
Initially, Iceberg did evolve toward being a faster, easier-to-use format for warehouse scenarios, adding features such as schema evolution and file-level filter pruning. But once its ecosystem integrated with the Flink streaming engine and Delete/Update/Merge semantics appeared, the use cases diversified considerably.
Background
過去業(yè)界更多是使用 Hive/Spark on HDFS 作為離線數(shù)據(jù)倉庫的載體,在越來越趨于實時化和快速迭代的場景中,逐漸暴露出以下缺點:
不支持 Row-Level-Update,對于更新的操作需要 overwrite 整張 Hive 表,成本極高
不支持讀寫分離,用戶的讀取操作會被另一個用戶的寫入操作所影響(尤其是流式讀取的場景)
不支持版本回滾和快照,需要保存大量歷史數(shù)據(jù)
不支持增量讀取,每次掃描全表或分區(qū)所有數(shù)據(jù)
性能低,只能裁剪到 Hive Partition 粒度
不支持 Schema 變更
.....
Basic Concepts
Iceberg organizes the files on HDFS into a hierarchy of snapshots, manifest lists, manifests, and data files (a typical on-disk layout is sketched after the list):

Snapshot: each user commit (e.g. each writing Spark job) produces a new snapshot
Manifest List: tracks all manifests belonging to the current snapshot
Manifest: tracks all data files belonging to it
Data File: the file that actually stores the data; Iceberg later introduced the Delete File, which stores rows to be deleted and sits at the same level as Data Files in the file structure
Core Features in Detail
Time Travel and Incremental Reads
Time travel means a user can read the table's data as of any historical point in time. Using Spark as an example:
// time travel to October 26, 1986 at 01:21:00
spark.read
    .option("as-of-timestamp", "499162860000")
    .format("iceberg")
    .load("path/to/table")

This reads the Iceberg table's data as of timestamp 499162860000. So how does it work underneath?
From the file structure in "Basic Concepts", every new write produces a snapshot. Iceberg therefore only needs to store the metadata of each commit, such as its timestamp, to locate the snapshot for a given moment and resolve it down to its Data Files.
Incremental reads work the same way: the start and end timestamps select the snapshots in that time range, and all of their Data Files are read as the raw data.
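Recent versions of the Flink connector used in Part 2 expose the same mechanism through read options. A sketch (batch mode; the snapshot ids and timestamp are placeholders taken from the example table built later in this article):

-- read the table as of a snapshot or a timestamp
select * from my_user /*+ options('snapshot-id'='8012517928892530314') */;
select * from my_user /*+ options('as-of-timestamp'='1644761130111') */;

-- incremental read of the data committed between two snapshots
select * from my_user /*+ options(
  'start-snapshot-id'='8012517928892530314',
  'end-snapshot-id'='453371561664052237'
) */;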
Fast Scan & Data Filtering
As noted above, one reason for Hive's poor query performance is that predicate pushdown only reaches partition granularity, which is too coarse. Iceberg adds a series of fine-grained optimizations to query planning. When a query enters Iceberg, it goes through the following steps (an example follows the list):
根據(jù) timestamp 找到對應的 snapshot(默認最新)
根據(jù) Query 的 Partition 信息從指定 snapshot 中過濾出符合條件的 manifest 文件集合
從 manifest 文件集合中取出所有的 Data Files 對象(只包含元信息)
根據(jù) Data File 的若干個屬性,進行更細粒度的數(shù)據(jù)過濾,包括 column-level value counts, null counts, lower bounds, and upper bounds 等
Delete Implementation
To support row-level updates, Iceberg provides a Delete implementation; Delete + Insert together achieve Update. The implementation introduces two concepts:
Delete File: stores the deleted rows (two kinds: position deletes and equality deletes)
Sequence Number: a property shared by Data Files and Delete Files, used mainly to order Inserts relative to Deletes; without it, data-consistency problems would arise
position & equality delete
Iceberg introduces the notion of equality_ids: when creating a table, the user can set the table's equality_ids to identify the key that future Delete operations will match on. In a GDPR scenario, for example, where a user's data must be deleted ad hoc by user_id, equality_ids can be set to user_id.
The two Delete operations map to different Delete Files with different stored fields:

position delete: three columns — file_path (the Data File containing the deleted row), pos (row position), and row (the row data)
equality delete: the fields listed in equality_ids
The point of storing Delete Files is, obviously, to join them against the data at read time. A position delete pinpoints the exact file during the join and only compares row numbers, so it is clearly the more efficient of the two. Hence, while writing, Iceberg keeps information about the data files currently being written in memory, so that DELETE operations take the position-delete path whenever possible. The flow works as follows:
Three INSERT and DELETE records arrive in time order. Suppose the Iceberg writer closed its file after writing the a1 and b1 INSERTs and opened a new one; the record c1 and its row position are then tracked in memory. When the writer receives the DELETE for user_id=c1, it finds in memory that c1's row is the first row of fileA, and writes a Position Delete File. For the DELETE of user_id=a1, that file has already been closed and nothing about it remains in memory, so the writer writes an Equality Delete File.
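Conceptually, the two delete files produced by this example would contain something like the following (illustrative values; fileA is the file name used above):

-- position delete file: identifies the row by physical location
file_path | pos | row
fileA     | 0   | (c1, ...)

-- equality delete file: identifies the row by the equality_ids columns (here user_id)
user_id
a1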
Sequence Number
With DELETE in the picture, merging at read time raises a question: if a user inserts, deletes, and then re-inserts data with the same equality_id, how does a read guarantee that the first insert is removed while the second insert survives?
Iceberg numbers Data Files and Delete Files together in write order; at read time, a DELETE only applies to Data Files with a smaller Sequence Number. What about concurrent writes of the same records? That is where Iceberg's own transaction mechanism comes in: before committing, the writer validates the relevant metadata and Sequence Number, and if the commit no longer matches expectations it retries under optimistic locking. A worked example follows.
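A sketch of the insert-delete-insert case (the DELETE stands for whatever engine operation emits an equality delete file; sequence numbers are assigned per commit):

insert into t values ('a1');          -- data file D1, sequence number 1
delete from t where user_id = 'a1';   -- equality delete file E1, sequence number 2
insert into t values ('a1');          -- data file D2, sequence number 3

-- At read time, E1 (sequence number 2) applies only to data files with a
-- smaller sequence number: the row in D1 is dropped, the row in D2 survives,
-- so the query returns only the second insert, as expected.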
Schema Evolution
Schema evolution is one of Iceberg's signature features, supporting:

Adding columns
Dropping columns
Renaming columns
Updating column types
Reordering columns
Schema changes also rely on the file structure above: every write produces the snapshot -> manifest -> data file hierarchy, and likewise every read starts from a snapshot and routes down to the underlying data files. So Iceberg only needs to record the schema in the manifest at write time and apply the corresponding conversion at read time.
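Flink DDL at the version used in Part 2 only covers property changes (see section 4.3 below), but the same table accepts schema changes from engines that implement them, e.g. Spark SQL. A sketch (the age column is hypothetical):

alter table my_user add columns (age int);
alter table my_user rename column user_name to name;
alter table my_user alter column age type bigint;  -- only safe widenings such as int -> bigint are allowed
alter table my_user drop column age;

Because Iceberg tracks columns by field id rather than by name, old data files remain readable after such changes.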
Part 2: Setting Up Flink + Iceberg
1. Configuring Iceberg in the Flink SQL Client
The Flink cluster must be the Scala 2.12 build
Download the Iceberg dependency jar into the lib directory on every server of the Flink cluster, then restart Flink
Iceberg supports the Hadoop Catalog out of the box. To use the Hive Catalog, also place flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar into the lib directory on every server and restart Flink
Then start the SQL Client
2. Java/Scala pom.xml Configuration
Add the following dependency:
<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-flink</artifactId>
    <version>0.13.0</version>
    <scope>provided</scope>
</dependency>

3. Catalog
3.1 Hive Catalog
Note: in our tests, querying the table data from Hive returned nothing, while querying from Trino worked.
The Hive metastore stores the metadata; HDFS stores the data of the databases and tables.
Flink SQL> create catalog hive_catalog with (
>   'type'='iceberg',
>   'catalog-type'='hive',
>   'property-version'='1',
>   'cache-enabled'='true',
>   'uri'='thrift://hive1:9083',
>   'clients'='5',
>   'warehouse'='hdfs://nnha/user/hive/warehouse',
>   'hive-conf-dir'='/root/flink-1.14.3/hive_conf'
> );
[INFO] Execute statement succeed.

property-version: for backward compatibility, in case the property format changes. Set it to 1 for now
cache-enabled: whether to enable catalog caching; enabled by default
clients: the size of the connection pool the hive_catalog offers clients of the Hive metastore; default 2
warehouse: an HDFS path on the cluster Flink uses, where databases and tables under hive_catalog store their data
hive-conf-dir: the Hive configuration directory. It can only be a local path on the Flink machines; the HDFS path parsed from its hive-site.xml is on the HDFS the Flink cluster uses
warehouse takes precedence over the value derived from hive-conf-dir
If the database being created already exists in Hive, the created table's path will sit under Hive's warehouse
3.2 HDFS Catalog
用HDFS保存元數(shù)據(jù)和數(shù)據(jù)庫表的數(shù)據(jù)。warehouse是Flink集群所在的HDFS路徑
Flink SQL> create catalog hadoop_catalog with (
>   'type'='iceberg',
>   'catalog-type'='hadoop',
>   'property-version'='1',
>   'cache-enabled'='true',
>   'warehouse'='hdfs://nnha/user/iceberg/warehouse'
> );
[INFO] Execute statement succeed.

A permanent catalog can supposedly be configured via conf/sql-cli-defaults.yaml, but in our tests it did not take effect:
[root@flink1 ~]# cat /root/flink-1.14.3/conf/sql-cli-defaults.yaml
catalogs:
  - name: hadoop_catalog
    type: iceberg
    catalog-type: hadoop
    property-version: 1
    cache-enabled: true
    warehouse: hdfs://nnha/user/iceberg/warehouse
[root@flink1 ~]#
[root@flink1 ~]# chown 501:games /root/flink-1.14.3/conf/sql-cli-defaults.yaml

Below, we use the Hadoop Catalog as the running example.
4.數(shù)據(jù)庫和表相關DDL命令
4.1 創(chuàng)建數(shù)據(jù)庫
Catalog下面默認都有一個default數(shù)據(jù)庫
Flink?SQL>?create?database?hadoop_catalog.iceberg_db; [INFO]?Execute?statement?succeed.Flink?SQL>?use?hadoop_catalog.iceberg_db; [INFO]?Execute?statement?succeed.Flink?SQL>會在HDFS目錄上創(chuàng)建iceberg_db子目錄
如果刪除數(shù)據(jù)庫,會刪除HDFS上的iceberg_db子目錄
4.2 創(chuàng)建表(不支持primary key等)
Flink SQL> create table hadoop_catalog.iceberg_db.my_user (
>   user_id bigint comment 'user ID',
>   user_name string,
>   birthday date,
>   country string
> ) comment 'user table'
> partitioned by (birthday, country) with (
>   'write.format.default'='parquet',
>   'write.parquet.compression-codec'='gzip'
> );
[INFO] Execute statement succeed.

Currently, tables do not support computed columns, primary keys, or watermarks.
Computed (hidden) partitions are not supported in Flink DDL either, although Iceberg itself supports partition transforms.
Iceberg does support primary keys, though: with the table properties 'format-version'='2' and 'write.upsert.enabled'='true' plus a primary key on the table, upserts work, giving insert, update, and delete semantics.
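A sketch of such a table (names are illustrative; in Flink DDL the primary key must be declared NOT ENFORCED):

create table hadoop_catalog.iceberg_db.my_user_v2 (
  user_id bigint,
  user_name string,
  primary key (user_id) not enforced
) with (
  'format-version'='2',
  'write.upsert.enabled'='true'
);

-- writing the same key twice now upserts instead of appending:
insert into hadoop_catalog.iceberg_db.my_user_v2 values (1, 'zhang_san');
insert into hadoop_catalog.iceberg_db.my_user_v2 values (1, 'li_si');  -- replaces the row for user_id=1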
創(chuàng)建表生成的文件信息如下:
查看v1.metadata.json,可以看到"current-snapshot-id" : -1
Flink SQL> create table hadoop_catalog.iceberg_db.my_user_copy
>   like hadoop_catalog.iceberg_db.my_user;
[INFO] Execute statement succeed.

The copied table has the same schema, partitioning, and table properties.
4.3 Altering Tables

Modifying table properties:

Flink SQL> alter table hadoop_catalog.iceberg_db.my_user_copy
>   set (
>     'write.format.default'='avro',
>     'write.avro.compression-codec'='gzip'
>   );
[INFO] Execute statement succeed.

Currently, Flink can only modify Iceberg table properties.
Renaming a table:

Flink SQL> alter table hadoop_catalog.iceberg_db.my_user_copy
>   rename to hadoop_catalog.iceberg_db.my_user_copy_new;
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Cannot rename Hadoop tables

Tables in a Hadoop Catalog cannot be renamed.
4.4 Dropping Tables

Flink SQL> drop table hadoop_catalog.iceberg_db.my_user_copy;
[INFO] Execute statement succeed.

This removes the my_user_copy subdirectory on HDFS.
5.插入數(shù)據(jù)到表
5.1 insert into
insert into … values …
insert into … select …
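For example (hypothetical rows, in the spirit of the data queried later in this section):

insert into hadoop_catalog.iceberg_db.my_user values
  (1, 'zhang_san', date '2022-02-01', 'china'),
  (3, 'li_si', date '2022-02-02', 'japan');

insert into hadoop_catalog.iceberg_db.my_user
select user_id, user_name, birthday, country from some_source_table;  -- some_source_table is a placeholder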
After a couple of inserts, the HDFS directory looks like this:

hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-01/country=china/00000-0-4ef3835f-b18b-4c48-b47a-85af1771a10a-00001.parquet
hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-01/country=china/00000-0-6e66c02b-cb09-4fd0-b669-15aa7f5194e4-00001.parquet
hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-02/country=japan/00000-0-4ef3835f-b18b-4c48-b47a-85af1771a10a-00002.parquet
hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-02/country=japan/00000-0-6e66c02b-cb09-4fd0-b669-15aa7f5194e4-00002.parquet

5.2 insert overwrite (Batch mode only; overwrite granularity is the partition)
Only Flink batch mode is supported, not streaming mode.

insert overwrite replaces one or more whole partitions, not individual rows. On a non-partitioned table it replaces the entire table, as shown below:
Flink SQL> set 'execution.runtime-mode' = 'batch';
[INFO] Session property has been set.

Flink SQL> insert overwrite hadoop_catalog.iceberg_db.my_user values (4, 'wang_wu', date '2022-02-02', 'japan');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 63cf6c27060ec9ebdce75b785cc3fa3a

Flink SQL> set 'sql-client.execution.result-mode' = 'tableau';
[INFO] Session property has been set.

Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
+---------+-----------+------------+---------+
| user_id | user_name |   birthday | country |
+---------+-----------+------------+---------+
|       1 | zhang_san | 2022-02-01 |   china |
|       4 |   wang_wu | 2022-02-02 |   japan |
|       2 | zhang_san | 2022-02-01 |   china |
+---------+-----------+------------+---------+
3 rows in set

The birthday=2022-02-02/country=japan partition now contains the following files; insert overwrite also just adds a new file:
birthday=2022-02-02/country=japan/00000-0-1d0ff907-60a7-4062-93a3-9b443626e383-00001.parquet
birthday=2022-02-02/country=japan/00000-0-4ef3835f-b18b-4c48-b47a-85af1771a10a-00002.parquet
birthday=2022-02-02/country=japan/00000-0-6e66c02b-cb09-4fd0-b669-15aa7f5194e4-00002.parquet

insert overwrite … partition replaces the specified partition:
Flink SQL> insert overwrite hadoop_catalog.iceberg_db.my_user partition (birthday = '2022-02-02', country = 'japan') select 5, 'zhao_liu';
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 97e9ba4131028c53461e739b34108ae0

Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
+---------+-----------+------------+---------+
| user_id | user_name |   birthday | country |
+---------+-----------+------------+---------+
|       1 | zhang_san | 2022-02-01 |   china |
|       5 |  zhao_liu | 2022-02-02 |   japan |
|       2 | zhang_san | 2022-02-01 |   china |
+---------+-----------+------------+---------+
3 rows in set

6. Querying Data
Batch mode:

Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
+---------+-----------+------------+---------+
| user_id | user_name |   birthday | country |
+---------+-----------+------------+---------+
|       1 | zhang_san | 2022-02-01 |   china |
|       5 |  zhao_liu | 2022-02-02 |   japan |
|       2 | zhang_san | 2022-02-01 |   china |
+---------+-----------+------------+---------+
3 rows in set

Streaming mode:
Check the latest snapshot id:

[root@flink1 conf]# hadoop fs -cat hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/version-hint.text
5

We created the table, ran two inserts, and ran two insert overwrites, so the latest version number is 5. Now look at the metadata JSON file for that version:
[root@flink1 ~]# hadoop fs -cat hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v5.metadata.json
{
  "format-version" : 1,
  "table-uuid" : "84a5e90d-7ae9-4dfd-aeab-c74f07447513",
  "location" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user",
  "last-updated-ms" : 1644761481488,
  "last-column-id" : 4,
  "schema" : {"type" : "struct", "schema-id" : 0, "fields" : [ {"id" : 1, "name" : "user_id", "required" : false, "type" : "long"}, {"id" : 2, "name" : "user_name", "required" : false, "type" : "string"}, {"id" : 3, "name" : "birthday", "required" : false, "type" : "date"}, {"id" : 4, "name" : "country", "required" : false, "type" : "string"} ]},
  "current-schema-id" : 0,
  "schemas" : [ {"type" : "struct", "schema-id" : 0, "fields" : [ {"id" : 1, "name" : "user_id", "required" : false, "type" : "long"}, {"id" : 2, "name" : "user_name", "required" : false, "type" : "string"}, {"id" : 3, "name" : "birthday", "required" : false, "type" : "date"}, {"id" : 4, "name" : "country", "required" : false, "type" : "string"} ]} ],
  "partition-spec" : [ {"name" : "birthday", "transform" : "identity", "source-id" : 3, "field-id" : 1000}, {"name" : "country", "transform" : "identity", "source-id" : 4, "field-id" : 1001} ],
  "default-spec-id" : 0,
  "partition-specs" : [ {"spec-id" : 0, "fields" : [ {"name" : "birthday", "transform" : "identity", "source-id" : 3, "field-id" : 1000}, {"name" : "country", "transform" : "identity", "source-id" : 4, "field-id" : 1001} ]} ],
  "last-partition-id" : 1001,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {"order-id" : 0, "fields" : [ ]} ],
  "properties" : {"write.format.default" : "parquet", "write.parquet.compression-codec" : "gzip"},
  "current-snapshot-id" : 138573494821828246,
  "snapshots" : [
    {"snapshot-id" : 8012517928892530314, "timestamp-ms" : 1644761130111, "summary" : {"operation" : "append", "flink.job-id" : "8f228ae49d34aafb4b2887db3149e3f6", "flink.max-committed-checkpoint-id" : "9223372036854775807", "added-data-files" : "2", "added-records" : "2", "added-files-size" : "2487", "changed-partition-count" : "2", "total-records" : "2", "total-files-size" : "2487", "total-data-files" : "2", "total-delete-files" : "0", "total-position-deletes" : "0", "total-equality-deletes" : "0"}, "manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-8012517928892530314-1-5c33451b-48ab-4ce5-be7a-2c2d2dc9e11d.avro", "schema-id" : 0},
    {"snapshot-id" : 453371561664052237, "parent-snapshot-id" : 8012517928892530314, "timestamp-ms" : 1644761150082, "summary" : {"operation" : "append", "flink.job-id" : "813b7a17c21ddd003e1a210b1366e0c5", "flink.max-committed-checkpoint-id" : "9223372036854775807", "added-data-files" : "2", "added-records" : "2", "added-files-size" : "2487", "changed-partition-count" : "2", "total-records" : "4", "total-files-size" : "4974", "total-data-files" : "4", "total-delete-files" : "0", "total-position-deletes" : "0", "total-equality-deletes" : "0"}, "manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-453371561664052237-1-bc0e56ec-9f78-4956-8412-4d8ca70ccc19.avro", "schema-id" : 0},
    {"snapshot-id" : 6410282459040239217, "parent-snapshot-id" : 453371561664052237, "timestamp-ms" : 1644761403566, "summary" : {"operation" : "overwrite", "replace-partitions" : "true", "flink.job-id" : "f7085f68e5ff73c1c8aa1f4f59996068", "flink.max-committed-checkpoint-id" : "9223372036854775807", "added-data-files" : "1", "deleted-data-files" : "2", "added-records" : "1", "deleted-records" : "2", "added-files-size" : "1244", "removed-files-size" : "2459", "changed-partition-count" : "1", "total-records" : "3", "total-files-size" : "3759", "total-data-files" : "3", "total-delete-files" : "0", "total-position-deletes" : "0", "total-equality-deletes" : "0"}, "manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-6410282459040239217-1-2b20c57e-5428-4483-9f7b-928b980dd50d.avro", "schema-id" : 0},
    {"snapshot-id" : 138573494821828246, "parent-snapshot-id" : 6410282459040239217, "timestamp-ms" : 1644761481488, "summary" : {"operation" : "overwrite", "replace-partitions" : "true", "flink.job-id" : "d434d6d4f658d61732d7e9a0a85279fc", "flink.max-committed-checkpoint-id" : "9223372036854775807", "added-data-files" : "1", "deleted-data-files" : "1", "added-records" : "1", "deleted-records" : "1", "added-files-size" : "1251", "removed-files-size" : "1244", "changed-partition-count" : "1", "total-records" : "3", "total-files-size" : "3766", "total-data-files" : "3", "total-delete-files" : "0", "total-position-deletes" : "0", "total-equality-deletes" : "0"}, "manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-138573494821828246-1-b243b39e-7122-4571-b6fa-c902241e36a8.avro", "schema-id" : 0} ],
  "snapshot-log" : [ {"timestamp-ms" : 1644761130111, "snapshot-id" : 8012517928892530314}, {"timestamp-ms" : 1644761150082, "snapshot-id" : 453371561664052237}, {"timestamp-ms" : 1644761403566, "snapshot-id" : 6410282459040239217}, {"timestamp-ms" : 1644761481488, "snapshot-id" : 138573494821828246} ],
  "metadata-log" : [ {"timestamp-ms" : 1644760911017, "metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v1.metadata.json"}, {"timestamp-ms" : 1644761130111, "metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v2.metadata.json"}, {"timestamp-ms" : 1644761150082, "metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v3.metadata.json"}, {"timestamp-ms" : 1644761403566, "metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v4.metadata.json"} ]
}
[root@flink1 ~]#

You can see "current-snapshot-id" : 138573494821828246, the current snapshot id.
Flink SQL> set 'execution.runtime-mode' = 'streaming';
[INFO] Session property has been set.

Flink SQL> select * from hadoop_catalog.iceberg_db.my_user
> /*+ options(
>   'streaming'='true',
>   'monitor-interval'='5s'
> )*/ ;
+----+----------------------+--------------------------------+------------+--------------------------------+
| op |              user_id |                      user_name |   birthday |                        country |
+----+----------------------+--------------------------------+------------+--------------------------------+
| +I |                    5 |                       zhao_liu | 2022-02-02 |                          japan |
| +I |                    2 |                      zhang_san | 2022-02-01 |                          china |
| +I |                    1 |                      zhang_san | 2022-02-01 |                          china |

This shows the data of the latest snapshot.
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user
> /*+ options(
>   'streaming'='true',
>   'monitor-interval'='5s',
>   'start-snapshot-id'='138573494821828246'
> )*/ ;
+----+----------------------+--------------------------------+------------+--------------------------------+
| op |              user_id |                      user_name |   birthday |                        country |
+----+----------------------+--------------------------------+------------+--------------------------------+

Only the snapshot id of the last insert overwrite operation, or a later one, can be specified here; otherwise the job throws an exception in the background and stays in a restarting state:
java.lang.UnsupportedOperationException: Found overwrite operation, cannot support incremental data in snapshots (8012517928892530314, 138573494821828246]

In this example, snapshot id 138573494821828246 is the last snapshot id and also the snapshot id of the last insert overwrite. If we now insert two more rows, only the incremental data is returned:
Flink SQL> insert into hadoop_catalog.iceberg_db.my_user(
>   user_id, user_name, birthday, country
> ) values (6, 'zhang_san', date '2022-02-01', 'china');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 8eb279e61aed66304d78ad027eaf8d30

Flink SQL> insert into hadoop_catalog.iceberg_db.my_user(
>   user_id, user_name, birthday, country
> ) values (7, 'zhang_san', date '2022-02-01', 'china');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 70a050e455d188d0d3f3adc2ba367fb6

Flink SQL> select * from hadoop_catalog.iceberg_db.my_user
> /*+ options(
>   'streaming'='true',
>   'monitor-interval'='30s',
>   'start-snapshot-id'='138573494821828246'
> )*/ ;
+----+----------------------+--------------------------------+------------+--------------------------------+
| op |              user_id |                      user_name |   birthday |                        country |
+----+----------------------+--------------------------------+------------+--------------------------------+
| +I |                    6 |                      zhang_san | 2022-02-01 |                          china |
| +I |                    7 |                      zhang_san | 2022-02-01 |                          china |

Streaming mode supports reading incremental snapshot data.
If start-snapshot-id is not specified, the full data of the current snapshot is read first, followed by incremental data. If start-snapshot-id is specified, only the incremental data after that snapshot is read, i.e. the data of that snapshot itself is excluded.

monitor-interval: the interval for monitoring newly committed data files; default 1s.