MapReduce Data Join
Data in different files sometimes has a corresponding relationship, and the files need to be joined into a new file for analysis. For example, suppose there are two input files, a.txt and b.txt, with the following contents:
a.txt:

1 a
2 b
3 c
4 d

b.txt:

1 good
2 bad
3 ok
4 hello
These need to be joined into a new file like the following:
a good
b bad
c ok
d hello
Processing can be divided into two steps:

1. Map stage: scatter the records from both input files into (key, value) pairs, as follows:
1 a
1 good
2 b
2 bad
3 c
3 ok
4 d
4 hello
2. Reduce stage: perform the join. Because the data here is simple, it is enough to check whether the length of a map-output value is 1 to decide whether it becomes the new key or the new value.

```java
package cn.zhf.hadoop;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SingleJoin extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        Tool tool = new SingleJoin();
        ToolRunner.run(tool, args);
        print(tool);
    }

    @Override
    public int run(String[] arg0) throws Exception {
        Configuration conf = getConf();
        // Job.getInstance(conf) replaces the deprecated new Job(),
        // which ignored the configuration obtained above.
        Job job = Job.getInstance(conf);
        job.setJarByClass(getClass());

        // Delete any stale output directory so the job can be rerun.
        FileSystem fs = FileSystem.get(conf);
        fs.delete(new Path("out"), true);

        FileInputFormat.addInputPath(job, new Path("a.txt"));
        FileInputFormat.addInputPath(job, new Path("b.txt"));
        FileOutputFormat.setOutputPath(job, new Path("out"));

        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.waitForCompletion(true);
        return 0;
    }

    // Map stage: emit (id, field) pairs from both input files.
    public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] str = value.toString().split(" ");
            context.write(new Text(str[0]), new Text(str[1]));
        }
    }

    // Reduce stage: for each id, the single-character value becomes the
    // new key and the longer value becomes the new value.
    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Text keyy = new Text();
            Text valuee = new Text();
            Iterator<Text> iterator = values.iterator();
            while (iterator.hasNext()) {
                Text temp = iterator.next();
                if (temp.toString().length() == 1) {
                    keyy.set(temp);
                } else {
                    valuee.set(temp);
                }
            }
            context.write(keyy, valuee);
        }
    }

    // Read the reduce output back from HDFS and print it to stdout.
    public static void print(Tool tool) throws IOException {
        FileSystem fs = FileSystem.get(tool.getConf());
        Path path = new Path("out/part-r-00000");
        FSDataInputStream fsin = fs.open(path);
        int length = 0;
        byte[] buff = new byte[128];
        while ((length = fsin.read(buff, 0, 128)) != -1) {
            System.out.println(new String(buff, 0, length));
        }
        fsin.close();
    }
}
```
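The join logic above can be checked without a Hadoop cluster. The following standalone sketch (the class and method names are hypothetical, and plain Java collections stand in for Hadoop's Writable types and the shuffle) simulates the grouping and the reducer's length-1 test on the sample data:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LocalJoinSketch {

    // Simulates the shuffle: group the second field of every line
    // from both files under their shared first-field key.
    static Map<String, List<String>> shuffle(List<String> linesA, List<String> linesB) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String line : linesA) {
            String[] parts = line.split(" ");
            grouped.computeIfAbsent(parts[0], k -> new ArrayList<>()).add(parts[1]);
        }
        for (String line : linesB) {
            String[] parts = line.split(" ");
            grouped.computeIfAbsent(parts[0], k -> new ArrayList<>()).add(parts[1]);
        }
        return grouped;
    }

    // Simulates the reducer: the single-character value becomes the
    // new key, the longer value becomes the new value.
    static List<String> reduce(Map<String, List<String>> grouped) {
        List<String> out = new ArrayList<>();
        for (List<String> values : grouped.values()) {
            String newKey = "", newValue = "";
            for (String v : values) {
                if (v.length() == 1) newKey = v; else newValue = v;
            }
            out.add(newKey + " " + newValue);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> a = Arrays.asList("1 a", "2 b", "3 c", "4 d");
        List<String> b = Arrays.asList("1 good", "2 bad", "3 ok", "4 hello");
        // prints: a good / b bad / c ok / d hello (one pair per line)
        for (String line : reduce(shuffle(a, b))) System.out.println(line);
    }
}
```

This reproduces the expected joined file shown earlier and makes it easy to see why the length test only works for this particular data shape: any single-character word on the right-hand side would be mistaken for a key.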
Reference: 《MapReduce2.0源代碼分析及編程實踐》 (MapReduce 2.0 Source Code Analysis and Programming Practice)