计算质数通过分区(Partition)提高Spark的运行性能(转载+自己理解)
這篇博客是對[1]的進一步詳細描述
自己的配置是臺式機一臺+筆記本組成spark集群
#---------------------------------------------------------任務目標------------------------------------------------------------------------------------------------
查找質數
比如我們需要從2到2000000之間尋找所有的質數。我們很自然地會想到先找到所有的非質數,剩下的所有數字就是我們要找的質數。
質數的定義:
除了1和自己,沒有其他因子。
#---------------------------------------------------------查找非質數的算法原理舉例-------------------------------------------------------------------------------------------
當n=20
var composite1=composite.map(x => (x, (2 to (n / x))))得到:
Array[(Int, scala.collection.immutable.Range.Inclusive)] = Array((2,Range(2, 3, 4, 5, 6, 7, 8, 9, 10)),
(3,Range(2, 3, 4, 5, 6)),
(4,Range(2, 3, 4, 5)),
(5,Range(2, 3, 4)),
(6,Range(2, 3)),
(7,Range(2)),
(8,Range(2)),
(9,Range(2)),
(10,Range(2)),
(11,Range()),
(12,Range()),
(13,Range()),
(14,Range()),
(15,Range()),
(16,Range()),
(17,Range()),
(18,Range()),
(19,Range()),
(20,Range()))
然后執行下面一句:
?var composite2=composite1.flatMap(kv => kv._2.map(_ * kv._1))#這個意思是,上面的結果中,第一個元素和Range里面的每一個數字遍歷相乘,得到的結果都是合數
composite2.collect()
得到合數:
Array[Int] = Array(4, 6, 8, 10, 12, 14, 16, 18, 20, 6, 9, 12, 15, 18, 8, 12, 16, 20, 10, 15, 20, 12, 18, 14, 16, 18, 20)
最后我們從整個數據集中刪除合數,得到的就是質數了:
scala> var prime = sc.parallelize(2 to n, 8).subtract(composite2)
prime: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at subtract at <console>:28
scala> prime.collect()
res3: Array[Int] = Array(17, 2, 3, 19, 11, 5, 13, 7)
實驗結果:
我們最終需要的質數就是(17, 2, 3, 19, 11, 5, 13, 7)
#-------------------------------------------------------------------具體實驗步驟-----------------------------------------------------------------------------------------
spark-shell --master yarn
val n = 2000000 var composite = sc.parallelize(2 to n, 8)//意思是從2-2000000,partitions數量設定為8 composite=composite.map(x => (x, (2 to (n / x)))).flatMap(kv => kv._2.map(_ * kv._1)) //composite是在計算非質數 var prime = sc.parallelize(2 to n, 8).subtract(composite)//從原始數據集中去除非質數,剩下的就是質數了 prime.collect()//從集群中把質數收集回master,然后在master端的spark-shell中打印結果。Desktop->ApplicationMaster跳轉到下面的界面
?
#############################分析下DAG圖#################################################
點擊Jobs->Description下面的任意一個
通過上面的對應關系可以看到,每一個stage都對應下方的部分代碼
###############################分析下stage0###############################################
###############################分析下stage1###############################################
會發現其中一個stack的執行時間嚴重失去平衡,是由slave(Laptop)執行的,
我們去看下頁面的Executors標簽:
可以看到由于前一張圖Slave(Laptop)中task占據了大量時間而導致整體運行時間拖后腿,Laptop(slave)比Desktop(master)多花了6s
?
注意:
實際運行集群時,每個stage的task的運行時間都要像上面一樣看過,才能判斷到底是哪個stage出現了data skew的問題,因為stage0的輸出是stage1的輸入,stage0的輸入即使是數據均衡的,你并不能保證這個stage0的輸出就是數據均衡的,
當stage0輸出不均衡,那么stage1的輸入就會不均衡(data skew現象)
#################################該版本代碼耗時計算######################################################
再次運行(所以某些數據可能與上面的不太一致,每次運行時間有波動是正常的,不要在意)
整個集群的運行時間是三個stage的耗時進行求和:9+9+1=19s
####################################數據處理在task中不平衡的原因分析#######################################
這里稍微不太好理解,需要首先理解文章開頭的例子,然后來理解[1]中作者的原話:
當我們運行 ?sc.parallelize(2 to n, 8)??語句的時候,Spark使用分區機制將數據很好地分成8個組。它最有可能使用的是range partitioner,也就是說2-250000被分到第一個分區; 250001-500000分到第二個分區等等。然而我們的map函數將這些數轉成(key,value)pairs,而value里面的數據大小變化很大(key比較小的時候,value的值就比較多,從而也比較大)。每個value都是一個list,里面存放著我們需要乘上key并小于2000000的倍數值,有一半以上的鍵值對(所有key大于1000000)的value是空的;而key等于2對應的value是最多的,包含了所有從2到1000000的數據!這就是為什么第一個分區擁有幾乎所有的數據,它的計算花費了最多的時間;而最后四個分區幾乎沒有數據!
?
####################################代碼修改如下############################################################
val n = 2000000 val composite = sc.parallelize(2 to n, 8).map(x => (x, (2 to (n / x)))).repartition(8).flatMap(kv => kv._2.map(_ * kv._1)) val prime = sc.parallelize(2 to n, 8).subtract(composite) prime.collect()其實是對stage1進行了修改
運行結果如下:
運行時間為17s,比之前快了兩秒
Reference:
[1]計算質數通過分區(Partition)提高Spark的運行性能
總結
以上是生活随笔為你收集整理的计算质数通过分区(Partition)提高Spark的运行性能(转载+自己理解)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 梦幻模拟战有单机版吗
- 下一篇: spark中stage的划分与宽依赖/窄