阿里云Spark Shuffle的优化
轉自:大數據技術與架構
本次分享者:辰石,來自阿里巴巴計算平臺事業部EMR團隊技術專家,目前從事大數據存儲以及Spark相關方面的工作。
-
Spark Shuffle介紹
-
Smart Shuffle設計
-
性能分析
Spark Shuffle流程
-
Spark 0.8及以前 Hash Based Shuffle?
-
Spark 0.8.1 為Hash Based Shuffle引入File Consolidation機制?
-
Spark 0.9 引入ExternalAppendOnlyMap?
-
Spark 1.1 引入Sort Based Shuffle,但默認仍為Hash Based Shuffle?
-
Spark 1.2 默認的Shuffle方式改為Sort Based Shuffle?
-
Spark 1.4 引入Tungsten-Sort Based Shuffle?
-
Spark 1.6 Tungsten-sort并入Sort Based Shuffle?
-
Spark 2.0 Hash Based Shuffle退出歷史舞臺
總結一下, 就是最開始的時候使用的是 Hash Based Shuffle, 這時候每一個Mapper會根據Reducer的數量創建出相應的bucket,bucket的數量是M x R ,其中M是Map的個數,R是Reduce的個數。這樣會產生大量的小文件,對文件系統壓力很大,而且也不利于IO吞吐量。后面忍不了了就做了優化,把在同一core上運行的多個Mapper 輸出的合并到同一個文件,這樣文件數目就變成了 cores R 個了。
Spark Shuffle實現
Sort-based shuffle介紹
這個方式的選擇是在org.apache.spark.SparkEnv完成的:
// Let the user specify short names forshuffle managers
val shortShuffleMgrNames = Map(
"hash" ->"org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" ->"org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName =conf.get("spark.shuffle.manager", "sort") //獲得Shuffle Manager的type,sort為默認
val shuffleMgrClass =shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager =instantiateClass[ShuffleManager](shuffleMgrClass)
Hashbased shuffle的每個mapper都需要為每個reducer寫一個文件,供reducer讀取,即需要產生M x R個數量的文件,如果mapper和reducer的數量比較大,產生的文件數會非常多。Hash based shuffle設計的目標之一就是避免不需要的排序(Hadoop Map Reduce被人詬病的地方,很多不需要sort的地方的sort導致了不必要的開銷)。但是它在處理超大規模數據集的時候,產生了大量的DiskIO和內存的消耗,這無疑很影響性能。
Hash based shuffle也在不斷的優化中,正如前面講到的Spark 0.8.1引入的file consolidation在一定程度上解決了這個問題。為了更好的解決這個問題,Spark 1.1 引入了Sort based shuffle。首先,每個Shuffle Map Task不會為每個Reducer生成一個單獨的文件;相反,它會將所有的結果寫到一個文件里,同時會生成一個index文件,Reducer可以通過這個index文件取得它需要處理的數據。
避免產生大量的文件的直接收益就是節省了內存的使用和順序Disk IO帶來的低延時。節省內存的使用可以減少GC的風險和頻率。而減少文件的數量可以避免同時寫多個文件對系統帶來的壓力。
目前writer的實現分為三種, 分為 BypassMergeSortShuffleWriter, SortShuffleWriter 和 UnsafeShuffleWriter。
SortShuffleManager只有BlockStoreShuffleReader這一種ShuffleReader。
Spark-shuffle存在的問題
同步操作
Shuffle數據只有等map task任務結束后可能會觸發多路歸并生成最終數據。
大量的磁盤IO
Shuffle的數據在Merge階段存在大量的磁盤讀寫IO,在sort-merge階段對磁盤IO帶寬要求較高。
計算與網絡的串行
Task任務計算和網絡IO的串行操作。
Smart Shuffle
shuffle數據的pipeline
shuffle數據在map端累積到一定數量發送到reduce端。
避免不必要的網絡IO
根據partition數量的位置,可以調度該reduce任務到相應的節點。
計算和網絡IO的異步化
shuffle數據的生成和shuffle數據的發送可以并行執行。
避免sort-merge減少磁盤IO
shuffle數據是按照partition進行分區,shuffle數據無需sort-merge
Smart Shuffle使用
-
配置spark.shuffle.manager :?org.apache.spark.shuffle.hash.HashShuffleManager
-
配置spark.shuffle.smart.spill.memorySizeForceSpillThreshold:控制shuffle數據占用內存的大小,默認為128M
-
配置spark.shuffle.smart.transfer.blockSize:控制shuffle在網絡傳輸數據塊的大小
性能分析
硬件及軟件資源:
TPC-DS性能:
Smart shuffle TPC-DS性能提升28%:
-
Smart shuffle沒有打來單個query性能的下降
-
單個query最大能夠帶來最大2倍的性能提升
提取Q2和Q49查詢性能分析:
-
Q2在兩種shuffle性能保持一致
-
Q49在Smart shuffle下性能有很大提升
單個查詢對比:
左側為sorted shuffle,右邊為smart shuffle。?Q2查詢相對簡單,shuffle數據也比較少,smart shuffle性能保持不變。
Q2 CPU對比:?左側為sorted shuffle,右側是smart shuffle
磁盤對比:
左側為sorted shuffle,右側是smart shuffle
總結
以上是生活随笔為你收集整理的阿里云Spark Shuffle的优化的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spark shuffle再补充
- 下一篇: 大数据在未来十年将如何发展