Hadoop Source Code Analysis (4)
2021SC@SDUSC
A Brief Introduction to This Week's Research
Last week we analyzed the core code of org.apache.hadoop.mapreduce.Cluster; this week we continue with the mapreduce package. Building on our initial understanding of the Cluster class, we now examine the closely related org.apache.hadoop.mapreduce.ClusterMetrics.
Analysis of org.apache.hadoop.mapreduce.ClusterMetrics
Let us first look at the fields and constructors of ClusterMetrics.
```java
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ClusterMetrics implements Writable {

  private int runningMaps;
  private int runningReduces;
  private int occupiedMapSlots;
  private int occupiedReduceSlots;
  private int reservedMapSlots;
  private int reservedReduceSlots;
  private int totalMapSlots;
  private int totalReduceSlots;
  private int totalJobSubmissions;
  private int numTrackers;
  private int numBlacklistedTrackers;
  private int numGraylistedTrackers;
  private int numDecommissionedTrackers;

  public ClusterMetrics() {
  }

  public ClusterMetrics(int runningMaps, int runningReduces,
      int occupiedMapSlots, int occupiedReduceSlots, int reservedMapSlots,
      int reservedReduceSlots, int mapSlots, int reduceSlots,
      int totalJobSubmissions, int numTrackers, int numBlacklistedTrackers,
      int numDecommissionedNodes) {
    this(runningMaps, runningReduces, occupiedMapSlots, occupiedReduceSlots,
        reservedMapSlots, reservedReduceSlots, mapSlots, reduceSlots,
        totalJobSubmissions, numTrackers, numBlacklistedTrackers, 0,
        numDecommissionedNodes);
  }

  public ClusterMetrics(int runningMaps, int runningReduces,
      int occupiedMapSlots, int occupiedReduceSlots, int reservedMapSlots,
      int reservedReduceSlots, int mapSlots, int reduceSlots,
      int totalJobSubmissions, int numTrackers, int numBlacklistedTrackers,
      int numGraylistedTrackers, int numDecommissionedNodes) {
    this.runningMaps = runningMaps;
    this.runningReduces = runningReduces;
    this.occupiedMapSlots = occupiedMapSlots;
    this.occupiedReduceSlots = occupiedReduceSlots;
    this.reservedMapSlots = reservedMapSlots;
    this.reservedReduceSlots = reservedReduceSlots;
    this.totalMapSlots = mapSlots;
    this.totalReduceSlots = reduceSlots;
    this.totalJobSubmissions = totalJobSubmissions;
    this.numTrackers = numTrackers;
    this.numBlacklistedTrackers = numBlacklistedTrackers;
    this.numGraylistedTrackers = numGraylistedTrackers;
    this.numDecommissionedTrackers = numDecommissionedNodes;
  }
```

As we can see, ClusterMetrics defines many cluster-related fields, for example:
- The size of the cluster.
- The number of blacklisted and decommissioned trackers.
- The slot capacity of the cluster.
- The number of currently occupied/reserved map and reduce slots.
- The number of currently running map and reduce tasks.
- The number of job submissions.
It is therefore not hard to infer that ClusterMetrics is mainly used to record cluster-level information and expose it to users.
The official API documentation also describes the ClusterMetrics class:
"Status information on the current state of the Map-Reduce cluster." That is, it records the concrete state of the Map-Reduce cluster. Since we already covered Cluster in the previous post, we will not repeat that here.
The documentation also notes that "Clients can query for the latest ClusterMetrics, via Cluster.getClusterStatus()", i.e., calling getClusterStatus() returns the current ClusterMetrics.
That concludes our look at ClusterMetrics; next we continue with other classes in the mapreduce package.
Source Code Analysis of org.apache.hadoop.mapreduce.Counters
Counters is one of the most important classes in mapreduce.
A counter (Counter) is the facility through which a MapReduce application reports its statistics; Mapper and Reducer implementations can use counters to report statistics.
MapReduce counters give us a window onto the detailed runtime behavior of a MapReduce job. MapReduce ships with many default counters.
Counters are global counters, defined either by the MapReduce framework or by the application. Each counter can be of any enum type, and counters of a particular enum type are grouped together.
In a map/reduce method, users can obtain a defined counter via context.getCounter(Enum<?> counterName) and then count with counter.increment(long incr).
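To make that mechanism concrete, here is a minimal, self-contained model (plain JDK, no Hadoop dependency) of the enum-keyed counting pattern that context.getCounter(...).increment(...) exposes. The class, enum, and method names below are illustrative stand-ins, not Hadoop's actual implementation:

```java
import java.util.EnumMap;
import java.util.Map;

public class EnumCounterDemo {

    // Hypothetical application-defined counter enum, as a user would declare one.
    enum WordQuality { GOOD, BAD }

    // Minimal stand-in for a counter group keyed by an enum type.
    static class SimpleCounters<E extends Enum<E>> {
        private final Map<E, Long> totals;

        SimpleCounters(Class<E> cls) {
            totals = new EnumMap<>(cls);
        }

        // Mirrors counter.increment(long incr).
        void increment(E key, long incr) {
            totals.merge(key, incr, Long::sum);
        }

        // Mirrors counter.getValue().
        long getValue(E key) {
            return totals.getOrDefault(key, 0L);
        }
    }

    public static void main(String[] args) {
        SimpleCounters<WordQuality> counters =
                new SimpleCounters<>(WordQuality.class);
        // Classify each record and bump the matching counter, as a map() body would.
        for (String w : new String[] {"ok", "error", "ok"}) {
            counters.increment(
                "error".equals(w) ? WordQuality.BAD : WordQuality.GOOD, 1);
        }
        System.out.println(counters.getValue(WordQuality.GOOD)); // 2
        System.out.println(counters.getValue(WordQuality.BAD));  // 1
    }
}
```

The EnumMap mirrors why the framework groups counters by enum class: the enum type supplies both the group identity and the fixed key set.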
The MapReduce framework itself provides many built-in counters, such as File System Counters, Job Counters, Map-Reduce Framework, File Input Format Counters, and File Output Format Counters. They can be seen in the console output after running an MR job, as in the figure below (there, SecondarySort Counters is a custom counter group, not one that ships with MR).
We first reproduce the full source of the Counters class, to support further analysis.
```java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.counters.Limits;
import org.apache.hadoop.mapreduce.counters.GenericCounter;
import org.apache.hadoop.mapreduce.counters.AbstractCounterGroup;
import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
import org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup;
import org.apache.hadoop.mapreduce.counters.AbstractCounters;
import org.apache.hadoop.mapreduce.counters.CounterGroupFactory;
import org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup;

/**
 * <p><code>Counters</code> holds per job/task counters, defined either by the
 * Map-Reduce framework or applications. Each <code>Counter</code> can be of
 * any {@link Enum} type.</p>
 *
 * <p><code>Counters</code> are bunched into {@link CounterGroup}s, each
 * comprising of counters from a particular <code>Enum</code> class.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Counters extends AbstractCounters<Counter, CounterGroup> {

  // Mix framework group implementation into CounterGroup interface
  private static class FrameworkGroupImpl<T extends Enum<T>>
      extends FrameworkCounterGroup<T, Counter> implements CounterGroup {

    FrameworkGroupImpl(Class<T> cls) {
      super(cls);
    }

    @Override
    protected FrameworkCounter<T> newCounter(T key) {
      return new FrameworkCounter<T>(key, getName());
    }

    @Override
    public CounterGroupBase<Counter> getUnderlyingGroup() {
      return this;
    }
  }

  // Mix generic group implementation into CounterGroup interface
  // and provide some mandatory group factory methods.
  private static class GenericGroup extends AbstractCounterGroup<Counter>
      implements CounterGroup {

    GenericGroup(String name, String displayName, Limits limits) {
      super(name, displayName, limits);
    }

    @Override
    protected Counter newCounter(String name, String displayName, long value) {
      return new GenericCounter(name, displayName, value);
    }

    @Override
    protected Counter newCounter() {
      return new GenericCounter();
    }

    @Override
    public CounterGroupBase<Counter> getUnderlyingGroup() {
      return this;
    }
  }

  // Mix file system group implementation into the CounterGroup interface
  private static class FileSystemGroup extends FileSystemCounterGroup<Counter>
      implements CounterGroup {

    @Override
    protected Counter newCounter(String scheme, FileSystemCounter key) {
      return new FSCounter(scheme, key);
    }

    @Override
    public CounterGroupBase<Counter> getUnderlyingGroup() {
      return this;
    }
  }

  /**
   * Provide factory methods for counter group factory implementation.
   * See also the GroupFactory in
   * {@link org.apache.hadoop.mapred.Counters mapred.Counters}
   */
  private static class GroupFactory
      extends CounterGroupFactory<Counter, CounterGroup> {

    @Override
    protected <T extends Enum<T>>
    FrameworkGroupFactory<CounterGroup>
    newFrameworkGroupFactory(final Class<T> cls) {
      return new FrameworkGroupFactory<CounterGroup>() {
        @Override public CounterGroup newGroup(String name) {
          return new FrameworkGroupImpl<T>(cls); // impl in this package
        }
      };
    }

    @Override
    protected CounterGroup newGenericGroup(String name, String displayName,
                                           Limits limits) {
      return new GenericGroup(name, displayName, limits);
    }

    @Override
    protected CounterGroup newFileSystemGroup() {
      return new FileSystemGroup();
    }
  }

  private static final GroupFactory groupFactory = new GroupFactory();

  /**
   * Default constructor
   */
  public Counters() {
    super(groupFactory);
  }

  /**
   * Construct the Counters object from the another counters object
   * @param <C> the type of counter
   * @param <G> the type of counter group
   * @param counters the old counters object
   */
  public <C extends Counter, G extends CounterGroupBase<C>>
  Counters(AbstractCounters<C, G> counters) {
    super(counters, groupFactory);
  }
}
```

So what is the Counters class for? Let us first look at the official definition of Class Counters:
As we can see, Counters holds per-job/per-task counters, defined either by the Map-Reduce framework or by applications; each Counter can be of any Enum type.
Counters are bunched into CounterGroups, each comprising counters from a particular Enum class.
We can see that Counters extends the AbstractCounters class.
Opening the AbstractCounters source, it is an abstract class that provides the common implementation for the Counters containers in both the mapred and mapreduce packages.
Its methods are easy to understand and mainly make counters convenient to use: constructing counters, counting the counters within a group, returning counter names, and other basic operations.
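As a rough illustration of that container pattern, groups of named counters held inside one Counters object, here is a self-contained sketch in plain Java. The class and method names are hypothetical stand-ins for the shape of the real AbstractCounters API, not the real API itself:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class CounterContainerDemo {

    // A group is a map from counter name to accumulated value.
    static class Group {
        private final Map<String, Long> counters = new TreeMap<>();

        void increment(String counterName, long incr) {
            counters.merge(counterName, incr, Long::sum);
        }

        long value(String counterName) {
            return counters.getOrDefault(counterName, 0L);
        }

        int size() {
            return counters.size();
        }
    }

    // The container holds groups keyed by group name.
    static class Counters {
        private final Map<String, Group> groups = new HashMap<>();

        // Create the group lazily on first access.
        Group getGroup(String groupName) {
            return groups.computeIfAbsent(groupName, n -> new Group());
        }

        // Total number of counters across all groups.
        int countCounters() {
            return groups.values().stream().mapToInt(Group::size).sum();
        }
    }

    public static void main(String[] args) {
        Counters counters = new Counters();
        counters.getGroup("Job Counters").increment("Launched map tasks", 4);
        counters.getGroup("Map-Reduce Framework").increment("Map input records", 100);
        counters.getGroup("Map-Reduce Framework").increment("Map output records", 100);
        System.out.println(counters.countCounters()); // 3
        System.out.println(counters.getGroup("Job Counters")
                .value("Launched map tasks")); // 4
    }
}
```

The lazy group creation in getGroup mirrors why the real container exposes group-level lookups: callers never construct groups directly, they just name them.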
Analysis of the Counters Constructors
Counters provides two constructors, one with parameters and one without:
The parameterized constructor uses the construction support of AbstractCounters mentioned above to build a new Counters object from another counters object. It takes a single argument, the old counters object, of type AbstractCounters<C, G>, where C is the type of the counters and G is the type of the counter groups.
We mentioned the concept of counter groups above. So what is a counter group? From the relevant material:
Counters have the notion of a "group", which collects all values in the same logical scope. The default counters a MapReduce job provides fall into three groups:
- Map-Reduce Framework: Map input records, Map skipped records, Map input bytes, Map output records, Map output bytes, Combine input records, Combine output records, Reduce input records, Reduce input groups, Reduce output records, Reduce skipped groups, Reduce skipped records, Spilled records
- File Systems: FileSystem bytes read, FileSystem bytes written
- Job Counters: Launched map tasks, Launched reduce tasks, Failed map tasks, Failed reduce tasks, Data-local map tasks, Rack-local map tasks, Other local map tasks
Practical Use of Counters
Having covered the concept and the constructors, we now want to understand how counters are used.
For example, a user may want a quick count of the number of lines in a file, and of the error records among them.
To use this feature, user code creates a Counter object and, at the appropriate points in the Map and Reduce functions, increments its value.
These counter values are periodically propagated from the individual worker machines to the master (piggybacked on the ping responses).
The master accumulates the counter values from successfully completed map and reduce tasks and, once the MapReduce operation finishes, returns them to the user code.
The current counter values are also shown on the master's status page, so users can watch the progress of the live computation.
When accumulating counter values, the master checks whether it has already accumulated values for the same map or reduce task, to avoid double counting.
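That accumulation rule can be sketched in plain Java. The Master class here is a hypothetical model of the described behavior (sum counter snapshots from completed tasks, skip duplicate reports by task id, as happens with speculative re-execution), not Hadoop's actual code:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class MasterAccumulatorDemo {

    static class Master {
        private final Map<String, Long> totals = new HashMap<>();
        private final Set<String> countedTasks = new HashSet<>();

        // Accept a completed task's counter snapshot; ignore repeats of the same task.
        void accept(String taskId, Map<String, Long> snapshot) {
            if (!countedTasks.add(taskId)) {
                return; // this task was already accumulated -- skip the duplicate
            }
            snapshot.forEach((name, v) -> totals.merge(name, v, Long::sum));
        }

        long total(String counterName) {
            return totals.getOrDefault(counterName, 0L);
        }
    }

    public static void main(String[] args) {
        Master master = new Master();
        master.accept("map_0001", Map.of("TotalRecorder", 5L, "ErrorRecorder", 1L));
        master.accept("map_0002", Map.of("TotalRecorder", 7L));
        // A duplicate report for map_0001 (e.g. a speculative attempt) is ignored.
        master.accept("map_0001", Map.of("TotalRecorder", 5L, "ErrorRecorder", 1L));
        System.out.println(master.total("TotalRecorder")); // 12
        System.out.println(master.total("ErrorRecorder")); // 1
    }
}
```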
下面的代碼就可以通過counters實(shí)現(xiàn)這樣的一個目標(biāo)。
```java
package com.shockang.study.bigdata.mapreduce.counter;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyMapperWithCounter
        extends Mapper<LongWritable, Text, LongWritable, IntWritable> {

    /**
     * Define an enum type for the counters.
     */
    public static enum FileRecorder {
        ErrorRecorder,
        TotalRecorder
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if ("error".equals(value.toString())) {
            // Increment the error-record counter.
            context.getCounter(FileRecorder.ErrorRecorder).increment(1);
        }
        // Increment the total-record counter.
        context.getCounter(FileRecorder.TotalRecorder).increment(1);
    }
}
```

```java
package com.shockang.study.bigdata.mapreduce.counter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobMain {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        /*
         * Use NLineInputFormat to split a small file, so as to simulate
         * the distributed processing of a large file.
         */
        configuration.setInt("mapreduce.input.lineinputformat.linespermap", 5);
        Job job = new Job(configuration, "counter-job");
        job.setInputFormatClass(NLineInputFormat.class);
        job.setJarByClass(JobMain.class);
        job.setMapperClass(MyMapperWithCounter.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputDir = new Path(args[1]);
        FileSystem fs = FileSystem.get(configuration);
        if (fs.exists(outputDir)) {
            fs.delete(outputDir, true);
        }
        FileOutputFormat.setOutputPath(job, outputDir);
        if (job.waitForCompletion(true)) {
            System.out.println("Error num:" + job.getCounters()
                    .findCounter(MyMapperWithCounter.FileRecorder.ErrorRecorder)
                    .getValue());
            System.out.println("Total num:" + job.getCounters()
                    .findCounter(MyMapperWithCounter.FileRecorder.TotalRecorder)
                    .getValue());
        }
    }
}
```

Summary
In this post we first analyzed the ClusterMetrics class, rounding out our understanding of the Cluster class, and then began analyzing Counters, a core mapreduce class. We took an initial look at its purpose and constructors, and deepened our understanding by writing a simple custom counter that tallies a file's error records alongside its total records.