解决spark中遇到的数据倾斜问题
一. 數(shù)據(jù)傾斜的現(xiàn)象
多數(shù)task執(zhí)行速度較快,少數(shù)task執(zhí)行時(shí)間非常長,或者等待很長時(shí)間后提示你內(nèi)存不足,執(zhí)行失敗。
二. 數(shù)據(jù)傾斜的原因
常見于各種shuffle操作,例如reduceByKey,groupByKey,join等操作。
數(shù)據(jù)問題
spark使用問題
三. 數(shù)據(jù)傾斜的后果
一個(gè)理想的分布式程序:?
發(fā)生數(shù)據(jù)傾斜時(shí),任務(wù)的執(zhí)行速度由最大的那個(gè)任務(wù)決定:?
四. 數(shù)據(jù)問題造成的數(shù)據(jù)傾斜
發(fā)現(xiàn)數(shù)據(jù)傾斜的時(shí)候,不要急于提高executor的資源,修改參數(shù)或是修改程序,首先要檢查數(shù)據(jù)本身,是否存在異常數(shù)據(jù)。
找出異常的key
如果任務(wù)長時(shí)間卡在最后最后1個(gè)(幾個(gè))任務(wù),首先要對(duì)key進(jìn)行抽樣分析,判斷是哪些key造成的。
選取key,對(duì)數(shù)據(jù)進(jìn)行抽樣,統(tǒng)計(jì)出現(xiàn)的次數(shù),根據(jù)出現(xiàn)次數(shù)大小排序取出前幾個(gè)
df.select("key").sample(false,0.1).(k=>(k,1)).reduceBykey(_+_).map(k=>(k._2,k._1)).sortByKey(false).take(10)如果發(fā)現(xiàn)多數(shù)數(shù)據(jù)分布都較為平均,而個(gè)別數(shù)據(jù)比其他數(shù)據(jù)大上若干個(gè)數(shù)量級(jí),則說明發(fā)生了數(shù)據(jù)傾斜。
經(jīng)過分析,傾斜的數(shù)據(jù)主要有以下三種情況:
解決辦法
第1,2種情況,直接對(duì)數(shù)據(jù)進(jìn)行過濾即可。
第3種情況則需要進(jìn)行一些特殊操作,常見的有以下幾種做法。
舉例:
如果使用reduceByKey因?yàn)閿?shù)據(jù)傾斜造成運(yùn)行失敗的問題。具體操作如下:
tip1: 如果此時(shí)依舊存在問題,建議篩選出傾斜的數(shù)據(jù)單獨(dú)處理。最后將這份數(shù)據(jù)與正常的數(shù)據(jù)進(jìn)行union即可。
tips2: 單獨(dú)處理異常數(shù)據(jù)時(shí),可以配合使用Map Join解決。
五. spark使用不當(dāng)造成的數(shù)據(jù)傾斜
1. 提高shuffle并行度
dataFrame和sparkSql可以設(shè)置spark.sql.shuffle.partitions參數(shù)控制shuffle的并發(fā)度,默認(rèn)為200。?
rdd操作可以設(shè)置spark.default.parallelism控制并發(fā)度,默認(rèn)參數(shù)由不同的Cluster Manager控制。
局限性:?只是讓每個(gè)task執(zhí)行更少的不同的key。無法解決個(gè)別key特別大的情況造成的傾斜,如果某些key的大小非常大,即使一個(gè)task單獨(dú)執(zhí)行它,也會(huì)受到數(shù)據(jù)傾斜的困擾。
2. 使用map join 代替reduce join
在小表不是特別大(取決于你的executor大小)的情況下使用,可以使程序避免shuffle的過程,自然也就沒有數(shù)據(jù)傾斜的困擾了。
局限性:?因?yàn)槭窍葘⑿?shù)據(jù)發(fā)送到每個(gè)executor上,所以數(shù)據(jù)量不能太大。
具體使用方法和處理流程參照:
Spark map-side-join 關(guān)聯(lián)優(yōu)化
spark join broadcast優(yōu)化
六. MapReduce過程中數(shù)據(jù)傾斜的處理
?
轉(zhuǎn)載于:https://www.cnblogs.com/0xcafedaddy/p/7610613.html
總結(jié)
以上是生活随笔為你收集整理的解决spark中遇到的数据倾斜问题的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 数列
- 下一篇: (59)Linux操作系统深入应用