大数据技术原理与应用实验4——MapReduce初级编程实践
鏈接: 大數(shù)據(jù)技術原理與應用實驗1——熟悉常用的HDFS操作
鏈接: 大數(shù)據(jù)技術原理與應用實驗2——熟悉常用的Hbase操作
鏈接: 大數(shù)據(jù)技術原理與應用實驗3——NoSQL和關系數(shù)據(jù)庫的操作比較
MapReduce初級編程實踐
- 一、實驗目的
- 二、實驗環(huán)境
- 三、實驗內(nèi)容
- (一)編程實現(xiàn)文件合并和去重操作
- 1. 具體內(nèi)容
- 2. 操作過程
- 3. 實驗代碼
- 4. 運行結(jié)果
- (二)編寫程序?qū)崿F(xiàn)對輸入文件的排序
- 1. 具體內(nèi)容
- 2. 操作過程
- 3. 實驗代碼
- 4. 運行結(jié)果
- (三)對給定的表格進行信息挖掘
- 1. 具體內(nèi)容
- 2. 操作過程
- 3. 實驗代碼
- 4. 運行結(jié)果
- 四、實驗總結(jié)
一、實驗目的
(1)通過實驗掌握基本的MapReduce編程方法;
(2)掌握用MapReduce解決一些常見的數(shù)據(jù)處理問題,包括數(shù)據(jù)去重、數(shù)據(jù)排序和數(shù)據(jù)挖掘等。
二、實驗環(huán)境
(1)Linux操作系統(tǒng)(CentOS7.5)
(2)VMware Workstation Pro 15.5
(3)遠程終端工具Xshell7
(4)Xftp7傳輸工具;
(5)Hadoop版本:3.1.3;
(6)HBase版本:2.2.2;
(7)JDK版本:1.8;
(8)Java IDE:Idea;
(9)MySQL版本:5.7;
三、實驗內(nèi)容
(一)編程實現(xiàn)文件合并和去重操作
1. 具體內(nèi)容
對于兩個輸入文件,即文件A和文件B,請編寫MapReduce程序,對兩個文件進行合并,并剔除其中重復的內(nèi)容,得到一個新的輸出文件C。下面是輸入文件和輸出文件的一個樣例供參考。
輸入文件A的樣例如下:
輸入文件B的樣例如下:
20170101 y 20170102 y 20170103 x 20170104 z 20170105 y根據(jù)輸入文件A和B合并得到的輸出文件C的樣例如下:
20170101 x 20170101 y 20170102 y 20170103 x 20170104 y 20170104 z 20170105 y 20170105 z 20170106 x2. 操作過程
1.啟動 hadoop:
2. 需要首先刪除HDFS中與當前Linux用戶hadoop對應的input和output目錄(即HDFS中的“/opt/module/hadoop-3.1.3/input”和“/opt/module/hadoop-3.1.3/output”目錄),這樣確保后面程序運行不會出現(xiàn)問題
3. 再在HDFS中新建與當前Linux用戶hadoop對應的input目錄,即“/opt/module/hadoop-3.1.3/input”目錄
創(chuàng)建A.txt B.txt,輸入上述內(nèi)容
vim A.txt vim B.txt
4. 將A,B上傳到HDFS中
3. 實驗代碼
package com.xusheng.mapreduce.shiyan;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class Merge {/*** @param xusheng* 對A,B兩個文件進行合并,并剔除其中重復的內(nèi)容,得到一個新的輸出文件C*///重載map函數(shù),直接將輸入中的value復制到輸出數(shù)據(jù)的key上public static class Map extends Mapper<Object, Text, Text, Text>{private static Text text = new Text();public void map(Object key, Text value, Context context) throws IOException,InterruptedException{text = value;context.write(text, new Text(""));}}//重載reduce函數(shù),直接將輸入中的key復制到輸出數(shù)據(jù)的key上public static class Reduce extends Reducer<Text, Text, Text, Text>{public void reduce(Text key, Iterable<Text> values, Context context ) throws IOException,InterruptedException{context.write(key, new Text(""));}}public static void main(String[] args) throws Exception{// TODO Auto-generated method stubConfiguration conf = new Configuration();//conf.set("fs.default.name","hdfs://localhost:9000");conf.set("fs.defaultFS","hdfs://hadoop102:8020");String[] otherArgs = new String[]{"/input/test1","/output/test1"}; //* 直接設置輸入?yún)?shù) *//*if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in><out>");System.exit(2);}Job job = Job.getInstance(conf,"Merge and duplicate removal");job.setJarByClass(Merge.class);job.setMapperClass(Map.class);job.setCombinerClass(Reduce.class);job.setReducerClass(Reduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}}4. 運行結(jié)果
./bin/hdfs dfs -cat /output/test1/*(二)編寫程序?qū)崿F(xiàn)對輸入文件的排序
1. 具體內(nèi)容
現(xiàn)在有多個輸入文件,每個文件中的每行內(nèi)容均為一個整數(shù)。要求讀取所有文件中的整數(shù),進行升序排序后,輸出到一個新的文件中,輸出的數(shù)據(jù)格式為每行兩個整數(shù),第一個數(shù)字為第二個整數(shù)的排序位次,第二個整數(shù)為原待排列的整數(shù)。下面是輸入文件和輸出文件的一個樣例供參考。
輸入文件1的樣例如下:
輸入文件2的樣例如下:
4 16 39 5輸入文件3的樣例如下:
1 45 25根據(jù)輸入文件1、2和3得到的輸出文件如下:
1 1 2 4 3 5 4 12 5 16 6 25 7 33 8 37 9 39 10 40 11 452. 操作過程
1.創(chuàng)建1.txt ,2.txt ,3.txt,輸入上述內(nèi)容
再在HDFS中新建與當前Linux用戶hadoop對應的input目錄,即“/opt/module/hadoop-3.1.3/input”目錄
2.將1.txt ,2.txt ,3.txt上傳到HDFS中
./bin/hdfs dfs -put ./1.txt /input/test2/ ./bin/hdfs dfs -put ./2.txt /input/test2/ ./bin/hdfs dfs -put ./3.txt /input/test2/
3. 實驗代碼
package com.xusheng.mapreduce.shiyan;import java.io.IOException;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser;public class MergeSort {/*** @param xusheng* 輸入多個文件,每個文件中的每行內(nèi)容均為一個整數(shù)* 輸出到一個新的文件中,輸出的數(shù)據(jù)格式為每行兩個整數(shù),第一個數(shù)字為第二個整數(shù)的排序位次,第二個整數(shù)為原待排列的整數(shù)*///map函數(shù)讀取輸入中的value,將其轉(zhuǎn)化成IntWritable類型,最后作為輸出keypublic static class Map extends Mapper<Object, Text, IntWritable, IntWritable>{private static IntWritable data = new IntWritable();public void map(Object key, Text value, Context context) throws IOException,InterruptedException{String text = value.toString();data.set(Integer.parseInt(text));context.write(data, new IntWritable(1));}}//reduce函數(shù)將map輸入的key復制到輸出的value上,然后根據(jù)輸入的value-list中元素的個數(shù)決定key的輸出次數(shù),定義一個全局變量line_num來代表key的位次public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{private static IntWritable line_num = new IntWritable(1);public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException{for(IntWritable val : values){context.write(line_num, key);line_num = new IntWritable(line_num.get() + 1);}}}//自定義Partition函數(shù),此函數(shù)根據(jù)輸入數(shù)據(jù)的最大值和MapReduce框架中Partition的數(shù)量獲取將輸入數(shù)據(jù)按照大小分塊的邊界,然后根據(jù)輸入數(shù)值和邊界的關系返回對應的Partiton IDpublic static class Partition extends Partitioner<IntWritable, IntWritable>{public int getPartition(IntWritable key, IntWritable value, int num_Partition){int Maxnumber = 65223;//int型的最大數(shù)值int bound = Maxnumber/num_Partition+1;int keynumber = key.get();for (int i = 0; i<num_Partition; i++){if(keynumber<bound * (i+1) && keynumber>=bound * i){return i;}}return -1;}}public static void main(String[] args) throws Exception{// TODO Auto-generated method stubConfiguration conf = new Configuration();//conf.set("fs.default.name","hdfs://localhost:9000");conf.set("fs.defaultFS","hdfs://hadoop102:8020");String[] otherArgs = new String[]{"/input/test2","/output/test2"}; /* 直接設置輸入?yún)?shù) */if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in><out>");System.exit(2);}Job job = Job.getInstance(conf,"Merge and sort");//實例化Merge類job.setJarByClass(MergeSort.class);//設置主類名job.setMapperClass(Map.class);//指定使用上述代碼自定義的Map類job.setReducerClass(Reduce.class);//指定使用上述代碼自定義的Reduce類job.setPartitionerClass(Partition.class);job.setOutputKeyClass(IntWritable.class);job.setOutputValueClass(IntWritable.class);//設定Reduce類輸出的<K,V>,V類型FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//添加輸入文件位置FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//設置輸出結(jié)果文件位置System.exit(job.waitForCompletion(true) ? 0 : 1);//提交任務并監(jiān)控任務狀態(tài)} }4. 運行結(jié)果
./bin/hdfs dfs -cat /output/test2/*(三)對給定的表格進行信息挖掘
1. 具體內(nèi)容
下面給出一個child-parent的表格,要求挖掘其中的父子輩關系,給出祖孫輩關系的表格。
輸入文件內(nèi)容如下:
輸出文件內(nèi)容如下:
grandchild grandparent Steven Alice Steven Jesse Jone Alice Jone Jesse Steven Mary Steven Frank Jone Mary Jone Frank Philip Alice Philip Jesse Mark Alice Mark Jesse2. 操作過程
1.創(chuàng)建child.txt,輸入上述內(nèi)容
再在HDFS中新建與當前Linux用戶hadoop對應的input目錄,即“/opt/module/hadoop-3.1.3/input”目錄
2. 將child.txt上傳到HDFS中
3. 實驗代碼
package com.xusheng.mapreduce.shiyan;import java.io.IOException; import java.util.*;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser;public class simple_data_mining {public static int time = 0;/*** @param xusheng* 輸入一個child-parent的表格* 輸出一個體現(xiàn)grandchild-grandparent關系的表格*///Map將輸入文件按照空格分割成child和parent,然后正序輸出一次作為右表,反序輸出一次作為左表,需要注意的是在輸出的value中必須加上左右表區(qū)別標志public static class Map extends Mapper<Object, Text, Text, Text>{public void map(Object key, Text value, Context context) throws IOException,InterruptedException{String child_name = new String();String parent_name = new String();String relation_type = new String();String line = value.toString();int i = 0;while(line.charAt(i) != ' '){i++;}String[] values = {line.substring(0,i),line.substring(i+1)};if(values[0].compareTo("child") != 0){child_name = values[0];parent_name = values[1];relation_type = "1";//左右表區(qū)分標志context.write(new Text(values[1]), new Text(relation_type+"+"+child_name+"+"+parent_name));//左表relation_type = "2";context.write(new Text(values[0]), new Text(relation_type+"+"+child_name+"+"+parent_name));//右表}}}public static class Reduce extends Reducer<Text, Text, Text, Text>{public void reduce(Text key, Iterable<Text> values,Context context) throws IOException,InterruptedException{if(time == 0){ //輸出表頭context.write(new Text("grand_child"), new Text("grand_parent"));time++;}int grand_child_num = 0;String grand_child[] = new String[10];int grand_parent_num = 0;String grand_parent[]= new String[10];Iterator ite = values.iterator();while(ite.hasNext()){String record = ite.next().toString();int len = record.length();int i = 2;if(len == 0) continue;char relation_type = record.charAt(0);String child_name = new String();String parent_name = new String();//獲取value-list中value的childwhile(record.charAt(i) != '+'){child_name = child_name + record.charAt(i);i++;}i=i+1;//獲取value-list中value的parentwhile(i<len){parent_name = parent_name+record.charAt(i);i++;}//左表,取出child放入grand_childif(relation_type == '1'){grand_child[grand_child_num] = child_name;grand_child_num++;}else{//右表,取出parent放入grand_parentgrand_parent[grand_parent_num] = parent_name;grand_parent_num++;}}if(grand_parent_num != 0 && grand_child_num != 0 ){for(int m = 0;m<grand_child_num;m++){for(int n=0;n<grand_parent_num;n++){context.write(new Text(grand_child[m]), new Text(grand_parent[n]));//輸出結(jié)果}}}}}public static void main(String[] args) throws Exception{// TODO Auto-generated method stubConfiguration conf = new Configuration();//conf.set("fs.default.name","hdfs://localhost:9000");conf.set("fs.default.name","hdfs://hadoop102:8020");String[] otherArgs = new String[]{"/input/test3","/output/test3"}; /* 直接設置輸入?yún)?shù) */if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in><out>");System.exit(2);}Job job = Job.getInstance(conf,"Single table join");job.setJarByClass(simple_data_mining.class);job.setMapperClass(Map.class);job.setReducerClass(Reduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);} }4. 運行結(jié)果
./bin/hdfs dfs -cat /output/test3/*
四、實驗總結(jié)
三個實驗的思路:
(1)編程實現(xiàn)文件合并和去重操作
本道題主要目的是去重,我在編寫的時候的思路就是 通過map函數(shù)讀取 key,value 因為我的目的是去重,所以在這里我完全可以把整個數(shù)據(jù)作為一個key,而value我可以不管他 而reduce會接受到的是<key,value-list>形式的數(shù)據(jù),我們只需要輸出他接受到的key就可以了,因為重復的值體現(xiàn)在value-list里面,而key是位移的.
可以從python字典的角度來理解,我們把文檔的每一行作為字典的鍵,出現(xiàn)的次數(shù)作為值,最終我們循環(huán)輸出所有的鍵
(2)編寫程序?qū)崿F(xiàn)對輸入文件的排序
因為MR自帶排序,所以我們只要把輸入的數(shù)字以int的形式交給map map將這個數(shù)字作為key輸出,而rerduce函數(shù)將map輸入的key復制到輸出的value上即可,因為要輸入排序的序號,所以再定義個變量用來記錄輸出數(shù)字的排序即可
(3)對給定的表格進行信息挖掘
本題其實相當于一個表的自身join,但是我們需要轉(zhuǎn)化一下,輸入的文件只是child和parent ,將他正序輸出一次作為右表,反序輸出一次作為左表,這樣就可以完成child parent grand三個字段的兩張表操作,輸出的時候加上兩張表的標識來區(qū)分 reduce函數(shù)則用來取出左表中的child 即為grandchild 再取出右表的parent相當于grandparent即可。
總結(jié)
以上是生活随笔為你收集整理的大数据技术原理与应用实验4——MapReduce初级编程实践的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 优酷视频整段代理php,thinkphp
- 下一篇: 程序员教程第五版笔记