MapReduce算法–了解数据联接第1部分
在本文中,我們繼續執行一系列實現算法的系列,該算法在使用MapReduce進行數據密集型文本處理中找到,這一次討論數據聯接。 雖然我們將討論在Hadoop中聯接數據的技術并提供示例代碼,但在大多數情況下,您可能不會自己編寫代碼來執行聯接。 取而代之的是,使用可以在更高抽象級別工作的工具(例如Hive或Pig)可以更好地完成連接數據。 如果有可以幫助您處理數據的工具,為什么還要花時間學習如何聯接數據呢? 可以說,聯接數據是Hadoop的最大用途之一。 全面了解Hadoop如何執行聯接對于確定使用哪個聯接以及在出現問題時進行調試至關重要。 此外,一旦您完全了解了Hadoop中如何執行不同的聯接,就可以更好地利用Hive和Pig等工具。 最后,在一種情況下,一種工具可能無法滿足您的需求,因此您必須袖手旁觀并自行編寫代碼。
加入的需要
在處理大型數據集時,如果不是必需的話,通過公用密鑰連接數據的需求可能會非常有用。 通過加入數據,您可以進一步獲得洞察力,例如加入時間戳以將事件與一天中的時間關聯起來。 連接數據的需求多種多樣。 我們將在3個單獨的帖子中介紹3種類型的聯接:Reduce-Side聯接,Map-Side聯接和Memory-Backed聯接。 在這一期中,我們將考慮使用Reduce-Side聯接。
減少側面連接
在我們將要討論的聯接模式中,減少端聯接是最容易實現的。 簡化方聯接的直接原因是Hadoop將相同的密鑰發送到相同的reducer,因此默認情況下,數據是為我們組織的。 要執行聯接,我們只需要緩存一個密鑰并將其與傳入密鑰進行比較。 只要鍵匹配,我們就可以結合來自相應鍵的值。 由于所有數據在整個網絡上都經過混洗,因此使用減少側連接進行權衡是性能。 在減少側連接中,我們將考慮兩種不同的方案:一對一和一對多。 我們還將探索不需要跟蹤傳入密鑰的選項; 給定鍵的所有值都將在簡化器中分組在一起。
一對一加入
一對一聯接的情況是數據集“ X”中的值與數據集“ Y”中的值共享一個公共密鑰。 由于Hadoop保證將相等的鍵發送到同一reducer,因此在兩個數據集上進行映射將為我們處理聯接。 由于僅對鍵進行排序,因此值的順序未知。 我們可以使用輔助排序輕松解決這種情況。 我們二級排序的實現方式是用“ 1”或“ 2”標記鍵,以確定值的順序。 我們需要采取一些額外的步驟來實施我們的標記策略。
實現一個WritableComparable
首先,我們需要編寫一個實現WritableComparable接口的類,該接口將用于包裝密鑰。
public class TaggedKey implements Writable, WritableComparable<TaggedKey> {private Text joinKey = new Text();private IntWritable tag = new IntWritable();@Overridepublic int compareTo(TaggedKey taggedKey) {int compareValue = this.joinKey.compareTo(taggedKey.getJoinKey());if(compareValue == 0 ){compareValue = this.tag.compareTo(taggedKey.getTag());}return compareValue;}//Details left out for clarity}當我們對TaggedKey類進行排序時,具有相同joinKey值的鍵將在tag字段的值上進行次要排序,以確保我們想要的順序。
編寫自定義分區程序
接下來,我們需要編寫一個自定義分區程序,該分區程序僅在確定復合鍵和數據發送到哪個減速器時才考慮連接鍵:
public class TaggedJoiningPartitioner extends Partitioner<TaggedKey,Text> {@Overridepublic int getPartition(TaggedKey taggedKey, Text text, int numPartitions) {return taggedKey.getJoinKey().hashCode() % numPartitions;} }至此,我們擁有了連接數據并確保值順序的條件。 但是,當鍵進入reduce()方法時,我們不想跟蹤它們。 我們希望將所有價值觀歸為一體。 為此,我們將使用Comparator ,該Comparator在決定如何對值進行分組時僅考慮聯接鍵。
編寫組比較器
用于分組的比較器如下所示:
public class TaggedJoiningGroupingComparator extends WritableComparator {public TaggedJoiningGroupingComparator() {super(TaggedKey.class,true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {TaggedKey taggedKey1 = (TaggedKey)a;TaggedKey taggedKey2 = (TaggedKey)b;return taggedKey1.getJoinKey().compareTo(taggedKey2.getJoinKey());} }數據結構
現在,我們需要確定將用于密鑰的哪些數據。 對于我們的樣本數據,我們將使用從Fakenames Generator生成的CSV文件。 第一列是GUID,它將用作我們的聯接鍵。 我們的樣本數據包含諸如姓名,地址,電子郵件,工作信息,信用卡和擁有的汽車之類的信息。 為了演示的目的,我們將使用GUID,名稱和地址字段,并將它們放置在一個結構如下的文件中:
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Esther,Garner,4071 Haven Lane,Okemos,MI 81a43486-07e1-4b92-b92b-03d0caa87b5f,Timothy,Duncan,753 Stadium Drive,Taunton,MA aef52cf1-f565-4124-bf18-47acdac47a0e,Brett,Ramsey,4985 Shinn Street,New York,NY然后,我們將使用GUID,電子郵件地址,用戶名,密碼和信用卡號字段,然后將其放置在另一個文件中,該文件應類似于:
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,517-706-9565,EstherJGarner@teleworm.us,Waskepter38,noL2ieghie,MasterCard, 5305687295670850 81a43486-07e1-4b92-b92b-03d0caa87b5f,508-307-3433,TimothyDDuncan@einrot.com,Conerse,Gif4Edeiba,MasterCard, 5265896533330445 aef52cf1-f565-4124-bf18-47acdac47a0e,212-780-4015,BrettMRamsey@dayrep.com,Subjecall,AiKoiweihi6,MasterCard,524現在,我們需要有一個Mapper,它將知道如何處理我們的數據以提取正確的聯接鍵并設置正確的標簽。
創建映射器
這是我們的Mapper代碼:
public class JoiningMapper extends Mapper<LongWritable, Text, TaggedKey, Text> {private int keyIndex;private Splitter splitter;private Joiner joiner;private TaggedKey taggedKey = new TaggedKey();private Text data = new Text();private int joinOrder;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {keyIndex = Integer.parseInt(context.getConfiguration().get("keyIndex"));String separator = context.getConfiguration().get("separator");splitter = Splitter.on(separator).trimResults();joiner = Joiner.on(separator);FileSplit fileSplit = (FileSplit)context.getInputSplit();joinOrder = Integer.parseInt(context.getConfiguration().get(fileSplit.getPath().getName()));}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {List<String> values = Lists.newArrayList(splitter.split(value.toString()));String joinKey = values.remove(keyIndex);String valuesWithOutKey = joiner.join(values);taggedKey.set(joinKey, joinOrder);data.set(valuesWithOutKey);context.write(taggedKey, data);}}讓我們回顧一下setup()方法中發生的事情。
我們還應該討論map()方法中發生的事情:
因此,我們已經讀入數據,提取了密鑰,設置了連接順序,然后將數據寫回了。 讓我們看一下如何結合數據。
聯接數據
現在讓我們看一下數據如何在化簡器中聯接:
public class JoiningReducer extends Reduce<TaggedKey, Text, NullWritable, Text> {private Text joinedText = new Text();private StringBuilder builder = new StringBuilder();private NullWritable nullKey = NullWritable.get();@Overrideprotected void reduce(TaggedKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {builder.append(key.getJoinKey()).append(",");for (Text value : values) {builder.append(value.toString()).append(",");}builder.setLength(builder.length()-1);joinedText.set(builder.toString());context.write(nullKey, joinedText);builder.setLength(0);} }因為帶有“ 1”標簽的密鑰首先到達了還原器,所以我們知道名稱和地址數據是第一個值,而電子郵件,用戶名,密碼和信用卡數據是第二個值。 因此,我們不需要跟蹤任何鍵。 我們只需遍歷值并將它們連接在一起。
一對一加入結果
這是運行我們的一對一MapReduce作業的結果:
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Esther,Garner,4071 Haven Lane,Okemos,MI,517-706-9565,EstherJGarner@teleworm.us,Waskepter38,noL2ieghie,MasterCard, 5305687295670850 81a43486-07e1-4b92-b92b-03d0caa87b5f,Timothy,Duncan,753 Stadium Drive,Taunton,MA,508-307-3433,TimothyDDuncan@einrot.com,Conerse,Gif4Edeiba,MasterCard, 5265896533330445 aef52cf1-f565-4124-bf18-47acdac47a0e,Brett,Ramsey,4985 Shinn Street,New York,NY,212-780-4015,BrettMRamsey@dayrep.com,Subjecall,AiKoiweihi6,MasterCard, 5243379373546690正如我們可以看到的,以上示例數據中的兩條記錄已合并為一條記錄。 我們已經成功地將GUID,名稱,地址,電子郵件地址,用戶名,密碼和信用卡字段加入到一個文件中。
指定加入順序
此時,我們可能會問如何為多個文件指定連接順序? 答案就在我們的ReduceSideJoinDriver類中,該類充當MapReduce程序的驅動程序。
public class ReduceSideJoinDriver {public static void main(String[] args) throws Exception {Splitter splitter = Splitter.on('/');StringBuilder filePaths = new StringBuilder();Configuration config = new Configuration();config.set("keyIndex", "0");config.set("separator", ",");for(int i = 0; i< args.length - 1; i++) {String fileName = Iterables.getLast(splitter.split(args[i]));config.set(fileName, Integer.toString(i+1));filePaths.append(args[i]).append(",");}filePaths.setLength(filePaths.length() - 1);Job job = Job.getInstance(config, "ReduceSideJoin");job.setJarByClass(ReduceSideJoinDriver.class);FileInputFormat.addInputPaths(job, filePaths.toString());FileOutputFormat.setOutputPath(job, new Path(args[args.length-1]));job.setMapperClass(JoiningMapper.class);job.setReducerClass(JoiningReducer.class);job.setPartitionerClass(TaggedJoiningPartitioner.class);job.setGroupingComparatorClass(TaggedJoiningGroupingComparator.class);job.setOutputKeyClass(TaggedKey.class);job.setOutputValueClass(Text.class);System.exit(job.waitForCompletion(true) ? 0 : 1);} }通過使用分區程序和分組比較器,我們知道第一個值屬于第一個鍵,并且可以用于將Iterable包含的所有其他值連接到給定鍵的reduce()方法中。 現在是時候考慮一??對多聯接了。
一對多加入
好消息是到目前為止,我們已經完成了所有工作,實際上我們可以使用代碼執行一對多連接。 對于一對多聯接,我們可以考慮兩種方法:1)一個包含單個記錄的小文件,另一個包含具有相同鍵的多個記錄的文件,以及2)同樣具有單個記錄的小文件,但是N每個文件包含與第一個文件匹配的記錄的文件數。 主要區別在于,采用第一種方法時,除了前兩個鍵的聯接之外,值的順序將是未知的。 但是,使用第二種方法,我們將“標記”每個聯接文件,以便我們可以控制所有聯接值的順序。 對于我們的示例,第一個文件將保留為我們的GUID名稱-地址文件,并且我們將擁有3個其他文件,其中將包含汽車,雇主和工作描述記錄。 這可能不是最現實的情況,但將用于演示。 以下是在進行聯接之前數據外觀的示例:
//The single person records cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Esther,Garner,4071 Haven Lane,Okemos,MI 81a43486-07e1-4b92-b92b-03d0caa87b5f,Timothy,Duncan,753 Stadium Drive,Taunton,MA aef52cf1-f565-4124-bf18-47acdac47a0e,Brett,Ramsey,4985 Shinn Street,New York,NY //Automobile records cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,2003 Holden Cruze 81a43486-07e1-4b92-b92b-03d0caa87b5f,2012 Volkswagen T5 aef52cf1-f565-4124-bf18-47acdac47a0e,2009 Renault Trafic //Employer records cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Creative Wealth 81a43486-07e1-4b92-b92b-03d0caa87b5f,Susie's Casuals aef52cf1-f565-4124-bf18-47acdac47a0e,Super Saver Foods //Job Description records cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Data entry clerk 81a43486-07e1-4b92-b92b-03d0caa87b5f,Precision instrument and equipment repairer aef52cf1-f565-4124-bf18-47acdac47a0e,Gas and water service dispatcher一對多加入結果
現在,讓我們看一下一對多聯接結果的示例(使用上面的相同值來輔助比較):
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Esther,Garner,4071 Haven Lane,Okemos,MI,2003 Holden Cruze,Creative Wealth,Data entry clerk 81a43486-07e1-4b92-b92b-03d0caa87b5f,Timothy,Duncan,753 Stadium Drive,Taunton,MA,2012 Volkswagen T5,Susie's Casuals,Precision instrument and equipment repairer aef52cf1-f565-4124-bf18-47acdac47a0e,Brett,Ramsey,4985 Shinn Street,New York,NY,2009 Renault Trafic,Super Saver Foods,Gas and water service dispatcher結果表明,我們已經能夠成功地以指定順序連接多個值。
結論
我們已經成功演示了如何在MapReduce中執行約簡邊連接。 即使該方法并不太復雜,我們也可以看到在Hadoop中執行聯接可能涉及編寫大量代碼。 雖然學習聯接的工作方式是一項有用的練習,但是在大多數情況下,使用Hive或Pig這樣的工具聯接數據要好得多。 謝謝你的時間。
資源資源
- Jimmy Lin和Chris Dyer 使用MapReduce進行的數據密集型處理
- Hadoop: Tom White 的權威指南
- 來自博客的源代碼和測試
- 愛德華·卡普里奧洛(Edward Capriolo),迪恩·沃普勒(Dean Wampler)和杰森·盧瑟格倫(Jason Rutherglen)的編程蜂巢
- 通過Alan Gates對Pig進行編程
- Hadoop API
- MRUnit用于單元測試Apache Hadoop映射減少工作
翻譯自: https://www.javacodegeeks.com/2013/07/mapreduce-algorithms-understanding-data-joins-part-1.html
總結
以上是生活随笔為你收集整理的MapReduce算法–了解数据联接第1部分的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linux查看网卡信息 命令(linux
- 下一篇: 肥东在售楼盘网上备案流程(肥东在售楼盘网