MapReduce 进阶:Partitioner 组件
概述
Partitioner 組件可以讓 Map 對 Key 進行分區,從而將不同分區的 Key 交由不同的 Reduce 處理。如果這么說讓你覺得有一些籠統的話,那么本文可能很適合你,因為本文會依據一個具體的實例進行講解。
版權說明
著作權歸作者所有。
 商業轉載請聯系作者獲得授權,非商業轉載請注明出處。
 本文作者:Q-WHai
 發表日期: 2016年6月21日
 本文鏈接:https://qwhai.blog.csdn.net/article/details/51730960
 來源:CSDN
 更多內容:分類 >> 大數據之 Hadoop
 
需求場景
假設我們現在要統計各個省份的男女人數,每個省份的數據單獨保存。而我們的原始數據是這樣的:
Fern girl guangdong Alice girl jiangsu Bunny girl shanghai Amy girl xian Walker boy guangdong Ingram boy shichuang Paul boy shichuang Caroline girl jiangsu Esther girl jiangsu Eve girl tianjing第一個字段為名字,第二個字段為性別,第三個字段為省份。這是其中一個文件中的內容,全部文件的列表如下:
 
Partitioner 組件
這里我并不打算討論 Map 與 Reduce 過程,通過前面的學習,這一點我相信你一定是成竹在胸的。
HashPartitioner
在一般的 MapReduce 過程中,我們知道可以通過 job.setNumReduceTasks(N) 來創建多個 ReducerTask 進行處理任務。可是,這種情況下,系統會調用默認的 Partitioner 也就是 HashPartitioner 來對 Map 的 key 進行分區。
 進入 Hadoop 的源碼,可以看到 HashPartitioner 的實現其實很簡單。如下:
HashPartitioner 對分區的處理過程也就是一個 hash 函數的事,hash 的好處是可以很 key 的分布更加隨機。可是,hash 處理也有一個比較突出的問題,那就是某一個分區中可能會包含了很多不同的 key。
 原因就是因為這里需要對 numReduceTasks 進行取余(取余是必須的,因為 getPartition() 方法的返回值不可以大于 numReduceTasks ),所以你的 hashCode 相差再大也是于事無補。
自定義 Partitioner(Hash)
上面說默認的 HashPartitioner 解決起來會有一些問題,所以這里我們就需要自己定義 Partitioner 組件了。下面是我第一次進行自定義的 Partitioner 組件,也是用到了一個 hashCode()。
public static class WordcountHashPartitioner extends Partitioner<Text, IntWritable> {@Overridepublic int getPartition(Text key, IntWritable value, int numPartitions) {String location = key.toString().split(":")[0];return Math.abs(location.hashCode() * 127) % numPartitions;} }運行 Hadoop 程序,不出所料也出現相同的問題(key 映射分區的分布不均勻):
 
自定義 Partitioner(非 Hash)
從上圖的結果中可以看到各個文件的內容相差還是挺大的,尤其是其中還有一些文件沒有內容,沒有內容的原因是因為,本該寫入此文件的數據,被分到了其他分區中了,也就被寫入其他文件中了。于是,我修改了 Partitioner 中的代碼,不再使用 hash,而是采用一對一映射的方法。代碼如下( 這里如果你不喜歡使用 switch … case,那么就使用一些重構手法修改它 ):
public static class WordcountHashPartitionerNew extends Partitioner<Text, IntWritable> {@Overridepublic int getPartition(Text key, IntWritable value, int numPartitions) {String location = key.toString().split(":")[0];switch (location) {case "anhui":return 0;case "beijing":return 1;( ... 此處省略 N 行 ...)case "zhejiang":return 16;}return 0;} }修改之后,再來看執行結果就正常多了。最明顯的一點就是沒有 0 長度的文件了。檢查了其中的文件,也沒有發現異常情況。
 
客戶端調用
public class PartitionerClient {( ... 此處省略 N 行 ...)public static void main(String[] args) throws Exception {PartitionerClient client = new PartitionerClient();client.execute(args);}private void execute(String[] args) throws Exception {( ... 此處省略 N 行 ...) runWordCountJob(inputPath, outputPath);}private int runWordCountJob(String inputPath, String outputPath) throws Exception {( ... 此處省略 N 行 ...)job.setMapperClass(CorePartitioner.CoreMapper.class);job.setCombinerClass(CorePartitioner.CoreReducer.class);job.setPartitionerClass(CorePartitioner.WordcountHashPartitionerNew.class);job.setNumReduceTasks(17);job.setReducerClass(CorePartitioner.CoreReducer.class);( ... 此處省略 N 行 ...)} }這里調用的方式也就兩句話:
job.setPartitionerClass(CorePartitioner.WordcountHashPartitionerNew.class); job.setNumReduceTasks(17);前一句沒什么好說的,與 job.setMapperClass(CorePartitioner.CoreMapper.class) 都是類似的。關于后一句,也就是設置 ReduceTasks 的個數。這個值會傳遞給 getPartition() 的 numPartitions 參數。
其他 Partitioner
查看 Partitioner 的 API 可以看到 Partitioner 的 4 個實現類:
BinaryPartitioner, HashPartitioner, KeyFieldBasedPartitioner, TotalOrderPartitioner- BinaryPartitioner
 - HashPartitioner
 - KeyFieldBasedPartitioner
 - TotalOrderPartitioner
 
征集
如果你也需要使用ProcessOn這款在線繪圖工具,可以使用如下邀請鏈接進行注冊:
 https://www.processon.com/i/56205c2ee4b0f6ed10838a6d
總結
以上是生活随笔為你收集整理的MapReduce 进阶:Partitioner 组件的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: Hadoop 核心编程之 HDFS 的文
 - 下一篇: MapReduce 应用:TF-IDF