hadoop join之map side join
在本例中,我們?nèi)匀徊捎蒙弦焕械臄?shù)據(jù)文件。之所以存在reduce side join,是因為在map階段不能獲取所有需要的join字段,即:同一個key對應的字段可能位于不同map中。Reduce side join是非常低效的,因為shuffle階段要進行大量的數(shù)據(jù)傳輸。Map side join是針對以下場景進行的優(yōu)化:兩個待連接表中,有一個表非常大,而另一個表非常小,以至于小表可以直接存放到內(nèi)存中。這樣,我們可以將小表復制多份,讓每個map task內(nèi)存中存在一份(比如存放到hash table中),然后只掃描大表:對于大表中的每一條記錄key/value,在hash table中查找是否有相同的key的記錄,如果有,則連接后輸出即可。為了支持文件的復制,Hadoop提供了一個類DistributedCache,使用該類的方法如下:(1)用戶使用靜態(tài)方法DistributedCache.addCacheFile()指定要復制的文件,它的參數(shù)是文件的URI(如果是HDFS上的文件,可以這樣:hdfs://jobtracker:50030/home/XXX/file)。JobTracker在作業(yè)啟動之前會獲取這個URI列表,并將相應的文件拷貝到各個TaskTracker的本地磁盤上。(2)用戶使用DistributedCache.getLocalCacheFiles()方法獲取文件目錄,并使用標準的文件讀寫API讀取相應的文件。
本實例中的運行參數(shù)需要三個,加入在hdfs中有兩個目錄input和input2,其中input2存放user.csv,input存放order.csv,則運行命令格式如下:hadoop jar xxx.jar JoinWithDistribute input2/user.csv input output。
具體實例如下,此實例我們采用舊的API來寫
public class JoinWithDistribute extends Configured implements Tool {public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>{//用于緩存小表的數(shù)據(jù),在這里我們緩存user.csv文件中的數(shù)據(jù)private Map<String, String> users = new HashMap<String, String>();private Text outKey = new Text();private Text outValue = new Text();//此方法會在map方法執(zhí)行之前執(zhí)行@Overridepublic void configure(JobConf job){BufferedReader in = null;try{//從當前作業(yè)中獲取要緩存的文件Path[] paths = DistributedCache.getLocalCacheFiles(job);String user = null;String[] userInfo = null;for (Path path : paths){if (path.toString().contains("user.csv")){in = new BufferedReader(new FileReader(path.toString()));while (null != (user = in.readLine())){userInfo = user.split(",", 2);//緩存文件中的數(shù)據(jù)users.put(userInfo[0], userInfo[1]);}}}}catch (IOException e){e.printStackTrace();}finally{try{in.close();}catch (IOException e){e.printStackTrace();}}}public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException{//首先獲取order文件中每條記錄的userId,//再去緩存中取得相同userId的user記錄,合并兩記錄并輸出之。String[] order = value.toString().split(",");String user = users.get(order[0]);if(user != null){outKey.set(user);outValue.set(order[1]);output.collect(outKey, outValue);}}}public int run(String[] args) throws Exception{JobConf job = new JobConf(getConf(), JoinWithDistribute.class);job.setJobName("JoinWithDistribute");job.setMapperClass(MapClass.class);job.setNumReduceTasks(0);job.setInputFormat(TextInputFormat.class);job.setOutputFormat(TextOutputFormat.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);//我們把第一個參數(shù)的地址作為要緩存的文件路徑DistributedCache.addCacheFile(new Path(args[0]).toUri(), job);FileInputFormat.setInputPaths(job, new Path(args[1]));FileOutputFormat.setOutputPath(job, new Path(args[2]));JobClient.runJob(job);return 0;}public static void main(String[] args) throws Exception{int res = ToolRunner.run(new Configuration(), new JoinWithDistribute(), args);System.exit(res);}}轉(zhuǎn)發(fā):https://blog.csdn.net/huashetianzu/article/details/7821674
總結(jié)
以上是生活随笔為你收集整理的hadoop join之map side join的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: c语言头文件和源文件_C语言头文件防卫式
- 下一篇: win7优化设置_win7蓝牙怎么打开?