第五章-分布式并行编程框架MapReduce
第五章-分布式并行編程框架MapReduce
文章目錄
- 第五章-分布式并行編程框架MapReduce
- MapReduce概述
- 分布式并行編程
- MapReduce模型和函數
- MapReduce體系結構
- MapReduce工作流程
- 工作流程概述
- 各個執行階段
- shuffle過程
- 實例分析:WordCount
- MapReduce的具體應用
- MapReduce編程實踐
MapReduce概述
分布式并行編程
過去很長一段時間,CPU的性能都遵循“摩爾定律”:【當價格不變時,集成電路上可容納的元器件的數目,約每隔18個月便會增加一倍,性能也將提升一倍】。從2005年開始摩爾定律逐漸失效,需要處理的數據量快速增加,人們開始借助于分布式并行編程來提高程序性能。
分布式并行程序運行在大規模計算機集群上,可以并行執行大規模數據處理任務,從而獲得海量的計算能力。同時通過向集群中增加新的計算節點,就能很容易地實現集群計算能力的擴充。
谷歌公司最先提出了分布式并行編程模型 MapReduce,Hadoop MapReduce是它的開源實現 。谷歌的 MapReduce運行在分布式文件系統 GFS上,Hadoop MapReduce運行在分布式文件系統 HDFS上。相對而言,Hadoop MapReduce要比谷歌 MapReduce的使用門檻低很多,程序員即使沒有任何分布式編程開發經驗,也可以很輕松地開發出分布式程序部署到計算機集群上。
| 傳統并行編程框架 | 通常采用共享式架構(共享內存、共享存儲),底層通常采用統一的存儲區域網絡SAN | 容錯性差,其中一個硬件發生故障容易導致整個集群不可工作 | 通常采用刀片服務器,高速網絡以及共享存儲區域網絡 SAN,價格高,擴展性差 | 編程難度大,需要解決做什么和怎么做的問題,編程原理和多線程編程邏輯類似,需要借助互斥量、信號量、鎖等機制,實現不同任務之間的同步和通信 | 適用于實時、細粒度計算,尤其適用于計算密集型的應用 |
| MapReduce | 采用典型的非共享式架構 | 容錯性好,在整個集群中每個節點都有自己的內存和存儲,任何一個節點出現問題不會影響其他節點正常運行,同時系統中設計了冗余和容錯機制 | 整個集群可以隨意增加或減少相關的計算節點,普通PC機就可以實現,價格低廉,擴展性好 | 編程簡單,只需要告訴系統要解決什么問題,系統自動實現分布式部署,屏蔽分布式同步、通信、負載均衡、失敗恢復等底層細節 | 一般適用于非實時的批處理及數據密集型應用 |
MapReduce模型和函數
MapReduce將復雜的、運行于大規模集群上的并行計算過程高度地抽象到了兩個函數:Map和Reduce。
MapReduce采用“分而治之”策略,一個存儲在分布式文件系統中的大規模數據集,會被切分成許多獨立的分片(split),這些分片可以被多個 Map任務并行處理。MapReduce框架會為每個 Map任務輸入一個數據子集,Map任務生成的結果會繼續作為 Reduce任務的輸入,最終由 Reduce任務輸出最后結果,并寫入分布式文件系統。
這里要特別強調一下,適合用 MapReduce來處理的數據集需要滿足一個前提條件:待處理的數據集可以分解成許多個小的數據集,而且每一個小數據集都可以完全并行地進行處理。
MapReduce設計的一個理念就是“計算向數據靠攏”,而不是“數據向計算靠攏”,因為,移動數據需要大量的網絡傳輸開銷,在大規模數據環境下開銷更為驚人。所以,移動計算要比移動數據更加經濟。
MapReduce框架采用了Master/Slave架構,包括一個 Master和若干個Slave。Master上運行JobTracker, Slave上運行TaskTracker。
Map函數和 Reduce函數都是以<key, value>作為輸入,按一定的映射規則轉換成另一個或一批<key, value>進行輸出。
| Map | <k1,v1> | List(<k2,v2>) | 將小數據集(split)進一步解析成一批<key,value>對,輸入 Map函數中進行處理。每一個輸入的<k1,v1>會輸出一批<k2,v2>,<k2,v2>是計算的中間結果 |
| Reduce | <k2,List(v2)> | <k3,v3> | 輸入的中間結果<k2,List(v2)>中的 List(v2)表示是一批屬于同一個 k2的 value |
- Map函數將輸入的元素轉換成<key,value>形式的鍵值對,鍵和值的類型也是任意的。
- Reduce函數將輸入的一系列具有相同鍵的鍵值對以某種方式組合起來,輸出處理后的鍵值對,輸出結果合并為一個文件。
MapReduce體系結構
MapReduce體系結構主要由四個部分組成,分別是: Client、JobTracker、TaskTracker以及 Task。
Client:
- 用戶編寫的 MapReduce程序通過 Client提交到 JobTracker端
- 用戶可通過 Client提供的一些接口查看作業運行狀態
JobTracker:
- 負責資源監控和作業調度
- 監控所有 TaskTracker與 Job的健康狀況,一旦發現失敗,就將 相應的任務轉移到其他節點
- 會跟蹤任務的執行進度、資源使用量等信息,并將這些信息告訴任務調度器(TaskScheduler),而調度器會在資源出現空閑時,選擇合適的任務去使用這些資源
- 調度器是一個可插拔的模塊,用戶可以根據自己的實際應用要求設計調度器
TaskTracker:
- 接收 JobTracker 發送過來的命令并執行相應的操作(如啟動新任務、殺死任務等)
- TaskTracker 會周期性地通過“心跳”將本節點上資源的使用情況 和任務的運行進度匯報給 JobTracker
- TaskTracker 使用“slot”等量劃分本節點上的資源量(CPU、內 存等)。一個 Task 獲取到一個 slot 后才有機會運行,而 Hadoop調度器的作用就是將各個 TaskTracker上的空閑 slot分配給 Task使用。 slot 分為 Map slot 和 Reduce slot 兩種,分別供 MapTask 和 Reduce Task 使用
Task:
- Task 分為Map Task 和Reduce Task 兩種,均由TaskTracker 啟動
MapReduce工作流程
工作流程概述
MapReduce的核心思想是“分而治之”。一個大的 MapReduce作業,首先會被拆分成許多個 Map任務在多臺機器上并行處理,每個 Map任務通常運行在數據存儲的節點上。當 Map任務結束后,會生成以<key,value>形式表示的中間結果,這些中間結果會被分發到多個 Reduce任務在多臺機器上并行執行,具有相同 key的<key,value>會被發送到同一個 Reduce任務那里。Reduce任務會被中間結果進行匯總計算得到最后結果,并輸出到分布式文件系統中。
- 不同的 Map任務之間不會進行通信,不同的 Reduce任務之間也不會發生任何信息交換
- 只有當 Map任務全部結束后,Reduce過程才能開始
- 用戶不能顯式地從一臺機器向另一臺機器發送消息
- 所有的數據交換都是通過 MapReduce框架自身去實現的
- Map任務的輸入文件、Reduce任務的處理結果都是保存在分布式文件系統中,而 Map任務處理的中間結果保存在本地磁盤中。
各個執行階段
MapReduce算法的執行過程:
shuffle過程
Shuffle:是指對 Map輸出的結果進行分區、排序、合并、歸并等處理并交給 Reduce的過程,分為 Map端的操作和 Reduce端的操作。
|
|
實例分析:WordCount
WordCount程序任務
| 輸入 | 一個包含大量單詞的文本文件 |
| 輸出 | 文件中每個單詞及其出現次數(頻數),并按照單詞 字母順序排序,每個單詞和其頻數占一行,單詞和頻 數之間有間隔 |
一個WordCount執行過程的實例
MapReduce的具體應用
MapReduce可以很好地應用于各種計算問題:
- 關系代數運算(選擇、投影、并、交、差、連接)
- 分組與聚合運算
- 矩陣-向量乘法
- 矩陣乘法
在 MapReduce環境下執行兩個關系的連接操作的方法如下:
假設關系 R(A,B),S(B,C)都存儲在一個文件中,為了連接這些關系,必須把來自每個關系的各個元組都和一個鍵關聯,這個鍵就是屬性 B的值。可以使用 Map過程把來自 R的每個元組<a,b>轉換成一個鍵值對<b,<R,a>>,其中的鍵就是 b,值就是<R,a>。注意,這里把關系 R包含在值中,這樣做可以使得我們在 Reduce階段只把那些來自 R的元組和來自 S的元組進行匹配。
類似地,使用 Map過程把來自 S的每個元組<b,c>轉換成一個鍵值對<b,<S,c>>,鍵是 b,值是<S,c>。Reduce進程的任務就是,把來自關系 R和 S的具有共同屬性 B值的元組進行合并。這樣,所有具有特定 B值的元組必須被發送到同一個 Reduce進程。
MapReduce編程實踐
任務要求:用 MapReduce實現對輸入文件中的單詞做詞頻統計
實踐一共分為四步:
1.編寫 Map處理邏輯
public static class TokenizerMapperextends Mapper<Object, Text, Text, IntWritable> {private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}} }2.編寫 Reduce處理邏輯
public static class IntSumReducerextends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);} }3.編寫 Main函數
public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length < 2) {System.err.println("Usage: wordcount <in> [<in>...] <out>");System.exit(2);}Job job = Job.getInstance(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);for (int i = 0; i < otherArgs.length - 1; ++i) {FileInputFormat.addInputPath(job, new Path(otherArgs[i]));}FileOutputFormat.setOutputPath(job,new Path(otherArgs[otherArgs.length - 1]));System.exit(job.waitForCompletion(true) ? 0 : 1); }編譯打包代碼請參考另一篇博客 簡單的MapReduce實踐
完整代碼:
import java.io.IOException; import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser;public class WordCount {public static class TokenizerMapperextends Mapper<Object, Text, Text, IntWritable> {private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}}}public static class IntSumReducerextends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length < 2) {System.err.println("Usage: wordcount <in> [<in>...] <out>");System.exit(2);}Job job = Job.getInstance(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);for (int i = 0; i < otherArgs.length - 1; ++i) {FileInputFormat.addInputPath(job, new Path(otherArgs[i]));}FileOutputFormat.setOutputPath(job,new Path(otherArgs[otherArgs.length - 1]));System.exit(job.waitForCompletion(true) ? 0 : 1);} }總結
以上是生活随笔為你收集整理的第五章-分布式并行编程框架MapReduce的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 简单的MongoDB实践
- 下一篇: 决策树分类实验