mapreduce shuffle过程问答
? ? ? 通過hadoop權(quán)威指南學(xué)習(xí)hadoop,對(duì)shuffle過程一直很疑惑,經(jīng)過查看網(wǎng)上多個(gè)帖子,最終 完成此篇問答總結(jié)。
1.什么叫shuffle
從map任務(wù)輸出到reducer任務(wù)輸入之間的過程就叫做shuffle
?
2.每個(gè)map任務(wù)都有對(duì)應(yīng)的緩存嗎?默認(rèn)是多少,怎么配置這個(gè)值的大小?
每個(gè)map任務(wù)都有一個(gè)緩存支持輸出,默認(rèn)大小是100m,可以通過屬性io.sort.mb配置
?
3.什么時(shí)候觸發(fā)緩存的數(shù)據(jù)寫入磁盤
當(dāng)緩存的容量達(dá)到緩存一定比例時(shí)觸發(fā),這個(gè)比例由屬性Io.sort.spill.percent配置,默認(rèn)是0.8
?
4.為什么需要設(shè)置寫入比例
達(dá)到一定比例后,由于寫緩存和讀緩存是可以同時(shí)并行執(zhí)行的,這會(huì)降低把緩存數(shù)據(jù)騰空的時(shí)間,從而提高效率
?
5.怎么理解緩存叫做環(huán)形緩存
緩存有一個(gè)閥值比例配置,當(dāng)達(dá)到整個(gè)緩存的這個(gè)比例時(shí),會(huì)觸發(fā)spill操作;觸發(fā)時(shí),map輸出還會(huì)接著往剩下的空間寫入,但是寫滿的空間會(huì)被鎖定,數(shù)據(jù)溢出寫入磁盤。
當(dāng)這部分溢出的數(shù)據(jù)寫完后,空出的內(nèi)存空間可以接著被使用,形成像環(huán)一樣的被循環(huán)使用的效果。如圖:
圖一表示剛好達(dá)到溢出比例的結(jié)構(gòu):
?
圖二表示有數(shù)據(jù)開始spill到磁盤,并且新的數(shù)據(jù)繼續(xù)往空的空間寫入
?
圖三表示溢出的數(shù)據(jù)都被寫入磁盤后緩存的狀態(tài)
?
圖四表示溢出前剩余的空間被寫滿后繼續(xù)從頭(以前被溢出的數(shù)據(jù)所占空間)開始寫入
?
以上四個(gè)圖展示的過程為,尾部寫滿后從頭部接著寫,形成類似環(huán)狀的形態(tài)
?
6.緩存的結(jié)構(gòu)是什么樣的?
如圖:
數(shù)據(jù)從右到左開始寫入,關(guān)于此keyvalue的元數(shù)據(jù)(partition,keystart,valuestart)寫入左邊的索引區(qū)
?
?
?
7.怎么理解partition的過程
分做兩步:
1).標(biāo)記key value所屬與的分區(qū)
??? 當(dāng)map輸出的時(shí)候,寫入緩存之前,會(huì)調(diào)用partition函數(shù),計(jì)算出數(shù)據(jù)所屬的分區(qū),并且把這個(gè)元 數(shù)據(jù)存儲(chǔ)起來
2).把屬與同一分區(qū)的數(shù)據(jù)合并在一起
??? 當(dāng)數(shù)據(jù)達(dá)到溢出的條件時(shí)(即達(dá)到溢出比例,啟動(dòng)線程準(zhǔn)備寫入文件前),讀取緩存中的數(shù)據(jù)和分區(qū)元數(shù)據(jù),然后把屬與同一分區(qū)的數(shù)據(jù)合并到一起
?
8.map任務(wù)端數(shù)據(jù)輸出排序過程是什么樣的?
當(dāng)達(dá)到溢出條件后,比如默認(rèn)的是0.8,則會(huì)讀出80M的數(shù)據(jù),根據(jù)之前的分區(qū)元數(shù)據(jù),按照分區(qū)號(hào)進(jìn)行排序,這樣就可實(shí)現(xiàn)同一分區(qū)的數(shù)據(jù)都在一起,然后再根據(jù)map輸出的key進(jìn)行排序。最后實(shí)現(xiàn)溢出的文件內(nèi)是分區(qū)的,且分區(qū)內(nèi)是有序的
?
9.map任務(wù)數(shù)據(jù)輸出后所做的combine和merge有什么區(qū)別?
1)combine主要是把形如aa:1,aa:2這樣的key值相同的數(shù)據(jù)進(jìn)行計(jì)算,計(jì)算規(guī)則與reduce一致,比如:當(dāng)前計(jì)算是求key對(duì)應(yīng)的值求和,則combine操作后得到aa:3這樣的結(jié)果。
?????? 當(dāng)map輸出數(shù)據(jù)根據(jù)分區(qū)排序完成后,在寫入文件之前會(huì)執(zhí)行一次combine操作(前提是設(shè)客戶端設(shè)置了這個(gè)操作);如果map輸出比較大,溢出文件個(gè)數(shù)大于3(此值可以通過屬性min.num.spills.for.combine配置)時(shí),在merge的過程(多個(gè)spill文件合并為一個(gè)大文件)中還會(huì)執(zhí)行combine操作
注意事項(xiàng):不是每種作業(yè)都可以做combine操作的,只有滿足以下條件才可以:
?a)reduce的輸入輸出類型都一樣,因?yàn)?/span>combine本質(zhì)上就是用的reduce
?b)計(jì)算邏輯上,combine操作后不會(huì)影響計(jì)算結(jié)果,像求和就不會(huì)影響
?
2)merge操作是對(duì)形如a:1 a:2這樣的數(shù)據(jù)最后形成{"a":[1,2]}這樣的數(shù)據(jù),作為reduce任務(wù)的輸入
??? map輸出的時(shí)候,只有在多個(gè)溢出文件合并為一個(gè)大文件時(shí)才會(huì)執(zhí)行merge操作
?
無論是combine還是merge都是為了增加數(shù)據(jù)的密度,減少數(shù)據(jù)的傳輸和存儲(chǔ),提高系統(tǒng)的效率
?
10.怎么標(biāo)記溢出文件中不同分區(qū)的數(shù)據(jù)
每次溢出的數(shù)據(jù)寫入文件時(shí),都按照分區(qū)的數(shù)值從小到大排序,內(nèi)部存儲(chǔ)是以tag的方式區(qū)分不同分區(qū)的數(shù)據(jù);同時(shí)生成一個(gè)索引文件,這個(gè)索引文件記錄分區(qū)的描述信息,包括:起始位置、長度、以及壓縮長度,這些信息存儲(chǔ)在IndexRecord結(jié)構(gòu)里面。一個(gè)spill文件中的多個(gè)段的索引數(shù)據(jù)被組織成SpillRecord結(jié)構(gòu),SpillRecord又被加入進(jìn)indexCacheList中。
?
11.怎樣把所有的spill文件合并進(jìn)入唯一一個(gè)文件
????? map輸出數(shù)據(jù)比較多的時(shí)候,會(huì)生成多個(gè)溢出文件,任務(wù)完成的最后一件事情就是把這些文件合并為一個(gè)大文件。合并的過程中一定會(huì)做merge操作,可能會(huì)做combine操作。
1)如果生成的文件太多,可能會(huì)執(zhí)行多次合并,每次最多能合并的文件數(shù)默認(rèn)為10,可以通過屬性min.num.spills.for.combine配置
2)多個(gè)溢出文件合并是,同一個(gè)分區(qū)內(nèi)部也必須再做一次排序,排序算法是多路歸并排序
3)是否還需要做combine操作,一是看是否設(shè)置了combine,二是看溢出的文件數(shù)是否大于等于3,請(qǐng)看第9點(diǎn)的介紹
4)最終生成的文件格式與單個(gè)溢出文件一致,也是按分區(qū)順序存儲(chǔ),并且有一個(gè)對(duì)應(yīng)的索引文件,記錄每個(gè)分區(qū)數(shù)據(jù)的起始位置,長度以及壓縮長度。這個(gè)索引文件名叫做file.out.index
?
12.reducer怎么知道去哪兒讀取map輸出呢?
當(dāng)任務(wù)執(zhí)行完成后,tasktracker會(huì)通知jobtracker;當(dāng)reducer所在的reducer通過心跳請(qǐng)求任務(wù)時(shí),jobtracker會(huì)告訴reducer去哪兒拷貝數(shù)據(jù)
?
13.reducer怎么知道自己應(yīng)該讀取那個(gè)分區(qū)呢?
這個(gè)問題,我一直沒有搞明白,目前猜測是按照順序,比如第一個(gè)分配的reudcer任務(wù)對(duì)應(yīng)的分區(qū)號(hào)是0;還有一種可能是,執(zhí)行map任務(wù)的tasktracker把分區(qū)索引告訴了jobtracker,然后jobtracker明確告訴reducer去哪兒讀取輸出,讀取的是那個(gè)分區(qū)的數(shù)據(jù)。
?
14.reduce端的過程是什么樣的?
?
reduce的運(yùn)行是分成三個(gè)階段的。分別為copy->sort->reduce。由于job的每一個(gè)map都會(huì)根據(jù)reduce(n)數(shù)將數(shù)據(jù)分成map 輸出結(jié)果分成n個(gè)partition,
所以map的中間結(jié)果中是有可能包含每一個(gè)reduce需要處理的部分?jǐn)?shù)據(jù)的。所以,為了優(yōu)化reduce的執(zhí)行時(shí)間,hadoop中是等job的第一個(gè)map結(jié)束后,
所有的reduce就開始嘗試從完成的map中下載該reduce對(duì)應(yīng)的partition部分?jǐn)?shù)據(jù)。這個(gè)過程就是通常所說的shuffle,也就是copy過程。
?
Reduce task在做shuffle時(shí),實(shí)際上就是從不同的已經(jīng)完成的map上去下載屬于自己這個(gè)reduce的部分?jǐn)?shù)據(jù),由于map通常有許多個(gè),
所以對(duì)一個(gè)reduce來說,下載也可以是并行的從多個(gè)map下載,這個(gè)并行度是可以調(diào)整的,調(diào)整參數(shù)為:mapred.reduce.parallel.copies(default 5)。
默認(rèn)情況下,每個(gè)只會(huì)有5個(gè)并行的下載線程在從map下數(shù)據(jù),如果一個(gè)時(shí)間段內(nèi)job完成的map有100個(gè)或者更多,那么reduce也最多只能同時(shí)下載5個(gè)map的數(shù)據(jù),
所以這個(gè)參數(shù)比較適合map很多并且完成的比較快的job的情況下調(diào)大,有利于reduce更快的獲取屬于自己部分的數(shù)據(jù)。
?
reduce的每一個(gè)下載線程在下載某個(gè)map數(shù)據(jù)的時(shí)候,有可能因?yàn)槟莻€(gè)map中間結(jié)果所在機(jī)器發(fā)生錯(cuò)誤,或者中間結(jié)果的文件丟失,或者網(wǎng)絡(luò)瞬斷等等情況,
這樣reduce的下載就有可能失敗,所以reduce的下載線程并不會(huì)無休止的等待下去,當(dāng)一定時(shí)間后下載仍然失敗,那么下載線程就會(huì)放棄這次下載,
并在隨后嘗試從另外的地方下載(因?yàn)檫@段時(shí)間map可能重跑)。所以reduce下載線程的這個(gè)最大的下載時(shí)間段是可以調(diào)整的,
調(diào)整參數(shù)為:mapred.reduce.copy.backoff(default 300秒)。如果集群環(huán)境的網(wǎng)絡(luò)本身是瓶頸,那么用戶可以通過調(diào)大這個(gè)參數(shù)來避免reduce下載線程被誤判為失敗的情況。不過在網(wǎng)絡(luò)環(huán)境比較好的情況下,沒有必要調(diào)整。通常來說專業(yè)的集群網(wǎng)絡(luò)不應(yīng)該有太大問題,所以這個(gè)參數(shù)需要調(diào)整的情況不多。
?
Reduce將map結(jié)果下載到本地時(shí),同樣也是需要進(jìn)行merge的,所以io.sort.factor的配置選項(xiàng)同樣會(huì)影響reduce進(jìn)行merge時(shí)的行為,該參數(shù)的詳細(xì)介紹上文已經(jīng)提到,
當(dāng)發(fā)現(xiàn)reduce在shuffle階段iowait非常的高的時(shí)候,就有可能通過調(diào)大這個(gè)參數(shù)來加大一次merge時(shí)的并發(fā)吞吐,優(yōu)化reduce效率。
?
Reduce在shuffle階段對(duì)下載來的map數(shù)據(jù),并不是立刻就寫入磁盤的,而是會(huì)先緩存在內(nèi)存中,然后當(dāng)使用內(nèi)存達(dá)到一定量的時(shí)候才刷入磁盤。
這個(gè)內(nèi)存大小的控制就不像map一樣可以通過io.sort.mb來設(shè)定了,而是通過另外一個(gè)參數(shù)來設(shè)置:mapred.job.shuffle.input.buffer.percent(default 0.7),
這個(gè)參數(shù)其實(shí)是一個(gè)百分比,意思是說,shuffile在reduce內(nèi)存中的數(shù)據(jù)最多使用內(nèi)存量為:0.7 × maxHeap of reduce task。也就是說,
如果該reduce task的最大heap使用量(通常通過mapred.child.java.opts來設(shè)置,比如設(shè)置為-Xmx1024m)的一定比例用來緩存數(shù)據(jù)。默認(rèn)情況下,
reduce會(huì)使用其heapsize的70%來在內(nèi)存中緩存數(shù)據(jù)。如果reduce的heap由于業(yè)務(wù)原因調(diào)整的比較大,相應(yīng)的緩存大小也會(huì)變大,這也是為什么reduce
用來做緩存的參數(shù)是一個(gè)百分比,而不是一個(gè)固定的值了。
?
假設(shè)mapred.job.shuffle.input.buffer.percent為0.7,reduce task的max heapsize為1G,那么用來做下載數(shù)據(jù)緩存的內(nèi)存就為大概700MB左右,
這700M的內(nèi)存,跟map端一樣,也不是要等到全部寫滿才會(huì)往磁盤刷的,而是當(dāng)這700M中被使用到了一定的限度(通常是一個(gè)百分比),就會(huì)開始往磁盤刷。
這個(gè)限度閾值也是可以通過job參數(shù)來設(shè)定的,設(shè)定參數(shù)為:mapred.job.shuffle.merge.percent(default 0.66)。如果下載速度很快,
很容易就把內(nèi)存緩存撐大,那么調(diào)整一下這個(gè)參數(shù)有可能會(huì)對(duì)reduce的性能有所幫助。
?
當(dāng)reduce將所有的map上對(duì)應(yīng)自己partition的數(shù)據(jù)下載完成后,就會(huì)開始真正的reduce計(jì)算階段(中間有個(gè)sort階段通常時(shí)間非常短,幾秒鐘就完成了,
因?yàn)檎麄€(gè)下載階段就已經(jīng)是邊下載邊sort,然后邊merge的)。當(dāng)reduce task真正進(jìn)入reduce函數(shù)的計(jì)算階段的時(shí)候,有一個(gè)參數(shù)也是可以調(diào)整reduce的計(jì)算行為。
也就是:mapred.job.reduce.input.buffer.percent(default 0.0)。由于reduce計(jì)算時(shí)肯定也是需要消耗內(nèi)存的,而在讀取reduce需要的數(shù)據(jù)時(shí),
同樣是需要內(nèi)存作為buffer,這個(gè)參數(shù)是控制,需要多少的內(nèi)存百分比來作為reduce讀已經(jīng)sort好的數(shù)據(jù)的buffer百分比。默認(rèn)情況下為0,也就是說,
默認(rèn)情況下,reduce是全部從磁盤開始讀處理數(shù)據(jù)。如果這個(gè)參數(shù)大于0,那么就會(huì)有一定量的數(shù)據(jù)被緩存在內(nèi)存并輸送給reduce,當(dāng)reduce計(jì)算邏輯消耗內(nèi)存很小時(shí),
可以分一部分內(nèi)存用來緩存數(shù)據(jù),反正reduce的內(nèi)存閑著也是閑著。
?
參考資料:
1.http://www.linuxidc.com/Linux/2011-11/47053.htm
2.http://blog.csdn.net/mrtitan/article/details/8711366
3.http://www.alidata.org/archives/1470
4.http://blog.sina.com.cn/s/blog_4a1f59bf0100ssap.html
5.http://blog.csdn.net/HEYUTAO007/article/details/5725379
?
轉(zhuǎn)載于:https://blog.51cto.com/xigan/1163820
總結(jié)
以上是生活随笔為你收集整理的mapreduce shuffle过程问答的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: audio 标签简介
- 下一篇: Find Minimum in Rota