spark sql定义RDD、DataFrame与DataSet
RDD
優點:
- 編譯時類型安全
- 編譯時就能檢查出類型錯誤
- 面向對象的編程風格
- 直接通過類名點的方式來操作數據
缺點:
- 序列化和反序列化的性能開銷
- 無論是集群間的通信, 還是IO操作都需要對對象的結構和數據進行序列化和反序列化.
- GC的性能開銷
- 頻繁的創建和銷毀對象, 勢必會增加GC
DataFrame
DataFrame引入了schema和off-heap
-
schema : RDD每一行的數據, 結構都是一樣的. 這個結構就存儲在schema中. Spark通過schame就能夠讀懂數據, 因此在通信和IO時就只需要序列化和反序列化數據, 而結構的部分就可以省略了.
-
off-heap : 意味著JVM堆以外的內存, 這些內存直接受操作系統管理(而不是JVM)。Spark能夠以二進制的形式序列化數據(不包括結構)到off-heap中, 當要操作數據時, 就直接操作off-heap內存. 由于Spark理解schema, 所以知道該如何操作.
off-heap就像地盤, schema就像地圖, Spark有地圖又有自己地盤了, 就可以自己說了算了, 不再受JVM的限制, 也就不再收GC的困擾了.
上圖直觀地體現了DataFrame和RDD的區別。左側的RDD[Person]雖然以Person為類型參數,但Spark框架本身不了解Person類的內部結構。而右側的DataFrame卻提供了詳細的結構信息,使得Spark SQL可以清楚地知道該數據集中包含哪些列,每列的名稱和類型各是什么。DataFrame多了數據的結構信息,即schema。RDD是分布式的Java對象的集合。DataFrame是分布式的Row對象的集合。DataFrame除了提供了比RDD更豐富的算子以外,更重要的特點是提升執行效率、減少數據讀取以及執行計劃的優化,比如filter下推、裁剪等。
提升執行效率
RDD API是函數式的,強調不變性,在大部分場景下傾向于創建新對象而不是修改老對象。這一特點雖然帶來了干凈整潔的API,卻也使得Spark應用程序在運行期傾向于創建大量臨時對象,對GC造成壓力。在現有RDD API的基礎之上,我們固然可以利用mapPartitions方法來重載RDD單個分片內的數據創建方式,用復用可變對象的方式來減小對象分配和GC的開銷,但這犧牲了代碼的可讀性,而且要求開發者對Spark運行時機制有一定的了解,門檻較高。另一方面,Spark SQL在框架內部已經在各種可能的情況下盡量重用對象,這樣做雖然在內部會打破了不變性,但在將數據返回給用戶時,還會重新轉為不可變數據。利用 DataFrame API進行開發,可以免費地享受到這些優化效果。
減少數據讀取
分析大數據,最快的方法就是 ——忽略它。這里的“忽略”并不是熟視無睹,而是根據查詢條件進行恰當的剪枝。
上文討論分區表時提到的分區剪 枝便是其中一種——當查詢的過濾條件中涉及到分區列時,我們可以根據查詢條件剪掉肯定不包含目標數據的分區目錄,從而減少IO。
對于一些“智能”數據格 式,Spark SQL還可以根據數據文件中附帶的統計信息來進行剪枝。簡單來說,在這類數據格式中,數據是分段保存的,每段數據都帶有最大值、最小值、null值數量等 一些基本的統計信息。當統計信息表名某一數據段肯定不包括符合查詢條件的目標數據時,該數據段就可以直接跳過(例如某整數列a某段的最大值為100,而查詢條件要求a > 200)。
此外,Spark SQL也可以充分利用RCFile、ORC、Parquet等列式存儲格式的優勢,僅掃描查詢真正涉及的列,忽略其余列的數據。
執行優化
為了說明查詢優化,我們來看上圖展示的人口數據分析的示例。圖中構造了兩個DataFrame,將它們join之后又做了一次filter操作。如果原封不動地執行這個執行計劃,最終的執行效率是不高的。因為join是一個代價較大的操作,也可能會產生一個較大的數據集。如果我們能將filter下推到 join下方,先對DataFrame進行過濾,再join過濾后的較小的結果集,便可以有效縮短執行時間。而Spark SQL的查詢優化器正是這樣做的。簡而言之,邏輯查詢計劃優化就是一個利用基于關系代數的等價變換,將高成本的操作替換為低成本操作的過程。
得到的優化執行計劃在轉換成物 理執行計劃的過程中,還可以根據具體的數據源的特性將過濾條件下推至數據源內。最右側的物理執行計劃中Filter之所以消失不見,就是因為溶入了用于執行最終的讀取操作的表掃描節點內。
對于普通開發者而言,查詢優化 器的意義在于,即便是經驗并不豐富的程序員寫出的次優的查詢,也可以被盡量轉換為高效的形式予以執行。
此外,通過schema和off-heap, DataFrame解決了RDD的缺點, 但是卻丟了RDD的優點. DataFrame不是類型安全的, API也不是面向對象風格的.
import org.apache.spark.sql.types.{DataTypes, StructField, StructType} import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.{SparkConf, SparkContext}object Run {def main(args: Array[String]) {val conf = new SparkConf().setAppName("test").setMaster("local")val sc = new SparkContext(conf)sc.setLogLevel("WARN")val sqlContext = new SQLContext(sc)/*** id age* 1 30* 2 29* 3 21*/val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21)))val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType)))val idAgeDF = sqlContext.createDataFrame(idAgeRDDRow, schema)// API不是面向對象的idAgeDF.filter(idAgeDF.col("age") > 25) // 不會報錯, DataFrame不是編譯時類型安全的idAgeDF.filter(idAgeDF.col("age") > "") } }DataSet
DataSet結合了RDD和DataFrame的優點, 并帶來的一個新的概念Encoder
當序列化數據時, Encoder產生字節碼與off-heap進行交互, 能夠達到按需訪問數據的效果, 而不用反序列化整個對象. Spark還沒有提供自定義Encoder的API, 但是未來會加入.
下面看DataFrame和DataSet在2.0.0-preview中的實現
下面這段代碼, 在1.6.x中創建的是DataFrame:
// 上文DataFrame示例中提取出來的 val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21)))val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType)))val idAgeDF = sqlContext.createDataFrame(idAgeRDDRow, schema)- 1
但是同樣的代碼在2.0.0-preview中, 創建的雖然還叫DataFrame:
// sqlContext.createDataFrame(idAgeRDDRow, schema) 方法的實現, 返回值依然是DataFrame def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = { sparkSession.createDataFrame(rowRDD, schema) }但是其實卻是DataSet, 因為DataFrame被聲明為Dataset[Row]:
package object sql {// ...省略了不相關的代碼type DataFrame = Dataset[Row] }因此當我們從1.6.x遷移到2.0.0的時候, 無需任何修改就直接用上了DataSet.
總結
以上是生活随笔為你收集整理的spark sql定义RDD、DataFrame与DataSet的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 深入理解Spark 2.1 Core (
- 下一篇: Scala入门到精通—— 第二节Scal