MapReduce Custom Data Types
Hadoop ships with built-in data types such as IntWritable, LongWritable, Text, and the other *Writable classes.
In some situations a custom data type is more convenient (similar to a POJO in plain Java).
Implementation:
Implement the WritableComparable interface; nothing more is required. A minimal sketch of what that entails is shown below.
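For orientation only, this is the shape any custom key type must take. The method signatures come from org.apache.hadoop.io.WritableComparable (which combines Writable and Comparable); MyWritable is a hypothetical name, not part of the original post:

package com.day07;  // hypothetical package, matching the example that follows

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class MyWritable implements WritableComparable<MyWritable> {

    // Serialize the fields to the binary stream.
    public void write(DataOutput out) throws IOException { }

    // Deserialize the fields, in exactly the order write() produced them.
    public void readFields(DataInput in) throws IOException { }

    // Define the sort order applied to keys during the shuffle.
    public int compareTo(MyWritable other) { return 0; }
}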
Example scenario:
A score table made up of Chinese, math, and English scores.
A score.txt file is uploaded to the /score directory on HDFS; its contents looked like the sample below.
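The original file contents did not survive extraction. Judging from the mapper's split(",") call further down, each line holds three comma-separated scores in the order chinese,math,english, so a hypothetical score.txt might look like:

90,85,70
88,92,80
90,70,95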
We want to rank the records by total score; when totals are equal, ties are broken by the Chinese, math, and English scores, in that order.
1. Define a ScoreWritable class implementing the WritableComparable interface:
package com.day07;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class ScoreWritable implements WritableComparable<ScoreWritable> {

    int chinese;
    int math;
    int english;
    int sum;

    public ScoreWritable() {
    }

    public ScoreWritable(int chinese, int math, int english) {
        this.chinese = chinese;
        this.math = math;
        this.english = english;
        this.sum = chinese + math + english;
    }

    @Override
    public String toString() {
        return "ScoreWritable{" +
                "chinese=" + chinese +
                ", math=" + math +
                ", english=" + english +
                ", sum=" + sum +
                '}';
    }

    public int getChinese() { return chinese; }
    public void setChinese(int chinese) { this.chinese = chinese; }
    public int getMath() { return math; }
    public void setMath(int math) { this.math = math; }
    public int getEnglish() { return english; }
    public void setEnglish(int english) { this.english = english; }
    public int getSum() { return sum; }
    public void setSum(int sum) { this.sum = sum; }

    // Comparison: higher total score first, then higher Chinese, then higher math.
    // (If sum, chinese, and math are all equal, english is necessarily equal too,
    // so it never needs to be compared.)
    public int compareTo(ScoreWritable that) {
        if (this.sum != that.sum) {
            return this.sum > that.sum ? -1 : 1;
        }
        if (this.chinese != that.chinese) {
            return this.chinese > that.chinese ? -1 : 1;
        }
        return that.math - this.math;
    }

    // Serialization -- Hadoop uses DataOutput streams so objects can be
    // serialized compactly in a custom format.
    public void write(DataOutput out) throws IOException {
        out.writeInt(chinese);
        out.writeInt(math);
        out.writeInt(english);
        out.writeInt(sum);
    }

    // Deserialization -- fields must be read back in the same order they were written.
    public void readFields(DataInput in) throws IOException {
        this.chinese = in.readInt();
        this.math = in.readInt();
        this.english = in.readInt();
        this.sum = in.readInt();
    }
}
Note: it is best to override toString(), since TextOutputFormat writes each output key by calling its toString() method. If the job ran with multiple reducers, overriding hashCode() would also matter, because the default HashPartitioner assigns keys to reducers by key hash.
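As a quick local sanity check (not part of the original post), the custom type can be round-tripped through write/readFields and its ordering verified; a minimal sketch, assuming it lives in the same com.day07 package:

package com.day07;  // assumption: same package as ScoreWritable

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class ScoreWritableCheck {
    public static void main(String[] args) throws Exception {
        ScoreWritable a = new ScoreWritable(90, 80, 70);  // sum = 240

        // Serialize with write(), then deserialize into a fresh instance.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        a.write(new DataOutputStream(bytes));
        ScoreWritable b = new ScoreWritable();
        b.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(b);  // prints the same fields as a

        // A higher total score should sort first, i.e. compare as "smaller".
        ScoreWritable lower = new ScoreWritable(60, 60, 60);  // sum = 180
        System.out.println(a.compareTo(lower) < 0);  // true
    }
}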
2. Write a ScoreJob class to test the custom ScoreWritable:
package com.day07;

import com.google.common.io.Resources;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ScoreJob {

    public static class ScoreMapper extends Mapper<LongWritable, Text, ScoreWritable, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line holds three comma-separated scores: chinese,math,english.
            String[] grades = value.toString().split(",");
            ScoreWritable score = new ScoreWritable(
                    Integer.parseInt(grades[0]),
                    Integer.parseInt(grades[1]),
                    Integer.parseInt(grades[2]));
            context.write(score, NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration coreSiteConf = new Configuration();
        coreSiteConf.addResource(Resources.getResource("core-site-local.xml"));

        // Set up a job.
        Job job = Job.getInstance(coreSiteConf, "score");
        // Set the job's main class.
        job.setJarByClass(ScoreJob.class);
        //job.setJar("mrdemo/target/mrdemo-1.0-SNAPSHOT.jar");

        // Set the Mapper class; no Reducer is set, so the identity reducer runs.
        job.setMapperClass(ScoreMapper.class);
        // Map output types.
        job.setMapOutputKeyClass(ScoreWritable.class);
        job.setMapOutputValueClass(NullWritable.class);

        /*job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);*/

        // Set the input path, deleting the output path first if it already exists.
        FileInputFormat.addInputPath(job, new Path("/score/"));
        FileSystem fileSystem = FileSystem.get(coreSiteConf);
        if (fileSystem.exists(new Path("/out/"))) {
            fileSystem.delete(new Path("/out/"), true);
        }
        FileOutputFormat.setOutputPath(job, new Path("/out/"));

        // Run the job and print the result file.
        boolean flag = job.waitForCompletion(true);
        if (flag) {
            FSDataInputStream open = fileSystem.open(new Path("/out/part-r-00000"));
            byte[] buffer = new byte[1024];
            IOUtils.readFully(open, buffer, 0, open.available());
            System.out.println(new String(buffer));
        }
    }
}

3. Test results: the original post showed sample output here; it consists of one ScoreWritable per line in its toString() form, ordered from highest to lowest total score.
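The ranking comes entirely from the shuffle phase: MapReduce sorts map output keys with compareTo, so the identity reducer emits them already in order. For reference (an assumption on our part, based on the commented-out setJar path), the job could be packaged and submitted with the standard hadoop jar command:

hadoop jar mrdemo/target/mrdemo-1.0-SNAPSHOT.jar com.day07.ScoreJob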
Reposted from: https://www.cnblogs.com/pigdata/p/10305597.html