Hadoop Basics (14): A MapReduce Data Deduplication Program
1 Example Description
The task is to deduplicate the records in a set of data files, where each line of a file is one record.
Sample input:
1) file1:

2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c

2) file2:

2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-6 c
2012-3-7 d
2012-3-3 c

Expected output:

2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
2012-3-7 d

2 Problem Analysis
The goal of deduplication is that any record appearing more than once in the raw data appears exactly once in the output file.
Analysis:

By the nature of the reduce phase, the framework automatically groups the input values into a value list for each distinct key. So if each record is emitted to reduce as a key, then no matter how many times that record occurs in the input, the key can appear only once in the final reduce output.
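To make this concrete, trace the record "2012-3-3 c", which occurs in both sample files above:

map output:    ("2012-3-3 c", null), ("2012-3-3 c", null)
after shuffle: ("2012-3-3 c", [null, null])
reduce output: "2012-3-3 c"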
3 Implementation Steps
1. Each record corresponds to one line of the input file, and the map phase uses Hadoop's default job input format. Map sets the line content as the output key, sets the value to empty, and emits the pair directly.
2. In the MapReduce flow, the map output <key, value> pairs are gathered by the shuffle into <key, value-list> pairs and handed to reduce.
3. The reduce phase ignores how many values each key carries: it copies the input key to the output key and emits it with an empty value.
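One detail worth noting in the code below: because the reducer's output types match its input types and dropping duplicate keys is idempotent, the same class can also be registered as a combiner (via setCombinerClass). This removes duplicates on the map side before the shuffle and reduces the amount of data transferred over the network.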
4 Key Code
package com.mk.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;

public class Distinct {

    public static class DistinctMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Use the trimmed line itself as the output key; the value carries no information.
            value.set(value.toString().trim());
            context.write(value, NullWritable.get());
        }
    }

    public static class DistinctReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // However many values arrived for this key, write the key exactly once.
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String uri = "hdfs://192.168.150.128:9000";
        String input = "/distinct/input";
        String output = "/distinct/output";

        Configuration conf = new Configuration();
        if (System.getProperty("os.name").toLowerCase().contains("win"))
            conf.set("mapreduce.app-submission.cross-platform", "true");

        // Remove any previous output directory so the job can be rerun.
        FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(output);
        fileSystem.delete(path, true);

        Job job = new Job(conf, "Distinct");
        job.setJar("./out/artifacts/hadoop_test_jar/hadoop-test.jar");
        job.setJarByClass(Distinct.class);

        job.setMapperClass(DistinctMapper.class);
        // The reducer is idempotent, so it also serves as a map-side combiner.
        job.setCombinerClass(DistinctReducer.class);
        job.setReducerClass(DistinctReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPaths(job, uri + input);
        FileOutputFormat.setOutputPath(job, new Path(uri + output));

        boolean ret = job.waitForCompletion(true);
        System.out.println(job.getJobName() + "-----" + ret);
    }
}
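One way to run the job end to end is sketched below. It assumes the jar path from the code above, a reachable NameNode at 192.168.150.128:9000, and that file1 and file2 are the local sample files; part-r-00000 is Hadoop's default name for the single reducer's output file.

hdfs dfs -mkdir -p /distinct/input
hdfs dfs -put file1 file2 /distinct/input
hadoop jar ./out/artifacts/hadoop_test_jar/hadoop-test.jar com.mk.mapreduce.Distinct
hdfs dfs -cat /distinct/output/part-r-00000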
Summary

The whole deduplication job rests on one idea: emit each record as a map output key with an empty value, let the shuffle merge identical keys, and have reduce write each distinct key exactly once.