MapReduce的自制Writable分组输出及组内排序
-
問題描述:
輸入文件格式如下:
name1 ? ?2
name3 ? ?4
name1 ? ?6
name1 ? ?1
name3 ? ?3
name1 ? ?0
要求輸出的文件格式如下:
name1 ? ?0,1,2,6
name3 ? ?3,4
要求是按照第一列分組,name1與name3也是按照順序排列的,組內升序排序。
思路:
常規的輸出,無法排序key所對應的多個值的順序。為了排序組內中的值,需要將key與value放在同一個組。Job中有兩個方法setGroupingComparatorClass和setSortComparatorClass,可以利用這兩個方法來實現組內排序。但是這些排序都是基于key的,則就要將key和value定義成組合鍵。
但是必須要保證第一列相同的全部都放在同一個分區中,則就需要自定義分區,分區的時候只考慮第一列的值。由于partitioner僅僅能保證每一個reducer接受同一個name的所有記錄,但是reducer仍然是通過鍵進行分組的分區,也就說該分區中還是按照鍵來分成不同的組,還需要分組只參考name值
先按照name分組,再在name中內部進行排序。
解決方法:
運用自定義組合鍵的策略,將name和1定義為一個組合鍵。在分區的時候只參考name的值,即繼承partitioner。
?由于要按照name分組,則就需要定義分組策略,然后設置setGroupingComparatorClass。
setGroupingComparatorClass主要定義哪些key可以放置在一組,分組的時候會對組合鍵進行比較,由于這里只需要考慮組合鍵中的一個值,則定義實現一個WritableComparator,設置比較策略。
對于組內的排序,可以利用setSortComparatorClass來實現,
這個方法主要用于定義key如何進行排序在它們傳遞給reducer之前,
這里就可以來進行組內排序。
具體代碼:
? ? ?Hadoop版本號:hadoop1.1.2
自定義組合鍵
view sourceprint? 01.package?whut; 02.import?java.io.DataInput; 03.import?java.io.DataOutput; 04.import?java.io.IOException; 05.import?org.apache.hadoop.io.IntWritable; 06.import?org.apache.hadoop.io.Text; 07.import?org.apache.hadoop.io.WritableComparable; 08.//自定義組合鍵策略 09.//java基本類型數據 10.public?class?TextInt?implements?WritableComparable{ 11.//直接利用java的基本數據類型 12.private?String firstKey; 13.private?int?secondKey; 14.//必須要有一個默認的構造函數 15.public?String getFirstKey() { 16.return?firstKey; 17.} 18.public?void?setFirstKey(String firstKey) { 19.this.firstKey = firstKey; 20.} 21.public?int?getSecondKey() { 22.return?secondKey; 23.} 24.public?void?setSecondKey(int?secondKey) { 25.this.secondKey = secondKey; 26.} 27.? 28.@Override 29.public?void?write(DataOutput out)?throws?IOException { 30.// TODO Auto-generated method stub 31.out.writeUTF(firstKey); 32.out.writeInt(secondKey); 33.} 34.@Override 35.public?void?readFields(DataInput in)?throws?IOException { 36.// TODO Auto-generated method stub 37.firstKey=in.readUTF(); 38.secondKey=in.readInt(); 39.} 40.//map的鍵的比較就是根據這個方法來進行的 41.@Override 42.public?int?compareTo(Object o) { 43.// TODO Auto-generated method stub 44.TextInt ti=(TextInt)o; 45.//利用這個來控制升序或降序 46.//this本對象寫在前面代表是升序 47.//this本對象寫在后面代表是降序 48.return?this.getFirstKey().compareTo(ti.getFirstKey()); 49.} 50.}分組策略
view sourceprint? 01.package?whut; 02.import?org.apache.hadoop.io.WritableComparable; 03.import?org.apache.hadoop.io.WritableComparator; 04.//主要就是對于分組進行排序,分組只按照組建鍵中的一個值進行分組 05.public?class?TextComparator?extends?WritableComparator { 06.//必須要調用父類的構造器 07.protected?TextComparator() { 08.super(TextInt.class,true);//注冊comparator 09.} 10.@Override 11.public?int?compare(WritableComparable a, WritableComparable b) { 12.// TODO Auto-generated method stub 13.TextInt ti1=(TextInt)a; 14.TextInt ti2=(TextInt)b; 15.return?ti1.getFirstKey().compareTo(ti2.getFirstKey()); 16.} 17.}
?組內排序策略
view sourceprint? 01.package?whut; 02.import?org.apache.hadoop.io.WritableComparable; 03.import?org.apache.hadoop.io.WritableComparator; 04.//分組內部進行排序,按照第二個字段進行排序 05.public?class?TextIntComparator?extends?WritableComparator { 06.public?TextIntComparator() 07.{ 08.super(TextInt.class,true); 09.} 10.//這里可以進行排序的方式管理 11.//必須保證是同一個分組的 12.//a與b進行比較 13.//如果a在前b在后,則會產生升序 14.//如果a在后b在前,則會產生降序 15.@Override 16.public?int?compare(WritableComparable a, WritableComparable b) { 17.// TODO Auto-generated method stub 18.TextInt ti1=(TextInt)a; 19.TextInt ti2=(TextInt)b; 20.//首先要保證是同一個組內,同一個組的標識就是第一個字段相同 21.if(!ti1.getFirstKey().equals(ti2.getFirstKey())) 22.return?ti1.getFirstKey().compareTo(ti2.getFirstKey()); 23.else 24.return?ti2.getSecondKey()-ti1.getSecondKey();//0,-1,1 25.} 26.? 27.}
?分區策略
view sourceprint? 01.package?whut; 02.import?org.apache.hadoop.io.IntWritable; 03.import?org.apache.hadoop.mapreduce.Partitioner; 04.//參數為map的輸出類型 05.public?class?KeyPartitioner?extends?Partitioner<TextInt, IntWritable> { 06.@Override 07.public?int?getPartition(TextInt key, IntWritable value,?int?numPartitions) { 08.// TODO Auto-generated method stub 09.return?(key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions; 10.} 11.}
?MapReduce策略
view sourceprint? 001.package?whut; 002.import?java.io.IOException; 003.import?org.apache.hadoop.conf.Configuration; 004.import?org.apache.hadoop.conf.Configured; 005.import?org.apache.hadoop.fs.Path; 006.import?org.apache.hadoop.io.IntWritable; 007.import?org.apache.hadoop.io.Text; 008.import?org.apache.hadoop.mapreduce.Job; 009.import?org.apache.hadoop.mapreduce.Mapper; 010.import?org.apache.hadoop.mapreduce.Reducer; 011.import?org.apache.hadoop.mapreduce.Mapper.Context; 012.import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 013.import?org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; 014.import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 015.import?org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 016.import?org.apache.hadoop.util.Tool; 017.import?org.apache.hadoop.util.ToolRunner; 018.//需要對數據進行分組以及組內排序的時候 019.public?class?SortMain?extends?Configured?implements?Tool{ 020.//這里設置輸入文格式為KeyValueTextInputFormat 021.//name1 5 022.//默認輸入格式都是Text,Text 023.public?static?class?GroupMapper?extends 024.Mapper<Text, Text, TextInt, IntWritable>? { 025.public?IntWritable second=new?IntWritable(); 026.public?TextInt tx=new?TextInt(); 027.@Override 028.protected?void?map(Text key, Text value, Context context) 029.throws?IOException, InterruptedException { 030.String lineKey=key.toString(); 031.String lineValue=value.toString(); 032.int?lineInt=Integer.parseInt(lineValue); 033.tx.setFirstKey(lineKey); 034.tx.setSecondKey(lineInt); 035.second.set(lineInt); 036.context.write(tx, second); 037.} 038.} 039.//設置reduce 040.public?static?class?GroupReduce?extends?Reducer<TextInt, IntWritable, Text, Text> 041.{ 042.@Override 043.protected?void?reduce(TextInt key, Iterable<IntWritable> values, 044.Context context) 045.throws?IOException, InterruptedException { 046.StringBuffer sb=new?StringBuffer(); 047.for(IntWritable val:values) 048.{ 049.sb.append(val+","); 050.} 051.if(sb.length()>0) 052.{ 053.sb.deleteCharAt(sb.length()-1); 054.} 055.context.write(new?Text(key.getFirstKey()),?new?Text(sb.toString())); 056.} 057.} 058.? 059.@Override 060.public?int?run(String[] args)?throws?Exception { 061.// TODO Auto-generated method stub 062.Configuration conf=getConf(); 063.Job job=new?Job(conf,"SecondarySort"); 064.job.setJarByClass(SortMain.class); 065.// 設置輸入文件的路徑,已經上傳在HDFS 066.FileInputFormat.addInputPath(job,?new?Path(args[0])); 067.// 設置輸出文件的路徑,輸出文件也存在HDFS中,但是輸出目錄不能已經存在 068.FileOutputFormat.setOutputPath(job,?new?Path(args[1])); 069.? 070.job.setMapperClass(GroupMapper.class); 071.job.setReducerClass(GroupReduce.class); 072.//設置分區方法 073.job.setPartitionerClass(KeyPartitioner.class); 074.? 075.//下面這兩個都是針對map端的 076.//設置分組的策略,哪些key可以放置到一組中 077.job.setGroupingComparatorClass(TextComparator.class); 078.//設置key如何進行排序在傳遞給reducer之前. 079.//這里就可以設置對組內如何排序的方法 080./*************關鍵點**********/ 081.job.setSortComparatorClass(TextIntComparator.class); 082.//設置輸入文件格式 083.job.setInputFormatClass(KeyValueTextInputFormat.class); 084.//使用默認的輸出格式即TextInputFormat 085.//設置map的輸出key和value類型 086.job.setMapOutputKeyClass(TextInt.class); 087.job.setMapOutputValueClass(IntWritable.class); 088.//設置reduce的輸出key和value類型 089.//job.setOutputFormatClass(TextOutputFormat.class); 090.job.setOutputKeyClass(Text.class); 091.job.setOutputValueClass(Text.class); 092.job.waitForCompletion(true); 093.int?exitCode=job.isSuccessful()?0:1; 094.return?exitCode; 095.} 096.? 097.public?static?void?main(String[] args)??throws?Exception 098.{ 099.int?exitCode=ToolRunner.run(new?SortMain(), args); 100.System.exit(exitCode); 101.} 102.}
?注意事項
? ?1,設置分組排序按照升序還是降序是在自定義WritableComparable中的compareTo()方法實現的,具體升序或者降序的設置在代碼中已經注釋說明
? ?2,設置組內值進行升序還是降序的排序是在組內排序策略中的compare()方法注釋說明的。
? ?3,這里同時最重要的一點是,將第二列即放在組合鍵中,又作為value,這樣對于組合鍵排序也就相當于對于value進行排序了。
? ?4,在自定義組合鍵的時候,對于組合鍵中的數據的基本類型可以采用Java的基本類型也可以采用Hadoop的基本數據類型,對于Hadoop的基本數據類型一定要記得初始化new一個基本數據類型對象。對于組合鍵類,必須要有默認的構造方法。
-
總結
以上是生活随笔為你收集整理的MapReduce的自制Writable分组输出及组内排序的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 内存有限的情况下 Spark 如何处理
- 下一篇: MapReduce TopK统计加排序