3万字细品数据倾斜(建议收藏)
一、前言
1.1?緒論
數據傾斜是大數據領域繞不開的攔路虎,當你所需處理的數據量到達了上億甚至是千億條的時候,數據傾斜將是橫在你面前一道巨大的坎。
邁的過去,將會海闊天空!邁不過去,就要做好準備:很可能有幾周甚至幾月都要頭疼于數據傾斜導致的各類詭異的問題。
1.2 鄭重聲明
話題比較大,技術要求也比較高,筆者盡最大的能力來寫出自己的理解,寫的不對和不好的地方大家一起交流。
有些例子不是特別嚴謹,一些小細節對文章理解沒有影響,不要太在意。(比如我在算機器內存的時候,就不把Hadoop自身的進程算到使用內存中)
1.3 文章結構
1.先大致解釋一下什么是數據傾斜
2.再根據幾個場景來描述一下數據傾斜產生的情況
3.詳細分析一下在Hadoop和Spark中產生數據傾斜的原因
4.如何解決(優化)數據傾斜問題?
二、什么是數據傾斜
簡單的講,數據傾斜就是我們在計算數據的時候,數據的分散度不夠,導致大量的數據集中到了集群中的一臺或者幾臺機器上計算,而集群中的其他節點空閑。這些傾斜了的數據的計算速度遠遠低于平均計算速度,導致整個計算過程過慢。
2.1 關鍵字:數據傾斜
相信大部分做數據的童鞋們都會遇到數據傾斜,數據傾斜會發生在數據開發的各個環節中,比如:
1.用Hive算數據的時候reduce階段卡在99.99%
2.用SparkStreaming做實時算法時候,一直會有executor出現OOM的錯誤,但是其余的executor內存使用率卻很低。
3.這些問題經常會困擾我們,辛辛苦苦等了幾個小時的數據就是跑不出來,心里多難過啊。
2.2 關鍵字:千億級
為什么要突出這么大數據量?先說一下筆者自己最初對數據量的理解:
數據量大就了不起了?數據量少,機器也少,計算能力也是有限的,因此難度也是一樣的。憑什么數據量大就會有數據傾斜,數據量小就沒有?
這樣理解也有道理,但是比較片面,舉兩個場景來對比:
公司一:總用戶量1000萬,5臺64G內存的的服務器。
公司二:總用戶量10億,1000臺64G內存的服務器。
兩個公司都部署了Hadoop集群。假設現在遇到了數據傾斜,發生什么?
1.公司一的數據分析師在做join的時候發生了數據傾斜,會導致有幾百萬用戶的相關數據集中到了一臺服務器上,幾百萬的用戶數據,說大也不大,正常字段量的數據的話64G還是能輕松處理掉的。
2.公司二的數據分析師在做join的時候也發生了數據傾斜,可能會有1個億的用戶相關數據集中到了一臺機器上了(相信我,這很常見)。這時候一臺機器就很難搞定了,最后會很難算出結果。
三、數據傾斜長什么樣
下面會分幾個場景來描述一下數據傾斜的特征,方便讀者辨別。由于Hadoop和Spark是最常見的兩個計算平臺,下面就以這兩個平臺說明。
3.1 Hadoop中的數據傾斜
3.1.1 概述
Hadoop中直接貼近用戶使用使用的時Mapreduce程序和Hive程序,雖說Hive最后也是用MR來執行(至少目前Hive內存計算并不普及),但是畢竟寫的內容邏輯區別很大,一個是程序,一個是Sql,因此這里稍作區分。
3.1.2 表現
Hadoop中的數據傾斜主要表現在、Reduce階段卡在99.99%,一直不能結束。
這里如果詳細的看日志或者和監控界面的話會發現:
有一個多幾個Reduce卡住
各種container報錯OOM
異常的Reducer讀寫的數據量極大,至少遠遠超過其它正常的Reducer
伴隨著數據傾斜,會出現任務被kill等各種詭異的表現。
3.1.2 經驗
Hive的數據傾斜,一般都發生在Sql中group by和join on上,而且和數據邏輯綁定比較深。
3.2 Spark中的數據傾斜
Spark中的數據傾斜也很常見,這里包括Spark Streaming和Spark Sql,表現主要有下面幾種:
Executor lost,OOM,Shuffle過程出錯
Driver OOM
單個Executor執行時間特別久,整體任務卡在某個階段不能結束
正常運行的任務突然失敗
注意,在Spark streaming程序中,數據傾斜更容易出現,特別是在程序中包含一些類似sql的join、group這種操作的時候。?因為Spark Streaming程序在運行的時候,我們一般不會分配特別多的內存,因此一旦在這個過程中出現一些數據傾斜,就十分容易造成OOM。
四、數據傾斜的原理
4.1 數據傾斜產生原因概述
我們以Spark和Hive的使用場景為例。
他們在做數據運算的時候會涉及到,count distinct、group by、join on等操作,這些都會觸發Shuffle動作。一旦觸發Shuffle,所有相同key的值就會被拉到一個或幾個Reducer節點上,容易發生單點計算問題,導致數據傾斜。
一般來說,數據傾斜原因有以下幾方面:
key分布不均勻
業務數據本身的特性
建表時考慮不周
某些SQL語句本身就有數據傾斜
4.2 Shuffle與數據傾斜
Hadoop和Spark在Shuffle過程中產生數據傾斜的原理基本類似即數據不均勻。如下圖:
大部分數據傾斜的原理就類似于上圖,很明了,因為數據分布不均勻,導致大量的數據分配到了一個節點。
4.3 數據本身與數據傾斜
我們舉一個例子,就說數據默認值的設計吧,假設我們有兩張表:
user(用戶信息表):userid,register_ip
ip(IP表):ip,register_user_cnt
這可能是兩個不同的人開發的數據表。如果我們的數據規范不太完善的話,會出現一種情況:
user表中的register_ip字段,如果獲取不到這個信息,我們默認為null;
但是在ip表中,我們在統計這個值的時候,為了方便,我們把獲取不到ip的用戶,統一認為他們的ip為0。
兩邊其實都沒有錯的,但是一旦我們做關聯了,這個任務會在做關聯的階段,也就是sql的on的階段卡死。
4.4 業務邏輯與數據傾斜
數據往往和業務是強相關的,業務的場景直接影響到了數據的分布。
再舉一個例子,比如就說訂單場景吧,我們在某一天在北京和上海兩個城市多了強力的推廣,結果可能是這兩個城市的訂單量增長了10000%,其余城市的數據量不變。
然后我們要統計不同城市的訂單情況,這樣,一做group操作,可能直接就數據傾斜了。
五、 解決數據傾斜思路
5.1 概述
數據傾斜的產生是有一些討論的,解決它們也是有一些討論的,本章會先給出幾個解決數據傾斜的思路,然后對Hadoop和Spark分別給出一些解決數據傾斜的方案。
注意:?很多數據傾斜的問題,都可以用和平臺無關的方式解決,比如更好的數據預處理, 異常值的過濾等,因此筆者認為,解決數據傾斜的重點在于對數據設計和業務的理解,這兩個搞清楚了,數據傾斜就解決了大部分了。
5.2 解決思路
解決數據傾斜有這幾個思路:
5.2.1 業務邏輯
我們從業務邏輯的層面上來優化數據傾斜,比如上面的兩個城市做推廣活動導致那兩個城市數據量激增的例子,我們可以單獨對這兩個城市來做count,單獨做時可用兩次MR,第一次打散計算,第二次再最終聚合計算。完成后和其它城市做整合。
5.2.2 程序層面
比如說在Hive中,經常遇到count(distinct)操作,這樣會導致最終只有一個Reduce任務。
我們可以先group by,再在外面包一層count,就可以了。比如計算按用戶名去重后的總用戶量:
// 優化前 只有一個reduce,先去重再count負擔比較大:
select?name,count(distinct?name)from?user;//優化后
// 設置該任務的每個job的reducer個數為3個。Hive默認-1,自動推斷。
set?mapred.reduce.tasks=3;// 啟動兩個job,一個負責子查詢(可以有多個reduce),另一個負責count(1):
select?count(1)?from?(select?name?from?user?group?by?name)?tmp;5.2.3 調參方面
Hadoop和Spark都自帶了很多的參數和機制來調節數據傾斜,合理利用它們就能解決大部分問題。
5.3 從業務和數據上解決數據傾斜
很多數據傾斜都是在數據的使用上造成的。我們舉幾個場景,并分別給出它們的解決方案。
數據分布不均勻:
前面提到的“從數據角度來理解數據傾斜”和“從業務計角度來理解數據傾斜”中的例子,其實都是數據分布不均勻的類型,這種情況和計算平臺無關,我們能通過設計的角度嘗試解決它。
有損的方法:
找到異常數據,比如ip為0的數據,過濾掉
無損的方法:
對分布不均勻的數據,單獨計算
先對key做一層hash,先將數據隨機打散讓它的并行度變大,再匯集
數據預處理
六、MR解決數據傾斜具體方法
6.1 大量相同key沒有combine就傳到Reducer
combiner函數
思想:提前在map進行combine,減少傳輸的數據量
在Mapper加上combiner相當于提前進行reduce,即把一個Mapper中的相同key進行了聚合,減少shuffle過程中傳輸的數據量,以及Reducer端的計算量。
如果導致數據傾斜的key 大量分布在不同的mapper的時候,這種方法就不是很有效了。
6.2 導致數據傾斜的key 大量分布在不同的mapper
局部聚合加全局聚合。
第一次在map階段對那些導致了數據傾斜的key 加上1到n的隨機前綴,這樣本來相同的key 也會被分到多個Reducer中進行局部聚合,數量就會大大降低。
第二次mapreduce,去掉key的隨機前綴,進行全局聚合。
思想:二次mr,第一次將key隨機散列到不同reducer進行處理達到負載均衡目的。第二次再根據去掉key的隨機前綴,按原key進行reduce處理。
該方法進行兩次mapreduce:
這個方法進行兩次mapreduce,性能稍差。
增加Reducer
思想:增加Reducer,提升并行度
JobConf.setNumReduceTasks(int)
實現custom partitioner
思想:根據數據分布情況,自定義散列函數,將key均勻分配到不同Reducer
七、Hive解決數據傾斜具體方法
7.1 場景
7.1.1 group by
注:group by 優于distinct group
情形:group by 維度過小,某值的數量過多
后果:處理某值的reduce非常耗時
解決方式:采用sum() group by的方式來替換count(distinct)完成計算。
7.1.2 count(distinct)
count(distinct xx)
情形:某特殊值過多
后果:處理此特殊值的reduce耗時;只有一個reduce任務
解決方式:count distinct時,將值為空的情況單獨處理,比如可以直接過濾空值的行,在最后結果中加1。如果還有其他計算,需要進行group by,可以先將值為空的記錄單獨處理,再和其他計算結果進行union。
7.1.3 join
情形1:小表與大表join,但較小表key集中
后果:shuffle分發到某一個或幾個Reducer上的數據量遠高于平均值。想象極端情況,小表的join列全部為一個值,那么shuffle后全部到一個Reducer節點,其他節點無負載。這就是極端的數據傾斜了。
解決方式:mapjoin
情形2:大表與大表join,但是分桶的判斷字段0值或空值過多
后果:這些空值/0值都由一個Reducer處理,非常慢
解決方式:把空值的key變成一個字符串加上隨機數,把傾斜的數據分到不同的reduce上,由于null值關聯不上,處理后并不影響最終結果。
7.1.4 不同數據類型關聯產生數據傾斜
情形:比如用戶表中user_id字段為int,log表中user_id字段既有string類型也有int類型。當按照user_id進行兩個表的Join操作時。
后果:處理此特殊值的reduce耗時;只有一個reduce任務
默認的Hash操作會按int型的id來進行分配,這樣會導致所有string類型id的記錄都分配到一個Reducer中。
解決方式:把數字類型轉換成字符串類型
select?*?from?users?aleft?outer?join?logs?bon?a.usr_id?=?cast(b.user_id?as?string)7.2 調優
7.2.1 hive.map.aggr=true
#?開啟map端combiner set?hive.map.aggr=true;思想
開啟map combiner。在map中會做部分聚集操作,效率更高但需要更多的內存。
點評
假如map各條數據基本上不一樣, 聚合沒什么意義,做combiner反而畫蛇添足,hive里也考慮的比較周到通過參數:
hive.groupby.mapaggr.checkinterval?= 100000 (默認)
hive.map.aggr.hash.min.reduction=0.5(默認)
7.2.2?hive.groupby.skewindata=true
#?開啟數據傾斜時負載均衡 set?hive.groupby.skewindata=true;思想
就是先隨機分發并處理,再按照key group by來分發處理。
操作
當選項設定為true,生成的查詢計劃會有兩個MRJob。
第一個MRJob 中,Map的輸出結果集合會隨機分布到Reduce中,每個Reduce做部分聚合操作,并輸出結果,這樣處理的結果是相同的GroupBy Key有可能被分發到不同的Reduce中,從而達到負載均衡的目的;
第二個MRJob再根據預處理的數據結果按照GroupBy Key分布到Reduce中(這個過程可以保證相同的原始GroupBy Key被分布到同一個Reduce中),最后完成最終的聚合操作。
點評
它使計算變成了兩個mapreduce,先在第一個中在 shuffle 過程 partition 時隨機給 key 打標記,使每個key 隨機均勻分布到各個 reduce 上計算,但是這樣只能完成部分計算,因為相同key沒有分配到相同reduce上。
所以需要第二次的mapreduce,這次就回歸正常 shuffle,但是數據分布不均勻的問題在第一次mapreduce已經有了很大的改善,因此基本解決數據傾斜。因為大量計算已經在第一次mr中隨機分布到各個節點完成。
7.2.3 Join
7.2.3.1 關于驅動表的選取
選用join key分布最均勻的表作為驅動表。
7.2.3.2 做好列裁剪和filter操作
以達到兩表做join的時候,數據量相對變小的效果。
7.2.3.3 left semi join
7.2.3.4 大小表Join - MapJoin
思想
小表關聯一個超大表時,容易發生數據傾斜,使用?MapJoin把小表全部加載到內存在map端進行join。如果需要的數據在 Map 的過程中可以訪問到則不再需要Reduce。
實例分析
原始sql:
以上為小表join大表的操作,可以使用mapjoin把小表c放到內存中處理,語法很簡單只需要增加?/*+ MAPJOIN(小標) */,把需要分發的表放入到內存中。
select?/*+?MAPJOIN(c)?*/ c.channel_name,count(t.requesturl)?PVfrom?ods.cms_channel?cjoin(select?host,requesturl?from??dms.tracklog_5min?where?day='20151111'?)?ton?c.channel_name=t.hostgroup?by?c.channel_nameorder?by?c.channel_name;7.2.3.5 大表Join大表 - skewjoin
當key值都是有效值時可使用hive配置:
set hive.optimize.skewjoin=true;
指定是否開啟數據傾斜的join運行時優化,默認不開啟即false。
set hive.skewjoin.key=100000;
判斷數據傾斜的閾值,如果在join中發現同樣的key超過該值,則認為是該key是傾斜key。
默認100000。一般可以設置成處理的總記錄數/reduce個數的2-4倍。
set hive.optimize.skewjoin.compiletime=true;
指定是否開啟數據傾斜的join編譯時優化,默認不開啟即false。
具體來說,會基于存儲在原數據中的傾斜key,來在編譯時為導致傾斜的key單獨創建執行計劃,而其他key也有一個執行計劃用來join。然后,對上面生成的兩個join執行后求并集。因此,除非相同的傾斜key同時存在于這兩個join表中,否則對于引起傾斜的key的join就會優化為map-side join。
此外,該參數與hive.optimize.skewjoin之間的主要區別在于,此參數使用存儲在metastore中的傾斜信息在編譯時來優化執行計劃。如果元數據中沒有傾斜信息,則此參數無效。一般可將這兩個參數都設為true。如果元數據中有傾斜信息,則hive.optimize.skewjoin不做任何操作。
7.2.3.6 小結
以上方式,都是根據數據傾斜形成的原因進行的一些變化。要么將 reduce 端的隱患在 map 端就解決,要么就是對 key 的操作,以減緩reduce 的壓力。了解了原因再去尋找解決之道就相對思路多了些,方法肯定不止這幾種。
7.2.4 先group再count
能先進行?group?操作的時候先進行group操作,把 key 先進行一次 reduce,之后再進行 count 或者 distinct count 操作。
7.2.5 控制空值分布
將為空的key轉變為字符串加隨機數或純隨機數,將因空值而造成傾斜的數據分不到多個Reducer。
注:對于異常值如果不需要的話,最好是提前在where條件里過濾掉,這樣可以使計算量大大減少
實踐中,可以使用case when對空值賦上隨機值。此方法比直接寫is not null更好,因為前者job數為1,后者為2.
使用case when實例1:
select?userid,?name fromuser_info?a join?( select?case when?userid?is?null??then??cast?(rand(47)*?100000?as?int?) else?userid end from?user_read_log )?b on?a.userid?=?b.userid使用case when實例2:
select'${date}'?as?thedate,a.search_type,a.query,a.category,a.cat_name,a.brand_id,a.brand_name,a.dir_type,a.rewcatid,a.new_cat_name,a.new_brand_id,f.brand_name?as?new_brand_name,a.pv,a.uv,a.ipv,a.ipvuv,a.trans_amt,a.trans_num,a.alipay_uv from?fdi_search_query_cat_qp_temp?a left?outer?join?brand?f onf.pt='${date}000000'and?case?when?a.new_brand_id?is?null?then?concat('hive',rand()?)?else?a.new_brand_id?end?=?f.brand_id如果上述的方法還不能解決,比如當有多個JOIN的時候,建議建立臨時表,然后拆分HIVE SQL語句。
7.2.6 壓縮
設置map端輸出、中間結果壓縮。(不完全是解決數據傾斜的問題,但是減少了IO讀寫和網絡傳輸,能提高很多效率)
7.2.7 增加Reuducer個數
默認是由參數hive.exec.reducers.bytes.per.reducer來推斷需要的Reducer個數。
可通過mapred.reduce.tasks控制,默認-
八、Spark解決數據傾斜具體方法
8.1 概述
mapjoin
設置rdd壓縮
合理設置driver的內存
Spark Sql中的優化和Hive類似,可以參考Hive
8.2 Spark數據傾斜表現
絕大多數task執行得都非???#xff0c;但個別task執行極慢。比如,總共有1000個task,997個task都在1分鐘之內執行完了,但是剩余兩三個task卻要一兩個小時。這種情況很常見。
原本能夠正常執行的Spark作業,某天突然報出OOM(內存溢出)異常,觀察異常棧,是我們寫的業務代碼造成的。這種情況比較少見。
8.3 Spark數據傾斜的原理
Shuffle必須將各個節點上相同的key拉取到某個節點上的一個task來進行處理,比如按照key進行聚合或join等操作。此時如果某個key對應的數據量特別大的話,就會發生數據傾斜。
比如大部分key對應10條數據,但是個別key卻對應了100萬條數據,那么大部分task可能就只會分配到10條數據,然后1秒鐘就運行完了;但是個別task可能分配到了100萬數據,要運行一兩個小時。因此,整個Spark作業的運行進度是由運行時間最長的那個task決定的。
因此出現數據傾斜的時候,Spark作業看起來會運行得非常緩慢,甚至可能因為某個task處理的數據量過大導致OOM。
8.4 Spark數據傾斜例子
下圖就是一個很清晰的例子:
hello這個key,在三個節點上對應了總共7條數據,這些數據都會被拉取到同一個task中進行處理;
而world和you這兩個key分別才對應1條數據,所以這兩個task只要分別處理1條數據即可。
此時第一個task的運行時間可能是另外兩個task的7倍,而整個stage的運行速度也由運行最慢的那個task所決定。
8.4 定位導致數據傾斜代碼
Spark數據傾斜只會發生在shuffle過程中。
這里給大家羅列一些常用的并且可能會觸發shuffle操作的算子:
distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。
出現數據傾斜時,可能就是你的代碼中使用了這些算子中的某一個所導致的。
8.4.1 某個task執行特別慢的情況
首先要看的,就是數據傾斜發生在第幾個stage中:
如果是用yarn-client模式提交,那么在提交的機器本地是直接可以看到log,可以在log中找到當前運行到了第幾個stage;
如果是用yarn-cluster模式提交,則可以通過Spark Web UI來查看當前運行到了第幾個stage。
此外,無論是使用yarn-client模式還是yarn-cluster模式,我們都可以在Spark Web UI上深入看一下當前這個stage各個task分配的數據量,從而進一步確定是不是task分配的數據不均勻導致了數據傾斜。
看task運行時間和數據量
task運行時間
比如下圖中,倒數第三列顯示了每個task的運行時間。明顯可以看到,有的task運行特別快,只需要幾秒鐘就可以運行完;而有的task運行特別慢,需要幾分鐘才能運行完,此時單從運行時間上看就已經能夠確定發生數據傾斜了。
task數據量
此外,倒數第一列顯示了每個task處理的數據量,明顯可以看到,運行時間特別短的task只需要處理幾百KB的數據即可,而運行時間特別長的task需要處理幾千KB的數據,處理的數據量差了10倍。此時更加能夠確定是發生了數據傾斜。
推斷傾斜代碼
知道數據傾斜發生在哪一個stage之后,接著我們就需要根據stage劃分原理,推算出來發生傾斜的那個stage對應代碼中的哪一部分,這部分代碼中肯定會有一個shuffle類算子。
精準推算stage與代碼的對應關系,需要對Spark的源碼有深入的理解,這里我們可以介紹一個相對簡單實用的推算方法:只要看到Spark代碼中出現了一個shuffle類算子或者是Spark SQL的SQL語句中出現了會導致shuffle的語句(比如group by語句),那么就可以判定,以那個地方為界限劃分出了前后兩個stage。
這里我們就以如下單詞計數來舉例。
val?conf?=?new?SparkConf() val?sc?=?new?SparkContext(conf) val?lines?=?sc.textFile("hdfs://...") val?words?=?lines.flatMap(_.split("?")) val?pairs?=?words.map((_,?1)) val?wordCounts?=?pairs.reduceByKey(_?+?_) wordCounts.collect().foreach(println(_))在整個代碼中只有一個reduceByKey是會發生shuffle的算子,也就是說這個算子為界限劃分出了前后兩個stage:
stage0,主要是執行從textFile到map操作,以及shuffle write操作(對pairs RDD中的數據進行分區操作,每個task處理的數據中,相同的key會寫入同一個磁盤文件內)。
stage1,主要是執行從reduceByKey到collect操作,以及stage1的各個task一開始運行,就會首先執行shuffle read操作(會從stage0的各個task所在節點拉取屬于自己處理的那些key,然后對同一個key進行全局性的聚合或join等操作,在這里就是對key的value值進行累加)
stage1在執行完reduceByKey算子之后,就計算出了最終的wordCounts RDD,然后會執行collect算子,將所有數據拉取到Driver上,供我們遍歷和打印輸出。
通過對單詞計數程序的分析,希望能夠讓大家了解最基本的stage劃分的原理,以及stage劃分后shuffle操作是如何在兩個stage的邊界處執行的。然后我們就知道如何快速定位出發生數據傾斜的stage對應代碼的哪一個部分了。
比如我們在Spark Web UI或者本地log中發現,stage1的某幾個task執行得特別慢,判定stage1出現了數據傾斜,那么就可以回到代碼中,定位出stage1主要包括了reduceByKey這個shuffle類算子,此時基本就可以確定是是該算子導致了數據傾斜問題。
此時,如果某個單詞出現了100萬次,其他單詞才出現10次,那么stage1的某個task就要處理100萬數據,整個stage的速度就會被這個task拖慢。
8.4.2 某個task莫名其妙內存溢出的情況
這種情況下去定位出問題的代碼就比較容易了。我們建議直接看yarn-client模式下本地log的異常棧,或者是通過YARN查看yarn-cluster模式下的log中的異常棧。一般來說,通過異常棧信息就可以定位到你的代碼中哪一行發生了內存溢出。然后在那行代碼附近找找,一般也會有shuffle類算子,此時很可能就是這個算子導致了數據傾斜。
但是大家要注意的是,不能單純靠偶然的內存溢出就判定發生了數據傾斜。因為自己編寫的代碼的bug,以及偶然出現的數據異常,也可能會導致內存溢出。因此還是要按照上面所講的方法,通過Spark Web UI查看報錯的那個stage的各個task的運行時間以及分配的數據量,才能確定是否是由于數據傾斜才導致了這次內存溢出。
8.5 查看導致數據傾斜的key分布情況
知道了數據傾斜發生在哪里之后,通常需要分析一下那個執行了shuffle操作并且導致了數據傾斜的RDD/Hive表,查看一下其中key的分布情況。這主要是為之后選擇哪一種技術方案提供依據。針對不同的key分布與不同的shuffle算子組合起來的各種情況,可能需要選擇不同的技術方案來解決。
此時根據你執行操作的情況不同,可以有很多種查看key分布的方式:
如果是Spark SQL中的group by、join語句導致的數據傾斜,那么就查詢一下SQL中使用的表的key分布情況。
如果是對Spark RDD執行shuffle算子導致的數據傾斜,那么可以在Spark作業中加入查看key分布的代碼,比如RDD.countByKey()。然后對統計出來的各個key出現的次數,collect/take到客戶端打印一下,就可以看到key的分布情況。
舉例來說,對于上面所說的單詞計數程序,如果確定了是stage1的reduceByKey算子導致了數據傾斜,那么就應該看看進行reduceByKey操作的RDD中的key分布情況,在這個例子中指的就是pairs RDD。如下示例,我們可以先對pairs采樣10%的樣本數據,然后使用countByKey算子統計出每個key出現的次數,最后在客戶端遍歷和打印樣本數據中各個key的出現次數。
val?sampledPairs?=?pairs.sample(false,?0.1) val?sampledWordCounts?=?sampledPairs.countByKey() sampledWordCounts.foreach(println(_))8.6 Spark 數據傾斜的解決方案
8.6.1 使用Hive ETL預處理數據
8.6.1.1 適用場景
導致數據傾斜的是Hive表。如果該Hive表中的數據本身很不均勻(比如某個key對應了100萬數據,其他key才對應了10條數據),而且業務場景需要頻繁使用Spark對Hive表執行某個分析操作,那么比較適合使用這種技術方案。
8.6.1.2 實現思路
此時可以評估一下,是否可以通過Hive來進行數據預處理(即通過Hive ETL預先對數據按照key進行聚合,或者是預先和其他表進行join),然后在Spark作業中針對的數據源就不是原來的Hive表了,而是預處理后的Hive表。此時由于數據已經預先進行過聚合或join操作了,那么在Spark作業中也就不需要使用原先的shuffle類算子執行這類操作了。
8.6.1.3 方案實現原理
這種方案從根源上解決了數據傾斜,因為徹底避免了在Spark中執行shuffle類算子,那么肯定就不會有數據傾斜的問題了。但是這里也要提醒一下大家,這種方式屬于治標不治本。因為畢竟數據本身就存在分布不均勻的問題,所以Hive ETL中進行group by或者join等shuffle操作時,還是會出現數據傾斜,導致Hive ETL的速度很慢。我們只是把數據傾斜的發生提前到了Hive ETL中,避免Spark程序發生數據傾斜而已。
8.6.1.4 方案優缺點
優點
實現起來簡單便捷,效果還非常好,完全規避掉了數據傾斜,Spark作業的性能會大幅度提升。
缺點
治標不治本,Hive ETL中還是會發生數據傾斜。
8.6.1.5 方案實踐經驗
在一些Java系統與Spark結合使用的項目中,會出現Java代碼頻繁調用Spark作業的場景,而且對Spark作業的執行性能要求很高,就比較適合使用這種方案。將數據傾斜提前到上游的Hive ETL,每天僅執行一次,只有那一次是比較慢的,而之后每次Java調用Spark作業時,執行速度都會很快,能夠提供更好的用戶體驗。
8.6.1.6 項目實踐經驗
在美團·點評的交互式用戶行為分析系統中使用了這種方案,該系統主要是允許用戶通過Java Web系統提交數據分析統計任務,后端通過Java提交Spark作業進行數據分析統計。要求Spark作業速度必須要快,盡量在10分鐘以內,否則速度太慢,用戶體驗會很差。所以我們將有些Spark作業的shuffle操作提前到了Hive ETL中,從而讓Spark直接使用預處理的Hive中間表,盡可能地減少Spark的shuffle操作,大幅度提升了性能,將部分作業的性能提升了6倍以上。
8.6.2 過濾少數導致傾斜的key
8.6.2.1 方案適用場景
如果發現導致傾斜的key就少數幾個,而且對計算本身的影響并不大的話,那么很適合使用這種方案。比如99%的key就對應10條數據,但是只有一個key對應了100萬數據,從而導致了數據傾斜。
8.6.2.2 方案實現思路
如果我們判斷那少數幾個數據量特別多的key,對作業的執行和計算結果不是特別重要的話,那么干脆就直接過濾掉那少數幾個key。
比如,在Spark SQL中可以使用where子句過濾掉這些key或者在Spark Core中對RDD執行filter算子過濾掉這些key。
如果需要每次作業執行時,動態判定哪些key的數據量最多然后再進行過濾,那么可以使用sample算子對RDD進行采樣,然后計算出每個key的數量,取數據量最多的key過濾掉即可。
8.6.2.3 方案實現原理
將導致數據傾斜的key給過濾掉之后,這些key就不會參與計算了,自然不可能產生數據傾斜。
8.6.2.4 方案優缺點
優點
實現簡單,而且效果也很好,可以完全規避掉數據傾斜。
缺點
適用場景不多,大多數情況下,導致傾斜的key還是很多的,并不是只有少數幾個。
8.6.2.5 方案實踐經驗
在項目中我們也采用過這種方案解決數據傾斜。有一次發現某一天Spark作業在運行的時候突然OOM了,追查之后發現,是Hive表中的某一個key在那天數據異常,導致數據量暴增。因此就采取每次執行前先進行采樣,計算出樣本中數據量最大的幾個key之后,直接在程序中將那些key給過濾掉。
8.6.3 提高shuffle操作的并行度
8.6.3.1 方案適用場景
如果我們必須要對數據傾斜迎難而上,那么建議優先使用這種方案,因為這是處理數據傾斜最簡單的一種方案。
8.6.3.2 方案實現思路
在對RDD執行shuffle算子時,給shuffle算子傳入一個參數,比如reduceByKey(1000),該參數就設置了這個shuffle算子執行時shuffle read task的數量,即spark.sql.shuffle.partitions,該參數代表了shuffle read task的并行度,默認是200,對于很多場景來說都有點過小。
8.6.3.3 方案實現原理
增加shuffle read task的數量,可以讓原本分配給一個task的多個key分配給多個task,從而讓每個task處理比原來更少的數據。舉例來說,如果原本有5個key,每個key對應10條數據,這5個key都是分配給一個task的,那么這個task就要處理50條數據。
而增加了shuffle read task以后,每個task就分配到一個key,即每個task就處理10條數據,那么自然每個task的執行時間都會變短了。具體原理如下圖所示。
8.6.3.4 方案優缺點
優點
實現起來比較簡單,可以有效緩解和減輕數據傾斜的影響。
缺點
只是緩解了數據傾斜而已,沒有徹底根除問題,根據實踐經驗來看,其效果有限。
8.6.3.5 方案實踐經驗
該方案通常無法徹底解決數據傾斜,因為如果出現一些極端情況,比如某個key對應的數據量有100萬,那么無論你的task數量增加到多少,這個對應著100萬數據的key肯定還是會分配到一個task中去處理,因此注定還是會發生數據傾斜的。所以這種方案只能說是在發現數據傾斜時嘗試使用的第一種手段,嘗試去用最簡單的方法緩解數據傾斜而已,或者是和其他方案結合起來使用。
8.6.4 兩階段聚合(局部聚合+全局聚合)
8.6.4.1 方案適用場景
對RDD執行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進行分組聚合時,比較適用這種方案。
8.6.4.2 方案實現思路
這個方案的核心實現思路就是進行兩階段聚合:
第一次是局部聚合,先給每個key都打上一個隨機數,比如10以內的隨機數,此時原先一樣的key就變成不一樣的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。
接著對打上隨機數后的數據,執行reduceByKey等聚合操作,進行局部聚合,那么局部聚合結果,就會變成了(1_hello, 2) (2_hello, 2)。
然后將各個key的前綴給去掉,就會變成(hello,2)(hello,2),再次進行全局聚合操作,就可以得到最終結果了,比如(hello, 4)。
示例代碼如下:
//?第一步,給RDD中的每個key都打上一個隨機前綴。 JavaPairRDD<String,?Long>?randomPrefixRdd?=?rdd.mapToPair(new?PairFunction<Tuple2<Long,Long>,?String,?Long>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Tuple2<String,?Long>?call(Tuple2<Long,?Long>?tuple)throws?Exception?{Random?random?=?new?Random();int?prefix?=?random.nextInt(10);return?new?Tuple2<String,?Long>(prefix?+?"_"?+?tuple._1,?tuple._2);}});//?第二步,對打上隨機前綴的key進行局部聚合。 JavaPairRDD<String,?Long>?localAggrRdd?=?randomPrefixRdd.reduceByKey(new?Function2<Long,?Long,?Long>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Long?call(Long?v1,?Long?v2)?throws?Exception?{return?v1?+?v2;}});//?第三步,去除RDD中每個key的隨機前綴。 JavaPairRDD<Long,?Long>?removedRandomPrefixRdd?=?localAggrRdd.mapToPair(new?PairFunction<Tuple2<String,Long>,?Long,?Long>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Tuple2<Long,?Long>?call(Tuple2<String,?Long>?tuple)throws?Exception?{long?originalKey?=?Long.valueOf(tuple._1.split("_")[1]);return?new?Tuple2<Long,?Long>(originalKey,?tuple._2);}});//?第四步,對去除了隨機前綴的RDD進行全局聚合。 JavaPairRDD<Long,?Long>?globalAggrRdd?=?removedRandomPrefixRdd.reduceByKey(new?Function2<Long,?Long,?Long>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Long?call(Long?v1,?Long?v2)?throws?Exception?{return?v1?+?v2;}});8.6.4.3 方案實現原理
將原本相同的key通過附加隨機前綴的方式,變成多個不同的key,就可以讓原本被一個task處理的數據分散到多個task上去做局部聚合,進而解決單個task處理數據量過多的問題。接著去除掉隨機前綴,再次進行全局聚合,就可以得到最終的結果。具體原理見下圖。
8.6.4.4 方案優缺點
優點
對于聚合類的shuffle操作導致的數據傾斜,效果是非常不錯的。通常都可以解決掉數據傾斜,或者至少是大幅度緩解數據傾斜,將Spark作業的性能提升數倍以上。
缺點
僅僅適用于聚合類的shuffle操作,適用范圍相對較窄。如果是join類的shuffle操作,還得用其他的解決方案。
8.6.5 將reduce join轉為map join
8.6.5.1 方案適用場景
在對RDD使用join類操作,或者是在Spark SQL中使用join語句時,而且join操作中的一個RDD或表的數據量比較小(比如幾百M或者一兩G),比較適用此方案。
8.6.5.2 方案實現思路
不使用join算子進行連接操作,而使用Broadcast變量與map類算子實現join操作,進而完全規避掉shuffle類的操作,徹底避免數據傾斜的發生和出現。將較小RDD中的數據直接通過collect算子拉取到Driver端的內存中來,然后對其創建一個Broadcast變量,廣播給其他Executor節點;
接著對另外一個RDD執行map類算子,在算子函數內,從Broadcast變量中獲取較小RDD的全量數據,與當前RDD的每一條數據按照連接key進行比對,如果連接key相同的話,那么就將兩個RDD的數據用你需要的方式連接起來。
示例如下:
//?首先將數據量比較小的RDD的數據,collect到Driver中來。 List<Tuple2<Long,?Row>>?rdd1Data?=?rdd1.collect() //?然后使用Spark的廣播功能,將小RDD的數據轉換成廣播變量,這樣每個Executor就只有一份RDD的數據。 //?可以盡可能節省內存空間,并且減少網絡傳輸性能開銷。 final?Broadcast<List<Tuple2<Long,?Row>>>?rdd1DataBroadcast?=?sc.broadcast(rdd1Data);//?對另外一個RDD執行map類操作,而不再是join類操作。 JavaPairRDD<String,?Tuple2<String,?Row>>?joinedRdd?=?rdd2.mapToPair(new?PairFunction<Tuple2<Long,String>,?String,?Tuple2<String,?Row>>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Tuple2<String,?Tuple2<String,?Row>>?call(Tuple2<Long,?String>?tuple)throws?Exception?{//?在算子函數中,通過廣播變量,獲取到本地Executor中的rdd1數據。List<Tuple2<Long,?Row>>?rdd1Data?=?rdd1DataBroadcast.value();//?可以將rdd1的數據轉換為一個Map,便于后面進行join操作。Map<Long,?Row>?rdd1DataMap?=?new?HashMap<Long,?Row>();for(Tuple2<Long,?Row>?data?:?rdd1Data)?{rdd1DataMap.put(data._1,?data._2);}//?獲取當前RDD數據的key以及value。String?key?=?tuple._1;String?value?=?tuple._2;//?從rdd1數據Map中,根據key獲取到可以join到的數據。Row?rdd1Value?=?rdd1DataMap.get(key);return?new?Tuple2<String,?String>(key,?new?Tuple2<String,?Row>(value,?rdd1Value));}});//?這里得提示一下。 //?上面的做法,僅僅適用于rdd1中的key沒有重復,全部是唯一的場景。 //?如果rdd1中有多個相同的key,那么就得用flatMap類的操作,在進行join的時候不能用map,而是得遍歷rdd1所有數據進行join。 // rdd2中每條數據都可能會返回多條join后的數據。8.6.5.3 方案實現原理
普通的join是會走shuffle過程的,而一旦shuffle,就相當于會將相同key的數據拉取到一個shuffle read task中再進行join,此時就是reduce join。
但是如果一個RDD是比較小的,則可以采用廣播小RDD全量數據+map算子來實現與join同樣的效果,也就是map join,此時就不會發生shuffle操作,也就不會發生數據傾斜。具體原理如下圖所示。
8.6.5.4 方案優缺點
優點
對join操作導致的數據傾斜,效果非常好,因為根本就不會發生shuffle,也就根本不會發生數據傾斜。
缺點
適用場景較少,因為這個方案只適用于一個大表和一個小表的情況。畢竟我們需要將小表進行廣播,此時會比較消耗內存資源,driver和每個Executor內存中都會駐留一份小RDD的全量數據。如果我們廣播出去的RDD數據比較大,比如10G以上,那么就可能發生內存溢出了。因此并不適合兩個都是大表的情況。
8.6.6 采樣傾斜key并分拆join操作
8.6.6.1 方案適用場景
兩個RDD/Hive表進行join的時候,如果數據量都比較大,無法采用“解決方案五”,那么此時可以看一下兩個RDD/Hive表中的key分布情況。
如果出現數據傾斜,是因為其中某一個RDD/Hive表中的少數幾個key的數據量過大,而另一個RDD/Hive表中的所有key都分布比較均勻,那么采用這個解決方案是比較合適的。
8.6.6.2 方案實現思路
對包含少數幾個數據量過大的key的那個RDD,通過sample算子采樣出一份樣本來,然后統計一下每個key的數量,計算出來數據量最大的是哪幾個key。
然后將這幾個key對應的數據從原來的RDD中拆分出來,形成一個單獨的RDD,并給每個key都打上n以內的隨機數作為前綴;
而不會導致傾斜的大部分key形成另外一個RDD。
接著將需要join的另一個RDD,也過濾出來那幾個傾斜key對應的數據并形成一個單獨的RDD,將每條數據膨脹成n條數據,這n條數據都按順序附加一個0~n的前綴;
不會導致傾斜的大部分key也形成另外一個RDD。
再將附加了隨機前綴的獨立RDD與另一個膨脹n倍的獨立RDD進行join,此時就可以將原先相同的key打散成n份,分散到多個task中去進行join了。
而另外兩個普通的RDD就照常join即可。
最后將兩次join的結果使用union算子合并起來即可,就是最終的join結果。
示例如下:
//?首先從包含了少數幾個導致數據傾斜key的rdd1中,采樣10%的樣本數據。 JavaPairRDD<Long,?String>?sampledRDD?=?rdd1.sample(false,?0.1);//?對樣本數據RDD統計出每個key的出現次數,并按出現次數降序排序。 //?對降序排序后的數據,取出top 1或者top 100的數據,也就是key最多的前n個數據。 //?具體取出多少個數據量最多的key,由大家自己決定,我們這里就取1個作為示范。//?每行數據變為<key,1> JavaPairRDD<Long,?Long>?mappedSampledRDD?=?sampledRDD.mapToPair(new?PairFunction<Tuple2<Long,String>,?Long,?Long>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Tuple2<Long,?Long>?call(Tuple2<Long,?String>?tuple)throws?Exception?{return?new?Tuple2<Long,?Long>(tuple._1,?1L);}?????});//?按key累加行數 JavaPairRDD<Long,?Long>?countedSampledRDD?=?mappedSampledRDD.reduceByKey(new?Function2<Long,?Long,?Long>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Long?call(Long?v1,?Long?v2)?throws?Exception?{return?v1?+?v2;}});//?反轉key和value,變為<value,key> JavaPairRDD<Long,?Long>?reversedSampledRDD?=?countedSampledRDD.mapToPair(?new?PairFunction<Tuple2<Long,Long>,?Long,?Long>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Tuple2<Long,?Long>?call(Tuple2<Long,?Long>?tuple)throws?Exception?{return?new?Tuple2<Long,?Long>(tuple._2,?tuple._1);}});//?以行數排序key,取最多行數的key final?Long?skewedUserid?=?reversedSampledRDD.sortByKey(false).take(1).get(0)._2;//?從rdd1中分拆出導致數據傾斜的key,形成獨立的RDD。 JavaPairRDD<Long,?String>?skewedRDD?=?rdd1.filter(new?Function<Tuple2<Long,String>,?Boolean>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Boolean?call(Tuple2<Long,?String>?tuple)?throws?Exception?{return?tuple._1.equals(skewedUserid);}});//?從rdd1中分拆出不導致數據傾斜的普通key,形成獨立的RDD。 JavaPairRDD<Long,?String>?commonRDD?=?rdd1.filter(new?Function<Tuple2<Long,String>,?Boolean>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Boolean?call(Tuple2<Long,?String>?tuple)?throws?Exception?{return?!tuple._1.equals(skewedUserid);}?});// rdd2,就是那個所有key的分布相對較為均勻的rdd。 //?這里將rdd2中,前面獲取到的key對應的數據,過濾出來,分拆成單獨的rdd,并對rdd中的數據使用flatMap算子都擴容100倍。 //?對擴容的每條數據,都打上0~100的前綴。 JavaPairRDD<String,?Row>?skewedRdd2?=?rdd2.filter(new?Function<Tuple2<Long,Row>,?Boolean>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Boolean?call(Tuple2<Long,?Row>?tuple)?throws?Exception?{return?tuple._1.equals(skewedUserid);}}).flatMapToPair(new?PairFlatMapFunction<Tuple2<Long,Row>,?String,?Row>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Iterable<Tuple2<String,?Row>>?call(Tuple2<Long,?Row>?tuple)?throws?Exception?{Random?random?=?new?Random();List<Tuple2<String,?Row>>?list?=?new?ArrayList<Tuple2<String,?Row>>();for(int?i?=?0;?i?<?100;?i++)?{list.add(new?Tuple2<String,?Row>(i?+?"_"?+?tuple._1,?tuple._2));}return?list;}});//?將rdd1中分拆出來的導致傾斜的key的獨立rdd,每條數據都打上100以內的隨機前綴。 //?然后將這個rdd1中分拆出來的獨立rdd,與上面rdd2中分拆出來的獨立rdd,進行join。 JavaPairRDD<Long,?Tuple2<String,?Row>>?joinedRDD1?=?skewedRDD.mapToPair(new?PairFunction<Tuple2<Long,String>,?String,?String>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Tuple2<String,?String>?call(Tuple2<Long,?String>?tuple)throws?Exception?{Random?random?=?new?Random();int?prefix?=?random.nextInt(100);return?new?Tuple2<String,?String>(prefix?+?"_"?+?tuple._1,?tuple._2);}}).join(skewedUserid2infoRDD).mapToPair(new?PairFunction<Tuple2<String,Tuple2<String,Row>>,?Long,?Tuple2<String,?Row>>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Tuple2<Long,?Tuple2<String,?Row>>?call(Tuple2<String,?Tuple2<String,?Row>>?tuple)throws?Exception?{long?key?=?Long.valueOf(tuple._1.split("_")[1]);return?new?Tuple2<Long,?Tuple2<String,?Row>>(key,?tuple._2);}});//?將rdd1中分拆出來的包含普通key的獨立rdd,直接與rdd2進行join。 JavaPairRDD<Long,?Tuple2<String,?Row>>?joinedRDD2?=?commonRDD.join(rdd2);//?將傾斜key join后的結果與普通key join后的結果,uinon起來。 //?就是最終的join結果。 JavaPairRDD<Long,?Tuple2<String,?Row>>?joinedRDD?=?joinedRDD1.union(joinedRDD2);8.6.6.3 方案實現原理
對于join導致的數據傾斜,如果只是某幾個key導致了傾斜,可以將少數幾個key分拆成獨立RDD,并附加隨機前綴打散成n份去進行join,此時這幾個key對應的數據就不會集中在少數幾個task上,而是分散到多個task進行join了。具體原理見下圖。
8.6.6.4 方案優缺點
優點
對于join導致的數據傾斜,如果只是某幾個key導致了傾斜,采用該方式可以用最有效的方式打散key進行join。而且只需要針對少數傾斜key對應的數據進行擴容n倍,不需要對全量數據進行擴容。避免了占用過多內存。
缺點
如果導致傾斜的key特別多的話,比如成千上萬個key都導致數據傾斜,那么這種方式也不適合。
8.6.7 使用隨機前綴和擴容RDD進行join
8.6.7.1 方案適用場景
如果在進行join操作時,RDD中有大量的key導致數據傾斜,那么進行分拆key也沒什么意義,此時就只能使用最后一種方案來解決問題了。
8.6.7.2 方案實現思路
該方案的實現思路基本和“解決方案六”類似,首先查看RDD/Hive表中的數據分布情況,找到那個造成數據傾斜的RDD/Hive表,比如有多個key都對應了超過1萬條數據。
然后將該RDD的每條數據都打上一個n以內的隨機前綴。
同時對另外一個正常的RDD進行擴容,將每條數據都擴容成n條數據,擴容出來的每條數據都依次打上一個0~n的前綴。
最后將兩個處理后的RDD進行join即可。
示例代碼如下:
//?首先將其中一個key分布相對較為均勻的RDD膨脹100倍。 JavaPairRDD<String,?Row>?expandedRDD?=?rdd1.flatMapToPair(new?PairFlatMapFunction<Tuple2<Long,Row>,?String,?Row>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Iterable<Tuple2<String,?Row>>?call(Tuple2<Long,?Row>?tuple)throws?Exception?{List<Tuple2<String,?Row>>?list?=?new?ArrayList<Tuple2<String,?Row>>();for(int?i?=?0;?i?<?100;?i++)?{list.add(new?Tuple2<String,?Row>(0?+?"_"?+?tuple._1,?tuple._2));}return?list;}});//?其次,將另一個有數據傾斜key的RDD,每條數據都打上100以內的隨機前綴。 JavaPairRDD<String,?String>?mappedRDD?=?rdd2.mapToPair(new?PairFunction<Tuple2<Long,String>,?String,?String>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Tuple2<String,?String>?call(Tuple2<Long,?String>?tuple)throws?Exception?{Random?random?=?new?Random();int?prefix?=?random.nextInt(100);return?new?Tuple2<String,?String>(prefix?+?"_"?+?tuple._1,?tuple._2);}});//?將兩個處理后的RDD進行join即可。 JavaPairRDD<String,?Tuple2<String,?Row>>?joinedRDD?=?mappedRDD.join(expandedRDD);8.6.7.3 方案實現原理
將原先一樣的key通過附加隨機前綴變成不一樣的key,然后就可以將這些處理后的“不同key”分散到多個task中去處理,而不是讓一個task處理大量的相同key。
該方案與“解決方案六”的不同之處就在于,上一種方案是盡量只對少數傾斜key對應的數據進行特殊處理,由于處理過程需要擴容RDD,因此上一種方案擴容RDD后對內存的占用并不大;
而這一種方案是針對有大量傾斜key的情況,沒法將部分key拆分出來進行單獨處理,因此只能對整個RDD進行數據擴容,對內存資源要求很高。
8.6.7.4 方案優缺點
優點
對join類型的數據傾斜基本都可以處理,而且效果也相對比較顯著,性能提升效果非常不錯。
缺點
該方案更多的是緩解數據傾斜,而不是徹底避免數據傾斜。而且需要對整個RDD進行擴容,對內存資源要求很高。
8.6.7.5 方案實踐經驗
曾經開發一個數據需求的時候,發現一個join導致了數據傾斜。優化之前,作業的執行時間大約是60分鐘左右;使用該方案優化之后,執行時間縮短到10分鐘左右,性能提升了6倍。
8.6.8 多種方案組合使用
在實踐中發現,很多情況下,如果只是處理較為簡單的數據傾斜場景,那么使用上述方案中的某一種基本就可以解決。但是如果要處理一個較為復雜的數據傾斜場景,那么可能需要將多種方案組合起來使用。
比如說,我們針對出現了多個數據傾斜環節的Spark作業,可以先運用解決方案一HiveETL預處理和過濾少數導致傾斜的k,預處理一部分數據,并過濾一部分數據來緩解;
其次可以對某些shuffle操作提升并行度,優化其性能;
最后還可以針對不同的聚合或join操作,選擇一種方案來優化其性能。
大家需要對這些方案的思路和原理都透徹理解之后,在實踐中根據各種不同的情況,靈活運用多種方案,來解決自己的數據傾斜問題。
8.7 Spark數據傾斜處理小結
總結
以上是生活随笔為你收集整理的3万字细品数据倾斜(建议收藏)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: redis探秘:选择合适的数据结构,减少
- 下一篇: 公司的API接口被刷了,那是因为你没这样