分布式离线计算—MapReduce—基本原理
原文作者:黎先生
原文地址:MapReduce基本原理及應用
目錄
一、MapReduce模型簡介
1. Map和Reduce函數
2. MapReduce體系結構
3. MapReduce工作流程
4. MapReduce應用程序執行過程
二 、WordCount運行實例
1.?WordCount的Map過程
2. WordCount的Reduce過程
3. WordCount源碼
一、MapReduce模型簡介
MapReduce將復雜的、運行于大規模集群上的并行計算過程高度地抽象到了兩個函數:Map和Reduce。它采用“分而治之”策略,一個存儲在分布式文件系統中的大規模數據集,會被切分成許多獨立的分片(split),這些分片可以被多個Map任務并行處理
1. Map和Reduce函數
Map和Reduce2. MapReduce體系結構
MapReduce體系結構主要由四個部分組成,分別是:Client、JobTracker、TaskTracker以及Task
1)Client
用戶編寫的MapReduce程序通過Client提交到JobTracker端 用戶可通過Client提供的一些接口查看作業運行狀態
2)JobTracker
JobTracker負責資源監控和作業調度 JobTracker 監控所有TaskTracker與Job的健康狀況,一旦發現失敗,就將相應的任務轉移到其他節點 JobTracker 會跟蹤任務的執行進度、資源使用量等信息,并將這些信息告訴任務調度器(TaskScheduler),而調度器會在資源出現空閑時,
選擇合適的任務去使用這些資源
3)TaskTracker
TaskTracker 會周期性地通過“心跳”將本節點上資源的使用情況和任務的運行進度匯報給JobTracker,同時接收JobTracker 發送過來的命令并執行相應的操作(如啟動新任務、殺死任務等) TaskTracker 使用“slot”等量劃分本節點上的資源量(CPU、內存等)。一個Task 獲取到
一個slot 后才有機會運行,而Hadoop調度器的作用就是將各個TaskTracker上的空閑slot分配給Task使用。slot 分為Map slot 和Reduce slot 兩種,分別供MapTask 和Reduce Task 使用
4)Task
Task 分為Map Task 和Reduce Task 兩種,均由TaskTracker 啟動
3. MapReduce工作流程
1) 工作流程概述
- 不同的Map任務之間不會進行通信
- 不同的Reduce任務之間也不會發生任何信息交換
- 用戶不能顯式地從一臺機器向另一臺機器發送消息
- 所有的數據交換都是通過MapReduce框架自身去實現的
2) MapReduce各個執行階段
4. MapReduce應用程序執行過程
二 、WordCount運行實例
工作流程是Input從HDFS里面并行讀取文本中的內容,經過MapReduce模型,最終把分析出來的結果用Output封裝,持久化到HDFS中
1.?WordCount的Map過程
使用三個Map任務并行讀取三行文件中的內容,對讀取的單詞進行map操作,每個單詞都以<key, value>形式生成
Map端源碼:
public class WordMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken().toLowerCase()); context.write(word, one); } } }?
2. WordCount的Reduce過程
Reduce操作是對Map的結果進行排序、合并等操作最后得出詞頻
Reduce端源碼
public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, new IntWritable(sum)); } }3. WordCount源碼
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class WordMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken().toLowerCase()); context.write(word, one); } } } public static class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, new IntWritable(sum)); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(WordMapper.class); job.setCombinerClass(WordReducer.class); job.setReducerClass(WordReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }總結
以上是生活随笔為你收集整理的分布式离线计算—MapReduce—基本原理的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: 分布式离线计算—MapReduce—为什
- 下一篇: 文件系统是什么?
