Hadoop入门(十)Mapreduce高级shuffle之Sort和Group
一、排序分組概述
MapReduce中排序和分組在哪里被執行
第3步中需要對不同分區中的數據進行排序和分組,默認情況按照key進行排序和分組
?
二、排序
在Hadoop默認的排序算法中,只會針對key值進行排序
任務:
數據文件中,如果按照第一列升序排列,
當第一列相同時,第二列升序排列
如果當第一列相同時,求出第二列的最小值
自定義排序
1.封裝一個自定義類型作為key的新類型:將第一列與第二列都作為key
WritableComparable接口
定義:
public interface WritableComparable<T> extends Writable, Comparable<T> { }自定義類型MyNewKey實現了WritableComparable的接口,該接口中有一個compareTo()方法,當對key進行比較時會調用該方法,而我們將其改為了我們自己定義的比較規則,從而實現我們想要的效果
?
2.改寫最初的MapReduce方法函數
public static class MyMapper extendsMapper<LongWritable, Text, MyNewKey, LongWritable> {protected void map(LongWritable key,Text value,Mapper<LongWritable, Text, MyNewKey, LongWritable>.Context context)throws java.io.IOException, InterruptedException {String[] spilted = value.toString().split("\t");long firstNum = Long.parseLong(spilted[0]);long secondNum = Long.parseLong(spilted[1]);// 使用新的類型作為key參與排序MyNewKey newKey = new MyNewKey(firstNum, secondNum);context.write(newKey, new LongWritable(secondNum));};} public static class MyReducer extendsReducer<MyNewKey, LongWritable, LongWritable, LongWritable> {protected void reduce(MyNewKey key,java.lang.Iterable<LongWritable> values,Reducer<MyNewKey, LongWritable, LongWritable, LongWritable>.Context context)throws java.io.IOException, InterruptedException {context.write(new LongWritable(key.firstNum), new LongWritable(key.secondNum));};}?
三、分組
在Hadoop中的默認分組規則中,也是基于Key進行的,會將相同key的value放到一個集合中去
目的:求出第一列相同時第二列的最小值
上面的例子看分組,因為我們自定義了一個新的key,它是以兩列數據作為key的,因此這6行數據中每個key都不相同產生6組
它們是:1 1,2 1,2 2,3 1,3 2,3 3。
而實際上只可以分為3組,分別是1,2,3。現在首先改寫一下reduce函數代碼
public static class MyReducer extendsReducer<MyNewKey, LongWritable, LongWritable, LongWritable> {protected void reduce(MyNewKey key,java.lang.Iterable<LongWritable> values,Reducer<MyNewKey, LongWritable, LongWritable, LongWritable>.Context context)throws java.io.IOException, InterruptedException {long min = Long.MAX_VALUE;for (LongWritable number : values) {long temp = number.get();if (temp < min) {min = temp;}}context.write(new LongWritable(key.firstNum), new LongWritable(min));};}自定義分組
為了針對新的key類型作分組,我們也需要自定義一下分組規則:
private static class MyGroupingComparator implementsRawComparator<MyNewKey> {/** 基本分組規則:按第一列firstNum進行分組*/@Overridepublic int compare(MyNewKey key1, MyNewKey key2) {return (int) (key1.firstNum - key2.firstNum);}/** @param b1 表示第一個參與比較的字節數組*?* @param s1 表示第一個參與比較的字節數組的起始位置*?* @param l1 表示第一個參與比較的字節數組的偏移量*?* @param b2 表示第二個參與比較的字節數組*?* @param s2 表示第二個參與比較的字節數組的起始位置*?* @param l2 表示第二個參與比較的字節數組的偏移量*/@Overridepublic int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);}}自定義了一個分組比較器MyGroupingComparator,該類實現了RawComparator接口,而RawComparator接口又實現了Comparator接口,這兩個接口的定義:
public interface RawComparator<T> extends Comparator<T> {public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2); } public interface Comparator<T> {int compare(T o1, T o2);boolean equals(Object obj); }分組實現步驟:
1.MyGroupingComparator實現這兩個接口
RawComparator中的compare()方法是基于字節的比較,
Comparator中的compare()方法是基于對象的比較
由于在MyNewKey中有兩個long類型,每個long類型又占8個字節。這里因為比較的是第一列數字,所以讀取的偏移量為8字節。
2.添加對分組規則的設置:
// 設置自定義分組規則
? ?job.setGroupingComparatorClass(MyGroupingComparator.class);
3. 運行結果:
?
?
總結
以上是生活随笔為你收集整理的Hadoop入门(十)Mapreduce高级shuffle之Sort和Group的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 杂牌电源对电脑影响大吗杂牌电源对电脑的影
- 下一篇: 推荐一款电脑端软件启动加密软件电脑加密软