MapReduce:Simplified Data Processing on Large Clusters中文版from百度文库
超大集群的簡單數據處理
轉自百度文庫
Jeffrey Dean Sanjay Ghemawat
jeff@google.com , sanjay@google.com
Google , Inc.
摘要
MapReduce是一個編程模式,它是與處理/產生海量數據集的實現相關。用戶指定一個map函數,通過這個map函數處理key/value(鍵/值)對,并且產生一系列的中間key/value對,并且使用reduce函數來合并所有的具有相同key值的中間鍵值對中的值部分。現實生活中的很多任務的實現都是基于這個模式的,正如本文稍后會講述的那樣。
使用這樣的函數形式實現的程序可以自動分布到一個由普通機器組成的超大集群上并發執行。run-time系統會解決輸入數據的分布細節,跨越機器集群的程序執行調度,處理機器的失效,并且管理機器之間的通訊請求。這樣的模式允許程序員可以不需要有什么并發處理或者分布式系統的經驗,就可以處理超大的分布式系統得資源。
我們的MapReduce系統的實現運行在一個由普通機器組成的大型集群上,并且有著很高的擴展性:一個典型的MapReduce計算處理通常分布到上千臺機器上來處理上TB的數據。程序員會發現這樣的系統很容易使用:已經開發出來了上百個MapReduce程序,并且每天在Google的集群上有上千個MapReduce job正在執行。
?
1 介紹
在過去的5年內,Google的創造者和其他人實現了上百個用于特別計算目的的程序來出來海量的原始數據,比如蠕蟲文檔,web請求log,等等,用于計算出不同的數據,比如降序索引,不同的圖示展示的web文檔,蠕蟲采集的每個host的page數量摘要,給定日期內最常用的查詢等等。絕大部分計算都是概念上很簡潔的。不過,輸入的數據通常是非常巨大的,并且為了能在合理時間內執行完畢,其上的計算必須分布到上百個或者上千個計算機上去執行。如何并發計算,如何分布數據,如何處理失敗等等相關問題合并在一起就會導致原本簡單的計算掩埋在為了解決這些問題而引入的很復雜的代碼中。
因為這種復雜度,我們設計了一種新的東西來讓我們能夠方便處理這樣的簡單計算。這些簡單計算原本很簡單,但是由于考慮到并發處理細節,容錯細節,以及數據分布細節,負載均衡等等細節問題,而導致代碼非常復雜。所以我們抽象這些公共的細節到一個lib中。這種抽象是源自Lisp以及其他很多面向功能的語言的map和reduce概念。我們認識到大部分操作都和map操作相關,這些map操作都是運算在輸入記錄的每個邏輯”record”上,并且map操作為了產生一組中間的key/value鍵值對,并且接著在所有相同key的中間結果上執行reduce操作,這樣就可以合并適當的數據。我們的函數模式是使用用戶定義的map和reduce操作,這樣可以讓我們并發執行大規模的運算,并且使用重新執行的方式作為容錯的優先機制。
MapReduce的主要貢獻在于提供了一個簡單強大的接口,通過這個接口,可以把大尺度的計算自動的并發和分布執行。使用這個接口,可以通過普通PC的巨大集群,來達到極高的性能。
第二節講述了基本的編程模式,并且給出了一些例子。第三節講述了一個面向我們基于集群的計算環境的MapReduce的實現。第四節講述了一些我們建議的精巧編程模式。第五節講述了在不同任務下我們的MapReduce實現的性能比較。第六節講述了在Google中的MapReduce應用以及嘗試重寫了我們產品的索引系統。第七節講述了相關工作和未來的工作。
2 編程模式
我們的運算處理一組輸入的(input)鍵值對(key/valuepairs),并且產生一組輸出的(output)鍵值對。MapReduce函數庫德用戶用兩個函數來表達這樣的計算:Map和Reduce。
Map函數,是用戶自定義的的函數,處理輸入的鍵值對,并且產生一組中間的(intermediate)鍵值對。MapReduce函數庫稽核所有相同的中間鍵值鍵I的值,并且發送給Reduce函數進行處理。
Reduce函數同樣也是用戶提供的,它處理中間鍵值I,以及這個中間鍵值相關的值集合。這個函數合并這些值,最后形成一個相對較小的值集合。通常一個單次Reduce執行會產生0個或者1個輸出值。提供給Reduce函數的中間值是通過一個iterator來提供的。這就讓我們可以處理超過內存容量的值列表。
2.1 例子
?
我們考慮這樣一個例子,在很大的文檔集合中統計每一個單詞出現的次數。我們寫出類似如下的偽代碼:
map(String key, String value):
?????? // key: document name
?????? // value: document contents
?????? for each word w in value:
????????????? EmitIntermediate(w, "1");
?
reduce(String key, Iterator values):
?????? // key: a word
?????? // values: a list of counts
?????? int result = 0;
?????? for each v in values:
????????????? result += ParseInt(v);
?????? Emit(AsString(result));
?
map函數檢查每一個單詞,并且對每一個單詞增加1到其對應的計數器(在這個例子里就是’1’).reduce函數把特定單詞的所有出現的次數進行合并。
此外,我們還要寫代碼來對mapreduce specification對象進行賦值,設定輸入和輸出的文件名,以及設定一些參數。接著我們調用MapReduce函數,把這個對象作為參數調用過去。我們把MapReduce函數庫(C++函數庫)和我們的程序鏈接在一起。附件1有完整的這個例子的代碼。
?
2.2 類型
?
即使上邊的例子是用字符串作為輸入和輸入出的,從概念上講,使用者提供的map和reduce函數有著如下相關類型:
map (k1,v1) ?????????????? à??? list(k2,v2)
reduce (k2,list(v2)) ??????????? à??? list(v2)
也就是,輸入的鍵和值和輸出的鍵值是屬于不同的域的。進一步說,中間的鍵值是和輸出的鍵值屬于相同的域的。(比如map的輸出,就是作為reduce的輸入)。
我們的C++實現上,把字符串作為用戶定義函數的輸入和輸出,由用戶代碼來自己識別字符串到合適的類型。
?
2.3 其他例子
?
這里有一些簡單有趣的例子,都可以簡單的通過MapReduce計算模型來展示:
分布式Grep:??????????? 如果map函數檢查輸入行,滿足條件的時候,map函數就把本行輸出。reduce函數就是一個直通函數,簡單的把中間數據輸出就可以了。
URL訪問頻率統計:? map函數處理webpag請求和應答(URL,1)的log。Reduce函數把所有相同的URL的值合并,并且輸出一個成對的(URL,總個數)。
逆向Web-Link 圖:?? map函數輸出所有包含指向target URL的source網頁,用(target,source)這樣的結構對輸出。Reduce函數局和所有關聯相同target URL的source列表,并且輸出一個(target,list(source))這樣的結構。
主機關鍵向量指標(Term-Vector per Hosts):?? 關鍵詞向量指標簡而言之就是在一個文檔或者一組文檔中的重點次出現的頻率,用(word,frequency)表達。map函數計算每一個輸入文檔(主機名字是從文檔的URL取出的)的關鍵詞向量,然后輸出(hostname,關鍵詞向量(Term-Vector))。reduce函數處理所有相同host的所有文檔關鍵詞向量。去掉不常用的關鍵詞,并且輸出最終的(hostname,關鍵詞向量)對。
逆序索引:???????????????? map函數分析每一個文檔,并且產生一個序列(word,documentID)組。reduce函數處理指定word的所有的序列組,并且對相關的document ID進行排序,輸出一個(word,list(document ID))組。所有的輸出組,組成一個簡單的逆序索引。通過這種方法可以很容易保持關鍵詞在文檔庫中的位置。
分布式排序:??????????????????? map函數從每條記錄中抽取關鍵字,并且產生(key,record)對。reduce函數原樣輸出所有的關鍵字對。這個算法是與4.1節描述的分布式處理相關的,并且排序是在4.2節描述的。
3 實現
MapReduce接口可以有很多種不同的實現。應當根據不同的環境選擇不同的實現。比如,一個實現可以適用于小型的共享內存的機器,另一個實現可能是基于大型NUMA多處理器系統,還可能有為大規模計算機集群的實現。
本屆描述了Google廣泛使用的計算環境:用交換機網絡[4]連接的,由普通PC構成的超大集群。在我們的環境里:
(1)???? 每個節點通常是雙x86處理器,運行Linux,每臺機器2-4GB內存。
(2)???? 使用的網絡設備都是常用的。一般在節點上使用的是100M/或者千M網絡,一般情況下都用不到一半的網絡帶寬。
(3)???? 一個cluster中常常有成百上千臺機器,所以,機器故障是家常便飯。
(4)???? 存儲時使用的便宜的IDE硬盤,直接放在每一個機器上。并且有一個分布式的文件系統來管理這些分布在各個機器上的硬盤。文件系統通過復制的方法來在不可靠的硬件上保證可用性和可靠性。
(5)???? 用戶向調度系統提交請求。每一個請求都包含一組任務,映射到這個計算機cluster里的一組機器上執行。
?
3.1 執行概覽
?
Map操作通過把輸入數據進行分區(partition)(比如分為M塊),就可以分布到不同的機器上執行了。輸入塊的拆成多塊,可以并行在不同機器上執行。Reduce操作是通過對中間產生的key的分布來進行分布的,中間產生的key可以根據某種分區函數進行分布(比如hash(key) mod R),分布成為R塊。分區(R)的數量和分區函數都是由用戶指定的。
?
?
?
圖1是我們實現的MapReduce操作的整體數據流。當用戶程序調用MapReduce函數,就會引起如下的操作(圖一中的數字標示和下表的數字標示相同)。
?
1. 用戶程序中的MapReduce函數庫首先把輸入文件分成M塊,每塊大概16M到64M(可以通過參數決定)。接著在cluster的機器上執行處理程序。
2. 這些分排的執行程序中有一個程序比較特別,它是主控程序master。剩下的執行程序都是作為master分排工作的worker。總共有M個map任務和R個reduce任務需要分排。master選擇空閑的worker并且分配這些map任務或者reduce任務
3. 一個分配了map任務的worker讀取并處理相關的輸入小塊。他處理輸入的數據,并且將分析出的key/value對傳遞給用戶定義的map函數。map函數產生的中間結果key/value對暫時緩沖到內存。
4. 這些緩沖到內存的中間結果將被定時刷寫到本地硬盤,這些數據通過分區函數分成R個區。這些中間結果在本地硬盤的位置信息將被發送回master,然后這個master負責把這些位置信息傳送給reduce的worker。
5. 當master通知reduce的worker關于中間key/value對的位置時,他調用remote procedure來從map worker的本地硬盤上讀取緩沖的中間數據。當reduce的worker讀到了所有的中間數據,他就使用中間key進行排序,這樣可以使得相同key的值都在一起。因為有許多不同key的map都對應相同的reduce任務,所以,排序是必須的。如果中間結果集太大了,那么就需要使用外排序。
6. reduce worker根據每一個唯一中間key來遍歷所有的排序后的中間數據,并且把key和相關的中間結果值集合傳遞給用戶定義的reduce函數。reduce函數的對于本reduce區塊的輸出到一個最終的輸出文件。
7. 當所有的map任務和reduce任務都已經完成了的時候,master激活用戶程序。在這時候MapReduce返回用戶程序的調用點。
?
當這些成功結束以后,mapreduce的執行數據存放在總計R個輸出文件中(每個都是由reduce任務產生的,這些文件名是用戶指定的)。通常,用戶不需要合并這R個輸出文件到一個文件,他們通常把這些文件作為輸入傳遞到另一個MapReduce調用,或者用另一個分布式應用來處理這些文件,并且這些分布式應用把這些文件看成為輸入文件由于分區(partition)成為的多個塊文件。
3.2 Master的數據結構
?
master需要保存一定的數據結構。對于每一個map和reduce任務來說,都需要保存它的狀態(idle,in-progress或者completed),并且識別不同的worker機器(對于非idel的任務狀態)。
master是一個由map任務產生的中間區域文件位置信息到reduce任務的一個管道。因此,對于每一個完成得map任務,master保存下來這個map任務產生的R中間區域文件信息的位置和大小。對于這個位置和大小信息是當接收到map任務完成得時候做的。這些信息是增量推送到處于in-progress狀態的reduce任務的worker上的。
?
3.3 容錯考慮
?
由于MapReduce函數庫是設計用于在成百上千臺機器上處理海量數據的,所以這個函數庫必須考慮到機器故障的容錯處理。
?
Worker失效的考慮
master會定期ping每一個worker機器。如果在一定時間內沒有worker機器的返回,master就認為這個worker失效了。所有這臺worker完成的map任務都被設置成為他們的初始idel狀態,并且因此可以被其他worker所調度執行。類似的,所有這個機器上正在處理的map 任務或者reduce任務都被設置成為idle狀態,可以被其他worker所重新執行。
在失效機器上的已經完成的map任務還需要再次重新執行,這是因為中間結果存放在這個失效的機器上,所以導致中間結果無法訪問。已經完成的recude任務無需再次執行,因為他們的結果已經保存在全局的文件系統中了。
當map任務首先由Aworker執行,隨后被Bworker執行的時候(因為A失效了),所有執行reduce任務的worker都會被通知。所有還沒有來得及從A上讀取數據的worker都會從B上讀取數據。
MapReduce可以有效地支持到很大尺度的worker失效的情況。比如,在一個MapReduce操作中,在一個網絡例行維護中,可能會導致每次大約有80臺機器在幾分鐘之內不能訪問。MapReduce的master制式簡單的把這些不能訪問的worker上的工作再執行一次,并且繼續調度進程,最后完成MapReduce的操作。
?
Master失效
?
在master中,定期會設定checkpoint,寫出master的數據結構。如果master任務失效了,可以從上次最后一個checkpoint開始啟動另一個master進程。不過,由于只有一個master在運行,所以他如果失效就比較麻煩,因此我們當前的實現上,是如果master失效了,就終止MapReduce執行。客戶端可以檢測這種失效并且如果需要就重新嘗試MapReduce操作。
?
失效的處理設計
當用戶提供的map和reduce函數對于他們的輸入來說是確定性的函數,我們的分布式的輸出就應當和在一個整個程序沒有失敗的連續執行相同。
我們依靠對map和reduce任務的輸出進行原子提交來完成這樣的可靠性。每一個in-progress任務把輸出寫道一個私有的臨時文件中。reduce任務產生一個這樣的文件,map任務產生R個這樣的任務(每一個對應一個reduce任務)。當一個map任務完成的時候,worker發送一個消息給master,并且這個消息中包含了這個R臨時文件的名字。如果master又收到一個已經完成的map任務的完成消息,他就忽略這個消息。否則,他就在master數據結構中記錄這個R文件。
當一個reduce任務完成的時候,reduce worker自動把臨時輸出的文件名改為正式的輸出文件。如果再多臺機器上有相同的reduce任務執行,那么就會有多個針對最終輸出文件的更名動作。我們依靠文件系統提供的原子操作’改名字’,來保證最終的文件系統狀態中記錄的是其中一個reduce任務的輸出。
我們的絕大部分map和reduce操作都是確定性的,實際上在語義角度,這個map和reduce并發執行和順序執行市一樣的,這就使得程序員很容易推測程序行為。當map和reduce操作是非確定性的時候,我們有稍弱的但是依舊是有道理的錯誤處理機制。對于非確定性操作來說,特定reduce任務R1的輸出,與,非確定性的順序執行的程序對R1的輸出是等價的。另外,另一個reduce任務R2的輸出,是和另一個順序執行的非確定性程序對應的R2輸出相關的。
考慮map任務M和reduce任務R1,R2。我們設定e(Ri)為已經提交的Ri執行(有且僅有一個這樣的執行)。當e(R1)處理得是M的一次執行,而e(R2)是處理M的另一次執行的時候,那么就會導致稍弱的失效處理了。
?
3.4 存儲位置
?
在我們的環境下,網絡帶寬資源是相對缺乏的。我們用盡量讓輸入數據保存在構成集群機器的本地硬盤上(通過GFS管理[8])的方式來減少網絡帶寬的開銷。GFS把文件分成64M一塊,并且每一塊都有幾個拷貝(通常是3個拷貝),分布到不同的機器上。MapReduce的master有輸入文件組的位置信息,并且嘗試分派map任務在對應包含了相關輸入數據塊的設備上執行。如果不能分配map任務到對應其輸入數據的機器上執行,他就嘗試分配map任務到盡量靠近這個任務的輸入數據庫的機器上執行(比如,分配到一個和包含輸入數據塊在一個switch網段的worker機器上執行)。當在一個足夠大的cluster集群上運行大型MapReduce操作的時候,大部分輸入數據都是在本地機器讀取的,他們消耗比較少的網絡帶寬。
3.5 任務顆粒度
?
如果上邊我們講的,我們把map階段拆分到M小塊,并且reduce階段拆分到R小塊執行。在理想狀態下,M和R應當比worker機器數量要多得多。每一個worker機器都通過執行大量的任務來提高動態的負載均衡能力,并且能夠加快故障恢復的速度:這個失效機器上執行的大量map任務都可以分布到所有其他worker機器上執行。
但是我們的實現中,實際上對于M和R的取值有一定的限制,因為master必須執行O(M+R)次調度,并且在內存中保存O(M*R)個狀態。(對影響內存使用的因素還是比較小的:O(M*R)塊狀態,大概每對map任務/reduce任務1個字節就可以了)
進一步來說,用戶通常會指定R的值,因為每一個reduce任務最終都是一個獨立的輸出文件。在實際中,我們傾向于調整M的值,使得每一個獨立任務都是處理大約16M到64M的輸入數據(這樣,上面描寫的本地優化策略會最有效),另外,我們使R比較小,這樣使得R占用不多的worker機器。我們通常會用這樣的比例來執行MapReduce: M=200,000,R=5,000,使用2,000臺worker機器。
3.6 備用任務
?
通常情況下,一個MapReduce的總執行時間會受到最后的幾個”拖后腿”的任務影響:在計算過程中,會有一個機器過了比正常執行時間長得多的時間還沒有執行完map或者reduce任務,導致MapReduce總任務不能按時完成。出現拖后腿的情況有很多原因。比如:一個機器的硬盤有點問題,經常需要反復讀取糾錯,然后把讀取輸入數據的性能從30M/s降低到1M/s。cluster調度系統已經在某臺機器上調度了其他的任務,所以因為CPU/內存/本地硬盤/網絡帶寬等競爭的關系,導致執行MapReduce的代碼性能比較慢。我們最近出現的一個問題是機器的啟動代碼有問題,導致關閉了cpu的cache:在這些機器上的任務性能有上百倍的影響。
我們有一個通用的機制來減少拖后腿的情況。當MapReduce操作接近完成的時候,master調度備用進程來執行那些剩下的in-progress狀態的任務。無論當最初的任務還是backup任務執行完成的時候,都把這個任務標記成為已經完成。我們調優了這個機制,通常只會占用多幾個百分點的機器資源。但是我們發現這樣做以后對于減少超大MapReduce操作的總處理時間來說非常有效。例如,在5.3節描述的排序任務,在關閉掉備用任務的情況下,要比有備用任務的情況下多花44%的時間。
?
4 技巧
雖然簡單寫map和reduce函數實現基本功能就已經對大部分需要都足夠了,我們還是開發了一些有用的擴展,這些在本節詳細描述。
?
4.1 分區函數
?
MapReduce的使用者通過指定(R)來給出reduce 任務/輸出文件的數量。他們處理的數據在這些任務上通過對中間結果key得分區函數來進行分區。缺省的分區函數時使用hash函數(例如hash(key)mod R)。這一般就可以得到分散均勻的分區。不過,在某些情況下,對key用其他的函數進行分區可能更有用。比如,某些情況下key是URL,那么我們希望所有對單個host的入口URL都保存在相同的輸出文件。為了支持類似的情況,MapReduce函數庫可以讓用戶提供一個特定的分區函數。比如使用hash(hostname(urlkey))mod R作為分區函數,這樣可以讓指向同一個hostname的URL分配到相同的輸出文件中。
4.2 順序保證
?
我們確保在給定的分區中,中間鍵值對key/value的處理順序是根據key增量處理的。這樣的順序保證可以很容易生成每一個分區有序的輸出文件,這對于輸出文件格式需要支持客戶端的對key的隨機存取的時候就很有用,或者對輸出數據集再作排序就很容易。
4.3 combiner函數
?
在某些情況下,允許中間結果key重復會占據相當的比重,并且用戶定義的reduce函數滿足結合律和交換律。比如2.1節的一個統計單詞出現次數的例子。由于word的頻率趨勢符合Zipf 分布(齊夫分布),每一個map任務都回產生成百上千的<the,1>這樣格式的記錄。所有這些記錄都通過網絡發送給一個單個的reduce任務,通過reduce函數進行相加,最后產生單個數字。我們允許用戶指定一個可選的組合函數Combiner函數,先在本地進行合并以下,然后再通過網絡發送。
Combiner函數在每一個map任務的機器上執行。通常這個combiner函數的代碼和reduce的代碼實現上都是一樣的。reduce函數和combiner函數唯一的不同就是MapReduce對于這兩個函數的輸出處理上不同。對于reduce函數的輸出是直接寫到最終的輸出文件。對于combiner函數來說,輸出是寫到中間文件,并且會被發送到reduce任務中去。
部分使用combiner函數可以顯著提高某些類型的MapReduce操作。附錄A有這樣的使用combiner的例子。
4.4 輸入和輸出類型
?
MapReduce函數庫提供了讀取幾種不同格式的輸入的支持。例如,”text”模式下,每行輸入都被看成一個key/value對:key是在文件的偏移量,value是行的內容。另一個寵用格式保存了根據key進行排序key/value對的順序。每一個輸入類型的實現都知道如何把輸入為了分別得map任務而進行有效分隔(比如,text模式下的分隔就是要確保分隔的邊界只能按照行來進行分隔)。用戶可以通過簡單的提供reader接口來進行新的輸入類型的支持。不過大部分用戶都只用一小部分預先定義的輸入類型。
reader函數不需要提供從文件讀取數據。例如,我們很容易定義一個reader函數從數據庫讀取數據,或者從保存在內存中的數據結構中讀取數據。
類似的,我們提供了一組用于輸出的類型,可以產生不同格式的數據,并且用戶也可以很簡單的增加新的輸出類型。
4.5 邊界效應
?
在某些情況下,MapReduce的使用上,如果再map操作或者reduce操作時,增加輔助的輸出文件,會比較有用。我們依靠程序來提供這樣的邊界原子操作。通常應用程序寫一個臨時文件并且用系統的原子操作:改名字操作,來再這個文件寫完的時候,一次把這個文件改名改掉。
對于單個任務產生的多個輸出文件來說,我們沒有提供其上的兩階段提交的原子操作支持。因此,對于產生多個輸出文件的,對于跨文件有一致性要求的任務,都必須是確定性的任務。這個限制到現在為止還沒有真正在實際中遇到過。
4.6 跳過損壞的記錄
?
某些情況下,用戶程序的代碼會讓map或者reduce函數在處理某些記錄的時候crash掉。這種情況下MapReduce操作就不能完成。一般的做法是改掉bug然后再執行,但是有時候這種先改掉bug的方式不太可行;也許是因為bug是在第三方的lib里邊,它的原代碼不存在等等。并且,很多時候,忽略一些記錄不處理也是可以接受的,比如,在一個大數據集上進行統計分析的時候,就可以忽略有問題的少量記錄。我們提供了一種執行模式,在這種執行模式下,MapReduce會檢測到哪些記錄會導致確定的crash,并且跳過這些記錄不處理,使得整個處理能繼續進行。
每一個worker處理進程都有一個signal handler,可以捕獲內存段異常和總線錯誤。在執行用戶map或者reduce操作之前,MapReduce函數庫通過全局變量保存記錄序號。如果用戶代碼產生了這個信號,signal handler于是用”最后一口氣”通過UDP包向master發送上次處理的最后一條記錄的序號。當master看到在這個特定記錄上,有不止一個失效的時候,他就標志著條記錄需要被跳過,,并且在下次重新執行相關的Map或者Reduce任務的時候跳過這條記錄。
4.7 本地執行
?
因為實際執行操作時分布在系統中執行的,通常是在好幾千臺計算機上執行得,并且是由master機器進行動態調度的任務,所以對map和reduce函數的調試就比較麻煩。為了能夠讓調試方便,profiling和小規模測試,我們開發了一套MapReduce的本地實現,也就是說,MapReduce函數庫在本地機器上順序執行所有的MapReduce操作。用戶可以控制執行,這樣計算可以限制到特定的map任務上。用戶可以通過設定特別的標志來執行他們的程序,同時也可以很容易的使用調試和測試工具(比如gdb)等等。
4.8 狀態信息
?
master內部有一個HTTP服務器,并且可以輸出狀態報告。狀態頁提供了計算的進度報告,比如有多少任務已經完成,有多少任務正在處理,輸入的字節數,中間數據的字節數,輸出的字節數,處理百分比,等等。這些頁面也包括了指向每個任務輸出的標準錯誤和輸出的標準文件的連接。用戶可以根據這些數據來預測計算需要大約執行多長時間,是否需要為這個計算增加額外的計算資源。這些頁面也可以用來分析為何計算執行的會比預期的慢。
此外,最上層的狀態頁面也顯示了哪些worker失效了,以及他們失效的時候上面運行的map和reduce任務。這些信息對于調試用戶代碼中的bug很有幫助。
4.9 計數器
?
MapReduce函數庫提供了用于統計不同事件發生次數的計數器。比如,用戶可能想統計所有已經索引的German文檔數量或者已經處理了多少單詞的數量,等等。
為了使用這樣的特性,用戶代碼創建一個叫做counter的對象,并且在map和reduce函數中在適當的時候增加counter的值。例如:
?
Counter* uppercase;
uppercase = GetCounter("uppercase");
?
map(String name, String contents):
?????? for each word w in contents:
????????????? if (IsCapitalized(w)):
???????????????????? uppercase->Increment();
????????????? EmitIntermediate(w, "1");
?
?
?
這些counter的值,會定時從各個單獨的worker機器上傳遞給master(通過ping的應答包傳遞)。master把執行成功的map或者reduce任務的counter值進行累計,并且當MapReduce操作完成之后,返回給用戶代碼。當前counter值也會顯示在master的狀態頁面,這樣人可以看到計算現場的進度。當累計counter的值的時候,master會檢查是否有對同一個map或者reduce任務的相同累計,避免累計重復。(backup任務或者機器失效導致的重新執行map任務或者reduce任務或導致這個counter重復執行,所以需要檢查,避免master進行重復統計)。
部分計數器的值是由MapReduce函數庫進行自動維持的,比如已經處理的輸入的key/value對的數量,或者輸出的key/value鍵值對等等。
counter特性對于MapReduce操作的完整性檢查非常有用。比如,在某些MapReduce操作中,用戶程序需要確保輸出的鍵值對精確的等于處理的輸入鍵值對,或者處理得German文檔數量是在處理的整個文檔數量中屬于合理范圍內。
5 性能
在本節,我們用在一個大型集群上運行的兩個計算來衡量MapReduce的性能。一個計算用來在一個大概1TB的數據中查找特定的匹配串。另一個計算排序大概1TB的數據。
這兩個程序代表了大量的用MapReduce實現的真實的程序的主要類型-一類是對數據進行洗牌,另一類是從海量數據集中抽取少部分的關心的數據。
5.1 集群配置
?
所有這些程序都是運行在一個大約有1800臺機器的集群上。每臺機器配置2個2G Intel Xeon支持超線程的處理器, 4GB內存,兩個160GBIDE硬盤,一個千兆網卡。這些機器部署在一個由兩層的,樹形交換網絡中,在最上層大概有100-200G的聚合貸款。所有這些機器都有相同的部署(對等部署),因此任意兩點之間的來回時間小于1毫秒。
在4GB內存里,大概有1-1.5G用于運行在集群上的其他任務。這個程序是在周末下午執行的,這時候的CPU,磁盤和網絡基本上屬于空閑狀態。
?
5.2 GREP
?
grep程序需要掃描大概10的10次方個由100個字節組成的記錄,查找比較少見的3個字符的查找串(這個查找串在92,337個記錄中存在)。輸入的記錄被拆分成大約64M一個的塊(M=15000),整個輸出方在一個文件中(R=1)。
?
?
?
圖2表示了這個程序隨時間的處理過程。Y軸是輸入數據的處理速度。處理速度逐漸隨著參與MapReduce計算的機器增加而增加,當1764臺worker開始工作的時候,達到了30G/s的速度。當map任務結束的時候,在計算開始后80秒,輸入的速度降到0。整個計算過程從開始到結束一共花了大概150秒。這包括了大約一分鐘的開頭啟動部分。開頭的部分是用來把這個程序傳播到各個worker機器上的時間,并且等待GFS系統打開100個輸入文件集合并且獲得相關的文件位置優化信息。
?
5.3 SORT排序
?
SORT程序排序10的10次方個100個字節組成的記錄(大概1TB的數據)。這個程序是仿制TeraSort benchmark[10]的。
sort程序是由不到50行用戶代碼組成。三行的map函數從文本行中解出10個字節的排序key,并且把這個key和原始行作為中間結果key/value鍵值對輸出。我們使用了一個內嵌的identitiy函數作為reduce的操作。這個函數把中間結果key/value鍵值對不變的作為輸出的key/value鍵值對。最終排序輸出寫到一個兩路復制的GFS文件中(就是說,程序的輸出會寫2TB的數據)。
就像前邊講的,輸入數據分成64MB每塊(M=15000)。我們把排序后的輸出分區成為4000個文件(R=4000)。分區函數使用key的原始字節來吧數據分區到R個小塊中。
我們這個benchmark中的分區函數自身知道key的分區情況。通常對于排序程序來說,我們會增加一個預處理的MapReduce操作,這個操作用于采樣key的情況,并且用這個采樣的key的分布情況來計算對最終排序處理得分區點。
?
?
?
圖三是這個排序程序的正常執行過程。左上的圖表示了輸入數據讀取的速度。數據讀取速度會達到13G/s,并且在不到200秒所有map任務完成之后迅速滑落到0。我們注意到數據讀取速度小于grep粒子。這是因為排序map任務劃了大概一半時間和I/O帶寬寫入中間輸出到本地硬盤。相對應的grep中間結果輸出幾乎可以忽略不計。
左邊中間的圖是map任務把中間數據發送到reduce任務的網絡速度。這個排序過程自從第一個任務完成之后就開始了。圖示上的第一個高峰是啟動了第一批大概1700個reduce任務(整個MapReduce分布到大概1700臺機器上,每臺機器一次大概執行1個reduce任務)。大概計算開始300秒以后,這些第一批reduce任務完成了,并且我們開始執行剩下的reduce任務。所有這些排序任務會在計算開始后大概600秒結束。
左下的圖表示reduce任務把排序后的數據寫到最終的輸出文件的速度。在第一個排序期結束后到寫盤開始之前有一個小延時,這是因為機器正在忙于內部排序中間數據。寫盤速度持續大概2-4G/s。在計算開始后大概850秒左右寫盤完成。包括啟動部分,整個計算用了891秒。這個和TeraSort benchmark[18]的最高紀錄1057秒差不多。
需要注意的事情是:輸入速度要比排序速度和輸出速度快,這是因為我們本地化的優化策略,絕大部分數據都是從本地硬盤讀取而上去了我們相關的網絡消耗。排序速度比輸出速度快,這是因為輸出階段寫了兩份排序后的速度(我們寫兩份的原因是為了可靠性可可用性的原因)。我們寫兩份的原因是因為底層文件系統的可靠性和可用性的要求。如果底層文件系統用類似容錯編碼[14](erasure coding)的方式,而不采用復制寫的方式,在寫盤階段可以降低網絡帶寬的要求。
5.4 高效的backup任務
?
在圖三(b),是我們在關閉掉backup任務的時候,sort程序的執行情況。執行流和上邊講述的圖3(a)很類似,但是這個關閉掉backup任務的時候,執行的尾巴很長,并且執行的尾巴沒有什么有效的寫盤動作。在960秒以后,除了5個reduce以外,其他reduce任務都已經完成。不過這些拖后腿的任務又執行了300秒才完成。整個計算化了1283秒,多了44%的執行時間。
?
?
5.5 失效的機器
?
在圖三(c)中,我們演示了在sort程序執行過程中故意暫時殺掉1746個worker中的200個worker進程的執行情況。底層的集群調度立刻在這些機器上重新創建了新的worker處理(因為我們只是把這些機器上的處理進程殺掉,而機器依舊是可以操作的)。
因為已經完成的map work丟失了(由于相關的map worker被殺掉了),需要重新再作,所以worker死掉會導致一個負數的輸入速率。相關map任務的重新執行很快就重新執行了。整個計算過程在933秒內完成,包括了前邊的啟動時間(只比正常執行時間多了5%的時間)。
6 經驗
我們在2003年1月寫了第一個版本的MapReduce函數庫,并且在2003年8月作了顯著的增強,包括了本地優化,worker機器之間的動態負載均衡等等。自那以后,MapReduce函數庫就廣泛用于我們日常處理的問題。它現在在Google內部各個領域內廣泛應用,包括:
?
。大尺度的計算機學習問題。
。Google News和Froogle產品的集群問題。
。從公眾查詢產品(比如Google的Zeitgeist)的報告中抽取數據。
。從web網頁作新試驗和抽取新的產品(例如,從大量的webpage中的本地查找抽取物理位置信息)。
。大尺度的圖型計算。
?
?
?
?
?
| 任務數 平均任務完成時間 使用的機器時間 | 29423 634秒 79,186天 |
| 讀取的輸入數據 產生的中間數據 寫出的輸出數據 | 3,288TB 758TB 193TB |
| 每個job平均worker機器數 每個job平均死掉work數 每個job平均map任務 每個job平均reduce任務 | 157 1.2 3,351 55 |
| map唯一實現 reduce的唯一實現 map/reduce的combiner實現 | 395 296 426 |
表1:MapReduce2004年8月的執行情況
?
圖四顯示了我們的源代碼管理系統中,隨著時間推移,MapReduce程序的顯著增加,從2003年早先時候的0個增長到2004年9月份的差不多900個不同的程序。MapReduce之所以這樣成功是因為他能夠在不到半小時時間內寫出一個簡單的能夠應用于上千臺機器的大規模并發程序,并且極大的提高了開發和原形設計的周期效率。并且,他可以讓一個完全沒有分布式和/或并行系統經驗的程序員,能夠很容易的開發處理海量數據的程序。
在每一個任務結束的時候,MapReduce函數庫記錄使用的計算資源的狀態。在表1,我們列出了2004年8月份MapReduce運行的任務所占用的相關資源。
6.1 大尺度的索引
?
到目前為止,最成功的MapReduce的應用就是重寫了Google web 搜索服務所使用到的index系統。索引系統處理蠕蟲系統抓回來的超大量的數據,這些數據保存在GFS文件里。普通這些文檔的大小是超過了20TB的數據。索引程序是通過一系列的,大概5到10次MapReduce操作來建立索引。通過利用MapReduce(替換掉上一個版本的特別設計的分布處理的索引程序版本)有這樣一些好處:
?
l? 索引代碼很簡單,很小,很容易理解。因為對于容錯的處理代碼,分布以及并行處理代碼都通過MapReduce函數庫封裝了,所以索引代碼很簡單,很小,很容易理解。例如,當使用MapReduce函數庫的時候,計算的代碼行數從原來的3800行C++代碼一下減少到大概700行代碼。
l? MapReduce的函數庫的性能已經非常好,所以我們可以把概念上不相關的計算步驟分開處理,而不是混在一起以期減少處理次數。這使得我們容易改變索引處理方式。比如,我們對老索引系統的一個小更改可能要好幾個月的時間,但是在新系統內,只需要花幾天時間就可以了。
l? 索引系統的操作更容易了,這是因為機器的失效,速度慢的機器,以及網絡風暴都已經由MapReduce自己解決了,而不需要操作人員的交互。此外,我們可以簡單的通過對索引系統增加機器的方式提高處理性能。
?
7 相關工作
?
很多系統都提供了嚴格的編程模式,并且通過對編程的嚴格限制來實現自動的并行計算。例如,一個結合函數可以在一個N個元素的所有前綴上進行計算,并且使用并發前綴計算,會在在N個并發節點上會耗費log N的時間[6,9,13]。MapReduce是這些模式下的,一個我們基于超大系統的現實經驗的一個簡化和精煉。并且,我們還提供了基于上千臺處理器的容錯實現。而大部分并發處理系統都只在小規模的尺度上實現,并且機器的容錯還是程序員來操心的。
Bulk Synchronous Programming[17]以及一些MPI primitives[11]提供了更高級別的抽象,可以更容易寫出并行處理的程序。這些系統和MapReduce系統的不同之處在于,MapReduce是通過限制性編程模式自動實現用戶程序的并發處理,并且提供了透明的容錯處理。
我們本地的優化策略是受active disks[12,15]等技術的影響的,在active disks中,計算任務是盡量推送到數據在本地磁盤的節點處理,這樣就減少了網絡系統的I/O吞吐。我們是在直接附帶幾個硬盤的通機器上執行我們的計算工作,不是在磁盤處理器上執行我們的工作,但是總的效果是一樣的。
我們的backup task機制和早先CharlotteSystem[3]的機制比較類似。早先的簡單調度的一個缺點是如果一個任務導致反復失效,那么整個計算就不能完成。我們通過在故障情況下跳過故障記錄的方式,在某種程度上解決了這個問題。
MapReduce的實現依賴于一個內部的集群管理系統,這個集群管理系統負責在一個超大共享機器組上分布和運行用戶任務。雖然這個不是本論文的重點,集群管理系統在理念上和Condor[16]等其他系統一樣。
MapReduce函數庫的排序部分和NOW-Sort[1]的操作上很類似。源機器(map workers)把待排序的數據進行分區,并且發送到R個reduce worker中的一個進行處理。每一個reduce worker作本地排序(盡可能在內存排序)。當然NOW-Sort沒有刻意用戶定義的Map和Reduce函數,而我們的函數庫有,所以我們的函數庫可以有很高的適應性。
River[2]提供了一個編程模式,在這樣的編程模式下,處理進程可以通過分布式查詢來互相傳送數據的方式進行通訊。和MapReduce類似,River系統嘗試提供對不同應用有近似平均的性能,即使在不對等的硬件環境下或者在系統顛簸的情況下也能提供近似平均的性能。River是通過精心調度硬盤和網絡的通訊,來平衡任務的完成時間。MapReduce的框架是通過限制性編程模式,來把問題分解成為大量的任務。每一個任務都是動態調度到可用的worker上執行,這樣快速的worker可以執行更多的任務。限制性編程模式同樣允許我們在接近計算完成的時候調度backup 任務,在出現處理不均勻的情況下,大量的縮小整個完成的時間(比如在有慢機或者阻塞的worker的時候)。
BAD-FS[5]和MapReduce的編程模式完全不同,它不像MapReduce是基于很大的網絡計算的。不過,這兩個系統有兩個基本原理很類似。(1)兩個系統都使用重復執行來防止由于失效導致的數據丟失。(2)兩個都使用數據本地化調度策略,使得處理盡可能在本地數據上進行,減少通過網絡通訊的數據量。
TACC[7]是一個用于簡單構造高可用性網絡服務的系統。就像MapReduce,它依靠重新執行機制來實現的容錯處理。
8 結束語
MapReduce的編程模式在Google成功應用于許多方面。我們把這種成功應用歸結為幾個方面:首先,這個編程模式易于使用,即使程序員沒有并行或者分布式系統經驗,由于MapReduce封裝了并行的細節和容錯處理,本地化計算,負載均衡等等,所以,使得編程非常容易。其次,大量不同的問題都可以簡單通過MapReduce來解決。例如,MapReduce用于產生Google的web搜索服務所需要的數據,用來排序,用來數據挖掘,用于機器智能學習,以及很多其他系統。第三,我們已經在一個好幾千臺計算機的大型集群上開發實現了這個MapReduce。這個實現使得對于這些機器資源的利用非常簡單,并且因此也適用于解決Google遇到的其他很多需要大量計算的問題。
我們也從MapReduce上學到了不少內容。首先,先執行編程模式使得并行和分布式計算非常容易,并且也易于構造這樣的容錯計算環境。其次,網絡帶寬是系統的資源的瓶頸。我們系統的一系列優化都使因此針對減少網絡傳輸量為目的的:本地優化使得我們讀取數據時,是從本地磁盤讀取的,并且寫出單個中間數據文件到本地磁盤也節約了網絡帶寬。第三,冗余執行可以減少慢機器帶來的影響,并且解決由于機器失效導致的數據丟失問題。
?
9 感謝
Josh Levenberg校定和擴展了用戶級別的MapReduce API,并且結合他的適用經驗和其他人的改進建議,增加了很多新的功能。MapReduce使用Google文件系統GFS[8]來作為數據和輸出。我們還感謝Percy Liang Olcan Sercinoglu 在開發用于MapReduce的集群管理系統得工作。Mike Burrows,Wilson Hsieh,Josh Levenberg,Sharon Perl,RobPike,Debby Wallach 為本論文提出了寶貴的意見。OSDI的無名審閱者,以及我們的審核者Eric Brewer,在論文應當如何改進方面給出了有益的意見。最后,我們感謝Google的工程部的所有MapReduce的用戶,感謝他們提供了有用的反饋,以及建議,以及錯誤報告等等。
?
10 參考資料
[1] ??????? Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau,David E. Culler, Joseph M. Hellerstein, and David A. Patterson.High-performance sorting on networks of workstations.In Proceedings of the 1997 ACM SIGMOD InternationalConference on Management of Data, Tucson,Arizona, May 1997.
[2] ??????? Remzi H. Arpaci-Dusseau, Eric Anderson, NoahTreuhaft, David E. Culler, Joseph M. Hellerstein, David Patterson, and Kathy Yelick. Cluster I/O with River:Making the fast case common. In Proceedings of the Sixth Workshop on Input/Output in Parallel and Distributed Systems (IOPADS '99), pages 10.22, Atlanta, Georgia, May 1999.
[3] ??????? Arash Baratloo, Mehmet Karaul, Zvi Kedem, and Peter Wyckoff. Charlotte: Metacomputing on the web. In Proceedings of the 9th International Conference on Parallel and Distributed Computing Systems, 1996. [4] Luiz A. Barroso, Jeffrey Dean, and Urs H¨olzle. Web search for a planet: The Google cluster architecture. IEEE Micro, 23(2):22.28, April 2003.
[5]????????? John Bent, Douglas Thain, Andrea C.Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, and Miron Livny. Explicit control in a batch-aware distributed file system. In Proceedings of the 1st USENIX Symposium on Networked Systems Design and Implementation NSDI, March 2004.
[6] ??????? Guy E. Blelloch. Scans as primitive parallel operations.IEEE Transactions on Computers, C-38(11), November 1989.
[7] ??????? Armando Fox, Steven D. Gribble, Yatin Chawathe, Eric A. Brewer, and Paul Gauthier. Cluster-based scalable network services. In Proceedings of the 16th ACM Symposium on Operating System Principles, pages 78. 91, Saint-Malo, France, 1997.
[8] ??????? Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. The Google file system. In 19th Symposium on Operating Systems Principles, pages 29.43, Lake George, New York, 2003. To appear in OSDI 2004 12
[9] ??????? S. Gorlatch. Systematic efficient parallelization of scan and other list homomorphisms. In L. Bouge, P. Fraigniaud, A. Mignotte, and Y. Robert, editors, Euro-Par'96. Parallel Processing, Lecture Notes in Computer Science 1124, pages 401.408. Springer-Verlag, 1996.
[10]??????? Jim Gray. Sort benchmark home page. http://research.microsoft.com/barc/SortBenchmark/.
[11] ????? William Gropp, Ewing Lusk, and Anthony Skjellum. Using MPI: Portable Parallel Programming with the Message-Passing Interface. MIT Press, Cambridge, MA, 1999.
[12] ????? L. Huston, R. Sukthankar, R.Wickremesinghe, M. Satyanarayanan, G. R. Ganger, E. Riedel, and A. Ailamaki. Diamond: A storage architecture for early discard in interactive search. In Proceedings of the 2004 USENIX File and Storage Technologies FAST Conference, April 2004.
[13] ????? Richard E. Ladner and Michael J. Fischer. Parallel prefix computation. Journal of the ACM, 27(4):831.838, 1980. [14] Michael O. Rabin. Efficient dispersal of information for security, load balancing and fault tolerance. Journal of the ACM, 36(2):335.348, 1989.
[15] ????? Erik Riedel, Christos Faloutsos, Garth A. Gibson, and David Nagle. Active disks for large-scale data processing. IEEE Computer, pages 68.74, June 2001.
[16] ????? Douglas Thain, Todd Tannenbaum, and Miron Livny. Distributed computing in practice: The Condor experience. Concurrency and Computation: Practice and Experience, 2004.
[17] ????? L. G. Valiant. A bridging model for parallel computation. Communications of the ACM, 33(8):103.111, 1997.
[18] ????? Jim Wyllie. Spsort: How to sort a terabyte quickly. http://alme1.almaden.ibm.com/cs/spsort.pdf.
?
A 單詞頻率統計
本節包含了一個完整的程序,用于統計在一組命令行指定的輸入文件中,每一個不同的單詞出現頻率。
#include "mapreduce/mapreduce.h"
?
// User's map function
class WordCounter : public Mapper {
?????? public:
????????????? virtual void Map(const MapInput& input) {
???????????????????? const string& text = input.value();
???????????????????? const int n = text.size();
???????????????????? for (int i = 0; i < n; ) {
??????????????????????????? // Skip past leading whitespace
??????????????????????????? while ((i < n) && isspace(text[i]))
?????????????????????????????????? i++;
?
???????????????????? // Find word end
???????????????????? int start = i;
???????????????????? while ((i < n) && !isspace(text[i]))
??????????????????????????? i++;
???????????????????? if (start < i)
??????????????????????????? Emit(text.substr(start,i-start),"1");
????????????? }
?????? }
};
?
REGISTER_MAPPER(WordCounter);
?
// User's reduce function
class Adder : public Reducer {
?????? virtual void Reduce(ReduceInput* input) {
????????????? // Iterate over all entries with the
????????????? // same key and add the values
????????????? int64 value = 0;
????????????? while (!input->done()) {
???????????????????? value += StringToInt(input->value());
???????????????????? input->NextValue();
????????????? }
?
????????????? // Emit sum for input->key()
????????????? Emit(IntToString(value));
?????? }
};
?
REGISTER_REDUCER(Adder);
?
int main(int argc, char** argv) {
?????? ParseCommandLineFlags(argc, argv);
??????
?????? MapReduceSpecification spec;
??????
?????? // Store list of input files into "spec"
?????? for (int i = 1; i < argc; i++) {
????????????? MapReduceInput* input = spec.add_input();
????????????? input->set_format("text");
????????????? input->set_filepattern(argv[i]);
????????????? input->set_mapper_class("WordCounter");
?????? }
?
?????? // Specify the output files:
?????? // /gfs/test/freq-00000-of-00100
?????? // /gfs/test/freq-00001-of-00100
?????? // ...
?????? MapReduceOutput* out = spec.output();
?????? out->set_filebase("/gfs/test/freq");
?????? out->set_num_tasks(100);
?????? out->set_format("text");
?????? out->set_reducer_class("Adder");
??????
?????? // Optional: do partial sums within map
?????? // tasks to save network bandwidth
?????? out->set_combiner_class("Adder");
?
?????? // Tuning parameters: use at most 2000
?????? // machines and 100 MB of memory per task
?????? spec.set_machines(2000);
?????? spec.set_map_megabytes(100);
?????? spec.set_reduce_megabytes(100);
??????
?????? // Now run it
?????? MapReduceResult result;
?????? if (!MapReduce(spec, &result)) abort();
??????
?????? // Done: 'result' structure contains info
?????? // about counters, time taken, number of
?????? // machines used, etc.
?????? return 0;
}
?
?
?
?
?
?
?
?
B 譯者
?
崮山路上走9遍2005-8-8于大連完稿
BLOG: sharp838.mblogger.cn
EMAIL: sharp838@21cn.com;guangweishi@gmail.com
?
所有的版權歸于原作者。
?
感謝:朱朱,洋洋,sophia
?
?
?
?
?
?
轉載于:https://www.cnblogs.com/ww-worm/p/3331001.html
總結
以上是生活随笔為你收集整理的MapReduce:Simplified Data Processing on Large Clusters中文版from百度文库的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Win8 .NET Framework
- 下一篇: Linux系统安装VM-Tools