Hadoop入门(六)Mapreduce
一、Mapreduce概述
MapReduce是一個(gè)編程模型,用以進(jìn)行大數(shù)據(jù)量的計(jì)算
?
二、Hadoop MapReduce
(1)MapReduce是什么
Hadoop MapReduce是一個(gè)軟件框架,基于該框架能夠容易地編寫應(yīng)用程序,這些應(yīng)用程序能夠運(yùn)行在由上千個(gè)商用機(jī)器組成的大集群上,并以一種可靠的,具有容錯(cuò)能力的方式并行地處理上TB級(jí)別的海量數(shù)據(jù)集
Mapreduce的特點(diǎn):
?
(2)MapReduce做什么
MapReduce的思想就是“分而治之”
1)Mapper負(fù)責(zé)“分”
把復(fù)雜的任務(wù)分解為若干個(gè)“簡(jiǎn)單的任務(wù)”來處理。“簡(jiǎn)單的任務(wù)”包含三層含義:
2)Reducer負(fù)責(zé)對(duì)map階段的結(jié)果進(jìn)行匯總
至于需要多少個(gè)Reducer,可以根據(jù)具體問題,
通過在mapred-site.xml配置文件里設(shè)置參數(shù)mapred.reduce.tasks的值,缺省值為1。
?
三、MapReduce工作機(jī)制
作業(yè)執(zhí)行涉及4個(gè)獨(dú)立的實(shí)體
mapreduce作業(yè)工作流程圖
Mapreduce作業(yè)的4個(gè)對(duì)象
?
mapreduce運(yùn)行步驟1
首先是客戶端要編寫好mapreduce程序,配置好mapreduce的作業(yè)也就是job,
接下來就是提交job了,提交job是提交到JobTracker上的,這個(gè)時(shí)候JobTracker就會(huì)構(gòu)建這個(gè)job,具體就是分配一個(gè)新的job任務(wù)的ID值
接下來它會(huì)做檢查操作,這個(gè)檢查就是確定輸出目錄是否存在,如果存在那么job就不能正常運(yùn)行下去,JobTracker會(huì)拋出錯(cuò)誤給客戶端,接下來還要檢查輸入目錄是否存在,如果不存在同樣拋出錯(cuò)誤,如果存在JobTracker會(huì)根據(jù)輸入計(jì)算輸入分片(Input Split),如果分片計(jì)算不出來也會(huì)拋出錯(cuò)誤,至于輸入分片我后面會(huì)做講解的,這些都做好了JobTracker就會(huì)配置Job需要的資源了。
分配好資源后,JobTracker就會(huì)初始化作業(yè),初始化主要做的是將Job放入一個(gè)內(nèi)部的隊(duì)列,讓配置好的作業(yè)調(diào)度器能調(diào)度到這個(gè)作業(yè),作業(yè)調(diào)度器會(huì)初始化這個(gè)job,初始化就是創(chuàng)建一個(gè)正在運(yùn)行的job對(duì)象(封裝任務(wù)和記錄信息),以便JobTracker跟蹤job的狀態(tài)和進(jìn)程。
mapreduce運(yùn)行步驟2
初始化完畢后,作業(yè)調(diào)度器會(huì)獲取輸入分片信息(input split),每個(gè)分片創(chuàng)建一個(gè)map任務(wù)。
接下來就是任務(wù)分配了,這個(gè)時(shí)候tasktracker會(huì)運(yùn)行一個(gè)簡(jiǎn)單的循環(huán)機(jī)制定期發(fā)送心跳給jobtracker,心跳間隔是5秒,程序員可以配置這個(gè)時(shí)間,心跳就是jobtracker和tasktracker溝通的橋梁,通過心跳,jobtracker可以監(jiān)控tasktracker是否存活,也可以獲取tasktracker處理的狀態(tài)和問題,同時(shí)tasktracker也可以通過心跳里的返回值獲取jobtracker給它的操作指令。
任務(wù)分配好后就是執(zhí)行任務(wù)了。在任務(wù)執(zhí)行時(shí)候jobtracker可以通過心跳機(jī)制監(jiān)控tasktracker的狀態(tài)和進(jìn)度,同時(shí)也能計(jì)算出整個(gè)job的狀態(tài)和進(jìn)度,而tasktracker也可以本地監(jiān)控自己的狀態(tài)和進(jìn)度。當(dāng)jobtracker獲得了最后一個(gè)完成指定任務(wù)的tasktracker操作成功的通知時(shí)候,jobtracker會(huì)把整個(gè)job狀態(tài)置為成功,然后當(dāng)客戶端查詢job運(yùn)行狀態(tài)時(shí)候(注意:這個(gè)是異步操作),客戶端會(huì)查到j(luò)ob完成的通知的。如果job中途失敗,mapreduce也會(huì)有相應(yīng)機(jī)制處理,一般而言如果不是程序員程序本身有bug,mapreduce錯(cuò)誤處理機(jī)制都能保證提交的job能正常完成。
?
四、mapreduce運(yùn)行機(jī)制
在Hadoop中,一個(gè)MapReduce作業(yè)會(huì)把輸入的數(shù)據(jù)集切分為若干獨(dú)立的數(shù)據(jù)塊,由Map任務(wù)以完全并行的方式處理。框架會(huì)對(duì)Map的輸出先進(jìn)行排序,然后把結(jié)果輸入給Reduce任務(wù)。
作業(yè)的輸入和輸出都會(huì)被存儲(chǔ)在文件系統(tǒng)中,整個(gè)框架負(fù)責(zé)任務(wù)的調(diào)度和監(jiān)控,以及重新執(zhí)行已經(jīng)關(guān)閉的任務(wù)。MapReduce框架和分布式文件系統(tǒng)是運(yùn)行在一組相同的節(jié)點(diǎn),計(jì)算節(jié)點(diǎn)和存儲(chǔ)節(jié)點(diǎn)都是在一起的
(1) MapReduce的輸入輸出
一個(gè)MapReduce作業(yè)的輸入和輸出類型: 會(huì)有三組<key,value>鍵值對(duì)類型的存在
?
(2)Mapreduce作業(yè)的處理流程
?
按照時(shí)間順序包括:
輸入分片(input split)
map階段
shuffle階段:map shuffle(partition、sort/group、combiner、partition? merge)和reduce?shuffle(copy、merge(sort/group))
reduce階段
1)輸入分片(input split)
在進(jìn)行map計(jì)算之前,mapreduce會(huì)根據(jù)輸入文件計(jì)算輸入分片(input split),每個(gè)輸入分片(input split)針對(duì)一個(gè)map任務(wù)
輸入分片(input split)存儲(chǔ)的并非數(shù)據(jù)本身,而是一個(gè)分片長(zhǎng)度和一個(gè)記錄數(shù)據(jù)的位置的數(shù)組,輸入分片(input split)往往和hdfs的block(塊)關(guān)系很密切
?? ?假如我們?cè)O(shè)定hdfs的塊的大小是64mb,如果我們輸入有三個(gè)文件,大小分別是3mb、65mb和127mb,那么mapreduce會(huì)把3mb文件分為一個(gè)輸入分片(input split),65mb則是兩個(gè)輸入分片(input split)而127mb也是兩個(gè)輸入分片(input split)
? ? 即我們?nèi)绻趍ap計(jì)算前做輸入分片調(diào)整,例如合并小文件,那么就會(huì)有5個(gè)map任務(wù)將執(zhí)行,而且每個(gè)map執(zhí)行的數(shù)據(jù)大小不均,這個(gè)也是mapreduce優(yōu)化計(jì)算的一個(gè)關(guān)鍵點(diǎn)。
2)map階段
程序員編寫好的map函數(shù)了,因此map函數(shù)效率相對(duì)好控制,而且一般map操作都是本地化操作也就是在數(shù)據(jù)存儲(chǔ)節(jié)點(diǎn)上進(jìn)行;
3)combiner階段
combiner階段是程序員可以選擇的,combiner其實(shí)也是一種reduce操作,因此我們看見WordCount類里是用reduce進(jìn)行加載的。
Combiner是一個(gè)本地化的reduce操作,它是map運(yùn)算的后續(xù)操作,主要是在map計(jì)算出中間文件前做一個(gè)簡(jiǎn)單的合并重復(fù)key值的操作,例如我們對(duì)文件里的單詞頻率做統(tǒng)計(jì),map計(jì)算時(shí)候如果碰到一個(gè)hadoop的單詞就會(huì)記錄為1,但是這篇文章里hadoop可能會(huì)出現(xiàn)n多次,那么map輸出文件冗余就會(huì)很多,因此在reduce計(jì)算前對(duì)相同的key做一個(gè)合并操作,那么文件會(huì)變小,這樣就提高了寬帶的傳輸效率,畢竟hadoop計(jì)算力寬帶資源往往是計(jì)算的瓶頸也是最為寶貴的資源,但是combiner操作是有風(fēng)險(xiǎn)的,使用它的原則是combiner的輸入不會(huì)影響到reduce計(jì)算的最終輸入,
例如:
如果計(jì)算只是求總數(shù),最大值,最小值可以使用combiner,但是做平均值計(jì)算使用combiner的話,最終的reduce計(jì)算結(jié)果就會(huì)出錯(cuò)。
4)shuffle階段
將map的輸出作為reduce的輸入的過程就是shuffle了。
5)reduce階段
和map函數(shù)一樣也是程序員編寫的,最終結(jié)果是存儲(chǔ)在hdfs上的。
?
五、Mapreduce框架的相關(guān)問題
jobtracker的單點(diǎn)故障
jobtracker和hdfs的namenode一樣也存在單點(diǎn)故障,單點(diǎn)故障一直是hadoop被人詬病的大問題,為什么hadoop的做的文件系統(tǒng)和mapreduce計(jì)算框架都是高容錯(cuò)的,但是最重要的管理節(jié)點(diǎn)的故障機(jī)制卻如此不好,我認(rèn)為主要是namenode和jobtracker在實(shí)際運(yùn)行中都是在內(nèi)存操作,而做到內(nèi)存的容錯(cuò)就比較復(fù)雜了,只有當(dāng)內(nèi)存數(shù)據(jù)被持久化后容錯(cuò)才好做,namenode和jobtracker都可以備份自己持久化的文件,但是這個(gè)持久化都會(huì)有延遲,因此真的出故障,任然不能整體恢復(fù),另外hadoop框架里包含zookeeper框架,zookeeper可以結(jié)合jobtracker,用幾臺(tái)機(jī)器同時(shí)部署jobtracker,保證一臺(tái)出故障,有一臺(tái)馬上能補(bǔ)充上,不過這種方式也沒法恢復(fù)正在跑的mapreduce任務(wù)。
?
六、Mapreduce的單詞計(jì)數(shù)實(shí)例
public class WordCount {public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {public void map(Object key, Text value, Context context) throws IOException, InterruptedException {}}public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {}}public static void main(String[] args) throws Exception {//…?} }(1)Map
?public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{??private final static IntWritable one = new IntWritable(1);private Text word = new Text();???????public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}}}map的方法
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {…}
這里有三個(gè)參數(shù),前面兩個(gè)Object key, Text value就是輸入的key和value,第三個(gè)參數(shù)Context context這是可以記錄輸入的key和value
例如:context.write(word, one);此外context還會(huì)記錄map運(yùn)算的狀態(tài)。
(2)reduce
?public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}reduce函數(shù)的方法
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {…}
reduce函數(shù)的輸入也是一個(gè)key/value的形式,
不過它的value是一個(gè)迭代器的形式Iterable<IntWritable> values,
也就是說reduce的輸入是一個(gè)key對(duì)應(yīng)一組的值的value,reduce也有context和map的context作用一致。
(3)main函數(shù)
public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);}Job job = new Job(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);} Configuration conf = new Configuration();運(yùn)行mapreduce程序前都要初始化Configuration,該類主要是讀取mapreduce系統(tǒng)配置信息,這些信息包括hdfs還有mapreduce,也就是安裝hadoop時(shí)候的配置文件例如:core-site.xml、hdfs-site.xml和mapred-site.xml等等文件里的信息,有些童鞋不理解為啥要這么做,這個(gè)是沒有深入思考mapreduce計(jì)算框架造成,我們程序員開發(fā)mapreduce時(shí)候只是在填空,在map函數(shù)和reduce函數(shù)里編寫實(shí)際進(jìn)行的業(yè)務(wù)邏輯,其它的工作都是交給mapreduce框架自己操作的,但是至少我們要告訴它怎么操作啊,比如hdfs在哪里啊,mapreduce的jobstracker在哪里啊,而這些信息就在conf包下的配置文件里。
? ? String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);}If的語(yǔ)句好理解,就是運(yùn)行WordCount程序時(shí)候一定是兩個(gè)參數(shù),如果不是就會(huì)報(bào)錯(cuò)退出。至于第一句里的GenericOptionsParser類,它是用來解釋常用hadoop命令,并根據(jù)需要為Configuration對(duì)象設(shè)置相應(yīng)的值,其實(shí)平時(shí)開發(fā)里我們不太常用它,而是讓類實(shí)現(xiàn)Tool接口,然后再main函數(shù)里使用ToolRunner運(yùn)行程序,而ToolRunner內(nèi)部會(huì)調(diào)用GenericOptionsParser。
? ? Job job = new Job(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);第一行就是在構(gòu)建一個(gè)job,在mapreduce框架里一個(gè)mapreduce任務(wù)也叫mapreduce作業(yè)也叫做一個(gè)mapreduce的job,而具體的map和reduce運(yùn)算就是task了,這里我們構(gòu)建一個(gè)job,構(gòu)建時(shí)候有兩個(gè)參數(shù),一個(gè)是conf這個(gè)就不累述了,一個(gè)是這個(gè)job的名稱。
第二行就是裝載程序員編寫好的計(jì)算程序,例如我們的程序類名就是WordCount了。這里我要做下糾正,雖然我們編寫mapreduce程序只需要實(shí)現(xiàn)map函數(shù)和reduce函數(shù),但是實(shí)際開發(fā)我們要實(shí)現(xiàn)三個(gè)類,第三個(gè)類是為了配置mapreduce如何運(yùn)行map和reduce函數(shù),準(zhǔn)確的說就是構(gòu)建一個(gè)mapreduce能執(zhí)行的job了,例如WordCount類。
第三行和第五行就是裝載map函數(shù)和reduce函數(shù)實(shí)現(xiàn)類了,這里多了個(gè)第四行,這個(gè)是裝載Combiner類,這個(gè)我后面講mapreduce運(yùn)行機(jī)制時(shí)候會(huì)講述,其實(shí)本例去掉第四行也沒有關(guān)系,但是使用了第四行理論上運(yùn)行效率會(huì)更好。
這個(gè)是定義輸出的key/value的類型,也就是最終存儲(chǔ)在hdfs上結(jié)果文件的key/value的類型。?
? FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);第一行就是構(gòu)建輸入的數(shù)據(jù)文件,
第二行是構(gòu)建輸出的數(shù)據(jù)文件,
最后一行如果job運(yùn)行成功了,我們的程序就會(huì)正常退出。FileInputFormat和FileOutputFormat可以設(shè)置輸入輸出文件路徑,
mapreduce計(jì)算時(shí)候,輸入文件必須存在,要不直Mr任務(wù)直接退出。輸出一般是一個(gè)文件夾,而且該文件夾不能存在。
?
?
總結(jié)
以上是生活随笔為你收集整理的Hadoop入门(六)Mapreduce的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 怎么选择进口电脑椅怎么选择进口电脑椅子
- 下一篇: Hadoop入门(十二)Intellij