public class MyJoin
{public static class MapClass extends Mapper<LongWritable, Text, Text, Text>{//最好在map方法外定義變量,以減少map計(jì)算時(shí)創(chuàng)建對(duì)象的個(gè)數(shù)private Text key = new Text();private Text value = new Text();private String[] keyValue = null;@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException{//采用的數(shù)據(jù)輸入格式是TextInputFormat,//文件被分為一系列以換行或者制表符結(jié)束的行,//key是每一行的位置(偏移量,LongWritable類(lèi)型),//value是每一行的內(nèi)容,Text類(lèi)型,所有我們要把key從value中解析出來(lái)keyValue = value.toString().split(",", 2);this.key.set(keyValue[0]);this.value.set(keyValue[1]);context.write(this.key, this.value);}}public static class Reduce extends Reducer<Text, Text, Text, Text>{//最好在reduce方法外定義變量,以減少reduce計(jì)算時(shí)創(chuàng)建對(duì)象的個(gè)數(shù)private Text value = new Text();@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException{StringBuilder valueStr = new StringBuilder();//values中的每一個(gè)值是不同數(shù)據(jù)文件中的具有相同key的值//即是map中輸出的多個(gè)文件相同key的value值集合for(Text val : values){valueStr.append(val);valueStr.append(",");}this.value.set(valueStr.deleteCharAt(valueStr.length()-1).toString());context.write(key, this.value);}}public static void main(String[] args) throws Exception{Configuration conf = new Configuration();Job job = new Job(conf, "MyJoin");job.setJarByClass(MyJoin.class);job.setMapperClass(MapClass.class);job.setReducerClass(Reduce.class);//job.setCombinerClass(Reduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);//分別采用TextInputFormat和TextOutputFormat作為數(shù)據(jù)的輸入和輸出格式//如果不設(shè)置,這也是Hadoop默認(rèn)的操作方式j(luò)ob.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}