Partition in Hadoop Explained
1. Partition Analysis
The Map output is distributed to the Reducers via the partition step; once the Reducers finish their reduce work, the results are written out through the OutputFormat. Below we look at the classes involved in this process.
A Mapper's output may first be sent to a Combiner for local merging. The Combiner has no base class of its own in the framework; Reducer serves as the Combiner's base class. The two are functionally identical from the outside and differ only in where and in what context they run. The key/value pairs <key, value> that a Mapper finally emits must be sent to Reducers to be merged, and during that merge all pairs with the same key must arrive at the same Reducer. Which key goes to which Reducer is decided by the Partitioner. It has only one method:
getPartition(Text key, Text value, int numPartitions)

Its input is a Map output pair <key, value> together with the number of Reducers; its output is the assigned Reducer (an integer index). In other words, it decides which reducer a given Mapper output pair is sent to. The default Partitioner is HashPartitioner, which takes the key's hash value modulo the number of Reducers to pick the target Reducer. This guarantees that pairs sharing the same key are always assigned to the same reducer. If there are N reducers, the indices run 0, 1, 2, ... (N-1).
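For reference, the following is a minimal sketch of that hash-and-modulo logic using the old mapred API (the class name SketchHashPartitioner is invented for illustration; the real HashPartitioner ships with Hadoop, but the core expression is the same idea):

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

public class SketchHashPartitioner<K, V> implements Partitioner<K, V> {

    @Override
    public int getPartition(K key, V value, int numPartitions) {
        // Mask off the sign bit so the result is non-negative, then take the
        // key's hash modulo the number of reducers; equal keys therefore
        // always land on the same reducer number in [0, numPartitions).
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void configure(JobConf conf) {
        // Nothing to configure for a pure hash partitioner.
    }
}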
Reducer is the base class for all user-defined Reducer classes. Like Mapper, it has setup, reduce, cleanup and run methods; setup and cleanup mean the same as they do in Mapper, while reduce is where the Mapper results are actually merged. Its input is a key plus an iterator over all values belonging to that key, along with the Reducer's context. The framework defines two very simple Reducers, IntSumReducer and LongSumReducer, which sum integer and long values respectively.
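To make that concrete, here is a sketch of a summing Reducer in the spirit of IntSumReducer, written against the new mapreduce API (the class name is invented for this example):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SumReducerSketch extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // All values grouped under the same key arrive here together;
        // add them up and emit a single <key, sum> pair.
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}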
The Reducer's results are written out to files through its context (the write method of Reducer.Context); as with input, Hadoop introduces OutputFormat for this. OutputFormat relies on two helper interfaces, RecordWriter and OutputCommitter, to handle output. RecordWriter provides a write method for emitting <key, value> pairs and a close method for closing the corresponding output. OutputCommitter provides a series of methods that a user can implement to customize the special actions needed at certain stages of an OutputFormat's life cycle. We discussed these methods when covering TaskInputOutputContext (clearly, TaskInputOutputContext is the bridge between OutputFormat and Reducer). OutputFormat and RecordWriter correspond to InputFormat and RecordReader respectively. The framework provides NullOutputFormat for empty output (it emits nothing; its NullOutputFormat.RecordWriter is only illustrative and not defined in the framework), LazyOutputFormat (not shown in the class diagram, not analyzed here), FilterOutputFormat (not analyzed), and, based on FileOutputFormat, the SequenceFileOutputFormat and TextOutputFormat file outputs.
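To illustrate the RecordWriter contract, here is a hypothetical minimal implementation that writes each pair as one tab-separated line (the class name and constructor are assumptions made for this sketch; TextOutputFormat's real writer is more elaborate):

import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class TabSeparatedRecordWriter<K, V> extends RecordWriter<K, V> {
    private final DataOutputStream out;

    public TabSeparatedRecordWriter(DataOutputStream out) {
        // The stream would normally be opened by the OutputFormat that
        // creates this writer.
        this.out = out;
    }

    @Override
    public void write(K key, V value) throws IOException {
        // Emit one "key<TAB>value" line per call.
        out.writeBytes(key.toString() + "\t" + value.toString() + "\n");
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}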
The file-based FileOutputFormat works together with several configuration properties (a short sketch of how they are typically set follows this list):
mapred.output.compress: whether to compress the output;
mapred.output.compression.codec: the compression codec to use;
mapred.output.dir: the output directory;
mapred.work.output.dir: the working output directory for a task.
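As a small sketch (not from the original article) of how these properties usually get set, the old mapred API exposes helpers on FileOutputFormat; GzipCodec and the output path below are arbitrary choices for the example:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

public class OutputConfigSketch {
    public static void configureOutput(JobConf conf) {
        // Sets mapred.output.compress=true and mapred.output.compression.codec.
        FileOutputFormat.setCompressOutput(conf, true);
        FileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
        // Sets mapred.output.dir; mapred.work.output.dir is normally derived
        // per task by the framework rather than set directly by user code.
        FileOutputFormat.setOutputPath(conf, new Path("/tmp/partition-demo-out"));
    }
}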
FileOutputFormat also depends on FileOutputCommitter, which provides some job- and task-related temporary-file management. For example, FileOutputCommitter's setupJob creates a temporary directory named _temporary under the output path, and cleanupJob deletes that directory.
SequenceFileOutputFormat and TextOutputFormat are the output counterparts of the SequenceFileInputFormat and TextInputFormat inputs respectively.
2. Code Example
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class MyPartitioner {

    // Classify each input line by its number of tab-separated fields and emit
    // the class name ("long", "short" or "right") as the key, with the whole
    // line as the value.
    public static class MyMap extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            String[] arr_value = value.toString().split("\t");
            Text word1 = new Text();
            Text word2 = new Text();
            if (arr_value.length > 3) {
                word1.set("long");
            } else if (arr_value.length < 3) {
                word1.set("short");
            } else {
                word1.set("right");
            }
            word2.set(value);
            output.collect(word1, word2);
        }
    }

    // Identity-style reducer: writes every value back out under its key.
    public static class MyReduce extends MapReduceBase implements
            Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            System.out.println(key);
            while (values.hasNext()) {
                // Copy the Text directly; building a new Text from getBytes()
                // can pick up stale bytes beyond the valid length.
                output.collect(key, new Text(values.next()));
            }
        }
    }

    // Custom Partitioner: route each key class to a fixed reducer number.
    public static class MyPartitionerPar implements Partitioner<Text, Text> {

        @Override
        public int getPartition(Text key, Text value, int numPartitions) {
            int result = 0;
            System.out.println("numPartitions--" + numPartitions);
            if (key.toString().equals("long")) {
                result = 0 % numPartitions;
            } else if (key.toString().equals("short")) {
                result = 1 % numPartitions;
            } else if (key.toString().equals("right")) {
                result = 2 % numPartitions;
            }
            System.out.println("result--" + result);
            return result;
        }

        @Override
        public void configure(JobConf arg0) {
            // No configuration needed.
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(MyPartitioner.class);
        conf.setJobName("MyPartitioner");
        // Three reduce tasks: one per key class, matching the partitioner.
        conf.setNumReduceTasks(3);

        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(Text.class);
        conf.setPartitionerClass(MyPartitionerPar.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(MyMap.class);
        conf.setReducerClass(MyReduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
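With three reduce tasks configured, the custom partitioner sends every "long" record to reducer 0, every "short" record to reducer 1, and every "right" record to reducer 2, so the job writes three output files (part-00000, part-00001, part-00002), each containing exactly one class of lines.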
This article was reposted from xwdreamer's cnblogs blog; original link: http://www.cnblogs.com/xwdreamer/archive/2011/10/27/2296943.html. Please contact the original author before reposting.