Hadoop Introduction (15): A MapReduce Data Sorting Program
Sorting data is often the first step of many real-world tasks, such as ranking student grades or building an index over a dataset. Like the deduplication example, this one performs an initial pass over the raw data to lay the groundwork for further processing.
1 Example description
Sort the data in the input files. Each line of input holds a single number, i.e. one data item. Each line of output must hold two separated numbers: the first is the rank of the item within the full dataset, and the second is the item itself.
Sample input:

1) file1:
2
32
654
32
15
756
65223

2) file2:
5956
22
650
92

3) file3:
26
54
6

Expected output:

1	2
2	6
3	15
4	22
5	26
6	32
7	32
8	54
9	92
10	650
11	654
12	756
13	5956
14	65223
2 Problem analysis
This example only requires sorting the input data.
Analysis:
MapReduce already sorts during the shuffle phase. The default rule sorts by key: if the key is of type IntWritable (the int wrapper), MapReduce orders keys numerically; if the key is of type Text (the String wrapper), it orders them lexicographically.
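The difference between the two default orderings is easy to verify outside of a job. Below is a minimal sketch (the class name IntVsTextOrder is ours, purely for illustration) that compares 9 and 10 under both key types:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class IntVsTextOrder {
    public static void main(String[] args) {
        // Numeric order: 9 < 10, so the result is negative.
        System.out.println(new IntWritable(9).compareTo(new IntWritable(10)));
        // Lexicographic byte order: "9" > "10", so the result is positive.
        System.out.println(new Text("9").compareTo(new Text("10")));
    }
}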
We therefore use IntWritable, the int wrapper: the map converts each input line into an IntWritable and emits it as the key (the value can be anything). When the reduce receives a <key, value-list> pair, it writes the input key out as the value, once for each element of the value-list. The output key (the count field in the code) is a global counter that records the current key's rank. For example, 32 appears twice in the sample input, so the reducer receives <32, [1, 1]> and writes it under two consecutive ranks. Note that this program configures no Combiner; the map and reduce alone are enough to complete the task.
3 Implementation steps

The analysis above translates directly into three steps:

1) Map: parse each input line into an IntWritable and emit it as the key, with a placeholder IntWritable(1) as the value.
2) Shuffle: let MapReduce's built-in sort order the IntWritable keys numerically.
3) Reduce: for each key, write (rank, key) once per element of its value-list, incrementing the global rank counter each time.
4 Key code
Ascending order:
package com.mk.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;

public class Sort {

    public static class SortMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Parse the line into an int and emit it as the key; the shuffle
            // phase sorts IntWritable keys numerically. The value is a placeholder.
            IntWritable v = new IntWritable(Integer.parseInt(value.toString().trim()));
            context.write(v, new IntWritable(1));
        }
    }

    public static class SortReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        // Global rank counter; correct because the job runs a single reduce task.
        private int count = 1;

        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Write the key once per occurrence so duplicates each get a rank.
            for (IntWritable v : values) {
                context.write(new IntWritable(count++), key);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String uri = "hdfs://192.168.150.128:9000";
        String input = "/sort/input";
        String output = "/sort/output";

        Configuration conf = new Configuration();
        if (System.getProperty("os.name").toLowerCase().contains("win"))
            conf.set("mapreduce.app-submission.cross-platform", "true");

        // Delete any previous output directory so the job can be rerun.
        FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
        fileSystem.delete(new Path(output), true);

        Job job = Job.getInstance(conf, "Sort");
        job.setJar("./out/artifacts/hadoop_test_jar/hadoop-test.jar");
        job.setJarByClass(Sort.class);
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPaths(job, uri + input);
        FileOutputFormat.setOutputPath(job, new Path(uri + output));

        boolean ret = job.waitForCompletion(true);
        System.out.println(job.getJobName() + "-----" + ret);
    }
}
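One detail worth calling out: the rank counter in SortReducer is an instance field, so the ranks are only globally correct when every key reaches the same reduce task. The job relies on the default of a single reduce task; if you want to make that assumption explicit (a one-line addition of ours, not in the original listing), the standard API call is:

// Pin the job to one reducer so the global rank counter stays consistent.
job.setNumReduceTasks(1);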
Descending order:
package com.mk.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;

public class Sort {

    public static class SortMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            IntWritable v = new IntWritable(Integer.parseInt(value.toString().trim()));
            context.write(v, new IntWritable(1));
        }
    }

    public static class SortReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        private int count = 1;

        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable v : values) {
                context.write(new IntWritable(count++), key);
            }
        }
    }

    public static class SortComparator implements RawComparator<IntWritable> {

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // Swap the operands to reverse the sort order. Raw byte comparison
            // matches numeric order only for non-negative values, which holds here.
            return IntWritable.Comparator.compareBytes(b2, s2, l2, b1, s1, l1);
        }

        @Override
        public int compare(IntWritable o1, IntWritable o2) {
            // Integer.compare avoids the overflow risk of o2.get() - o1.get().
            return Integer.compare(o2.get(), o1.get());
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String uri = "hdfs://192.168.150.128:9000";
        String input = "/sort/input";
        String output = "/sort/output";

        Configuration conf = new Configuration();
        if (System.getProperty("os.name").toLowerCase().contains("win"))
            conf.set("mapreduce.app-submission.cross-platform", "true");

        FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
        fileSystem.delete(new Path(output), true);

        Job job = Job.getInstance(conf, "Sort");
        job.setJar("./out/artifacts/hadoop_test_jar/hadoop-test.jar");
        job.setJarByClass(Sort.class);
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPaths(job, uri + input);
        FileOutputFormat.setOutputPath(job, new Path(uri + output));
        // Replace the default ascending key comparator with the descending one.
        job.setSortComparatorClass(SortComparator.class);

        boolean ret = job.waitForCompletion(true);
        System.out.println(job.getJobName() + "-----" + ret);
    }
}
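The byte-level comparison above reverses the order correctly only for non-negative values: IntWritable serializes ints as big-endian two's-complement, so a raw byte comparison puts negative numbers after positive ones. The sample data is all positive, so the listing works as-is. If negative inputs were possible, a sketch like the following (the class name DescendingIntComparator is ours, for illustration) would deserialize the keys first by extending WritableComparator:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class DescendingIntComparator extends WritableComparator {

    public DescendingIntComparator() {
        // true: materialize key instances so compare() receives real IntWritables.
        super(IntWritable.class, true);
    }

    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        // Deserialize-then-compare handles negative values correctly.
        return Integer.compare(((IntWritable) b).get(), ((IntWritable) a).get());
    }
}

It plugs in the same way: job.setSortComparatorClass(DescendingIntComparator.class);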
Summary
By leaning on the shuffle phase's built-in key sorting, a MapReduce job can sort numeric data with nothing more than an IntWritable key and a rank-emitting reducer; plugging a custom comparator in via setSortComparatorClass reverses the order when a descending sort is needed.