MapReduce实现join操作
生活随笔
收集整理的這篇文章主要介紹了
MapReduce实现join操作
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
前陣子把MapReduce實現join操作的算法設想清楚了,但一直沒有在代碼層面落地。今天終于費了些功夫把整個流程走了一遭,期間經歷了諸多麻煩并最終得以將其一一搞定,再次深切體會到,什么叫從計算模型到算法實現還有很多路要走。
create?table?if?not?exists?m_ys_lab_jointest_a?(?? ?????id?bigint,?? ?????name?string?? )?? row?format?delimited?? fields?terminated?by?'9'?? lines?terminated?by?'10'?? stored?as?textfile;?? 數據:
(2)m_ys_lab_jointest_b(以下簡稱表B) 建表語句為: [sql]?view plaincopyprint? create?table?if?not?exists?m_ys_lab_jointest_b?(?? ?????id?bigint,?? ?????statyear?bigint,?? ?????num?bigint?? )?? row?format?delimited?? fields?terminated?by?'9'?? lines?terminated?by?'10'?? stored?as?textfile;?? 數據:
我們的目的是,以id為key做join操作,得到以下表: m_ys_lab_jointest_ab
import?java.io.IOException;?? import?java.util.HashMap;?? import?java.util.Iterator;?? import?java.util.Vector;?? ?? import?org.apache.hadoop.io.LongWritable;?? import?org.apache.hadoop.io.Text;?? import?org.apache.hadoop.io.Writable;?? import?org.apache.hadoop.mapred.FileSplit;?? import?org.apache.hadoop.mapred.JobConf;?? import?org.apache.hadoop.mapred.MapReduceBase;?? import?org.apache.hadoop.mapred.Mapper;?? import?org.apache.hadoop.mapred.OutputCollector;?? import?org.apache.hadoop.mapred.RecordWriter;?? import?org.apache.hadoop.mapred.Reducer;?? import?org.apache.hadoop.mapred.Reporter;?? ?? /**? ?*?MapReduce實現Join操作? ?*/?? public?class?MapRedJoin?{?? ????public?static?final?String?DELIMITER?=?"\u0009";?//?字段分隔符?? ?????? ????//?map過程?? ????public?static?class?MapClass?extends?MapReduceBase?implements?? ????????????Mapper<LongWritable,?Text,?Text,?Text>?{?? ?????????????????????????? ????????public?void?configure(JobConf?job)?{?? ????????????super.configure(job);?? ????????}?? ?????????? ????????public?void?map(LongWritable?key,?Text?value,?OutputCollector<Text,?Text>?output,?? ????????????????Reporter?reporter)?throws?IOException,?ClassCastException?{?? ????????????//?獲取輸入文件的全路徑和名稱?? ????????????String?filePath?=?((FileSplit)reporter.getInputSplit()).getPath().toString();?? ????????????//?獲取記錄字符串?? ????????????String?line?=?value.toString();?? ????????????//?拋棄空記錄?? ????????????if?(line?==?null?||?line.equals(""))?return;??? ?????????????? ????????????//?處理來自表A的記錄?? ????????????if?(filePath.contains("m_ys_lab_jointest_a"))?{?? ????????????????String[]?values?=?line.split(DELIMITER);?//?按分隔符分割出字段?? ????????????????if?(values.length?<?2)?return;?? ?????????????????? ????????????????String?id?=?values[0];?//?id?? ????????????????String?name?=?values[1];?//?name?? ?????????????????? ????????????????output.collect(new?Text(id),?new?Text("a#"+name));?? ????????????}?? ????????????//?處理來自表B的記錄?? ????????????else?if?(filePath.contains("m_ys_lab_jointest_b"))?{?? ????????????????String[]?values?=?line.split(DELIMITER);?//?按分隔符分割出字段?? ????????????????if?(values.length?<?3)?return;?? ?????????????????? ????????????????String?id?=?values[0];?//?id?? ????????????????String?statyear?=?values[1];?//?statyear?? ????????????????String?num?=?values[2];?//num?? ?????????????????? ????????????????output.collect(new?Text(id),?new?Text("b#"+statyear+DELIMITER+num));?? ????????????}?? ????????}?? ????}?? ?????? ????//?reduce過程?? ????public?static?class?Reduce?extends?MapReduceBase?? ????????????implements?Reducer<Text,?Text,?Text,?Text>?{?? ????????public?void?reduce(Text?key,?Iterator<Text>?values,?? ????????????????OutputCollector<Text,?Text>?output,?Reporter?reporter)?? ????????????????throws?IOException?{?? ?????????????????????? ????????????Vector<String>?vecA?=?new?Vector<String>();?//?存放來自表A的值?? ????????????Vector<String>?vecB?=?new?Vector<String>();?//?存放來自表B的值?? ?????????????? ????????????while?(values.hasNext())?{?? ????????????????String?value?=?values.next().toString();?? ????????????????if?(value.startsWith("a#"))?{?? ????????????????????vecA.add(value.substring(2));?? ????????????????}?else?if?(value.startsWith("b#"))?{?? ????????????????????vecB.add(value.substring(2));?? ????????????????}?? ????????????}?? ?????????????? ????????????int?sizeA?=?vecA.size();?? ????????????int?sizeB?=?vecB.size();?? ?????????????? ????????????//?遍歷兩個向量?? ????????????int?i,?j;?? ????????????for?(i?=?0;?i?<?sizeA;?i?++)?{?? ????????????????for?(j?=?0;?j?<?sizeB;?j?++)?{?? ????????????????????output.collect(key,?new?Text(vecA.get(i)?+?DELIMITER?+vecB.get(j)));?? ????????????????}?? ????????????}????? ????????}?? ????}?? ?????? ????protected?void?configJob(JobConf?conf)?{?? ????????conf.setMapOutputKeyClass(Text.class);?? ????????conf.setMapOutputValueClass(Text.class);?? ????????conf.setOutputKeyClass(Text.class);?? ????????conf.setOutputValueClass(Text.class);?? ????????conf.setOutputFormat(ReportOutFormat.class);?? ????}?? }??
數據準備
首先是準備好數據。這個倒已經是一個熟練的過程,所要做的是把示例數據準備好,記住路徑和字段分隔符。 準備好下面兩張表: (1)m_ys_lab_jointest_a(以下簡稱表A) 建表語句為: [sql]?view plaincopyprint?| id???? name 1???? 北京 2???? 天津 3???? 河北 4???? 山西 5???? 內蒙古 6???? 遼寧 7???? 吉林 8???? 黑龍江 |
(2)m_ys_lab_jointest_b(以下簡稱表B) 建表語句為: [sql]?view plaincopyprint?
| id ? ? statyear???? num 1???? 2010???? 1962 1???? 2011???? 2019 2???? 2010???? 1299 2???? 2011???? 1355 4???? 2010???? 3574 4???? 2011???? 3593 9???? 2010???? 2303 9???? 2011???? 2347 |
我們的目的是,以id為key做join操作,得到以下表: m_ys_lab_jointest_ab
| id???? name ? ?statyear?????num 1?????? 北京??? 2011??? 2019 1?????? 北京??? 2010??? 1962 2?????? 天津??? 2011??? 1355 2?????? 天津??? 2010??? 1299 4?????? 山西??? 2011??? 3593 4?????? 山西??? 2010??? 3574 |
計算模型
整個計算過程是: (1)在map階段,把所有記錄標記成<key, value>的形式,其中key是id,value則根據來源不同取不同的形式:來源于表A的記錄,value的值為"a#"+name;來源于表B的記錄,value的值為"b#"+score。 (2)在reduce階段,先把每個key下的value列表拆分為分別來自表A和表B的兩部分,分別放入兩個向量中。然后遍歷兩個向量做笛卡爾積,形成一條條最終結果。 如下圖所示:代碼
代碼如下: [java]?view plaincopyprint?技術細節
下面說一下其中的若干技術細節: (1)由于輸入數據涉及兩張表,我們需要判斷當前處理的記錄是來自表A還是來自表B。Reporter類getInputSplit()方法可以獲取輸入數據的路徑,具體代碼如下: String?filePath?= ((FileSplit)reporter.getInputSplit()).getPath().toString(); (2)map的輸出的結果,同id的所有記錄(不管來自表A還是表B)都在同一個key下保存在同一個列表中,在reduce階段需要將其拆開,保存為相當于笛卡爾積的m x n條記錄。由于事先不知道m、n是多少,這里使用了兩個向量(可增長數組)來分別保存來自表A和表B的記錄,再用一個兩層嵌套循環組織出我們需要的最終結果。 (3)在MapReduce中可以使用System.out.println()方法輸出,以方便調試。不過System.out.println()的內容不會在終端顯示,而是輸出到了stdout和stderr這兩個文件中,這兩個文件位于logs/userlogs/attempt_xxx目錄下。可以通過web端的歷史job查看中的“Analyse This Job”來查看stdout和stderr的內容。總結
以上是生活随笔為你收集整理的MapReduce实现join操作的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: VisualBox中增大linux硬盘空
- 下一篇: hive 简单操作搜狗实验室的词频文件