MapReduce Serialization and Flow-Statistics Program Development
I. Hadoop Data Types for Serialization

Java type => Hadoop type
int => IntWritable
float => FloatWritable
long => LongWritable
double => DoubleWritable
String => Text
boolean => BooleanWritable
byte => ByteWritable
map => MapWritable
array => ArrayWritable
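Each Hadoop wrapper type ultimately encodes its value with the corresponding `java.io.DataOutput` primitive (IntWritable writes 4 bytes via `writeInt`, LongWritable 8 bytes via `writeLong`, and so on). A stdlib-only sketch of those fixed encodings; the class and method names here are illustrative, not from the post:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class EncodingSizes {
    // Encode one value of each primitive type and return the total bytes
    // written, mirroring what the Writable wrappers do internally.
    public static int encodedLength() {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeInt(42);        // IntWritable: 4 bytes
            out.writeLong(42L);      // LongWritable: 8 bytes
            out.writeFloat(1.5f);    // FloatWritable: 4 bytes
            out.writeDouble(1.5);    // DoubleWritable: 8 bytes
            out.writeBoolean(true);  // BooleanWritable: 1 byte
            out.writeByte(7);        // ByteWritable: 1 byte
            out.flush();
            return buf.size();       // 4 + 8 + 4 + 8 + 1 + 1 = 26
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(encodedLength()); // prints 26
    }
}
```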
II. Hadoop Serialization

1. What is serialization?

In Java, serialization is provided by the Serializable interface. Java's built-in serialization is a heavyweight framework: once an object is serialized, it carries a lot of extra information (checksums, headers, the inheritance hierarchy, and so on), which makes it inefficient to transmit over a network. Hadoop therefore developed its own lightweight serialization framework: Writable.

Serialization converts in-memory objects into byte sequences so they can be stored and transmitted over the network; deserialization converts received byte sequences, or persisted data on disk, back into in-memory objects.
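The round trip above can be sketched with only the standard library, since Writable's `write`/`readFields` pair is built on `java.io.DataOutput`/`DataInput`; the class here is a hypothetical stand-in, not Hadoop code:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class RoundTrip {
    // Serialization: object state -> byte sequence (what Writable.write does)
    public static byte[] serialize(long up, long down) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeLong(up);
            out.writeLong(down);
            out.flush();
            return buf.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Deserialization: byte sequence -> object state (what Writable.readFields
    // does); fields must be read in the same order they were written.
    public static long[] deserialize(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            return new long[]{ in.readLong(), in.readLong() };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```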
2. An intuitive way to understand serialization (the author's own take; corrections welcome)

For example, in the flow-statistics case below, the wrapper class FlowBean implements the Writable interface and defines the fields upFlow, dwFlow, and flowSum. When a FlowBean is created in the Mapper or Reducer, memory is allocated for the object, and an in-memory object cannot be transmitted efficiently over the network; FlowBean's serialization method (write) turns those fields into a byte sequence that is easy to store and transmit. When that output is later read back from disk or the network, FlowBean's deserialization method (readFields) turns the byte sequence back into an in-memory object.
3. Characteristics of serialization

Serialization and deserialization come up constantly in distributed data processing; for example, Hadoop's inter-node communication is implemented with remote procedure calls (RPC), and that process requires serialization.

Characteristics of Hadoop serialization:
1) Compact
2) Fast
3) Extensible
4) Interoperable
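The "compact" point is easy to verify with the standard library alone: Java's Serializable writes stream headers and class metadata around the value, while a Writable-style encoding writes only the raw bytes. A small comparison sketch (names are illustrative):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

public class CompactnessDemo {
    // Java's Serializable: a full object stream with headers and class metadata
    public static int javaSerializedSize(long value) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
                out.writeObject(Long.valueOf(value));
            }
            return buf.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writable-style encoding: just the raw 8 bytes of the long
    public static int writableStyleSize(long value) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeLong(value);
            out.flush();
            return buf.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Running both on the same value shows the object stream is an order of magnitude larger than the 8-byte raw encoding.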
III. A MapReduce Flow-Statistics Example

1. Code
```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * @author: PrincessHug
 * @date: 2019/3/23, 23:38
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
public class FlowBean implements Writable {
    private long upFlow;
    private long dwFlow;
    private long flowSum;

    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getDwFlow() { return dwFlow; }
    public void setDwFlow(long dwFlow) { this.dwFlow = dwFlow; }
    public long getFlowSum() { return flowSum; }
    public void setFlowSum(long flowSum) { this.flowSum = flowSum; }

    // An empty constructor is required so Hadoop can instantiate the bean by reflection
    public FlowBean() {
    }

    public FlowBean(long upFlow, long dwFlow) {
        this.upFlow = upFlow;
        this.dwFlow = dwFlow;
        this.flowSum = upFlow + dwFlow;
    }

    /**
     * Serialization
     * @param out output stream
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dwFlow);
        out.writeLong(flowSum);
    }

    /**
     * Deserialization (fields must be read in the same order they were written)
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dwFlow = in.readLong();
        flowSum = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + dwFlow + "\t" + flowSum;
    }
}
```

```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line of input
        String line = value.toString();
        // Split the line into fields
        String[] fields = line.split("\t");
        // Extract the phone number and the up/down flow fields
        String phoneNum = fields[1];
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long dwFlow = Long.parseLong(fields[fields.length - 2]);
        // Emit <phone number, flow bean>
        context.write(new Text(phoneNum), new FlowBean(upFlow, dwFlow));
    }
}
```

```java
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        // Aggregate all flow records for one phone number
        long upFlow_sum = 0;
        long dwFlow_sum = 0;
        for (FlowBean f : values) {
            upFlow_sum += f.getUpFlow();
            dwFlow_sum += f.getDwFlow();
        }
        // Emit the aggregated result
        context.write(key, new FlowBean(upFlow_sum, dwFlow_sum));
    }
}
```

```java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FlowPartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // Partition by the first three digits of the phone number
        String phoneNum = key.toString().substring(0, 3);
        // Four known prefixes go to partitions 0-3; everything else to partition 4
        int partitionNum = 4;
        if ("135".equals(phoneNum)) {
            return 0;
        } else if ("137".equals(phoneNum)) {
            return 1;
        } else if ("138".equals(phoneNum)) {
            return 2;
        } else if ("139".equals(phoneNum)) {
            return 3;
        }
        return partitionNum;
    }
}
```

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Create the configuration and the job (pass conf so it is actually used)
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // Set the driver class
        job.setJarByClass(FlowCountDriver.class);
        // Set the Mapper class and its output key/value types
        job.setMapperClass(FlowCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // Set the Reducer class and its output key/value types
        job.setReducerClass(FlowCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // Set the custom partitioner; five partitions require five reduce tasks
        job.setPartitionerClass(FlowPartitioner.class);
        job.setNumReduceTasks(5);
        // Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("G:\\mapreduce\\flow\\in"));
        FileOutputFormat.setOutputPath(job, new Path("G:\\mapreduce\\flow\\inpartitionout"));
        // Submit the job and wait for completion
        if (job.waitForCompletion(true)) {
            System.out.println("Job finished!");
        } else {
            System.out.println("Job failed!");
        }
    }
}
```
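The Mapper's field extraction can be checked in isolation. The sample line below follows the field layout the Mapper assumes (phone number in field 1, up/down flow as the 3rd- and 2nd-to-last fields); the concrete log format is an assumption, since the post does not show its input data:

```java
public class ParseDemo {
    // Parse one log line the way FlowCountMapper does: phone number is
    // field 1; upFlow and dwFlow are the 3rd- and 2nd-to-last fields.
    // Returns the same tab-separated form FlowBean.toString() would emit,
    // prefixed with the phone number.
    public static String parse(String line) {
        String[] fields = line.split("\t");
        String phoneNum = fields[1];
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long dwFlow = Long.parseLong(fields[fields.length - 2]);
        return phoneNum + "\t" + upFlow + "\t" + dwFlow + "\t" + (upFlow + dwFlow);
    }
}
```

Using the last-index arithmetic (rather than fixed indices) keeps the parsing robust when the middle columns vary in count.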
Reprinted from: https://www.cnblogs.com/HelloBigTable/p/10590705.html
總結
以上是生活随笔為你收集整理的Mapreduce的序列化和流量统计程序开发的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Ant Design Tabs标签页隐藏
- 下一篇: 高通工具QXDM、QCAT和QPST的使