Spark小文件合并
1.問題描述
最近使用spark sql執行etl時候出現了,最終結果大小只有幾百k,但是小文件一個分區有上千的情況。
危害:
2.解決方法
方法一:通過spark的coalesce()方法和repartition()方法
val rdd2 = rdd1.coalesce(8, true) (true表示是否shuffle)
val rdd3 = rdd1.repartition(8)
說明:
coalesce:coalesce()方法的作用是返回指定一個新的指定分區的Rdd,
如果是生成一個窄依賴的結果,那么可以不發生shuffle,
分區的數量發生激烈的變化,計算節點不足,不設置true可能會出錯。
repartition:coalesce()方法shuffle為true的情況。
但是由于使用的是同事直接寫好的模塊,改新增函數相對比較麻煩,所以作為后手。
方法二:降低spark并行度,即調節spark.sql.shuffle.partitions
比如之前設置的為100,按理說應該生成的文件數為100;
但是由于業務比較特殊,采用的大量的union all,且union all在spark中屬于窄依賴,
不會進行shuffle,所以導致最終會生成(union all數量+1)*100的文件數。
如有10個union all,會生成1100個小文件。
這樣導致降低并行度為10之后,執行時長大大增加,且文件數依舊有110個,效果有,但是不理想。
方法三:新增一個并行度=1任務,專門合并小文件。
先將原來的任務數據寫到一個臨時分區(如tmp);
再起一個并行度為1的任務,類似:
insert overwrite 目標表 select * from 臨時分區
但是結果小文件數還是沒有減少,略感疑惑;
經過多次測后發現原因:‘select * from 臨時分區’ 這個任務在spark中屬于窄依賴;
并且spark DAG中分為寬依賴和窄依賴,只有寬依賴會進行shuffle;
故并行度shuffle,spark.sql.shuffle.partitions=1也就沒有起到作用;
由于數據量本身不是特別大,所以直接采用了group by(在spark中屬于寬依賴)的方式,類似:
insert overwrite 目標表 select * from 臨時分區 group by *
先運行原任務,寫到tmp分區,‘dfs -count’查看文件數,1100個,運行加上group by的臨時任務(spark.sql.shuffle.partitions=1),查看結果目錄,文件數=1,成功。
最后又加了個刪除tmp分區的任務。
2.結論
方便的話,可以采用coalesce()方法和repartition()方法
如果任務邏輯簡單,數據量少,可以直接降低并行度
任務邏輯復雜,數據量很大,原任務大并行度計算寫到臨時分區,再加兩個任務:
一個用來將臨時分區的文件用小并行度(加寬依賴)合并成少量文件到實際分區;
另一個刪除臨時分區;
hive任務減少小文件相對比較簡單,可以直接設置參數,如:
Map-only的任務結束時合并小文件:
sethive.merge.mapfiles = true
在Map-Reduce的任務結束時合并小文件:
sethive.merge.mapredfiles= true
當輸出文件的平均大小小于1GB時,啟動一個獨立的map-reduce任務進行文件merge:
sethive.merge.smallfiles.avgsize=1024000000
總結
以上是生活随笔為你收集整理的Spark小文件合并的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: viper4android 3D,中国设
- 下一篇: struct sockaddr和stru