MapReduce源码分析总结
http://blog.csdn.net/HEYUTAO007/article/details/5725379
參考:?
1 caibinbupt的源代碼分析http://caibinbupt.javaeye.com
2?coderplay的avaeye?
http://coderplay.javaeye.com/blog/295097
http://coderplay.javaeye.com/blog/318602?
3?Javen-Studio 咖啡小屋
http://www.cppblog.com/javenstudio/articles/43073.html
一?MapReduce概述
????Map/Reduce是一個用于大規模數據處理的分布式計算模型,它最初是由Google工程師設計并實現的,Google已經將它完整的MapReduce論文公開發布了。其中對它的定義是,Map/Reduce是一個編程模型(programmingmodel),是一個用于處理和生成大規模數據集(processing and generating large data sets)的相關的實現。用戶定義一個map函數來處理一個key/value對以生成一批中間的key/value對,再定義一個reduce函數將所有這些中間的有著相同key的values合并起來。很多現實世界中的任務都可用這個模型來表達。
二?MapReduce工作原理
????Map-Reduce框架的運作完全基于<key,value>對,即數據的輸入是一批<key,value>對,生成的結果也是一批<key,value>對,只是有時候它們的類型不一樣而已。Key和value的類由于需要支持被序列化(serialize)操作,所以它們必須要實現Writable接口,而且key的類還必須實現WritableComparable接口,使得可以讓框架對數據集的執行排序操作。
????一個Map-Reduce任務的執行過程以及數據輸入輸出的類型如下所示:
????Map:<k1,v1> ->list<k2,v2>
????Reduce:<k2,list<v2>>?-><k3,v3>
????下面通過一個的例子來詳細說明這個過程。
????WordCount是Hadoop自帶的一個例子,目標是統計文本文件中單詞的個數。假設有如下的兩個文本文件來運行WorkCount程序:
????Hello World Bye World
????Hello Hadoop GoodBye Hadoop
1?map數據輸入
????Hadoop針對文本文件缺省使用LineRecordReader類來實現讀取,一行一個key/value對,key取偏移量,value為行內容。
????如下是map1的輸入數據:
| Key1 | Value1 |
| 0 | Hello World Bye World |
如下是map2的輸入數據:
| Key1 | Value1 |
| 0 | Hello Hadoop GoodBye Hadoop |
2?map輸出/combine輸入
????如下是map1的輸出結果
| Key2 | Value2 |
| Hello | 1 |
| World | 1 |
| Bye | 1 |
| World | 1 |
????如下是map2的輸出結果
| Key2 | Value2 |
| Hello | 1 |
| Hadoop | 1 |
| GoodBye | 1 |
| Hadoop | 1 |
3?combine輸出
????Combiner類實現將相同key的值合并起來,它也是一個Reducer的實現。
????如下是combine1的輸出
| Key2 | Value2 |
| Hello | 1 |
| World | 2 |
| Bye | 1 |
????如下是combine2的輸出
| Key2 | Value2 |
| Hello | 1 |
| Hadoop | 2 |
| GoodBye | 1 |
4?reduce輸出
????Reducer類實現將相同key的值合并起來。
????如下是reduce的輸出
| Key2 | Value2 |
| Hello | 2 |
| World | 2 |
| Bye | 1 |
| Hadoop | 2 |
| GoodBye | 1 |
?
三?MapReduce框架結構
1?角色
1.1?JobTracker
????JobTracker是一個master服務,?JobTracker負責調度job的每一個子任務task運行于TaskTracker上,并監控它們,如果發現有失敗的task就重新運行它。一般情況應該把JobTracker部署在單獨的機器上。
1.2?TaskTracker
????TaskTracker是運行于多個節點上的slaver服務。TaskTracker則負責直接執行每一個task。TaskTracker都需要運行在HDFS的DataNode上,
1.3?JobClient
????每一個job都會在用戶端通過JobClient類將應用程序以及配置參數打包成jar文件存儲在HDFS,并把路徑提交到JobTracker,然后由JobTracker創建每一個Task(即MapTask和ReduceTask)并將它們分發到各個TaskTracker服務中去執行。
???
2?數據結構
????2.1 Mapper和Reducer
????運行于Hadoop的MapReduce應用程序最基本的組成部分包括一個Mapper和一個Reducer類,以及一個創建JobConf的執行程序,在一些應用中還可以包括一個Combiner類,它實際也是Reducer的實現。
????2.2?JobInProgress
????JobClient提交job后,JobTracker會創建一個JobInProgress來跟蹤和調度這個job,并把它添加到job隊列里。JobInProgress會根據提交的job jar中定義的輸入數據集(已分解成FileSplit)創建對應的一批TaskInProgress用于監控和調度MapTask,同時在創建指定數目的TaskInProgress用于監控和調度ReduceTask,缺省為1個ReduceTask。
????2.3?TaskInProgress
????JobTracker啟動任務時通過每一個TaskInProgress來launchTask,這時會把Task對象(即MapTask和ReduceTask)序列化寫入相應的TaskTracker服務中,TaskTracker收到后會創建對應的TaskInProgress(此TaskInProgress實現非JobTracker中使用的TaskInProgress,作用類似)用于監控和調度該Task。啟動具體的Task進程是通過TaskInProgress管理的TaskRunner對象來運行的。TaskRunner會自動裝載jobjar,并設置好環境變量后啟動一個獨立的java child進程來執行Task,即MapTask或者ReduceTask,但它們不一定運行在同一個TaskTracker中。
????2.4?MapTask和ReduceTask
????一個完整的job會自動依次執行Mapper、Combiner(在JobConf指定了Combiner時執行)和Reducer,其中Mapper和Combiner是由MapTask調用執行,Reducer則由ReduceTask調用,Combiner實際也是Reducer接口類的實現。Mapper會根據jobjar中定義的輸入數據集按<key1,value1>對讀入,處理完成生成臨時的<key2,value2>對,如果定義了Combiner,MapTask會在Mapper完成調用該Combiner將相同key的值做合并處理,以減少輸出結果集。MapTask的任務全完成即交給ReduceTask進程調用Reducer處理,生成最終結果<key3,value3>對。這個過程在下一部分再詳細介紹。
????下圖描述了Map/Reduce框架中主要組成和它們之間的關系:
?
?
3?流程
????一道MapRedcue作業是通過JobClient.rubJob(job)向master節點的JobTracker提交的, JobTracker接到JobClient的請求后把其加入作業隊列中。JobTracker一直在等待JobClient通過RPC提交作業,而TaskTracker一直通過RPC向?JobTracker發送心跳heartbeat詢問有沒有任務可做,如果有,讓其派發任務給它執行。如果JobTracker的作業隊列不為空, 則TaskTracker發送的心跳將會獲得JobTracker給它派發的任務。這是一道pull過程。slave節點的TaskTracker接到任務后在其本地發起Task,執行任務。以下是簡略示意圖:
?
?
下面詳細介紹一下Map/Reduce處理一個工作的流程。
四JobClient
????在編寫MapReduce程序時通常是上是這樣寫的:
????Configuration conf = new Configuration();//?讀取hadoop配置
????Job job = new Job(conf, "作業名稱"); //?實例化一道作業
????job.setMapperClass(Mapper類型);
????job.setCombinerClass(Combiner類型);
????job.setReducerClass(Reducer類型);
????job.setOutputKeyClass(輸出Key的類型);
????job.setOutputValueClass(輸出Value的類型);
????FileInputFormat.addInputPath(job, new Path(輸入hdfs路徑));
????FileOutputFormat.setOutputPath(job, newPath(輸出hdfs路徑));
????//?其它初始化配置
JobClient.runJob(job);
1配置Job
????JobConf是用戶描述一個job的接口。下面的信息是MapReduce過程中一些較關鍵的定制信息:
2?JobClient.runJob():運行Job并分解輸入數據集
????一個MapReduce的Job會通過JobClient類根據用戶在JobConf類中定義的InputFormat實現類來將輸入的數據集分解成一批小的數據集,每一個小數據集會對應創建一個MapTask來處理。JobClient會使用缺省的FileInputFormat類調用FileInputFormat.getSplits()方法生成小數據集,如果判斷數據文件是isSplitable()的話,會將大的文件分解成小的FileSplit,當然只是記錄文件在HDFS里的路徑及偏移量和Split大小。這些信息會統一打包到jobFile的jar中。
????JobClient然后使用submitJob(job)方法向?master提交作業。submitJob(job)內部是通過submitJobInternal(job)方法完成實質性的作業提交。?submitJobInternal(job)方法首先會向hadoop分布系統文件系統hdfs依次上傳三個文件: job.jar, job.split和job.xml。
????job.xml:?作業配置,例如Mapper,Combiner, Reducer的類型,輸入輸出格式的類型等。
????job.jar: jar包,里面包含了執行此任務需要的各種類,比如?Mapper,Reducer等實現。
????job.split:?文件分塊的相關信息,比如有數據分多少個塊,塊的大小(默認64m)等。
????這三個文件在hdfs上的路徑由hadoop-default.xml文件中的mapreduce系統路徑mapred.system.dir屬性?+ jobid決定。mapred.system.dir屬性默認是/tmp/hadoop-user_name/mapred/system。寫完這三個文 件之后,?此方法會通過RPC調用master節點上的JobTracker.submitJob(job)方法,此時作業已經提交完成。
3提交Job
????jobFile的提交過程是通過RPC模塊(有單獨一章來詳細介紹)來實現的。大致過程是,JobClient類中通過RPC實現的Proxy接口調用JobTracker的submitJob()方法,而JobTracker必須實現JobSubmissionProtocol接口。
????JobTracker創建job成功后會給JobClient傳回一個JobStatus對象用于記錄job的狀態信息,如執行時間、Map和Reduce任務完成的比例等。JobClient會根據這個JobStatus對象創建一個NetworkedJob的RunningJob對象,用于定時從JobTracker獲得執行過程的統計數據來監控并打印到用戶的控制臺。
????與創建Job過程相關的類和方法如下圖所示
?
?
五?JobTracker
????上面已經提到,job是統一由JobTracker來調度的,具體的Task分發給各個TaskTracker節點來執行。下面來詳細解析執行過程,首先先從JobTracker收到JobClient的提交請求開始。
1JobTracker初始化Job
????1.1JobTracker.submitJob()?收到請求
????當JobTracker接收到新的job請求(即submitJob()函數被調用)后,會創建一個JobInProgress對象并通過它來管理和調度任務。JobInProgress在創建的時候會初始化一系列與任務有關的參數,調用到FileSystem,把在JobClient端上傳的所有任務文件下載到本地的文件系統中的臨時目錄里。這其中包括上傳的*.jar文件包、記錄配置信息的xml、記錄分割信息的文件。
????1.2JobTracker.JobInitThread?通知初始化線程
????JobTracker?中的監聽器類EagerTaskInitializationListener負責任務Task的初始化。JobTracker使用jobAdded(job)加入job到EagerTaskInitializationListener中一個專門管理需要初始化的隊列里,即一個list成員變量jobInitQueue里。resortInitQueue方法根據作業的優先級排序。然后調用notifyAll()函數,會喚起一個用于初始化job的線程JobInitThread來處理。JobInitThread收到信號后即取出最靠前的job,即優先級別最高的job,調用TaskTrackerManager的initJob最終調用JobInProgress.initTasks()執行真正的初始化工作。
????1.3JobInProgress.initTasks()?初始化TaskInProgress
????任務Task分兩種: MapTask?和reduceTask,它們的管理對象都是TaskInProgress?。
????首先JobInProgress會創建Map的監控對象。在initTasks()函數里通過調用JobClient的readSplitFile()獲得已分解的輸入數據的RawSplit列表,然后根據這個列表創建對應數目的Map執行管理對象TaskInProgress。在這個過程中,還會記錄該RawSplit塊對應的所有在HDFS里的blocks所在的DataNode節點的host,這個會在RawSplit創建時通過FileSplit的getLocations()函數獲取,該函數會調用DistributedFileSystem的getFileCacheHints()獲得(這個細節會在HDFS中講解)。當然如果是存儲在本地文件系統中,即使用LocalFileSystem時當然只有一個location即“localhost”了。
????創建這些TaskInProgress對象完畢后,initTasks()方法會通過createCache()方法為這些TaskInProgress對象產生一個未執行任務的Map緩存nonRunningMapCache。slave端的TaskTracker向master發送心跳時,就可以直接從這個cache中取任務去執行。
????其次JobInProgress會創建Reduce的監控對象,這個比較簡單,根據JobConf里指定的Reduce數目創建,缺省只創建1個Reduce任務。監控和調度Reduce任務的是TaskInProgress類,不過構造方法有所不同,TaskInProgress會根據不同參數分別創建具體的MapTask或者ReduceTask。同樣地,initTasks()也會通過createCache()方法產生nonRunningReduceCache成員。
????JobInProgress創建完TaskInProgress后,最后構造JobStatus并記錄job正在執行中,然后再調用JobHistory.JobInfo.logStarted()記錄job的執行日志。到這里JobTracker里初始化job的過程全部結束。
?
?
2?JobTracker調度Job
????hadoop默認的調度器是FIFO策略的JobQueueTaskScheduler,它有兩個成員變量jobQueueJobInProgressListener與上面說的eagerTaskInitializationListener。JobQueueJobInProgressListener是JobTracker的另一個監聽器類,它包含了一個映射,用來管理和調度所有的JobInProgress。jobAdded(job)同時會加入job到JobQueueJobInProgressListener中的映射。
????JobQueueTaskScheduler最重要的方法是assignTasks,他實現了工作調度。具體實現:JobTracker?接到TaskTracker的heartbeat()?調用后,首先會檢查上一個心跳響應是否完成,是沒要求啟動或重啟任務,如果一切正常,則會處理心跳。首先它會檢查?TaskTracker?端還可以做多少個?map?和?reduce?任務,將要派發的任務數是否超出這個數,是否超出集群的任務平均剩余可負載數。如果都沒超出,則為此TaskTracker 分配一個 MapTask 或 ReduceTask 。產生 Map 任務使用 JobInProgress 的obtainNewMapTask() 方法,實質上最后調用了 JobInProgress 的 findNewMapTask() 訪問nonRunningMapCache 。
????上面講解任務初始化時說過,createCache()方法會在網絡拓撲結構上掛上需要執行的TaskInProgress。findNewMapTask()從近到遠一層一層地尋找,首先是同一節點,然后在尋找同一機柜上的節點,接著尋找相同數據中心下的節點,直到找了maxLevel層結束。這樣的話,在JobTracker給TaskTracker派發任務的時候,可以迅速找到最近的TaskTracker,讓它執行任務。
????最終生成一個Task類對象,該對象被封裝在一個LanuchTaskAction中,發回給TaskTracker,讓它去執行任務。
????產生?Reduce?任務過程類似,使用JobInProgress.obtainNewReduceTask()?方法,實質上最后調用了JobInProgress?的?findNewReduceTask()?訪問?nonRuningReduceCache。
?
六?TaskTracker
1TaskTracker加載Task到子進程
????Task的執行實際是由TaskTracker發起的,TaskTracker會定期(缺省為10秒鐘,參見MRConstants類中定義的HEARTBEAT_INTERVAL變量)與JobTracker進行一次通信,報告自己Task的執行狀態,接收JobTracker的指令等。如果發現有自己需要執行的新任務也會在這時啟動,即是在TaskTracker調用JobTracker的heartbeat()方法時進行,此調用底層是通過IPC層調用Proxy接口實現。下面一一簡單介紹下每個步驟。
????1.1TaskTracker.run()?連接JobTracker
????TaskTracker的啟動過程會初始化一系列參數和服務,然后嘗試連接JobTracker(即必須實現InterTrackerProtocol接口),如果連接斷開,則會循環嘗試連接JobTracker,并重新初始化所有成員和參數。
????1.2TaskTracker.offerService()?主循環
????如果連接JobTracker服務成功,TaskTracker就會調用offerService()函數進入主執行循環中。這個循環會每隔10秒與JobTracker通訊一次,調用transmitHeartBeat(),獲得HeartbeatResponse信息。然后調用HeartbeatResponse的getActions()函數獲得JobTracker傳過來的所有指令即一個TaskTrackerAction數組。再遍歷這個數組,如果是一個新任務指令即LaunchTaskAction則調用調用addToTaskQueue加入到待執行隊列,否則加入到tasksToCleanup隊列,交給一個taskCleanupThread線程來處理,如執行KillJobAction或者KillTaskAction等。
????1.3TaskTracker.transmitHeartBeat()?獲取JobTracker指令
在transmitHeartBeat()函數處理中,TaskTracker會創建一個新的TaskTrackerStatus對象記錄目前任務的執行狀況,檢查目前執行的Task數目以及本地磁盤的空間使用情況等,如果可以接收新的Task則設置heartbeat()的askForNewTask參數為true。然后通過IPC接口調用JobTracker的heartbeat()方法發送過去,heartbeat()返回值TaskTrackerAction數組。
????1.4?TaskTracker.addToTaskQueue,交給TaskLauncher處理
????TaskLauncher是用來處理新任務的線程類,包含了一個待運行任務的隊列 tasksToLaunch。TaskTracker.addToTaskQueue會調用TaskTracker的registerTask,創建TaskInProgress對象來調度和監控任務,并把它加入到runningTasks隊列中。同時將這個TaskInProgress加到tasksToLaunch中,并notifyAll()喚醒一個線程運行,該線程從隊列tasksToLaunch取出一個待運行任務,調用TaskTracker的startNewTask運行任務。
????1.5?TaskTracker.startNewTask()?啟動新任務
????調用localizeJob()真正初始化Task并開始執行。
????1.6?TaskTracker.localizeJob()?初始化job目錄等
????此函數主要任務是初始化工作目錄workDir,再將job jar包從HDFS復制到本地文件系統中,調用RunJar.unJar()將包解壓到工作目錄。然后創建一個RunningJob并調用addTaskToJob()函數將它添加到runningJobs監控隊列中。addTaskToJob方法把一個任務加入到該任務屬于的runningJob的tasks列表中。如果該任務屬于的runningJob不存在,先新建,加到runningJobs中。完成后即調用launchTaskForJob()開始執行Task。
????1.7?TaskTracker.launchTaskForJob()執行任務
????啟動Task的工作實際是調用TaskTracker$TaskInProgress的launchTask()函數來執行的。
????1.8?TaskTracker$TaskInProgress.launchTask()執行任務
????執行任務前先調用localizeTask()更新一下jobConf文件并寫入到本地目錄中。然后通過調用Task的createRunner()方法創建TaskRunner對象并調用其start()方法最后啟動Task獨立的java執行子進程。
????1.9?Task.createRunner()創建啟動Runner對象
????Task有兩個實現版本,即MapTask和ReduceTask,它們分別用于創建Map和Reduce任務。MapTask會創建MapTaskRunner來啟動Task子進程,而ReduceTask則創建ReduceTaskRunner來啟動。
????1.10?TaskRunner.start()啟動子進程
????TaskRunner負責將一個任務放到一個進程里面來執行。它會調用run()函數來處理,主要的工作就是初始化啟動java子進程的一系列環境變量,包括設定工作目錄workDir,設置CLASSPATH環境變量等。然后裝載job jar包。JvmManager用于管理該TaskTracker上所有運行的Task子進程。每一個進程都是由JvmRunner來管理的,它也是位于單獨線程中的。JvmManager的launchJvm方法,根據任務是map還是reduce,生成對應的JvmRunner并放到對應JvmManagerForType的進程容器中進行管理。JvmManagerForType的reapJvm()
分配一個新的JVM進程。如果JvmManagerForType槽滿,就尋找idle的進程,如果是同Job的直接放進去,否則殺死這個進程,用一個新的進程代替。?如果槽沒有滿,那么就啟動新的子進程。生成新的進程使用spawnNewJvm方法。spawnNewJvm使用JvmRunner線程的run方法,run方法用于生成一個新的進程并運行它,具體實現是調用runChild。
2?子進程執行MapTask
????真實的執行載體,是Child,它包含一個?main函數,進程執行,會將相關參數傳進來,它會拆解這些參數,通過getTask(jvmId)向父進程索取任務,并且構造出相關的Task實例,然后使用Task的run()啟動任務。
????2.1?run
????方法相當簡單,配置完系統的TaskReporter后,就根據情況執行runJobCleanupTask,runJobSetupTask,runTaskCleanupTask或執行Mapper。由于MapReduce現在有兩套API,MapTask需要支持這兩套API,使得MapTask執行Mapper分為runNewMapper和runOldMapper,我們分析runOldMapper。
????2.2?runOldMapper
????runOldMapper最開始部分是構造Mapper處理的InputSplit,然后就開始創建Mapper的RecordReader,最終得到map的輸入。之后構造Mapper的輸出,是通過MapOutputCollector進行的,也分兩種情況,如果沒有Reducer,那么,用DirectMapOutputCollector,否則,用MapOutputBuffer。
????構造完Mapper的輸入輸出,通過構造配置文件中配置的MapRunnable,就可以執行Mapper了。目前系統有兩個MapRunnable:MapRunner和MultithreadedMapRunner。MapRunner是單線程執行器,比較簡單,他會使用反射機制生成用戶定義的Mapper接口實現類,作為他的一個成員。
????2.3?MapRunner的run方法
????會先創建對應的key,value對象,然后,對InputSplit的每一對<key,value>,調用用戶實現的Mapper接口實現類的map方法,每處理一個數據對,就要使用OutputCollector收集每次處理kv對后得到的新的kv對,把他們spill到文件或者放到內存,以做進一步的處理,比如排序,combine等。
????2.4?OutputCollector
????OutputCollector的作用是收集每次調用map后得到的新的kv對,寧把他們spill到文件或者放到內存,以做進一步的處理,比如排序,combine等。
?
????MapOutputCollector?有兩個子類:MapOutputBuffer和DirectMapOutputCollector。???DirectMapOutputCollector用在不需要Reduce階段的時候。如果Mapper后續有reduce任務,系統會使用MapOutputBuffer做為輸出,?MapOutputBuffer使用了一個緩沖區對map的處理結果進行緩存,放在內存中,又使用幾個數組對這個緩沖區進行管理。
?
?
?
在適當的時機,緩沖區中的數據會被spill到硬盤中。
?
?
?
????向硬盤中寫數據的時機:
????(1)當內存緩沖區不能容下一個太大的kv對時。spillSingleRecord方法。
????(2)內存緩沖區已滿時。SpillThread線程。
????(3)Mapper的結果都已經collect了,需要對緩沖區做最后的清理。Flush方法。
?
????2.5?spillThread線程:將緩沖區中的數據spill到硬盤中。
????(1)需要spill時調用函數sortAndSpill,按照partition和key做排序。默認使用的是快速排序QuickSort。
????(2)如果沒有combiner,則直接輸出記錄,否則,調用CombinerRunner的combine,先做combin然后輸出。
3?子進程執行ReduceTask
????ReduceTask.run方法開始和MapTask類似,包括initialize()初始化,runJobCleanupTask(),runJobSetupTask(),runTaskCleanupTask()。之后進入正式的工作,主要有這么三個步驟:Copy、Sort、Reduce。
?
????3.1?Copy
????就是從執行各個Map任務的服務器那里,收羅到map的輸出文件。拷貝的任務,是由ReduceTask.ReduceCopier?類來負責。
?
????3.1.1?類圖:
?
???????????????????????????
?
????3.1.2?流程:?使用ReduceCopier.fetchOutputs開始
?
????(1)索取任務。使用GetMapEventsThread線程。該線程的run方法不停的調用getMapCompletionEvents方法,該方法又使用RPC調用TaskUmbilicalProtocol協議的getMapCompletionEvents,方法使用所屬的jobID向其父TaskTracker詢問此作業個Map任務的完成狀況(TaskTracker要向JobTracker詢問后再轉告給它...)。返回一個數組TaskCompletionEventevents[]。TaskCompletionEvent包含taskid和ip地址之類的信息。?(2)當獲取到相關Map任務執行服務器的信息后,有一個線程MapOutputCopier開啟,做具體的拷貝工作。 它會在一個單獨的線程內,負責某個Map任務服務器上文件的拷貝工作。MapOutputCopier的run循環調用copyOutput,copyOutput又調用getMapOutput,使用HTTP遠程拷貝。
????(3)getMapOutput遠程拷貝過來的內容(當然也可以是本地了...),作為MapOutput對象存在,它可以在內存中也可以序列化在磁盤上,這個根據內存使用狀況來自動調節。
????(4) 同時,還有一個內存Merger線程InMemFSMergeThread和一個文件Merger線程LocalFSMerger在同步工作,它們將下載過來的文件(可能在內存中,簡單的統稱為文件...),做著歸并排序,以此,節約時間,降低輸入文件的數量,為后續的排序工作減 負。InMemFSMergeThread的run循環調用doInMemMerge,該方法使用工具類Merger實現歸并,如果需要combine,則combinerRunner.combine。
?
????3.2 Sort
????排序工作,就相當于上述排序工作的一個延續。它會在所有的文件都拷貝完畢后進行。使用工具類Merger歸并所有的文件。經過這一個流程,一個合并了所有所需Map任務輸出文件的新文件產生了。而那些從其他各個服務器網羅過來的?Map任務輸出文件,全部刪除了。
?
????3.3Reduce
????Reduce任務的最后一個階段。他會準備好keyClass("mapred.output.key.class"或"mapred.mapoutput.key.class"),valueClass("mapred.mapoutput.value.class"或"mapred.output.value.class")和Comparator(“mapred.output.value.groupfn.class”或“mapred.output.key.comparator.class”)。最后調用runOldReducer方法。(也是兩套API,我們分析runOldReducer)
????3.3.1 runOldReducer
????(1)輸出方面。
它會準備一個OutputCollector收集輸出,與MapTask不同,這個OutputCollector更為簡單,僅僅是打開一個RecordWriter,collect一次,write一次。最大的不同在于,這次傳入RecordWriter的文件系統,基本都是分布式文件系統,或者說是HDFS。
????(2)輸入方面,ReduceTask會用準備好的KeyClass、ValueClass、KeyComparator等等之類的自定義類,構造出Reducer所需的鍵類型,和值的迭代類型Iterator(一個鍵到了這里一般是對應一組值)。
????(3)有了輸入,有了輸出,不斷循環調用自定義的Reducer,最終,Reduce階段完成。
?
?
總結
以上是生活随笔為你收集整理的MapReduce源码分析总结的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 通用的Java hashCode重写方案
- 下一篇: sort command