Hadoop教程(四):理解MapReduce、MapReduce计数器和连接、MapReduce Hadoop程序连接数据
本教程中的代碼分為 3 個部分:
解釋 SalesMapper 類
解釋 SalesCountryReducer 類
解釋 SalesCountryDriver 類
SalesMapper類的說明
在本節(jié)中,我們將了解 SalesMapper 類的實現(xiàn)。
我們首先指定類的包名稱。?SalesCountry?就是這個示例中使用的包名。請注意編譯的輸出,SalesMapper.class?將進入目錄并命名這個軟件包名稱:SalesCountry.
其次,我們導入庫軟件包。
以下快照顯示實現(xiàn) SalesMapper 類?
代碼解釋:
1. SalesMapper 類定義
public class SalesMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {...}
每一個 mapper 類必須從 MapReduceBase 類進行擴展,它必須實現(xiàn) Mapper 接口。
2. 定義 'map' 函數(shù)
| 1 2 3 4 | publicvoidmap(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)?throwsIOException |
Mapper類的主要部分是接受四個參數(shù)的 “map()” 方法。
每次調(diào)用?'map()'?方法, 一個鍵值?key-value?對 ('key'?和?'value'?在代碼里) 被傳遞。
'map()'?方法開始被接受拆分輸入文本作為一個參數(shù),并使用分詞來拆分這些行成詞。
| 1 2 | String valueString = value.toString(); String[] SingleCountryData = valueString.split(","); |
這里,“,” 被用作分隔符。
在這之后,使用記錄在數(shù)組??'SingleCountryData'?中的第七索引,其值為?'1'.
????????output.collect(new Text(SingleCountryData[7]), one);
我們在選擇第7索引記錄,因為我們需要的國家數(shù)據(jù),它位于數(shù)組?'SingleCountryData' 的第七索引。
請注意,我們輸入的數(shù)據(jù)是下面的格式 (Country?在索引的位置為:7, ?0 是開始的索引)-
Transaction_date,Product,Price,Payment_Type,Name,City,State,Country,Account_Created,Last_Login,Latitude,Longitude
mapper的輸出使用的是?'OutputCollector'?的?'collect()' 方法的鍵值對.
SalesCountryReducer 類的說明
在本節(jié)中,我們將了解 SalesCountryReducer 類的實現(xiàn)。
1. 我們首先為類指定包的名稱。SalesCountry 是包的名稱。請注意編譯的輸出,?SalesCountryReducer.class?將進入命名這個軟件包名稱目錄:?SalesCountry.
其次,我們導入庫軟件包。
以下快照顯示實現(xiàn) SalesCountryReducer 類
代碼解釋:
1. SalesCountryReducer 類定義 -
public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
此處,前兩個數(shù)據(jù)類型,?'Text'?和?'IntWritable'?是輸入鍵值的數(shù)據(jù)類型到reducer。
映射器的輸出的形式<CountryName1, 1>, <CountryName2, 1>.映射器的輸出被輸入到reducer。所以,以配合其數(shù)據(jù)類型,?Text?和?IntWritable?數(shù)據(jù)在這里輸入被使用。
最后兩個數(shù)據(jù)類型,'Text' 和 'IntWritable' 是由 reducer 的鍵 - 值對的形式生成的輸出的數(shù)據(jù)類型。
每個 reducer 類必須從MapReduceBase類進行擴展,它必須實現(xiàn) Reducer 接口。
2. Defining 'reduce' function-
| 1 2 3 | publicvoidreduce( Text t_key, Iterator<IntWritable> values,??OutputCollector<Text,IntWritable> output, Reporter reporter)?throwsIOException { |
輸入到 reduce() 方法是在具有多個值的列表中選擇一個鍵。
例如,在我們的示例中,這將是 -
<United Arab Emirates, 1>, <United Arab Emirates, 1>, <United Arab Emirates, 1>,<United Arab Emirates, 1>, <United Arab Emirates, 1>, <United Arab Emirates, 1>.
這賦予 reducer 作為?<United Arab Emirates, {1,1,1,1,1,1}>
因此,接受這種形式參數(shù),前兩個數(shù)據(jù)類型的使用,即 Text 和?Iterator<IntWritable>.?Text是一個數(shù)據(jù)類型的鍵 和?Iterator<IntWritable>為對于鍵的值的列表的數(shù)據(jù)類型。
接下來的參數(shù)的類型是?OutputCollector<Text,IntWritable>?它收集 reducer 階段的輸出。
reduce()?方法開始通過復制鍵值和初始化頻率計數(shù)為0。
????????Text key = t_key;
????????int frequencyForCountry = 0;
然后,使用 “while” 循環(huán),我們通過與鍵關(guān)聯(lián)的值列表循環(huán),并通過總結(jié)所有計算的值。
| 1 2 3 4 5 6 | while(values.hasNext()) { // replace type of value with the actual type of our value IntWritable value = (IntWritable) values.next(); frequencyForCountry += value.get(); } |
現(xiàn)在,結(jié)果中的鍵得到的頻率計數(shù)輸出到收集器。
下面的代碼執(zhí)行這個 -
????????output.collect(key, new IntWritable(frequencyForCountry));
SalesCountryDriver類的說明
在本節(jié)中,我們將了解 SalesCountryDriver 類實現(xiàn)。
1. 我們首先為類指定包的名稱。?SalesCountry?是這里使用的包名。請注意編譯的輸出,?SalesCountryDriver.class?將進入命名這個包名稱的目錄:?SalesCountry.
這里一行指定是包名稱后面的代碼是導入庫軟件包。
2. 定義一個用于創(chuàng)建一個新的客戶端工作,配置 Mapper及Reducer 類對象驅(qū)動程序類。
該驅(qū)動程序類負責設(shè)置我們的 MapReduce 作業(yè)在 Hadoop 運行。 在這個類中,我們指定作業(yè)名稱,輸入/輸出,mapper 和 reducer 類名稱的數(shù)據(jù)類型。
3. 在下面的代碼片段中,我們設(shè)置這是用來輸入數(shù)據(jù)集消費和生產(chǎn)輸出,分別輸入和輸出目錄。
arg[0]?和?arg[1]?是通過 MapReduce 的實際操作,也就是賦予在命令行參數(shù)執(zhí)行命令,
$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales
4. 觸發(fā)我們的作業(yè)
下面的代碼開始執(zhí)行 MapReduce 作業(yè)
try{// Run the jobJobClient.runJob(job_conf);} catch(Exception e) {e.printStackTrace();}
在MapReduce的計數(shù)器是用于收集關(guān)于 MapReduce 工作的統(tǒng)計信息的機制。這個信息在MapReduce的作業(yè)處理的問題的診斷是很有用的。 計數(shù)器類似于將在 map 或 reduce 在代碼日志信息中。
通常情況下,這些計數(shù)器在一個程序(map 或 reduce)中定義,當一個特定事件或條件(特定于該計數(shù)器)發(fā)生執(zhí)行期間遞增。計數(shù)器是一個很好的應(yīng)用來從輸入數(shù)據(jù)集跟蹤有效和無效的記錄。
有兩種類型的計數(shù)器:
1.?Hadoop?內(nèi)置計數(shù)器:?有一些內(nèi)置計數(shù)器存在每個作業(yè)中。下面是內(nèi)置計數(shù)器組:
- MapReduce任務(wù)計數(shù)器?- 收集任務(wù)的具體信息(例如,輸入記錄的數(shù)量)在它的執(zhí)行期間。
- 文件系統(tǒng)計數(shù)器?- 收集信息像由一個任務(wù)讀取或?qū)懭氲淖止?jié)數(shù)
- FileInputFormat計數(shù)器?- 收集通過FileInputFormat讀取的字節(jié)數(shù)的信息
- FileOutputFormat計數(shù)器?- 收集的字節(jié)數(shù)量的信息通過 FileOutputFormat 寫入
- Job?計數(shù)器-?這些計數(shù)器使用 JobTracker。它們收集統(tǒng)計數(shù)據(jù)包括如,任務(wù)發(fā)起了作業(yè)的數(shù)量。
2. 用戶定義的計數(shù)器
除了內(nèi)置的計數(shù)器,用戶可以定義自己的計數(shù)器,通過使用編程語言提供了類似的功能。 例如,在 Java 的枚舉用于定義用戶定義的計數(shù)器。
一個MapClass例子使用計數(shù)器計算缺失和無效值的數(shù)量:
| publicstaticclassMapClass extendsMapReduceBase implementsMapper<LongWritable, Text, Text, Text> { staticenumSalesCounters { MISSING, INVALID }; publicvoidmap ( LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)?throwsIOException { //Input string is split using ',' and stored in 'fields' array String fields[] = value.toString().split(",", -20); //Value at 4th index is country. It is stored in 'country' variable String country = fields[4]; //Value at 8th index is sales data. It is stored in 'sales' variable String sales = fields[8]; if(country.length() ==?0) { reporter.incrCounter(SalesCounters.MISSING,?1); }?elseif(sales.startsWith("\"")) { reporter.incrCounter(SalesCounters.INVALID,?1); }?else{ output.collect(newText(country),?newText(sales +?",1")); } } } |
上面的代碼片段顯示在 Map Reduce 實現(xiàn)計數(shù)器的示例。
在這里,SalesCounters是用“枚舉”定義的計數(shù)器。它被用來計算 MISSING 和 INVALID 的輸入記錄。
在代碼段中,如果 “country” 字段的長度為零那么它的值丟失,因此相應(yīng)的計數(shù)器 SalesCounters.MISSING 遞增。
接下來,如果 “sales” 字段開頭是符號 '' ,則記錄被視為無效。這通過遞增計數(shù)器 SalesCounters.INVALID 來表示。
MapReduce 連接
連接兩個大的數(shù)據(jù)集可以使用 MapReduce Join 來實現(xiàn)。然而,這個過程需要編寫大量的代碼來執(zhí)行實際的連接操作。
連接兩個數(shù)據(jù)集開始是通過比較每個數(shù)據(jù)集的大小。如果因為相比其他數(shù)據(jù)集一個數(shù)據(jù)集小,那么小數(shù)據(jù)集被分布到集群中的每個數(shù)據(jù)節(jié)點。一旦分散,無論是 Mapper 或 Reducer 使用更小的數(shù)據(jù)集進行查找匹配的大型數(shù)據(jù)集的記錄,然后結(jié)合這些記錄,形成輸出記錄。
這取決于在實際連接進行的地方,這個連接分為:
1. 映射端連接 -?當該聯(lián)接是由映射器執(zhí)行的,它稱為映射端鏈接。在這種類型中,聯(lián)結(jié)前的數(shù)據(jù)由映射函數(shù)實際來消耗的處理。它是強制性的,輸入到每個映射是在分區(qū)中的形式,并且是按排序順序。另外,必須有一個相等數(shù)目的分區(qū),它必須由連接鍵進行排序。
2. Reduce端連接-?當連接是通過減速器進行的,稱為reduce端連接。沒有必要在此連接有數(shù)據(jù)集中在以結(jié)構(gòu)化形式(或分區(qū))。
在這里,映射端的處理發(fā)出連接這兩個表的關(guān)鍵字和對應(yīng)的元組。作為該處理的效果,所有的元組相同連接鍵都落在相同的 reducer,然后使用相同的連接鍵連接記錄。
整體處理流程示于下圖。
這里有兩個數(shù)據(jù)集合在兩個不同的文件中,如下所示:
??
DEPT_ID 鍵在這兩個文件中常見的。
目標是使用 MapReduce 加入來組合這些文件。
輸入:?我們的輸入數(shù)據(jù)集是兩個txt文件:DeptName.txt 和 DepStrength.txt
下載輸入文件
前提條件:
- 本教程是在 Linux 上開發(fā) - Ubuntu操作系統(tǒng)
- 已經(jīng)安裝的Hadoop(本教程使用2.7.1版本)
- Java的開發(fā)運行環(huán)境已經(jīng)在系統(tǒng)上安裝(本教程使用的版本是:1.8.0)
在我們開始實際操作之前,使用的用戶 'hduser_'(使用 Hadoop 的用戶)。
yiibai@ubuntu:~$ su hduser_?
步驟
Step 1)?復制 zip 文件到您選擇的位置
hduser_@ubuntu:/home/yiibai$?cp?/home/yiibai/Downloads/MapReduceJoin.tar.gz?/home/hduser_/ hduser_@ubuntu:/home/yiibai$?ls?/home/hduser_/操作過程及結(jié)果如下:
?
Step 2) 解壓縮ZIP文件,使用以下命令:
hduser_@ubuntu:~$?sudo?tar?-xvf?MapReduceJoin.tar.gzStep 3) 進入目錄 MapReduceJoin/
hduser_@ubuntu:~$?cd?MapReduceJoin/Step 4) ?啟動?Hadoop
hduser_@ubuntu:~/MapReduceJoin$?$HADOOP_HOME/sbin/start-dfs.sh hduser_@ubuntu:~/MapReduceJoin$?$HADOOP_HOME/sbin/start-yarn.shStep 5) DeptStrength.txt 和 DeptName.txt 用于此項目的輸入文件
這些文件需要使用以下命令 - 復制到 HDFS 的根目錄下,使用以下命令:
hduser_@ubuntu:~/MapReduceJoin$?$HADOOP_HOME/bin/hdfs?dfs?-copyFromLocal?DeptStrength.txt?DeptName.txt?/Step 6) 使用以下命令 - 運行程序
hduser_@ubuntu:~/MapReduceJoin$?$HADOOP_HOME/bin/hadoop jar MapReduceJoin.jar /DeptStrength.txt /DeptName.txt /output_mapreducejoinStep 7)
在執(zhí)行命令后, 輸出文件 (named 'part-00000') 將會存儲在?HDFS目錄 /output_mapreducejoin?
結(jié)果可以使用命令行界面可以看到:
hduser_@ubuntu:~/MapReduceJoin$?$HADOOP_HOME/bin/hdfs dfs -cat /output_mapreducejoin/part-00000結(jié)果也可以通過 Web 界面查看(這里我的虛擬機的IP是 192.168.1.109),如下圖所示:
現(xiàn)在,選擇 “Browse the filesystem”,并瀏覽到 /output_mapreducejoin
打開?part-r-00000
結(jié)果如下所示,點擊 Download 鏈接下載:
打開下載后的 文件,結(jié)果如下所示:
注:請注意,下一次運行此程序之前,需要刪除輸出目錄 /output_mapreducejoin
$HADOOP_HOME/bin/hdfs dfs -rm -r /output_mapreducejoin
另一種方法是使用不同的名稱作為輸出目錄。
from: http://www.yiibai.com/hadoop/
總結(jié)
以上是生活随笔為你收集整理的Hadoop教程(四):理解MapReduce、MapReduce计数器和连接、MapReduce Hadoop程序连接数据的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Hadoop教程(三):HDFS、Map
- 下一篇: Hadoop教程(五):Flume、Sq