开启大数据时代谷歌三篇论文-Mapreduce
摘要
MapReduce 是一個編程模型,也是一個處理和生成超大數據集的算法模型的相關實現。用戶首先創建一 個 Map 函數處理一個基于 key/value pair 的數據集合,輸出中間的基于 key/value pair 的數據集合;然后再創建 一個 Reduce 函數用來合并所有的具有相同中間 key 值的中間 value 值。現實世界中有很多滿足上述處理模型 的例子,本論文將詳細描述這個模型。
MapReduce 架構的程序能夠在大量的普通配置的計算機上實現并行化處理。這個系統在運行時只關心: 如何分割輸入數據,在大量計算機組成的集群上的調度,集群中計算機的錯誤處理,管理集群中計算機之間 必要的通信。采用 MapReduce 架構可以使那些沒有并行計算和分布式處理系統開發經驗的程序員有效利用分 布式系統的豐富資源。
我們的 MapReduce 實現運行在規模可以靈活調整的由普通機器組成的集群上:一個典型的 MapReduce 計算往往由幾千臺機器組成、處理以 TB 計算的數據。程序員發現這個系統非常好用:已經實現了數以百計 的 MapReduce 程序,在 Google 的集群上,每天都有 1000 多個 MapReduce 程序在執行。
1 介紹
在過去的 5 年里,包括本文作者在內的 Google 的很多程序員,為了處理海量的原始數據,已經實現了數 以百計的、專用的計算方法。這些計算方法用來處理大量的原始數據,比如,文檔抓取(類似網絡爬蟲的程 序)、Web 請求日志等等;也為了計算處理各種類型的衍生數據,比如倒排索引、Web 文檔的圖結構的各種表 示形勢、每臺主機上網絡爬蟲抓取的頁面數量的匯總、每天被請求的最多的查詢的集合等等。大多數這樣的 數據處理運算在概念上很容易理解。然而由于輸入的數據量巨大,因此要想在可接受的時間內完成運算,只 有將這些計算分布在成百上千的主機上。如何處理并行計算、如何分發數據、如何處理錯誤?所有這些問題 綜合在一起,需要大量的代碼處理,因此也使得原本簡單的運算變得難以處理。
為了解決上述復雜的問題,我們設計一個新的抽象模型,使用這個抽象模型,我們只要表述我們想要執 行的簡單運算即可,而不必關心并行計算、容錯、數據分布、負載均衡等復雜的細節,這些問題都被封裝在 了一個庫里面。設計這個抽象模型的靈感來自 Lisp 和許多其他函數式語言的 Map 和 Reduce 的原語。我們意 識到我們大多數的運算都包含這樣的操作:在輸入數據的“邏輯”記錄上應用 Map 操作得出一個中間 key/value pair 集合,然后在所有具有相同 key 值的 value 值上應用 Reduce 操作,從而達到合并中間的數據,得到一個
想要的結果的目的。使用 MapReduce 模型,再結合用戶實現的 Map 和 Reduce 函數,我們就可以非常容易的 實現大規模并行化計算;通過 MapReduce 模型自帶的“再次執行”(re-execution)功能,也提供了初級的容
災實現方案。
這個工作(實現一個 MapReduce 框架模型)的主要貢獻是通過簡單的接口來實現自動的并行化和大規模
的分布式計算,通過使用 MapReduce 模型接口實現在大量普通的 PC 機上高性能計算。 第二部分描述基本的編程模型和一些使用案例。 第三部分描述了一個經過裁剪的、適合我們的基于集群的計算環境的 MapReduce 實現。 第四部分描述我們認為在 MapReduce 編程模型中一些實用的技巧。 第五部分對于各種不同的任務,測量我們 MapReduce 實現的性能。
第六部分揭示了在 Google 內部如何使用 MapReduce 作為基礎重寫我們的索引系統產品,包括其它一些 使用 MapReduce 的經驗。 第七部分討論相關的和未來的工作。
2 編程模型
MapReduce 編程模型的原理是:利用一個輸入 key/value pair 集合來產生一個輸出的 key/value pair 集合。 MapReduce 庫的用戶用兩個函數表達這個計算:Map 和 Reduce。
用戶自定義的 Map 函數接受一個輸入的 key/value pair 值,然后產生一個中間 key/value pair 值的集合。 MapReduce 庫把所有具有相同中間 key 值 I 的中間 value 值集合在一起后傳遞給 reduce 函數。
用戶自定義的 Reduce 函數接受一個中間 key 的值 I 和相關的一個 value 值的集合。Reduce 函數合并這些 value 值,形成一個較小的 value 值的集合。一般的,每次 Reduce 函數調用只產生 0 或 1 個輸出 value 值。通 常我們通過一個迭代器把中間 value 值提供給 Reduce 函數,這樣我們就可以處理無法全部放入內存中的大量 的 value 值的集合。
2.1 例子
例如,計算一個大的文檔集合中每個單詞出現的次數,下面是偽代碼段:
map(String key,String value):
? ?// key:document name
? // value: document contents
? ?for each word w in value:
? ? ? EmitIntermediate(w,"1")
reduce(String key,Iterator values):
? ?// key : a word
? ?// values: a list of counts
? ?int result = 0;
? ?for each v in values:
? ? ? ? ?result += ParseInt(v);
? ?Emit(AsString(result));
Map 函數輸出文檔中的每個詞、以及這個詞的出現次數(在這個簡單的例子里就是 1)。Reduce 函數把 Map 函數產生的每一個特定的詞的計數累加起來。
另外,用戶編寫代碼,使用輸入和輸出文件的名字、可選的調節參數來完成一個符合 MapReduce 模型規 范的對象,然后調用 MapReduce 函數,并把這個規范對象傳遞給它。用戶的代碼和 MapReduce 庫鏈接在一起 (用 C++實現)。附錄 A 包含了這個實例的全部程序代碼。
2.2 類型
盡管在前面例子的偽代碼中使用了以字符串表示的輸入輸出值,但是在概念上,用戶定義的 Map 和 Reduce
函數都有相關聯的類型:
map(k1,v1) ->list(k2,v2) reduce(k2,list(v2)) ->list(v2)
比如,輸入的 key 和 value 值與輸出的 key 和 value 值在類型上推導的域不同。此外,中間 key 和 value 值與輸出 key 和 value 值在類型上推導的域相同。2
我們的 C++中使用字符串類型作為用戶自定義函數的輸入輸出,用戶在自己的代碼中對字符串進行適當 的類型轉換。
2.3 更多的例子
這里還有一些有趣的簡單例子,可以很容易的使用 MapReduce 模型來表示:分布式的 Grep:Map 函數輸出匹配某個模式的一行,Reduce 函數是一個恒等函數,即把中間數據復制到 輸出。計算 URL 訪問頻率:Map 函數處理日志中 web 頁面請求的記錄,然后輸出(URL,1)。Reduce 函數把相同URL 的 value 值都累加起來,產生(URL,記錄總數)結果。倒轉網絡鏈接圖:Map 函數在源頁面(source)中搜索所有的鏈接目標(target)并輸出為(target,source)。Reduce 函數把給定鏈接目標(target)的鏈接組合成一個列表,輸出(target,list(source))。 每個主機的檢索詞向量:檢索詞向量用一個(詞,頻率)列表來概述出現在文檔或文檔集中的最重要的一些 詞。Map 函數為每一個輸入文檔輸出(主機名,檢索詞向量),其中主機名來自文檔的 URL。Reduce 函數接收給 定主機的所有文檔的檢索詞向量,并把這些檢索詞向量加在一起,丟棄掉低頻的檢索詞,輸出一個最終的(主機名,檢索詞向量)。
倒排索引:Map 函數分析每個文檔輸出一個(詞,文檔號)的列表,Reduce 函數的輸入是一個給定詞的所有(詞,文檔號),排序所有的文檔號,輸出(詞,list(文檔號))。所有的輸出集合形成一個簡單的倒排索引,它 以一種簡單的算法跟蹤詞在文檔中的位置。
分布式排序:Map 函數從每個記錄提取 key,輸出(key,record)。Reduce 函數不改變任何的值。這個運算 依賴分區機制(在 4.1 描述)和排序屬性(在 4.2 描述)。
3 實現
MapReduce 模型可以有多種不同的實現方式。如何正確選擇取決于具體的環境。例如,一種實現方式適 用于小型的共享內存方式的機器,另外一種實現方式則適用于大型 NUMA 架構的多處理器的主機,而有的實 現方式更適合大型的網絡連接集群。
本章節描述一個適用于 Google 內部廣泛使用的運算環境的實現:用以太網交換機連接、由普通 PC 機組 成的大型集群。在我們的環境里包括:
x86 架構、運行 Linux 操作系統、雙處理器、2-4GB 內存的機器。
普通的網絡硬件設備,每個機器的帶寬為百兆或者千兆,但是遠小于網絡的平均帶寬的一半。
集群中包含成百上千的機器,因此,機器故障是常態。
存儲為廉價的內置 IDE 硬盤。一個內部分布式文件系統用來管理存儲在這些磁盤上的數據。文件系
統通過數據復制來在不可靠的硬件上保證數據的可靠性和有效性。
用戶提交工作(job)給調度系統。每個工作(job)都包含一系列的任務(task),調度系統將這些任務調度到集群中多臺可用的機器上。
3.1 執行概括
通過將 Map 調用的輸入數據自動分割為 M 個數據片段的集合,Map 調用被分布到多臺機器上執行。輸入的數據片段能夠在不同的機器上并行處理。使用分區函數將 Map 調用產生的中間 key 值分成 R 個不同分區(例如,hash(key) mod R),Reduce 調用也被分布到多臺機器上執行。分區數量(R)和分區函數由用戶來指定。
?
? ? ? ??
圖 1 展示了我們的 MapReduce 實現中操作的全部流程。當用戶調用 MapReduce 函數時,將發生下面的一 系列動作(下面的序號和圖 1 中的序號一一對應):
用戶程序首先調用的 MapReduce 庫將輸入文件分成 M 個數據片度,每個數據片段的大小一般從 16MB 到 64MB(可以通過可選的參數來控制每個數據片段的大小)。然后用戶程序在機群中創建大量 的程序副本。
這些程序副本中的有一個特殊的程序–master。副本中其它的程序都是 worker 程序,由 master 分配 任務。有 M 個 Map 任務和 R 個 Reduce 任務將被分配,master 將一個 Map 任務或 Reduce 任務分配 給一個空閑的 worker。
被分配了 map 任務的 worker 程序讀取相關的輸入數據片段,從輸入的數據片段中解析出 key/value pair,然后把 key/value pair 傳遞給用戶自定義的 Map 函數,由 Map 函數生成并輸出的中間 key/value pair,并緩存在內存中。
緩存中的 key/value pair 通過分區函數分成 R 個區域,之后周期性的寫入到本地磁盤上。緩存的 key/value pair 在本地磁盤上的存儲位置將被回傳給 master,由 master 負責把這些存儲位置再傳送給 Reduce worker。
當 Reduce worker 程序接收到 master 程序發來的數據存儲位置信息后,使用 RPC 從 Map worker 所在 主機的磁盤上讀取這些緩存數據。當 Reduce worker 讀取了所有的中間數據后,通過對 key 進行排序后使得具有相同 key 值的數據聚合在一起。由于許多不同的 key 值會映射到相同的 Reduce 任務上,因此必須進行排序。如果中間數據太大無法在內存中完成排序,那么就要在外部進行排序。
Reduce worker 程序遍歷排序后的中間數據,對于每一個唯一的中間 key 值,Reduce worker 程序將這 個 key 值和它相關的中間 value 值的集合傳遞給用戶自定義的 Reduce 函數。Reduce 函數的輸出被追 加到所屬分區的輸出文件。
當所有的 Map 和 Reduce 任務都完成之后,master 喚醒用戶程序。在這個時候,在用戶程序里的對MapReduce 調用才返回。
在成功完成任務之后,MapReduce 的輸出存放在 R 個輸出文件中(對應每個 Reduce 任務產生一個輸出文件,文件名由用戶指定)。一般情況下,用戶不需要將這 R 個輸出文件合并成一個文件–他們經常把這些文 件作為另外一個 MapReduce 的輸入,或者在另外一個可以處理多個分割文件的分布式應用中使用。
3.2 Master 數據結構
Master 持有一些數據結構,它存儲每一個 Map 和 Reduce 任務的狀態(空閑、工作中或完成),以及 Worker 機器(非空閑任務的機器)的標識。
Master 就像一個數據管道,中間文件存儲區域的位置信息通過這個管道從 Map 傳遞到 Reduce。因此, 對于每個已經完成的 Map 任務,master 存儲了 Map 任務產生的 R 個中間文件存儲區域的大小和位置。當 Map 任務完成時,Master 接收到位置和大小的更新信息,這些信息被逐步遞增的推送給那些正在工作的 Reduce 任務。
3.3 容錯
因為 MapReduce 庫的設計初衷是使用由成百上千的機器組成的集群來處理超大規模的數據,所以,這個庫必須要能很好的處理機器故障。
3.3.1 worker 故障
master 周期性的 ping 每個 worker。如果在一個約定的時間范圍內沒有收到 worker 返回的信息,master 將 把這個 worker 標記為失效。所有由這個失效的 worker 完成的 Map 任務被重設為初始的空閑狀態,之后這些 任務就可以被安排給其他的 worker。同樣的,worker 失效時正在運行的 Map 或 Reduce 任務也將被重新置為 空閑狀態,等待重新調度。
當 worker 故障時,由于已經完成的 Map 任務的輸出存儲在這臺機器上,Map 任務的輸出已不可訪問了,
因此必須重新執行。而已經完成的 Reduce 任務的輸出存儲在全局文件系統上,因此不需要再次執行。
當一個 Map 任務首先被 worker A 執行,之后由于 worker A 失效了又被調度到 worker B 執行,這個“重 新執行”的動作會被通知給所有執行 Reduce 任務的 worker。任何還沒有從 worker A 讀取數據的 Reduce 任務 將從 worker B 讀取數據。
MapReduce 可以處理大規模 worker 失效的情況。比如,在一個 MapReduce 操作執行期間,在正在運行 的集群上進行網絡維護引起 80 臺機器在幾分鐘內不可訪問了,MapReduce master 只需要簡單的再次執行那些 不可訪問的 worker 完成的工作,之后繼續執行未完成的任務,直到最終完成這個 MapReduce 操作。
3.3.2 master 失敗
一個簡單的解決辦法是讓 master 周期性的將上面描述的數據結構(alex 注:指 3.2 節)的寫入磁盤,即 檢查點(checkpoint)。如果這個 master 任務失效了,可以從最后一個檢查點(checkpoint)開始啟動另一個 master 進程。然而,由于只有一個 master 進程,master 失效后再恢復是比較麻煩的,因此我們現在的實現是 如果 master 失效,就中止 MapReduce 運算。客戶可以檢查到這個狀態,并且可以根據需要重新執行 MapReduce 操作。
3.3.3 在失效方面的處理機制
(alex 注:原文為”semantics in the presence of failures”)
當用戶提供的 Map 和 Reduce 操作是輸入確定性函數(即相同的輸入產生相同的輸出)時,我們的分布 式實現在任何情況下的輸出都和所有程序沒有出現任何錯誤、順序的執行產生的輸出是一樣的。
我們依賴對 Map 和 Reduce 任務的輸出是原子提交的來完成這個特性。每個工作中的任務把它的輸出寫 到私有的臨時文件中。每個 Reduce 任務生成一個這樣的文件,而每個 Map 任務則生成 R 個這樣的文件(一 個 Reduce 任務對應一個文件)。當一個 Map 任務完成的時,worker 發送一個包含 R 個臨時文件名的完成消息 給 master。如果 master 從一個已經完成的 Map 任務再次接收到到一個完成消息,master 將忽略這個消息;否 則,master 將這 R 個文件的名字記錄在數據結構里。
當 Reduce 任務完成時,Reduce worker 進程以原子的方式把臨時文件重命名為最終的輸出文件。如果同 一個 Reduce 任務在多臺機器上執行,針對同一個最終的輸出文件將有多個重命名操作執行。我們依賴底層文 件系統提供的重命名操作的原子性來保證最終的文件系統狀態僅僅包含一個 Reduce 任務產生的數據。
使用 MapReduce 模型的程序員可以很容易的理解他們程序的行為,因為我們絕大多數的 Map 和 Reduce 操作是確定性的,而且存在這樣的一個事實:我們的失效處理機制等價于一個順序的執行的操作。當 Map 或 /和 Reduce 操作是不確定性的時候,我們提供雖然較弱但是依然合理的處理機制。當使用非確定操作的時候, 一個 Reduce 任務 R1 的輸出等價于一個非確定性程序順序執行產生時的輸出。但是,另一個 Reduce 任務 R2的輸出也許符合一個不同的非確定順序程序執行產生的 R2 的輸出。考慮 Map 任務 M 和Reduce 任務 R1、R2 的情況。我們設定 e(Ri)是 Ri 已經提交的執行過程(有且僅有一個這樣的執行過程)。當 e(R1)讀取了由 M 一次執行產生的輸出,而 e(R2)讀取了由 M 的另一次執行產生的 輸出,導致了較弱的失效處理。
3.4 存儲位置
在我們的計算運行環境中,網絡帶寬是一個相當匱乏的資源。我們通過盡量把輸入數據(由 GFS 管理)存 儲在集群中機器的本地磁盤上來節省網絡帶寬。GFS 把每個文件按 64MB 一個 Block 分隔,每個 Block 保存 在多臺機器上,環境中就存放了多份拷貝(一般是 3 個拷貝)。MapReduce 的 master 在調度 Map 任務時會考慮 輸入文件的位置信息,盡量將一個 Map 任務調度在包含相關輸入數據拷貝的機器上執行;如果上述努力失敗 了,master 將嘗試在保存有輸入數據拷貝的機器附近的機器上執行 Map 任務(例如,分配到一個和包含輸入數 據的機器在一個 switch 里的 worker 機器上執行)。當在一個足夠大的 cluster 集群上運行大型 MapReduce 操作 的時候,大部分的輸入數據都能從本地機器讀取,因此消耗非常少的網絡帶寬。
3.5 任務粒度
如前所述,我們把 Map 拆分成了 M 個片段、把 Reduce 拆分成 R 個片段執行。理想情況下,M 和 R 應當 比集群中 worker 的機器數量要多得多。在每臺 worker 機器都執行大量的不同任務能夠提高集群的動態的負載 均衡能力,并且能夠加快故障恢復的速度:失效機器上執行的大量 Map 任務都可以分布到所有其他的 worker 機器上去執行。
但是實際上,在我們的具體實現中對 M 和 R 的取值都有一定的客觀限制,因為 master 必須執行 O(M+R) 次調度,并且在內存中保存 O(M*R)個狀態(對影響內存使用的因素還是比較小的:O(M*R)塊狀態,大概每 對 Map 任務/Reduce 任務 1 個字節就可以了)。
更進一步,R 值通常是由用戶指定的,因為每個 Reduce 任務最終都會生成一個獨立的輸出文件。實際使 用時我們也傾向于選擇合適的 M 值,以使得每一個獨立任務都是處理大約 16M 到 64M 的輸入數據(這樣, 上面描寫的輸入數據本地存儲優化策略才最有效),另外,我們把 R 值設置為我們想使用的 worker 機器數量 的小的倍數。我們通常會用這樣的比例來執行 MapReduce:M=200000,R=5000,使用 2000 臺 worker 機器。
3.6 備用任務
影響一個 MapReduce 的總執行時間最通常的因素是“落伍者”:在運算過程中,如果有一臺機器花了很 長的時間才完成最后幾個 Map 或 Reduce 任務,導致 MapReduce 操作總的執行時間超過預期。出現“落伍者” 的原因非常多。比如:如果一個機器的硬盤出了問題,在讀取的時候要經常的進行讀取糾錯操作,導致讀取
數據的速度從 30M/s 降低到 1M/s。如果 cluster 的調度系統在這臺機器上又調度了其他的任務,由于 CPU、內 存、本地硬盤和網絡帶寬等競爭因素的存在,導致執行 MapReduce 代碼的執行效率更加緩慢。我們最近遇到 的一個問題是由于機器的初始化代碼有 bug,導致關閉了的處理器的緩存:在這些機器上執行任務的性能和正常情況相差上百倍。我們有一個通用的機制來減少“落伍者”出現的情況。當一個 MapReduce 操作接近完成的時候,master調度備用(backup)任務進程來執行剩下的、處于處理中狀態(in-progress)的任務。無論是最初的執行進程、 還是備用(backup)任務進程完成了任務,我們都把這個任務標記成為已經完成。我們調優了這個機制,通 常只會占用比正常操作多幾個百分點的計算資源。我們發現采用這樣的機制對于減少超大 MapReduce 操作的 總處理時間效果顯著。例如,在 5.3 節描述的排序任務,在關閉掉備用任務的情況下要多花 44%的時間完成 排序任務。
4 技巧
雖然簡單的 Map 和 Reduce 函數提供的基本功能已經能夠滿足大部分的計算需要,我們還是發掘出了一些有價值的擴展功能。本節將描述這些擴展功能。
4.1 分區函數
MapReduce 的使用者通常會指定 Reduce 任務和 Reduce 任務輸出文件的數量(R)。我們在中間 key 上使 用分區函數來對數據進行分區,之后再輸入到后續任務執行進程。一個缺省的分區函數是使用 hash 方法(比如, hash(key) mod R)進行分區。hash 方法能產生非常平衡的分區。然而,有的時候,其它的一些分區函數對 key 值進行的分區將非常有用。比如,輸出的 key 值是 URLs,我們希望每個主機的所有條目保持在同一個輸出文 件中。為了支持類似的情況,MapReduce 庫的用戶需要提供專門的分區函數。例如,使用“hash(Hostname(urlkey)) mod R”作為分區函數就可以把所有來自同一個主機的 URLs 保存在同一個輸出文件中。
4.2 順序保證
我們確保在給定的分區中,中間 key/value pair 數據的處理順序是按照 key 值增量順序處理的。這樣的順 序保證對每個分成生成一個有序的輸出文件,這對于需要對輸出文件按 key 值隨機存取的應用非常有意義, 對在排序輸出的數據集也很有幫助。
4.3 Combiner 函數
在某些情況下,Map 函數產生的中間 key 值的重復數據會占很大的比重,并且,用戶自定義的 Reduce 函數滿足結合律和交換律。在 2.1 節的詞數統計程序是個很好的例子。由于詞頻率傾向于一個 zipf 分布(齊夫分布),每個 Map 任務將產生成千上萬個這樣的記錄<the,1>。所有的這些記錄將通過網絡被發送到一個單獨的 Reduce 任務,然后由這個 Reduce 任務把所有這些記錄累加起來產生一個數字。我們允許用戶指定一個可選 的 combiner 函數,combiner 函數首先在本地將這些記錄進行一次合并,然后將合并的結果再通過網絡發送出去。
Combiner 函數在每臺執行 Map 任務的機器上都會被執行一次。一般情況下,Combiner 和 Reduce 函數是一樣的。Combiner 函數和 Reduce 函數之間唯一的區別是 MapReduce 庫怎樣控制函數的輸出。Reduce 函數的 輸出被保存在最終的輸出文件里,而 Combiner 函數的輸出被寫到中間文件里,然后被發送給 Reduce 任務。
部分的合并中間結果可以顯著的提高一些 MapReduce 操作的速度。附錄 A 包含一個使用 combiner 函數 的例子。
4.4 輸入和輸出的類型
MapReduce 庫支持幾種不同的格式的輸入數據。比如,文本模式的輸入數據的每一行被視為是一個 key/value pair。key 是文件的偏移量,value 是那一行的內容。另外一種常見的格式是以 key 進行排序來存儲 的 key/value pair 的序列。每種輸入類型的實現都必須能夠把輸入數據分割成數據片段,該數據片段能夠由單 獨的 Map 任務來進行后續處理(例如,文本模式的范圍分割必須確保僅僅在每行的邊界進行范圍分割)。雖然 大多數 MapReduce 的使用者僅僅使用很少的預定義輸入類型就滿足要求了,但是使用者依然可以通過提供一 個簡單的 Reader 接口實現就能夠支持一個新的輸入類型。
Reader 并非一定要從文件中讀取數據,比如,我們可以很容易的實現一個從數據庫里讀記錄的 Reader, 或者從內存中的數據結構讀取數據的 Reader。 類似的,我們提供了一些預定義的輸出數據的類型,通過這些預定義類型能夠產生不同格式的數據。用 戶采用類似添加新的輸入數據類型的方式增加新的輸出類型。
4.5 副作用
在某些情況下,MapReduce 的使用者發現,如果在 Map 和/或 Reduce 操作過程中增加輔助的輸出文件會 比較省事。我們依靠程序 writer 把這種“副作用”變成原子的和冪等的3。通常應用程序首先把輸出結果寫到 一個臨時文件中,在輸出全部數據之后,在使用系統級的原子操作 rename 重新命名這個臨時文件。 如果一個任務產生了多個輸出文件,我們沒有提供類似兩階段提交的原子操作支持這種情況。因此,對 于會產生多個輸出文件、并且對于跨文件有一致性要求的任務,都必須是確定性的任務。但是在實際應用過 程中,這個限制還沒有給我們帶來過麻煩。
4.6 跳過損壞的記錄
有時候,用戶程序中的 bug 導致 Map 或者 Reduce 函數在處理某些記錄的時候 crash 掉,MapReduce 操作 無法順利完成。慣常的做法是修復 bug 后再次執行 MapReduce 操作,但是,有時候找出這些 bug 并修復它們 不是一件容易的事情;這些 bug 也許是在第三方庫里邊,而我們手頭沒有這些庫的源代碼。而且在很多時候, 忽略一些有問題的記錄也是可以接受的,比如在一個巨大的數據集上進行統計分析的時候。我們提供了一種 執行模式,在這種模式下,為了保證保證整個處理能繼續進行,MapReduce 會檢測哪些記錄導致確定性的 crash, 并且跳過這些記錄不處理。
每個 worker 進程都設置了信號處理函數捕獲內存段異常(segmentation violation)和總線錯誤(bus error)。 在執行 Map 或者 Reduce 操作之前,MapReduce 庫通過全局變量保存記錄序號。如果用戶程序觸發了一個系 統信號,消息處理函數將用“最后一口氣”通過 UDP 包向 master 發送處理的最后一條記錄的序號。當 master 看到在處理某條特定記錄不止失敗一次時,master 就標志著條記錄需要被跳過,并且在下次重新執行相關的 Map 或者 Reduce 任務的時候跳過這條記錄。
4.7 本地執行
調試 Map 和 Reduce 函數的 bug 是非常困難的,因為實際執行操作時不但是分布在系統中執行的,而且 通常是在好幾千臺計算機上執行,具體的執行位置是由 master 進行動態調度的,這又大大增加了調試的難度。 為了簡化調試、profile 和小規模測試,我們開發了一套 MapReduce 庫的本地實現版本,通過使用本地版本的 MapReduce 庫,MapReduce 操作在本地計算機上順序的執行。用戶可以控制 MapReduce 操作的執行,可以把 操作限制到特定的 Map 任務上。用戶通過設定特別的標志來在本地執行他們的程序,之后就可以很容易的使 用本地調試和測試工具(比如 gdb)。
4.8 狀態信息
master 使用嵌入式的 HTTP 服務器(如 Jetty)顯示一組狀態信息頁面,用戶可以監控各種執行狀態。狀 態信息頁面顯示了包括計算執行的進度,比如已經完成了多少任務、有多少任務正在處理、輸入的字節數、 中間數據的字節數、輸出的字節數、處理百分比等等。頁面還包含了指向每個任務的 stderr 和 stdout 文件的鏈 接。用戶根據這些數據預測計算需要執行大約多長時間、是否需要增加額外的計算資源。這些頁面也可以用 來分析什么時候計算執行的比預期的要慢。
另外,處于最頂層的狀態頁面顯示了哪些 worker 失效了,以及他們失效的時候正在運行的 Map 和 Reduce 任務。這些信息對于調試用戶代碼中的 bug 很有幫助。
4.9 計數器
MapReduce 庫使用計數器統計不同事件發生次數。比如,用戶可能想統計已經處理了多少個單詞、已經 索引的多少篇 German 文檔等等。
為了使用這個特性,用戶在程序中創建一個命名的計數器對象,在 Map 和 Reduce 函數中相應的增加計 數器的值。例如:
Counter* uppercase;
uppercase = GetCounter(“uppercase”);
map(String name, String contents):
? ? for each word w in contents:
? ? ? ? if (IsCapitalized(w)):
? ? ? ? ? ? uppercase->Increment();
? ? ? ? EmitIntermediate(w, “1′′);
這些計數器的值周期性的從各個單獨的 worker 機器上傳遞給 maste(r 附加在 ping 的應答包中傳遞)。master 把執行成功的 Map 和 Reduce 任務的計數器值進行累計,當 MapReduce 操作完成之后,返回給用戶代碼。
計數器當前的值也會顯示在 master 的狀態頁面上,這樣用戶就可以看到當前計算的進度。當累加計數器 的值的時候,master 要檢查重復運行的 Map 或者 Reduce 任務,避免重復累加(之前提到的備用任務和失效 后重新執行任務這兩種情況會導致相同的任務被多次執行)。
有些計數器的值是由 MapReduce 庫自動維持的,比如已經處理的輸入的 key/value pair 的數量、輸出的 key/value pair 的數量等等。
計數器機制對于 MapReduce 操作的完整性檢查非常有用。比如,在某些 MapReduce 操作中,用戶需要確 保輸出的 key value pair 精確的等于輸入的 key value pair,或者處理的 German 文檔數量在處理的整個文檔數 量中屬于合理范圍。
5 性能
本節我們用在一個大型集群上運行的兩個計算來衡量 MapReduce 的性能。一個計算在大約 1TB 的數據中 進行特定的模式匹配,另一個計算對大約 1TB 的數據進行排序。
這兩個程序在大量的使用 MapReduce 的實際應用中是非常典型的 — 一類是對數據格式進行轉換,從一 種表現形式轉換為另外一種表現形式;另一類是從海量數據中抽取少部分的用戶感興趣的數據。
5.1 集群配置
所有這些程序都運行在一個大約由 1800 臺機器構成的集群上。每臺機器配置 2 個 2G 主頻、支持超線程 的 Intel Xeon 處理器,4GB 的物理內存,兩個 160GB 的 IDE 硬盤和一個千兆以太網卡。這些機器部署在一個 兩層的樹形交換網絡中,在 root 節點大概有 100-200GBPS 的傳輸帶寬。所有這些機器都采用相同的部署(對 等部署),因此任意兩點之間的網絡來回時間小于 1 毫秒。
在 4GB 內存里,大概有 1-1.5G 用于運行在集群上的其他任務。測試程序在周末下午開始執行,這時主機 的 CPU、磁盤和網絡基本上處于空閑狀態。
5.2 GREP
這個分布式的 grep 程序需要掃描大概 10 的 10 次方個由 100 個字節組成的記錄,查找出現概率較小的 3 個字符的模式(這個模式在 92337 個記錄中出現)。輸入數據被拆分成大約 64M 的 Block(M=15000),整個 輸出數據存放在一個文件中(R=1)。
5.2 GREP
這個分布式的 grep 程序需要掃描大概 10 的 10 次方個由 100 個字節組成的記錄,查找出現概率較小的 3 個字符的模式(這個模式在 92337 個記錄中出現)。輸入數據被拆分成大約 64M 的 Block(M=15000),整個 輸出數據存放在一個文件中(R=1)。
? ? ? ? ? ? ? ? ? ? ? ??
圖 2 顯示了這個運算隨時間的處理過程。其中 Y 軸表示輸入數據的處理速度。處理速度隨著參與 MapReduce 計算的機器數量的增加而增加,當 1764 臺 worker 參與計算的時,處理速度達到了 30GB/s。當 Map 任務結束的時候,即在計算開始后 80 秒,輸入的處理速度降到 0。整個計算過程從開始到結束一共花了 大概 150 秒。這包括了大約一分鐘的初始啟動階段。初始啟動階段消耗的時間包括了是把這個程序傳送到各 個 worker 機器上的時間、等待 GFS 文件系統打開 1000 個輸入文件集合的時間、獲取相關的文件本地位置優 化信息的時間。
5.3 排序
排序程序處理 10 的 10 次方個 100 個字節組成的記錄(大概 1TB 的數據)。這個程序模仿 TeraSortbenchmark[10]。排序程序由不到 50 行代碼組成。只有三行的 Map 函數從文本行中解析出 10 個字節的 key 值作為排序的 key,并且把這個 key 和原始文本行作為中間的 key/value pair 值輸出。我們使用了一個內置的恒等函數作為 Reduce 操作函數。這個函數把中間的 key/value pair 值不作任何改變輸出。最終排序結果輸出到兩路復制的GFS 文件系統(也就是說,程序輸出 2TB 的數據)。如前所述,輸入數據被分成 64MB 的 Block(M=15000)。我們把排序后的輸出結果分區后存儲到 4000個文件(R=4000)。分區函數使用 key 的原始字節來把數據分區到 R 個片段中。在這個 benchmark 測試中,我們使用的分區函數知道 key 的分區情況。通常對于排序程序來說,我們會增加一個預處理的 MapReduce 操作用于采樣 key 值的分布情況,通過采樣的數據來計算對最終排序處理的分區點。
?
圖三(a)顯示了這個排序程序的正常執行過程。左上的圖顯示了輸入數據讀取的速度。數據讀取速度峰 值會達到 13GB/s,并且所有 Map 任務完成之后,即大約 200 秒之后迅速滑落到 0。值得注意的是,排序程序 輸入數據讀取速度小于分布式 grep 程序。這是因為排序程序的 Map 任務花了大約一半的處理時間和 I/O 帶寬 把中間輸出結果寫到本地硬盤。相應的分布式 grep 程序的中間結果輸出幾乎可以忽略不計。
左邊中間的圖顯示了中間數據從 Map 任務發送到 Reduce 任務的網絡速度。這個過程從第一個 Map 任務 完成之后就開始緩慢啟動了。圖示的第一個高峰是啟動了第一批大概 1700 個 Reduce 任務(整個 MapReduce 分布到大概 1700 臺機器上,每臺機器 1 次最多執行 1 個 Reduce 任務)。排序程序運行大約 300 秒后,第一批 啟動的 Reduce 任務有些完成了,我們開始執行剩下的 Reduce 任務。所有的處理在大約 600 秒后結束。
左下圖表示 Reduce 任務把排序后的數據寫到最終的輸出文件的速度。在第一個排序階段結束和數據開始 寫入磁盤之間有一個小的延時,這是因為 worker 機器正在忙于排序中間數據。磁盤寫入速度在 2-4GB/s 持續 一段時間。輸出數據寫入磁盤大約持續 850 秒。計入初始啟動部分的時間,整個運算消耗了 891 秒。這個速度和 TeraSort benchmark[18]的最高紀錄 1057 秒相差不多。 還有一些值得注意的現象:輸入數據的讀取速度比排序速度和輸出數據寫入磁盤速度要高不少,這是因為我們的輸入數據本地化優化策略起了作用 — 絕大部分數據都是從本地硬盤讀取的,從而節省了網絡帶寬。 排序速度比輸出數據寫入到磁盤的速度快,這是因為輸出數據寫了兩份(我們使用了 2 路的 GFS 文件系統, 寫入復制節點的原因是為了保證數據可靠性和可用性)。我們把輸出數據寫入到兩個復制節點的原因是因為這 是底層文件系統的保證數據可靠性和可用性的實現機制。如果底層文件系統使用類似容錯編碼[14(erasure coding)的方式而不是復制的方式保證數據的可靠性和可用性,那么在輸出數據寫入磁盤的時候,就可以降低 網絡帶寬的使用。
5.4 高效的 backup 任務
圖三(b)顯示了關閉了備用任務后排序程序執行情況。執行的過程和圖 3(a)很相似,除了輸出數據寫 磁盤的動作在時間上拖了一個很長的尾巴,而且在這段時間里,幾乎沒有什么寫入動作。在 960 秒后,只有 5 個 Reduce 任務沒有完成。這些拖后腿的任務又執行了 300 秒才完成。整個計算消耗了 1283 秒,多了 44% 的執行時間。
5.5 失效的機器
在圖三(c)中演示的排序程序執行的過程中,我們在程序開始后幾分鐘有意的 kill 了 1746 個 worker 中 的 200 個。集群底層的調度立刻在這些機器上重新開始新的 worker 處理進程(因為只是 worker 機器上的處理 進程被 kill 了,機器本身還在工作)。
圖三(c)顯示出了一個“負”的輸入數據讀取速度,這是因為一些已經完成的 Map 任務丟失了(由于 相應的執行 Map 任務的 worker 進程被 kill 了),需要重新執行這些任務。相關 Map 任務很快就被重新執行了。 整個運算在 933 秒內完成,包括了初始啟動時間(只比正常執行多消耗了 5%的時間)。
6 經驗
我們在 2003 年 1 月完成了第一個版本的 MapReduce 庫,在 2003 年 8 月的版本有了顯著的增強,這包括 了輸入數據本地優化、worker 機器之間的動態負載均衡等等。從那以后,我們驚喜的發現,MapReduce 庫能 廣泛應用于我們日常工作中遇到的各類問題。它現在在 Google 內部各個領域得到廣泛應用,包括:
大規模機器學習問
Google News 和 Froogle 產品的集群問題
從公眾查詢產品(比如 Google 的 Zeitgeist)的報告中抽取數據。
從大量的新應用和新產品的網頁中提取有用信息(比如,從大量的位置搜索網頁中抽取地理位置信
息)。
大規模的圖形計算。
? ? ? ? ? ? ? ?
圖四顯示了在我們的源代碼管理系統中,隨著時間推移,獨立的 MapReduce 程序數量的顯著增加。從 2003 年早些時候的 0 個增長到 2004 年 9 月份的差不多 900 個不同的程序。MapReduce 的成功取決于采用 MapReduce 庫能夠在不到半個小時時間內寫出一個簡單的程序,這個簡單的程序能夠在上千臺機器的組成的集群上做大 規模并發處理,這極大的加快了開發和原形設計的周期。另外,采用 MapReduce 庫,可以讓完全沒有分布式 和/或并行系統開發經驗的程序員很容易的利用大量的資源,開發出分布式和/或并行處理的應用。
? ? ? ? ? ? ? ??
在每個任務結束的時候,MapReduce 庫統計計算資源的使用狀況。在表 1,我們列出了 2004 年 8 月份 MapReduce 運行的任務所占用的相關資源。
6.1 大規模索引
到目前為止,MapReduce 最成功的應用就是重寫了 Google 網絡搜索服務所使用到的 index 系統。索引系 統的輸入數據是網絡爬蟲抓取回來的海量的文檔,這些文檔數據都保存在 GFS 文件系統里。這些文檔原始內 容4的大小超過了 20TB。索引程序是通過一系列的 MapReduce 操作(大約 5 到 10 次)來建立索引。使用 MapReduce(替換上一個特別設計的、分布式處理的索引程序)帶來這些好處:
實現索引部分的代碼簡單、小巧、容易理解,因為對于容錯、分布式以及并行計算的處理都是 MapReduce 庫提供的。比如,使用 MapReduce 庫,計算的代碼行數從原來的 3800 行 C++代碼減少到大概 700 行代碼。
MapReduce 庫的性能已經足夠好了,因此我們可以把在概念上不相關的計算步驟分開處理,而不是混在 一起以期減少數據傳遞的額外消耗。概念上不相關的計算步驟的隔離也使得我們可以很容易改變索引處理方 式。比如,對之前的索引系統的一個小更改可能要耗費好幾個月的時間,但是在使用 MapReduce 的新系統上, 這樣的更改只需要花幾天時間就可以了。
索引系統的操作管理更容易了。因為由機器失效、機器處理速度緩慢、以及網絡的瞬間阻塞等引起的絕 大部分問題都已經由MapReduce 庫解決了,不再需要操作人員的介入了。另外,我們可以通過在索引系統集 群中增加機器的簡單方法提高整體處理性能?
7 相關工作
很多系統都提供了嚴格的編程模式,并且通過對編程的嚴格限制來實現并行計算。例如,一個結合函數可以通過把 N 個元素的數組的前綴在 N 個處理器上使用并行前綴算法,在 log N 的時間內計算完[6,9,13]。
MapReduce 可以看作是我們結合在真實環境下處理海量數據的經驗,對這些經典模型進行簡化和萃取的成果。 更加值得驕傲的是,我們還實現了基于上千臺處理器的集群的容錯處理。相比而言,大部分并發處理系統都 只在小規模的集群上實現,并且把容錯處理交給了程序員。
Bulk Synchronous Programming[17]和一些 MPI 原語[11]提供了更高級別的并行處理抽象,可以更容易寫 出并行處理的程序。MapReduce 和這些系統的關鍵不同之處在于,MapReduce 利用限制性編程模式實現了用 戶程序的自動并發處理,并且提供了透明的容錯處理。
我們數據本地優化策略的靈感來源于 active disks[12,15]等技術,在 active disks 中,計算任務是盡量推送 到數據存儲的節點處理6,這樣就減少了網絡和 IO 子系統的吞吐量。我們在掛載幾個硬盤的普通機器上執行 我們的運算,而不是在磁盤處理器上執行我們的工作,但是達到的目的一樣的。
我們的備用任務機制和 Charlotte System[3]提出的 eager 調度機制比較類似。Eager 調度機制的一個缺點是 如果一個任務反復失效,那么整個計算就不能完成。我們通過忽略引起故障的記錄的方式在某種程度上解決 了這個問題。
MapReduce 的實現依賴于一個內部的集群管理系統,這個集群管理系統負責在一個超大的、共享機器的 集群上分布和運行用戶任務。雖然這個不是本論文的重點,但是有必要提一下,這個集群管理系統在理念上 和其它系統,如 Condor[16]是一樣。
MapReduce 庫的排序機制和 NOW-Sort[1]的操作上很類似。讀取輸入源的機器(map workers)把待排序 的數據進行分區后,發送到 R 個 Reduce worker 中的一個進行處理。每個 Reduce worker 在本地對數據進行排 序(盡可能在內存中排序)。當然,NOW-Sort 沒有給用戶自定義的 Map 和 Reduce 函數的機會,因此不具備 MapReduce 庫廣泛的實用性。
River[2]提供了一個編程模型:處理進程通過分布式隊列傳送數據的方式進行互相通訊。和 MapReduce 類似,River 系統嘗試在不對等的硬件環境下,或者在系統顛簸的情況下也能提供近似平均的性能。River 是 通過精心調度硬盤和網絡的通訊來平衡任務的完成時間。MapReduce 庫采用了其它的方法。通過對編程模型 進行限制,MapReduce 框架把問題分解成為大量的“小”任務。這些任務在可用的 worker 集群上動態的調度, 這樣快速的 worker 就可以執行更多的任務。通過對編程模型進行限制,我們可用在工作接近完成的時候調度 備用任務,縮短在硬件配置不均衡的情況下縮小整個操作完成的時間(比如有的機器性能差、或者機器被某 些操作阻塞了)。
BAD-FS[5]采用了和 MapReduce 完全不同的編程模式,它是面向廣域網(alex 注:wide-area network)的。不過,這兩個系統有兩個基礎功能很類似。(1)兩個系統采用重新執行的方式來防止由于失效導致的數據丟失。(2)兩個都使用數據本地化調度策略,減少網絡通訊的數據量。 TACC[7]是一個用于簡化構造高可用性網絡服務的系統。和 MapReduce 一樣,它也依靠重新執行機制來實現的容錯處理。
8 結束語
MapReduce 編程模型在 Google 內部成功應用于多個領域。我們把這種成功歸結為幾個方面:首先,由于 MapReduce 封裝了并行處理、容錯處理、數據本地化優化、負載均衡等等技術難點的細節,這使得 MapReduce 庫易于使用。即便對于完全沒有并行或者分布式系統開發經驗的程序員而言;其次,大量不同類型的問題都 可以通過 MapReduce 簡單的解決。比如,MapReduce 用于生成 Google 的網絡搜索服務所需要的數據、用來 排序、用來數據挖掘、用于機器學習,以及很多其它的系統;第三,我們實現了一個在數千臺計算機組成的 大型集群上靈活部署運行的 MapReduce。這個實現使得有效利用這些豐富的計算資源變得非常簡單,因此也 適合用來解決 Google 遇到的其他很多需要大量計算的問題。
我們也從 MapReduce 開發過程中學到了不少東西。首先,約束編程模式使得并行和分布式計算非常容易, 也易于構造容錯的計算環境;其次,網絡帶寬是稀有資源。大量的系統優化是針對減少網絡傳輸量為目的的: 本地優化策略使大量的數據從本地磁盤讀取,中間文件寫入本地磁盤、并且只寫一份中間文件也節約了網絡 帶寬;第三,多次執行相同的任務可以減少性能緩慢的機器帶來的負面影響(alex 注:即硬件配置的不平衡), 同時解決了由于機器失效導致的數據丟失問題。
?
總結
以上是生活随笔為你收集整理的开启大数据时代谷歌三篇论文-Mapreduce的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 1-spark学习笔记-大数据概述
- 下一篇: 2-spark学习笔记-spark发展概