hadoop知识整理(2)之MapReduce
之前寫的關于MR的文章的前半部分已丟。
所以下面重點從3個部分來談MR:
1)Job任務執行過程,以及主要進程-ResourceManager和NodeManager作用;
2)shuffle過程;
3)主要代碼;
一、Job任務執行過程
這里是hadoop2.0-ResourceManager的Job的執行過程:
1)run job階段,由提交Job客戶端JVM完成,主要做job環境信息的收集,各個組件類,如Mapper、Reducer類,輸出輸入的K-V類型做檢驗是否合法,并且檢驗輸入hdfs路徑的合法性,還有輸出hdfs目錄是否已經存在,檢測不通過,則Job停止。
2)1階段通過后,Job會獲取一個Application對象,同時給一個應用ID,用于MapReduce的作業ID;
3)再次檢查輸入輸出目錄的合法性,hdfs目錄的合法性,計算作業的輸入分片,如果分片無法計算,作業將不會提交,錯誤將返回給MR客戶端程序,如果沒有問題,將運行作業的所需資源,包括MR程序的JAR文件,配置文件以及輸入分片,復制到一個以應用ID命名的hdfs目錄下的共享文件系統中,JOB的jar的副本較多,所以在運行job時,集群的所有節點都可訪問job的副本;
4)MR客戶端通過調用submitApplication()方法提交Job給RM;
5)RM即資源管理器(ResourceManager)收到Job作業后,并將請求傳遞給YARN的調度器(scheduler),調度器會分配一個容器container,然后資源管理器在節點管理器(NodeManager)中啟動application master進程;
6)application master是一個java應用程序,主類為MRAppMaster,他將接收來自Job的進度和完成報告;
7)application master對Job的初始化,是創建了很多薄記對象,以保持對于job進度的跟蹤,然后他將從hdfs共享存儲中獲得由MR客戶端計算的輸入分片,然后對每一個split創建一個Map任務,以及確定有幾個redece任務;
8)分配資源,application master程序,會計算構成MR的job的所有任務,判斷是在一個節點上進行還是多個節點進行并行計算,簡單來說,通過MR的數量來將這個job定性為小任務還是超級(uber)任務;
小job指的是,少于10個mapper且只有1個reducer,且輸入大小小于一個HDFS塊的job。通過設置mapreduce.job.ubertask.enable設置為true才可確保啟動超級任務作業。
如果非uber任務,application master會向資源管理器RM請求需要的所有容器資源;當然,請求中先為map任務請求,然后是reduce任務,通常,完成有5%的map任務完成之后,為reduce任務請求資源的信息才會發出;
reduce任務可以在集群的任何節點運行,但是map任務盡量本著本地化的策略在進行,盡量減少磁盤的IO操作,通常情況之下,每個map任務和reduce任務都會申請獲得1核的cpu以及1GB的內存,參數可配。
9)一旦RM分配了一個特定節點的容器,那么application master就與該nodeManager進行通信來啟動容器;
10)執行任務的主類為YarnChild,一個JAVA程序,在運行任務之前,首先將需要的資源本地化,從共享的hdfs中取得,包括作業的配置,jar包和其他所有緩存文件等等;
11)執行MR任務。
以上是整個Job的生命周期。
ResourceManager(RM)
RM是一個全局的資源管理器,負責整個系統的資源管理和分配。它主要由兩個組件構成:調度器(Scheduler)和應用程序管理器(Applications Manager,ASM)。
調度器 調度器根據容量、隊列等限制條件(如每個隊列分配一定的資源,最多執行一定數量的作業等),將系統中的資源分配給各個正在運行的應用程序。需要注意的是,該調度器是一個“純調度器”,它不再從事任何與具體應用程序相關的工作,比如不負責監控或者跟蹤應用的執行狀態等,也不負責重新啟動因應用執行失敗或者硬件故障而產生的失敗任務,這些均交由應用程序相關的ApplicationMaster完成。調度器僅根據各個應用程序的資源需求進行資源分配,而資源分配單位用一個抽象概念“資源容器”(Resource Container,簡稱Container)表示,Container是一個動態資源分配單位,它將內存、CPU資源封裝在一起,從而限定每個任務使用的資源量。
應用程序管理器(Applications Manager)負責管理整個系統中所有應用程序,包括應用程序提交、與調度器協商資源以啟動ApplicationMaster、監控ApplicationMaster運行狀態并在失敗時重新啟動它等。
ApplicationMaster(AM)
用戶提交的每個應用程序均包含一個AM,主要功能包括:
與RM調度器協商以獲取資源(用Container表示);
將得到的任務進一步分配給內部的任務(資源的二次分配);
與NM通信以啟動/停止任務;
監控所有任務運行狀態,并在任務運行失敗時重新為任務申請資源以重啟任務。
NodeManager(NM)
NM是每個節點上的資源和任務管理器,一方面,它會定時地向RM匯報本節點上的資源使用情況和各個Container的運行狀態;另一方面,它接收并處理來自AM的Container啟動/停止等各種請求。
Container
Container是YARN中的資源抽象,它封裝了某個節點上的內存、CPU資源,當AM向RM申請資源時,RM為AM返回的資源便是用Container表示。YARN會為每個任務分配一個Container,且該任務只能使用該Container中描述的資源。
?
二、shuffle
MapReduce確保每個reducer的輸入都是按照Key來進行排序的。系統執行排序,且將Map輸出作為輸入給Reduce的過程稱之為shuffer。
? 1)map端在輸出時,會首先輸出到一個內存緩沖區,英文名字為spill,他的默認大小為100M,可以理解為在內存中一部分首尾相連的內存區域,這個內存緩沖區的閾值為80%,可通過mapreduce.task.io.spill.percent參數改變,當map的輸出達到閾值時,會把溢出的舊輸出內容寫入磁盤,新的輸出繼續往緩沖區去寫,至于為什么是80%,是因為內存區的IO遠快于物理磁盤的IO速度,所以在達到閾值時,開始溢寫,如果spill寫滿時,仍未寫到物理磁盤上,那么map會處于wait狀態;
2)而在將map輸出結果寫磁盤之前,會根據最后返回給reduce的數據劃分成對應的分區,且在每個分區中,后臺線程會按照key進行排序,這個時候如果存在一個conbiner,那么conbiner的函數redece是在排序后進行的。運行combiner會使map的輸出結果更加緊湊,因此會減少寫磁盤IO的壓力。
3)這時的疑問在于,當spill內存緩沖區不足以支撐map的輸出時,那么會將輸出溢寫到本地磁盤中,那么map的輸出會有多個磁盤文件,所以,在map任務完成之前,會對他們進行合并排序。
至于存在combine的情況時, 任務會判斷溢出文件的數量,假如溢出文件的數量大于3,那么有必要再對此進行一次combine操作,這個操作的時間是,map任務的最終輸出準備向磁盤上寫時。所以由此判斷,combine可以在map任務中多次執行,也不會影響最終的結果,至于是否再次進行combine操作,那么由map來進行判斷,通過溢出文件的數量來進行判斷,其主要目的時判斷進行combine帶來的開銷是否足夠抵消IO磁盤操作。
4)在這里,map任務的輸出,對這個輸出文件進行壓縮,然后放到磁盤會更好,這是一個典型的節省磁盤IO的有效操作。這樣同樣可以減少通過網絡IO傳輸給reduce的文件大小。壓縮的配置參數為:mapreduce.map.output.compress設置為true,hadoop便會啟動map結果的壓縮功能。
5)至于reduce任務,前面在分析job的執行過程的時候,知道有一個參數會影響application master向RM為reduce申請資源的時間,那便是map任務完成的比率,比率默認是5%,即有5%的map任務完成時,那么reduce任務將開始進行工作。在上圖中,reduce是通過fetch(抓)過來map的輸出結果,其實是通過網絡通信將map的輸出結果復制過來。reduce任務有少量的復制線程,默認值為5個,這5個線程可以從多個執行完畢的map任務中復制過來其輸出結果。而這個線程的數量,可以通過mapreduce.reduce.shuffle.parallelcopies屬性。
6)這里的問題關鍵點在于,reduce任務如何得知map任務已經結束,且從哪里獲得其輸出結果?其實還在于強大的application master,全程負責所有任務的調度工作,當map任務完成后,會通過心跳機制,告知application master,而reduce任務一旦開啟,也會有一個線程,不停輪詢application master的map任務完成情況,這里推測,完成的map任務的網絡主機情況,輸出結果的磁盤存儲情況,會保存在application master的一個對象中,大概率是一個(HashMap),而當reduce取得map的輸出結果之后,并不會馬上刪除此map的結果釋放資源,他會等待application master的通知,這是在整體job完成后執行的。
7)下一步當reduce取得map任務的輸出結果只會,需要進行的就是不停的merge工作。
如果,map的輸出結果非常小,那么直接在reduce任務的jvm內存中進行合并了,但往往這種情況并不會經常發生。
當有很多個map的輸出,且輸出文件都比較大,redece會將map的輸出結果復制到磁盤,如果磁盤上的副本太多了,那么reduce會將這些個文件合并成更大的文件,而之前在于被壓縮的map輸出,都會在內存中被解壓縮。
直到將所有的map復制完畢,那么下一步會進行真正的reduce合并操作。
reduce合并這一塊很有意思,hadoop為了減少磁盤的IO,做了很多構想,很巧妙。
首先有一個指定參數,名字為合并因子,通過:mapreduce.task.io.sort.factor屬性設置,默認為10。
這個因子決定你的map輸出數量合并多少次,假如有40個map的輸出結果,那么將會合并4次。
如上圖所看,一共有40個map輸出,那么hadoop不會每10個文件合并一次,將合并完成的4個文件交給reduce task。
他會第一次合并4個文件形成s1,第二次、三次、四次分別合并10個文件形成s2和s3以及s4,然后它會將s1、s2、s3、s4以及剩下未合并的6個文件直接交給reduce函數。
因為map輸出結果本身為排序狀態,這樣操作可以減少了6個map結果文件的多一次通過磁盤IO進行合并的操作,而hadoop這樣做,也只是為了減少磁盤IO,多用內存。
而做完這個操作之后,reduce調用reduce函數,將輸出結果復用到HDFS之前配置地輸出目錄當中。至此,shuffle結束。
而MR任務的魔幻點就在于shuffle過程,他神奇地將亂序地文件,通過一系列map和reduce操作,通過強大地設計application master地控制中心,完美的完成了整理數據工作。
所以,知曉了MR任務地執行過程和shuffle內容,那么MR任務地優化點也來了,不論是通過參數還是在編碼中刻意進行改變,都會很好地優化MR。
? 推測執行:application master會跟蹤每個task的執行情況,當某個task執行過慢時,會創建出這個task的副本,從而進一步判定task是否存在執行失誤的情況,假如副本task先行執行完成,那么會廢掉原task。
從中體現application master強大的task線程調度能力。而這個參數的配置方法為:mapreduce.map.speculative/reduce.speculative->true。
這個推測執行功能有點過于屌了,但并不推薦使用,因為它是以整個集群的資源為代價的,應該根據具體情況開啟此功能。
參數優化點:
調優的總體綱領為:
1)減少數據傳輸--》增加conbine操作和map輸出壓縮操作;
2)盡量使用內存-》增加spill內存緩沖區的大小-增加map和reduce的jvm內存參數->mapred.child.java.opts,這個是task任務執行時的jvm內存大小;
3)減少磁盤IO-》壓縮map輸出,減少reduce合并次數,即增大合并因子參數;
4)增大任務并行數-》增加reduce的fetch數量,盡量更改此參數數量與map數量一致,達到并行抽取;
5)剩下就是推測執行了,根據集群網絡情況和機器性能進行調優操作。
?jvm調優,jvm重用機制:
1)默認不允許JVM重用;
2)一旦開啟JVM的重用,所有的task都將在一個jvm中執行,簡單表達,即是,所有的包括application master、map task、reduce task都會在一個jvm-container中執行;
3)此種情況適用于小的MR任務,默認為10個及其以下的map任務,1個的reduce任務,且reduce輸入大小為小于一個hdfs文件塊的任務。
4)此種情況依舊適用于海量小文件的情況,減少jvm的頻繁啟停;
對于海量的小文件,應該將多個小文件處理成為一個文件,以減少map的任務數量。
三、部分源碼,主要代碼
MapTask部分,即map小任務部分:
啟動map任務,調用的是其中的run方法
@Overridepublic void run(final JobConf job, final TaskUmbilicalProtocol umbilical)throws IOException, ClassNotFoundException, InterruptedException {this.umbilical = umbilical;if (isMapTask()) {// If there are no reducers then there won't be any sort. Hence the map // phase will govern the entire attempt's progress.if (conf.getNumReduceTasks() == 0) {mapPhase = getProgress().addPhase("map", 1.0f);} else {// If there are reducers then the entire attempt's progress will be // split between the map phase (67%) and the sort phase (33%).mapPhase = getProgress().addPhase("map", 0.667f);sortPhase = getProgress().addPhase("sort", 0.333f);}}TaskReporter reporter = startReporter(umbilical);boolean useNewApi = job.getUseNewMapper();initialize(job, getJobID(), reporter, useNewApi);// check if it is a cleanupJobTaskif (jobCleanup) {runJobCleanupTask(umbilical, reporter);return;}if (jobSetup) {runJobSetupTask(umbilical, reporter);return;}if (taskCleanup) {runTaskCleanupTask(umbilical, reporter);return;}if (useNewApi) {runNewMapper(job, splitMetaInfo, umbilical, reporter);} else {runOldMapper(job, splitMetaInfo, umbilical, reporter);}done(umbilical, reporter);}?
這里啟動了taskReporter,向application master報告執行情況,并且初始化了整個Job任務,且在2.0中,調用了runNewMapper方法;
@SuppressWarnings("unchecked")private <INKEY,INVALUE,OUTKEY,OUTVALUE>void runNewMapper(final JobConf job,final TaskSplitIndex splitIndex,final TaskUmbilicalProtocol umbilical,TaskReporter reporter) throws IOException, ClassNotFoundException,InterruptedException {// make a task context so we can get the classesorg.apache.hadoop.mapreduce.TaskAttemptContext taskContext =new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(),reporter);// make a mapperorg.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)ReflectionUtils.newInstance(taskContext.getMapperClass(), job);// make the input formatorg.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);// rebuild the input splitorg.apache.hadoop.mapreduce.InputSplit split = null;split = getSplitDetails(new Path(splitIndex.getSplitLocation()),splitIndex.getStartOffset());LOG.info("Processing split: " + split);org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =new NewTrackingRecordReader<INKEY,INVALUE>(split, inputFormat, reporter, taskContext);job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());org.apache.hadoop.mapreduce.RecordWriter output = null;// get an output objectif (job.getNumReduceTasks() == 0) {output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);} else {output = new NewOutputCollector(taskContext, job, umbilical, reporter);}org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext = new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), input, output, committer, reporter, split);org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context mapperContext = new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(mapContext);try {input.initialize(split, mapperContext);mapper.run(mapperContext);mapPhase.complete();setPhase(TaskStatus.Phase.SORT);statusUpdate(umbilical);input.close();input = null;output.close(mapperContext);output = null;} finally {closeQuietly(input);closeQuietly(output, mapperContext);}} View Code
?
在代碼中可見, 在這個方法中解析了job中的計算出的,InputSplit信息,這里面封裝了所有的map文件的切片信息,而InputSplit對象的初始化,由?private <T> T getSplitDetails(Path file, long offset)方法獲得。
這個方法里去獲取?T split = deserializer.deserialize(null);切片信息,而切片信息,又通過AvroSerialization獲得,代碼如下,現在就可以串起來,Job客戶端從RM獲取了一個輸入流,而這個輸入流中存儲了map所需輸入文件的切片信息,類似上文講的,從hdfs文件系統中下載文件的過程之一,先從NN節點獲取文件的切片信息:
@Overridepublic T deserialize(T t) throws IOException {return reader.read(t, decoder);} View Code?
?InputSplit中包含了切片信息,拿到本map任務需要的切片后,通過RecordReader,獲取文件內容,然后反射調用程序員縮寫的Mapper類。此項代碼在runNewMapper方法的第722行:
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)ReflectionUtils.newInstance(taskContext.getMapperClass(), job); View Code?
然后在最后有如下代碼:
try {input.initialize(split, mapperContext);mapper.run(mapperContext);mapPhase.complete();setPhase(TaskStatus.Phase.SORT);statusUpdate(umbilical);input.close();input = null;output.close(mapperContext);output = null;} finally {closeQuietly(input);closeQuietly(output, mapperContext);} View Code?
這里,初始化輸入文件切片,然后run程序員寫的mapper再之后輸出結果,關閉資源。
需要注意的是,NewOutputCollector這個在上文代碼中的作用:
他會每次收集調用map新的kv對,然后將他們spill到內存或者文件中,還可以做進一步的partition和sort和combine操作,當存在reduce的時候,此類代碼如下:
private class NewOutputCollector<K,V>extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {private final MapOutputCollector<K,V> collector;private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;private final int partitions;@SuppressWarnings("unchecked")NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,JobConf job,TaskUmbilicalProtocol umbilical,TaskReporter reporter) throws IOException, ClassNotFoundException {collector = createSortingCollector(job, reporter);partitions = jobContext.getNumReduceTasks();if (partitions > 1) {partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);} else {partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {@Overridepublic int getPartition(K key, V value, int numPartitions) {return partitions - 1;}};}}@Overridepublic void write(K key, V value) throws IOException, InterruptedException {collector.collect(key, value,partitioner.getPartition(key, value, partitions));}@Overridepublic void close(TaskAttemptContext context) throws IOException,InterruptedException {try {collector.flush();} catch (ClassNotFoundException cnf) {throw new IOException("can't find class ", cnf);}collector.close();}} View Code?
還有一個MapOutputBuffer需要注意,他是在實例化NewOutputCollector時被創建的:
構造方法:
?private final MapOutputCollector<K,V> collector;
collector = createSortingCollector(job, reporter);
然后在
?
創建了這個buffer對象,在這個對象中
?
ReduceTask部分源碼:
?
轉載于:https://www.cnblogs.com/qfxydtk/p/11167437.html
總結
以上是生活随笔為你收集整理的hadoop知识整理(2)之MapReduce的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: WPF(Windows Presenta
- 下一篇: 修改浏览器下拉条颜色和粗细