Hadoop 2.7 Mapper/Reducer source code analysis
Everything starts from the example program:
Example program
The WordCount.java example shipped with Hadoop 2.7:
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  // Extends the generic class Mapper
  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {

    // Hadoop IntWritable instance "one", initialized to 1
    private final static IntWritable one = new IntWritable(1);
    // Hadoop Text instance "word"
    private Text word = new Text();

    // Implements the map function
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      // Java's string tokenizer; the default delimiters include space,
      // tab ('\t'), newline ('\n') and carriage return ('\r')
      StringTokenizer itr = new StringTokenizer(value.toString());
      // Loop while there are more tokens
      while (itr.hasMoreTokens()) {
        /* nextToken() returns the string from the current position up to
           the next delimiter; word.set() converts the Java String into the
           Hadoop Text type */
        word.set(itr.nextToken());
        // Emit the pair through the task's Context
        context.write(word, one);
      }
    }
  }

  // Extends the generic class Reducer
  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    // Reusable IntWritable for the result
    private IntWritable result = new IntWritable();

    // Implements reduce
    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      // Iterate over values and accumulate the word count
      for (IntWritable val : values) {
        sum += val.get();
      }
      // Convert the Java int sum into the Hadoop type result
      result.set(sum);
      // Write the result to the job output (HDFS)
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    // Create the Configuration
    Configuration conf = new Configuration();
    /* GenericOptionsParser is the basic class in the Hadoop framework for
       parsing command-line arguments. getRemainingArgs() returns an array
       (here, a set of paths). */
    /* Implementation:
       public String[] getRemainingArgs() {
         return (commandLine == null) ? new String[]{} : commandLine.getArgs();
       }
    */
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    // With fewer than two paths, print the usage: at least one input path
    // and one output path are required
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    // Create the job
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    /* Set the Combiner class; many people find the Combiner confusing */
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    // Type of the reduce output key: Text
    job.setOutputKeyClass(Text.class);
    // Type of the reduce output value
    job.setOutputValueClass(IntWritable.class);
    // Add the input paths
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    // Set the output path
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    // Submit the job
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
1. Mapper
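Maps input key/value pairs to a set of intermediate key/value pairs.
Maps are the individual tasks that transform input records into intermediate records. The transformed intermediate records need not be of the same type as the input records. A given input pair may map to zero or many output pairs.
The Hadoop Map-Reduce framework spawns one map task for each InputSplit generated by the job's InputFormat. Mapper implementations can access the job's JobConf via JobConfigurable#configure(JobConf) and initialize themselves. Similarly, they can use the Closeable#close() method for de-initialization.
The framework then calls map(Object, Object, OutputCollector, Reporter) for each key/value pair in the InputSplit for that task.
All intermediate values associated with a given output key are subsequently grouped by the framework and passed to a Reducer to determine the final output. Users can control the grouping by specifying a Comparator via JobConf#setOutputKeyComparatorClass(Class).
The grouped Mapper outputs are partitioned per Reducer. Users can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.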
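As a minimal sketch of what a partitioner does, the plain-Java snippet below (no Hadoop dependency) assigns keys to reducers with the arithmetic used by Hadoop's default HashPartitioner: mask off the sign bit of the hash code, then take the remainder by the number of reduce tasks. The class and method names are illustrative only, and String keys stand in for Writable keys, so the partition numbers will not match those computed for Text keys.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionSketch {
    // Mask the sign bit so the result is never negative, then take the remainder.
    static int partitionFor(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Distribute keys into one bucket per (hypothetical) reduce task.
    static List<List<String>> partition(List<String> keys, int numReduceTasks) {
        List<List<String>> buckets = new ArrayList<List<String>>();
        for (int i = 0; i < numReduceTasks; i++) {
            buckets.add(new ArrayList<String>());
        }
        for (String key : keys) {
            buckets.get(partitionFor(key, numReduceTasks)).add(key);
        }
        return buckets;
    }
}
```

Because the partition depends only on the key, every record with the same key reaches the same Reducer.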
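Users can optionally specify a Combiner via JobConf#setCombinerClass(Class) to perform local aggregation of the intermediate outputs, which helps cut down the amount of data transferred from the Mapper to the Reducer.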
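To make the effect of a Combiner concrete, here is a small plain-Java simulation of word count on a single split. It is only a sketch: the class name and the in-memory lists are assumptions for illustration, and nothing here calls Hadoop. The map step emits one (word, 1) record per token, and the combine step sums them locally before anything would be sent to a Reducer.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

class CombinerSketch {
    // Map step: one (word, 1) record per token of the input line.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<Map.Entry<String, Integer>>();
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            out.add(new SimpleEntry<String, Integer>(itr.nextToken(), 1));
        }
        return out;
    }

    // Combine step: sum the counts per word on the map side.
    static Map<String, Integer> combine(List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> sums = new TreeMap<String, Integer>();
        for (Map.Entry<String, Integer> record : records) {
            sums.merge(record.getKey(), record.getValue(), Integer::sum);
        }
        return sums;
    }
}
```

For the line "to be or not to be" the map step produces six records, while the combined output holds four, one per distinct word. WordCount can reuse IntSumReducer as its Combiner because summation is associative and commutative.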
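The intermediate, grouped outputs are stored in SequenceFiles. Applications can specify if and how the intermediate outputs are compressed, and which CompressionCodec is used, via the JobConf.
If the job has no reducers, the output of the Mapper is written directly to the FileSystem without grouping by key.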
Example:
public class MyMapper<K extends WritableComparable, V extends Writable>
    extends MapReduceBase implements Mapper<K, V, K, V> {

  static enum MyCounters { NUM_RECORDS }

  private String mapTaskId;
  private String inputFile;
  private int noRecords = 0;

  public void configure(JobConf job) {
    mapTaskId = job.get(JobContext.TASK_ATTEMPT_ID);
    inputFile = job.get(JobContext.MAP_INPUT_FILE);
  }

  public void map(K key, V val,
                  OutputCollector<K, V> output, Reporter reporter)
      throws IOException {
    // Process the <key, value> pair (assume this takes a while)
    // ...
    // ...

    // Let the framework know that we are alive, and kicking!
    // reporter.progress();

    // Process some more
    // ...
    // ...

    // Increment the no. of <key, value> pairs processed
    ++noRecords;

    // Increment counters
    reporter.incrCounter(MyCounters.NUM_RECORDS, 1);

    // Every 100 records update application-level status
    if ((noRecords%100) == 0) {
      reporter.setStatus(mapTaskId + " processed " + noRecords +
                         " from input-file: " + inputFile);
    }

    // Output the result
    output.collect(key, val);
  }
}
Applications may also write a custom MapRunnable to exert greater control over map processing, e.g. multi-threaded Mappers.
Another example:
public class TokenCounterMapper
    extends Mapper<Object, Text, Text, IntWritable> {

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, one);
    }
  }
}
Applications may override the run(org.apache.hadoop.mapreduce.Mapper.Context) method to exert finer control over map processing, e.g. multi-threaded Mappers.
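Mapper methods:
void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter) throws IOException;
This method maps a single input key/value pair into an intermediate key/value pair.
Output pairs need not be of the same types as input pairs. A given input pair may map to zero or many output pairs. Output pairs are collected with calls to OutputCollector#collect(Object, Object).
Applications can use the Reporter to report progress or just to indicate that they are alive. This matters when the application takes a significant amount of time to process an individual key/value pair, because the framework might assume that the task has timed out and kill it. The other way of avoiding this is to set mapreduce.task.timeout to a high enough value (or to 0, which means no timeout).
Mapper class hierarchy: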
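2. Reducer
Reduces a set of intermediate values which share a key to a smaller set of values.
Users set the number of Reducers for the job via JobConf#setNumReduceTasks(int). Reducer implementations can access the JobConf for the job via JobConfigurable#configure(JobConf) and initialize themselves. Similarly, they can use the Closeable#close() method for de-initialization.
Reducer has 3 primary phases:
Phase 1: Shuffle. The input of the Reducer is the grouped output of the Mappers. In this phase each Reducer fetches the relevant partition of the output of all the Mappers via HTTP.
Phase 2: Sort. In this phase the framework groups the Reducer inputs by key (since different Mappers may have output the same key). The shuffle and sort phases occur simultaneously, i.e. while outputs are being fetched they are merged.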
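The merging that happens while map outputs arrive can be pictured as a k-way merge of already sorted runs. The following plain-Java sketch is an illustration under that assumption, not Hadoop's actual merger: each list stands for the sorted keys fetched from one Mapper, and a priority queue always yields the smallest head among the runs.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class MergeSketch {
    // Cursor into one sorted run (the keys fetched from one hypothetical Mapper).
    private static final class Cursor {
        final List<String> run;
        int pos;
        Cursor(List<String> run) { this.run = run; }
        String head() { return run.get(pos); }
    }

    // Merge several sorted runs into one sorted list.
    static List<String> merge(List<List<String>> sortedRuns) {
        Comparator<Cursor> byHead = (a, b) -> a.head().compareTo(b.head());
        PriorityQueue<Cursor> heap = new PriorityQueue<Cursor>(11, byHead);
        for (List<String> run : sortedRuns) {
            if (!run.isEmpty()) {
                heap.add(new Cursor(run));
            }
        }
        List<String> merged = new ArrayList<String>();
        while (!heap.isEmpty()) {
            Cursor smallest = heap.poll();
            merged.add(smallest.head());
            smallest.pos++;
            if (smallest.pos < smallest.run.size()) {
                heap.add(smallest); // re-insert with its next key as the new head
            }
        }
        return merged;
    }
}
```

After the merge, equal keys coming from different Mappers sit next to each other, which is what lets the framework hand each key to reduce together with all of its values.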
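Secondary sort: if the equivalence rules for keys while grouping the intermediate values differ from those for grouping keys before reduction, one may specify a Comparator via JobConf#setOutputValueGroupingComparator(Class).
Since JobConf#setOutputKeyComparatorClass(Class) can be used to control how intermediate keys are grouped, the two can be used in conjunction to simulate a secondary sort on values.
Example: say you want to find duplicate web pages and tag them all with the URL of the "best" known example. You would set up the job like this:
Map input key: url
Map input value: document
Map output key: document checksum, url pagerank
Map output value: url
Partitioner: by checksum
Output key comparator: by checksum, then by decreasing pagerank
Output value grouping comparator: by checksum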
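The job setup above can be mimicked in plain Java to see why the two comparators work together. This is a sketch under stated assumptions: the Page class, its fields, and the sample URLs are hypothetical, and an in-memory sort stands in for the framework's sort phase. Sorting by checksum and then by decreasing pagerank puts the best URL first inside each checksum group; grouping by checksum alone then exposes it as the first value.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SecondarySortSketch {
    // Hypothetical composite map output: (checksum, pagerank) key plus url value.
    static final class Page {
        final String checksum;
        final double pagerank;
        final String url;
        Page(String checksum, double pagerank, String url) {
            this.checksum = checksum;
            this.pagerank = pagerank;
            this.url = url;
        }
    }

    // Output key comparator: checksum ascending, then pagerank descending.
    static final Comparator<Page> SORT_ORDER = (a, b) -> {
        int byChecksum = a.checksum.compareTo(b.checksum);
        return byChecksum != 0 ? byChecksum : Double.compare(b.pagerank, a.pagerank);
    };

    // Grouping by checksum only: the first page seen per group is the best one.
    static Map<String, String> bestUrlPerChecksum(List<Page> pages) {
        List<Page> sorted = new ArrayList<Page>(pages);
        sorted.sort(SORT_ORDER);
        Map<String, String> best = new LinkedHashMap<String, String>();
        for (Page page : sorted) {
            best.putIfAbsent(page.checksum, page.url);
        }
        return best;
    }
}
```

In a real job the reduce call for a checksum would read the first value as the "best" URL and tag the remaining values with it.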
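Phase 3: Reduce
In this phase the reduce(Object, Iterator, OutputCollector, Reporter) method is called for each <key, (list of values)> pair in the grouped inputs.
The output of the reduce task is typically written to the file system via OutputCollector#collect(Object, Object).
The output of the Reducer is not re-sorted.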
Example:
public class MyReducer<K extends WritableComparable, V extends Writable>
    extends MapReduceBase implements Reducer<K, V, K, V> {

  static enum MyCounters { NUM_RECORDS }

  private String reduceTaskId;
  private int noKeys = 0;

  public void configure(JobConf job) {
    reduceTaskId = job.get(JobContext.TASK_ATTEMPT_ID);
  }

  public void reduce(K key, Iterator<V> values,
                     OutputCollector<K, V> output, Reporter reporter)
      throws IOException {
    // Process
    int noValues = 0;
    while (values.hasNext()) {
      V value = values.next();

      // Increment the no. of values for this key
      ++noValues;

      // Process the <key, value> pair (assume this takes a while)
      // ...
      // ...

      // Let the framework know that we are alive, and kicking!
      if ((noValues%10) == 0) {
        reporter.progress();
      }

      // Process some more
      // ...
      // ...

      // Output the <key, value>
      output.collect(key, value);
    }

    // Increment the no. of <key, list of values> pairs processed
    ++noKeys;

    // Increment counters
    reporter.incrCounter(MyCounters.NUM_RECORDS, 1);

    // Every 100 keys update application-level status
    if ((noKeys%100) == 0) {
      reporter.setStatus(reduceTaskId + " processed " + noKeys);
    }
  }
}
Figure source: http://x-rip.iteye.com/blog/1541914
3. Job
3.1 The key statement in the example program above: job.waitForCompletion(true)
/**
 * Submit the job to the cluster and wait for it to finish.
 * @param verbose print the progress to the user
 * @return true if the job succeeded
 * @throws IOException thrown if the communication with the
 *         <code>JobTracker</code> is lost
 */
public boolean waitForCompletion(boolean verbose)
    throws IOException, InterruptedException, ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
      Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
3.2 The submission process
/**
 * Submit the job to the cluster and return immediately.
 * @throws IOException
 */
public void submit()
    throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  connect();
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
        ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
The connection step:
private synchronized void connect()
    throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster == null) {
    cluster =
      ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
        public Cluster run()
            throws IOException, InterruptedException,
                   ClassNotFoundException {
          return new Cluster(getConfiguration());
        }
      });
  }
}
Here, ugi is defined in JobContextImpl.java:
/**
 * The UserGroupInformation object that has a reference to the current user
 */
protected UserGroupInformation ugi;
The Cluster class provides an interface for accessing the map/reduce cluster:
public static enum JobTrackerStatus {INITIALIZING, RUNNING};

private ClientProtocolProvider clientProtocolProvider;
private ClientProtocol client;
private UserGroupInformation ugi;
private Configuration conf;
private FileSystem fs = null;
private Path sysDir = null;
private Path stagingAreaDir = null;
private Path jobHistoryDir = null;
4. JobSubmitter
/**
 * Internal method for submitting jobs to the system.
 *
 * <p>The job submission process involves:
 * <ol>
 *   <li>
 *   Checking the input and output specifications of the job.
 *   </li>
 *   <li>
 *   Computing the {@link InputSplit}s for the job.
 *   </li>
 *   <li>
 *   Setup the requisite accounting information for the
 *   {@link DistributedCache} of the job, if necessary.
 *   </li>
 *   <li>
 *   Copying the job's jar and configuration to the map-reduce system
 *   directory on the distributed file-system.
 *   </li>
 *   <li>
 *   Submitting the job to the <code>JobTracker</code> and optionally
 *   monitoring it's status.
 *   </li>
 * </ol></p>
 * @param job the configuration to submit
 * @param cluster the handle to the Cluster
 * @throws ClassNotFoundException
 * @throws InterruptedException
 * @throws IOException
 */
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {

  //validate the jobs output specs
  checkSpecs(job);

  Configuration conf = job.getConfiguration();
  addMRFrameworkToDistributedCache(conf);

  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  //configure the command line options correctly on the submitting dfs
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);

    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        int keyLen = CryptoUtils.isShuffleEncrypted(conf)
            ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
                MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
            : SHUFFLE_KEY_LENGTH;
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(keyLen);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }

    copyAndConfigureFiles(job, submitJobDir);

    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);

    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }

    // Set reservation info if it exists
    ReservationId reservationId = job.getReservationId();
    if (reservationId != null) {
      conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
    }

    // Write job file to submit dir
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}
As described above, job submission involves the following steps:
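1. Check the input and output specifications of the job.
2. Compute the InputSplits for the job.
3. Set up the requisite accounting information for the job's DistributedCache, if necessary.
4. Copy the job's jar and configuration to the map-reduce system directory on the distributed file system.
5. Submit the job to the JobTracker and optionally monitor its status.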
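Step 2 determines the number of map tasks, since submitJobInternal stores the split count as the number of maps. As a rough sketch of how a file is cut into splits, the snippet below is loosely modeled on the way FileInputFormat is commonly described: the split size is the block size clamped between a minimum and a maximum, and a trailing remainder of up to 10% is folded into the last split. The class name, the 1.1 slop factor as written here, and the single-file simplification are assumptions to be checked against the Hadoop source.

```java
import java.util.ArrayList;
import java.util.List;

class SplitSketch {
    // Assumed slop factor: a tail of up to 10% of a split is merged into the last split.
    static final double SPLIT_SLOP = 1.1;

    // Clamp the block size between the configured minimum and maximum split sizes.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Returns {offset, length} pairs covering one splittable file.
    static List<long[]> splits(long fileLength, long splitSize) {
        List<long[]> result = new ArrayList<long[]>();
        long remaining = fileLength;
        while (((double) remaining) / splitSize > SPLIT_SLOP) {
            result.add(new long[] { fileLength - remaining, splitSize });
            remaining -= splitSize;
        }
        if (remaining != 0) {
            result.add(new long[] { fileLength - remaining, remaining });
        }
        return result;
    }
}
```

With a split size of 128, a file of length 300 yields three splits (128, 128, 44), while a file of length 140 stays in one split because its tail is within the slop.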
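If the current JobClient (Hadoop 0.22) runs on YARN, the job is submitted through YARNRunner.
How the Hadoop YARN framework works
Main steps
- Job submission
- Job initialization
- Resource requests and task assignment
- Task execution
Detailed steps
Before a job runs, the Resource Manager and the Node Managers are already started, so in the figure the Resource Manager and Node Manager processes do not need to be started.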
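- 1. The client process submits the MapReduce job on the client side via runJob (in practice waitForCompletion is normally used). In YARN a job is generally called an application.
- 2. The client requests an application ID from the Resource Manager, which serves as the unique identifier of this job.
- 3. The client saves the job-related files (typically the job's own jar and the third-party jars it depends on) to HDFS. In other words, YARN-based MR shares the program jars through HDFS so that Task processes can read them.
- 4. The client submits the application to the ResourceManager via runJob.
- 5.a/5.b. After receiving the submission request from the client, the Resource Manager forwards it to the scheduling component (Scheduler). The Scheduler allocates a Container, the Resource Manager starts the Application Master process in that Container, and the Node Manager manages the Application Master process.
- 6. The Application Master initializes the job (application). Initialization includes creating listener objects that track the job's execution, including the progress reported by tasks and whether they have completed. (Every computation framework integrated with YARN has to provide its own ApplicationMaster; Spark and Storm, for example, each provide one in order to run on YARN.)
- 7. The Application Master uses the input splits, computed from the data locations specified in the job code (usually on HDFS), to determine the number of Mapper tasks. In the submitJobInternal code shown earlier, the splits themselves are written by the client through writeSplits before submission. When deciding which node each Mapper task goes to, Hadoop considers data locality: node-local first, then rack-local, and finally off-rack. The number of Reduce tasks is also determined at this point; it is specified explicitly in the program code via job.setNumReduceTasks.
- 8. The following points describe how the Application Master requests resources from the Resource Manager.
- 8.1 Based on the number of Mapper tasks determined by the splits and the number of Reducer tasks, the Application Master requests compute resources from the Resource Manager. Compute resources mainly mean memory and CPU. Hadoop YARN describes them with the Container concept: resources are granted in units of Containers, and a Container holds a certain amount of memory and a number of CPU cores.
- 8.2 The Application Master requests resources by sending heartbeat messages to the Resource Manager. The request also carries information such as each task's data locality, so that the resources the Resource Manager assigns to each Task satisfy data locality as far as possible.
- 8.3 The resource request also carries the amount of memory needed. By default, Map tasks and Reduce tasks are each allocated 1 GB of memory; this can be changed with the parameters mapreduce.map.memory.mb and mapreduce.reduce.memory.mb.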
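The locality preference mentioned in steps 7 and 8.2 can be illustrated with a few lines of plain Java. This is a hedged sketch rather than the scheduler's real logic: the class, the enum, and the host and rack names are all hypothetical. Given the hosts that hold replicas of a split and a host-to-rack map, a candidate node is classified as node-local, rack-local, or off-rack, and the best available candidate is chosen.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

class LocalitySketch {
    // Declared from most to least preferred, so enum order doubles as priority.
    enum Locality { NODE_LOCAL, RACK_LOCAL, OFF_RACK }

    // Classify a candidate node relative to the hosts that store the split's data.
    static Locality classify(String candidate, Set<String> replicaHosts,
                             Map<String, String> rackOf) {
        if (replicaHosts.contains(candidate)) {
            return Locality.NODE_LOCAL;
        }
        String rack = rackOf.get(candidate);
        for (String host : replicaHosts) {
            if (rack != null && rack.equals(rackOf.get(host))) {
                return Locality.RACK_LOCAL;
            }
        }
        return Locality.OFF_RACK;
    }

    // Pick the free node with the best locality; ties keep the earlier node.
    static String pickNode(List<String> freeNodes, Set<String> replicaHosts,
                           Map<String, String> rackOf) {
        String best = null;
        Locality bestLocality = null;
        for (String node : freeNodes) {
            Locality locality = classify(node, replicaHosts, rackOf);
            if (bestLocality == null || locality.compareTo(bestLocality) < 0) {
                best = node;
                bestLocality = locality;
            }
        }
        return best;
    }
}
```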
5. YARNRunner
@Override
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
    throws IOException, InterruptedException {

  addHistoryToken(ts);

  // Construct necessary information to start the MR AM
  ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

  // Submit to ResourceManager
  try {
    ApplicationId applicationId =
        resMgrDelegate.submitApplication(appContext);

    ApplicationReport appMaster = resMgrDelegate
        .getApplicationReport(applicationId);
    String diagnostics =
        (appMaster == null ?
            "application report is null" : appMaster.getDiagnostics());
    if (appMaster == null
        || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
        || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
      throw new IOException("Failed to run job : " +
          diagnostics);
    }
    return clientCache.getClient(jobId).getJobStatus(jobId);
  } catch (YarnException e) {
    throw new IOException(e);
  }
}
This calls YarnClient's submitApplication() method, which is implemented as follows:
6. YarnClientImpl
@Override
public ApplicationId
    submitApplication(ApplicationSubmissionContext appContext)
        throws YarnException, IOException {
  ApplicationId applicationId = appContext.getApplicationId();
  if (applicationId == null) {
    throw new ApplicationIdNotProvidedException(
        "ApplicationId is not provided in ApplicationSubmissionContext");
  }
  SubmitApplicationRequest request =
      Records.newRecord(SubmitApplicationRequest.class);
  request.setApplicationSubmissionContext(appContext);

  // Automatically add the timeline DT into the CLC
  // Only when the security and the timeline service are both enabled
  if (isSecurityEnabled() && timelineServiceEnabled) {
    addTimelineDelegationToken(appContext.getAMContainerSpec());
  }

  //TODO: YARN-1763:Handle RM failovers during the submitApplication call.
  rmClient.submitApplication(request);

  int pollCount = 0;
  long startTime = System.currentTimeMillis();
  EnumSet<YarnApplicationState> waitingStates =
      EnumSet.of(YarnApplicationState.NEW,
                 YarnApplicationState.NEW_SAVING,
                 YarnApplicationState.SUBMITTED);
  EnumSet<YarnApplicationState> failToSubmitStates =
      EnumSet.of(YarnApplicationState.FAILED,
                 YarnApplicationState.KILLED);
  while (true) {
    try {
      ApplicationReport appReport = getApplicationReport(applicationId);
      YarnApplicationState state = appReport.getYarnApplicationState();
      if (!waitingStates.contains(state)) {
        if(failToSubmitStates.contains(state)) {
          throw new YarnException("Failed to submit " + applicationId +
              " to YARN : " + appReport.getDiagnostics());
        }
        LOG.info("Submitted application " + applicationId);
        break;
      }

      long elapsedMillis = System.currentTimeMillis() - startTime;
      if (enforceAsyncAPITimeout() &&
          elapsedMillis >= asyncApiPollTimeoutMillis) {
        throw new YarnException("Timed out while waiting for application " +
            applicationId + " to be submitted successfully");
      }

      // Notify the client through the log every 10 poll, in case the client
      // is blocked here too long.
      if (++pollCount % 10 == 0) {
        LOG.info("Application submission is not finished, " +
            "submitted application " + applicationId +
            " is still in " + state);
      }
      try {
        Thread.sleep(submitPollIntervalMillis);
      } catch (InterruptedException ie) {
        LOG.error("Interrupted while waiting for application "
            + applicationId
            + " to be successfully submitted.");
      }
    } catch (ApplicationNotFoundException ex) {
      // FailOver or RM restart happens before RMStateStore saves
      // ApplicationState
      LOG.info("Re-submit application " + applicationId + "with the " +
          "same ApplicationSubmissionContext");
      rmClient.submitApplication(request);
    }
  }

  return applicationId;
}
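The core of submitApplication is a polling loop: keep reading the application state until it leaves the set of waiting states, fail on FAILED or KILLED, and give up after a timeout. The sketch below isolates that pattern in plain Java. The class name, the Supplier-based state source, and the use of IllegalStateException are assumptions for illustration; the real method talks to the ResourceManager and throws YarnException.

```java
import java.util.EnumSet;
import java.util.function.Supplier;

class SubmitPollSketch {
    // Local stand-in for the application states used by the loop above.
    enum State { NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

    static final EnumSet<State> WAITING =
        EnumSet.of(State.NEW, State.NEW_SAVING, State.SUBMITTED);
    static final EnumSet<State> FAILED_TO_SUBMIT =
        EnumSet.of(State.FAILED, State.KILLED);

    // Poll until the state leaves WAITING; a negative timeout disables the timeout check.
    static State waitUntilSubmitted(Supplier<State> report,
                                    long pollIntervalMillis, long timeoutMillis) {
        long start = System.currentTimeMillis();
        while (true) {
            State state = report.get();
            if (!WAITING.contains(state)) {
                if (FAILED_TO_SUBMIT.contains(state)) {
                    throw new IllegalStateException("Failed to submit, state " + state);
                }
                return state;
            }
            if (timeoutMillis >= 0
                    && System.currentTimeMillis() - start >= timeoutMillis) {
                throw new IllegalStateException("Timed out in state " + state);
            }
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException ie) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", ie);
            }
        }
    }
}
```

Separating the waiting states from the failure states keeps the exit condition explicit: any state outside both sets counts as a successful submission.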
7. ClientRMService
ClientRMService is the client-facing interface of the ResourceManager. This module handles the RPC interfaces from clients to the ResourceManager.
@Override
public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnException {
  ApplicationSubmissionContext submissionContext = request
      .getApplicationSubmissionContext();
  ApplicationId applicationId = submissionContext.getApplicationId();

  // ApplicationSubmissionContext needs to be validated for safety - only
  // those fields that are independent of the RM's configuration will be
  // checked here, those that are dependent on RM configuration are validated
  // in RMAppManager.

  String user = null;
  try {
    // Safety
    user = UserGroupInformation.getCurrentUser().getShortUserName();
  } catch (IOException ie) {
    LOG.warn("Unable to get the current user.", ie);
    RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
        ie.getMessage(), "ClientRMService",
        "Exception in submitting application", applicationId);
    throw RPCUtil.getRemoteException(ie);
  }

  // Check whether app has already been put into rmContext,
  // If it is, simply return the response
  if (rmContext.getRMApps().get(applicationId) != null) {
    LOG.info("This is an earlier submitted application: " + applicationId);
    return SubmitApplicationResponse.newInstance();
  }

  if (submissionContext.getQueue() == null) {
    submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
  }
  if (submissionContext.getApplicationName() == null) {
    submissionContext.setApplicationName(
        YarnConfiguration.DEFAULT_APPLICATION_NAME);
  }
  if (submissionContext.getApplicationType() == null) {
    submissionContext
        .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);
  } else {
    if (submissionContext.getApplicationType().length() >
        YarnConfiguration.APPLICATION_TYPE_LENGTH) {
      submissionContext.setApplicationType(submissionContext
          .getApplicationType().substring(0,
              YarnConfiguration.APPLICATION_TYPE_LENGTH));
    }
  }

  try {
    // call RMAppManager to submit application directly
    rmAppManager.submitApplication(submissionContext,
        System.currentTimeMillis(), user);

    LOG.info("Application with id " + applicationId.getId() +
        " submitted by user " + user);
    RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
        "ClientRMService", applicationId);
  } catch (YarnException e) {
    LOG.info("Exception in submitting application with id " +
        applicationId.getId(), e);
    RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
        e.getMessage(), "ClientRMService",
        "Exception in submitting application", applicationId);
    throw e;
  }

  SubmitApplicationResponse response = recordFactory
      .newRecordInstance(SubmitApplicationResponse.class);
  return response;
}
It calls RMAppManager to submit the application directly.
8. RMAppManager
@SuppressWarnings("unchecked")
protected void submitApplication(
    ApplicationSubmissionContext submissionContext, long submitTime,
    String user) throws YarnException {
  ApplicationId applicationId = submissionContext.getApplicationId();

  RMAppImpl application =
      createAndPopulateNewRMApp(submissionContext, submitTime, user);
  ApplicationId appId = submissionContext.getApplicationId();

  if (UserGroupInformation.isSecurityEnabled()) {
    try {
      this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,
          parseCredentials(submissionContext),
          submissionContext.getCancelTokensWhenComplete(),
          application.getUser());
    } catch (Exception e) {
      LOG.warn("Unable to parse credentials.", e);
      // Sending APP_REJECTED is fine, since we assume that the
      // RMApp is in NEW state and thus we haven't yet informed the
      // scheduler about the existence of the application
      assert application.getState() == RMAppState.NEW;
      this.rmContext.getDispatcher().getEventHandler()
          .handle(new RMAppRejectedEvent(applicationId, e.getMessage()));
      throw RPCUtil.getRemoteException(e);
    }
  } else {
    // Dispatcher is not yet started at this time, so these START events
    // enqueued should be guaranteed to be first processed when dispatcher
    // gets started.
    this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.START));
  }
}
9. Adding the application asynchronously: DelegationTokenRenewer
/**
 * Asynchronously add application tokens for renewal.
 * @param applicationId added application
 * @param ts tokens
 * @param shouldCancelAtEnd true if tokens should be canceled when the app is
 * done else false.
 * @param user user
 */
public void addApplicationAsync(ApplicationId applicationId, Credentials ts,
    boolean shouldCancelAtEnd, String user) {
  processDelegationTokenRenewerEvent(new DelegationTokenRenewerAppSubmitEvent(
      applicationId, ts, shouldCancelAtEnd, user));
}
It calls the following method:
private void processDelegationTokenRenewerEvent(
    DelegationTokenRenewerEvent evt) {
  serviceStateLock.readLock().lock();
  try {
    if (isServiceStarted) {
      renewerService.execute(new DelegationTokenRenewerRunnable(evt));
    } else {
      pendingEventQueue.add(evt);
    }
  } finally {
    serviceStateLock.readLock().unlock();
  }
}
As the code shows, while holding the read lock the event is either handed to the thread pool or, if the service has not started yet, placed in the pending event queue.
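The pattern in processDelegationTokenRenewerEvent, run the event now if the service is started and otherwise park it, can be reproduced with standard java.util.concurrent classes. The sketch below is an illustration only: the class name, the start() method that drains the queue under the write lock, and the pool size are assumptions, not the DelegationTokenRenewer implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class DeferredExecutorSketch {
    private final ReentrantReadWriteLock stateLock = new ReentrantReadWriteLock();
    private final LinkedBlockingQueue<Runnable> pending = new LinkedBlockingQueue<Runnable>();
    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    private boolean started = false;

    // Many submitters may hold the read lock at once; they only read "started".
    void submit(Runnable event) {
        stateLock.readLock().lock();
        try {
            if (started) {
                pool.execute(event);
            } else {
                pending.add(event);
            }
        } finally {
            stateLock.readLock().unlock();
        }
    }

    // The write lock excludes submitters while the flag flips and the queue drains.
    void start() {
        stateLock.writeLock().lock();
        try {
            started = true;
            List<Runnable> drained = new ArrayList<Runnable>();
            pending.drainTo(drained);
            for (Runnable event : drained) {
                pool.execute(event);
            }
        } finally {
            stateLock.writeLock().unlock();
        }
    }

    int pendingCount() {
        return pending.size();
    }

    // Stop accepting work and wait for queued events to finish.
    boolean shutdownAndWait(long timeoutMillis) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Using a read/write lock instead of a plain mutex lets concurrent submissions proceed in parallel once the service is running, while the start transition still sees a consistent queue.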
The event is then processed on a new thread:
@Override
public void run() {
  if (evt instanceof DelegationTokenRenewerAppSubmitEvent) {
    DelegationTokenRenewerAppSubmitEvent appSubmitEvt =
        (DelegationTokenRenewerAppSubmitEvent) evt;
    handleDTRenewerAppSubmitEvent(appSubmitEvt);
  } else if (evt.getType().equals(
      DelegationTokenRenewerEventType.FINISH_APPLICATION)) {
    DelegationTokenRenewer.this.handleAppFinishEvent(evt);
  }
}

@SuppressWarnings("unchecked")
private void handleDTRenewerAppSubmitEvent(
    DelegationTokenRenewerAppSubmitEvent event) {
  /*
   * For applications submitted with delegation tokens we are not submitting
   * the application to scheduler from RMAppManager. Instead we are doing
   * it from here. The primary goal is to make token renewal as a part of
   * application submission asynchronous so that client thread is not
   * blocked during app submission.
   */
  try {
    // Setup tokens for renewal
    DelegationTokenRenewer.this.handleAppSubmitEvent(event);
    rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(event.getApplicationId(), RMAppEventType.START));
  } catch (Throwable t) {
    LOG.warn(
        "Unable to add the application to the delegation token renewer.",
        t);
    // Sending APP_REJECTED is fine, since we assume that the
    // RMApp is in NEW state and thus we havne't yet informed the
    // Scheduler about the existence of the application
    rmContext.getDispatcher().getEventHandler().handle(
        new RMAppRejectedEvent(event.getApplicationId(), t.getMessage()));
  }
}

private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt)
    throws IOException, InterruptedException {
  ApplicationId applicationId = evt.getApplicationId();
  Credentials ts = evt.getCredentials();
  boolean shouldCancelAtEnd = evt.shouldCancelAtEnd();
  if (ts == null) {
    return; // nothing to add
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Registering tokens for renewal for:" +
        " appId = " + applicationId);
  }

  Collection<Token<?>> tokens = ts.getAllTokens();
  long now = System.currentTimeMillis();

  // find tokens for renewal, but don't add timers until we know
  // all renewable tokens are valid
  // At RM restart it is safe to assume that all the previously added tokens
  // are valid
  appTokens.put(applicationId,
      Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>()));
  Set<DelegationTokenToRenew> tokenList = new HashSet<DelegationTokenToRenew>();
  boolean hasHdfsToken = false;
  for (Token<?> token : tokens) {
    if (token.isManaged()) {
      if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
        LOG.info(applicationId + " found existing hdfs token " + token);
        hasHdfsToken = true;
      }

      DelegationTokenToRenew dttr = allTokens.get(token);
      if (dttr == null) {
        dttr = new DelegationTokenToRenew(Arrays.asList(applicationId), token,
            getConfig(), now, shouldCancelAtEnd, evt.getUser());
        try {
          renewToken(dttr);
        } catch (IOException ioe) {
          throw new IOException("Failed to renew token: " + dttr.token, ioe);
        }
      }
      tokenList.add(dttr);
    }
  }

  if (!tokenList.isEmpty()) {
    // Renewing token and adding it to timer calls are separated purposefully
    // If user provides incorrect token then it should not be added for
    // renewal.
    for (DelegationTokenToRenew dtr : tokenList) {
      DelegationTokenToRenew currentDtr =
          allTokens.putIfAbsent(dtr.token, dtr);
      if (currentDtr != null) {
        // another job beat us
        currentDtr.referringAppIds.add(applicationId);
        appTokens.get(applicationId).add(currentDtr);
      } else {
        appTokens.get(applicationId).add(dtr);
        setTimerForTokenRenewal(dtr);
      }
    }
  }

  if (!hasHdfsToken) {
    requestNewHdfsDelegationToken(Arrays.asList(applicationId), evt.getUser(),
        shouldCancelAtEnd);
  }
}
RM: ResourceManager
AM: ApplicationMaster
NM: NodeManager
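In short, YARN involves three communication protocols:
ApplicationClientProtocol: the client talks to the RM through this protocol; referred to below as the CR protocol
ApplicationMasterProtocol: the AM talks to the RM through this protocol; referred to below as the AR protocol
ContainerManagementProtocol: the AM talks to the NM through this protocol; referred to below as the AN protocol
---------------------------------------------------------------------------------------------------------------------
Typically, a client submits a program to the RM as follows: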
step1: create a client for the CR protocol
rmClient = (ApplicationClientProtocol) rpc.getProxy(ApplicationClientProtocol.class, rmAddress, conf);
step2: the client obtains a unique application ID from the RM via CR protocol#getNewApplication. Simplified code:
// The GetNewApplicationResponse carries two pieces of information: the ApplicationId and the maximum resource capability that can be requested
// Records.newRecord(...) is a static method that uses the serialization framework to create the objects needed for RPC (YARN uses Protocol Buffers by default)
GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
Continuing (all code here is simplified):
GetNewApplicationResponse newApp = rmClient.getNewApplication(request);
ApplicationId appId = newApp.getApplicationId();
step3: the client submits the AM to the RM via CR protocol#submitApplication. Simplified code:
// The client packs everything needed to launch the AM into an ApplicationSubmissionContext
ApplicationSubmissionContext context = Records.newRecord(ApplicationSubmissionContext.class);
... // set the application name, priority, queue name and so on
context.setApplicationName(appName);
// Build the launch context for the AM container
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
... // set the AM-related variables
amContainer.setLocalResources(localResources); // local resources needed to launch the AM
amContainer.setEnvironment(env);
context.setAMContainerSpec(amContainer);
context.setApplicationId(appId);
SubmitApplicationRequest submitRequest = Records.newRecord(SubmitApplicationRequest.class);
submitRequest.setApplicationSubmissionContext(context);
rmClient.submitApplication(submitRequest); // submit the application to the RM
--------------------------------------------------------------------------------------------------------------------------------------------------
Typically, the AM registers itself with the RM, requests resources, and asks the NM to start Containers as follows:
AM-RM flow:
step1: create a client for the AR protocol
ApplicationMasterProtocol rmClient = (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol.class, rmAddress, conf);
step2: the AM registers itself with the RM
// recordFactory.newRecordInstance(...) here serves the same purpose as Records.newRecord(...) above: both create record objects
RegisterApplicationMasterRequest request = recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
request.setHost(host);
request.setRpcPort(port);
request.setTrackingUrl(appTrackingUrl);
RegisterApplicationMasterResponse response = rmClient.registerApplicationMaster(request); // registration complete
step3: the AM requests resources from the RM
A simplified snippet (interested readers should read the source itself):
synchronized (this) {
  askList = new ArrayList<ResourceRequest>(ask);
  releaseList = new ArrayList<ContainerId>(release);
  allocateRequest = BuilderUtils.newAllocateRequest(....); // build an allocateRequest object
}
// Request resources from the RM and pick up newly allocated resources (CPU, memory, etc.)
allocateResponse = rmClient.allocate(allocateRequest);
// Decide what to do next (resource assignment) based on the RM's response
.....
step4: the AM tells the RM that the application has finished, and exits
// Build the request object
FinishApplicationMasterRequest request = recordFactory.newRecordInstance(FinishApplicationMasterRequest.class);
request.setFinalApplicationStatus(appStatus);
.. // set the diagnostics
.. // set the trackingUrl
// Notify the RM that the AM is exiting
rmClient.finishApplicationMaster(request);
--------------------------------------------------------------------------------------------------------------------------------------------
AM-NM flow:
step1: create a client for the AN protocol and start the Container
String cmIpPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort();
InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
anClient = (ContainerManagementProtocol) rpc.getProxy(ContainerManagementProtocol.class, cmAddress, conf);
ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
... // set the ctx variables
StartContainerRequest request = Records.newRecord(StartContainerRequest.class);
request.setContainerLaunchContext(ctx);
request.setContainer(container);
anClient.startContainer(request);
step2: to keep track of each Container's state in real time, the AM can query the NodeManager for the Container's status via AN protocol#getContainerStatus (in Hadoop 2.x the protocol exposes the batched getContainerStatuses)
step3: once a Container has finished running, the AM can release it via AN protocol#stopContainer (stopContainers in Hadoop 2.x)
===============================================================================================
References:
[1] http://www.aboutyun.com/thread-14277-1-1.html
[2] http://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop-yarn/
[3] http://www.bigdatas.cn/thread-59001-1-1.html
[4] http://bit1129.iteye.com/blog/2186238
[5] http://x-rip.iteye.com/blog/1541914
Reposted from: https://www.cnblogs.com/davidwang456/p/4816336.html