Spark SQL: Tracing the queryExecution Flow via the Logical Plan (Part 3)
1. Overall execution flow

The following code walks through the Spark SQL pipeline, showing the states a LogicalPlan passes through and the overall execution flow.
```scala
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("/examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
  .toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
```

(1) View the schema of teenagers:
```
scala> teenagers.printSchema
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
```

(2) View the execution flow:
```
scala> teenagers.queryExecution
res3: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
'Project [unresolvedalias('name),unresolvedalias('age)]
 'Filter (('age >= 13) && ('age <= 19))
  'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true
```
QueryExecution represents the end-to-end Spark SQL execution flow. As the output above shows, executing a SQL statement goes through the following steps:
```
== (1) Parsed Logical Plan ==
'Project [unresolvedalias('name),unresolvedalias('age)]
 'Filter (('age >= 13) && ('age <= 19))
  'UnresolvedRelation [people], None

== (2) Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== (3) Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== (4) Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

// Dynamic bytecode generation (code generation, CG) is enabled to speed up queries
Code Generation: true
```
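The first three phases can be made concrete with a toy model. Everything below is illustrative: the case classes and rules are simplified stand-ins for Catalyst's, not Spark's actual API, and attribute resolution is skipped (the resolved names `name#0`/`age#1` are used throughout).

```scala
// Toy model of the logical-plan pipeline: parsed (unresolved) -> analyzed ->
// optimized. Simplified stand-ins for Catalyst's classes, not Spark's API.
sealed trait Plan
case class UnresolvedRelation(table: String) extends Plan
case class Relation(table: String, output: Seq[String]) extends Plan
case class Filter(condition: String, child: Plan) extends Plan
case class Project(columns: Seq[String], child: Plan) extends Plan

// "Analyzer": resolve table names against a catalog.
val catalog = Map("people" -> Seq("name#0", "age#1"))
def analyze(plan: Plan): Plan = plan match {
  case UnresolvedRelation(t) => Relation(t, catalog(t))
  case Filter(c, child)      => Filter(c, analyze(child))
  case Project(cols, child)  => Project(cols, analyze(child))
  case other                 => other
}

// Columns a plan produces, used by the optimizer rule below.
def output(plan: Plan): Seq[String] = plan match {
  case Relation(_, out) => out
  case Filter(_, child) => output(child)
  case Project(cols, _) => cols
  case _                => Nil
}

// "Optimizer": one rule that drops a Project keeping every column of its child
// (this is why the teenagers plan loses its Project after optimization).
def optimize(plan: Plan): Plan = plan match {
  case Project(cols, child) if cols == output(child) => optimize(child)
  case Project(cols, child) => Project(cols, optimize(child))
  case Filter(c, child)     => Filter(c, optimize(child))
  case other                => other
}

// The teenagers query as a parsed plan:
val parsed    = Project(Seq("name#0", "age#1"),
                  Filter("(age >= 13) && (age <= 19)", UnresolvedRelation("people")))
val analyzed  = analyze(parsed)    // relation resolved via the catalog
val optimized = optimize(analyzed) // redundant Project removed, Filter stays on top
```

Running `optimize(analyzed)` leaves a Filter directly over the resolved relation, mirroring how the Optimized Logical Plan drops the redundant Project.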
2. Full-table query execution flow

Statement:
```scala
val all = sqlContext.sql("SELECT * FROM people")
```

Execution flow:
```
scala> all.queryExecution
res9: org.apache.spark.sql.SQLContext#QueryExecution =
// Note that * is parsed as unresolvedalias(*)
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'UnresolvedRelation [people], None

// unresolvedalias(*) is analyzed into all the fields in the schema
// UnresolvedRelation [people] is analyzed into Subquery people
== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Subquery people
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Physical Plan ==
Scan PhysicalRDD[name#0,age#1]

Code Generation: true
```
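The analyzer's expansion of `*` into the schema's columns can be sketched as follows; `Attribute`, `peopleSchema`, and `expandStar` are illustrative names, not Spark's API.

```scala
// Illustrative sketch of star expansion: the analyzer replaces the single
// unresolvedalias(*) entry with one resolved attribute per schema column.
case class Attribute(name: String, id: Int) {
  override def toString = s"$name#$id" // print like Spark's name#id attributes
}

val peopleSchema = Seq(Attribute("name", 0), Attribute("age", 1))

def expandStar(projectList: Seq[String], schema: Seq[Attribute]): Seq[String] =
  projectList.flatMap {
    case "*"   => schema.map(_.toString) // '*' becomes every column of the relation
    case other => Seq(other)             // explicit columns pass through unchanged
  }

expandStar(Seq("*"), peopleSchema) // Seq("name#0", "age#1")
```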
3. Filter query execution flow

Statement:
```
scala> val filterQuery = sqlContext.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
filterQuery: org.apache.spark.sql.DataFrame = [name: string, age: int]
```
Execution flow:
```
scala> filterQuery.queryExecution
res0: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Filter (('age >= 13) && ('age <= 19))
  'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 // An extra Filter appears here (and in the plans below)
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:20

== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:20

== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true
```
4. Join query execution flow
Statement:
```scala
val joinQuery = sqlContext.sql("SELECT * FROM people a, people b where a.age=b.age")
```

View the overall execution flow:
```
scala> joinQuery.queryExecution
res0: org.apache.spark.sql.SQLContext#QueryExecution =
// Note the Filter and the Join Inner
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Filter ('a.age = 'b.age)
  'Join Inner, None
   'UnresolvedRelation [people], Some(a)
   'UnresolvedRelation [people], Some(b)

== Analyzed Logical Plan ==
name: string, age: int, name: string, age: int
Project [name#0,age#1,name#2,age#3]
 Filter (age#1 = age#3)
  Join Inner, None
   Subquery a
    Subquery people
     LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
   Subquery b
    Subquery people
     LogicalRDD [name#2,age#3], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
Project [name#0,age#1,name#2,age#3]
 Join Inner, Some((age#1 = age#3))
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4]...

// View its Physical Plan
scala> joinQuery.queryExecution.sparkPlan
res16: org.apache.spark.sql.execution.SparkPlan =
TungstenProject [name#0,age#1,name#2,age#3]
 SortMergeJoin [age#1], [age#3]
  Scan PhysicalRDD[name#0,age#1]
  Scan PhysicalRDD[name#2,age#3]
```
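The SortMergeJoin operator chosen here sorts both sides by the join key and then merges them in one pass. Below is a minimal sketch of the merge phase only, assuming both inputs are already sorted by key; it is a toy, not Spark's implementation.

```scala
// Minimal sketch of the merge phase of a sort-merge join: both inputs are
// (key, row) pairs already sorted by key; emit every matching pair.
def sortMergeJoin[A, B](left: Seq[(Int, A)], right: Seq[(Int, B)]): Seq[(A, B)] = {
  val out = scala.collection.mutable.ArrayBuffer.empty[(A, B)]
  var i = 0
  var j = 0
  while (i < left.length && j < right.length) {
    val k = left(i)._1
    if (k < right(j)._1) i += 1      // left key too small: advance left
    else if (k > right(j)._1) j += 1 // right key too small: advance right
    else {
      // equal keys: pair this left row with the whole run of equal-keyed right rows
      var jj = j
      while (jj < right.length && right(jj)._1 == k) {
        out += ((left(i)._2, right(jj)._2))
        jj += 1
      }
      i += 1
    }
  }
  out.toSeq
}

// Toy data keyed by age, mimicking the self-join on a.age = b.age:
val a = Seq((13, "Alice"), (16, "Bob"), (19, "Carol"))
val b = Seq((16, "Bob"), (16, "Dave"), (20, "Eve"))
sortMergeJoin(a, b) // Seq(("Bob","Bob"), ("Bob","Dave"))
```

With duplicate keys on one side, each matching left row is paired with the whole run of equal-keyed right rows, exactly as an inner join requires.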
The previous example is equivalent to the one below; only the way it is written differs slightly. Statement:
```
scala> val innerQuery = sqlContext.sql("SELECT * FROM people a inner join people b on a.age=b.age")
innerQuery: org.apache.spark.sql.DataFrame = [name: string, age: int, name: string, age: int]
```
View the overall execution flow:
```
scala> innerQuery.queryExecution
res2: org.apache.spark.sql.SQLContext#QueryExecution =
// Note the Join Inner; unlike the previous example there is no separate Filter
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Join Inner, Some(('a.age = 'b.age))
  'UnresolvedRelation [people], Some(a)
  'UnresolvedRelation [people], Some(b)

== Analyzed Logical Plan ==
name: string, age: int, name: string, age: int
Project [name#0,age#1,name#4,age#5]
 Join Inner, Some((age#1 = age#5))
  Subquery a
   Subquery people
    LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
  Subquery b
   Subquery people
    LogicalRDD [name#4,age#5], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

// Note that the Optimized Logical Plan applies no special optimization over the
// Analyzed Logical Plan; this is highlighted for comparison with the subquery
// example later, where the Analyzed and Optimized plans do differ
== Optimized Logical Plan ==
Project [name#0,age#1,name#4,age#5]
 Join Inner, Some((age#1 = age#5))
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder ...

// View its Physical Plan
scala> innerQuery.queryExecution.sparkPlan
res14: org.apache.spark.sql.execution.SparkPlan =
TungstenProject [name#0,age#1,name#6,age#7]
 SortMergeJoin [age#1], [age#7]
  Scan PhysicalRDD[name#0,age#1]
  Scan PhysicalRDD[name#6,age#7]
```
5. Subquery execution flow
Statement:
```
scala> val subQuery = sqlContext.sql("SELECT * FROM (SELECT * FROM people WHERE age >= 13)a where a.age <= 19")
subQuery: org.apache.spark.sql.DataFrame = [name: string, age: int]
```
View the overall execution flow:
```
scala> subQuery.queryExecution
res4: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Filter ('a.age <= 19)
  'Subquery a
   'Project [unresolvedalias(*)]
    'Filter ('age >= 13)
     'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter (age#1 <= 19)
  Subquery a
   Project [name#0,age#1]
    Filter (age#1 >= 13)
     Subquery people
      LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

// Note the difference between the Optimized and Analyzed plans here:
// the two Filters have been merged and optimized into one
== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true
```
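This merge corresponds to an optimizer rule in the style of Catalyst's CombineFilters: two adjacent Filter nodes collapse into one whose predicate is the conjunction of both. A minimal standalone sketch, with toy classes rather than Spark's:

```scala
// Toy CombineFilters rule: Filter(outer, Filter(inner, child)) becomes a
// single Filter with the conjoined predicate, applied until none remain.
sealed trait Node
case class Scan(table: String) extends Node
case class Filter(cond: String, child: Node) extends Node

def combineFilters(plan: Node): Node = plan match {
  case Filter(outer, Filter(inner, child)) =>
    combineFilters(Filter(s"($inner) && ($outer)", child)) // merge, then retry
  case Filter(c, child) => Filter(c, combineFilters(child))
  case other            => other
}

// The subquery after analysis, with the Project/Subquery layers elided:
val analyzed = Filter("age#1 <= 19", Filter("age#1 >= 13", Scan("people")))
combineFilters(analyzed)
// Filter("(age#1 >= 13) && (age#1 <= 19)", Scan("people"))
```

The result is a single conjoined Filter directly over the scan, which is exactly the shape the Optimized Logical Plan takes.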
6. Aggregate SQL execution flow

Statement:
```
scala> val aggregateQuery = sqlContext.sql("SELECT a.name,sum(a.age) FROM (SELECT * FROM people WHERE age >= 13)a where a.age <= 19 group by a.name")
aggregateQuery: org.apache.spark.sql.DataFrame = [name: string, _c1: bigint]
```
View the execution flow:
```
scala> aggregateQuery.queryExecution
res6: org.apache.spark.sql.SQLContext#QueryExecution =
// Note 'Aggregate ['a.name], [unresolvedalias('a.name),unresolvedalias('sum('a.age))]:
// group by a.name is parsed as unresolvedalias('a.name)
== Parsed Logical Plan ==
'Aggregate ['a.name], [unresolvedalias('a.name),unresolvedalias('sum('a.age))]
 'Filter ('a.age <= 19)
  'Subquery a
   'Project [unresolvedalias(*)]
    'Filter ('age >= 13)
     'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, _c1: bigint
Aggregate [name#0], [name#0,sum(cast(age#1 as bigint)) AS _c1#9L]
 Filter (age#1 <= 19)
  Subquery a
   Project [name#0,age#1]
    Filter (age#1 >= 13)
     Subquery people
      LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
Aggregate [name#0], [name#0,sum(cast(age#1 as bigint)) AS _c1#9L]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  LogicalRDD [name#0,age#1], MapPartitions...

// View its Physical Plan
scala> aggregateQuery.queryExecution.sparkPlan
res10: org.apache.spark.sql.execution.SparkPlan =
TungstenAggregate(key=[name#0], functions=[(sum(cast(age#1 as bigint)),mode=Final,isDistinct=false)], output=[name#0,_c1#14L])
 TungstenAggregate(key=[name#0], functions=[(sum(cast(age#1 as bigint)),mode=Partial,isDistinct=false)], output=[name#0,currentSum#17L])
  Filter ((age#1 >= 13) && (age#1 <= 19))
   Scan PhysicalRDD[name#0,age#1]
```

You can inspect the execution flow of other SQL statements in the same way, to grasp the basic ideas behind Spark SQL's implementation.
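As a closing aside: the pair of TungstenAggregate operators in the physical plan above implements a two-phase sum, where mode=Partial computes per-partition sums and mode=Final merges those partials by key. A minimal sketch of that split, using toy data and function names rather than Spark's code:

```scala
// Two-phase aggregation sketch: partialAgg plays the role of the
// mode=Partial TungstenAggregate (one call per partition), finalAgg the
// mode=Final one (merging all partial sums by key).
def partialAgg(partition: Seq[(String, Int)]): Map[String, Long] =
  partition.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2.toLong).sum }

def finalAgg(partials: Seq[Map[String, Long]]): Map[String, Long] =
  partials.flatten.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }

// Hypothetical rows split across two partitions:
val partition1 = Seq(("Justin", 19), ("Andy", 13))
val partition2 = Seq(("Justin", 19))

val partials = Seq(partialAgg(partition1), partialAgg(partition2))
finalAgg(partials) // Map("Justin" -> 38, "Andy" -> 13)
```

Only the small per-key partial sums cross the network between the two phases, not the raw rows, which is the point of the Partial/Final split.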