Totally Confused While Learning Hadoop's Serialization Mechanism — A Flow-Sum MapReduce Development Case — Plus Sorting the Flow-Sum Results
I: The concept of serialization
Serialization is the process of turning structured objects into a byte stream.
Deserialization is the reverse: turning a byte stream back into structured objects.
Java's own serialization mechanism: java.io.Serializable
II: Characteristics of Hadoop serialization
(1) Properties of the serialization format:
Compact: makes efficient use of storage space.
Fast: little extra overhead when reading and writing data.
Extensible: can transparently read data written in an older format.
Interoperable: supports interaction across multiple languages.
(2) Hadoop's serialization format: the Writable interface
III: What Hadoop serialization is for
(1) In a distributed environment, serialization serves two main purposes: inter-process communication and persistent storage.
(2) Communication between Hadoop nodes.
IV: The Writable interface (a class that needs to be serialized implements this interface)
(1) The Writable interface is a simple, efficient serialization scheme for objects, based on DataInput and DataOutput.
(2) Any key or value type in MapReduce must implement the Writable interface.
(3) Any key type in MapReduce must additionally implement the WritableComparable interface, because keys are compared during the sort.
1: Create a FlowBean entity class and make it serializable:
package com.flowSum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/***
 * 1: write() serializes each object to the output stream
 * 2: readFields() deserializes the object from the input stream
 * 3: (later we will also implement WritableComparable)
 * For Java value objects it is common to override toString(), hashCode() and equals()
 */
public class FlowBean implements Writable {

    private String phoneNumber; // phone number
    private long upFlow;        // upstream traffic
    private long downFlow;      // downstream traffic
    private long sumFlow;       // total traffic

    public String getPhoneNumber() {
        return phoneNumber;
    }
    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // A parameterized constructor to make initialization convenient
    public FlowBean(String phoneNumber, long upFlow, long downFlow) {
        this.phoneNumber = phoneNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // During deserialization the framework creates the bean by reflection,
    // which requires a no-arg constructor, so we define one explicitly
    public FlowBean() {
    }

    // Override toString(); this is what ends up in the output file
    @Override
    public String toString() {
        return "" + upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // Deserialize the object's fields from the data stream.
    // Fields must be read in exactly the same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNumber = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // Serialize the object's fields to the data stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }
}
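To make the write()/readFields() pair concrete, here is a small round-trip sketch (my own addition for illustration; the class name FlowBeanRoundTrip is made up) that writes a FlowBean to a byte array and reads it back through the no-arg constructor, which is roughly what happens when the bean travels between nodes. Note that readFields() has to read the fields in exactly the order write() wrote them.

package com.flowSum;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Minimal, illustrative round-trip check of FlowBean's Writable implementation
public class FlowBeanRoundTrip {

    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean("13726230503", 2481L, 24681L);

        // Serialize: write() pushes the fields into a byte stream
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        original.write(out);
        out.flush();

        // Deserialize: create the bean with the no-arg constructor,
        // then readFields() restores the fields in the same order they were written
        FlowBean copy = new FlowBean();
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        copy.readFields(in);

        // Expected output: 13726230503    2481    24681    27162
        System.out.println(copy.getPhoneNumber() + "\t" + copy);
    }
}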
Next, create a FlowSumMapper class that extends the Mapper class:

package com.flowSum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/***
 * 1: FlowBean is our own data type. Because it is shipped between Hadoop nodes,
 *    it has to follow Hadoop's serialization rules, i.e. implement the corresponding interface.
 * 2: Text can be thought of as the Writable counterpart of java.lang.String, backed by a UTF-8 byte sequence.
 */
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    // Take one line of the log, split it into fields, extract the ones we need
    // (phone number, upstream traffic, downstream traffic) and emit them as a key-value pair
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Get one line of input
        String line = value.toString();
        // Split it into fields
        String[] fields = StringUtils.split(line, "\t");
        // Phone number field
        String phoneNumber = fields[1];
        // Upstream traffic field
        long up_flow = Long.parseLong(fields[7]);
        // Downstream traffic field
        long down_flow = Long.parseLong(fields[8]);

        // Finally, wrap the data into a key-value pair and emit it
        context.write(new Text(phoneNumber), new FlowBean(phoneNumber, up_flow, down_flow));
    }
}

Next, create a FlowSumReducer class that extends the Reducer class:
package com.flowSum;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    // The framework calls reduce() once per group <phoneNumber, {flowbean, flowbean, flowbean, ...}>.
    // The business logic is simply to iterate over the values, accumulate the sums and emit the result.
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // Counters for upstream and downstream traffic
        long up_flow_counter = 0;
        long down_flow_counter = 0;

        // Accumulate upstream and downstream traffic
        for (FlowBean bean : values) {
            up_flow_counter += bean.getUpFlow();
            down_flow_counter += bean.getDownFlow();
        }

        // Emit the result
        context.write(key, new FlowBean(key.toString(), up_flow_counter, down_flow_counter));
    }
}
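One side note the original post does not cover, so treat it as a hedged assumption about the framework: inside the values iterator Hadoop typically reuses a single FlowBean instance and just refills its fields on each step. Accumulating plain longs, as above, is safe; but if the beans themselves had to be kept past the loop, they would need to be copied first, roughly like this hypothetical variant of the reduce() body:

// Hypothetical variant for cases where the beans must outlive the loop:
// defensive copies are needed because the iterator may hand back the same reused instance each time.
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context)
        throws IOException, InterruptedException {
    java.util.List<FlowBean> kept = new java.util.ArrayList<FlowBean>();
    for (FlowBean bean : values) {
        kept.add(new FlowBean(bean.getPhoneNumber(), bean.getUpFlow(), bean.getDownFlow()));
    }
    // ... aggregate or sort `kept` here, then write the result to the context ...
}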
Next, create a FlowSumRunner class that extends Configured and implements Tool — the standard pattern for describing and submitting a job:

package com.flowSum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Note: this import is wrong — it should be org.apache.hadoop.io.Text.
// It is the cause of the ClassCastException described further down.
import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;

/***
 * 1: The standard pattern for describing and submitting a job
 */
public class FlowSumRunner extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // Create the configuration
        Configuration conf = new Configuration();
        // Get a job instance
        Job job = Job.getInstance(conf);

        // Tell Hadoop which jar contains the classes this job uses
        job.setJarByClass(FlowSumRunner.class);

        // The mapper and reducer classes used by this job
        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        // The key-value types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // The key-value types of the reducer output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // The input path to process.
        // FileInputFormat is the base class of all file-based InputFormat implementations;
        // it keeps track of the job's input files and computes the input splits.
        // How records are read from a split is left to subclasses such as TextInputFormat.
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // The output path for the results
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job to the cluster and wait for it to finish.
        // Return 0 on success, 1 otherwise.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // Standard ToolRunner invocation
        int res = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
        // Exit when done
        System.exit(res);
    }
}

Then package everything into a jar and upload it to the virtual machine along with some sample data (the packaging steps are omitted). The sample data:
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200

At this point the jar and the sample data have been uploaded to the virtual machine.
Next, upload the data to the HDFS cluster (a pseudo-distributed cluster here):
Create an empty /flow directory on the cluster, create a data directory inside it, and put the data file into that data directory.
Then run the program. It takes arguments, so note that the last two items on the command line are the input and output paths:
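The screenshots of these steps are not reproduced here; the commands would look roughly like the following (the local file name flow.txt is an assumption — use whatever the data file is actually called):

hadoop fs -mkdir -p /flow/data
hadoop fs -put flow.txt /flow/data
hadoop jar flow.jar com.flowSum.FlowSumRunner /flow/data /flow/output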
And then it threw an error like this, which left me completely confused:
Error: java.lang.ClassCastException: class com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider$Text
    at java.lang.Class.asSubclass(Class.java:3165)
    at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:884)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:981)
    at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:391)
    at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:80)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:675)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:747)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Then, on the theory that whatever you are learning now has surely tripped up someone before you, and that some kind soul has already posted the same error online, I searched for it and found the fix:
It turned out the Text class had been imported from the wrong package (be careful with this one, or you're in for a world of pain).
Not: import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;
But: import org.apache.hadoop.io.Text; (The Jersey Text class is not a WritableComparable, so the cast in JobConf.getOutputKeyComparator() fails on the map output key class — hence the ClassCastException above.)
Then repackage, upload to the virtual machine and run again — and you hit this error:
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://master:9000/flow/output already exists
    at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)
    at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:458)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:343)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1303)
    at com.flowSum.FlowSumRunner.run(FlowSumRunner.java:55)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at com.flowSum.FlowSumRunner.main(FlowSumRunner.java:60)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
So delete the existing /flow/output directory — the output directory must not exist beforehand, because the job creates it itself:
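Something like the following should do it (standard HDFS shell usage, not copied from the original post):

hadoop fs -rm -r /flow/output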
Finally, run the program again (it takes arguments, so remember that the last two items are the input and output paths).
This time it threw an array-out-of-bounds exception; my first guess was that the test data was not clean:
Error: java.lang.ArrayIndexOutOfBoundsException: 1
    at com.flowSum.FlowSumMapper.map(FlowSumMapper.java:29)
    at com.flowSum.FlowSumMapper.map(FlowSumMapper.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
So I hand-crafted a new data file, shown below.
(As it turned out, the original test data ran fine on a later attempt as well — test things a few times; it's pitfalls all the way down!)
1363157985066 13726230501 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 241 200
1363157985061 13726230502 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 681 200
1363157985062 13726230502 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 3481 681 200
1363157985063 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 4481 4681 200
1363157985064 13726230504 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 5481 4681 200
1363157985065 13726230505 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 6481 2681 200
1363157985066 13726230506 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 7481 2481 200
1363157985067 13726230507 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 8481 2461 200
1363157985067 13726230507 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 281 200
1363157985068 13726230508 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 2681 200
1363157985068 13726230508 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 3481 24681 200
1363157985069 13726230509 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 4481 681 200
1363157985060 13726230500 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 24681 200
1363157985061 13726230501 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 681 200
1363157985066 13726230502 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 81 200
1363157985063 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 681 200
1363157985063 13726230504 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 681 200
1363157985064 13726230505 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 2681 200
1363157985065 13726230506 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 681 200
1363157985066 13726230507 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 81 24681 200
1363157985067 13726230508 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 481 241 200
1363157985068 13726230508 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 481 681 200
1363157985068 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 241 681 200

In the end I also changed String[] fields = StringUtils.split(line, "\t"); to String[] fields = StringUtils.split(line, " ");
(On a later retest, String[] fields = StringUtils.split(line, "\t"); worked as well. At first I thought the kind of whitespace in the data mattered; in the end the code was fine and the problem really was the test data.)
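For what it's worth, here is a small standalone sketch (my addition, not from the original post) of why the delimiter choice decides whether this exception happens: commons-lang's StringUtils.split(str, separatorChars) treats the second argument as a set of separator characters and drops empty tokens, so if none of those characters occurs in the line — for example if the tab escape "\t" is mistyped as the two characters "/t" — the whole line comes back as a single field and fields[1] throws ArrayIndexOutOfBoundsException.

import org.apache.commons.lang.StringUtils;

// Illustrates how the split delimiter determines whether fields[1] exists.
public class SplitCheck {

    public static void main(String[] args) {
        // One tab-separated record, as the log file is expected to be
        String line = "1363157985066\t13726230503\t00-FD-07-A4-72-B8:CMCC\t120.196.100.82\ti02.c.aliimg.com\t24\t27\t2481\t24681\t200";

        // Split on the tab character: 10 fields, so fields[1], fields[7], fields[8] all exist
        System.out.println(StringUtils.split(line, "\t").length);

        // Mistyped delimiter "/t" means "split on '/' or 't'"; neither occurs in this line,
        // so everything stays in one field and fields[1] does not exist -> ArrayIndexOutOfBoundsException
        System.out.println(StringUtils.split(line, "/t").length);

        // Splitting on a space only works if the file is actually space-separated;
        // on this tab-separated line it also returns a single field
        System.out.println(StringUtils.split(line, " ").length);
    }
}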
The revised mapper:

package com.flowSum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/***
 * 1: FlowBean is our own data type. Because it is shipped between Hadoop nodes,
 *    it has to follow Hadoop's serialization rules, i.e. implement the corresponding interface.
 * 2: Text can be thought of as the Writable counterpart of java.lang.String, backed by a UTF-8 byte sequence.
 */
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    // Take one line of the log, split it into fields, extract the ones we need
    // (phone number, upstream traffic, downstream traffic) and emit them as a key-value pair
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Get one line of input
        String line = value.toString();
        // Split it into fields
        String[] fields = StringUtils.split(line, " ");
        // Phone number field
        String phoneNumber = fields[1];
        // Upstream traffic field
        long up_flow = Long.parseLong(fields[7]);
        // Downstream traffic field
        long down_flow = Long.parseLong(fields[8]);

        // Finally, wrap the data into a key-value pair and emit it
        context.write(new Text(phoneNumber), new FlowBean(phoneNumber, up_flow, down_flow));
    }
}

Package it, upload it to the virtual machine and run it again (a successful run looks like this):
[root@master hadoop]# hadoop jar flow.jar com.flowSum.FlowSumRunner /flow/data /flow/output
17/09/20 09:35:26 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.0.55:8032
17/09/20 09:35:26 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/09/20 09:35:27 INFO input.FileInputFormat: Total input paths to process : 1
17/09/20 09:35:27 INFO mapreduce.JobSubmitter: number of splits:1
17/09/20 09:35:27 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1505814887677_0007
17/09/20 09:35:27 INFO impl.YarnClientImpl: Submitted application application_1505814887677_0007
17/09/20 09:35:27 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1505814887677_0007/
17/09/20 09:35:27 INFO mapreduce.Job: Running job: job_1505814887677_0007
17/09/20 09:35:33 INFO mapreduce.Job: Job job_1505814887677_0007 running in uber mode : false
17/09/20 09:35:33 INFO mapreduce.Job:  map 0% reduce 0%
17/09/20 09:35:37 INFO mapreduce.Job:  map 100% reduce 0%
17/09/20 09:35:43 INFO mapreduce.Job:  map 100% reduce 100%
17/09/20 09:35:43 INFO mapreduce.Job: Job job_1505814887677_0007 completed successfully
17/09/20 09:35:43 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=1179
        FILE: Number of bytes written=187971
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=2467
        HDFS: Number of bytes written=279
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=2691
        Total time spent by all reduces in occupied slots (ms)=2582
        Total time spent by all map tasks (ms)=2691
        Total time spent by all reduce tasks (ms)=2582
        Total vcore-seconds taken by all map tasks=2691
        Total vcore-seconds taken by all reduce tasks=2582
        Total megabyte-seconds taken by all map tasks=2755584
        Total megabyte-seconds taken by all reduce tasks=2643968
    Map-Reduce Framework
        Map input records=23
        Map output records=23
        Map output bytes=1127
        Map output materialized bytes=1179
        Input split bytes=93
        Combine input records=0
        Combine output records=0
        Reduce input groups=10
        Reduce shuffle bytes=1179
        Reduce input records=23
        Reduce output records=10
        Spilled Records=46
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=126
        CPU time spent (ms)=1240
        Physical memory (bytes) snapshot=218099712
        Virtual memory (bytes) snapshot=726839296
        Total committed heap usage (bytes)=137433088
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=2374
    File Output Format Counters
        Bytes Written=279
[root@master hadoop]#
Checking the output on HDFS shows the summed upstream, downstream and total traffic per phone number (the screenshot of the result is not reproduced here).
In short, when learning something new, errors are unavoidable — calm down and work through them.
2: Sorting the flow-sum results
This time both the Mapper and the Reducer are written as static inner classes. (I ran into the same annoying problem as above: with String[] fields = StringUtils.split(line, "\t"); the job simply would not run and kept throwing array-out-of-bounds exceptions; after switching to String[] fields = StringUtils.split(line, " "); it ran. Still baffled.)
package com.flowSort;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSortMapReduce {

    /***
     * The mapper, as a static inner class
     */
    public static class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {

        // Take one line, split out the fields, wrap them into a FlowBean and emit the bean as the key
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Get one line of input
            String line = value.toString();
            // Split the line into fields
            String[] fields = StringUtils.split(line, " ");

            // Pull the values out of the fields
            String phoneNumber = fields[0];
            long up_flow = Long.parseLong(fields[1]);
            long down_flow = Long.parseLong(fields[2]);

            // Wrap the data up and pass it on; the bean is the key, so it is what gets sorted
            context.write(new FlowBean(phoneNumber, up_flow, down_flow), NullWritable.get());
        }
    }

    /***
     * The reducer, as a static inner class
     */
    public static class FlowSortReducer extends Reducer<FlowBean, NullWritable, Text, FlowBean> {

        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {

            String phoneNumber = key.getPhoneNumber();
            context.write(new Text(phoneNumber), key);
        }
    }

    /***
     * Main method
     */
    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
        // Create the configuration
        Configuration conf = new Configuration();
        // Get a job instance
        Job job = Job.getInstance(conf);

        // Tell Hadoop which jar contains the classes this job uses
        job.setJarByClass(FlowSortMapReduce.class);

        // The mapper and reducer classes used by this job
        job.setMapperClass(FlowSortMapper.class);
        job.setReducerClass(FlowSortReducer.class);

        // The key-value types of the mapper output
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // The key-value types of the reducer output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // The input path to process.
        // FileInputFormat is the base class of all file-based InputFormat implementations;
        // it keeps track of the job's input files and computes the input splits.
        // How records are read from a split is left to subclasses such as TextInputFormat.
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // The output path for the results
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job to the cluster, wait for it to finish,
        // and exit with 0 on success, 1 otherwise.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Next, rework the entity class so the records can be sorted by total traffic:
package com.flowSort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/***
 * 1: write() serializes each object to the output stream
 * 2: readFields() deserializes the object from the input stream
 * 3: implements WritableComparable so the beans can be compared (and therefore sorted) as keys
 * For Java value objects it is common to override toString(), hashCode() and equals()
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNumber; // phone number
    private long upFlow;        // upstream traffic
    private long downFlow;      // downstream traffic
    private long sumFlow;       // total traffic

    public String getPhoneNumber() {
        return phoneNumber;
    }
    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // A parameterized constructor to make initialization convenient
    public FlowBean(String phoneNumber, long upFlow, long downFlow) {
        this.phoneNumber = phoneNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // During deserialization the framework creates the bean by reflection,
    // which requires a no-arg constructor, so we define one explicitly
    public FlowBean() {
    }

    // Override toString(); this is what ends up in the output file
    @Override
    public String toString() {
        return "" + upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // Deserialize the object's fields from the data stream.
    // Fields must be read in exactly the same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNumber = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // Serialize the object's fields to the data stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Comparison used for sorting:
    // return -1 when this total is larger, 1 otherwise, which gives a descending order
    @Override
    public int compareTo(FlowBean o) {
        return sumFlow > o.sumFlow ? -1 : 1;
    }
}
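Two notes on the sort (my own commentary, so take it as hedged): MapReduce sorts map output by the key's compareTo() during the shuffle, which is why the FlowBean is emitted as the key and NullWritable as the value; and the compareTo() above never returns 0, which technically breaks the comparator contract when two totals are equal. A stricter drop-in variant, assuming a tie-break on the phone number is acceptable, might look like this:

// Hypothetical stricter comparison: descending by total traffic,
// with the phone number as a tie-breaker so equal totals still compare consistently.
@Override
public int compareTo(FlowBean o) {
    int byTotal = Long.compare(o.sumFlow, this.sumFlow); // reversed for descending order
    if (byTotal != 0) {
        return byTotal;
    }
    return this.phoneNumber.compareTo(o.phoneNumber);
}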
That is the end result — though, as usual, not without problems along the way. A successful run of the sort job:

[root@master hadoop]# hadoop jar flowsort.jar com.flowSort.FlowSortMapReduce /flow/output4 /flow/sortoutput
17/09/21 19:32:28 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.0.55:8032
17/09/21 19:32:29 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/09/21 19:32:29 INFO input.FileInputFormat: Total input paths to process : 1
17/09/21 19:32:29 INFO mapreduce.JobSubmitter: number of splits:1
17/09/21 19:32:29 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1505991512603_0004
17/09/21 19:32:29 INFO impl.YarnClientImpl: Submitted application application_1505991512603_0004
17/09/21 19:32:29 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1505991512603_0004/
17/09/21 19:32:29 INFO mapreduce.Job: Running job: job_1505991512603_0004
17/09/21 19:32:33 INFO mapreduce.Job: Job job_1505991512603_0004 running in uber mode : false
17/09/21 19:32:33 INFO mapreduce.Job:  map 0% reduce 0%
17/09/21 19:32:38 INFO mapreduce.Job:  map 100% reduce 0%
17/09/21 19:32:44 INFO mapreduce.Job:  map 100% reduce 100%
17/09/21 19:32:44 INFO mapreduce.Job: Job job_1505991512603_0004 completed successfully
17/09/21 19:32:44 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=822
        FILE: Number of bytes written=187379
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=635
        HDFS: Number of bytes written=526
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=2031
        Total time spent by all reduces in occupied slots (ms)=2599
        Total time spent by all map tasks (ms)=2031
        Total time spent by all reduce tasks (ms)=2599
        Total vcore-seconds taken by all map tasks=2031
        Total vcore-seconds taken by all reduce tasks=2599
        Total megabyte-seconds taken by all map tasks=2079744
        Total megabyte-seconds taken by all reduce tasks=2661376
    Map-Reduce Framework
        Map input records=21
        Map output records=21
        Map output bytes=774
        Map output materialized bytes=822
        Input split bytes=109
        Combine input records=0
        Combine output records=0
        Reduce input groups=21
        Reduce shuffle bytes=822
        Reduce input records=21
        Reduce output records=21
        Spilled Records=42
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=121
        CPU time spent (ms)=700
        Physical memory (bytes) snapshot=218284032
        Virtual memory (bytes) snapshot=726839296
        Total committed heap usage (bytes)=137433088
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=526
    File Output Format Counters
        Bytes Written=526
[root@master hadoop]# hadoop fs -ls /flow/sortoutput
Found 2 items
-rw-r--r--   1 root supergroup          0 2017-09-21 19:32 /flow/sortoutput/_SUCCESS
-rw-r--r--   1 root supergroup        526 2017-09-21 19:32 /flow/sortoutput/part-r-00000
[root@master hadoop]# hadoop fs -cat /flow/sortoutput/part-r-00000
13726238888    2481    24681    27162
13726230503    2481    24681    27162
13925057413    63      11058    11121
18320173382    18      9531     9549
13502468823    102     7335     7437
13660577991    9       6960     6969
13922314466    3008    3720     6728
13560439658    5892    400      6292
84138413       4116    1432     5548
15013685858    27      3659     3686
15920133257    20      3156     3176
13602846565    12      1938     1950
15989002119    3       1938     1941
13926435656    1512    200      1712
18211575961    12      1527     1539
13560436666    954     200      1154
13480253104    180     200      380
13760778710    120     200      320
13826544101    0       200      200
13926251106    0       200      200
13719199419    0       200      200
[root@master hadoop]#