Anatomy of the Map-Reduce Process
1. The Client

A Map-Reduce run begins with a client submitting a job.
Submission is driven by the static method JobClient.runJob(JobConf):
```java
public static RunningJob runJob(JobConf job) throws IOException {
  // First create a JobClient object
  JobClient jc = new JobClient(job);
  ……
  // Call submitJob to submit the job
  running = jc.submitJob(job);
  JobID jobId = running.getID();
  ……
  while (true) {
    // The while loop repeatedly polls the job's status and prints it to the client console
  }
  return running;
}
```
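For orientation, this is what the user-side code that ends up calling runJob typically looks like — a standard old-API (org.apache.hadoop.mapred) word-count driver. It is an illustration, not part of the source analyzed here; the WordCountMapper and WordCountReducer classes are sketched in sections 6.1 and 6.2 below, and the input/output paths come from the command line:

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

// A typical old-API driver: build a JobConf and hand it to JobClient.runJob,
// which performs the submission sequence analyzed in this section.
public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCountDriver.class);
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(WordCountMapper.class);     // sketched in section 6.1
    conf.setCombinerClass(WordCountReducer.class);  // combiner reuses the reducer
    conf.setReducerClass(WordCountReducer.class);   // sketched in section 6.2

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);   // blocks, polling status until the job completes
  }
}
```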
JobClient's submitJob method is implemented as follows:
```java
public RunningJob submitJob(JobConf job) throws FileNotFoundException,
                                InvalidJobConfException, IOException {
  // Obtain an id for this job from the JobTracker
  JobID jobId = jobSubmitClient.getNewJobId();
  // Stage everything the job needs to run into HDFS:
  //   - the jar containing the job's code, as job.jar
  //   - the metadata for the input splits to process, as job.split
  //   - the aggregated job configuration, as job.xml
  Path submitJobDir = new Path(getSystemDir(), jobId.toString());
  Path submitJarFile = new Path(submitJobDir, "job.jar");
  Path submitSplitFile = new Path(submitJobDir, "job.split");
  // Upload any jars given with the -libjars command-line option to HDFS
  configureCommandLineOptions(job, submitJobDir, submitJarFile);
  Path submitJobFile = new Path(submitJobDir, "job.xml");
  ……
  // Compute the input splits according to the input format; the default type is FileSplit
  InputSplit[] splits =
    job.getInputFormat().getSplits(job, job.getNumMapTasks());

  // Open an output stream and write the input split metadata into job.split
  FSDataOutputStream out = FileSystem.create(fs,
      submitSplitFile, new FsPermission(JOB_FILE_PERMISSION));
  try {
    // job.split contains: a file header, a version number, the number of splits,
    // then each input split in turn. For each split it records: the split class name
    // (FileSplit by default), the split size, the split body (for FileSplit: the file
    // name and this split's starting offset within the file), and the split's
    // locations (i.e. which DataNodes hold it).
    writeSplitsFile(splits, out);
  } finally {
    out.close();
  }
  job.set("mapred.job.split.file", submitSplitFile.toString());
  // The number of map tasks is set to the number of splits
  job.setNumMapTasks(splits.length);
  // Write the job configuration into job.xml
  out = FileSystem.create(fs, submitJobFile,
      new FsPermission(JOB_FILE_PERMISSION));
  try {
    job.writeXml(out);
  } finally {
    out.close();
  }
  // Finally call into the JobTracker to actually submit the job
  JobStatus status = jobSubmitClient.submitJob(jobId);
  ……
}
```
2. The JobTracker

The JobTracker runs as its own JVM. Its main function does two things:
- calls the static method startTracker(new JobConf()) to create a JobTracker instance
- calls JobTracker.offerService() to start serving requests

The JobTracker constructor creates a taskScheduler member responsible for job scheduling. The default is JobQueueTaskScheduler, i.e. jobs are scheduled FIFO.
offerService calls taskScheduler.start(), which registers two listeners with the JobTracker (the scheduler's taskTrackerManager):
- a JobQueueJobInProgressListener, which tracks each job's run state
- an EagerTaskInitializationListener, which initializes jobs
EagerTaskInitializationListener runs a thread, JobInitThread, that repeatedly takes JobInProgress objects from jobInitQueue and calls initTasks on each one to initialize the job's tasks.
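The pattern here is a simple producer-consumer queue: jobAdded enqueues, a background thread dequeues and initializes. A minimal self-contained sketch of that pattern (hypothetical class and interface names; not the Hadoop source):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Minimal sketch of the EagerTaskInitializationListener pattern:
// listeners enqueue newly added jobs; a background thread drains the
// queue and runs each job's (potentially slow) initialization.
class JobInitQueue {
  interface Job { void initTasks() throws Exception; }

  private final BlockingQueue<Job> jobInitQueue = new LinkedBlockingQueue<>();

  // Called from jobAdded(): hand the job to the init thread and return immediately.
  void jobAdded(Job job) {
    jobInitQueue.add(job);
  }

  // The JobInitThread equivalent: block until a job arrives, then initialize it.
  void start() {
    Thread initThread = new Thread(() -> {
      while (!Thread.currentThread().isInterrupted()) {
        try {
          jobInitQueue.take().initTasks();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } catch (Exception e) {
          // A failed initialization must not kill the init thread.
          e.printStackTrace();
        }
      }
    }, "JobInitThread");
    initThread.setDaemon(true);
    initThread.start();
  }
}
```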
In the previous section the client called JobTracker.submitJob. That method first creates a JobInProgress object and then calls addJob, which contains the following logic:
```java
synchronized (jobs) {
  synchronized (taskScheduler) {
    jobs.put(job.getProfile().getJobID(), job);
    // Notify every registered listener that a job has been added
    for (JobInProgressListener listener : jobInProgressListeners) {
      listener.jobAdded(job);
    }
  }
}
```
EagerTaskInitializationListener.jobAdded simply adds the JobInProgress object to jobInitQueue, which naturally triggers the job's initialization, carried out by JobInProgress.initTasks:
```java
public synchronized void initTasks() throws IOException {
  ……
  // Read job.split from HDFS to recover the input splits
  String jobFile = profile.getJobFile();
  Path sysDir = new Path(this.jobtracker.getSystemDir());
  FileSystem fs = sysDir.getFileSystem(conf);
  DataInputStream splitFile =
    fs.open(new Path(conf.get("mapred.job.split.file")));
  JobClient.RawSplit[] splits;
  try {
    splits = JobClient.readSplitFile(splitFile);
  } finally {
    splitFile.close();
  }
  // The number of map tasks equals the number of input splits
  numMapTasks = splits.length;
  // Create a TaskInProgress for each map task, one per input split
  maps = new TaskInProgress[numMapTasks];
  for (int i = 0; i < numMapTasks; ++i) {
    inputLength += splits[i].getDataLength();
    maps[i] = new TaskInProgress(jobId, jobFile,
                                 splits[i],
                                 jobtracker, conf, this, i);
  }
  // Map tasks are placed into nonRunningMapCache, a Map<Node, List<TaskInProgress>>:
  // a map task will preferentially be assigned to the Node holding its input split.
  // nonRunningMapCache is consulted when the JobTracker hands map tasks to TaskTrackers.
  if (numMapTasks > 0) {
    nonRunningMapCache = createCache(splits, maxLevel);
  }

  // Create the reduce tasks
  this.reduces = new TaskInProgress[numReduceTasks];
  for (int i = 0; i < numReduceTasks; i++) {
    reduces[i] = new TaskInProgress(jobId, jobFile,
                                    numMapTasks, i,
                                    jobtracker, conf, this);
    // Reduce tasks go into nonRunningReduces, consulted when the JobTracker
    // hands reduce tasks to TaskTrackers.
    nonRunningReduces.add(reduces[i]);
  }

  // Create two cleanup tasks, one for the map side and one for the reduce side
  cleanup = new TaskInProgress[2];
  cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0],
          jobtracker, conf, this, numMapTasks);
  cleanup[0].setJobCleanupTask();
  cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                       numReduceTasks, jobtracker, conf, this);
  cleanup[1].setJobCleanupTask();
  // Create two setup tasks, one for the map side and one for the reduce side
  setup = new TaskInProgress[2];
  setup[0] = new TaskInProgress(jobId, jobFile, splits[0],
          jobtracker, conf, this, numMapTasks + 1);
  setup[0].setJobSetupTask();
  setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                       numReduceTasks + 1, jobtracker, conf, this);
  setup[1].setJobSetupTask();
  tasksInited.set(true); // initialization complete
  ……
}
```
3. The TaskTracker

The TaskTracker also runs as its own JVM. Its main function essentially calls new TaskTracker(conf).run(), and run's main work happens in offerService:
```java
State offerService() throws Exception {
  long lastHeartbeat = 0;
  // The TaskTracker process runs continuously
  while (running && !shuttingDown) {
      ……
      long now = System.currentTimeMillis();
      // Send a heartbeat to the JobTracker at a fixed interval
      long waitTime = heartbeatInterval - (now - lastHeartbeat);
      if (waitTime > 0) {
        synchronized (finishedCount) {
          if (finishedCount[0] == 0) {
            finishedCount.wait(waitTime);
          }
          finishedCount[0] = 0;
        }
      }
      ……
      // Send the heartbeat to the JobTracker and collect the response
      HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
      ……
      // The response tells this TaskTracker what to do next
      TaskTrackerAction[] actions = heartbeatResponse.getActions();
      ……
      if (actions != null) {
        for (TaskTrackerAction action : actions) {
          if (action instanceof LaunchTaskAction) {
            // A new task to run: add the action to the task queue
            addToTaskQueue((LaunchTaskAction) action);
          } else if (action instanceof CommitTaskAction) {
            CommitTaskAction commitAction = (CommitTaskAction) action;
            if (!commitResponses.contains(commitAction.getTaskID())) {
              commitResponses.add(commitAction.getTaskID());
            }
          } else {
            tasksToCleanup.put(action);
          }
        }
      }
  }
  return State.NORMAL;
}
```
The core logic of transmitHeartBeat is as follows:
```java
private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
  // Every so often the heartbeat also carries counter updates back to the JobTracker
  boolean sendCounters;
  if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
    sendCounters = true;
    previousUpdate = now;
  } else {
    sendCounters = false;
  }
  ……
  // Report this TaskTracker's current status to the JobTracker
  if (status == null) {
    synchronized (this) {
      status = new TaskTrackerStatus(taskTrackerName, localHostname,
                                     httpPort,
                                     cloneAndResetRunningTaskStatuses(
                                       sendCounters),
                                     failures,
                                     maxCurrentMapTasks,
                                     maxCurrentReduceTasks);
    }
  }
  ……
  // The TaskTracker asks the JobTracker for a new task when:
  //   - it is running fewer map tasks than its maximum number of map slots, or
  //   - it is running fewer reduce tasks than its maximum number of reduce slots
  boolean askForNewTask;
  long localMinSpaceStart;
  synchronized (this) {
    askForNewTask = (status.countMapTasks() < maxCurrentMapTasks ||
                     status.countReduceTasks() < maxCurrentReduceTasks) &&
                    acceptNewTasks;
    localMinSpaceStart = minSpaceStart;
  }
  ……
  // Send the heartbeat to the JobTracker; this is an RPC call
  HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
                                                            justStarted, askForNewTask,
                                                            heartbeatResponseId);
  ……
  return heartbeatResponse;
}
```
4. The JobTracker

When the heartbeat RPC arrives, JobTracker.heartbeat(TaskTrackerStatus status, boolean initialContact, boolean acceptNewTasks, short responseId) is invoked:
```java
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
                                                boolean initialContact, boolean acceptNewTasks, short responseId)
  throws IOException {
  ……
  String trackerName = status.getTrackerName();
  ……
  short newResponseId = (short) (responseId + 1);
  ……
  HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
  List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
  // The TaskTracker has asked the JobTracker for a task to run
  if (acceptNewTasks) {
    TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
    if (taskTrackerStatus == null) {
      LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
    } else {
      // Setup and cleanup tasks have the highest priority
      List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
      if (tasks == null) {
        // Otherwise, let the task scheduler assign tasks
        tasks = taskScheduler.assignTasks(taskTrackerStatus);
      }
      if (tasks != null) {
        for (Task task : tasks) {
          // Wrap each task in a LaunchTaskAction and return it to the TaskTracker
          expireLaunchingTasks.addNewTask(task.getTaskID());
          actions.add(new LaunchTaskAction(task));
        }
      }
    }
  }
  ……
  int nextInterval = getNextHeartbeatInterval();
  response.setHeartbeatInterval(nextInterval);
  response.setActions(
                      actions.toArray(new TaskTrackerAction[actions.size()]));
  ……
  return response;
}
```
The default task scheduler is JobQueueTaskScheduler; its assignTasks looks like this:
```java
public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
    throws IOException {
  ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
  int numTaskTrackers = clusterStatus.getTaskTrackers();
  Collection<JobInProgress> jobQueue = jobQueueJobInProgressListener.getJobQueue();
  int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
  int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
  int numMaps = taskTracker.countMapTasks();
  int numReduces = taskTracker.countReduceTasks();
  // Compute the remaining map and reduce load across all running jobs
  int remainingReduceLoad = 0;
  int remainingMapLoad = 0;
  synchronized (jobQueue) {
    for (JobInProgress job : jobQueue) {
      if (job.getStatus().getRunState() == JobStatus.RUNNING) {
        int totalMapTasks = job.desiredMaps();
        int totalReduceTasks = job.desiredReduces();
        remainingMapLoad += (totalMapTasks - job.finishedMaps());
        remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
      }
    }
  }
  // Compute each TaskTracker's fair share of the load: the remaining work
  // divided by the number of TaskTrackers, capped at this tracker's slot count
  int maxMapLoad = 0;
  int maxReduceLoad = 0;
  if (numTaskTrackers > 0) {
    maxMapLoad = Math.min(maxCurrentMapTasks,
                          (int) Math.ceil((double) remainingMapLoad /
                                          numTaskTrackers));
    maxReduceLoad = Math.min(maxCurrentReduceTasks,
                             (int) Math.ceil((double) remainingReduceLoad
                                             / numTaskTrackers));
  }
  ……

  // Maps take precedence over reduces: if this TaskTracker is running fewer
  // map tasks than its share, assign it a map task
  if (numMaps < maxMapLoad) {
    int totalNeededMaps = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING) {
          continue;
        }
        Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
            taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          return Collections.singletonList(t);
        }
        ……
      }
    }
  }
  // Only after map tasks are satisfied are reduce tasks assigned
  if (numReduces < maxReduceLoad) {
    int totalNeededReduces = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING ||
            job.numReduceTasks == 0) {
          continue;
        }
        Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
            taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          return Collections.singletonList(t);
        }
        ……
      }
    }
  }
  return null;
}
```
From the code above we can see that JobInProgress.obtainNewMapTask assigns map tasks: it mainly calls findNewMapTask, which looks up a TaskInProgress in nonRunningMapCache based on the Node the TaskTracker runs on. JobInProgress.obtainNewReduceTask assigns reduce tasks: it mainly calls findNewReduceTask, which takes a TaskInProgress from nonRunningReduces.
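To make the locality lookup concrete, here is a simplified, self-contained sketch of the idea behind nonRunningMapCache. The names and the flat host-then-anywhere fallback are illustrative only; the real findNewMapTask also walks the rack topology between those two extremes:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of locality-aware map-task selection: tasks are cached
// per host; a tracker first takes a task whose input split is local to it,
// and only then falls back to any remaining task.
class LocalityCache {
  record MapTask(int id) {}

  private final Map<String, Deque<MapTask>> tasksByHost = new HashMap<>();
  private final Deque<MapTask> allTasks = new ArrayDeque<>();

  void add(MapTask task, String... splitHosts) {
    allTasks.add(task);
    for (String host : splitHosts) {
      tasksByHost.computeIfAbsent(host, h -> new ArrayDeque<>()).add(task);
    }
  }

  // Called when `host` asks for work: prefer a data-local task.
  MapTask obtainNewMapTask(String host) {
    Deque<MapTask> local = tasksByHost.get(host);
    while (local != null && !local.isEmpty()) {
      MapTask t = local.poll();
      if (allTasks.remove(t)) {   // still unassigned -> it's ours
        return t;
      }                           // else it was already taken via another host
    }
    return allTasks.poll();       // no local work left: take anything
  }
}
```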
5. The TaskTracker

After the heartbeat is sent, the JobTracker's response carries the assigned tasks as LaunchTaskActions. Each one is queued via addToTaskQueue: map tasks go into mapLauncher and reduce tasks into reduceLauncher (both of type TaskLauncher):
```java
private void addToTaskQueue(LaunchTaskAction action) {
  if (action.getTask().isMapTask()) {
    mapLauncher.addToTaskQueue(action);
  } else {
    reduceLauncher.addToTaskQueue(action);
  }
}
```
TaskLauncher is a thread whose run method takes a TaskInProgress off the queue above and calls startNewTask(TaskInProgress tip) to start the task, which in turn mainly calls localizeJob(TaskInProgress tip):
```java
private void localizeJob(TaskInProgress tip) throws IOException {
  // First copy the job's files from HDFS to the TaskTracker's local
  // filesystem: job.split, job.xml, and job.jar
  Path localJarFile = null;
  Task t = tip.getTask();
  JobID jobId = t.getJobID();
  Path jobFile = new Path(t.getJobFile());
  ……
  Path localJobFile = lDirAlloc.getLocalPathForWrite(
                                  getLocalJobDir(jobId.toString())
                                  + Path.SEPARATOR + "job.xml",
                                  jobFileSize, fConf);
  RunningJob rjob = addTaskToJob(jobId, tip);
  synchronized (rjob) {
    if (!rjob.localized) {
      FileSystem localFs = FileSystem.getLocal(fConf);
      Path jobDir = localJobFile.getParent();
      ……
      // Copy the job file (job.xml) to the local filesystem
      systemFS.copyToLocalFile(jobFile, localJobFile);
      JobConf localJobConf = new JobConf(localJobFile);
      Path workDir = lDirAlloc.getLocalPathForWrite(
                       (getLocalJobDir(jobId.toString())
                       + Path.SEPARATOR + "work"), fConf);
      if (!localFs.mkdirs(workDir)) {
        throw new IOException("Mkdirs failed to create "
                    + workDir.toString());
      }
      System.setProperty("job.local.dir", workDir.toString());
      localJobConf.set("job.local.dir", workDir.toString());
      // copy Jar file to the local FS and unjar it.
      String jarFile = localJobConf.getJar();
      long jarFileSize = -1;
      if (jarFile != null) {
        Path jarFilePath = new Path(jarFile);
        localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
                                   getLocalJobDir(jobId.toString())
                                   + Path.SEPARATOR + "jars",
                                   5 * jarFileSize, fConf), "job.jar");
        if (!localFs.mkdirs(localJarFile.getParent())) {
          throw new IOException("Mkdirs failed to create jars directory ");
        }
        // Copy job.jar to the local filesystem
        systemFS.copyToLocalFile(jarFilePath, localJarFile);
        localJobConf.setJar(localJarFile.toString());
        // Write the job's configuration out as job.xml
        OutputStream out = localFs.create(localJobFile);
        try {
          localJobConf.writeXml(out);
        } finally {
          out.close();
        }
        // Unpack job.jar
        RunJar.unJar(new File(localJarFile.toString()),
                     new File(localJarFile.getParent().toString()));
      }
      rjob.localized = true;
      rjob.jobConf = localJobConf;
    }
  }
  // Actually launch the task
  launchTaskForJob(tip, new JobConf(rjob.jobConf));
}
```
Once all the resources the task needs have been copied to the local machine, launchTaskForJob is called, which in turn calls TaskInProgress.launchTask:
```java
public synchronized void launchTask() throws IOException {
    ……
    // Create the task's working directory
    localizeTask(task);
    if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
      this.taskStatus.setRunState(TaskStatus.State.RUNNING);
    }
    // Create and start a TaskRunner: a MapTaskRunner for a MapTask,
    // a ReduceTaskRunner for a ReduceTask
    this.runner = task.createRunner(TaskTracker.this, this);
    this.runner.start();
    this.taskStatus.setStartTime(System.currentTimeMillis());
}
```
TaskRunner is a thread; its run function is as follows:
```java
public final void run() {
    ……
    TaskAttemptID taskid = t.getTaskID();
    LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
    File jobCacheDir = null;
    if (conf.getJar() != null) {
      jobCacheDir = new File(
                      new Path(conf.getJar()).getParent().toString());
    }
    File workDir = new File(lDirAlloc.getLocalPathToRead(
                              TaskTracker.getLocalTaskDir(
                                t.getJobID().toString(),
                                t.getTaskID().toString(),
                                t.isTaskCleanupTask())
                              + Path.SEPARATOR + MRConstants.WORKDIR,
                              conf).toString());
    FileSystem fileSystem;
    Path localPath;
    ……
    // Assemble the classpath
    String baseDir;
    String sep = System.getProperty("path.separator");
    StringBuffer classPath = new StringBuffer();
    // start with same classpath as parent process
    classPath.append(System.getProperty("java.class.path"));
    classPath.append(sep);
    if (!workDir.mkdirs()) {
      if (!workDir.isDirectory()) {
        LOG.fatal("Mkdirs failed to create " + workDir.toString());
      }
    }
    String jar = conf.getJar();
    if (jar != null) {
      // if a jar exists, add its lib/, classes/ and root dir to the classpath
      File[] libs = new File(jobCacheDir, "lib").listFiles();
      if (libs != null) {
        for (int i = 0; i < libs.length; i++) {
          classPath.append(sep);            // add libs from jar to classpath
          classPath.append(libs[i]);
        }
      }
      classPath.append(sep);
      classPath.append(new File(jobCacheDir, "classes"));
      classPath.append(sep);
      classPath.append(jobCacheDir);
    }
    ……
    classPath.append(sep);
    classPath.append(workDir);
    // Assemble the java command line and its arguments
    Vector<String> vargs = new Vector<String>(8);
    File jvm =
      new File(new File(System.getProperty("java.home"), "bin"), "java");
    vargs.add(jvm.toString());
    String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
    javaOpts = javaOpts.replace("@taskid@", taskid.toString());
    String[] javaOptsSplit = javaOpts.split(" ");
    String libraryPath = System.getProperty("java.library.path");
    if (libraryPath == null) {
      libraryPath = workDir.getAbsolutePath();
    } else {
      libraryPath += sep + workDir;
    }
    boolean hasUserLDPath = false;
    for (int i = 0; i < javaOptsSplit.length; i++) {
      if (javaOptsSplit[i].startsWith("-Djava.library.path=")) {
        javaOptsSplit[i] += sep + libraryPath;
        hasUserLDPath = true;
        break;
      }
    }
    if (!hasUserLDPath) {
      vargs.add("-Djava.library.path=" + libraryPath);
    }
    for (int i = 0; i < javaOptsSplit.length; i++) {
      vargs.add(javaOptsSplit[i]);
    }
    // Set up the Child process's temporary directory
    String tmp = conf.get("mapred.child.tmp", "./tmp");
    Path tmpDir = new Path(tmp);
    if (!tmpDir.isAbsolute()) {
      tmpDir = new Path(workDir.toString(), tmp);
    }
    FileSystem localFs = FileSystem.getLocal(conf);
    if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
      throw new IOException("Mkdirs failed to create " + tmpDir.toString());
    }
    vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());
    // Add classpath.
    vargs.add("-classpath");
    vargs.add(classPath.toString());
    // Log directory
    long logSize = TaskLog.getTaskLogLength(conf);
    vargs.add("-Dhadoop.log.dir=" +
        new File(System.getProperty("hadoop.log.dir")).getAbsolutePath());
    vargs.add("-Dhadoop.root.logger=INFO,TLA");
    vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
    vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
    // The child process running map and reduce tasks has Child as its main class
    vargs.add(Child.class.getName());  // main of Child
    ……
    // Launch the child JVM
    jvmManager.launchJvm(this,
        jvmManager.constructJvmEnv(setup, vargs, stdout, stderr, logSize,
            workDir, env, pidFile, conf));
}
```
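Stripped of the Hadoop specifics, the pattern is simply building a java command line and spawning a child JVM. A minimal runnable sketch using ProcessBuilder — the heap size, temp directory, and task-id argument here are illustrative, and the program launches itself as its own "child" so the example is self-contained:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of launching a child JVM the way TaskRunner does:
// assemble `java` + JVM options + classpath + main class, then spawn.
public class ChildJvmLauncher {
  public static void main(String[] args) throws Exception {
    if (args.length > 0) {            // we are the child: do the "task" and exit
      System.out.println("child JVM running task " + args[0]);
      return;
    }
    File jvm = new File(new File(System.getProperty("java.home"), "bin"), "java");

    List<String> vargs = new ArrayList<>();
    vargs.add(jvm.toString());
    vargs.add("-Xmx200m");                             // like mapred.child.java.opts
    vargs.add("-classpath");
    vargs.add(System.getProperty("java.class.path"));  // inherit the parent's classpath
    vargs.add(ChildJvmLauncher.class.getName());       // main class of the child
    vargs.add("attempt_0001_m_000000_0");              // a made-up task id argument

    Process child = new ProcessBuilder(vargs)
        .inheritIO()                  // in Hadoop, stdout/stderr go to the task logs
        .start();
    System.exit(child.waitFor());
  }
}
```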
6. The Child

The actual map and reduce tasks run inside the Child process. The main logic of Child's main function:
```java
while (true) {
  // Fetch a JvmTask object from the TaskTracker over the umbilical connection
  JvmTask myTask = umbilical.getTask(jvmId);
  ……
  idleLoopCount = 0;
  task = myTask.getTask();
  taskid = task.getTaskID();
  isCleanup = task.isTaskCleanupTask();
  JobConf job = new JobConf(task.getJobFile());
  TaskRunner.setupWorkDir(job);
  numTasksToExecute = job.getNumTasksToExecutePerJvm();
  task.setConf(job);
  defaultConf.addResource(new Path(task.getJobFile()));
  ……
  // Run the task
  task.run(job, umbilical);             // run the task
  if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
    break;
  }
}
```
6.1 MapTask

If the task is a MapTask, its run function is as follows:
```java
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException {
  // The reporter communicates with the TaskTracker, reporting progress
  final Reporter reporter = getReporter(umbilical);
  startCommunicationThread(umbilical);
  initialize(job, reporter);
  ……
  // Choose the map task's output collector
  int numReduceTasks = conf.getNumReduceTasks();
  MapOutputCollector collector = null;
  if (numReduceTasks > 0) {
    collector = new MapOutputBuffer(umbilical, job, reporter);
  } else {
    collector = new DirectMapOutputCollector(umbilical, job, reporter);
  }
  // Deserialize the input split and build a RecordReader from it to read the data
  instantiatedSplit = (InputSplit)
      ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
  DataInputBuffer splitBuffer = new DataInputBuffer();
  splitBuffer.reset(split.getBytes(), 0, split.getLength());
  instantiatedSplit.readFields(splitBuffer);
  if (instantiatedSplit instanceof FileSplit) {
    FileSplit fileSplit = (FileSplit) instantiatedSplit;
    job.set("map.input.file", fileSplit.getPath().toString());
    job.setLong("map.input.start", fileSplit.getStart());
    job.setLong("map.input.length", fileSplit.getLength());
  }
  RecordReader rawIn =                  // open input
    job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
  RecordReader in = isSkipping() ?
      new SkippingRecordReader(rawIn, getCounters(), umbilical) :
      new TrackedRecordReader(rawIn, getCounters());
  job.setBoolean("mapred.skip.on", isSkipping());
  // For a map task, create a MapRunnable; the default is MapRunner
  MapRunnable runner =
    ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
  try {
    // MapRunner.run reads records from the RecordReader one by one
    // and passes each to the Mapper's map function
    runner.run(in, collector, reporter);
    collector.flush();
  } finally {
    in.close();                               // close input
    collector.close();
  }
  done(umbilical);
}
```
MapRunner.run reads records from the RecordReader one by one and hands each to the Mapper's map function:
```java
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                Reporter reporter)
  throws IOException {
  try {
    K1 key = input.createKey();
    V1 value = input.createValue();
    while (input.next(key, value)) {
      mapper.map(key, value, output, reporter);
      if (incrProcCount) {
        reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
            SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
      }
    }
  } finally {
    mapper.close();
  }
}
```
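The mapper.map being invoked here is user code. For illustration, a typical old-API (org.apache.hadoop.mapred) Mapper — the classic word-count tokenizer referenced by the driver in section 1:

```java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// A classic word-count mapper for the old mapred API: MapRunner calls
// map() once per input record (here: byte offset + line of text).
public class WordCountMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  private final static IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  public void map(LongWritable key, Text value,
                  OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      output.collect(word, ONE);   // flows into MapOutputBuffer.collect below
    }
  }
}
```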
All of the mapper's output (the output.collect calls above) is gathered into the MapOutputBuffer, whose collect function is as follows:
```java
public synchronized void collect(K key, V value)
    throws IOException {
  reporter.progress();
  ……
  // As this shows, the buffer is a ring data structure
  final int kvnext = (kvindex + 1) % kvoffsets.length;
  spillLock.lock();
  try {
    boolean kvfull;
    do {
      // The ring is full when the next free slot would touch the start position
      kvfull = kvnext == kvstart;
      // Check the soft limit that decides when the buffer should be spilled to disk
      final boolean kvsoftlimit = ((kvnext > kvend)
          ? kvnext - kvend > softRecordLimit
          : kvend - kvnext <= kvoffsets.length - softRecordLimit);
      // When the soft limit is reached, start writing the buffer to a spill file.
      // startSpill mainly notifies the background SpillThread, whose run()
      // calls sortAndSpill() to sort, combine, and write to disk.
      if (kvstart == kvend && kvsoftlimit) {
        startSpill();
      }
      // If the buffer is completely full, wait until the spill finishes
      if (kvfull) {
          while (kvstart != kvend) {
            reporter.progress();
            spillDone.await();
          }
      }
    } while (kvfull);
  } finally {
    spillLock.unlock();
  }
  try {
    // There is room in the buffer: serialize the key and value into it
    int keystart = bufindex;
    keySerializer.serialize(key);
    final int valstart = bufindex;
    valSerializer.serialize(value);
    int valend = bb.markRecord();
    // Ask the configured partitioner for this record's partition id
    final int partition = partitioner.getPartition(key, value, partitions);
    mapOutputRecordCounter.increment(1);
    mapOutputByteCounter.increment(valend >= keystart
        ? valend - keystart
        : (bufvoid - keystart) + valend);
    // Record the partition id and the key/value offsets in the index arrays
    int ind = kvindex * ACCTSIZE;
    kvoffsets[kvindex] = ind;
    kvindices[ind + PARTITION] = partition;
    kvindices[ind + KEYSTART] = keystart;
    kvindices[ind + VALSTART] = valstart;
    kvindex = kvnext;
  } catch (MapBufferTooSmallException e) {
    LOG.info("Record too large for in-memory buffer: " + e.getMessage());
    spillSingleRecord(key, value);
    mapOutputRecordCounter.increment(1);
    return;
  }
}
```
The in-memory buffer layout is shown as a diagram in the original post (not reproduced here; see the analyses by several Hadoop experts at http://blog.csdn.net/HEYUTAO007/archive/2010/07/10/5725379.aspx and http://caibinbupt.javaeye.com/).
kvoffsets exists so that records can be sorted before being spilled to disk: the sort permutes these small index entries rather than moving the serialized key/value bytes in the data buffer.
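As an illustration of the ring's accounting logic only (a toy model, not the real MapOutputBuffer, which additionally manages the byte buffer and a spill thread), here is a tiny self-contained sketch of a circular record index with a soft spill threshold:

```java
// Toy model of MapOutputBuffer's circular record index: kvstart..kvend is
// the region being spilled, kvend..kvindex holds collected records, and a
// soft limit triggers a spill before the ring is completely full.
class RecordRing {
  private final int[] kvoffsets;
  private final int softRecordLimit;
  private int kvstart = 0, kvend = 0, kvindex = 0;

  RecordRing(int capacity, double spillPercent) {
    kvoffsets = new int[capacity];
    softRecordLimit = (int) (capacity * spillPercent);
  }

  /** Returns true if this collect crossed the soft limit and a spill should start. */
  boolean collect(int recordOffset) {
    int kvnext = (kvindex + 1) % kvoffsets.length;
    if (kvnext == kvstart) {
      throw new IllegalStateException("ring full: must wait for spill to finish");
    }
    kvoffsets[kvindex] = recordOffset;
    kvindex = kvnext;
    int used = (kvindex >= kvend) ? kvindex - kvend
                                  : kvoffsets.length - kvend + kvindex;
    return kvstart == kvend && used > softRecordLimit;  // time to startSpill()
  }

  /** The spill thread marks the collected region as being written out. */
  void startSpill() { kvend = kvindex; }

  /** When the spill file is written, the region is reclaimed. */
  void finishSpill() { kvstart = kvend; }
}
```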
As noted above, sortAndSpill is the function that writes the in-memory buffer to a spill file on disk:
```java
private void sortAndSpill() throws IOException {
  ……
  FSDataOutputStream out = null;
  FSDataOutputStream indexOut = null;
  IFileOutputStream indexChecksumOut = null;
  // Create the spill file on disk
  Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
                                  numSpills, size);
  out = rfs.create(filename);
  ……
  final int endPosition = (kvend > kvstart)
    ? kvend
    : kvoffsets.length + kvend;
  // Sort the buffered records in partition order
  sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
  int spindex = kvstart;
  InMemValBytes value = new InMemValBytes();
  // Write the records out one partition at a time
  for (int i = 0; i < partitions; ++i) {
    IFile.Writer<K, V> writer = null;
    long segmentStart = out.getPos();
    writer = new Writer<K, V>(job, out, keyClass, valClass, codec);
    // If there is no combiner, write records to the file directly
    if (null == combinerClass) {
        ……
        writer.append(key, value);
        ++spindex;
     } else {
        ……
        // With a combiner, combine first (calling combiner.reduce(…)), then write
        combineAndSpill(kvIter, combineInputCounter);
     }
  }
  ……
}
```
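The effect of the combiner here is to pre-aggregate each sorted partition before it hits disk. A tiny illustration of what combining one sorted spill partition does (pure Java, no Hadoop types; the word-count data is made up):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// What a word-count combiner does to one sorted spill partition:
// adjacent records with the same key collapse into a single record,
// shrinking the spill file before it is written.
public class CombineDemo {
  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> sorted = List.of(
        Map.entry("apple", 1), Map.entry("apple", 1),
        Map.entry("pear", 1), Map.entry("pear", 1), Map.entry("pear", 1));

    Map<String, Integer> combined = new LinkedHashMap<>();
    for (Map.Entry<String, Integer> e : sorted) {
      combined.merge(e.getKey(), e.getValue(), Integer::sum);
    }
    System.out.println(combined);  // {apple=2, pear=3} -- 5 records become 2
  }
}
```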
When the map phase ends, MapOutputBuffer's flush function is called; it too calls sortAndSpill to write out what remains in the buffer, and then calls mergeParts to merge the multiple spill files on disk:
```java
private void mergeParts() throws IOException {
    ……
    // For each partition
    for (int parts = 0; parts < partitions; parts++) {
      // create the segments to be merged
      List<Segment<K, V>> segmentList =
        new ArrayList<Segment<K, V>>(numSpills);
      TaskAttemptID mapId = getTaskID();
      // Collect this partition's segment from each spill file in turn
      for (int i = 0; i < numSpills; i++) {
        final IndexRecord indexRecord =
          getIndexInformation(mapId, i, parts);
        long segmentOffset = indexRecord.startOffset;
        long segmentLength = indexRecord.partLength;
        Segment<K, V> s =
          new Segment<K, V>(job, rfs, filename[i], segmentOffset,
                            segmentLength, codec, true);
        segmentList.add(i, s);
      }
      // Merge all the segments belonging to this partition
      RawKeyValueIterator kvIter =
        Merger.merge(job, rfs,
                     keyClass, valClass,
                     segmentList, job.getInt("io.sort.factor", 100),
                     new Path(getTaskID().toString()),
                     job.getOutputKeyComparator(), reporter);
      // Write the merged segment to the final output file
      long segmentStart = finalOut.getPos();
      Writer<K, V> writer =
          new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
      if (null == combinerClass || numSpills < minSpillsForCombine) {
        Merger.writeFile(kvIter, writer, reporter, job);
      } else {
        combineCollector.setWriter(writer);
        combineAndSpill(kvIter, combineInputCounter);
      }
      ……
    }
}
```
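The merge itself is a standard k-way merge of sorted runs. A self-contained sketch of the idea behind Merger.merge, using a priority queue over the heads of the sorted segments (illustrative only, not the Hadoop implementation):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Standard k-way merge of sorted runs, the idea behind Merger.merge:
// keep the head of every segment in a min-heap and repeatedly emit the smallest.
public class KWayMerge {
  static <T extends Comparable<T>> List<T> merge(List<List<T>> sortedSegments) {
    record Head<E>(E value, Iterator<E> rest) {}
    PriorityQueue<Head<T>> heap =
        new PriorityQueue<>((a, b) -> a.value().compareTo(b.value()));
    for (List<T> segment : sortedSegments) {
      Iterator<T> it = segment.iterator();
      if (it.hasNext()) heap.add(new Head<>(it.next(), it));
    }
    List<T> merged = new ArrayList<>();
    while (!heap.isEmpty()) {
      Head<T> h = heap.poll();           // smallest remaining key across all segments
      merged.add(h.value());
      if (h.rest().hasNext()) {          // refill the heap from the same segment
        heap.add(new Head<>(h.rest().next(), h.rest()));
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    System.out.println(merge(List.of(
        List.of(1, 4, 7), List.of(2, 5, 8), List.of(3, 6, 9))));
    // -> [1, 2, 3, 4, 5, 6, 7, 8, 9]
  }
}
```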
6.2 ReduceTask

ReduceTask's run function is as follows:
```java
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException {
  job.setBoolean("mapred.skip.on", isSkipping());
  // A reduce task has three phases: copy, sort, reduce
  if (isMapOrReduce()) {
    copyPhase = getProgress().addPhase("copy");
    sortPhase  = getProgress().addPhase("sort");
    reducePhase = getProgress().addPhase("reduce");
  }
  startCommunicationThread(umbilical);
  final Reporter reporter = getReporter(umbilical);
  initialize(job, reporter);
  // Copy phase: ReduceCopier.fetchOutputs fetches the map outputs, spawning
  // several MapOutputCopier threads whose copyOutput does the copying.
  boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
  if (!isLocal) {
    reduceCopier = new ReduceCopier(umbilical, job);
    if (!reduceCopier.fetchOutputs()) {
        ……
    }
  }
  copyPhase.complete();
  // Sort phase: merge the fetched map outputs until fewer than io.sort.factor
  // files remain, returning an iterator for accessing the key-value pairs
  setPhase(TaskStatus.Phase.SORT);
  statusUpdate(umbilical);
  final FileSystem rfs = FileSystem.getLocal(job).getRaw();
  RawKeyValueIterator rIter = isLocal
    ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
        job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
        !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
        new Path(getTaskID().toString()), job.getOutputKeyComparator(),
        reporter)
    : reduceCopier.createKVIterator(job, rfs, reporter);
  mapOutputFilesOnDisk.clear();
  sortPhase.complete();
  // Reduce phase
  setPhase(TaskStatus.Phase.REDUCE);
  ……
  Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
  Class keyClass = job.getMapOutputKeyClass();
  Class valClass = job.getMapOutputValueClass();
  ReduceValuesIterator values = isSkipping() ?
     new SkippingReduceValuesIterator(rIter,
          job.getOutputValueGroupingComparator(), keyClass, valClass,
          job, reporter, umbilical) :
      new ReduceValuesIterator(rIter,
      job.getOutputValueGroupingComparator(), keyClass, valClass,
      job, reporter);
  // Read out each key with its list of values and call the Reducer's reduce function
  while (values.more()) {
    reduceInputKeyCounter.increment(1);
    reducer.reduce(values.getKey(), values, collector, reporter);
    values.nextKey();
    values.informReduceProgress();
  }
  reducer.close();
  out.close(reporter);
  done(umbilical);
}
```
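As on the map side, reducer.reduce is user code. For illustration, the matching old-API word-count reducer referenced by the driver in section 1:

```java
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// The word-count reducer for the old mapred API: ReduceTask.run calls
// reduce() once per key with an iterator over that key's values.
public class WordCountReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

  public void reduce(Text key, Iterator<IntWritable> values,
                     OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();   // sum the 1s emitted by the mapper/combiner
    }
    output.collect(key, new IntWritable(sum));
  }
}
```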
7. Summary
The overall Map-Reduce flow — client submission, JobTracker scheduling, TaskTracker heartbeats and task launch, and execution in the Child JVM — is summarized in a diagram in the original post (not reproduced here).