java多个mapreduce_一个简单的MapReduce示例(多个MapReduce任务处理)
一、需求
有一個列表,只有兩列:id、pro,記錄了id與pro的對應(yīng)關(guān)系,但是在同一個id下,pro有可能是重復(fù)的。
現(xiàn)在需要寫一個程序,統(tǒng)計一下每個id下有多少個不重復(fù)的pro。
為了寫一個完整的示例,我使用了多job!
二、文件目錄
|- OutCount //單Job的,本次試驗沒有使用到,這里寫出來供參考
|-OutCount2|-OutCountMapper|-OutCountMapper2|-OutCountReduce|- OutCountReduce2
三、樣本數(shù)據(jù)(部分)
2,10000088379
9,10000088379
6,10000088379
1,10000088379
8,10000088379
0,10000088379
1,10000088379
4,10000091621
3,10000091621
2,10000091621
0,10000091621
6,10000091621
2,10000091621
0,10000091621
0,10000091621
9,10000091621
2,10000091621
四、Java代碼
1、OutCountMapper.java
importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Mapper;importjava.io.IOException;/*** created by wangjunfu on 2017-05-25.
* 4個泛型中,前兩個是指定mapper輸入數(shù)據(jù)的類型,KEYIN是輸入的key的類型,VALUEIN是輸入的value的類型
* map 和 reduce 的數(shù)據(jù)輸入輸出都是以 key-value對的形式封裝的
* 默認情況下,Map框架傳遞給我們的mapper的輸入數(shù)據(jù)中,key是要處理的文本中一行的起始偏移量(選用LongWritable),value是這一行的內(nèi)容(VALUEIN選用Text)
* 在wordcount中,經(jīng)過mapper處理數(shù)據(jù)后,得到的是這樣的結(jié)果,所以KEYOUT選用Text,VAULEOUT選用IntWritable*/
public class OutCountMapper extends Mapper{//MapReduce框架每讀一行數(shù)據(jù)就調(diào)用一次map方法
public void map(LongWritable key, Text value, Context context) throwsIOException, InterruptedException {//數(shù)據(jù)格式:uid skuid
String oneline = value.toString().replace(',', '_').trim();//去重思路:Map的key具有數(shù)據(jù)去重的功能,以整個數(shù)據(jù)作為key發(fā)送出去, value為null
context.write(new Text(oneline), new Text(""));/*// 這里需要說明一下,我們現(xiàn)在的樣本是標準的,一行一個樣本。
// 有的情況下一行多個,那就需要進行分割。
// 對這一行的文本按特定分隔符切分
String[] words = oneline.split("\t");
for (String word : words) {
// 遍歷這個單詞數(shù)組,輸出為key-value形式 key:單詞 value : 1
context.write(new Text(word), new IntWritable(1));
}*/}
}
2、OutCountReduce.java
importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Reducer;importjava.io.IOException;/*** created by wangjunfu on 2017-05-25.
* 經(jīng)過mapper處理后的數(shù)據(jù)會被reducer拉取過來,所以reducer的KEYIN、VALUEIN和mapper的KEYOUT、VALUEOUT一致
* 經(jīng)過reducer處理后的數(shù)據(jù)格式為,所以KEYOUT為Text,VALUEOUT為IntWritable*/
public class OutCountReduce extends Reducer{//當mapper框架將相同的key的數(shù)據(jù)處理完成后,reducer框架會將mapper框架輸出的數(shù)據(jù)變成。//例如,在wordcount中會將mapper框架輸出的所有變?yōu)?#xff0c;即這里的,然后將作為reduce函數(shù)的輸入//這個將在下面reduce2 中得到體現(xiàn)
public void reduce(Text key, Iterable values, Context context) throwsIOException, InterruptedException {
context.write(key,new Text(""));
}
}
3、OutCountMapper2.java
importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Mapper;importjava.io.IOException;/*** created by wangjunfu on 2017-05-27.
* 將原始數(shù)據(jù)作為map輸出的key設(shè)置為int類型。map會自動的根據(jù)key進行排序*/
public class OutCountMapper2 extends Mapper{private final static IntWritable one = new IntWritable(1);public void map(LongWritable key, Text value, Context context) throwsIOException, InterruptedException {//數(shù)據(jù)格式:uid_skuid
String oneline =value.toString();//將這條數(shù)據(jù)中的uid 發(fā)出去, value為計算one
context.write(new Text(oneline.split("_")[0]), one);
}
}
4、OutCountReduce2.java
importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Reducer;importjava.io.IOException;importjava.util.Iterator;/*** created by wangjunfu on 2017-05-27.
* 按統(tǒng)計數(shù)排序:將values作為次序key,將map排序好的key作為value輸出*/
public class OutCountReduce2 extends Reducer{public void reduce(Text key, Iterable values, Context context) throwsIOException, InterruptedException {int sum = 0;//迭代器,訪問容器中的元素,為容器而生
Iterator itr =values.iterator();while(itr.hasNext()) {
sum+=itr.next().get();
}/*// 這種遍歷也可以
// 遍歷v2的list,進行累加求和
for (IntWritable v2 : itr) {
sum = v2.get();
}*/
//按統(tǒng)計數(shù)排序:將values作為次序key,將map排序好的key作為value輸出//context.write(new IntWritable(sum), key);//需要再起一個 map-reduce
context.write(key, newIntWritable(sum));
}
}
5、OutCount2.java
importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapred.JobConf;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;importorg.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/*** 需求:給定一個列表uid skuid,求出uid下不重復(fù)的skuid數(shù)據(jù);然后再按統(tǒng)計大小排序。
* 涉及到多job 處理。
* created by wangjunfu on 2017-05-27.*/
public classOutCount2 {public static void main(String[] args) throwsException {
JobConf conf= new JobConf(OutCount.class);//第一個job的配置
Job job1 = new Job(conf, "Join1");
job1.setJarByClass(OutCount.class);
job1.setMapperClass(OutCountMapper.class);
job1.setReducerClass(OutCountReduce.class);
job1.setMapOutputKeyClass(Text.class); //map階段的輸出的key
job1.setMapOutputValueClass(Text.class); //map階段的輸出的value
job1.setOutputKeyClass(Text.class); //reduce階段的輸出的key
job1.setOutputValueClass(Text.class); //reduce階段的輸出的value//job-1 加入控制容器
ControlledJob ctrljob1 = newControlledJob(conf);
ctrljob1.setJob(job1);//job-1 的輸入輸出文件路徑
FileInputFormat.addInputPath(job1, new Path(args[0]));
FileOutputFormat.setOutputPath(job1,new Path(args[1]));//第二個job的配置
Job job2 = new Job(conf, "Join2");
job2.setJarByClass(OutCount.class); //設(shè)置job所在的類在哪個jar包
job2.setMapperClass(OutCountMapper2.class); //指定job所用的mappe類
job2.setReducerClass(OutCountReduce2.class); //指定job所用的reducer類//指定mapper輸出類型和reducer輸出類型//由于在wordcount中mapper和reducer的輸出類型一致,//所以使用setOutputKeyClass和setOutputValueClass方法可以同時設(shè)定mapper和reducer的輸出類型//如果mapper和reducer的輸出類型不一致時,可以使用setMapOutputKeyClass和setMapOutputValueClass單獨設(shè)置mapper的輸出類型
job2.setMapOutputKeyClass(Text.class); //map階段的輸出的key
job2.setMapOutputValueClass(IntWritable.class); //map階段的輸出的value
job2.setOutputKeyClass(Text.class); //reduce階段的輸出的key
job2.setOutputValueClass(IntWritable.class); //reduce階段的輸出的value//job-2 加入控制容器
ControlledJob ctrljob2 = newControlledJob(conf);
ctrljob2.setJob(job2);//設(shè)置多個作業(yè)直接的依賴關(guān)系//job-2 的啟動,依賴于job-1作業(yè)的完成
ctrljob2.addDependingJob(ctrljob1);//輸入路徑是上一個作業(yè)的輸出路徑,因此這里填args[1],要和上面對應(yīng)好
FileInputFormat.addInputPath(job2, new Path(args[1]));//輸出路徑從新傳入一個參數(shù),這里需要注意,因為我們最后的輸出文件一定要是沒有出現(xiàn)過得//因此我們在這里new Path(args[2])因為args[2]在上面沒有用過,只要和上面不同就可以了
FileOutputFormat.setOutputPath(job2, new Path(args[2]));//主的控制容器,控制上面的總的兩個子作業(yè)
JobControl jobCtrl = new JobControl("myOutCount");//添加到總的JobControl里,進行控制
jobCtrl.addJob(ctrljob1);
jobCtrl.addJob(ctrljob2);//在線程啟動,記住一定要有這個
Thread t = newThread(jobCtrl);
t.start();while (true) {if(jobCtrl.allFinished()) {//如果作業(yè)成功完成,就打印成功作業(yè)的信息
System.out.println(jobCtrl.getSuccessfulJobList());
jobCtrl.stop();break;
}
}
}
}
6、OutCount.java
單Job的,本次試驗沒有使用到,這里寫出來供參考
importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.util.GenericOptionsParser;/*** 需求:給定一個列表uid skuid,求出uid下不重復(fù)的skuid數(shù)據(jù);然后再按統(tǒng)計大小排序。
* 涉及到多job 處理。
* created by wangjunfu on 2017-05-25.*/
public classOutCount {public static void main(String[] args) throwsException {
Configuration conf= new Configuration(); //指定作業(yè)執(zhí)行規(guī)范
String[] otherArgs = newGenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {
System.err.println("Usage:wordcount ");
System.exit(2);
}
Job job= new Job(conf, "word count"); //指定job名稱,及運行對象
job.setJarByClass(OutCount.class);
job.setMapperClass(OutCountMapper.class); //指定map函數(shù)
job.setCombinerClass(OutCountReduce.class); //是否需要conbiner整合
job.setReducerClass(OutCountReduce.class); //指定reduce函數(shù)
job.setOutputKeyClass(Text.class); //輸出key格式
job.setOutputValueClass(IntWritable.class); //輸出value格式
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //處理文件路徑
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); //結(jié)果輸出路徑//將job提交給集群運行
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
五、結(jié)果
11 0
11 1
7 2
10 3
10 4
9 5
10 6
7 7
13 8
9 9
總結(jié)
以上是生活随笔為你收集整理的java多个mapreduce_一个简单的MapReduce示例(多个MapReduce任务处理)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: kademlia java_分布式哈希表
- 下一篇: java通过spring获取配置文件_s
