Google MapReduce
摘要
MapReduce是一個(gè)編程模型,也是一個(gè)處理和生成超大數(shù)據(jù)集的算法模型的相關(guān)實(shí)現(xiàn)。用戶首先創(chuàng)建一個(gè)Map函數(shù)處理一個(gè)基于 key/value pair的數(shù)據(jù)集合,輸出中間的基于key/value pair的數(shù)據(jù)集合;然后再創(chuàng)建一個(gè)Reduce函數(shù)用來(lái)合并所有的具有相同中間key值的中間value值。現(xiàn)實(shí)世界中有很多滿足上述處理模型的例子, 本論文將詳細(xì)描述這個(gè)模型。 MapReduce架構(gòu)的程序能夠在大量的普通配置的計(jì)算機(jī)上實(shí)現(xiàn)并行化處理。這個(gè)系統(tǒng)在運(yùn)行時(shí)只關(guān)心:如何分割輸入數(shù)據(jù),在大量計(jì)算機(jī)組成的 集群上的調(diào)度,集群中計(jì)算機(jī)的錯(cuò)誤處理,管理集群中計(jì)算機(jī)之間必要的通信。采用MapReduce架構(gòu)可以使那些沒(méi)有并行計(jì)算和分布式處理系統(tǒng)開(kāi)發(fā)經(jīng)驗(yàn)的 程序員有效利用分布式系統(tǒng)的豐富資源。 我們的MapReduce實(shí)現(xiàn)運(yùn)行在規(guī)模可以靈活調(diào)整的由普通機(jī)器組成的集群上:一個(gè)典型的MapReduce計(jì)算往往由幾千臺(tái)機(jī)器組成、處理 以TB計(jì)算的數(shù)據(jù)。程序員發(fā)現(xiàn)這個(gè)系統(tǒng)非常好用:已經(jīng)實(shí)現(xiàn)了數(shù)以百計(jì)的MapReduce程序,在Google的集群上,每天都有1000多個(gè) MapReduce程序在執(zhí)行。1、介紹
在過(guò)去的5年里,包括本文作者在內(nèi)的Google的很多程序員,為了處理海量的原始數(shù)據(jù),已經(jīng)實(shí)現(xiàn)了數(shù)以百計(jì)的、專用的計(jì)算方法。這些計(jì)算方法 用來(lái)處理大量的原始數(shù)據(jù),比如,文檔抓取(類似網(wǎng)絡(luò)爬蟲(chóng)的程序)、Web請(qǐng)求日志等等;也為了計(jì)算處理各種類型的衍生數(shù)據(jù),比如倒排索引、Web文檔的圖 結(jié)構(gòu)的各種表示形勢(shì)、每臺(tái)主機(jī)上網(wǎng)絡(luò)爬蟲(chóng)抓取的頁(yè)面數(shù)量的匯總、每天被請(qǐng)求的最多的查詢的集合等等。大多數(shù)這樣的數(shù)據(jù)處理運(yùn)算在概念上很容易理解。然而由 于輸入的數(shù)據(jù)量巨大,因此要想在可接受的時(shí)間內(nèi)完成運(yùn)算,只有將這些計(jì)算分布在成百上千的主機(jī)上。如何處理并行計(jì)算、如何分發(fā)數(shù)據(jù)、如何處理錯(cuò)誤?所有這 些問(wèn)題綜合在一起,需要大量的代碼處理,因此也使得原本簡(jiǎn)單的運(yùn)算變得難以處理。 為了解決上述復(fù)雜的問(wèn)題,我們?cè)O(shè)計(jì)一個(gè)新的抽象模型,使用這個(gè)抽象模型,我們只要表述我們想要執(zhí)行的簡(jiǎn)單運(yùn)算即可,而不必關(guān)心并行計(jì)算、容錯(cuò)、 數(shù)據(jù)分布、負(fù)載均衡等復(fù)雜的細(xì)節(jié),這些問(wèn)題都被封裝在了一個(gè)庫(kù)里面。設(shè)計(jì)這個(gè)抽象模型的靈感來(lái)自Lisp和許多其他函數(shù)式語(yǔ)言的Map和Reduce的原 語(yǔ)。我們意識(shí)到我們大多數(shù)的運(yùn)算都包含這樣的操作:在輸入數(shù)據(jù)的“邏輯”記錄上應(yīng)用Map操作得出一個(gè)中間key/value pair集合,然后在所有具有相同key值的value值上應(yīng)用Reduce操作,從而達(dá)到合并中間的數(shù)據(jù),得到一個(gè)想要的結(jié)果的目的。使用 MapReduce模型,再結(jié)合用戶實(shí)現(xiàn)的Map和Reduce函數(shù),我們就可以非常容易的實(shí)現(xiàn)大規(guī)模并行化計(jì)算;通過(guò)MapReduce模型自帶的“再 次執(zhí)行”(re-execution)功能,也提供了初級(jí)的容災(zāi)實(shí)現(xiàn)方案。 這個(gè)工作(實(shí)現(xiàn)一個(gè)MapReduce框架模型)的主要貢獻(xiàn)是通過(guò)簡(jiǎn)單的接口來(lái)實(shí)現(xiàn)自動(dòng)的并行化和大規(guī)模的分布式計(jì)算,通過(guò)使用MapReduce模型接口實(shí)現(xiàn)在大量普通的PC機(jī)上高性能計(jì)算。 第二部分描述基本的編程模型和一些使用案例。第三部分描述了一個(gè)經(jīng)過(guò)裁剪的、適合我們的基于集群的計(jì)算環(huán)境的MapReduce實(shí)現(xiàn)。第四部分 描述我們認(rèn)為在MapReduce編程模型中一些實(shí)用的技巧。第五部分對(duì)于各種不同的任務(wù),測(cè)量我們MapReduce實(shí)現(xiàn)的性能。第六部分揭示了在 Google內(nèi)部如何使用MapReduce作為基礎(chǔ)重寫(xiě)我們的索引系統(tǒng)產(chǎn)品,包括其它一些使用MapReduce的經(jīng)驗(yàn)。第七部分討論相關(guān)的和未來(lái)的工 作。2、編程模型
MapReduce編程模型的原理是:利用一個(gè)輸入key/value pair集合來(lái)產(chǎn)生一個(gè)輸出的key/value pair集合。MapReduce庫(kù)的用戶用兩個(gè)函數(shù)表達(dá)這個(gè)計(jì)算:Map和Reduce。 用戶自定義的Map函數(shù)接受一個(gè)輸入的key/value pair值,然后產(chǎn)生一個(gè)中間key/value pair值的集合。MapReduce庫(kù)把所有具有相同中間key值I的中間value值集合在一起后傳遞給reduce函數(shù)。 用戶自定義的Reduce函數(shù)接受一個(gè)中間key的值I和相關(guān)的一個(gè)value值的集合。Reduce函數(shù)合并這些value值,形成一個(gè)較小 的value值的集合。一般的,每次Reduce函數(shù)調(diào)用只產(chǎn)生0或1個(gè)輸出value值。通常我們通過(guò)一個(gè)迭代器把中間value值提供給Reduce 函數(shù),這樣我們就可以處理無(wú)法全部放入內(nèi)存中的大量的value值的集合。2.1、例子
例如,計(jì)算一個(gè)大的文檔集合中每個(gè)單詞出現(xiàn)的次數(shù),下面是偽代碼段:map(String key, String value):
??? // key: document name
??? // value: document contents
??? for each word w in value:
??????? EmitIntermediate(w, “1″);
reduce(String key, Iterator values):
??? // key: a word
??? // values: a list of counts
??? int result = 0;
??? for each v in values:
??????? result += ParseInt(v);
??? Emit(AsString(result)); Map函數(shù)輸出文檔中的每個(gè)詞、以及這個(gè)詞的出現(xiàn)次數(shù)(在這個(gè)簡(jiǎn)單的例子里就是1)。Reduce函數(shù)把Map函數(shù)產(chǎn)生的每一個(gè)特定的詞的計(jì)數(shù)累加起來(lái)。 另外,用戶編寫(xiě)代碼,使用輸入和輸出文件的名字、可選的調(diào)節(jié)參數(shù)來(lái)完成一個(gè)符合MapReduce模型規(guī)范的對(duì)象,然后調(diào)用MapReduce 函數(shù),并把這個(gè)規(guī)范對(duì)象傳遞給它。用戶的代碼和MapReduce庫(kù)鏈接在一起(用C++實(shí)現(xiàn))。附錄A包含了這個(gè)實(shí)例的全部程序代碼。
2.2、類型
盡管在前面例子的偽代碼中使用了以字符串表示的輸入輸出值,但是在概念上,用戶定義的Map和Reduce函數(shù)都有相關(guān)聯(lián)的類型:map(k1,v1) ->list(k2,v2)
? reduce(k2,list(v2)) ->list(v2)
比如,輸入的key和value值與輸出的key和value值在類型上推導(dǎo)的域不同。此外,中間key和value值與輸出key和value值在類型上推導(dǎo)的域相同。
(alex注:原文中這個(gè)domain的含義不是很清楚,我參考Hadoop、KFS等實(shí)現(xiàn),map和reduce都使用了泛型,因此,我把domain翻譯成類型推導(dǎo)的域)。
我們的C++中使用字符串類型作為用戶自定義函數(shù)的輸入輸出,用戶在自己的代碼中對(duì)字符串進(jìn)行適當(dāng)?shù)念愋娃D(zhuǎn)換。
2.3、更多的例子
這里還有一些有趣的簡(jiǎn)單例子,可以很容易的使用MapReduce模型來(lái)表示:- 分布式的Grep:Map函數(shù)輸出匹配某個(gè)模式的一行,Reduce函數(shù)是一個(gè)恒等函數(shù),即把中間數(shù)據(jù)復(fù)制到輸出。
- 計(jì)算URL訪問(wèn)頻率:Map函數(shù)處理日志中web頁(yè)面請(qǐng)求的記錄,然后輸出(URL,1)。Reduce函數(shù)把相同URL的value值都累加起來(lái),產(chǎn)生(URL,記錄總數(shù))結(jié)果。
- 倒轉(zhuǎn)網(wǎng)絡(luò)鏈接圖:Map函數(shù)在源頁(yè)面(source)中搜索所有的鏈接目標(biāo)(target)并輸出為(target,source)。Reduce函數(shù)把給定鏈接目標(biāo)(target)的鏈接組合成一個(gè)列表,輸出(target,list(source))。
- 每個(gè)主機(jī)的檢索詞向量:檢索詞向量用一個(gè)(詞,頻率)列表來(lái)概述出現(xiàn)在文檔或文檔集中的最重要的一些詞。Map函數(shù)為每一個(gè)輸入文檔輸出(主機(jī) 名,檢索詞向量),其中主機(jī)名來(lái)自文檔的URL。Reduce函數(shù)接收給定主機(jī)的所有文檔的檢索詞向量,并把這些檢索詞向量加在一起,丟棄掉低頻的檢索 詞,輸出一個(gè)最終的(主機(jī)名,檢索詞向量)。
- 倒排索引:Map函數(shù)分析每個(gè)文檔輸出一個(gè)(詞,文檔號(hào))的列表,Reduce函數(shù)的輸入是一個(gè)給定詞的所有(詞,文檔號(hào)),排序所有的文檔號(hào),輸出(詞,list(文檔號(hào)))。所有的輸出集合形成一個(gè)簡(jiǎn)單的倒排索引,它以一種簡(jiǎn)單的算法跟蹤詞在文檔中的位置。
- 分布式排序:Map函數(shù)從每個(gè)記錄提取key,輸出(key,record)。Reduce函數(shù)不改變?nèi)魏蔚闹怠_@個(gè)運(yùn)算依賴分區(qū)機(jī)制(在4.1描述)和排序?qū)傩?在4.2描述)。
3、實(shí)現(xiàn)
MapReduce模型可以有多種不同的實(shí)現(xiàn)方式。如何正確選擇取決于具體的環(huán)境。例如,一種實(shí)現(xiàn)方式適用于小型的共享內(nèi)存方式的機(jī)器,另外一種實(shí)現(xiàn)方式則適用于大型NUMA架構(gòu)的多處理器的主機(jī),而有的實(shí)現(xiàn)方式更適合大型的網(wǎng)絡(luò)連接集群。 本章節(jié)描述一個(gè)適用于Google內(nèi)部廣泛使用的運(yùn)算環(huán)境的實(shí)現(xiàn):用以太網(wǎng)交換機(jī)連接、由普通PC機(jī)組成的大型集群。在我們的環(huán)境里包括:1.x86架構(gòu)、運(yùn)行Linux操作系統(tǒng)、雙處理器、2-4GB內(nèi)存的機(jī)器。
2.普通的網(wǎng)絡(luò)硬件設(shè)備,每個(gè)機(jī)器的帶寬為百兆或者千兆,但是遠(yuǎn)小于網(wǎng)絡(luò)的平均帶寬的一半。 (alex注:這里需要網(wǎng)絡(luò)專家解釋一下了)
3.集群中包含成百上千的機(jī)器,因此,機(jī)器故障是常態(tài)。
4.存儲(chǔ)為廉價(jià)的內(nèi)置IDE硬盤。一個(gè)內(nèi)部分布式文件系統(tǒng)用來(lái)管理存儲(chǔ)在這些磁盤上的數(shù)據(jù)。文件系統(tǒng)通過(guò)數(shù)據(jù)復(fù)制來(lái)在不可靠的硬件上保證數(shù)據(jù)的可靠性和有效性。
5.用戶提交工作(job)給調(diào)度系統(tǒng)。每個(gè)工作(job)都包含一系列的任務(wù)(task),調(diào)度系統(tǒng)將這些任務(wù)調(diào)度到集群中多臺(tái)可用的機(jī)器上。
3.1、執(zhí)行概括
通過(guò)將Map調(diào)用的輸入數(shù)據(jù)自動(dòng)分割為M個(gè)數(shù)據(jù)片段的集合,Map調(diào)用被分布到多臺(tái)機(jī)器上執(zhí)行。輸入的數(shù)據(jù)片段能夠在不同的機(jī)器上并行處理。使 用分區(qū)函數(shù)將Map調(diào)用產(chǎn)生的中間key值分成R個(gè)不同分區(qū)(例如,hash(key) mod R),Reduce調(diào)用也被分布到多臺(tái)機(jī)器上執(zhí)行。分區(qū)數(shù)量(R)和分區(qū)函數(shù)由用戶來(lái)指定。 圖1展示了我們的MapReduce實(shí)現(xiàn)中操作的全部流程。當(dāng)用戶調(diào)用MapReduce函數(shù)時(shí),將發(fā)生下面的一系列動(dòng)作(下面的序號(hào)和圖1中的序號(hào)一一對(duì)應(yīng)):1.用戶程序首先調(diào)用的MapReduce庫(kù)將輸入文件分成M個(gè)數(shù)據(jù)片度,每個(gè)數(shù)據(jù)片段的大小一般從 16MB到64MB(可以通過(guò)可選的參數(shù)來(lái)控制每個(gè)數(shù)據(jù)片段的大小)。然后用戶程序在機(jī)群中創(chuàng)建大量的程序副本。 (alex:copies of the program還真難翻譯)
2.這些程序副本中的有一個(gè)特殊的程序–master。副本中其它的程序都是worker程序,由master分配任務(wù)。有M個(gè)Map任務(wù)和R個(gè)Reduce任務(wù)將被分配,master將一個(gè)Map任務(wù)或Reduce任務(wù)分配給一個(gè)空閑的worker。
3.被分配了map任務(wù)的worker程序讀取相關(guān)的輸入數(shù)據(jù)片段,從輸入的數(shù)據(jù)片段中解析出key/value pair,然后把key/value pair傳遞給用戶自定義的Map函數(shù),由Map函數(shù)生成并輸出的中間key/value pair,并緩存在內(nèi)存中。
4.緩存中的key/value pair通過(guò)分區(qū)函數(shù)分成R個(gè)區(qū)域,之后周期性的寫(xiě)入到本地磁盤上。緩存的key/value pair在本地磁盤上的存儲(chǔ)位置將被回傳給master,由master負(fù)責(zé)把這些存儲(chǔ)位置再傳送給Reduce worker。
5.當(dāng)Reduce worker程序接收到master程序發(fā)來(lái)的數(shù)據(jù)存儲(chǔ)位置信息后,使用RPC從Map worker所在主機(jī)的磁盤上讀取這些緩存數(shù)據(jù)。當(dāng)Reduce worker讀取了所有的中間數(shù)據(jù)后,通過(guò)對(duì)key進(jìn)行排序后使得具有相同key值的數(shù)據(jù)聚合在一起。由于許多不同的key值會(huì)映射到相同的Reduce 任務(wù)上,因此必須進(jìn)行排序。如果中間數(shù)據(jù)太大無(wú)法在內(nèi)存中完成排序,那么就要在外部進(jìn)行排序。
6.Reduce worker程序遍歷排序后的中間數(shù)據(jù),對(duì)于每一個(gè)唯一的中間key值,Reduce worker程序?qū)⑦@個(gè)key值和它相關(guān)的中間value值的集合傳遞給用戶自定義的Reduce函數(shù)。Reduce函數(shù)的輸出被追加到所屬分區(qū)的輸出文件。
7.當(dāng)所有的Map和Reduce任務(wù)都完成之后,master喚醒用戶程序。在這個(gè)時(shí)候,在用戶程序里的對(duì)MapReduce調(diào)用才返回。
在成功完成任務(wù)之后,MapReduce的輸出存放在R個(gè)輸出文件中(對(duì)應(yīng)每個(gè)Reduce任務(wù)產(chǎn)生一個(gè)輸出文件,文件名由用戶指定)。一般情況 下,用戶不需要將這R個(gè)輸出文件合并成一個(gè)文件–他們經(jīng)常把這些文件作為另外一個(gè)MapReduce的輸入,或者在另外一個(gè)可以處理多個(gè)分割文件的分布式 應(yīng)用中使用。
3.2、Master數(shù)據(jù)結(jié)構(gòu)
Master持有一些數(shù)據(jù)結(jié)構(gòu),它存儲(chǔ)每一個(gè)Map和Reduce任務(wù)的狀態(tài)(空閑、工作中或完成),以及Worker機(jī)器(非空閑任務(wù)的機(jī)器)的標(biāo)識(shí)。 Master就像一個(gè)數(shù)據(jù)管道,中間文件存儲(chǔ)區(qū)域的位置信息通過(guò)這個(gè)管道從Map傳遞到Reduce。因此,對(duì)于每個(gè)已經(jīng)完成的Map任 務(wù),master存儲(chǔ)了Map任務(wù)產(chǎn)生的R個(gè)中間文件存儲(chǔ)區(qū)域的大小和位置。當(dāng)Map任務(wù)完成時(shí),Master接收到位置和大小的更新信息,這些信息被逐 步遞增的推送給那些正在工作的Reduce任務(wù)。3.3、容錯(cuò)
因?yàn)镸apReduce庫(kù)的設(shè)計(jì)初衷是使用由成百上千的機(jī)器組成的集群來(lái)處理超大規(guī)模的數(shù)據(jù),所以,這個(gè)庫(kù)必須要能很好的處理機(jī)器故障。 worker故障master周期性的ping每個(gè)worker。如果在一個(gè)約定的時(shí)間范圍內(nèi)沒(méi)有收到worker返回的信息,master將把這個(gè) worker標(biāo)記為失效。所有由這個(gè)失效的worker完成的Map任務(wù)被重設(shè)為初始的空閑狀態(tài),之后這些任務(wù)就可以被安排給其他的worker。同樣 的,worker失效時(shí)正在運(yùn)行的Map或Reduce任務(wù)也將被重新置為空閑狀態(tài),等待重新調(diào)度。
當(dāng)worker故障時(shí),由于已經(jīng)完成的Map任務(wù)的輸出存儲(chǔ)在這臺(tái)機(jī)器上,Map任務(wù)的輸出已不可訪問(wèn)了,因此必須重新執(zhí)行。而已經(jīng)完成的Reduce任務(wù)的輸出存儲(chǔ)在全局文件系統(tǒng)上,因此不需要再次執(zhí)行。
當(dāng)一個(gè)Map任務(wù)首先被worker A執(zhí)行,之后由于worker A失效了又被調(diào)度到worker B執(zhí)行,這個(gè)“重新執(zhí)行”的動(dòng)作會(huì)被通知給所有執(zhí)行Reduce任務(wù)的worker。任何還沒(méi)有從worker A讀取數(shù)據(jù)的Reduce任務(wù)將從worker B讀取數(shù)據(jù)。 MapReduce可以處理大規(guī)模worker失效的情況。比如,在一個(gè)MapReduce操作執(zhí)行期間,在正在運(yùn)行的集群上進(jìn)行網(wǎng)絡(luò)維護(hù)引起 80臺(tái)機(jī)器在幾分鐘內(nèi)不可訪問(wèn)了,MapReduce master只需要簡(jiǎn)單的再次執(zhí)行那些不可訪問(wèn)的worker完成的工作,之后繼續(xù)執(zhí)行未完成的任務(wù),直到最終完成這個(gè)MapReduce操作。 master失敗一個(gè)簡(jiǎn)單的解決辦法是讓master周期性的將上面描述的數(shù)據(jù)結(jié)構(gòu) (alex注:指3.2節(jié))的 寫(xiě)入磁盤,即檢查點(diǎn)(checkpoint)。如果這個(gè)master任務(wù)失效了,可以從最后一個(gè)檢查點(diǎn)(checkpoint)開(kāi)始啟動(dòng)另一個(gè) master進(jìn)程。然而,由于只有一個(gè)master進(jìn)程,master失效后再恢復(fù)是比較麻煩的,因此我們現(xiàn)在的實(shí)現(xiàn)是如果master失效,就中止 MapReduce運(yùn)算。客戶可以檢查到這個(gè)狀態(tài),并且可以根據(jù)需要重新執(zhí)行MapReduce操作。 在失效方面的處理機(jī)制
(alex注:原文為”semantics in the presence of failures”)
當(dāng)用戶提供的Map和Reduce操作是輸入確定性函數(shù)(即相同的輸入產(chǎn)生相同的輸出)時(shí),我們的分布式實(shí)現(xiàn)在任何情況下的輸出都和所有程序沒(méi)有出現(xiàn)任何錯(cuò)誤、順序的執(zhí)行產(chǎn)生的輸出是一樣的。 我們依賴對(duì)Map和Reduce任務(wù)的輸出是原子提交的來(lái)完成這個(gè)特性。每個(gè)工作中的任務(wù)把它的輸出寫(xiě)到私有的臨時(shí)文件中。每個(gè)Reduce任 務(wù)生成一個(gè)這樣的文件,而每個(gè)Map任務(wù)則生成R個(gè)這樣的文件(一個(gè)Reduce任務(wù)對(duì)應(yīng)一個(gè)文件)。當(dāng)一個(gè)Map任務(wù)完成的時(shí),worker發(fā)送一個(gè)包 含R個(gè)臨時(shí)文件名的完成消息給master。如果master從一個(gè)已經(jīng)完成的Map任務(wù)再次接收到到一個(gè)完成消息,master將忽略這個(gè)消息;否 則,master將這R個(gè)文件的名字記錄在數(shù)據(jù)結(jié)構(gòu)里。 當(dāng)Reduce任務(wù)完成時(shí),Reduce worker進(jìn)程以原子的方式把臨時(shí)文件重命名為最終的輸出文件。如果同一個(gè)Reduce任務(wù)在多臺(tái)機(jī)器上執(zhí)行,針對(duì)同一個(gè)最終的輸出文件將有多個(gè)重命名 操作執(zhí)行。我們依賴底層文件系統(tǒng)提供的重命名操作的原子性來(lái)保證最終的文件系統(tǒng)狀態(tài)僅僅包含一個(gè)Reduce任務(wù)產(chǎn)生的數(shù)據(jù)。
使用MapReduce模型的程序員可以很容易的理解他們程序的行為,因?yàn)槲覀兘^大多數(shù)的Map和Reduce操作是確定性的,而且存在這樣的一個(gè) 事實(shí):我們的失效處理機(jī)制等價(jià)于一個(gè)順序的執(zhí)行的操作。當(dāng)Map或/和Reduce操作是不確定性的時(shí)候,我們提供雖然較弱但是依然合理的處理機(jī)制。當(dāng)使 用非確定操作的時(shí)候,一個(gè)Reduce任務(wù)R1的輸出等價(jià)于一個(gè)非確定性程序順序執(zhí)行產(chǎn)生時(shí)的輸出。但是,另一個(gè)Reduce任務(wù)R2的輸出也許符合一個(gè) 不同的非確定順序程序執(zhí)行產(chǎn)生的R2的輸出。
考慮Map任務(wù)M和Reduce任務(wù)R1、R2的情況。我們?cè)O(shè)定e(Ri)是Ri已經(jīng)提交的執(zhí)行過(guò)程(有且僅有一個(gè)這樣的執(zhí)行過(guò)程)。當(dāng)e(R1)讀取了由M一次執(zhí)行產(chǎn)生的輸出,而e(R2)讀取了由M的另一次執(zhí)行產(chǎn)生的輸出,導(dǎo)致了較弱的失效處理。3.4、存儲(chǔ)位置
在我們的計(jì)算運(yùn)行環(huán)境中,網(wǎng)絡(luò)帶寬是一個(gè)相當(dāng)匱乏的資源。我們通過(guò)盡量把輸入數(shù)據(jù)(由GFS管理)存儲(chǔ)在集群中機(jī)器的本地磁盤上來(lái)節(jié)省網(wǎng)絡(luò)帶 寬。GFS把每個(gè)文件按64MB一個(gè)Block分隔,每個(gè)Block保存在多臺(tái)機(jī)器上,環(huán)境中就存放了多份拷貝(一般是3個(gè)拷貝)。MapReduce的 master在調(diào)度Map任務(wù)時(shí)會(huì)考慮輸入文件的位置信息,盡量將一個(gè)Map任務(wù)調(diào)度在包含相關(guān)輸入數(shù)據(jù)拷貝的機(jī)器上執(zhí)行;如果上述努力失敗 了,master將嘗試在保存有輸入數(shù)據(jù)拷貝的機(jī)器附近的機(jī)器上執(zhí)行Map任務(wù)(例如,分配到一個(gè)和包含輸入數(shù)據(jù)的機(jī)器在一個(gè)switch里的 worker機(jī)器上執(zhí)行)。當(dāng)在一個(gè)足夠大的cluster集群上運(yùn)行大型MapReduce操作的時(shí)候,大部分的輸入數(shù)據(jù)都能從本地機(jī)器讀取,因此消耗 非常少的網(wǎng)絡(luò)帶寬。3.5、任務(wù)粒度
如前所述,我們把Map拆分成了M個(gè)片段、把Reduce拆分成R個(gè)片段執(zhí)行。理想情況下,M和R應(yīng)當(dāng)比集群中worker的機(jī)器數(shù)量要多得 多。在每臺(tái)worker機(jī)器都執(zhí)行大量的不同任務(wù)能夠提高集群的動(dòng)態(tài)的負(fù)載均衡能力,并且能夠加快故障恢復(fù)的速度:失效機(jī)器上執(zhí)行的大量Map任務(wù)都可以 分布到所有其他的worker機(jī)器上去執(zhí)行。但是實(shí)際上,在我們的具體實(shí)現(xiàn)中對(duì)M和R的取值都有一定的客觀限制,因?yàn)閙aster必須執(zhí)行O(M+R)次調(diào)度,并且在內(nèi)存中保存O(M*R)個(gè)狀態(tài)(對(duì)影響內(nèi)存使用的因素還是比較小的:O(M*R)塊狀態(tài),大概每對(duì)Map任務(wù)/Reduce任務(wù)1個(gè)字節(jié)就可以了)。
更進(jìn)一步,R值通常是由用戶指定的,因?yàn)槊總€(gè)Reduce任務(wù)最終都會(huì)生成一個(gè)獨(dú)立的輸出文件。實(shí)際使用時(shí)我們也傾向于選擇合適的M值,以使得 每一個(gè)獨(dú)立任務(wù)都是處理大約16M到64M的輸入數(shù)據(jù)(這樣,上面描寫(xiě)的輸入數(shù)據(jù)本地存儲(chǔ)優(yōu)化策略才最有效),另外,我們把R值設(shè)置為我們想使用的 worker機(jī)器數(shù)量的小的倍數(shù)。我們通常會(huì)用這樣的比例來(lái)執(zhí)行MapReduce:M=200000,R=5000,使用2000臺(tái)worker機(jī)器。3.6、備用任務(wù)
影響一個(gè)MapReduce的總執(zhí)行時(shí)間最通常的因素是“落伍者”:在運(yùn)算過(guò)程中,如果有一臺(tái)機(jī)器花了很長(zhǎng)的時(shí)間才完成最后幾個(gè)Map或 Reduce任務(wù),導(dǎo)致MapReduce操作總的執(zhí)行時(shí)間超過(guò)預(yù)期。出現(xiàn)“落伍者”的原因非常多。比如:如果一個(gè)機(jī)器的硬盤出了問(wèn)題,在讀取的時(shí)候要經(jīng) 常的進(jìn)行讀取糾錯(cuò)操作,導(dǎo)致讀取數(shù)據(jù)的速度從30M/s降低到1M/s。如果cluster的調(diào)度系統(tǒng)在這臺(tái)機(jī)器上又調(diào)度了其他的任務(wù),由于CPU、內(nèi) 存、本地硬盤和網(wǎng)絡(luò)帶寬等競(jìng)爭(zhēng)因素的存在,導(dǎo)致執(zhí)行MapReduce代碼的執(zhí)行效率更加緩慢。我們最近遇到的一個(gè)問(wèn)題是由于機(jī)器的初始化代碼有bug, 導(dǎo)致關(guān)閉了的處理器的緩存:在這些機(jī)器上執(zhí)行任務(wù)的性能和正常情況相差上百倍。 我們有一個(gè)通用的機(jī)制來(lái)減少“落伍者”出現(xiàn)的情況。當(dāng)一個(gè)MapReduce操作接近完成的時(shí)候,master調(diào)度備用(backup)任務(wù)進(jìn) 程來(lái)執(zhí)行剩下的、處于處理中狀態(tài)(in-progress)的任務(wù)。無(wú)論是最初的執(zhí)行進(jìn)程、還是備用(backup)任務(wù)進(jìn)程完成了任務(wù),我們都把這個(gè)任 務(wù)標(biāo)記成為已經(jīng)完成。我們調(diào)優(yōu)了這個(gè)機(jī)制,通常只會(huì)占用比正常操作多幾個(gè)百分點(diǎn)的計(jì)算資源。我們發(fā)現(xiàn)采用這樣的機(jī)制對(duì)于減少超大MapReduce操作的 總處理時(shí)間效果顯著。例如,在5.3節(jié)描述的排序任務(wù),在關(guān)閉掉備用任務(wù)的情況下要多花44%的時(shí)間完成排序任務(wù)。4、技巧
雖然簡(jiǎn)單的Map和Reduce函數(shù)提供的基本功能已經(jīng)能夠滿足大部分的計(jì)算需要,我們還是發(fā)掘出了一些有價(jià)值的擴(kuò)展功能。本節(jié)將描述這些擴(kuò)展功能。
4.1、分區(qū)函數(shù)
MapReduce的使用者通常會(huì)指定Reduce任務(wù)和Reduce任務(wù)輸出文件的數(shù)量(R)。我們?cè)谥虚gkey上使用分區(qū)函數(shù)來(lái)對(duì)數(shù)據(jù)進(jìn)行 分區(qū),之后再輸入到后續(xù)任務(wù)執(zhí)行進(jìn)程。一個(gè)缺省的分區(qū)函數(shù)是使用hash方法(比如,hash(key) mod R)進(jìn)行分區(qū)。hash方法能產(chǎn)生非常平衡的分區(qū)。然而,有的時(shí)候,其它的一些分區(qū)函數(shù)對(duì)key值進(jìn)行的分區(qū)將非常有用。比如,輸出的key值是 URLs,我們希望每個(gè)主機(jī)的所有條目保持在同一個(gè)輸出文件中。為了支持類似的情況,MapReduce庫(kù)的用戶需要提供專門的分區(qū)函數(shù)。例如,使用 “hash(Hostname(urlkey)) mod R”作為分區(qū)函數(shù)就可以把所有來(lái)自同一個(gè)主機(jī)的URLs保存在同一個(gè)輸出文件中。4.2、順序保證
我們確保在給定的分區(qū)中,中間key/value pair數(shù)據(jù)的處理順序是按照key值增量順序處理的。這樣的順序保證對(duì)每個(gè)分成生成一個(gè)有序的輸出文件,這對(duì)于需要對(duì)輸出文件按key值隨機(jī)存取的應(yīng)用非常有意義,對(duì)在排序輸出的數(shù)據(jù)集也很有幫助。4.3、Combiner函數(shù)
在某些情況下,Map函數(shù)產(chǎn)生的中間key值的重復(fù)數(shù)據(jù)會(huì)占很大的比重,并且,用戶自定義的Reduce函數(shù)滿足結(jié)合律和交換律。在2.1節(jié)的 詞數(shù)統(tǒng)計(jì)程序是個(gè)很好的例子。由于詞頻率傾向于一個(gè)zipf分布(齊夫分布),每個(gè)Map任務(wù)將產(chǎn)生成千上萬(wàn)個(gè)這樣的記錄。 所 有的這些記錄將通過(guò)網(wǎng)絡(luò)被發(fā)送到一個(gè)單獨(dú)的Reduce任務(wù),然后由這個(gè)Reduce任務(wù)把所有這些記錄累加起來(lái)產(chǎn)生一個(gè)數(shù)字。我們?cè)试S用戶指定一個(gè)可選 的combiner函數(shù),combiner函數(shù)首先在本地將這些記錄進(jìn)行一次合并,然后將合并的結(jié)果再通過(guò)網(wǎng)絡(luò)發(fā)送出去。 Combiner函數(shù)在每臺(tái)執(zhí)行Map任務(wù)的機(jī)器上都會(huì)被執(zhí)行一次。一般情況下,Combiner和Reduce函數(shù)是一樣的。 Combiner函數(shù)和Reduce函數(shù)之間唯一的區(qū)別是MapReduce庫(kù)怎樣控制函數(shù)的輸出。Reduce函數(shù)的輸出被保存在最終的輸出文件里,而 Combiner函數(shù)的輸出被寫(xiě)到中間文件里,然后被發(fā)送給Reduce任務(wù)。部分的合并中間結(jié)果可以顯著的提高一些MapReduce操作的速度。附錄A包含一個(gè)使用combiner函數(shù)的例子。
4.4、輸入和輸出的類型
MapReduce庫(kù)支持幾種不同的格式的輸入數(shù)據(jù)。比如,文本模式的輸入數(shù)據(jù)的每一行被視為是一個(gè)key/value pair。key是文件的偏移量,value是那一行的內(nèi)容。另外一種常見(jiàn)的格式是以key進(jìn)行排序來(lái)存儲(chǔ)的key/value pair的序列。每種輸入類型的實(shí)現(xiàn)都必須能夠把輸入數(shù)據(jù)分割成數(shù)據(jù)片段,該數(shù)據(jù)片段能夠由單獨(dú)的Map任務(wù)來(lái)進(jìn)行后續(xù)處理(例如,文本模式的范圍分割必 須確保僅僅在每行的邊界進(jìn)行范圍分割)。雖然大多數(shù)MapReduce的使用者僅僅使用很少的預(yù)定義輸入類型就滿足要求了,但是使用者依然可以通過(guò)提供一 個(gè)簡(jiǎn)單的Reader接口實(shí)現(xiàn)就能夠支持一個(gè)新的輸入類型。Reader并非一定要從文件中讀取數(shù)據(jù),比如,我們可以很容易的實(shí)現(xiàn)一個(gè)從數(shù)據(jù)庫(kù)里讀記錄的Reader,或者從內(nèi)存中的數(shù)據(jù)結(jié)構(gòu)讀取數(shù)據(jù)的Reader。
類似的,我們提供了一些預(yù)定義的輸出數(shù)據(jù)的類型,通過(guò)這些預(yù)定義類型能夠產(chǎn)生不同格式的數(shù)據(jù)。用戶采用類似添加新的輸入數(shù)據(jù)類型的方式增加新的輸出類型。
4.5、副作用
在某些情況下,MapReduce的使用者發(fā)現(xiàn),如果在Map和/或Reduce操作過(guò)程中增加輔助的輸出文件會(huì)比較省事。我們依靠程序writer把這種“副作用”變成原子的和冪等的 (alex注:冪等的指一個(gè)總是產(chǎn)生相同結(jié)果的數(shù)學(xué)運(yùn)算)。通常應(yīng)用程序首先把輸出結(jié)果寫(xiě)到一個(gè)臨時(shí)文件中,在輸出全部數(shù)據(jù)之后,在使用系統(tǒng)級(jí)的原子操作rename重新命名這個(gè)臨時(shí)文件。如果一個(gè)任務(wù)產(chǎn)生了多個(gè)輸出文件,我們沒(méi)有提供類似兩階段提交的原子操作支持這種情況。因此,對(duì)于會(huì)產(chǎn)生多個(gè)輸出文件、并且對(duì)于跨文件有一致性要求的任務(wù),都必須是確定性的任務(wù)。但是在實(shí)際應(yīng)用過(guò)程中,這個(gè)限制還沒(méi)有給我們帶來(lái)過(guò)麻煩。
4.6、跳過(guò)損壞的記錄
有時(shí)候,用戶程序中的bug導(dǎo)致Map或者Reduce函數(shù)在處理某些記錄的時(shí)候crash掉,MapReduce操作無(wú)法順利完成。慣常的做 法是修復(fù)bug后再次執(zhí)行MapReduce操作,但是,有時(shí)候找出這些bug并修復(fù)它們不是一件容易的事情;這些bug也許是在第三方庫(kù)里邊,而我們手 頭沒(méi)有這些庫(kù)的源代碼。而且在很多時(shí)候,忽略一些有問(wèn)題的記錄也是可以接受的,比如在一個(gè)巨大的數(shù)據(jù)集上進(jìn)行統(tǒng)計(jì)分析的時(shí)候。我們提供了一種執(zhí)行模式,在 這種模式下,為了保證保證整個(gè)處理能繼續(xù)進(jìn)行,MapReduce會(huì)檢測(cè)哪些記錄導(dǎo)致確定性的crash,并且跳過(guò)這些記錄不處理。每個(gè)worker進(jìn)程都設(shè)置了信號(hào)處理函數(shù)捕獲內(nèi)存段異常(segmentation violation)和總線錯(cuò)誤(bus error)。在執(zhí)行Map或者Reduce操作之前,MapReduce庫(kù)通過(guò)全局變量保存記錄序號(hào)。如果用戶程序觸發(fā)了一個(gè)系統(tǒng)信號(hào),消息處理函數(shù)將 用“最后一口氣”通過(guò)UDP包向master發(fā)送處理的最后一條記錄的序號(hào)。當(dāng)master看到在處理某條特定記錄不止失敗一次時(shí),master就標(biāo)志著 條記錄需要被跳過(guò),并且在下次重新執(zhí)行相關(guān)的Map或者Reduce任務(wù)的時(shí)候跳過(guò)這條記錄。
4.7、本地執(zhí)行
調(diào)試Map和Reduce函數(shù)的bug是非常困難的,因?yàn)閷?shí)際執(zhí)行操作時(shí)不但是分布在系統(tǒng)中執(zhí)行的,而且通常是在好幾千臺(tái)計(jì)算機(jī)上執(zhí)行,具體的 執(zhí)行位置是由master進(jìn)行動(dòng)態(tài)調(diào)度的,這又大大增加了調(diào)試的難度。為了簡(jiǎn)化調(diào)試、profile和小規(guī)模測(cè)試,我們開(kāi)發(fā)了一套MapReduce庫(kù)的 本地實(shí)現(xiàn)版本,通過(guò)使用本地版本的MapReduce庫(kù),MapReduce操作在本地計(jì)算機(jī)上順序的執(zhí)行。用戶可以控制MapReduce操作的執(zhí)行, 可以把操作限制到特定的Map任務(wù)上。用戶通過(guò)設(shè)定特別的標(biāo)志來(lái)在本地執(zhí)行他們的程序,之后就可以很容易的使用本地調(diào)試和測(cè)試工具(比如gdb)。4.8、狀態(tài)信息
master使用嵌入式的HTTP服務(wù)器(如Jetty)顯示一組狀態(tài)信息頁(yè)面,用戶可以監(jiān)控各種執(zhí)行狀態(tài)。狀態(tài)信息頁(yè)面顯示了包括計(jì)算執(zhí)行的 進(jìn)度,比如已經(jīng)完成了多少任務(wù)、有多少任務(wù)正在處理、輸入的字節(jié)數(shù)、中間數(shù)據(jù)的字節(jié)數(shù)、輸出的字節(jié)數(shù)、處理百分比等等。頁(yè)面還包含了指向每個(gè)任務(wù)的 stderr和stdout文件的鏈接。用戶根據(jù)這些數(shù)據(jù)預(yù)測(cè)計(jì)算需要執(zhí)行大約多長(zhǎng)時(shí)間、是否需要增加額外的計(jì)算資源。這些頁(yè)面也可以用來(lái)分析什么時(shí)候計(jì) 算執(zhí)行的比預(yù)期的要慢。另外,處于最頂層的狀態(tài)頁(yè)面顯示了哪些worker失效了,以及他們失效的時(shí)候正在運(yùn)行的Map和Reduce任務(wù)。這些信息對(duì)于調(diào)試用戶代碼中的bug很有幫助。
4.9、計(jì)數(shù)器
MapReduce庫(kù)使用計(jì)數(shù)器統(tǒng)計(jì)不同事件發(fā)生次數(shù)。比如,用戶可能想統(tǒng)計(jì)已經(jīng)處理了多少個(gè)單詞、已經(jīng)索引的多少篇German文檔等等。 為了使用這個(gè)特性,用戶在程序中創(chuàng)建一個(gè)命名的計(jì)數(shù)器對(duì)象,在Map和Reduce函數(shù)中相應(yīng)的增加計(jì)數(shù)器的值。例如:Counter* uppercase;
uppercase = GetCounter(“uppercase”);
map(String name, String contents):
?for each word w in contents:
??if (IsCapitalized(w)):
???uppercase->Increment();
??EmitIntermediate(w, “1″);
計(jì)數(shù)器機(jī)制對(duì)于MapReduce操作的完整性檢查非常有用。比如,在某些MapReduce操作中,用戶需要確保輸出的key value pair精確的等于輸入的key value pair,或者處理的German文檔數(shù)量在處理的整個(gè)文檔數(shù)量中屬于合理范圍。
5、性能
本節(jié)我們用在一個(gè)大型集群上運(yùn)行的兩個(gè)計(jì)算來(lái)衡量MapReduce的性能。一個(gè)計(jì)算在大約1TB的數(shù)據(jù)中進(jìn)行特定的模式匹配,另一個(gè)計(jì)算對(duì)大約1TB的數(shù)據(jù)進(jìn)行排序。 這兩個(gè)程序在大量的使用MapReduce的實(shí)際應(yīng)用中是非常典型的 — 一類是對(duì)數(shù)據(jù)格式進(jìn)行轉(zhuǎn)換,從一種表現(xiàn)形式轉(zhuǎn)換為另外一種表現(xiàn)形式;另一類是從海量數(shù)據(jù)中抽取少部分的用戶感興趣的數(shù)據(jù)。5.1、集群配置
所有這些程序都運(yùn)行在一個(gè)大約由1800臺(tái)機(jī)器構(gòu)成的集群上。每臺(tái)機(jī)器配置2個(gè)2G主頻、支持超線程的Intel Xeon處理器,4GB的物理內(nèi)存,兩個(gè)160GB的IDE硬盤和一個(gè)千兆以太網(wǎng)卡。這些機(jī)器部署在一個(gè)兩層的樹(shù)形交換網(wǎng)絡(luò)中,在root節(jié)點(diǎn)大概有 100-200GBPS的傳輸帶寬。所有這些機(jī)器都采用相同的部署(對(duì)等部署),因此任意兩點(diǎn)之間的網(wǎng)絡(luò)來(lái)回時(shí)間小于1毫秒。在4GB內(nèi)存里,大概有1-1.5G用于運(yùn)行在集群上的其他任務(wù)。測(cè)試程序在周末下午開(kāi)始執(zhí)行,這時(shí)主機(jī)的CPU、磁盤和網(wǎng)絡(luò)基本上處于空閑狀態(tài)。
5.2、GREP
這個(gè)分布式的grep程序需要掃描大概10的10次方個(gè)由100個(gè)字節(jié)組成的記錄,查找出現(xiàn)概率較小的3個(gè)字符的模式(這個(gè)模式在92337個(gè)記錄中出現(xiàn))。輸入數(shù)據(jù)被拆分成大約64M的Block(M=15000),整個(gè)輸出數(shù)據(jù)存放在一個(gè)文件中(R=1)。圖2顯示了這個(gè)運(yùn)算隨時(shí)間的處理過(guò)程。其中Y軸表示輸入數(shù)據(jù)的處理速度。處理速度隨著參與MapReduce計(jì)算的機(jī)器數(shù)量的增加而增加,當(dāng) 1764臺(tái)worker參與計(jì)算的時(shí),處理速度達(dá)到了30GB/s。當(dāng)Map任務(wù)結(jié)束的時(shí)候,即在計(jì)算開(kāi)始后80秒,輸入的處理速度降到0。整個(gè)計(jì)算過(guò)程 從開(kāi)始到結(jié)束一共花了大概150秒。這包括了大約一分鐘的初始啟動(dòng)階段。初始啟動(dòng)階段消耗的時(shí)間包括了是把這個(gè)程序傳送到各個(gè)worker機(jī)器上的時(shí)間、 等待GFS文件系統(tǒng)打開(kāi)1000個(gè)輸入文件集合的時(shí)間、獲取相關(guān)的文件本地位置優(yōu)化信息的時(shí)間。
5.3、排序
排序程序處理10的10次方個(gè)100個(gè)字節(jié)組成的記錄(大概1TB的數(shù)據(jù))。這個(gè)程序模仿TeraSort benchmark[10]。 排序程序由不到50行代碼組成。只有三行的Map函數(shù)從文本行中解析出10個(gè)字節(jié)的key值作為排序的key,并且把這個(gè)key和原始文本行作 為中間的key/value pair值輸出。我們使用了一個(gè)內(nèi)置的恒等函數(shù)作為Reduce操作函數(shù)。這個(gè)函數(shù)把中間的key/value pair值不作任何改變輸出。最終排序結(jié)果輸出到兩路復(fù)制的GFS文件系統(tǒng)(也就是說(shuō),程序輸出2TB的數(shù)據(jù))。 如前所述,輸入數(shù)據(jù)被分成64MB的Block(M=15000)。我們把排序后的輸出結(jié)果分區(qū)后存儲(chǔ)到4000個(gè)文件(R=4000)。分區(qū)函數(shù)使用key的原始字節(jié)來(lái)把數(shù)據(jù)分區(qū)到R個(gè)片段中。在這個(gè)benchmark測(cè)試中,我們使用的分區(qū)函數(shù)知道key的分區(qū)情況。通常對(duì)于排序程序來(lái)說(shuō),我們會(huì)增加一個(gè)預(yù)處理的MapReduce操作用于采樣key值的分布情況,通過(guò)采樣的數(shù)據(jù)來(lái)計(jì)算對(duì)最終排序處理的分區(qū)點(diǎn)。
圖三(a)顯示了這個(gè)排序程序的正常執(zhí)行過(guò)程。左上的圖顯示了輸入數(shù)據(jù)讀取的速度。數(shù)據(jù)讀取速度峰值會(huì)達(dá)到13GB/s,并且所有Map任務(wù)完 成之后,即大約200秒之后迅速滑落到0。值得注意的是,排序程序輸入數(shù)據(jù)讀取速度小于分布式grep程序。這是因?yàn)榕判虺绦虻腗ap任務(wù)花了大約一半的 處理時(shí)間和I/O帶寬把中間輸出結(jié)果寫(xiě)到本地硬盤。相應(yīng)的分布式grep程序的中間結(jié)果輸出幾乎可以忽略不計(jì)。 左邊中間的圖顯示了中間數(shù)據(jù)從Map任務(wù)發(fā)送到Reduce任務(wù)的網(wǎng)絡(luò)速度。這個(gè)過(guò)程從第一個(gè)Map任務(wù)完成之后就開(kāi)始緩慢啟動(dòng)了。圖示的第一 個(gè)高峰是啟動(dòng)了第一批大概1700個(gè)Reduce任務(wù)(整個(gè)MapReduce分布到大概1700臺(tái)機(jī)器上,每臺(tái)機(jī)器1次最多執(zhí)行1個(gè)Reduce任 務(wù))。排序程序運(yùn)行大約300秒后,第一批啟動(dòng)的Reduce任務(wù)有些完成了,我們開(kāi)始執(zhí)行剩下的Reduce任務(wù)。所有的處理在大約600秒后結(jié)束。 左下圖表示Reduce任務(wù)把排序后的數(shù)據(jù)寫(xiě)到最終的輸出文件的速度。在第一個(gè)排序階段結(jié)束和數(shù)據(jù)開(kāi)始寫(xiě)入磁盤之間有一個(gè)小的延時(shí),這是因?yàn)?worker機(jī)器正在忙于排序中間數(shù)據(jù)。磁盤寫(xiě)入速度在2-4GB/s持續(xù)一段時(shí)間。輸出數(shù)據(jù)寫(xiě)入磁盤大約持續(xù)850秒。計(jì)入初始啟動(dòng)部分的時(shí)間,整個(gè)運(yùn) 算消耗了891秒。這個(gè)速度和TeraSort benchmark[18]的最高紀(jì)錄1057秒相差不多。還有一些值得注意的現(xiàn)象:輸入數(shù)據(jù)的讀取速度比排序速度和輸出數(shù)據(jù)寫(xiě)入磁盤速度要高不少,這是因?yàn)槲覀兊妮斎霐?shù)據(jù)本地化優(yōu)化策略起了作用 — 絕大部分?jǐn)?shù)據(jù)都是從本地硬盤讀取的,從而節(jié)省了網(wǎng)絡(luò)帶寬。排序速度比輸出數(shù)據(jù)寫(xiě)入到磁盤的速度快,這是因?yàn)檩敵鰯?shù)據(jù)寫(xiě)了兩份(我們使用了2路的GFS文件 系統(tǒng),寫(xiě)入復(fù)制節(jié)點(diǎn)的原因是為了保證數(shù)據(jù)可靠性和可用性)。我們把輸出數(shù)據(jù)寫(xiě)入到兩個(gè)復(fù)制節(jié)點(diǎn)的原因是因?yàn)檫@是底層文件系統(tǒng)的保證數(shù)據(jù)可靠性和可用性的實(shí) 現(xiàn)機(jī)制。如果底層文件系統(tǒng)使用類似容錯(cuò)編碼[14](erasure coding)的方式而不是復(fù)制的方式保證數(shù)據(jù)的可靠性和可用性,那么在輸出數(shù)據(jù)寫(xiě)入磁盤的時(shí)候,就可以降低網(wǎng)絡(luò)帶寬的使用。
5.4、高效的backup任務(wù)
圖三(b)顯示了關(guān)閉了備用任務(wù)后排序程序執(zhí)行情況。執(zhí)行的過(guò)程和圖3(a)很相似,除了輸出數(shù)據(jù)寫(xiě)磁盤的動(dòng)作在時(shí)間上拖了一個(gè)很長(zhǎng)的尾巴,而 且在這段時(shí)間里,幾乎沒(méi)有什么寫(xiě)入動(dòng)作。在960秒后,只有5個(gè)Reduce任務(wù)沒(méi)有完成。這些拖后腿的任務(wù)又執(zhí)行了300秒才完成。整個(gè)計(jì)算消耗了 1283秒,多了44%的執(zhí)行時(shí)間。5.5、失效的機(jī)器
在圖三(c)中演示的排序程序執(zhí)行的過(guò)程中,我們?cè)诔绦蜷_(kāi)始后幾分鐘有意的kill了1746個(gè)worker中的200個(gè)。集群底層的調(diào)度立刻在這些機(jī)器上重新開(kāi)始新的worker處理進(jìn)程(因?yàn)橹皇莣orker機(jī)器上的處理進(jìn)程被kill了,機(jī)器本身還在工作)。 圖三(c)顯示出了一個(gè)“負(fù)”的輸入數(shù)據(jù)讀取速度,這是因?yàn)橐恍┮呀?jīng)完成的Map任務(wù)丟失了(由于相應(yīng)的執(zhí)行Map任務(wù)的worker進(jìn)程被 kill了),需要重新執(zhí)行這些任務(wù)。相關(guān)Map任務(wù)很快就被重新執(zhí)行了。整個(gè)運(yùn)算在933秒內(nèi)完成,包括了初始啟動(dòng)時(shí)間(只比正常執(zhí)行多消耗了5%的時(shí) 間)。6、經(jīng)驗(yàn)
我們?cè)?003年1月完成了第一個(gè)版本的MapReduce庫(kù),在2003年8月的版本有了顯著的增強(qiáng),這包括了輸入數(shù)據(jù)本地優(yōu)化、 worker機(jī)器之間的動(dòng)態(tài)負(fù)載均衡等等。從那以后,我們驚喜的發(fā)現(xiàn),MapReduce庫(kù)能廣泛應(yīng)用于我們?nèi)粘9ぷ髦杏龅降母黝悊?wèn)題。它現(xiàn)在在 Google內(nèi)部各個(gè)領(lǐng)域得到廣泛應(yīng)用,包括:- 大規(guī)模機(jī)器學(xué)習(xí)問(wèn)題
- Google News和Froogle產(chǎn)品的集群?jiǎn)栴}
- 從公眾查詢產(chǎn)品(比如Google的Zeitgeist)的報(bào)告中抽取數(shù)據(jù)。
- 從大量的新應(yīng)用和新產(chǎn)品的網(wǎng)頁(yè)中提取有用信息(比如,從大量的位置搜索網(wǎng)頁(yè)中抽取地理位置信息)。
- 大規(guī)模的圖形計(jì)算。
在每個(gè)任務(wù)結(jié)束的時(shí)候,MapReduce庫(kù)統(tǒng)計(jì)計(jì)算資源的使用狀況。在表1,我們列出了2004年8月份MapReduce運(yùn)行的任務(wù)所占用的相關(guān)資源。
6.1、大規(guī)模索引
到目前為止,MapReduce最成功的應(yīng)用就是重寫(xiě)了Google網(wǎng)絡(luò)搜索服務(wù)所使用到的index系統(tǒng)。索引系統(tǒng)的輸入數(shù)據(jù)是網(wǎng)絡(luò)爬蟲(chóng)抓取回來(lái)的海量的文檔,這些文檔數(shù)據(jù)都保存在GFS文件系統(tǒng)里。這些文檔原始內(nèi)容 (alex注:raw contents,我認(rèn)為就是網(wǎng)頁(yè)中的剔除html標(biāo)記后的內(nèi)容、pdf和word等有格式文檔中提取的文本內(nèi)容等)的大小超過(guò)了20TB。索引程序是通過(guò)一系列的MapReduce操作(大約5到10次)來(lái)建立索引。使用MapReduce(替換上一個(gè)特別設(shè)計(jì)的、分布式處理的索引程序)帶來(lái)這些好處:- 實(shí)現(xiàn)索引部分的代碼簡(jiǎn)單、小巧、容易理解,因?yàn)閷?duì)于容錯(cuò)、分布式以及并行計(jì)算的處理都是MapReduce庫(kù)提供的。比如,使用MapReduce庫(kù),計(jì)算的代碼行數(shù)從原來(lái)的3800行C++代碼減少到大概700行代碼。
- MapReduce庫(kù)的性能已經(jīng)足夠好了,因此我們可以把在概念上不相關(guān)的計(jì)算步驟分開(kāi)處理,而不是混在一起以期減少數(shù)據(jù)傳遞的額外消耗。概念 上不相關(guān)的計(jì)算步驟的隔離也使得我們可以很容易改變索引處理方式。比如,對(duì)之前的索引系統(tǒng)的一個(gè)小更改可能要耗費(fèi)好幾個(gè)月的時(shí)間,但是在使用 MapReduce的新系統(tǒng)上,這樣的更改只需要花幾天時(shí)間就可以了。
- 索引系統(tǒng)的操作管理更容易了。因?yàn)橛蓹C(jī)器失效、機(jī)器處理速度緩慢、以及網(wǎng)絡(luò)的瞬間阻塞等引起的絕大部分問(wèn)題都已經(jīng)由MapReduce庫(kù)解決了,不再需要操作人員的介入了。另外,我們可以通過(guò)在索引系統(tǒng)集群中增加機(jī)器的簡(jiǎn)單方法提高整體處理性能。
7、相關(guān)工作
很多系統(tǒng)都提供了嚴(yán)格的編程模式,并且通過(guò)對(duì)編程的嚴(yán)格限制來(lái)實(shí)現(xiàn)并行計(jì)算。例如,一個(gè)結(jié)合函數(shù)可以通過(guò)把N個(gè)元素的數(shù)組的前綴在N個(gè)處理器上使用并行前綴算法,在log N的時(shí)間內(nèi)計(jì)算完[6,9,13] (alex注:完全沒(méi)有明白作者在說(shuō)啥,具體參考相關(guān)6、9、13文檔)。MapReduce可以看作是我們結(jié)合在真實(shí)環(huán)境下處理海量數(shù)據(jù)的經(jīng)驗(yàn),對(duì)這些經(jīng)典模型進(jìn)行簡(jiǎn)化和萃取的成果。更加值得驕傲的是,我們還實(shí)現(xiàn)了基于上千臺(tái)處理器的集群的容錯(cuò)處理。相比而言,大部分并發(fā)處理系統(tǒng)都只在小規(guī)模的集群上實(shí)現(xiàn),并且把容錯(cuò)處理交給了程序員。 Bulk Synchronous Programming[17]和一些MPI原語(yǔ)[11]提供了更高級(jí)別的并行處理抽象,可以更容易寫(xiě)出并行處理的程序。MapReduce和這些系統(tǒng)的 關(guān)鍵不同之處在于,MapReduce利用限制性編程模式實(shí)現(xiàn)了用戶程序的自動(dòng)并發(fā)處理,并且提供了透明的容錯(cuò)處理。 我們數(shù)據(jù)本地優(yōu)化策略的靈感來(lái)源于active disks[12,15]等技術(shù),在active disks中,計(jì)算任務(wù)是盡量推送到數(shù)據(jù)存儲(chǔ)的節(jié)點(diǎn)處理 (alex注:即靠近數(shù)據(jù)源處理),這樣就減少了網(wǎng)絡(luò)和IO子系統(tǒng)的吞吐量。我們?cè)趻燧d幾個(gè)硬盤的普通機(jī)器上執(zhí)行我們的運(yùn)算,而不是在磁盤處理器上執(zhí)行我們的工作,但是達(dá)到的目的一樣的。 我們的備用任務(wù)機(jī)制和Charlotte System[3]提出的eager調(diào)度機(jī)制比較類似。Eager調(diào)度機(jī)制的一個(gè)缺點(diǎn)是如果一個(gè)任務(wù)反復(fù)失效,那么整個(gè)計(jì)算就不能完成。我們通過(guò)忽略引起故障的記錄的方式在某種程度上解決了這個(gè)問(wèn)題。 MapReduce的實(shí)現(xiàn)依賴于一個(gè)內(nèi)部的集群管理系統(tǒng),這個(gè)集群管理系統(tǒng)負(fù)責(zé)在一個(gè)超大的、共享機(jī)器的集群上分布和運(yùn)行用戶任務(wù)。雖然這個(gè)不是本論文的重點(diǎn),但是有必要提一下,這個(gè)集群管理系統(tǒng)在理念上和其它系統(tǒng),如Condor[16]是一樣。 MapReduce庫(kù)的排序機(jī)制和NOW-Sort[1]的操作上很類似。讀取輸入源的機(jī)器(map workers)把待排序的數(shù)據(jù)進(jìn)行分區(qū)后,發(fā)送到R個(gè)Reduce worker中的一個(gè)進(jìn)行處理。每個(gè)Reduce worker在本地對(duì)數(shù)據(jù)進(jìn)行排序(盡可能在內(nèi)存中排序)。當(dāng)然,NOW-Sort沒(méi)有給用戶自定義的Map和Reduce函數(shù)的機(jī)會(huì),因此不具備 MapReduce庫(kù)廣泛的實(shí)用性。 River[2]提供了一個(gè)編程模型:處理進(jìn)程通過(guò)分布式隊(duì)列傳送數(shù)據(jù)的方式進(jìn)行互相通訊。和MapReduce類似,River系統(tǒng)嘗試在不 對(duì)等的硬件環(huán)境下,或者在系統(tǒng)顛簸的情況下也能提供近似平均的性能。River是通過(guò)精心調(diào)度硬盤和網(wǎng)絡(luò)的通訊來(lái)平衡任務(wù)的完成時(shí)間。MapReduce 庫(kù)采用了其它的方法。通過(guò)對(duì)編程模型進(jìn)行限制,MapReduce框架把問(wèn)題分解成為大量的“小”任務(wù)。這些任務(wù)在可用的worker集群上動(dòng)態(tài)的調(diào)度, 這樣快速的worker就可以執(zhí)行更多的任務(wù)。通過(guò)對(duì)編程模型進(jìn)行限制,我們可用在工作接近完成的時(shí)候調(diào)度備用任務(wù),縮短在硬件配置不均衡的情況下縮小整 個(gè)操作完成的時(shí)間(比如有的機(jī)器性能差、或者機(jī)器被某些操作阻塞了)。 BAD-FS[5]采用了和MapReduce完全不同的編程模式,它是面向廣域網(wǎng) (alex注:wide-area network)的。不過(guò),這兩個(gè)系統(tǒng)有兩個(gè)基礎(chǔ)功能很類似。(1)兩個(gè)系統(tǒng)采用重新執(zhí)行的方式來(lái)防止由于失效導(dǎo)致的數(shù)據(jù)丟失。(2)兩個(gè)都使用數(shù)據(jù)本地化調(diào)度策略,減少網(wǎng)絡(luò)通訊的數(shù)據(jù)量。TACC[7]是一個(gè)用于簡(jiǎn)化構(gòu)造高可用性網(wǎng)絡(luò)服務(wù)的系統(tǒng)。和MapReduce一樣,它也依靠重新執(zhí)行機(jī)制來(lái)實(shí)現(xiàn)的容錯(cuò)處理。
8、結(jié)束語(yǔ)
MapReduce編程模型在Google內(nèi)部成功應(yīng)用于多個(gè)領(lǐng)域。我們把這種成功歸結(jié)為幾個(gè)方面:首先,由于MapReduce封裝了并行處 理、容錯(cuò)處理、數(shù)據(jù)本地化優(yōu)化、負(fù)載均衡等等技術(shù)難點(diǎn)的細(xì)節(jié),這使得MapReduce庫(kù)易于使用。即便對(duì)于完全沒(méi)有并行或者分布式系統(tǒng)開(kāi)發(fā)經(jīng)驗(yàn)的程序員 而言;其次,大量不同類型的問(wèn)題都可以通過(guò)MapReduce簡(jiǎn)單的解決。比如,MapReduce用于生成Google的網(wǎng)絡(luò)搜索服務(wù)所需要的數(shù)據(jù)、用 來(lái)排序、用來(lái)數(shù)據(jù)挖掘、用于機(jī)器學(xué)習(xí),以及很多其它的系統(tǒng);第三,我們實(shí)現(xiàn)了一個(gè)在數(shù)千臺(tái)計(jì)算機(jī)組成的大型集群上靈活部署運(yùn)行的MapReduce。這個(gè) 實(shí)現(xiàn)使得有效利用這些豐富的計(jì)算資源變得非常簡(jiǎn)單,因此也適合用來(lái)解決Google遇到的其他很多需要大量計(jì)算的問(wèn)題。我們也從MapReduce開(kāi)發(fā)過(guò)程中學(xué)到了不少東西。首先,約束編程模式使得并行和分布式計(jì)算非常容易,也易于構(gòu)造容錯(cuò)的計(jì)算環(huán)境;其次,網(wǎng)絡(luò)帶 寬是稀有資源。大量的系統(tǒng)優(yōu)化是針對(duì)減少網(wǎng)絡(luò)傳輸量為目的的:本地優(yōu)化策略使大量的數(shù)據(jù)從本地磁盤讀取,中間文件寫(xiě)入本地磁盤、并且只寫(xiě)一份中間文件也節(jié) 約了網(wǎng)絡(luò)帶寬;第三,多次執(zhí)行相同的任務(wù)可以減少性能緩慢的機(jī)器帶來(lái)的負(fù)面影響(alex注:即硬件配置的不平衡),同時(shí)解決了由于機(jī)器失效導(dǎo)致的數(shù)據(jù)丟失問(wèn)題。
9、感謝
(alex注:還是原汁原味的感謝詞比較好,這個(gè)就不翻譯了)Josh Levenberg has been instrumental in revising and extending the user-level MapReduce API with a number of new features based on his experience with using MapReduce and other people’s suggestions for enhancements. MapReduce reads its input from and writes its output to the Google File System [8]. We would like to thank Mohit Aron, Howard Gobioff, Markus Gutschke, David Kramer, Shun-Tak Leung, and Josh Redstone for their work in developing GFS. We would also like to thank Percy Liang and Olcan Sercinoglu for their work in developing the cluster management system used by MapReduce. Mike Burrows, Wilson Hsieh, Josh Levenberg, Sharon Perl, Rob Pike, and Debby Wallach provided helpful comments on earlier drafts of this paper.The anonymous OSDI reviewers, and our shepherd, Eric Brewer, provided many useful suggestions of areas where the paper could be improved. Finally, we thank all the users of MapReduce within Google’s engineering organization for providing helpful feedback, suggestions, and bug reports.10、參考資料
[1] Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau,David E. Culler, Joseph M. Hellerstein, and David A. Patterson.High-performance sorting on networks of workstations.In Proceedings of the 1997 ACM SIGMOD InternationalConference on Management of Data, Tucson,Arizona, May 1997.[2] Remzi H. Arpaci-Dusseau, Eric Anderson, NoahTreuhaft, David E. Culler, Joseph M. Hellerstein, David Patterson, and Kathy Yelick. Cluster I/O with River:Making the fast case common. In Proceedings of the Sixth Workshop on Input/Output in Parallel and Distributed Systems (IOPADS ’99), pages 10.22, Atlanta, Georgia, May 1999.
[3] Arash Baratloo, Mehmet Karaul, Zvi Kedem, and Peter Wyckoff. Charlotte: Metacomputing on the web. In Proceedings of the 9th International Conference on Parallel and Distributed Computing Systems, 1996. [4] Luiz A. Barroso, Jeffrey Dean, and Urs H¨olzle. Web search for a planet: The Google cluster architecture. IEEE Micro, 23(2):22.28, April 2003.
[5] John Bent, Douglas Thain, Andrea C.Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, and Miron Livny. Explicit control in a batch-aware distributed file system. In Proceedings of the 1st USENIX Symposium on Networked Systems Design and Implementation NSDI, March 2004.
[6] Guy E. Blelloch. Scans as primitive parallel operations.IEEE Transactions on Computers, C-38(11), November 1989.
[7] Armando Fox, Steven D. Gribble, Yatin Chawathe, Eric A. Brewer, and Paul Gauthier. Cluster-based scalable network services. In Proceedings of the 16th ACM Symposium on Operating System Principles, pages 78. 91, Saint-Malo, France, 1997.
[8] Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. The Google file system. In 19th Symposium on Operating Systems Principles, pages 29.43, Lake George, New York, 2003. To appear in OSDI 2004 12
[9] S. Gorlatch. Systematic efficient parallelization of scan and other list homomorphisms. In L. Bouge, P. Fraigniaud, A. Mignotte, and Y. Robert, editors, Euro-Par’96. Parallel Processing, Lecture Notes in Computer Science 1124, pages 401.408. Springer-Verlag, 1996.
[10] Jim Gray. Sort benchmark home page. http://research.microsoft.com/barc/SortBenchmark/.
[11] William Gropp, Ewing Lusk, and Anthony Skjellum. Using MPI: Portable Parallel Programming with the Message-Passing Interface. MIT Press, Cambridge, MA, 1999.
[12] L. Huston, R. Sukthankar, R.Wickremesinghe, M. Satyanarayanan, G. R. Ganger, E. Riedel, and A. Ailamaki. Diamond: A storage architecture for early discard in interactive search. In Proceedings of the 2004 USENIX File and Storage Technologies FAST Conference, April 2004.
[13] Richard E. Ladner and Michael J. Fischer. Parallel prefix computation. Journal of the ACM, 27(4):831.838, 1980.
[14] Michael O. Rabin. Efficient dispersal of information for security, load balancing and fault tolerance. Journal of the ACM, 36(2):335.348, 1989.
[15] Erik Riedel, Christos Faloutsos, Garth A. Gibson, and David Nagle. Active disks for large-scale data processing. IEEE Computer, pages 68.74, June 2001.
[16] Douglas Thain, Todd Tannenbaum, and Miron Livny. Distributed computing in practice: The Condor experience. Concurrency and Computation: Practice and Experience, 2004.
[17] L. G. Valiant. A bridging model for parallel computation. Communications of the ACM, 33(8):103.111, 1997.
[18] Jim Wyllie. Spsort: How to sort a terabyte quickly. http://alme1.almaden.ibm.com/cs/spsort.pdf.
附錄A、單詞頻率統(tǒng)計(jì)
本節(jié)包含了一個(gè)完整的程序,用于統(tǒng)計(jì)在一組命令行指定的輸入文件中,每一個(gè)不同的單詞出現(xiàn)頻率。#include “mapreduce/mapreduce.h”
// User’s map function
class WordCounter : public Mapper {
?public:
??virtual void Map(const MapInput& input) {
???const string& text = input.value();
???const int n = text.size();
???for (int i = 0; i < n; ) {
????// Skip past leading whitespace
????while ((i < n) && isspace(text[i]))
?????i++;
???// Find word end
???int start = i;
???while ((i < n) && !isspace(text[i]))
????i++;
???if (start < i)
????Emit(text.substr(start,i-start),”1″);
??}
?}
};
REGISTER_MAPPER(WordCounter);
// User’s reduce function
class Adder : public Reducer {
?virtual void Reduce(ReduceInput* input) {
??// Iterate over all entries with the
??// same key and add the values
??int64 value = 0;
??while (!input->done()) {
???value += StringToInt(input->value());
???input->NextValue();
??}
??// Emit sum for input->key()
??Emit(IntToString(value));
?}
};
REGISTER_REDUCER(Adder);
int main(int argc, char** argv) {
?ParseCommandLineFlags(argc, argv);
?
?MapReduceSpecification spec;
?
?// Store list of input files into “spec”
?for (int i = 1; i < argc; i++) {
??MapReduceInput* input = spec.add_input();
??input->set_format(“text”);
??input->set_filepattern(argv[i]);
??input->set_mapper_class(“WordCounter”);
?}
?// Specify the output files:
?// /gfs/test/freq-00000-of-00100
?// /gfs/test/freq-00001-of-00100
?// …
?MapReduceOutput* out = spec.output();
?out->set_filebase(“/gfs/test/freq”);
?out->set_num_tasks(100);
?out->set_format(“text”);
?out->set_reducer_class(“Adder”);
?
?// Optional: do partial sums within map
?// tasks to save network bandwidth
?out->set_combiner_class(“Adder”);
?// Tuning parameters: use at most 2000
?// machines and 100 MB of memory per task
?spec.set_machines(2000);
?spec.set_map_megabytes(100);
?spec.set_reduce_megabytes(100);
?
?// Now run it
?MapReduceResult result;
?if (!MapReduce(spec, &result)) abort();
?
?// Done: ‘result’ structure contains info
?// about counters, time taken, number of
?// machines used, etc.
?return 0;
}
轉(zhuǎn)載于:https://www.cnblogs.com/kevinX/p/5458307.html
總結(jié)
以上是生活随笔為你收集整理的Google MapReduce的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Ruby Cucumber环境
- 下一篇: 信用卡可以跨行取现吗