Hadoop join: semi join
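SemiJoin, also called a semi-join, is a technique borrowed from distributed databases. It exists because a reduce-side join moves a very large amount of data between machines, and that transfer becomes the bottleneck of the join. If the records that will never take part in the join can be filtered out on the map side, a great deal of network I/O is saved.

The implementation is simple. Pick the smaller table, say File1, extract the keys it contributes to the join, and save them to a file File3. File3 is usually small enough to be held in memory. In the map phase, use the DistributedCache to copy File3 to every TaskTracker, then drop every record of File2 whose key does not appear in File3. The remaining work in the reduce phase is the same as in a reduce-side join.

This example reuses the data from the first example. Suppose we keep only the users whose sex is 1 and store their keys in a file named user_id. Note that every line must keep its double quotes: the mapper compares the first comma-separated field of each input record verbatim, quotes included. The file looks like this: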
"ID"
"1"
"2"
"3"
"5"
"6"
"8"
"9"
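Producing user_id is itself a small preprocessing step. Below is a minimal sketch in plain Java. It assumes the user table is a local CSV file with a header row, every field wrapped in double quotes, the key in the first column, and the sex column at an index the caller supplies. The first example's data layout is not reproduced here, so the class name BuildUserIdFile and its arguments are hypothetical. It copies the header's key label, as in the listing above, and keeps the key of every row whose sex field is "1".

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that builds the user_id key file from a local copy of the user table.
public class BuildUserIdFile {

    // Returns the header's key label plus the key of every row whose sex field is "1".
    // Fields keep their double quotes, because the job's mapper compares keys verbatim.
    // Assumption: no field contains a comma (the job's mapper makes the same assumption).
    public static List<String> selectKeys(List<String> userLines, int sexColumn) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < userLines.size(); i++) {
            String[] fields = userLines.get(i).split(",");
            if (i == 0) {
                keys.add(fields[0]); // header line, e.g. "ID"
            } else if (fields.length > sexColumn && fields[sexColumn].equals("\"1\"")) {
                keys.add(fields[0]);
            }
        }
        return keys;
    }

    // args[0]: user table CSV, args[1]: index of the sex column, args[2]: output file (user_id)
    public static void main(String[] args) throws IOException {
        List<String> lines = Files.readAllLines(Paths.get(args[0]), StandardCharsets.UTF_8);
        List<String> keys = selectKeys(lines, Integer.parseInt(args[1]));
        Files.write(Paths.get(args[2]), keys, StandardCharsets.UTF_8);
    }
}
```

The resulting file still has to be copied to HDFS (for example with hadoop fs -put) so that its path can be passed as the job's first argument.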
The complete code follows; this example is written against the new API (org.apache.hadoop.mapreduce):
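```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SemiJoin extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
        // Holds the keys read from the user_id file
        private Set<String> userIds = new HashSet<String>();
        private Text key = new Text();
        private Text value = new Text();
        private String[] keyValue;

        // Called once per map task, before the first call to map()
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Local copies of the files this job added to the DistributedCache
            Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            if (paths == null) {
                throw new IOException("user_id was not found in the DistributedCache");
            }
            for (Path path : paths) {
                if (path.toString().contains("user_id")) {
                    // try-with-resources closes every reader, even if several files match;
                    // a read error fails the task instead of silently leaving userIds empty
                    try (BufferedReader in = new BufferedReader(new FileReader(path.toString()))) {
                        String userId;
                        while ((userId = in.readLine()) != null) {
                            userIds.add(userId);
                        }
                    }
                }
            }
        }

        // Filters out, on the map side, every record whose key is not in user_id
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            this.keyValue = value.toString().split(",");
            // The length check skips lines without a comma, which would otherwise throw
            if (keyValue.length > 1 && userIds.contains(keyValue[0])) {
                this.key.set(keyValue[0]);
                this.value.set(keyValue[1]);
                context.write(this.key, this.value);
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        private Text value = new Text();

        // Joins all values of one key into a single comma-separated string
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text val : values) {
                sb.append(val.toString());
                sb.append(",");
            }
            // reduce() always receives at least one value, so there is a trailing comma to drop
            this.value.set(sb.deleteCharAt(sb.length() - 1).toString());
            context.write(key, this.value);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf(), "SemiJoin");
        job.setJarByClass(SemiJoin.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
        if (otherArgs.length != 3) {
            System.err.println("Usage: SemiJoin <user_id file> <input path> <output path>");
            return 2;
        }
        // The first argument is the path of the file to cache (user_id)
        DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new SemiJoin(), args);
        System.exit(res);
    }
}
```

Reposted from: https://blog.csdn.net/huashetianzu/article/details/7823326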
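Because the mapper compares raw strings, a quoting mismatch between user_id and the data file silently filters out every record. One way to check the logic before submitting the job is a cluster-free re-implementation of what MapClass and Reduce do. The sketch below is a hypothetical helper (LocalSemiJoinCheck is not part of the original code or of Hadoop) in plain Java; a TreeMap stands in for the shuffle-and-sort phase.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical, cluster-free re-implementation of SemiJoin's map and reduce logic.
public class LocalSemiJoinCheck {

    public static Map<String, String> run(Set<String> userIds, List<String> inputLines) {
        // Stand-in for the shuffle: group emitted values by key, keys kept in sorted order
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String line : inputLines) {
            String[] kv = line.split(",");
            // Same filter as MapClass.map(): the raw first field, quotes included, must be in userIds
            if (kv.length > 1 && userIds.contains(kv[0])) {
                grouped.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
            }
        }
        // Same as Reduce.reduce(): join each key's values with commas
        Map<String, String> output = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
            output.put(e.getKey(), String.join(",", e.getValue()));
        }
        return output;
    }

    public static void main(String[] args) {
        Set<String> userIds = Set.of("\"1\"", "\"3\"");
        List<String> input = List.of("\"1\",\"a\"", "\"2\",\"b\"", "\"3\",\"c\"", "\"1\",\"d\"", "1,e");
        System.out.println(run(userIds, input));
    }
}
```

With the sample input in main, the unquoted line 1,e is dropped because its key 1 is not equal to "1". Hadoop does not guarantee the order of values within a key, so the real job may concatenate them in a different order than this sketch does.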