Getting Started with Hadoop (16): A MapReduce Single-Table Join Program
"單表關聯"要求從給出的數據中尋找所關心的數據,它是對原始數據所包含信息的挖掘
1. Example description
Given a child-parent table, produce the corresponding grandchild-grandparent table.
Sample input:
Expected output:
grandchild    grandparent
Tom           Alice
Tom           Jesse
Jone          Alice
Jone          Jesse
Tom           Mary
Tom           Ben
Jone          Mary
Jone          Ben
Philip        Alice
Philip        Jesse
Mark          Alice
Mark          Jesse
2. Problem analysis
This example calls for a self-join: the parent column of the left table is joined to the child column of the right table, and the left and right tables are one and the same. Removing the two join columns from the join result leaves exactly the desired grandchild-grandparent table. To solve this with MapReduce, we have to work out three things: first, how to implement the self-join; second, how to set up the join column; and finally, how to assemble the result.
The MapReduce shuffle phase brings records with the same key together, so if the map output key is set to the join column, rows sharing a value in that column are grouped automatically.
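To make this concrete, here is a small trace, assuming the input contains the pairs "Tom Lucy" and "Lucy Mary" (hypothetical rows, but consistent with the Tom-Mary entry of the expected output). The mapper emits each pair twice, tagged with the side of the join it belongs to:

(Tom,  PARENT:Lucy)    (Lucy, CHILD:Tom)
(Lucy, PARENT:Mary)    (Mary, CHILD:Lucy)

The reduce call for key Lucy then receives CHILD:Tom and PARENT:Mary together: Tom is Lucy's child and Mary is Lucy's parent, so (Tom, Mary) is emitted as a grandchild-grandparent pair.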
3. Implementation steps

- Map: for each child-parent line, emit the pair twice: once keyed by the child with the parent as a PARENT-tagged value, and once keyed by the parent with the child as a CHILD-tagged value.
- Shuffle: records with the same person name are grouped, so each reduce call sees that person's parents and children together.
- Reduce: split the values into CHILD-tagged entries (candidate grandchildren) and PARENT-tagged entries (candidate grandparents); when both lists are non-empty, output every grandchild-grandparent combination.
4. Key code
package com.mk.mapreduce;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.util.LinkedList;
import java.util.List;

public class JoinSelf {

    public static class JoinSelfMapper extends Mapper<LongWritable, Text, Text, TableInfo> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            if (StringUtils.isBlank(value.toString())) {
                System.out.println("blank line");
                return;
            }
            String[] values = value.toString().split("\\s+");
            // Skip the header line and any malformed rows
            if (values.length < 2 || values[0].equals("child")) {
                System.out.println("row too short: " + value.toString());
                return;
            }
            // Emit the pair twice: once keyed by the child (value tagged PARENT),
            // once keyed by the parent (value tagged CHILD), so the reducer sees
            // both sides of the self-join under the middle person's name.
            context.write(new Text(values[0]), new TableInfo(TableInfo.PARENT, values[1]));
            context.write(new Text(values[1]), new TableInfo(TableInfo.CHILD, values[0]));
        }
    }

    public static class JoinSelfReducer extends Reducer<Text, TableInfo, Text, Text> {

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Write the header row once per reducer
            context.write(new Text("grandchild"), new Text("grandparent"));
        }

        @Override
        protected void reduce(Text key, Iterable<TableInfo> values, Context context) throws IOException, InterruptedException {
            List<String> grandChildren = new LinkedList<>();
            List<String> grandParents = new LinkedList<>();
            for (TableInfo v : values) {
                // Hadoop reuses the value object across iterations, so copy the string out
                if (v.getTable() == TableInfo.CHILD) {
                    grandChildren.add(v.value.toString());
                } else {
                    grandParents.add(v.value.toString());
                }
            }
            if (!grandChildren.isEmpty() && !grandParents.isEmpty()) {
                grandChildren.sort(String::compareTo);
                grandParents.sort(String::compareTo);
                // Cartesian product: pair every grandchild with every grandparent
                for (String grandChild : grandChildren)
                    for (String grandParent : grandParents)
                        context.write(new Text(grandChild), new Text(grandParent));
            }
        }
    }

    public static class TableInfo implements WritableComparable<TableInfo> {

        public static final int PARENT = 1;
        public static final int CHILD = 2;

        private int table;
        private Text value;

        public TableInfo() {
        }

        public TableInfo(int table, String value) {
            this.table = table;
            this.value = new Text(value);
        }

        public int getTable() {
            return table;
        }

        public void setTable(int table) {
            this.table = table;
        }

        public void setValue(Text value) {
            this.value = value;
        }

        @Override
        public int compareTo(TableInfo o) {
            int c = table - o.table;
            if (c != 0)
                return c;
            return value.compareTo(o.value);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(table);
            this.value.write(out);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.table = in.readInt();
            // value is null after the no-arg constructor, so create it lazily
            if (this.value == null)
                this.value = new Text();
            this.value.readFields(in);
        }

        @Override
        public String toString() {
            return "TableInfo{table='" + table + "', value='" + value + "'}";
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String uri = "hdfs://192.168.150.128:9000";
        String input = "/joinSelf/input";
        String output = "/joinSelf/output";

        Configuration conf = new Configuration();
        if (System.getProperty("os.name").toLowerCase().contains("win"))
            conf.set("mapreduce.app-submission.cross-platform", "true");

        // Remove any previous output so the job can be rerun
        FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(output);
        fileSystem.delete(path, true);

        Job job = Job.getInstance(conf, "JoinSelf");
        job.setJar("./out/artifacts/hadoop_test_jar/hadoop-test.jar");
        job.setJarByClass(JoinSelf.class);

        job.setMapperClass(JoinSelfMapper.class);
        job.setReducerClass(JoinSelfReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableInfo.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPaths(job, uri + input);
        FileOutputFormat.setOutputPath(job, new Path(uri + output));

        boolean ret = job.waitForCompletion(true);
        System.out.println(job.getJobName() + "-----" + ret);
    }
}
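Once the job finishes with ret == true, the result can be read straight from HDFS. Assuming the default single reducer, the whole table lands in one part file (the path follows the output variable above):

hdfs dfs -cat hdfs://192.168.150.128:9000/joinSelf/output/part-r-00000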
Problems encountered while coding:
- Writing a plain String to Hadoop: map and reduce outputs must implement Writable, so strings are wrapped in Text (or carried inside a custom Writable such as TableInfo above).
- Initializing the Text field when reading: the no-arg TableInfo constructor leaves value as null, so readFields must create a Text instance before deserializing into it.
- Splitting on the non-breaking space \u00A0 in the mapper: Java's \s matches only ASCII whitespace, so \u00A0 is not treated as a separator; see the sketch after this list.
- Multiple grandchildren combined with multiple grandparents: the reducer has to emit the full Cartesian product of the two lists, not just a single pairing.
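A minimal sketch of the whitespace problem and two possible fixes (SplitDemo is an illustrative name, not part of the job): in Java regex, \s covers only [ \t\n\x0B\f\r], so a non-breaking space between two names defeats split("\\s+").

import java.util.Arrays;
import java.util.regex.Pattern;

public class SplitDemo {
    // Fix 1: list the non-breaking space explicitly in the character class.
    private static final Pattern SEP = Pattern.compile("[\\s\\u00A0]+");
    // Fix 2: enable Unicode semantics, under which \s also matches U+00A0.
    private static final Pattern SEP_UNICODE =
            Pattern.compile("\\s+", Pattern.UNICODE_CHARACTER_CLASS);

    public static void main(String[] args) {
        String line = "Tom\u00A0Lucy"; // non-breaking space between the names
        System.out.println(Arrays.toString(line.split("\\s+")));      // one element: not split
        System.out.println(Arrays.toString(SEP.split(line)));         // [Tom, Lucy]
        System.out.println(Arrays.toString(SEP_UNICODE.split(line))); // [Tom, Lucy]
    }
}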