Apache Iceberg 快速入门
導言
本文主要介紹如何快速的通過Spark訪問 Iceberg table。
如果想及時了解Spark、Hadoop或者HBase相關的文章,歡迎關注微信公眾號:iteblog_hadoop
Spark通過DataSource和DataFrame API訪問Iceberg table,或者進行Catalog相關的操作。由于Spark Data Source V2 API還在持續的演進和修改中,所以Iceberg在不同的Spark版本中的使用方式有所不同。
版本對比
| 基于DataFrame | ||
| - 讀數據 | 支持 | 支持 |
| - 讀元數據 | 支持 | 支持 |
| - 追加(append) | 支持 | 支持 |
| - 覆蓋(Overwrite) | 支持 | 支持 |
| - V2 source專屬操作,如create, overwrite | 不支持 | 支持 |
| 基于Spark SQL | ||
| - SELECT | 通過DataFrame的temporary view | 支持 |
| - DDL | 不支持(僅能通過Iceberg API) | 支持(通過Catalog) |
| - DML | 不支持 | 支持 |
Spark 2.4 環境的使用
配置
Hive MetaStore
Iceberg 內部支持 Hive 和 Hadoop 兩種 catalog:
| Hive catalog | Hive MetaStore | 1級,即DB |
| Hadoop catalog | 文件系統上的某個文件 | 多級,對應多級目錄 |
后文以Hive catalog為主做介紹。Hive catalog需要Hive MetaStore的支持。注意其有多種配置方式,其中內嵌的Derby數據庫僅僅用于實驗和學習,不能用于生產環境。
Spark
<SPARK_HOME>/conf/spark-defaults.conf需要加入如下配置,使Iceberg能夠訪問Hive MetaStore:
| spark.hadoop.hive.metastore.uris?????????? thrift://<HiveMetaStore>:9083 spark.hadoop.hive.metastore.warehouse.dir? hdfs://<NameNode>:8020/path |
部署
如何使用社區正式發布的版本:
| spark-shell --packages org.apache.iceberg:iceberg-spark-runtime:0.7.0-incubating |
如何本地打包,并把Iceberg放入Spark的classpath:
| git clone https://github.com/apache/incubator-iceberg.git cd incubator-iceberg # master branch supports Spark 2.4.4 ./gradlew assemble spark-shell --jars <iceberg-git-working-directory>/spark-runtime/build/libs/iceberg-spark-runtime-<version>.jar |
讀Iceberg table
通過DataFrame
Spark 2.4只能讀寫已經存在的Iceberg table。在后續的操作前,需要先通過Iceberg API來創建table。具體如下:
| import org.apache.iceberg.catalog.TableIdentifier; val name = TableIdentifier.of("default", "person"); import org.apache.iceberg.Schema; import org.apache.iceberg.types.Types; val schema = new Schema( ??????Types.NestedField.required(1, "id", Types.IntegerType.get()), ??????Types.NestedField.required(2, "name", Types.StringType.get()), ??????Types.NestedField.required(3, "age", Types.IntegerType.get()) ????); import org.apache.iceberg.PartitionSpec; val spec = PartitionSpec.unpartitioned import org.apache.iceberg.hive.HiveCatalog; val catalog = new HiveCatalog(spark.sparkContext.hadoopConfiguration); val table = catalog.createTable(name, schema, spec); |
讀取是通過DataFrameReader并指定iceberg作為format來訪問Iceberg table,隨后Iceberg內部的邏輯會根據path來判斷訪問的是Hive catalog下的table,還是用文件系統的路徑表示的Hadoop table。
| // Table managed by Hive catalog spark.read ?????.format("iceberg") ?????.load("db.table") // Hadoop table, identified by a path spark.read ?????.format("iceberg") ?????.load("hdfs://<NameNode>:8020/<path_to_table>") |
Iceberg會判斷path中是否含有"/"。如果是,則認為是一個用路徑表示Hadoop table;否則,會去Hive catalog中尋找。
利用time travel回溯某一個snapshot的數據
在讀取時,通過option指定as-of-timestamp或者snapshot-id來訪問之前某一個snapshot中的數據:
| // Time travel to October 26, 1986 at 01:21:00 spark.read ?????.format("iceberg") ?????.option("as-of-timestamp", "499162860000") ?????.load("db.table") // Time travel to snapshot with ID 10963874102873L spark.read ?????.format("iceberg") ?????.option("snapshot-id", 10963874102873L) ?????.load("db.table") |
snapshot-id的獲取方法,可以參考后文中訪問元數據中snapshot的部分,或者直接查看元數據文件的內容。
在DataFrame基礎上使用SQL SELECT
在DataFrame的基礎上,創建local temporary view后,也可以通過SQL SELECT來讀取Iceberg table的內容:
| val df = spark.read ??????????????.format("iceberg") ??????????????.load("db.table") df.createOrReplaceTempView("view") spark.sql("""SELECT * FROM view""") ?????.show() |
寫Iceberg table
Spark 2.4可以通過DataFrameWriter并指定iceberg作為format來寫入Iceberg table,并支持append和overwrite兩種模式:
| // Append df.write ??.format("iceberg") ??.mode("append") ??.save("db.table") // Overwrite df.write ??.format("iceberg") ??.mode("overwrite") ??.save("db.table") |
有如下幾點需要注意:
訪問Iceberg table的元數據
Iceberg支持通過DataFrameReader訪問table的元數據,如snapshot,manifest等。對于Hive table,可以在原table name后面加.history、.snapshots等表示要訪問元數據;對于用路徑來表示的Hadoop table,需要在原路徑后面加#history等。例如:
| // Read snapshot history of db.table spark.read ?????.format("iceberg") ?????.load("db.table.history") |
結果
| +-------------------------+---------------------+---------------------+---------------------+ | made_current_at???????? | snapshot_id???????? | parent_id?????????? | is_current_ancestor | +-------------------------+---------------------+---------------------+---------------------+ | 2019-02-08 03:29:51.215 | 5781947118336215154 | NULL??????????????? | true??????????????? | | 2019-02-08 03:47:55.948 | 5179299526185056830 | 5781947118336215154 | true??????????????? | | 2019-02-09 16:24:30.13? | 296410040247533544? | 5179299526185056830 | false?????????????? | | 2019-02-09 16:32:47.336 | 2999875608062437330 | 5179299526185056830 | true??????????????? | | 2019-02-09 19:42:03.919 | 8924558786060583479 | 2999875608062437330 | true??????????????? | | 2019-02-09 19:49:16.343 | 6536733823181975045 | 8924558786060583479 | true??????????????? | +-------------------------+---------------------+---------------------+---------------------+ |
又如:
| // Read snapshot list of db.table spark.read ?????.format("iceberg") ?????.load("db.table.snapshots") // Read manifest files of db.table spark.read ?????.format("iceberg") ?????.load("db.table.manifests") // Read data file list of db.tabe spark.read ?????.format("iceberg") ?????.load("db.table.files") |
可以進一步將history和snapshot按照snapshot id做join,來查找snapshot id對應的application id:
| spark.read ?????.format("iceberg") ?????.load("db.table.history") ?????.createOrReplaceTempView("history") spark.read ?????.format("iceberg") ?????.load("db.table.snapshots") ?????.createOrReplaceTempView("snapshots") |
| SELECT ????h.made_current_at, ????s.operation, ????h.snapshot_id, ????h.is_current_ancestor, ????s.summary['spark.app.id'] FROM history h JOIN snapshots s ??ON h.snapshot_id = s.snapshot_id ORDER BY made_current_at |
結果如下:
| -------------------------+-----------+----------------+---------------------+----------------------------------+ | made_current_at???????? | operation | snapshot_id??? | is_current_ancestor | summary[spark.app.id]??????????? | +-------------------------+-----------+----------------+---------------------+----------------------------------+ | 2019-02-08 03:29:51.215 | append??? | 57897183625154 | true??????????????? | application_1520379288616_155055 | | 2019-02-09 16:24:30.13? | delete??? | 29641004024753 | false?????????????? | application_1520379288616_151109 | | 2019-02-09 16:32:47.336 | append??? | 57897183625154 | true??????????????? | application_1520379288616_155055 | | 2019-02-08 03:47:55.948 | overwrite | 51792995261850 | true??????????????? | application_1520379288616_152431 | +-------------------------+-----------+----------------+---------------------+----------------------------------+ |
Spark 3.0 環境的使用
Iceberg在Spark 3.0中,作為V2 Data Source,除了上述Spark 2.4所有的訪問能力外,還可以通過V2 Data Source專屬的DataFrame API訪問;同時,受益于external catalog的支持,Spark SQL的DDL功能也可以操作Iceberg table,并且DML語句支持也更加豐富。
配置external catalog
在<SPARK_HOME>/conf/spark-defaults.conf加入如下配置:
| spark.sql.catalog.catalog-name=com.example.YourCatalogClass |
通過V2 Data Source專屬DataFrame API訪問
| df.writeTo("catalog-name.db.table") ??.overwritePartitions() |
通過Spark SQL訪問
相較于Spark 2.4,Spark 3.0可以省去DataFrameReader和創建local temporary view的步驟,直接通過Spark SQL進行操作:
| -- Create table CREATE TABLE catalog-name.db.tabe ????(id INT, data STRING) ????USING iceberg ????PARTITIONED BY (id) -- Insert INSERT INTO catalog-name.db.table ????VALUES (1, 'a'), (2, 'b'), (3, 'c') -- Delete DELETE FROM catalog-name.db.table ????WHERE id <> 1 -- Update UPDATE catalog-name.db.table ????SET data = 'C' WHERE id = 3 -- Create table as select CREATE TABLE catalog-name.db.table ????USING iceberg ????AS SELECT id, data ???????FROM catalog-name.db.table1 ???????WHERE id <= 2 |
我們作為社區中spark-3分支的維護者,正在持續推進新功能的開發和合入,讓更多的人受益。
總結
本文作為Iceberg的快速入門,介紹了如何通過Spark訪問Iceberg table,以及不同Spark版本的支持情況:
- Spark 2.4可以通過DataFrame讀取或修改已經存在的Iceberg table中的數據,但建表、刪表等DDL操作只能通過Iceberg API完成;
- Spark 3.0訪問Iceberg table的能力是Spark 2.4的超集,可以通過Spark SQL配合catalog,進行SELECT、DDL和DML等更多的操作。
隨著Iceberg自身功能的完善(如向量化讀取,merge on read等),以及上下游對接和生態的豐富,Iceberg作為優秀的表格式抽象,在大數據領域必然會有更好的發展。
本文原文:https://mp.weixin.qq.com/s/vvsnHrbzxJ3Gno1XtzHO7g
總結
以上是生活随笔為你收集整理的Apache Iceberg 快速入门的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java 异步编程:从 Future 到
- 下一篇: 密歇根州立大学联合字节提出AutoEmb