Spark SQL Catalyst Source Code Analysis: Optimizer
/* Spark SQL Source Code Analysis Series */
Previous articles in this series covered the core execution flow of Spark SQL's Catalyst, the SqlParser, the Analyzer, and the core library TreeNode. This article explains the optimization ideas behind Spark SQL's Optimizer and how the Optimizer is expressed inside Catalyst, adding some hands-on experiments of my own, so that you come away with an intuitive picture of the Optimizer.
The Optimizer's main job is to take the resolved Logical Plan produced by the Analyzer and optimize the syntax tree according to its different optimization-strategy Batches. It optimizes both logical plan nodes (LogicalPlan) and expressions (Expression), and it is the step that precedes conversion into a physical execution plan.
1. Optimizer
Optimizer is the only class in Catalyst's optimizer package. It works much like the Analyzer: both extend RuleExecutor[LogicalPlan], and both execute a series of Batch operations.
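Before diving into the batches, it helps to see the executor's skeleton. The following is a stripped-down model of how a RuleExecutor drives its batches; this is a sketch of the idea only, not Spark's actual implementation, though the names mirror Catalyst's:

abstract class Rule[Plan] { def apply(plan: Plan): Plan }

case class FixedPoint(maxIterations: Int)                        // iteration budget for one batch
case class Batch[Plan](name: String, strategy: FixedPoint, rules: Rule[Plan]*)

abstract class RuleExecutor[Plan] {
  val batches: Seq[Batch[Plan]]

  // Run the batches in order; within a batch, keep reapplying its rules
  // until the plan stops changing or the FixedPoint budget runs out.
  def apply(plan: Plan): Plan = batches.foldLeft(plan) { (start, batch) =>
    var iteration = 1
    var previous = start
    var current = batch.rules.foldLeft(start)((p, rule) => rule(p))
    while (current != previous && iteration < batch.strategy.maxIterations) {
      previous = current
      current = batch.rules.foldLeft(current)((p, rule) => rule(p))
      iteration += 1
    }
    current
  }
}

With this shape in mind, the Optimizer below is nothing more than a concrete definition of batches.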
The batches in Optimizer cover three groups of optimization strategies: 1. Combine Limits (merge adjacent Limits); 2. ConstantFolding (fold constants); 3. Filter Pushdown (push filters down). The rule objects each Batch uses are all defined inside Optimizer:
object Optimizer extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("Combine Limits", FixedPoint(100),
      CombineLimits) ::
    Batch("ConstantFolding", FixedPoint(100),
      NullPropagation,
      ConstantFolding,
      BooleanSimplification,
      SimplifyFilters,
      SimplifyCasts,
      SimplifyCaseConversionExpressions) ::
    Batch("Filter Pushdown", FixedPoint(100),
      CombineFilters,
      PushPredicateThroughProject,
      PushPredicateThroughJoin,
      ColumnPruning) :: Nil
}
One more note: the Optimizer optimizes not only the Logical Plan itself but also the Expressions inside it, so it is worth knowing two Expression-related members, references and outputSet. references is the set of Expressions that a LogicalPlan or Expression node depends on, while outputSet is the set of Attributes that a LogicalPlan outputs.
For example, Aggregate is a LogicalPlan; its references are the deduplicated union of the references of its group-by expressions and its aggregate expressions:
case class Aggregate(
    groupingExpressions: Seq[Expression],
    aggregateExpressions: Seq[NamedExpression],
    child: LogicalPlan)
  extends UnaryNode {

  override def output = aggregateExpressions.map(_.toAttribute)
  override def references =
    (groupingExpressions ++ aggregateExpressions).flatMap(_.references).toSet
}
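To make these two sets concrete, here is a toy model in plain Scala (attributes reduced to strings; this is illustrative only, not Catalyst code) of the set arithmetic that ColumnPruning performs later in this article:

object PruneDemo extends App {
  // outputSet of the child: a Project producing both key and value
  val childOutput: Set[String] = Set("key", "value")
  // references of an Aggregate that only touches key
  val aggReferences: Set[String] = Set("key")

  // Attributes the child produces but nothing above needs:
  val unused = childOutput -- aggReferences
  if (unused.nonEmpty) println(s"prunable columns: $unused")   // prunable columns: Set(value)
}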
2. Optimization Strategies in Detail
The Optimizer's rules transform both plans and expressions, and the underlying mechanism is always the same: traverse the tree and apply the optimization Rules. Note one detail about ordering: a LogicalPlan is transformed in pre-order (transform, i.e. transformDown), while the expression transforms here are mostly post-order (transformExpressionsUp), although ConstantFolding, as we will see, deliberately goes top-down (transformExpressionsDown).
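To see the difference between the two orders, here is a tiny self-contained model of transformDown (pre-order) and transformUp (post-order) in plain Scala; it is a sketch only, as Catalyst's real TreeNode carries far more machinery:

// Toy tree mirroring the shape of TreeNode's transform API.
case class TNode(name: String, children: List[TNode] = Nil) {
  // Pre-order: apply the rule to this node first, then recurse into the result's children.
  def transformDown(rule: PartialFunction[TNode, TNode]): TNode = {
    val applied = rule.applyOrElse(this, identity[TNode])
    applied.copy(children = applied.children.map(_.transformDown(rule)))
  }
  // Post-order: rewrite the children first, then apply the rule to this node.
  def transformUp(rule: PartialFunction[TNode, TNode]): TNode = {
    val applied = copy(children = children.map(_.transformUp(rule)))
    rule.applyOrElse(applied, identity[TNode])
  }
}

Post-order matters for expression rules such as NullPropagation below: a literal produced at a leaf can immediately enable a rewrite at its parent within the same pass.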
2.1 Batch: Combine Limits

If two Limits appear, they are merged into one. The match requires the inner Limit to be the direct child of the outer one; the variable named grandChild refers to the plan beneath both Limits:

/**
 * Combines two adjacent [[Limit]] operators into one, merging the
 * expressions into one single expression.
 */
object CombineLimits extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // ll is the outer Limit and le its expression; nl is its child Limit and ne is nl's expression
    case ll @ Limit(le, nl @ Limit(ne, grandChild)) =>
      // Compare the two expressions: if ne is less than le, the merged limit is ne, otherwise le
      Limit(If(LessThan(ne, le), ne, le), grandChild)
  }
}

Given the SQL:

val query = sql("select * from (select * from temp_shengli limit 100)a limit 10")

scala> query.queryExecution.analyzed
res12: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Limit 10
 Project [key#13,value#14]
  Limit 100
   Project [key#13,value#14]
    MetastoreRelation default, temp_shengli, None
The subquery limits to 100 while the outer query limits to 10. There is obviously no need for the subquery to produce that many rows, since the outer query only wants 10, so Limit 10 and Limit 100 are merged into Limit 10. Note that CombineLimits itself produces the expression If(LessThan(100, 10), 100, 10); it is the ConstantFolding batch that subsequently folds this down to the literal 10.
2.2 Batch: ConstantFolding
This Batch contains the rules NullPropagation, ConstantFolding, BooleanSimplification, SimplifyFilters, SimplifyCasts, and SimplifyCaseConversionExpressions.
2.2.1 Rule: NullPropagation
First, a word about the Literal class: it is a factory that can match a value of any basic type (this sets up what follows).
object Literal {
  def apply(v: Any): Literal = v match {
    case i: Int => Literal(i, IntegerType)
    case l: Long => Literal(l, LongType)
    case d: Double => Literal(d, DoubleType)
    case f: Float => Literal(f, FloatType)
    case b: Byte => Literal(b, ByteType)
    case s: Short => Literal(s, ShortType)
    case s: String => Literal(s, StringType)
    case b: Boolean => Literal(b, BooleanType)
    case d: BigDecimal => Literal(d, DecimalType)
    case t: Timestamp => Literal(t, TimestampType)
    case a: Array[Byte] => Literal(a, BinaryType)
    case null => Literal(null, NullType)
  }
}

Note that Literal itself is a LeafExpression; its core method is eval, which, given a Row, evaluates the expression and returns its value:
case class Literal(value: Any, dataType: DataType) extends LeafExpression {
  override def foldable = true
  def nullable = value == null
  def references = Set.empty
  override def toString = if (value != null) value.toString else "null"

  type EvaluatedType = Any
  override def eval(input: Row): Any = value
}
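A quick way to get a feel for Literal in the Spark 1.0 REPL (a small sketch; it relies only on the class shown above):

import org.apache.spark.sql.catalyst.expressions.Literal

val five = Literal(5)   // the factory matches the Int case: Literal(5, IntegerType)
five.eval(null)         // a Literal ignores the input Row and simply returns its value: 5
five.foldable           // true, so ConstantFolding may evaluate it at planning time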
Now let's look at what NullPropagation does. NullPropagation is an optimization that replaces Expressions that can be statically evaluated with equivalent Literal values, and it stops NULL values from propagating up the SQL syntax tree.
/**
 * Replaces [[Expression Expressions]] that can be statically evaluated with
 * equivalent [[Literal]] values. This rule is more specific with
 * Null value propagation from bottom to top of the expression tree.
 */
object NullPropagation extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsUp {
      case e @ Count(Literal(null, _)) => Cast(Literal(0L), e.dataType)    // count(null) always yields 0, so replace it with the constant 0
      case e @ Sum(Literal(c, _)) if c == 0 => Cast(Literal(0L), e.dataType)  // sum over the literal 0 is just 0
      case e @ Average(Literal(c, _)) if c == 0 => Literal(0.0, e.dataType)
      case e @ IsNull(c) if !c.nullable => Literal(false, BooleanType)
      case e @ IsNotNull(c) if !c.nullable => Literal(true, BooleanType)
      case e @ GetItem(Literal(null, _), _) => Literal(null, e.dataType)
      case e @ GetItem(_, Literal(null, _)) => Literal(null, e.dataType)
      case e @ GetField(Literal(null, _), _) => Literal(null, e.dataType)
      case e @ Coalesce(children) => {
        val newChildren = children.filter(c => c match {
          case Literal(null, _) => false
          case _ => true
        })
        if (newChildren.length == 0) {
          Literal(null, e.dataType)
        } else if (newChildren.length == 1) {
          newChildren(0)
        } else {
          Coalesce(newChildren)
        }
      }
      case e @ If(Literal(v, _), trueValue, falseValue) => if (v == true) trueValue else falseValue
      case e @ In(Literal(v, _), list) if (list.exists(c => c match {
        case Literal(candidate, _) if candidate == v => true
        case _ => false
      })) => Literal(true, BooleanType)
      // Put exceptional cases above if any
      case e: BinaryArithmetic => e.children match {
        case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
        case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
        case _ => e
      }
      case e: BinaryComparison => e.children match {
        case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
        case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
        case _ => e
      }
      case e: StringRegexExpression => e.children match {
        case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
        case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
        case _ => e
      }
    }
  }
}

Given the SQL:

val query = sql("select count(null) from temp_shengli where key is not null")
scala> query.queryExecution.analyzed
res6: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [], [COUNT(null) AS c0#5L]   // still counting null here
 Filter IS NOT NULL key#7
  MetastoreRelation default, temp_shengli, None

Now apply NullPropagation by hand:
scala> NullPropagation(query.queryExecution.analyzed)
res7: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [], [CAST(0, LongType) AS c0#5L]   // optimized to the constant 0
 Filter IS NOT NULL key#7
  MetastoreRelation default, temp_shengli, None
2.2.2 Rule: ConstantFolding
Constant folding is an Expression-level optimization: expressions that can be computed directly are evaluated once inside the plan, rather than being turned into objects that compute the same constant at physical execution time:

object ConstantFolding extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {      // first transform the plan ...
    case q: LogicalPlan => q transformExpressionsDown {             // ... then each plan node's expressions
      // Skip redundant folding of literals.
      case l: Literal => l
      case e if e.foldable => Literal(e.eval(null), e.dataType)     // call eval to compute the result now
    }
  }
}

Given the SQL:

val query = sql("select 1+2+3+4 from temp_shengli")

scala> query.queryExecution.analyzed
res23: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [(((1 + 2) + 3) + 4) AS c0#21]   // still a constant expression at this point
 MetastoreRelation default, src, None

After optimization:
scala> query.queryExecution.optimizedPlan
res24: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [10 AS c0#21]   // folded straight to 10
 MetastoreRelation default, src, None
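What makes this work is the foldable flag: a Literal is foldable, and a composite expression is typically foldable exactly when all of its children are. That is also why ConstantFolding walks expressions top-down (transformExpressionsDown): the first foldable node it meets is the largest constant subtree, so a single eval call replaces the whole thing. A self-contained model of the idea (plain Scala, not Catalyst code):

sealed trait Expr {
  def foldable: Boolean
  def eval(): Int
}
case class IntLit(value: Int) extends Expr {
  val foldable = true
  def eval(): Int = value
}
case class ColRef(name: String) extends Expr {
  val foldable = false
  def eval(): Int = sys.error(s"cannot statically evaluate column $name")
}
case class Plus(left: Expr, right: Expr) extends Expr {
  // Foldable exactly when both sides are constants.
  def foldable: Boolean = left.foldable && right.foldable
  def eval(): Int = left.eval() + right.eval()
}

def constantFold(e: Expr): Expr = e match {
  case l: IntLit => l                                   // skip redundant folding of literals
  case other if other.foldable => IntLit(other.eval())  // fold the whole constant subtree at once
  case Plus(l, r) => Plus(constantFold(l), constantFold(r))
  case other => other
}

// (((1 + 2) + 3) + 4) folds to IntLit(10) with a single eval at the root:
println(constantFold(Plus(Plus(Plus(IntLit(1), IntLit(2)), IntLit(3)), IntLit(4))))
// Partial folding still works when a column blocks the root:
println(constantFold(Plus(ColRef("key"), Plus(IntLit(1), IntLit(2)))))  // Plus(ColRef(key),IntLit(3))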
2.2.3 Rule: BooleanSimplification
This rule optimizes boolean expressions, in the spirit of short-circuit evaluation in Java, and the implementation is quite elegant. The idea: if one side of a two-sided boolean expression already determines the answer, the other side need not be computed at all; this is called boolean simplification. See my inline comments:
/**
 * Simplifies boolean expressions where the answer can be determined without evaluating both sides.
 * Note that this rule can eliminate expressions that might otherwise have been evaluated and thus
 * is only safe when evaluations of expressions does not result in side effects.
 */
object BooleanSimplification extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsUp {
      case and @ And(left, right) =>               // an AND expression, i.e. exp1 and exp2
        (left, right) match {                      // (left expression, right expression)
          case (Literal(true, BooleanType), r) => r               // left is true: the result is the right side
          case (l, Literal(true, BooleanType)) => l               // right is true: the result is the left side
          case (Literal(false, BooleanType), _) => Literal(false) // left is false: false regardless of the right side
          case (_, Literal(false, BooleanType)) => Literal(false) // either side being false makes the AND false
          case (_, _) => and
        }
      case or @ Or(left, right) =>
        (left, right) match {
          case (Literal(true, BooleanType), _) => Literal(true)   // left is true: no need to look at the right side
          case (_, Literal(true, BooleanType)) => Literal(true)   // either side being true makes the OR true
          case (Literal(false, BooleanType), r) => r              // left is false: the result is the right side
          case (l, Literal(false, BooleanType)) => l
          case (_, _) => or
        }
    }
  }
}
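The same case analysis can be run standalone (plain Scala, not Catalyst classes). It also shows why the rule uses transformExpressionsUp: simplifying the children first lets literals they produce trigger further simplification at the parent within the same pass:

sealed trait BExpr
case class BLit(value: Boolean) extends BExpr
case class BCol(name: String) extends BExpr            // stands in for a non-constant predicate
case class BAnd(l: BExpr, r: BExpr) extends BExpr
case class BOr(l: BExpr, r: BExpr) extends BExpr

def simplify(e: BExpr): BExpr = e match {
  case BAnd(l, r) => (simplify(l), simplify(r)) match {  // children first (post-order)
    case (BLit(true), rr) => rr
    case (ll, BLit(true)) => ll
    case (BLit(false), _) | (_, BLit(false)) => BLit(false)
    case (ll, rr) => BAnd(ll, rr)
  }
  case BOr(l, r) => (simplify(l), simplify(r)) match {
    case (BLit(true), _) | (_, BLit(true)) => BLit(true)
    case (BLit(false), rr) => rr
    case (ll, BLit(false)) => ll
    case (ll, rr) => BOr(ll, rr)
  }
  case other => other
}

// (true AND key>100) OR false simplifies to just key>100:
println(simplify(BOr(BAnd(BLit(true), BCol("key > 100")), BLit(false))))   // BCol(key > 100)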
2.3 Batch: Filter Pushdown
The Filter Pushdown batch contains CombineFilters, PushPredicateThroughProject, PushPredicateThroughJoin, and ColumnPruning. (Aside: the name "Filter Pushdown" feels a bit too narrow to cover everything in this batch, e.g. ColumnPruning.)

2.3.1 Rule: CombineFilters
Merges two adjacent Filters, much like Combine Limits above. Merging the two nodes reduces the depth of the tree and avoids the cost of running the filter stage twice:

/**
 * Combines two adjacent [[Filter]] operators into one, merging the
 * conditions into one conjunctive predicate.
 */
object CombineFilters extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case ff @ Filter(fc, nf @ Filter(nc, grandChild)) => Filter(And(nc, fc), grandChild)
  }
}

Given the SQL:

val query = sql("select key from (select key from temp_shengli where key >100)a where key > 80")
Before optimization, the outer Filter sits above the inner one with a Project in between. (Strictly speaking, CombineFilters only matches directly adjacent Filters; PushPredicateThroughProject in the same batch first pushes the outer Filter through the intervening Project, after which CombineFilters can merge the pair.)
scala> query.queryExecution.analyzed
res25: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#27]
 Filter (key#27 > 80)      // filter > 80
  Project [key#27]
   Filter (key#27 > 100)   // filter > 100
    MetastoreRelation default, src, None

After optimization (note that a Filter condition can itself be one complex boolean expression):
scala> query.queryExecution.optimizedPlan
res26: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#27]
 Filter ((key#27 > 100) && (key#27 > 80))   // merged into a single Filter
  MetastoreRelation default, src, None
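One detail worth noticing: a single transform pass merges only directly adjacent Filter pairs, but the batch runs under FixedPoint(100), so the rule is reapplied until the plan stops changing and longer Filter chains collapse completely. A self-contained sketch of that fixed-point behavior (predicates modeled as strings; not Catalyst code):

sealed trait FPlan
case class Rel(table: String) extends FPlan
case class Flt(cond: String, child: FPlan) extends FPlan

// One CombineFilters-style pass (merges at most one pair per position).
def combinePass(p: FPlan): FPlan = p match {
  case Flt(fc, Flt(nc, grandChild)) => Flt(s"($nc && $fc)", combinePass(grandChild))
  case Flt(fc, child)               => Flt(fc, combinePass(child))
  case r: Rel                       => r
}

// FixedPoint driver: keep applying the pass until nothing changes.
def toFixedPoint(p: FPlan, maxIterations: Int = 100): FPlan = {
  val next = combinePass(p)
  if (next == p || maxIterations <= 1) next else toFixedPoint(next, maxIterations - 1)
}

val chain = Flt("key > 80", Flt("key > 100", Flt("key > 120", Rel("temp_shengli"))))
println(toFixedPoint(chain))
// Flt((key > 120 && (key > 100 && key > 80)),Rel(temp_shengli))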
2.3.2 Filter Pushdown
Filter pushdown proper: push the filter closer to the data source. The principle is to filter out unneeded rows as early as possible, reducing the amount of data that flows through the rest of the plan.
Given the SQL:

val query = sql("select key from (select * from temp_shengli)a where key>100")

The analyzed logical plan is:
scala> query.queryExecution.analyzed
res29: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#31]
 Filter (key#31 > 100)   // first select key, value, then Filter
  Project [key#31,value#32]
   MetastoreRelation default, src, None

The optimized plan:
query.queryExecution.optimizedPlan
res30: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#31]
 Filter (key#31 > 100)   // filter first, then select
  MetastoreRelation default, src, None
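The essence of the swap fits in a few self-contained lines (a plain-Scala model, not the real PushPredicateThroughProject, which must also handle aliases and computed expressions inside the projection):

sealed trait QPlan
case class Scan(table: String) extends QPlan
case class Proj(cols: Seq[String], child: QPlan) extends QPlan
case class Pred(cols: Set[String], child: QPlan) extends QPlan   // a filter and the columns it reads

// Swap a filter above a projection whenever the projection simply
// forwards every column the predicate references.
def pushDown(p: QPlan): QPlan = p match {
  case Pred(need, Proj(cols, child)) if need.subsetOf(cols.toSet) =>
    Proj(cols, pushDown(Pred(need, child)))
  case Pred(need, child) => Pred(need, pushDown(child))
  case Proj(cols, child) => Proj(cols, pushDown(child))
  case s: Scan           => s
}

// Filter(key) over Project([key, value]) becomes Project over Filter:
val before = Pred(Set("key"), Proj(Seq("key", "value"), Scan("temp_shengli")))
println(pushDown(before))
// Proj(List(key, value),Pred(Set(key),Scan(temp_shengli)))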
2.3.3 Rule: ColumnPruning
Column pruning is used a lot: it simply avoids selecting columns that are not needed. Column pruning applies in three places:

1. In an aggregation, columns can be pruned.
2. In a join, the left and right children can be pruned.
3. The columns of adjacent Projects can be merged.

object ColumnPruning extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Eliminate attributes that are not needed to calculate the specified aggregates.
    case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
      // If the child outputs attributes beyond a.references, replace the Aggregate's
      // child with a Project over exactly a.references
      a.copy(child = Project(a.references.toSeq, child))

    // Eliminate unneeded attributes from either side of a Join,
    // i.e. prune the columns of the join's left and right subtrees.
    case Project(projectList, Join(left, right, joinType, condition)) =>
      // Collect the list of all references required either above or to evaluate the condition.
      val allReferences: Set[Attribute] =
        projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty)

      /** Applies a projection only when the child is producing unnecessary attributes */
      def prunedChild(c: LogicalPlan) =
        if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
          Project(allReferences.filter(c.outputSet.contains).toSeq, c)
        } else {
          c
        }

      Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition))

    // Combine adjacent Projects.
    case Project(projectList1, Project(projectList2, child)) =>
      // Create a map of Aliases to their values from the child projection.
      // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)).
      val aliasMap = projectList2.collect {
        case a @ Alias(e, _) => (a.toAttribute: Expression, a)
      }.toMap

      // Substitute any attributes that are produced by the child projection, so that we safely
      // eliminate it.
      // e.g., 'SELECT c + 1 FROM (SELECT a + b AS C ...' produces 'SELECT a + b + 1 ...'
      // TODO: Fix TransformBase to avoid the cast below.
      val substitutedProjection = projectList1.map(_.transform {
        case a if aliasMap.contains(a) => aliasMap(a)
      }).asInstanceOf[Seq[NamedExpression]]

      Project(substitutedProjection, child)

    // Eliminate no-op Projects
    case Project(projectList, child) if child.output == projectList => child
  }
}

Below is one example for each of the three cases.

1. Column pruning in an aggregation. Given the SQL:

val query = sql("SELECT 1+1 as shengli, key from (select key, value from temp_shengli)a group by key")

Before optimization:
res57: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [key#51], [(1 + 1) AS shengli#49,key#51]
 Project [key#51,value#52]   // before optimization, both key and value are selected
  MetastoreRelation default, temp_shengli, None

After optimization (ColumnPruning1 here is a copy of the rule I use for debugging):
scala> ColumnPruning1(query.queryExecution.analyzed)
MetastoreRelation default, temp_shengli, None
res59: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [key#51], [(1 + 1) AS shengli#49,key#51]
 Project [key#51]   // after pruning, value is cut and only key is selected
  MetastoreRelation default, temp_shengli, None
2. Column pruning of a join's children. Given the SQL:

val query = sql("select a.value qween from (select * from temp_shengli) a join (select * from temp_shengli)b on a.key =b.key")

Before optimization:
scala> query.queryExecution.analyzed
res51: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [value#42 AS qween#39]
 Join Inner, Some((key#41 = key#43))
  Project [key#41,value#42]   // both key and value are selected here
   MetastoreRelation default, temp_shengli, None
  Project [key#43,value#44]   // and here as well
   MetastoreRelation default, temp_shengli, None

After optimization (ColumnPruning2 is a copy of the rule I made for debugging; the attribute ids below differ from the plan above because this output came from a separate run):
scala> ColumnPruning2(query.queryExecution.analyzed)
allReferences is -> Set(key#35, key#37)
MetastoreRelation default, temp_shengli, None
MetastoreRelation default, temp_shengli, None
res47: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#35 AS qween#33]
 Join Inner, Some((key#35 = key#37))
  Project [key#35]   // after pruning, the left child selects only key
   MetastoreRelation default, temp_shengli, None
  Project [key#37]   // after pruning, the right child selects only key
   MetastoreRelation default, temp_shengli, None

3. Merging the columns of adjacent Projects.
Given the SQL:

val query = sql("SELECT c + 1 FROM (SELECT 1 + 1 as c from temp_shengli ) a")

Before optimization:
scala> query.queryExecution.analyzed
res61: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [(c#56 + 1) AS c0#57]
 Project [(1 + 1) AS c#56]
  MetastoreRelation default, temp_shengli, None

After optimization:
scala> query.queryExecution.optimizedPlan
res62: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [(2 AS c#56 + 1) AS c0#57]   // c from the subquery is substituted into the outer select, and the inner 1 + 1 is folded to 2
 MetastoreRelation default, temp_shengli, None
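The alias substitution that drives case 3 can be shown in a few lines of self-contained Scala (a sketch of the idea, not Catalyst's Alias/Attribute machinery):

sealed trait AExpr
case class Col(name: String) extends AExpr
case class Num(v: Int) extends AExpr
case class Add(l: AExpr, r: AExpr) extends AExpr

// Replace every column that the child projection defines by its definition.
def substitute(e: AExpr, aliases: Map[String, AExpr]): AExpr = e match {
  case Col(n) if aliases.contains(n) => aliases(n)
  case Add(l, r) => Add(substitute(l, aliases), substitute(r, aliases))
  case other => other
}

// Inner projection SELECT 1 + 1 AS c  ->  aliasMap: c -> Add(Num(1), Num(1))
val aliasMap = Map("c" -> Add(Num(1), Num(1)))
// Outer projection SELECT c + 1 becomes (1 + 1) + 1; constant folding then
// shrinks it further (in the run above, the inner 1 + 1 became 2).
println(substitute(Add(Col("c"), Num(1)), aliasMap))
// Add(Add(Num(1),Num(1)),Num(1))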
3. Summary
This article described the Optimizer's role in Catalyst: it takes the analyzed Logical Plan and applies Rules that transform both the LogicalPlan nodes and the Expressions inside them, merging and optimizing nodes of the tree. The main strategies boil down to a few families: merging (limits, filters, adjacent projects), column pruning, and filter pushdown.

Catalyst keeps evolving; this article is based on Spark 1.0.0, and optimization strategies added in later versions may be covered in follow-up posts.

Discussion is welcome; let's improve together!
——EOF——
Original article; please credit the source when reposting:
Reposted from OopsOutOfMemory (Sheng Li)'s blog. Author: OopsOutOfMemory
Permalink: http://blog.csdn.net/oopsoom/article/details/38121259
Note: this article is released under the Attribution-NonCommercial-NoDerivs 2.5 China Mainland (CC BY-NC-ND 2.5 CN) license. Reposting, sharing, and commenting are welcome, but please keep the author attribution and the link to this article. For commercial use or licensing matters, please contact me.