从MapReduce的执行来看如何优化MaxCompute(原ODPS) SQL
摘要: SQL基礎(chǔ)有這些操作(按照?qǐng)?zhí)行順序來(lái)排列): from join(left join, right join, inner join, outer join ,semi join) where group by select sum distinct count order by 如果我們能理解mapreduce是怎么實(shí)現(xiàn)這些SQL中的基本操作的,那么我們將很容易理解怎么優(yōu)化SQL寫(xiě)法。
點(diǎn)此查看原文:http://click.aliyun.com/m/41382/
SQL基礎(chǔ)有這些操作(按照?qǐng)?zhí)行順序來(lái)排列):
from
join(left join, right join, inner join, outer join ,semi join)
where
group by
select
sum
distinct
count
order by
如果我們能理解mapreduce是怎么實(shí)現(xiàn)這些SQL中的基本操作的,那么我們將很容易理解怎么優(yōu)化SQL寫(xiě)法。接下來(lái)我們一個(gè)一個(gè)的談:
from
這個(gè)操作是在解析過(guò)程中就完成了,目的就是找出輸入的表(文件)。
join(left join, right join, inner join, outer join ,semi join)
這個(gè)操作需要在參與map和reduce整個(gè)階段。下圖給出了各個(gè)階段的數(shù)據(jù)輸入輸出變化:
假如執(zhí)行這個(gè)SQL:
從上面圖可以看出當(dāng)出現(xiàn)數(shù)據(jù)在某個(gè)(某些)key特別集中的時(shí)候,就會(huì)出現(xiàn)reduce的接收數(shù)據(jù)是不均勻的,導(dǎo)致reduce端數(shù)據(jù)傾斜。
where
這個(gè)地方如果有分區(qū)字段的話(huà),會(huì)直接解析階段就做裁剪。不會(huì)拖到后面的map和reduce階段。如果不是分區(qū)字段,則只會(huì)涉及得到map階段,在這個(gè)階段直接過(guò)濾。
group by
select student_id, sum(score) from student_course group by student_id將GroupBy的字段組合為map的輸出key值,利用MapReduce的排序,在reduce階段保存LastKey區(qū)分不同的key。MapReduce的過(guò)程如下(當(dāng)然這里只是說(shuō)明Reduce端的非Hash聚合過(guò)程)
select
因?yàn)镸axComput(原ODPS)的文件存儲(chǔ)是列式的,所以在select在編譯解析的過(guò)程中會(huì)起到裁剪列的作用。比如一個(gè)表假如有100列,select中只出現(xiàn)了3列,那么其余的97列是沒(méi)有進(jìn)行計(jì)算的。寫(xiě)select盡量避免使用*,并且不需要的字段盡量刪減掉。
sum
到這里開(kāi)始涉及到了聚合函數(shù),聚合函數(shù)需要區(qū)分可以拆分并行和不可以拆分并行兩種。sum是典型的可拆分并行的。sum(1,2,3,1) = sum(1,2) + sum(3,1) = 7。而avg就是不可并行計(jì)算,avg(1,2,3,1) != avg(1,2) + avg(3,1) != avg(avg(1,2) + avg(3,1))。但是avg可以轉(zhuǎn)化成可并行計(jì)算,比如先sum分子,再sum分母來(lái)并行化。
如果函數(shù)可并行,那么就可以在map階段進(jìn)行提前聚合,大大減少后面的發(fā)往reduce端的網(wǎng)絡(luò)傳遞。
distinct
如果是單distinct的話(huà),會(huì)把distinct的列直接附在group-by字段組后面,然后進(jìn)行處理。
麻煩的是multi distinct。根據(jù)disinct的邏輯,必須保證每個(gè)分組(group-by)相同的distinct列相同的key都分在同一個(gè)reduce中,否則就沒(méi)有辦法完成去重工作。所以如果按照單distinct的邏輯,reduce端就需要針對(duì)每一個(gè)distinct字段進(jìn)行排序和去重。這樣做顯然是不高效的,因?yàn)閷?duì)reduce端的計(jì)算壓力很大,而且也沒(méi)有利用到shuffle階段的排序。
第二種方法就是把distinct的字段都拆開(kāi),形成獨(dú)立的n張表。最后再做union all的操作。過(guò)程如下:
select date, count(distinct student_id),count(distinct course), sum(score) from student_course group by date
order by
在odps上和order by相似的功能在還有sort by, distribute by,cluster by。 后面的語(yǔ)法在普通的關(guān)系型數(shù)據(jù)庫(kù)都不存在。算是mapreduce特有的功能。這里先解釋下每個(gè)語(yǔ)句的含義:
order by —— order by會(huì)對(duì)輸入做全局排序,因此只有一個(gè)Reducer(多個(gè)Reducer無(wú)法保證全局有序),然而只有一個(gè)Reducer,會(huì)導(dǎo)致當(dāng)輸入規(guī)模較大時(shí),消耗較長(zhǎng)的計(jì)算時(shí)間。
sort by —— sort by不是全局排序,其在數(shù)據(jù)進(jìn)入reducer前完成排序,因此,如果用sort by進(jìn)行排序,并且設(shè)置mapred.reduce.tasks>1,則sort by只會(huì)保證每個(gè)reducer的輸出有序,并不保證全局有序。sort by不同于order by,它不受Hive.mapred.mode屬性的影響,sort by的數(shù)據(jù)只能保證在同一個(gè)reduce中的數(shù)據(jù)可以按指定字段排序。使用sort by你可以指定執(zhí)行的reduce個(gè)數(shù)(通過(guò)set mapred.reduce.tasks=n來(lái)指定),對(duì)輸出的數(shù)據(jù)再執(zhí)行歸并排序,即可得到全部結(jié)果。
distribute by —— distribute by是控制在map端如何拆分?jǐn)?shù)據(jù)給reduce端的。hive會(huì)根據(jù)distribute by后面列,對(duì)應(yīng)reduce的個(gè)數(shù)進(jìn)行分發(fā),默認(rèn)是采用hash算法。sort by為每個(gè)reduce產(chǎn)生一個(gè)排序文件。在有些情況下,你需要控制某個(gè)特定行應(yīng)該到哪個(gè)reducer,這通常是為了進(jìn)行后續(xù)的聚集操作。distribute by剛好可以做這件事。因此,distribute by經(jīng)常和sort by配合使用。
cluster by —— cluster by除了具有distribute by的功能外還兼具sort by的功能。但是排序只能是倒敘排序,不能指定排序規(guī)則為ASC或者DESC。
MapReduce的幾個(gè)階段
input
split
map
shuffle
reduce
output 這每個(gè)階段都會(huì)出現(xiàn)各種問(wèn)題,我們依次從前到后來(lái)講怎么處理各個(gè)階段出現(xiàn)的問(wèn)題。
Input & split
根據(jù)MaxCompute的功能,input可以是本地文件,也可以是數(shù)據(jù)庫(kù)的表。可以通過(guò)InputFormat借口來(lái)定義。但是這個(gè)Format和后面的split階段息息相關(guān)。因?yàn)閟plit只切割比block小的文件,對(duì)于小文件則不作處理。所以當(dāng)存在大量的小文件(特指大小達(dá)不到block大小的文件),會(huì)生成大量的split塊,同時(shí)也會(huì)啟動(dòng)大量map任務(wù)。
可能出現(xiàn)的問(wèn)題
分區(qū)裁剪中出現(xiàn)問(wèn)題 > 解決方法是讓odps在生成任務(wù)之前就能確定好讀區(qū)到分區(qū)的范圍
輸入存在大量小文件,導(dǎo)致map instance數(shù)量超標(biāo) > 解決辦法是讀取時(shí)候設(shè)定塊大小,可以使用setSplitSize來(lái)控制讀取文件總大小 > 解決方案二是提前就把這些小文件給合并了
輸入文件大小分布非常不均勻,導(dǎo)致split的塊大小分布不均勻,從而導(dǎo)致map端傾斜 > 可以使用setSplitSize來(lái)控制讀取文件總大小
輸入的文件不能被切割,導(dǎo)致split塊大小不均勻
暫時(shí)沒(méi)有找到解法
相比于hadoop,odps系統(tǒng)在小文件處理方面的功能已經(jīng)比較完善,主要體現(xiàn)在以下兩個(gè)方面:
(1) 默認(rèn)情況下,當(dāng)Job完成之后,如果滿(mǎn)足一定的條件,系統(tǒng)會(huì)自動(dòng)分配一個(gè)FuxiTask(調(diào)度任務(wù))進(jìn)行小文件合并,即我們經(jīng)常看到的MergeTask;
map
map階段的輸入是上面Input&split階段來(lái)保障的,一個(gè)分片一個(gè)map任務(wù)。所以當(dāng)分片處理的不合理,map階段就會(huì)出現(xiàn)問(wèn)題。而map端經(jīng)過(guò)shuffle和combianer(可選)后,會(huì)把數(shù)據(jù)交給reduce端。
從input&split 到map可能出現(xiàn)的問(wèn)題
輸入存在大量小文件,導(dǎo)致map instance數(shù)量超標(biāo) > 同上
因?yàn)镺DPS的SQL或者其他任務(wù)會(huì)解析成一個(gè)Task DAG。所以從最初輸入到最終輸出會(huì)有很多的中間計(jì)算。而這些中間計(jì)算之間也是對(duì)應(yīng)著一個(gè)個(gè)的map reduce。如果當(dāng)上一個(gè)map/reduce任務(wù)產(chǎn)生的輸入可能形成一個(gè)種長(zhǎng)尾分布,導(dǎo)致下一個(gè)mapreduce輸入出現(xiàn)長(zhǎng)尾。也就是map端任務(wù)傾斜。
shuffle
這個(gè)階段是mapreduce的核心,設(shè)計(jì)到sort,group和數(shù)據(jù)分發(fā)。
可能出現(xiàn)的問(wèn)題
數(shù)據(jù)量特別大,可以使用combinar來(lái)進(jìn)行mapper端的聚合。odps的參數(shù)是
reduce
知道m(xù)apreduce計(jì)算模型的人都知道,map階段輸入是非結(jié)構(gòu)化的,并不需要實(shí)現(xiàn)規(guī)定好輸入的內(nèi)容,輸出則是一塊塊分區(qū)好的pair。而到reduce則有要求,那就是同樣key的map處理的pair需要發(fā)送到同樣的reduce中。這樣就會(huì)出現(xiàn)某key數(shù)據(jù)量很大,某key數(shù)據(jù)量很小的時(shí)候?qū)?yīng)的reduce處理的數(shù)據(jù)量大小也是不均勻的。一旦出現(xiàn)這種情,任務(wù)執(zhí)行的結(jié)束時(shí)間必然會(huì)受到最長(zhǎng)任務(wù)的拖累。,v>,v>
能產(chǎn)生reduce數(shù)據(jù)分布不均勻的操作,最長(zhǎng)出現(xiàn)的有兩分類(lèi):
這里推薦本書(shū)《mapreduce設(shè)計(jì)模式》,其中的連接模式篇章把各種join的描述。在這里大概說(shuō)下join的類(lèi)型:
reduce端連接
map端連接(在odps中使用mapjoin即可),這個(gè)操作的前提是存在一個(gè)小表能放入到mapreduce中的環(huán)形內(nèi)存中。而且大表必須作為“主表”(比如left join的話(huà)就必須是左表,而right join就必須是右表)。
所以到底為什么會(huì)產(chǎn)生傾斜呢?map端連接肯定是不會(huì)產(chǎn)生數(shù)據(jù)傾斜的,那么傾斜的必然是reduce連接。當(dāng)一張表出現(xiàn)數(shù)據(jù)熱點(diǎn)。這樣就會(huì)出現(xiàn)熱點(diǎn)reduce的運(yùn)行遠(yuǎn)遠(yuǎn)大于其它的長(zhǎng)尾,導(dǎo)致數(shù)據(jù)不均衡。
大概總結(jié)下就是:
- 如果存在小表,且如果左外連接時(shí)候小表是右表(或者是右外連接,小表必須是左表),可以使用mapjoin。
- 如果都是大表且有熱點(diǎn),這樣會(huì)出現(xiàn)傾斜,這時(shí)候需要剔除熱點(diǎn)數(shù)據(jù)單獨(dú)處理。
- 如果都是大表沒(méi)有熱點(diǎn),這樣不會(huì)出現(xiàn)傾斜,這樣還需要怎么優(yōu)化?——這里首選想辦法減小數(shù)據(jù)集合,如果不能在查看是否出現(xiàn)某些熱門(mén)的數(shù)據(jù),如果有,則對(duì)數(shù)據(jù)進(jìn)行分桶。
count(distinct) 對(duì)于distinct的實(shí)現(xiàn),單鍵的時(shí)候會(huì)被直接附到group by的字段后,同時(shí)作為map輸出的key值來(lái)處理。這樣轉(zhuǎn)化成了group by處理,一般是沒(méi)有問(wèn)題的。但是麻煩的是多鍵值count(distinct),這個(gè)沒(méi)有辦法直接把所有的distinct的字段附到group by后面了。因?yàn)檫@樣無(wú)法利用shuffle階段的排序,到了reduce階段需要做很多遍的去重操作。所有一般對(duì)于multi distinct都是采用給distinct 字段做編號(hào),然后復(fù)制數(shù)據(jù)。比如輸入數(shù)據(jù)是這樣:
可以看到distinct會(huì)導(dǎo)致數(shù)據(jù)翻倍膨脹,而這些膨脹的數(shù)據(jù)后會(huì)通過(guò)網(wǎng)絡(luò)傳輸?shù)絩educe,必然會(huì)造成很大的浪費(fèi)。所以要治理,方法一是首先把distinct轉(zhuǎn)成group by放在子查詢(xún)中,然后外層再套一層查詢(xún)進(jìn)行分組count。
方法二:設(shè)置參數(shù)——odps.sql.groupby.skewindata=true
當(dāng)選項(xiàng)設(shè)定為 true,生成的查詢(xún)計(jì)劃會(huì)有兩個(gè) MR Job。第一個(gè) MR Job 中,Map 的輸出結(jié)果集合會(huì)隨機(jī)分布到 Reduce 中,每個(gè) Reduce 做部分聚合操作,并輸出結(jié)果,這樣處理的結(jié)果是相同的 Group By Key 有可能被分發(fā)到不同的 Reduce 中,從而達(dá)到負(fù)載均衡的目的;第二個(gè) MR Job 再根據(jù)預(yù)處理的數(shù)據(jù)結(jié)果按照 Group By Key 分布到 Reduce 中(這個(gè)過(guò)程可以保證相同的 Group By Key 被分布到同一個(gè) Reduce 中),最后完成最終的聚合操作。
總結(jié)
以上是生活随笔為你收集整理的从MapReduce的执行来看如何优化MaxCompute(原ODPS) SQL的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: mysql inodb主键bug_MyS
- 下一篇: this super java_java