spark优化总结
2019獨角獸企業重金招聘Python工程師標準>>>
1. RDD
1.1 RDD持久化
????對多次使用的RDD進行持久化。此時Spark就會根據你的持久化策略,將RDD中的數據保存到內存或者磁盤中。以后每次對這個RDD進行算子操作時,都會直接從內存或磁盤中提取持久化的RDD數據,然后執行算子,而不會從源頭處重新計算一遍這個RDD。
1.2 調整RDD partition數
????spark一般按照HDFS上加載block個數決定并行度,也就是為每個block(如128M)創建一個RDDpartition,這個數據量在很多時候是偏多的;而當我們使用filter算子過濾掉較多數據后,后續的計算并行度不變這在有時又是偏高的。
????spark提供了兩種方法對操作并行度進行調優。第一種,在數據混洗時使用參數為混洗后的RDD制定并行度;第二種,對于已有的RDD可進行重新分區以獲取更多或更少的分區。repartition()會把RDD隨機打亂并重新分成制定分區數。如果要減少RDD可使用coalesce()不打亂數據更高效。
????其他方法手動textFile()、parallelize()等方法的第二個參數來設置并行度,也可以使用spark.default.parallelism參數,來設置統一的并行度
2. shuffle
????shuffle過程中上游的task會根據所處理數據的KEY,來產生多個不同KEY值的中間文件。下游的task根據自己所要處理的KEY值,去上游拉取相應的中間文件。這個過程中涉及到大量的磁盤IO以及大量的網絡傳輸;且下游的某些task拉取到比其他task多得多的數據,導致數據傾斜從而產生嚴重的性能問題。
2.1 map join消除shuffle
????傳統的join操作會導致shuffle操作導致性能問題。如果join兩端的數據一個大一個小,則可以廣播小RDD全量數據,driver和每個Executor內存中都會駐留一份小RDD的全量數據。此時就不會發生shuffle操作,也就不會發生數據傾斜。
2.2 預聚合減少shuffle量
????map-side預聚合指上游task對相同的key進行一次聚合,類似于MapReduce中的本地combiner。預聚合之后每個task下每個key都只有一條,大大減少下游task要拉取的數據數量,從而也就減少了磁盤IO以及網絡傳輸開銷。
????通常來說,建議使用reduceByKey或者aggregateByKey算子來替代掉groupByKey算子。因為reduceByKey和aggregateByKey算子都會使用用戶自定義的函數對每個節點本地的相同key進行預聚合。而groupByKey算子是不會進行預聚合的,全量的中間數據各個節點之間分發和傳輸,性能相對較差。
2.3 復用shuffle中間文件
????默認情況下shuffle中間文件數目為map tasks * reduce tasks,通過設置spark.shuffle.consolidateFiles為true,來合并shuffle中間文件。每個計算核心只產生一批文件,同一個核心后面運行的task復用前面task產生的中間文件,此時文件數為cores * reduce tasks數目,大大減少了數據拉取的文件數。
????設置方法:new SparkConf().set("spark.shuffle.consolidateFiles", "true")
2.4?spark.local.dirs多目錄
????spark需要使用本地磁盤存儲數據混洗的中間數據,以及溢寫到磁盤總的RDD分區數據。因此設置更多的本地磁盤會分擔IO壓力,提升IO吞吐量效率。
????獨立模式下修改spark-env.sh文件中的SPARK_LOCAL_DIRSyarn、mesos模式下修改spark.local.dir來實現
2.5?使用kryo序列化
????當spark需要進行網絡傳輸數據或將數據溢寫到磁盤的時候,spark需要將數據序列化成二進制格式。默認情況下spark會使用java的內建序列化庫,spark也支持地方三房的序列化庫Kryo,提供比java序列化工具更短的序列化時間和更高的壓縮比。
第一步: 在sparkConf中設置一個屬性。
set(“spark.serializer”,"org.apache.spark.serializer.KryoSerializer")第二步: 注冊你使用到的,需要通過Kryo序列化,一些自定義的類。
.registerKryoClasses(new Class[]{CategorySortKey.class});Kryo不是默認序列化類庫的原因:如果要達到它的最佳性能的話,那么就一定要注冊你自定義的類(比如,你的算子函數中使用到了外部自定義類型的對象變量,這時就要求必須注冊你的類,否則達不到最佳性能)
2.6 提升shuffle并行度
????增加shuffle read task的數量,可以讓原本分配給一個task的多個key分配給多個task,從而讓每個task處理比原來更少的數據。但該方法只是緩解了數據傾斜而已,沒有徹底根除問題。
3. 分配資源
3.1 num-executors
參數說明:該參數用于設置Spark作業總共要用多少個Executor進程來執行。Driver在向YARN集群管理器申請資源時,YARN集群管理器會盡可能按照你的設置來在各個工作節點啟動相應數量的Executor。如果不設置的話,默認只會給你啟動少量的Executor進程,此時Spark作業的運行速度是非常慢的。
參數調優建議:每個Spark作業的運行一般設置50~100個左右的Executor進程比較合適,設置太少或太多的Executor進程都不好。設置的太少,無法充分利用集群資源;設置的太多的話,隊列其他任務可能無法給予充分的資源。
3.2 executor-memory
參數說明:該參數用于設置每個Executor進程的內存。Executor內存的大小,很多時候直接決定了Spark作業的性能,而且跟常見的JVM OOM異常,也有直接的關聯。
參數調優建議:每個Executor進程的內存設置4G~8G較為合適。num-executors乘以executor-memory就代表Spark作業申請到的總內存量,這個量是不能超過隊列的最大內存量的。此外,如果與其他人共享這個資源隊列,為避免獨占則申請的總內存量最好不要超過資源隊列總內存的1/3~1/2。
3.3 executor-cores
參數說明:該參數用于設置每個Executor進程的CPU core數量。這個參數決定了每個Executor進程并行執行task線程的能力。
參數調優建議:Executor的CPU core數量設置為2~4個較為合適。num-executors * executor-cores就代表了你的Spark作業申請到的總CPU核心數,這個量是不能超過隊列的最大核心數。此外,如果是跟他人共享這個隊列,不要超過隊列總CPU core的1/3~1/2。
3.4 driver-memory
參數說明:該參數用于設置Driver進程的內存。
參數調優建議:Driver的內存通常來說不設置,或者設置1G左右應該就夠了。唯一需要注意的一點是,如果需要使用collect算子將RDD的數據全部拉取到Driver上進行處理,那么必須確保Driver的內存足夠大,否則會出現OOM內存溢出的問題。
3.5 spark.default.parallelism
參數說明:該參數用于設置每個stage的默認task數量。這個參數極為重要,如果不設置可能會直接影響你的Spark作業性能。
參數調優建議:Spark作業的默認task數量為500~1000個較為合適。若不去設置這個參數,那么Spark會根據底層HDFS的block數量來設置task的數量,默認是一個HDFS block對應一個task。通常來說,Spark默認設置的數量是偏少的。? 如果task數量偏少的話,就會導致設置好的Executor的參數都前功盡棄,無論Executor有多少個,內存和CPU有多大,但是task只有1個或者10個,那么90%的Executor進程可能根本就沒有task執行,也就是白白浪費了資源!因此Spark官網建議的設置原則是,設置該參數為num-executors * executor-cores的2~3倍較為合適,比如Executor的總CPU core數量為300個,那么設置1000個task是可以的。
3.6 spark.storage.memoryFraction
參數說明:該參數用于設置RDD持久化數據在Executor內存中能占的比例,默認是0.6。也就是說,默認Executor 60%的內存,可以用來保存持久化的RDD數據。根據你選擇的不同的持久化策略,如果內存不夠時,可能數據就不會持久化,或者數據會寫入磁盤。
參數調優建議:如果Spark作業中,有較多的RDD持久化操作,該參數的值可以適當提高一些,保證持久化的數據能夠容納在內存中。避免內存不夠緩存所有的數據,導致數據只能寫入磁盤中,降低了性能。但是如果Spark作業中的shuffle類操作比較多,而持久化操作比較少,那么這個參數的值適當降低一些比較合適。此外,如果發現作業由于頻繁的gc導致運行緩慢(通過spark web ui可以觀察到作業的gc耗時),意味著task執行用戶代碼的內存不夠用,那么同樣建議調低這個參數的值。
3.7 spark.shuffle.memoryFraction
參數說明:該參數用于設置shuffle過程中一個task拉取到上個stage的task的輸出后,進行聚合操作時能夠使用的Executor內存的比例,默認是0.2。也就是說,Executor默認只有20%的內存用來進行該操作。shuffle操作在進行聚合時,如果發現使用的內存超出了這個20%的限制,那么多余的數據就會溢寫到磁盤文件中去,此時就會極大地降低性能。
參數調優建議:如果Spark作業中的RDD持久化操作較少,shuffle操作較多時,建議降低持久化操作的內存占比,提高shuffle操作的內存占比比例,避免shuffle過程中數據過多時內存不夠用,必須溢寫到磁盤上,降低了性能。此外,如果發現作業由于頻繁的gc導致運行緩慢,意味著task執行用戶代碼的內存不夠用,那么同樣建議調低這個參數的值。
資源參數的調優,沒有一個固定的值,需要同學們根據自己的實際情況(包括Spark作業中的shuffle操作數量、RDD持久化操作數量以及spark web ui中顯示的作業gc情況),同時參考本篇文章中給出的原理以及調優建議,合理地設置上述參數。
4. 計算邏輯
4.1 mapPartitions
mapPartitions、foreachPartitions等算子替代map、foreach,函數一次調用批量處理一個partition的數據,而不是一次函數調用處理一條,性能相對來說會高一些。但要注意使用mapPartitions類算子易出現OOM(內存溢出)的問題,因為垃圾回收時是無法回收掉太多對象的,很可能出現OOM異常。
轉載于:https://my.oschina.net/puwenchao/blog/1554473
總結
- 上一篇: 多层交换机配置模式
- 下一篇: LINQ Order by 排序