MapReduce 论文翻译
目錄
1?介紹
2?編程模型
3?實現
4?技巧
5?性能
6?經驗
7?相關工作
8?結束語
附錄A、單詞頻率統計
原文下載鏈接:https://pan.baidu.com/s/1L9hclpiqQ-NVjmbqSz3mMQ? 提取碼:xhwn?
?
摘要
MapReduce是一個編程模型,也是一個處理和生成超大數據集的算法模型的相關實現。用戶首先創建一個Map函數處理一個基于key/value pair的數據集合,輸出中間的基于key/value pair的數據集合;然后再創建一個Reduce函數用來合并所有具有相同中間key值的中間value值。現實世界中有很多滿足上述處理模型的例子,本論文將詳細描述這個模型。
MapReduce架構的程序能夠在大量的配置普通的計算機上實現并行化處理。這個系統在運行時只關心:如何分割輸入數據,在大量計算機組成的集群上的調度,集群中計算機的錯誤處理,管理集群中計算機之間必要的通信。采用MapReduce架構可以使那些沒有并行計算和分布式處理系統開發經驗的程序員有效利用分布式系統的豐富資源。
我們的MapReduce實現運行在規模可以靈活調整的由普通機器組成的集群上:一個典型的MapReduce計算往往由幾千臺機器組成、處理以TB計算的數據。程序員發現這個系統非常好用:已經實現了數以百計的MapReduce程序,在Google的集群上,每天都有1000多個MapReduce程序在執行。
?
1?介紹
在過去的5年里,包括本文作者在內的Google的很多程序員,為了處理海量的原始數據,已經實現了數以百計的、專用的計算方法。這些計算方法用來處理大量的原始數據,比如,文檔抓取(類似網絡爬蟲的程序)、Web請求日志等;也為了計算處理各種類型的衍生數據,比如倒排索引、Web文檔的圖結構的各種表示形勢、每臺主機上網絡爬蟲抓取的頁面數量的匯總、每天被請求的最多的查詢的集合等。大多數這樣的數據處理運算在概念上很容易理解。然而由于輸入的數據量巨大,因此要想在可接受的時間內完成運算,只有將這些計算分布在成百上千的主機上。如何處理并行計算、如何分發數據、如何處理錯誤?所有這些問題綜合在一起,需要大量的代碼處理,因此也使得原本簡單的運算變得難以處理。
為了解決上述復雜的問題,我們設計一個新的抽象模型,使用這個抽象模型,我們只要表述我們想要執行的簡單運算即可,而不必關心并行計算、容錯、數據分布、負載均衡等復雜的細節,這些問題都被封裝在一個庫里面。設計這個抽象模型的靈感來自Lisp和許多其它函數式語言的Map和Reduce的原語。我們意識到我們大多數的運算都包含這樣的操作:在輸入數據的"邏輯"記錄上應用Map操作得出一個中間key/value pair集合,然后在所有具有相同key值的value值上應用Reduce操作,從而達到合并中間的數據,得到一個想要的結果的目的。使用MapReduce模型,再結合用戶實現的Map和Reduce函數,我們就可以非常容易的實現大規模并行化計算;通過MapReduce模型自帶的"再次執行"功能,也提供了初級的容災實現方案。
這個工作(實現一個MapReduce框架模型)的主要貢獻是通過簡單的接口來實現自動的、并行化和大規模的分布式計算,通過使用MapReduce模型接口實現在大量普通的PC機上高性能計算。
第二部分描述基本的編程模型和一些使用案例。第三部分描述了一個基于我們集群的計算環境所定制的MapReduce接口的實現。第四部分描述我們認為在MapReduce編程模型中一些實用的技巧。第五部分對于各種不同的任務,測量我們MapReduce實現的性能。第六部分揭示了在Google內部如何使用MapReduce作為基礎重寫我們的索引系統產品,包括其它一些使用MapReduce的經驗。第七部分討論相關的和未來的工作。
?
2?編程模型
MapReduce編程模型的原理是:利用一個輸入key/value pair集合來產生一個輸出的key/value pair集合。MapReduce庫的用戶用兩個函數表達這個計算:Map和Reduce。
用戶自定義的Map函數接受一個輸入的key/value pair值,然后產生一個中間key/value pair值的集合(takes an input pair and produces a set of intermediate key/value pairs.)。MapReduce庫把所有具有相同intermediate key值I的intermediate value值集合在一起后傳遞給Reduce函數。
用戶自定義的Reduce函數接受一個中間key的值(假設此值為I)和相關的一個value值的集合。Reduce函數合并這些value值,形成一個較小的value值的集合。一般每次Reduce函數調用只產生0或1個輸出的value值。通常,我們通過一個迭代器把中間value值提供給Reduce函數,這樣我們就可以處理無法全部放入內存中的大量的value值的集合。
?
2.1 例子
例如,計算一個大的文檔集合中每個單詞出現的次數,下面是偽代碼段:
map(String key, String value): // key: document name // value: document contentsfor each word w in value:EmitIntermediate(w, "1");reduce(String key, Iterator values): // key: a word // values: a list of countsint result = 0;for each v in values:result += ParseInt(v);Emit(AsString(result));Map函數輸出文檔中的每個詞、以及這個詞的出現次數(在這個簡單的例子里就是1)。Reduce函數把Map函數產生的每一個特定的詞的計數累加起來。
另外,用戶編寫代碼,使用輸入和輸出文件的名字、可選的調節參數來完成一個符合MapReduce模型規范的對象,然后調用MapReduce函數,并把這個規范對象傳遞給它。用戶的代碼和MapReduce庫鏈接在一起(用C++實現)。附錄A包含了這個實例的全部程序代碼。
?
2.2?類型
盡管在前面例子的偽代碼中使用了以字符串表示的輸入輸出值,但是在概念上,用戶定義的Map和Reduce函數都有相關聯的類型:
map(k1,v1) ->list(k2,v2)
reduce(k2,list(v2)) ->list(v2)
比如,輸入的key和value值與輸出的key和value值在類型上推導的域不同。此外,中間key和value值與輸出key和value值在類型上推導的域相同。(注:參考Hadoop、KFS等實現,map和reduce都使用了泛型,因此,domain理解成類型推導的域)。我們的C++中使用字符串類型作為用戶自定義函數的輸入輸出,用戶在自己的代碼中對字符串進行適當的類型轉換。
?
2.3?更多的例子
這里還有一些有趣的簡單例子,可以很容易的使用MapReduce模型來表示:
?
3?實現
MapReduce模型可以有多種不同的實現方式。如何正確選擇取決于具體的環境。例如,一種實現方式適用于小型的共享內存方式的機器,另外一種實現方式則適用于大型NUMA架構的多處理器的主機,而有的實現方式更適合大型的網絡連接集群。
本章節描述一個適用于Google內部廣泛使用的運算環境的實現:用以太網交換機連接、由普通PC機組成的大型集群。在我們的環境里包括:
1.x86架構、運行Linux操作系統、雙處理器、2-4GB內存的機器。
2.普通的網絡硬件設備,每個機器的帶寬為百兆或者千兆,但是遠小于網絡的平均帶寬的一半。
3.集群中包含成百上千的機器,因此,機器故障是常態。
4.存儲為廉價的內置IDE硬盤。一個內部分布式文件系統用來管理存儲在這些磁盤上的數據。文件系統通過數據復制來在不可靠的硬件上保證數據的可靠性和有效性。
5.用戶提交工作(job)給調度系統。每個工作(job)都包含一系列的任務(task),調度系統將這些任務調度到集群中多臺可用的機器上。
?
3.1?執行概括
通過將Map調用的輸入數據自動分割為M個數據片段的集合,Map調用被分布到多臺機器上執行。輸入的數據片段能夠在不同的機器上并行處理。使用分區函數將Map調用產生的中間key值分成R個不同分區(例如,hash(key) mod R),Reduce調用也被分布到多臺機器上執行。分區數量(R)和分區函數由用戶來指定。
圖1展示了我們的MapReduce實現中操作的全部流程。當用戶調用MapReduce函數時,將發生下面的一系列動作(下面的序號和圖1中的序號一一對應):
1.用戶程序首先調用的MapReduce庫將輸入文件分成M個數據片段,每個數據片段的大小一般從 16MB到64MB(可以通過可選的參數來控制每個數據片段的大小)。然后用戶程序在集群中創建大量的程序副本。
2.這些程序副本中的有一個特殊的程序——Master。副本中其它的程序都是worker程序,由Master分配任務。有M個Map任務和R個Reduce任務將被分配,Master將一個Map任務或Reduce任務分配給一個空閑的worker。
3.被分配了map任務的worker程序讀取相關的輸入數據片段,從輸入的數據片段中解析出key/value?pair,然后把key/value pair傳遞給用戶自定義的Map函數,由Map函數生成并輸出的中間key/value?pair,并緩存在內存中。
4.緩存中的key/value pair通過分區函數分成R個區域,之后周期性的寫入到本地磁盤上。緩存的key/value pair在本地磁盤上的存儲位置將被回傳給Master,由Master負責把這些存儲位置再傳送給Reduce worker。
5.當Reduce worker程序接收到Master程序發來的數據存儲位置信息后,使用RPC從Map worker所在主機的磁盤上讀取這些緩存數據。當Reduce worker讀取了所有的中間數據后,通過對key進行排序后使得具有相同key值的數據聚合在一起。由于許多不同的key值會映射到相同的Reduce任務上,因此必須進行排序。如果中間數據太大無法在內存中完成排序,那么就要在外部進行排序。
6.Reduce worker程序遍歷排序后的中間數據,對于每一個唯一的中間key值,Reduce worker程序將這個key值和它相關的中間value值的集合傳遞給用戶自定義的Reduce函數。Reduce函數的輸出被追加到所屬分區的輸出文件。
7.當所有的Map和Reduce任務都完成之后,Master喚醒用戶程序。在這個時候,在用戶程序里對MapReduce的調用才返回。
在成功完成任務之后,MapReduce的輸出存放在R個輸出文件中(對應每個Reduce任務產生一個輸出文件,文件名由用戶指定)。一般情況下,用戶不需要將這R個輸出文件合并成一個文件——它們經常把這些文件作為另外一個MapReduce的輸入,或者在另外一個可以處理多個分割文件的分布式應用中使用。
?
3.2?Master數據結構
Master持有一些數據結構,它存儲每一個Map和Reduce任務的狀態(空閑、工作中或完成),以及Worker機器(非空閑任務的機器)的標識。
Master就像一個數據管道,中間文件存儲區域的位置信息通過這個管道從Map傳遞到Reduce。因此,對于每個已經完成的Map任務,Master存儲了Map任務產生的R個中間文件存儲區域的大小和位置。當Map任務完成時,Master接收到位置和大小的更新信息,這些信息被逐步遞增的推送給那些正在工作的Reduce任務。
?
3.3?容錯
因為MapReduce庫的設計初衷是使用由成百上千的機器組成的集群來處理超大規模的數據,所以,這個庫必須要能很好的處理機器故障。
Worker故障
Master周期性的ping每個worker。如果在一個約定的時間范圍內沒有收到worker返回的信息,Master將把這個worker標記為失效。所有由這個失效的worker完成的Map任務被重置為初始的空閑狀態,之后這些任務就可以被安排給其它的worker。同樣的,worker失效時正在運行的Map或Reduce任務也將被重新置為空閑狀態,等待重新調度。
當worker故障時,由于已經完成的Map任務的輸出存儲在本地,Map任務的輸出已不可訪問了,因此必須重新執行。而已經完成的Reduce任務的輸出存儲在全局文件系統上,因此不需要再次執行。
當一個Map任務首先被worker A執行,之后由于worker A失效了又被調度到worker B執行,這個"重新執行"的動作會被通知給所有執行Reduce任務的worker。任何還沒有從worker A讀取數據的Reduce任務將從worker B讀取數據。
MapReduce可以處理大規模worker失效的情況。比如,在一個MapReduce操作執行期間,在正在運行的集群上進行網絡維護引起80臺機器在幾分鐘內不可訪問了,MapReduce Master只需要簡單的再次執行那些不可訪問的worker完成的工作,之后繼續執行未完成的任務,直到最終完成這個MapReduce操作。
?
Master失敗
一個簡單的解決辦法是讓Master周期性的將上面描述的數據結構的寫入磁盤,即檢查點(checkpoint)。如果這個Master任務失效了,可以從最后一個檢查點(checkpoint)開始啟動另一個Master進程。然而,由于只有一個Master進程,Master失效后再恢復是比較麻煩的,因此我們現在的實現是如果Master失效,就中止MapReduce運算。客戶可以檢查到這個狀態,并且可以根據需要重新執行MapReduce操作。
?
在失效方面的處理機制
當用戶提供的Map和Reduce操作是輸入確定性函數(即相同的輸入產生相同的輸出)時,我們的分布式實現在任何情況下的輸出都和所有程序沒有出現任何錯誤、順序的執行產生的輸出是一樣的。
我們依賴對Map和Reduce任務的輸出是原子提交的來完成這個特性。每個工作中的任務把它的輸出寫到私有的臨時文件中。每個Reduce任務生成一個這樣的文件,而每個Map任務則生成R個這樣的文件(一個Reduce任務對應一個文件)。當一個Map任務完成的時,worker發送一個包含R個臨時文件名的完成消息給Master。如果Master從一個已經完成的Map任務再次接收到到一個完成消息,Master將忽略這個消息;否則,Master將這R個文件的名字記錄在數據結構里。
當Reduce任務完成時,Reduce worker進程以原子的方式把臨時文件重命名為最終的輸出文件。如果同一個Reduce任務在多臺機器上執行,針對同一個最終的輸出文件將有多個重命名操作執行。我們依賴底層文件系統提供的重命名操作的原子性來保證最終的文件系統狀態僅僅包含一個Reduce任務產生的數據。
使用MapReduce模型的程序員可以很容易的理解它們程序的行為,因為我們絕大多數的Map和Reduce操作是確定的,而且存在這樣的一個事實:我們的失效處理機制等價于一個順序的執行的操作。當Map和/或Reduce操作是不確定性的時候,我們提供雖然較弱但是依然合理的處理機制。當使用非確定操作的時候,一個Reduce任務R1的輸出等價于一個非確定性程序的順序執行產生的輸出(In the presence of non-deterministic operators, the output of a particular reduce task R1 is equivalent to the output for R1 produced by a sequential execution of the non-deterministic program.)。但是,另一個不同的Reduce任務R2的輸出也許符合一個非確定程序的非順序執行產生的R2的輸出(However, the output for a different reduce task R2 may correspond to the output for R2 produced by a different sequential execution of the non-deterministic program.)。
考慮Map任務M和Reduce任務R1、R2的情況。我們設定e(Ri)是Ri已經提交的執行過程(有且僅有一個這樣的執行過程)。語義較弱是因為e(R1)可能讀取了M的一次執行產生的輸出,e(R2)可能讀取了M的另一次執行產生的輸出。
?
3.4?存儲位置
在我們的計算運行環境中,網絡帶寬是一個相當匱乏的資源。我們通過盡量把輸入數據(由GFS管理)存儲在集群中機器的本地磁盤上來節省網絡帶寬。GFS把每個文件按64MB一個Block分割,每個Block保存在多臺機器上,環境中存放了多份拷貝(一般是3個拷貝)。MapReduce的Master在調度Map任務時會考慮輸入文件的位置信息,盡量將一個Map任務調度在包含相關輸入數據拷貝的機器上執行;如果上述努力失敗了,Master將嘗試在保存有輸入數據拷貝的機器附近的機器上執行Map任務(例如,分配到一個和包含輸入數據的機器在一個switch里的worker機器上執行)。當在一個足夠大的cluster集群上運行大型MapReduce操作的時候,大部分的輸入數據都能從本地機器讀取,因此消耗非常少的網絡帶寬。
?
3.5?任務粒度
我們將Map階段細分為M個片段,Reduce階段細分為R個片段,如上所述。理想情況下,M和R應該遠大于worker機器的數量。讓每個worker執行許多不同任務改善了動態負載平衡,并在worker失敗時加快了恢復速度:它完成的許多映射任務可以分布在所有其它worker機器上。
在我們的實現中,M和R的大小有實際的界限,因為Master必須做出O(M+R)調度決策,并在內存中保持O(M*R)狀態,如上文所述。(但是,內存使用的常量因素很小:狀態的O(M*R)部分由每個映射任務/減少任務對大約1個字節的數據組成。)更進一步,R值通常是由用戶指定的,因為每個Reduce任務最終都會生成一個獨立的輸出文件。實際使用時我們也傾向于選擇合適的M值,以使得每一個獨立任務都是處理大約16M到64M的輸入數據(這樣,上面描寫的輸入數據本地存儲優化策略才最有效),另外,我們把R值設置為我們想使用的worker機器數量的幾倍(small multiple)。我們通常會用這樣的比例來執行MapReduce:M=200000,R=5000,使用2000臺worker機器。
?
3.6?備用任務(Backup Tasks)
影響一個MapReduce的總執行時間最通常的因素是"落伍者":在運算過程中,如果有一臺機器花了很長的時間才完成最后幾個Map或Reduce任務,導致MapReduce操作總的執行時間超過預期。出現"落伍者"的原因非常多。比如:如果一個機器的硬盤出了問題,在讀取的時候要經常的進行讀取糾錯操作,導致讀取數據的速度從30M/s降低到1M/s。如果cluster的調度系統在這臺機器上又調度了其它的任務,由于CPU、內存、本地硬盤和網絡帶寬等競爭因素的存在,導致執行MapReduce代碼的執行效率更加緩慢。我們最近遇到的一個問題是由于機器的初始化代碼有bug,導致關閉了的處理器的緩存:在這些機器上執行任務的性能和正常情況相差上百倍。
我們有一個通用的機制來減少"落伍者"出現的情況。當一個MapReduce操作接近完成的時候,Master調度備用(backup)任務進程來執行剩下的、處于運行中狀態(in-progress)的任務。無論是最初的執行進程、還是備用(backup)任務進程完成了任務,我們都把這個任務標記成為已經完成。我們調優了這個機制,通常只會占用比正常操作多幾個百分點的計算資源。我們發現采用這樣的機制對于減少超大MapReduce操作的總處理時間效果顯著。例如,在5.3節描述的排序任務,在關閉掉備用任務的情況下要多花44%的時間完成排序任務。
?
4?技巧
雖然簡單的Map和Reduce函數提供的基本功能已經能夠滿足大部分的計算需要,我們還是發掘出了一些有價值的擴展功能。本節將描述這些擴展功能。
?
4.1?分區函數??
MapReduce的使用者通常會指定Reduce任務和Reduce任務輸出文件的數量(R)。我們在中間key上使用分區函數來對數據進行分區,之后再輸入到后續任務執行進程。一個缺省的分區函數是使用hash方法(比如,hash(key) mod R)進行分區。hash方法能產生非常平衡的分區。然而,有的時候,其它的一些分區函數對key值進行的分區將非常有用。比如,輸出的key值是URLs,我們希望每個主機的所有條目保持在同一個輸出文件中。為了支持類似的情況,MapReduce庫的使用者需要提供專門的分區函數。例如,使用"hash(Hostname(urlkey)) mod R"作為分區函數就可以把所有來自同一個主機的URLs保存在同一個輸出文件中。
?
4.2?順序保證
我們確保在給定的分區中,中間key/value pair數據的處理順序是按照key值增量順序處理的。這樣的順序保證對每個分區生成一個有序的輸出文件,這對于需要對輸出文件按key值隨機存取的應用非常有意義,對在排序輸出的數據集也很有幫助。
?
4.3?Combiner函數
在某些情況下,Map函數產生的中間key值的重復數據會占很大的比重,并且,用戶自定義的Reduce函數滿足結合律和交換律。在2.1節的詞數統計程序是個很好的例子。由于詞頻率傾向于一個zipf分布(齊夫分布),每個Map任務將產生成千上萬個這樣的記錄<the,1>。所有的這些記錄將通過網絡被發送到一個單獨的Reduce任務,然后由這個Reduce任務把所有這些記錄累加起來產生一個數字。我們允許用戶指定一個可選的Combiner函數,Combiner函數首先在本地將這些記錄進行一次合并,然后將合并的結果再通過網絡發送出去。
Combiner函數在每臺執行Map任務的機器上都會被執行一次。一般情況下,Combiner和Reduce函數是一樣的。Combiner函數和Reduce函數之間唯一的區別是MapReduce庫怎樣控制函數的輸出。Reduce函數的輸出被保存在最終的輸出文件里,而Combiner函數的輸出被寫到中間文件里,然后被發送給Reduce任務。
合并中間結果可以顯著的提高一些MapReduce操作的速度。附錄A包含一個使用Combiner函數的例子。
?
4.4?輸入和輸出的類型
MapReduce庫支持幾種不同格式的輸入數據。比如,文本模式的輸入數據的每一行被視為是一個key/value pair。key是文件的偏移量,value是那一行的內容。另外一種常見的格式是以key進行排序來存儲的key/value pair的序列。每種輸入類型的實現都必須能夠把輸入數據分割成數據片段,該數據片段能夠由單獨的Map任務來進行后續處理(例如,文本模式的范圍分割必須確保僅僅在每行的邊界進行范圍分割)。雖然大多數MapReduce的使用者僅僅使用很少的預定義輸入類型就滿足要求了,但是使用者依然可以通過提供一個簡單的Reader接口就能夠支持一個新的輸入類型。
Reader并非一定要從文件中讀取數據,比如,我們可以很容易的實現一個從數據庫里讀記錄的Reader,或者從內存中的數據結構讀取數據的Reader。
類似的,我們提供了一些預定義的輸出數據類型,通過這些預定義類型能夠產生不同格式的數據。用戶采用類似添加新的輸入數據類型的方式增加新的輸出類型。
?
4.5?副作用
在某些情況下,MapReduce的使用者發現,如果在Map和/或Reduce操作(map and/or reduce operators.)過程中增加輔助的輸出文件會比較省事。我們依靠程序writer把這種"副作用"變成原子的以及冪等的。通常應用程序首先把輸出結果寫到一個臨時文件中,在輸出全部數據之后,在使用系統級的原子操作rename重新命名這個臨時文件。
我們不支持由單個任務生成的多個輸出文件的原子兩階段提交,因此,生成具有跨文件一致性要求的多個輸出文件的任務應該是確定性的。這種限制在實踐中從未成為一個問題。
?
4.6?跳過損壞的記錄
有時候,用戶程序中的bug導致Map或者Reduce函數在處理某些記錄的時候crash掉,MapReduce操作無法順利完成。慣常的做法是修復bug后再次執行MapReduce操作,但是,有時候找出這些bug并修復它們不是一件容易的事情;這些bug也許是在第三方庫里,而我們手頭沒有這些庫的源代碼。而且在很多時候,忽略一些有問題的記錄也是可以接受的,比如在一個巨大的數據集上進行統計分析的時候。我們提供了一種可選的執行模式,其中MapReduce庫檢測哪些記錄會導致確定性crash,并跳過這些記錄以向前推進。
每個worker進程都設置了信號處理函數捕獲內存段異常(segmentation violation)和總線錯誤(bus?error)。在執行Map或者Reduce操作之前,MapReduce庫通過全局變量保存記錄序號。如果用戶程序觸發了一個系統信號,消息處理函數將用"最后一口氣"通過UDP包向Master發送處理的最后一條記錄的序號。當Master看到在處理某條特定記錄不止失敗一次時,Master就標志這條記錄需要被跳過,并且在下次重新執行相關的Map或者Reduce任務的時候跳過這條記錄。
?
4.7?本地執行
調試Map和Reduce函數的bug是非常困難的,因為實際執行操作時不但是分布在系統中執行的,而且通常是在好幾千臺計算機上執行,具體的執行位置是由Master進行動態調度的,這又大大增加了調試的難度。為了簡化調試、性能分析和小規模測試,我們開發了一套MapReduce庫的本地實現版本,通過使用本地版本的MapReduce庫,MapReduce操作在本地計算機上順序執行。用戶可以控制MapReduce操作的執行,可以把操作限制到特定的Map任務上。用戶通過設定特別的標志來在本地執行它們的程序,之后就可以很容易的使用本地調試和測試工具(比如gdb)。
?
4.8?狀態信息
主服務器運行內部HTTP服務器并導出一組狀態頁供人使用。狀態頁顯示計算的進度,例如已完成的任務數、正在進行的任務數、輸入字節數、中間數據字節數、輸出字節數、處理速率等。這些頁還包含指向每個任務生成的標準錯誤和標準輸出文件的鏈接。用戶可以使用這些數據來預測計算將花費多長時間,以及是否應向計算中添加更多的資源。這些頁面還可用于確定計算速度比預期慢得多的時間。
此外,頂級狀態頁面顯示哪些worker失敗,以及哪些失效的時候正在運行的Map和Reduce任務。當試圖調試用戶代碼中的錯誤時,此信息非常有用。
?
4.9?計數器
MapReduce庫使用計數器統計不同事件發生次數。比如,用戶可能想統計已經處理了多少個單詞、已經索引的多少篇German文檔等等。
為了使用這個特性,用戶在程序中創建一個命名的計數器對象,在Map和Reduce函數中相應的增加計數器的值。例如:
Counter* uppercase; uppercase = GetCounter("uppercase");map(String name, String contents):for each word w in contents: if (IsCapitalized(w)): uppercase->Increment(); EmitIntermediate(w, "1");這些計數器的值周期性的從各個單獨的worker機器上傳遞給Master(附加在ping的應答包中傳遞)。Master把執行成功的Map和Reduce任務的計數器值進行累計,當MapReduce操作完成之后,返回給用戶代碼。
計數器當前的值也會顯示在Master的狀態頁面上,這樣用戶就可以看到當前計算的進度。當累加計數器的值的時候,Master要檢查重復運行的Map或者Reduce任務,避免重復累加(之前提到的備用任務和失效后重新執行任務這兩種情況會導致相同的任務被多次執行)。有些計數器的值是由MapReduce庫自動維持的,比如已經處理的輸入的key/value pair的數量、輸出的key/value pair的數量等等。
計數器機制對于MapReduce操作的完整性檢查非常有用。比如,在某些MapReduce操作中,用戶需要確保產生的output pairs精確的等于處理的input pairs,或者處理的German文檔數量在被處理的整個文檔數量中屬于合理范圍。
?
5?性能
本節我們分析在一個大型集群上運行的兩個計算來衡量MapReduce的性能。一個計算是在大約1TB的數據中進行特定的模式匹配,另一個計算對大約1TB的數據進行排序。
這兩個程序在大量使用的MapReduce應用中是非常具有代表性的——一類是對數據格式進行轉換,從一種表現形式轉換為另外一種表現形式;另一類是從海量數據中抽取少部分用戶感興趣的數據。
?
5.1?集群配置
所有這些程序都運行在一個由大約1800臺機器構成的集群上。每臺機器配置2個2G主頻、支持超線程的Intel Xeon處理器,4GB的物理內存,兩個160GB的IDE硬盤和一個千兆以太網卡。這些機器部署在一個兩層的樹形交換網絡中,在root節點大概有100-200GBPS的傳輸帶寬。所有這些機器都采用相同的部署(對等部署),因此任意兩點之間的網絡往返時延小于1毫秒。
在4GB內存里,大概有1-1.5G用于運行集群上的其它任務。測試程序在周末下午開始執行,這時主機的CPU、磁盤和網絡基本上處于空閑狀態。
?
5.2?Grep(一種強大的文本搜索工具)
這個分布式的Grep程序需要掃描大概10的10次方個由100個字節組成的記錄,查找出現概率較小的3個字符的模式(這個模式在92337個記錄中出現)。輸入數據被拆分成大約64M的Block(M=15000),整個輸出數據存放在一個文件中(R=1)。
圖2顯示了這個運算隨時間的處理過程。其中Y軸表示輸入數據的處理速度。處理速度隨著參與MapReduce計算的機器數量的增加而增加,當1764臺worker參與計算的時,處理速度達到了30GB/s。當Map任務結束的時候,即在計算開始后80秒,輸入的處理速度降到0。整個計算過程從開始到結束一共花了大概150秒。這包括了大約一分鐘的初始啟動階段。初始啟動階段消耗的時間包括了把這個程序傳送到各個worker機器上的時間、等待GFS文件系統打開1000個輸入文件集合的時間、獲取相關的文件本地位置優化信息的時間。
?
5.3?排序
排序程序處理10的10次方個100個字節組成的記錄(大約1TB的數據)。這個程序模仿TeraSort?benchmark。
排序程序由不到50行代碼組成。只有三行的Map函數從文本行中解析出10個字節的key值作為排序的key,并且把這個key和原始文本行作為中間的key/value pair值輸出。我們使用了一個內置的恒等函數(Identity function)作為Reduce操作函數,這個函數把中間key/value pair不作任何改變輸出。最終排序的輸出被寫入一組雙路復制的GFS文件中(也就是說,程序輸出2TB的數據)。
如前所述,輸入數據被分成64MB的Block(M=15000)。我們把排序后的輸出結果分區后存儲到4000個文件(R=4000)。分區函數使用key的原始字節來把數據分區到R個片段中。
在這個benchmark測試中,我們使用的分區函數知道key的分區情況。通常對于排序程序來說,我們會增加一個預處理的MapReduce操作用于采樣key值的分布情況,通過采樣的數據來計算對最終排序處理的分區點。(從上到下分別是Input、Shuffle和Output)
圖三(a)顯示了這個排序程序的正常執行過程。左上的圖顯示了Input數據讀取的速度。數據讀取速度峰值會達到13GB/s,并且所有Map任務在大約200秒之前完成,然后迅速滑落至0。值得注意的是,排序程序輸入數據讀取速度小于分布式Grep程序。這是因為排序程序的Map任務花了大約一半的處理時間和I/O帶寬把中間輸出結果寫到本地硬盤。相應的分布式Grep程序的中間結果輸出幾乎可以忽略不計。
左邊中間的圖顯示了中間數據從Map任務發送到Reduce任務的網絡速度。這個過程從第一個Map任務完成之后就開始緩慢啟動(shuffling)了。圖示的第一個高峰是啟動了第一批約1700個Reduce任務(整個MapReduce分布到大概1700臺機器上,每臺機器1次最多執行1個Reduce任務)。排序程序運行大約300秒后,第一批啟動的Reduce任務只有部分完成,我們開始執行剩下的Reduce任務。所有的處理在大約600秒后結束。
左下圖表示Reduce任務把排序后的數據寫到最終的輸出文件的速度。在第一個shuffling階段結束和數據開始寫入磁盤之間有延時,這是因為worker機器正在忙于排序中間數據。磁盤寫入速度在2-4GB/s持續一段時間。輸出數據寫入磁盤大約持續850秒。計入初始啟動部分的時間,整個運算消耗了891秒。這個速度和TeraSort benchmark的最高紀錄1057秒相差不多。
需要注意的一點是:由于我們的本地化輸入數據優化策略,Input速率高于Shuffle速率和Output速率——大多數數據是從本地磁盤讀取的,并繞過我們相對受限的網絡帶寬。Shuffle速率高于Output速率,因為輸出階段會寫入兩份已排序數據的副本(我們寫兩份輸出的副本的原因是為了可靠性和可用性)。我們編寫兩個副本,因為這是底層文件系統提供的可靠性和可用性機制。如果底層文件系統使用糾刪碼(erasure coding)而不是復制,則寫入數據的網絡帶寬需求將減少。
?
5.4?高效的backup任務
圖三(b)展示關閉備用任務后排序程序執行情況。執行的過程和圖3(a)很相似,除了有一條非常長的尾巴,幾乎沒有任何寫操作發生。在960秒后,只有5個Reduce任務沒有完成。這些拖后腿的任務又執行了300秒才完成。整個計算消耗了1283秒,多了44%的執行時間。
?
5.5?失效的機器
在圖三(c)演示在排序程序執行的過程中,我們在程序開始后幾分鐘有意的kill了1746個worker中的200個。集群底層的調度立刻在這些機器上重新開始新的worker處理進程(因為只是worker機器上的處理進程被kill了,機器本身還在工作)。
圖三(c)顯示出了一個"負"的輸入數據讀取速度,這是因為一些已經完成的Map任務丟失了(由于相應的執行Map任務的worker進程被kill了),需要重新執行這些任務。相關Map任務很快就被重新執行了。整個運算在933秒內完成,包括了初始啟動時間(只比正常執行多消耗了5%的時間)。
?
6?經驗
我們在2003年1月完成了第一個版本的MapReduce庫,2003年8月的版本有了顯著的增強,這包括了輸入數據本地優化、worker機器之間的動態負載均衡等等。從那以后,我們驚喜的發現,MapReduce庫能廣泛應用于我們日常工作中遇到的各類問題。它現在在Google內部各個領域得到廣泛應用,包括:
圖四顯示了在我們的源代碼管理系統中,隨著時間推移,獨立的MapReduce程序數量的顯著增加。從2003年早些時候的0個增長到2004年9月份的差不多900個不同的程序。MapReduce的成功取決于采用MapReduce庫能夠在不到半個小時內寫出一個簡單的程序,這個簡單的程序能夠在上千臺機器組成的集群上做大規模并行處理,這極大的加快了開發和原形設計(prototyping cycle)的周期。另外,采用MapReduce庫,可以讓完全沒有分布式和/或并行系統開發經驗的程序員很容易的利用大量的資源,開發出分布式和/或并行處理的應用。
在每個任務結束的時候,MapReduce庫統計計算資源的使用狀況。在表1,我們列出了2004年8月份MapReduce運行的任務所占用的相關資源。
?
6.1?大規模索引
到目前為止,MapReduce最成功的應用就是重寫了Google網絡搜索服務所使用到的index系統。索引系統的輸入數據是網絡爬蟲抓取回來的海量的文檔,這些文檔數據都保存在GFS文件系統里。這些文檔的原始內容的大小超過了20TB。索引程序是通過一系列的MapReduce操作(大約5到10次)來建立索引。使用MapReduce(替換上一個特別設計的、分布式處理的索引程序)帶來這些好處:
?
7?相關工作
很多系統都提供了嚴格的編程模式,并且通過對編程的嚴格限制來實現并行計算。例如,一個結合函數可以通過把N個元素的數組的前綴在N個處理器上使用并行前綴算法,在log N的時間內計算完。根據我們對大型實際環境計算的經驗,可以將MapReduce視為對其中一些模型的簡化和提煉。(MapReduce can be considered a simplification and distillation of some of these models based on our experience with large real-world computations.)。更加值得驕傲的是,我們還實現了基于上千臺處理器的集群的容錯處理。相比而言,大部分并發處理系統都只在小規模的集群上實現,并且把容錯處理交給了程序員。
Bulk Synchronous Programming和一些MPI原語提供了更高級別的并行處理抽象,可以更容易的寫出并行處理的程序。MapReduce和這些系統的主要不同之處在于,MapReduce利用限制性編程模式實現了用戶程序的自動并發處理,并且提供了透明的容錯處理。
我們數據本地優化策略的靈感來源于active disks等技術,在active disks中,計算任務是盡量推送到數據所存儲的節點處理,這樣就減少了網絡和IO子系統的吞吐量。我們在掛載幾個硬盤的普通機器上執行我們的運算,而不是在磁盤處理器上執行我們的工作,但是達到的目的一樣的。
我們的備用任務機制和Charlotte System提出的eager調度機制比較類似。Eager調度機制的一個缺點是如果一個任務反復失效,那么整個計算就不能完成。我們通過忽略引起故障的記錄的方式在某種程度上解決了這個問題。
MapReduce的實現依賴于一個內部的集群管理系統,這個集群管理系統負責在一個超大的、共享機器的集群上分布和運行用戶任務。雖然這個不是本論文的重點,但是有必要提一下,這個集群管理系統在理念上和其它系統,如Condor是一樣。
MapReduce庫的排序機制和NOW-Sort的操作很類似。讀取輸入源的機器(map workers)把待排序的數據進行分區后,發送到R個Reduce worker中的一個進行處理。每個Reduce worker在本地對數據進行排序(盡可能在內存中排序)。當然,NOW-Sort沒有給用戶自定義的Map和Reduce函數的機會,因此不具備MapReduce庫廣泛的實用性。River提供了一個編程模型:處理進程通過分布式隊列傳送數據的方式進行互相通訊。和MapReduce類似,River系統嘗試在不對等的硬件環境下,或者在系統顛簸的情況下也能提供近似平均的性能。River是通過精心調度硬盤和網絡的通訊來平衡任務的完成時間。MapReduce庫采用了其它的方法。通過對編程模型進行限制,MapReduce框架把問題分解成為大量的"小"任務。這些任務在可用的worker集群上動態的調度,這樣更快的worker就可以執行更多的任務。通過對編程模型進行限制,我們可在工作接近完成的時候調度備用任務,在硬件配置不均衡的情況下減少完成的時間(比如有的worker慢、或者worker被某些操作阻塞了)。
BAD-FS采用了和MapReduce完全不同的編程模式,它是面向廣域網的。不過,這兩個系統有兩個基礎功能很類似。(1)兩個系統采用重新執行的方式來防止由于失效導致的數據丟失。(2)兩個都使用數據本地化調度策略,減少網絡通訊的數據量。
TACC是一個高可用網絡服務的簡化構造系統。和MapReduce一樣,它也依靠重新執行機制來實現容錯處理。
?
8?結束語
MapReduce編程模型在Google內部成功應用于多個領域。我們把這種成功歸結為幾個方面:首先,這種編程模式使得MapReduce庫易于使用,即便對于完全沒有并行或者分布式系統開發經驗的程序員而言也很容易。這是因為MapReduce封裝了并行處理、容錯處理、數據本地化優化、負載均衡等等技術難點的細節;其次,大量不同類型的問題都可以通過MapReduce簡單的解決。比如,MapReduce用于生成Google的網絡搜索服務所需要的數據、用來排序、用來數據挖掘、用于機器學習,以及很多其它的系統;第三,我們實現了一個在數千臺計算機組成的大型集群上靈活部署并運行的MapReduce。這個實現使得有效利用這些豐富的計算資源變得非常簡單,因此也適合用于解決Google遇到的其它很多需要大量計算的問題。
我們也從MapReduce開發過程中學到了不少東西。首先,限制性編程模式使得并行和分布式計算非常容易,也易于構造容錯的計算環境;其次,網絡帶寬是稀有資源。大量的系統優化以針對減少網絡傳輸量為目的:本地優化策略使大量的數據從本地磁盤讀取,中間文件寫入本地磁盤、并且只寫一份中間文件也節約了網絡帶寬;第三,多次執行相同的任務可以減少性能緩慢的機器帶來的負面影響,同時解決了由于機器失效導致的數據丟失問題。
?
附錄A、單詞頻率統計
本節包含了一個完整的程序,用于統計在一組命令行指定的輸入文件中,每一個不同的單詞出現頻率。
#include "mapreduce/mapreduce.h" // User’s map function class WordCounter : public Mapper { public: virtual void Map(const MapInput& input) { const string& text = input.value(); const int n = text.size();for (int i = 0; i < n; ) { // Skip past leading whitespace while((i < n) && isspace(text[i]))i++; // Find word end int start = i;while ((i < n) && !isspace(text[i]))i++; if (start < i) Emit(text.substr(start,i-start),"1");}} };REGISTER_MAPPER(WordCounter); // User’s reduce function class Adder : public Reducer {virtual void Reduce(ReduceInput* input) { // Iterate over all entries with the// same key and add the values int64 value = 0; while (!input->done()) { value += StringToInt(input->value()); input->NextValue(); } // Emit sum for input->key()Emit(IntToString(value)); } };REGISTER_REDUCER(Adder); int main(int argc, char** argv) {ParseCommandLineFlags(argc, argv);MapReduceSpecification spec; // Store list of input fifiles into "spec" for (int i = 1; i < argc; i++) {MapReduceInput* input = spec.add_input();input->set_format("text"); input->set_filepattern(argv[i]); input->set_mapper_class("WordCounter");} // Specify the output files: // /gfs/test/freq-00000-of-00100 // /gfs/test/freq-00001-of-00100 // … MapReduceOutput* out = spec.output(); out->set_fifilebase("/gfs/test/freq"); out->set_num_tasks(100); out->set_format("text"); out->set_reducer_class("Adder"); // Optional: do partial sums within map // tasks to save network bandwidth out->set_combiner_class("Adder"); //Tuning parameters: use at most 2000 //machines and 100 MB of memory per task spec.set_machines(2000); spec.set_map_megabytes(100); spec.set_reduce_megabytes(100);//Now run it MapReduceResult result; if (!MapReduce(spec, &result)) abort(); // Done: ‘result’ structure contains info // about counters, time taken, number of // machines used, etc. return 0; }?
總結
以上是生活随笔為你收集整理的MapReduce 论文翻译的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Bigtable 论文翻译
- 下一篇: 安装Mongodb并解决用户授权问题