MapReduce DataJoin: Joining Multiple Data Sources
This post introduces the DataJoin classes for joining multiple data sources. Let's start with an example. Suppose we have two data sources, customers and orders.
The customers:

| Customer ID | Name | PhoneNumber |
|-------------|------|---------------|
| 1 | 趙一 | 025-5455-566 |
| 2 | 錢二 | 025-4587-565 |
| 3 | 孫三 | 021-5845-5875 |
And the customers' orders:
| Customer ID | Order ID | Price | Date |
|-------------|----------|-------|------------|
| 2 | 1 | 93 | 2008-01-08 |
| 3 | 2 | 43 | 2012-01-21 |
| 1 | 3 | 43 | 2012-05-12 |
| 2 | 4 | 32 | 2012-5-14 |
The task: join the two sources so that each order carries the customer's details, producing records like this:

| Customer ID | Name | PhoneNumber | Price | Date |
|-------------|------|---------------|-------|------------|
| 2 | 錢二 | 025-4587-565 | 93 | 2008-01-08 |
That was the example; now let's look at how Hadoop's DataJoin classes actually do it.
First, each record from every data source needs a data tag, which is easy to understand: it marks where the record came from. Second, each record to be joined needs a join key (the GroupKey). The DataJoin library supplies a processing framework for both the map phase and the reduce phase, doing as much of the work as possible and leaving only the unavoidable, application-specific pieces to the programmer.
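To make this concrete, here is a minimal sketch of how the two sources might sit in HDFS. The file names and the comma-separated layout are assumptions, chosen to match the map code shown below, where the tag is the file-name prefix before the first '-' and the group key is the first comma-separated field:

```
# customers-20120101 (file name is an assumption)
1,趙一,025-5455-566
2,錢二,025-4587-565
3,孫三,021-5845-5875

# orders-20120101 (file name is an assumption)
2,1,93,2008-01-08
3,2,43,2012-01-21
1,3,43,2012-05-12
2,4,32,2012-5-14
```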
Map phase
The DataJoin library provides an abstract base class, DataJoinMapperBase, which already implements the map() method: for every text record from every source it produces a tagged record object. The program must tell it which source each record came from (the Tag) and what its join key is (the GroupKey). Given those two, map() emits records like the following, using the customers table as an example:

customers  1  趙一  025-5455-566
customers  2  錢二  025-4587-565
Since the Tag and the GroupKey are both supplied by the programmer during the map phase, there have to be hooks for the programmer to implement. DataJoinMapperBase declares the following three abstract methods:
- `abstract Text generateInputTag(String inputFile)`: as the name suggests, this sets the Tag for a given input file.
- `abstract Text generateGroupKey(TaggedMapOutput lineRecord)`: this sets the GroupKey. Here lineRecord is one record from a data source, and the method may derive the join key from any field of that record.
- `abstract TaggedMapOutput generateTaggedMapOutput(Object value)`: this wraps a raw record from a data source into a tagged record. TaggedMapOutput is the data type of one tagged record.

The code is as follows:
```java
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;

public class MapClass extends DataJoinMapperBase {

    @Override
    protected Text generateGroupKey(TaggedMapOutput lineRecord) {
        // The join key is the first comma-separated field of the record
        // (the customer ID in both sources).
        String line = ((Text) lineRecord.getData()).toString();
        String[] tokens = line.split(",");
        String groupKey = tokens[0];
        return new Text(groupKey);
    }

    @Override
    protected Text generateInputTag(String inputFile) {
        // The tag is the part of the file name before the first '-',
        // e.g. "customers-20120101" is tagged "customers".
        String dataSource = inputFile.split("-")[0];
        return new Text(dataSource);
    }

    @Override
    protected TaggedMapOutput generateTaggedMapOutput(Object value) {
        // Wrap the raw line in a TaggedWritable carrying the source tag.
        TaggedWritable tw = new TaggedWritable((Text) value);
        tw.setTag(this.inputTag);
        return tw;
    }
}
```
```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class TaggedWritable extends TaggedMapOutput {

    private Writable data;

    public TaggedWritable() {
        // Hadoop instantiates Writables reflectively during
        // deserialization, so a no-argument constructor is required.
        this(new Text(""));
    }

    public TaggedWritable(Writable data) {
        this.tag = new Text("");
        this.data = data;
    }

    @Override
    public Writable getData() {
        return data;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.tag.readFields(in);
        this.data.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        this.tag.write(out);
        this.data.write(out);
    }
}
```
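One caveat worth calling out: readFields() deserializes into whatever object data already references, which is why the no-argument constructor above pre-creates an empty Text. That is sufficient here because every record in this job is a Text; a more general TaggedWritable would also need to serialize the concrete class of data and recreate it on read.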
The data-source tag for each record is produced by generateInputTag(), and setTag() attaches it to the record.
Note: 1. The records here come from plain text files, not from a relational database. 2. TaggedMapOutput lives in the org.apache.hadoop.contrib.utils.join package; if Eclipse cannot resolve that import, find the contrib/datajoin folder under your Hadoop installation directory and add its jar to the Eclipse build path.
Reduce phase
DataJoinReducerBase already implements the reduce() method. All records with the same GroupKey go to the same reducer, and reduce() forms the cross product of the records that share that GroupKey but come from different sources. This is the hardest part to grasp, so here is an example. For GroupKey 2, the reducer receives three records:
customers  2  錢二  025-4587-565
orders     2  1  93  2008-01-08
orders     2  4  32  2012-5-14
These three map-output records share one GroupKey but come from two sources, so the cross product produces two pairs:
customers  2  錢二  025-4587-565  ×  orders  2  1  93  2008-01-08
customers  2  錢二  025-4587-565  ×  orders  2  4  32  2012-5-14
Once the reduce phase makes sense, the rest is essentially done. reduce() itself is supplied by the framework and does not need to be overridden; the remaining work is to implement a combine() method that merges each cross-product pair into a single record in the joined-order format.
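As a sketch of what that means for GroupKey 2 (a hypothetical trace matching the sample data, not framework output copied verbatim), the framework invokes combine() once per cross-product combination, passing one record from each source:

```java
// Hypothetical trace for GroupKey "2": combine() is called once per
// cross-product combination, with the tags and the tagged records.
//
// combine(tags   = {"customers", "orders"},
//         values = {"2,錢二,025-4587-565", "2,1,93,2008-01-08"})
//   returns "2,錢二,025-4587-565,1,93,2008-01-08"
//
// combine(tags   = {"customers", "orders"},
//         values = {"2,錢二,025-4587-565", "2,4,32,2012-5-14"})
//   returns "2,錢二,025-4587-565,4,32,2012-5-14"
```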
代碼如下:
```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class ReduceClass extends DataJoinReducerBase {

    @Override
    protected TaggedMapOutput combine(Object[] tags, Object[] values) {
        // Require a record from every source (an inner join):
        // groups present in only one source are discarded.
        if (tags.length < 2) return null;

        StringBuffer joinData = new StringBuffer();
        int count = 0;
        for (Object value : values) {
            if (count > 0) joinData.append(",");
            TaggedWritable tw = (TaggedWritable) value;
            String recordLine = ((Text) tw.getData()).toString();
            // Split into the join key and the remaining fields.
            String[] tokens = recordLine.split(",", 2);
            // Keep the join key only once, from the first record.
            if (count == 0) joinData.append(tokens[0]).append(",");
            joinData.append(tokens[1]);
            count++;
        }

        TaggedWritable rtv = new TaggedWritable(new Text(joinData.toString()));
        rtv.setTag((Text) tags[0]);
        return rtv;
    }

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        JobConf job = new JobConf(conf, ReduceClass.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setJobName("DataJoin");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(ReduceClass.class);

        // The old (mapred) API is used throughout, matching the
        // contrib/datajoin base classes.
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TaggedWritable.class);
        job.set("mapred.textoutputformat.separator", ",");
        JobClient.runJob(job);
    }
}
```
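Assuming the three classes are packaged into a jar with the datajoin contrib jar on the classpath (the jar and directory names below are placeholders), the job can be run and the result inspected roughly like this:

```
$ hadoop jar datajoin-example.jar ReduceClass join-in join-out
$ hadoop fs -cat join-out/part-00000
```

The output should contain one line per cross-product combination, carrying the joined fields shown in the trace above (the exact key formatting may vary slightly with how DataJoinReducerBase emits its key/value pairs).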
總結