WordCount job submission: source-code analysis of the split computation and host-selection algorithms in FileInputFormat
References: "FileInputFormat類中split切分算法和host選擇算法介紹", "Hadoop2.6.0的FileInputFormat的任務切分原理分析(即如何控制FileInputFormat的map任務數量)", "Hadoop中FileInputFormat計算InputSplit的getSplits方法的流程", and "hadoop作業分片處理以及任務本地性分析(源碼分析第一篇)".
Before the analysis, some background:
(Note that a Block's hosts and a Split's hosts are not the same thing: a Split's hosts are derived from the hosts of its Blocks by a certain rule. When one Block maps to exactly one Split (the usual case), the two host lists are identical; when the mapping is not one-to-one (a Split larger than a Block), the Split has to pick its hosts by some selection method.
A Split and a MapTask correspond one-to-one: one Split drives one MapTask, so data locality is determined by the Split's hosts.
BlocksMap stores the mapping from Block to BlockInfo. A Block mainly holds three fields: long blockId — the unique ID of the data block; long numBytes — the amount of file data the block contains; long generationStamp — the block's version number, i.e. its generation stamp. BlockInfo (BlockInfoContiguous in Hadoop 2.7.3) records the host names of all replicas of the block.)
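To make the host-derivation idea concrete, here is a small, self-contained illustration. It is not Hadoop source, and every name in it (SplitHostChooser, BlockLoc, the node names) is made up; it only shows the intuition of ranking the hosts of the blocks a split covers by how many bytes of the split each host stores, which is roughly what the old-API FileInputFormat does when a split spans several blocks (the real code additionally weighs racks and cached replicas).

import java.util.*;

// Illustrative only: a hypothetical helper showing the idea behind split-host
// selection when one split covers several blocks -- rank hosts by how many
// bytes of the split they hold locally and keep the top ones.
public class SplitHostChooser {

  // Simplified view of a block's replica locations (not the HDFS BlockLocation class).
  static class BlockLoc {
    final long offset;      // block start offset within the file
    final long length;      // block length
    final String[] hosts;   // DataNodes holding a replica
    BlockLoc(long offset, long length, String... hosts) {
      this.offset = offset; this.length = length; this.hosts = hosts;
    }
  }

  // Pick up to 'limit' hosts that contribute the most bytes to [splitStart, splitStart + splitLen).
  static String[] chooseHosts(List<BlockLoc> blocks, long splitStart, long splitLen, int limit) {
    long splitEnd = splitStart + splitLen;
    Map<String, Long> bytesPerHost = new HashMap<>();
    for (BlockLoc b : blocks) {
      long overlap = Math.min(splitEnd, b.offset + b.length) - Math.max(splitStart, b.offset);
      if (overlap <= 0) continue;                 // block does not intersect the split
      for (String h : b.hosts) bytesPerHost.merge(h, overlap, Long::sum);
    }
    return bytesPerHost.entrySet().stream()
        .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
        .limit(limit)
        .map(Map.Entry::getKey)
        .toArray(String[]::new);
  }

  public static void main(String[] args) {
    // Two 128 MB blocks; a 192 MB split covers all of block 0 and half of block 1.
    List<BlockLoc> blocks = Arrays.asList(
        new BlockLoc(0L,         128L << 20, "node1", "node2", "node3"),
        new BlockLoc(128L << 20, 128L << 20, "node3", "node4", "node5"));
    System.out.println(Arrays.toString(chooseHosts(blocks, 0L, 192L << 20, 3)));
    // node3 ranks first: it holds bytes from both blocks of this split.
  }
}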
Now for the analysis (the code below is from hadoop-2.7.3-src).
We start from WordCount: org.apache.hadoop.examples.WordCount.main() calls org.apache.hadoop.mapreduce.Job.waitForCompletion(boolean):
// This code is in org.apache.hadoop.examples.WordCount
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration(); // job execution specification; Configuration is the map/reduce configuration class describing the job to the Hadoop framework
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  if (otherArgs.length < 2) {
    System.err.println("Usage: wordcount <in> [<in>...] <out>");
    System.exit(2);
  }
  Job job = Job.getInstance(conf, "word count"); // set the job name and obtain the Job instance
  job.setJarByClass(WordCount.class);
  job.setMapperClass(TokenizerMapper.class);    // set the Mapper class for the job
  job.setCombinerClass(IntSumReducer.class);    // set the Combiner class for the job
  job.setReducerClass(IntSumReducer.class);     // set the Reducer class for the job
  job.setOutputKeyClass(Text.class);            // set the key class of the job's output
  job.setOutputValueClass(IntWritable.class);   // set the value class of the job's output
  for (int i = 0; i < otherArgs.length - 1; ++i) {
    FileInputFormat.addInputPath(job, new Path(otherArgs[i])); // add an input path; this is org.apache.hadoop.mapreduce.lib.input.FileInputFormat
  }
  FileOutputFormat.setOutputPath(job,
    new Path(otherArgs[otherArgs.length - 1])); // set the output path
  System.exit(job.waitForCompletion(true) ? 0 : 1); // run the job: calls Job.waitForCompletion()
}

Inside Job.waitForCompletion(), the job calls Job's own submit() method; submit() in turn calls org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(Job, Cluster).
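As a side note (this is not part of the WordCount source; the 32/64 MB numbers are only an illustration): because every split becomes one map task, a driver like the one above can influence the number of map tasks by bounding the split size through the public helpers on org.apache.hadoop.mapreduce.lib.input.FileInputFormat, or through the equivalent configuration keys.

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeTuning {
  // Illustrative driver fragment: with a 128 MB block size these bounds give
  // 64 MB splits, so a 1 GB input yields 16 splits and therefore 16 map tasks.
  static void boundSplits(Job job) {
    FileInputFormat.setMinInputSplitSize(job, 32L * 1024 * 1024); // at least 32 MB per split
    FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024); // at most 64 MB per split
    // The same limits can be set in the configuration:
    //   mapreduce.input.fileinputformat.split.minsize
    //   mapreduce.input.fileinputformat.split.maxsize
  }
}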
// This code is in org.apache.hadoop.mapreduce.Job
// ......
/**
 * Submit the job to the cluster and wait for it to finish.
 * @param verbose print the progress to the user
 * @return true if the job succeeded
 * @throws IOException thrown if the communication with the
 *         <code>JobTracker</code> is lost
 */
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit(); // call submit() of this class
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
      Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
// ......
/**
 * Submit the job to the cluster and return immediately.
 * @throws IOException
 */
public void submit()
       throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  connect();
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
    ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster); // hand off to JobSubmitter
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}

Inside JobSubmitter.submitJobInternal(Job, Cluster), the splits for the job are created by calling writeSplits(JobContext, Path) of the same class. writeSplits(...) in turn calls either (1) writeNewSplits(JobContext, Path) — used when the job is written against the new API, the normal case on Hadoop 2.x — or (2) writeOldSplits(JobConf, Path) — used for the old API. Inside JobSubmitter.writeNewSplits(JobContext, Path), the abstract class org.apache.hadoop.mapreduce.InputFormat's getSplits(JobContext) is called to compute the list of logical input splits for the job; inside JobSubmitter.writeOldSplits(JobConf, Path), the old-API org.apache.hadoop.mapred.InputFormat.getSplits(JobConf, int) is called to compute the job's logical splits.
// This code is in org.apache.hadoop.mapreduce.JobSubmitter
// ......
/**
 * Internal method for submitting jobs to the system.
 *
 * <p>The job submission process involves:
 * <ol>
 *   <li>
 *   Checking the input and output specifications of the job.
 *   </li>
 *   <li>
 *   Computing the {@link InputSplit}s for the job.
 *   </li>
 *   <li>
 *   Setup the requisite accounting information for the
 *   {@link DistributedCache} of the job, if necessary.
 *   </li>
 *   <li>
 *   Copying the job's jar and configuration to the map-reduce system
 *   directory on the distributed file-system.
 *   </li>
 *   <li>
 *   Submitting the job to the <code>JobTracker</code> and optionally
 *   monitoring it's status.
 *   </li>
 * </ol></p>
 * @param job the configuration to submit
 * @param cluster the handle to the Cluster
 * @throws ClassNotFoundException
 * @throws InterruptedException
 * @throws IOException
 */
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {

  //validate the jobs output specs
  checkSpecs(job);

  Configuration conf = job.getConfiguration();
  addMRFrameworkToDistributedCache(conf);

  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  //configure the command line options correctly on the submitting dfs
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);

    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(SHUFFLE_KEY_LENGTH);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }
    if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
      conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
      LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
              "data spill is enabled");
    }

    copyAndConfigureFiles(job, submitJobDir);

    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir); // create the splits for the job
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);

    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }

    // Set reservation info if it exists
    ReservationId reservationId = job.getReservationId();
    if (reservationId != null) {
      conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
    }

    // Write job file to submit dir
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials()); // submit the job
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}
// ......
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
    Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  JobConf jConf = (JobConf)job.getConfiguration();
  int maps;
  if (jConf.getUseNewMapper()) {
    maps = writeNewSplits(job, jobSubmitDir);
  } else {
    maps = writeOldSplits(jConf, jobSubmitDir);
  }
  return maps;
}
// ......
@SuppressWarnings("unchecked")
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  InputFormat<?, ?> input =
    ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

  List<InputSplit> splits = input.getSplits(job);
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator());
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}
// ......
//method to write splits for old api mapper.
private int writeOldSplits(JobConf job, Path jobSubmitDir)
throws IOException {
  org.apache.hadoop.mapred.InputSplit[] splits =
  job.getInputFormat().getSplits(job, job.getNumMapTasks());
  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(splits, new Comparator<org.apache.hadoop.mapred.InputSplit>() {
    public int compare(org.apache.hadoop.mapred.InputSplit a,
                       org.apache.hadoop.mapred.InputSplit b) {
      try {
        long left = a.getLength();
        long right = b.getLength();
        if (left == right) {
          return 0;
        } else if (left < right) {
          return 1;
        } else {
          return -1;
        }
      } catch (IOException ie) {
        throw new RuntimeException("Problem getting input split size", ie);
      }
    }
  });
  JobSplitWriter.createSplitFiles(jobSubmitDir, job,
      jobSubmitDir.getFileSystem(job), splits);
  return splits.length;
}
(1) The getSplits(JobContext) method declared on the abstract class org.apache.hadoop.mapreduce.InputFormat: the implementation actually invoked here is org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(JobContext). (There are two classes named FileInputFormat, org.apache.hadoop.mapreduce.lib.input.FileInputFormat and org.apache.hadoop.mapred.FileInputFormat. We look at org.apache.hadoop.mapreduce.lib.input.FileInputFormat for two reasons: first, org.apache.hadoop.mapred.FileInputFormat is the implementation class of the old-API org.apache.hadoop.mapred.InputFormat, which is case (2) below; second, the FileInputFormat imported by WordCount is org.apache.hadoop.mapreduce.lib.input.FileInputFormat.)
A word on the two MapReduce APIs: starting with release 0.20.0, Hadoop ships both an old and a new MapReduce API. The new API builds on the old one and improves extensibility and ease of use. The old API lives in the org.apache.hadoop.mapred package; the new API lives in org.apache.hadoop.mapreduce and its sub-packages.
// This code is in org.apache.hadoop.mapreduce.InputFormat
/**
 * Logically split the set of input files for the job.
 *
 * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
 * for processing.</p>
 *
 * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
 * input files are not physically split into chunks. For e.g. a split could
 * be <i><input-file-path, start, offset></i> tuple. The InputFormat
 * also creates the {@link RecordReader} to read the {@link InputSplit}.
 *
 * @param context job configuration.
 * @return an array of {@link InputSplit}s for the job.
 */
public abstract List<InputSplit> getSplits(JobContext context
                                           ) throws IOException, InterruptedException;

The actual split computation takes place in org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(JobContext).
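The original article breaks off at this point. As a rough orientation (this is my abridged paraphrase of the Hadoop 2.7.x source, not a verbatim quote, and it leaves out unsplittable and empty files), the core of FileInputFormat.getSplits(JobContext) clamps the split size between the configured minimum and maximum, keeps cutting splits from a file while more than SPLIT_SLOP (1.1) split sizes remain, and gives each split the hosts of the block that contains its first byte:

// Simplified sketch of org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits()
// in Hadoop 2.7.x -- abridged, not the literal source.
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); // default 1
long maxSize = getMaxSplitSize(job);                                    // default Long.MAX_VALUE
List<InputSplit> splits = new ArrayList<InputSplit>();
for (FileStatus file : listStatus(job)) {
  Path path = file.getPath();
  long length = file.getLen();
  FileSystem fs = path.getFileSystem(job.getConfiguration());
  BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
  long blockSize = file.getBlockSize();
  long splitSize = Math.max(minSize, Math.min(maxSize, blockSize)); // computeSplitSize()
  long bytesRemaining = length;
  while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {       // SPLIT_SLOP = 1.1
    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
    splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                         blkLocations[blkIndex].getHosts(),          // hosts of the block
                         blkLocations[blkIndex].getCachedHosts()));  //   holding the split's first byte
    bytesRemaining -= splitSize;
  }
  if (bytesRemaining != 0) {                                         // the (short) tail split
    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
    splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                         blkLocations[blkIndex].getHosts(),
                         blkLocations[blkIndex].getCachedHosts()));
  }
}
return splits;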
(2) The getSplits(JobConf, int) method declared on org.apache.hadoop.mapred.InputFormat: the implementation actually invoked here is org.apache.hadoop.mapred.FileInputFormat.getSplits(JobConf, int); org.apache.hadoop.mapred.FileInputFormat is the old-API implementation class of InputFormat. For the details, see the reference "FileInputFormat類中split切分算法和host選擇算法介紹".
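For comparison (again a hedged summary of my own reading of the two getSplits implementations rather than the referenced article's text), the old and new APIs size their splits with different formulas; the self-contained snippet below contrasts them with a small worked example.

public class SplitSizeFormulas {

  // New API, org.apache.hadoop.mapreduce.lib.input.FileInputFormat:
  //   splitSize = max(minSize, min(maxSize, blockSize))
  static long newApiSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  // Old API, org.apache.hadoop.mapred.FileInputFormat:
  //   goalSize  = totalSize / requested number of splits
  //   splitSize = max(minSize, min(goalSize, blockSize))
  static long oldApiSplitSize(long totalSize, int numSplits, long blockSize, long minSize) {
    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

  public static void main(String[] args) {
    long MB = 1L << 20;
    // 1 GB input, 128 MB blocks, default minSize = 1, maxSize = Long.MAX_VALUE
    System.out.println(newApiSplitSize(128 * MB, 1L, Long.MAX_VALUE) / MB); // 128
    // Old API asked for 16 map tasks: goalSize = 64 MB < blockSize, so 64 MB splits
    System.out.println(oldApiSplitSize(1024 * MB, 16, 128 * MB, 1L) / MB);  // 64
  }
}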
Reposted from: https://www.cnblogs.com/zhangchao0515/p/8288298.html