spark shuffle再补充
?
shuffle概覽
一個spark的RDD有一組固定的分區組成,每個分區有一系列的記錄組成。對于由窄依賴變換(例如map和filter)返回的RDD,會延續父RDD的分區信息,以pipeline的形式計算。每個對象僅依賴于父RDD中的單個對象。諸如coalesce之類的操作可能導致任務處理多個輸入分區,但轉換仍然被認為是窄依賴的,因為一個父RDD的分區只會被一個子RDD分區繼承。
Spark還支持寬依賴的轉換,例如groupByKey和reduceByKey。在這些依賴項中,計算單個分區中的記錄所需的數據可以來自于父數據集的許多分區中。要執行這些轉換,具有相同key的所有元組必須最終位于同一分區中,由同一任務處理。為了滿足這一要求,Spark產生一個shuffle,它在集群內部傳輸數據,并產生一個帶有一組新分區的新stage。
可以看下面的代碼片段:
sc.textFile("someFile.txt").map(mapFunc).flatMap(flatMapFunc).filter(filterFunc).count()上面的代碼片段只有一個action操作,count,從輸入textfile到action經過了三個轉換操作。這段代碼只會在一個stage中運行,因為,三個轉換操作沒有shuffle,也即是三個轉換操作的每個分區都是只依賴于它的父RDD的單個分區。
但是,下面的單詞統計就跟上面有很大區別:
val tokenized = sc.textFile(args(0)).flatMap(_.split(' ')) val wordCounts = tokenized.map((_,?1)).reduceByKey(_ + _) val filtered = wordCounts.filter(_._2 >=?1000) val charCounts = filtered.flatMap(_._1.toCharArray).map((_,?1)).reduceByKey(_ + _) charCounts.collect()這段代碼里有兩個reducebykey操作,三個stage。
下面圖更復雜,因為有一個join操作:
粉框圈住的就是整個DAG的stage劃分。
在每個stage的邊界,父stage的task會將數據寫入磁盤,子stage的task會將數據通過網絡讀取。由于它們會導致很高的磁盤和網絡IO,所以shuffle代價相當高,應該盡量避免。父stage的數據分區往往和子stage的分區數不同。觸發shuffle的操作算子往往可以指定分區數的,也即是numPartitions代表下個stage會有多少個分區。就像mr任務中reducer的數據是非常重要的一個參數一樣,shuffle的時候指定分區數也將在很大程度上決定一個應用程序的性能。
優化shuffle
通常情況可以選擇使用產生相同結果的action和transform相互替換。但是并不是產生相同結果的算子就會有相同的性能。通常避免常見的陷阱并選擇正確的算子可以顯著提高應用程序的性能。
當選擇轉換操作的時候,應最小化shuffle次數和shuffle的數據量。shuffle是非常消耗性能的操作。所有的shuffle數據都會被寫入磁盤,然后通過網絡傳輸。repartition , join, cogroup, 和 ?*By 或者 *ByKey 類型的操作都會產生shuffle。我們可以對一下幾個操作算子進行優化:
1. groupByKey某些情況下可以被reducebykey代替。
2. reduceByKey某些情況下可以被 aggregatebykey代替。
3. flatMap-join-groupBy某些情況下可以被cgroup代替。
?
no shuffle
在某些情況下,前面描述的轉換操作不會導致shuffle。當先前的轉換操作已經使用了和shuffle相同的分區器分區數據的時候,spark就不會產生shuffle。
舉個例子:
rdd1?= someRdd.reduceByKey(...)rdd2?= someOtherRdd.reduceByKey(...)rdd3?= rdd1.join(rdd2)由于使用redcuebykey的時候沒有指定分區器,所以都是使用的默認分區器,會導致rdd1和rdd2都采用的是hash分區器。兩個reducebykey操作會產生兩個shuffle過程。如果,數據集有相同的分區數,執行join操作的時候就不需要進行額外的shuffle。由于數據集的分區相同,因此rdd1的任何單個分區中的key集合只能出現在rdd2的單個分區中。因此,rdd3的任何單個輸出分區的內容僅取決于rdd1中單個分區的內容和rdd2中的單個分區,并且不需要第三個shuffle。
例如,如果someRdd有四個分區,someOtherRdd有兩個分區,而reduceByKeys都使用三個分區,運行的任務集如下所示:
如果rdd1和rdd2使用不同的分區器或者相同的分區器不同的分區數,僅僅一個數據集在join的過程中需要重新shuffle
?
在join的過程中為了避免shuffle,可以使用廣播變量。當executor內存可以存儲數據集,在driver端可以將其加載到一個hash表中,然后廣播到executor。然后,map轉換可以引用哈希表來執行查找。
增加shuffle
有時候需要打破最小化shuffle次數的規則。
當增加并行度的時候,額外的shuffle是有利的。例如,數據中有一些文件是不可分割的,那么該大文件對應的分區就會有大量的記錄,而不是說將數據分散到盡可能多的分區內部來使用所有已經申請cpu。在這種情況下,使用reparition重新產生更多的分區數,以滿足后面轉換算子所需的并行度,這會提升很大性能。
使用reduce和aggregate操作將數據聚合到driver端,也是修改區數的很好的例子。
在對大量分區執行聚合的時候,在driver的單線程中聚合會成為瓶頸。要減driver的負載,可以首先使用reducebykey或者aggregatebykey執行一輪分布式聚合,同時將結果數據集分區數減少。實際思路是首先在每個分區內部進行初步聚合,同時減少分區數,然后再將聚合的結果發到driver端實現最終聚合。典型的操作是treeReduce 和 treeAggregate。
當聚合已經按照key進行分組時,此方法特別適用。例如,假如一個程序計算語料庫中每個單詞出現的次數,并將結果使用map返回到driver。一種方法是可以使用聚合操作完成在每個分區計算局部map,然后在driver中合并map??梢杂胊ggregateByKey以完全分布的方式進行統計,然后簡單的用collectAsMap將結果返回到driver。
?
轉自:https://mp.weixin.qq.com/mp/profile_ext?action=home&__biz=MzA3MDY0NTMxOQ==&scene=124#wechat_redirect
超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生總結
以上是生活随笔為你收集整理的spark shuffle再补充的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark _22 _创建DataFra
- 下一篇: 阿里云Spark Shuffle的优化