Spark优化篇:RBO/CBO
?????? ? ??在Spark1.0中所有的Catalyst Optimizer都是基于規則 (rule) 優化的。為了產生比較好的查詢規 則,優化器需要理解數據的特性,于是在Spark2.0中引入了基于代價的優化器 (cost-based optimizer),也就是所謂的CBO。然而,CBO也無法解決很多問題,比如:
- ?數據統計信息普遍缺失,統計信息的收集代價較高;
- 儲存計算分離的架構使得收集到的統計信息可能不再準確;
- Spark部署在某一單一的硬件架構上,cost很難被估計;
- Spark的UDF(User-defined Function)簡單易用,種類繁多,但是對于CBO來說是個黑 盒子,無法估計其cost;
總而言之,由于種種限制,Spark的優化器無法產生最好的Plan。
也許你會想:Spark為什么不解決這個問題呢?這里有很多挑戰,比如:?
- 統計信息的缺失,統計信息的不準確,那么就是默認依據文件大小來預估表的大小,但是文件?往往是壓縮的,尤其是列存儲格式,比如parquet?和?ORC,而Spark是基于行處理,如果數據連續重復,file size可能和真實的行存儲的真實大小,差別非常之大。這也是為何提高?autoBroadcastJoinThreshold,即使不是太大也可能會導致out of memory;?
- Filter復雜、UDFs的使用都會使Spark無法準確估計Join輸入數據量的大小。當你的queryplan異常大和復雜的時候,這點尤其明顯;
- 其中,Spark3.0中基于運行期的統計信息,將Sort Merge Join?轉換為Broadcast Hash Join。
基于RBO優化:
left join case
var appSql: String ="""|select| *|from| tab_spark_test as t1|left join tab_spark_test_2 as t2|on t1.id = t2.id|and t1.id > 5+5""".stripMarginsparkSession.sql("use default;")sparkSession.sql(appSql).explain(mode = "extended")
基于CBO優化:
CBO?優化主要在物理計劃層面,原理是計算所有可能的物理計劃的代價,并挑選出代價最小的物理執行計劃。充分考慮了數據本身的特點(如大小、分布)以及操作算子的特點(中間結果集的分布及大小)及代價,從而更好的選擇執行代價最小的物理執行計劃。
而每個執行節點的代價,分為兩個部分:?
1、該執行節點對數據集的影響,即該節點輸出數據集的大小與分布;
2、該執行節點操作算子的代價。
每個操作算子的代價相對固定,可用規則來描述。而執行節點輸出數據集的大小與分布,分為兩個部分:
1、初始數據集,也即原始表,其數據集的大小與分布可直接通過統計得到;
2、中間節點輸出數據集的大小與分布可由其輸入數據集的信息與操作本身的特點推算。
需要先執行特定的?SQL?語句來收集所需的表和列的統計信息。?
--表級別統計信息
ANALYZE TABLE 表名 COMPUTE STATISTICS
--生成列級別統計信息
ANALYZE TABLE 表名 COMPUTE STATISTICS FOR COLUMNS 列 1,列 2,列 3--顯示統計信息
DESC FORMATTED 表名
--顯示列統計信息
DESC FORMATTED 表名 列名
沒有執行 ANALYZE狀態?
執行 ANALYZE后,發現多了很多spark.sql.statistics信息
?
?使用?CBO
通過?"spark.sql.cbo.enabled"?來開啟,默認是?false。配置開啟?CBO?后,CBO?優化器可以基于表和列的統計信息,進行一系列的估算,最終選擇出最優的查詢計劃。比如:Build?側選擇、優化?Join?類型、優化多表?Join?順序等。
|
參數 |
描述 ? ? ?? | 默認值 |
|
spark.sql.cbo.enabled |
true?表示打開,false?表示關閉。 要使用該功能,需確保相關表和列的統計信息已經生成。 |
false |
|
spark.sql.cbo.joinReorder.enabled? |
使用?CBO?來自動調整連續的?inner join?的順序。 true:表示打開,false:表示關閉 要使用該功能,需確保相關表和列的統計信息已經生成,且 CBO?總開關打開。 |
false |
|
spark.sql.cbo.joinReorder.dp.threshold? |
使用?CBO?來自動調整連續?inner join?的表的個數閾值。 如果超出該閾值,則不會調整?join?順序。 |
10 |
???????
def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName("CBO").set("spark.sql.cbo.enabled", "true").set("spark.sql.cbo.joinReorder.enabled", "true").setMaster("local[*]")val sparkSession: SparkSession = Util.SparkSession2hive(sparkConf)var appSql: String ="""|select| t1.name,count(1)|from| tab_spark_test as t1|left join tab_spark_test_2 as t2|on t1.id = t2.id|group by t1.name""".stripMarginsparkSession.sql("use default;")sparkSession.sql(appSql).show()while (true) {}}
123
總結
以上是生活随笔為你收集整理的Spark优化篇:RBO/CBO的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: sdut算法分析oj题目整合
- 下一篇: CF1550F Jumping Arou