Source Code Analysis of Spark's Stage Division Algorithm
A Spark Application can contain many Jobs, each triggered by a different Action. Every Job is made up of one or more Stages, and later Stages depend on earlier ones: a Stage only runs after all of the Stages it depends on have finished computing.
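For instance, in the sketch below (assuming sc is an already-created SparkContext; the variable names are made up for illustration), each action triggers its own job, and the DAGScheduler then splits every one of those jobs into stages:

val nums = sc.parallelize(1 to 100)
val doubled = nums.map(_ * 2)   // transformation only: no job yet

doubled.count()     // action #1 -> job 0
doubled.collect()   // action #2 -> job 1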
The basis for dividing Stages is the wide dependency. When does a wide dependency (and therefore a shuffle) occur? With operators such as reduceByKey, groupByKey, and so on.
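As a concrete illustration (a sketch assuming sc is an existing SparkContext and the input path is a placeholder), the wide dependency introduced by reduceByKey is exactly where the job below gets cut into two stages:

val counts = sc.textFile("hdfs:///tmp/input.txt")  // placeholder path
  .flatMap(_.split(" "))     // narrow dependency -> stays in the same stage
  .map(word => (word, 1))    // narrow dependency -> stays in the same stage
  .reduceByKey(_ + _)        // wide dependency: a shuffle, so a new stage starts here

counts.collect()             // the action that actually triggers the job
// Result: stage 0 (textFile/flatMap/map, a shuffle map stage) and stage 1 (the final stage).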
The DAGScheduler's handleJobSubmitted method is mainly responsible for creating the final stage and for dividing the job into multiple stages.
I. The stage division algorithm
/** Handles the submitted job. */
private[scheduler] def handleJobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    allowLocal: Boolean,
    callSite: CallSite,
    listener: JobListener,
    properties: Properties = null)
{
  // 1. Create the final stage from the RDD that triggered the job
  var finalStage: Stage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    // Stage division works backwards from the last stage; the final RDD itself forms a stage,
    // which is placed in the DAGScheduler's cache.
    finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  if (finalStage != null) {
    // 2. Create the job from finalStage; the job's last stage is always the finalStage.
    // This is where the job itself is created.
    val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
      job.jobId, callSite.shortForm, partitions.length, allowLocal))
    logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))
    val shouldRunLocally =
      localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
    val jobSubmissionTime = clock.getTimeMillis()
    if (shouldRunLocally) {
      // Compute very short actions like first() or take() with no parent stages locally.
      listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
      runLocally(job)
    } else {
      // 3. Add the job to the in-memory caches
      jobIdToActiveJob(jobId) = job
      activeJobs += job
      finalStage.resultOfJob = Some(job)
      val stageIds = jobIdToStageIds(jobId).toArray
      val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
      listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
      // 4. Submit the final stage.
      // This submits the first (root) stage and leaves the remaining stages in the waiting cache.
      submitStage(finalStage)
    }
  }
  // Submit any stages sitting in the waiting queue
  submitWaitingStages()
}

/**
 * Submits a stage.
 * Also contains the stage division algorithm.
 * @param stage
 */
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      // Important: get the missing parent stages of this stage
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      // If there are no missing parents, the stage can run now
      if (missing == Nil) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        // Otherwise keep recursing to divide and submit the parent stages first
        for (parent <- missing) {
          submitStage(parent)
        }
        // and add this stage to the waiting queue
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id)
  }
}

Getting the parent stages of a given stage:
/**
 * Gets the missing parent stages of a given stage.
 * @param stage
 * @return
 */
private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  // (a stack: last in, first out)
  val waitingForVisit = new Stack[RDD[_]]
  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      if (getCacheLocs(rdd).contains(Nil)) {
        // Iterate over the RDD's dependencies
        for (dep <- rdd.dependencies) {
          dep match {
            // Wide (shuffle) dependency
            case shufDep: ShuffleDependency[_, _, _] =>
              // Use the shuffle dependency to create a shuffle map stage, whose isShuffleMap flag is true.
              // The final stage is not a shuffle map stage by default,
              // but every stage before the finalStage is.
              val mapStage = getShuffleMapStage(shufDep, stage.jobId)
              if (!mapStage.isAvailable) {
                missing += mapStage
              }
            // Narrow dependency: just push the parent RDD onto the stack
            case narrowDep: NarrowDependency[_] =>
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
  }
  // Start by pushing the stage's own RDD onto the stack
  waitingForVisit.push(stage.rdd)
  // Traverse the RDDs
  while (!waitingForVisit.isEmpty) {
    visit(waitingForVisit.pop())
  }
  missing.toList
}

Through the recursive calls between these two methods, all stages end up saved in the waitingStages cache.
The following method is then called in a loop to submit the waiting stages:
private def submitWaitingStages() {
  // TODO: We might want to run this less often, when we are sure that something has become
  // runnable that wasn't before.
  logTrace("Checking for newly runnable parent stages")
  logTrace("running: " + runningStages)
  logTrace("waiting: " + waitingStages)
  logTrace("failed: " + failedStages)
  val waitingStagesCopy = waitingStages.toArray
  waitingStages.clear()
  // Submit the stages one by one
  for (stage <- waitingStagesCopy.sortBy(_.jobId)) {
    submitStage(stage)
  }
}

Summary of the stage division algorithm:
1. Stage division works backwards from the finalStage.
2. Wide (shuffle) dependencies are where new stages are cut and then submitted.
3. Parent stages are submitted first, via recursion (a simplified sketch follows this list).
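As an illustration of points 1–3, here is a minimal, self-contained sketch of the backwards division. It is not the real DAGScheduler code: Dep, RDDLike, and StageNode are made-up types that only model the pieces needed to show how narrow dependencies stay inside a stage while shuffle dependencies start a new parent stage, which is built first.

// Simplified model: an "RDD" only knows its dependencies, each tagged as shuffle or narrow.
case class Dep(parent: RDDLike, isShuffle: Boolean)
case class RDDLike(name: String, deps: Seq[Dep])
case class StageNode(rdd: RDDLike, parents: Seq[StageNode])

// Walk backwards from the final RDD: narrow deps stay in the current stage,
// shuffle deps cut a new (parent) stage, which is built recursively first.
def buildStage(rdd: RDDLike): StageNode = {
  val parentStages = scala.collection.mutable.ArrayBuffer[StageNode]()
  var toVisit = List(rdd)            // manual stack, like the real code
  var visited = Set.empty[RDDLike]
  while (toVisit.nonEmpty) {
    val current = toVisit.head
    toVisit = toVisit.tail
    if (!visited(current)) {
      visited += current
      current.deps.foreach { dep =>
        if (dep.isShuffle) parentStages += buildStage(dep.parent) // stage boundary here
        else toVisit = dep.parent :: toVisit                      // same stage, keep walking
      }
    }
  }
  StageNode(rdd, parentStages.toList)
}

// Example lineage: hadoopRDD -> mapped -> (shuffle) -> result
val a = RDDLike("hadoopRDD", Nil)
val b = RDDLike("mapped", Seq(Dep(a, isShuffle = false)))
val c = RDDLike("result", Seq(Dep(b, isShuffle = true)))
println(buildStage(c)) // the result stage has one parent stage rooted at "mapped"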
For an operator that produces a shuffle, three RDDs are created under the hood: a MapPartitionsRDD, a ShuffledRDD, and another MapPartitionsRDD. The shuffle happens between the first MapPartitionsRDD and the ShuffledRDD, so that is exactly where the stage boundary is placed.
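One way to observe this in the spark-shell is to print the lineage of a shuffled RDD. This is only an illustration: the exact RDD chain that toDebugString prints depends on the Spark version (in some releases reduceByKey shows up as a single ShuffledRDD because the map-side combine is handled inside the shuffle machinery, while older releases show the surrounding MapPartitionsRDDs described above).

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))  // sc: an existing SparkContext
val reduced = pairs.reduceByKey(_ + _)
println(reduced.toDebugString)
// The indentation change in the printed lineage marks the shuffle boundary,
// i.e. exactly where the DAGScheduler cuts the stage.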
II. The algorithm for computing a task's preferred locations
/**
 * Submits a stage and creates a batch of tasks for it; the number of tasks equals the number of partitions.
 */
private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")
  // Get our pending tasks and remember them in our pendingTasks entry
  stage.pendingTasks.clear()

  // First figure out the indexes of partition ids to compute.
  // This determines how many tasks to create.
  val partitionsToCompute: Seq[Int] = {
    if (stage.isShuffleMap) {
      (0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil)
    } else {
      val job = stage.resultOfJob.get
      (0 until job.numPartitions).filter(id => !job.finished(id))
    }
  }

  val properties = if (jobIdToActiveJob.contains(jobId)) {
    jobIdToActiveJob(stage.jobId).properties
  } else {
    // this stage will be assigned to "default" pool
    null
  }

  // Add the stage to runningStages
  runningStages += stage
  // SparkListenerStageSubmitted should be posted before testing whether tasks are
  // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
  // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
  // event.
  stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
  outputCommitCoordinator.stageStart(stage.id)
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

  // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
  // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
  // the serialized copy of the RDD and for each task we will deserialize it, which means each
  // task gets a different copy of the RDD. This provides stronger isolation between tasks that
  // might modify state of objects referenced in their closures. This is necessary in Hadoop
  // where the JobConf/Configuration object is not thread-safe.
  var taskBinary: Broadcast[Array[Byte]] = null
  try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    val taskBinaryBytes: Array[Byte] =
      if (stage.isShuffleMap) {
        closureSerializer.serialize((stage.rdd, stage.shuffleDep.get) : AnyRef).array()
      } else {
        closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func) : AnyRef).array()
      }
    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // In the case of a failure during serialization, abort the stage.
    case e: NotSerializableException =>
      abortStage(stage, "Task not serializable: " + e.toString)
      runningStages -= stage
      return
    case NonFatal(e) =>
      abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")
      runningStages -= stage
      return
  }

  // Create the required number of tasks for the stage, computing each task's preferred locations.
  // Only the last stage's tasks are ResultTasks; all others are ShuffleMapTasks.
  val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {
    partitionsToCompute.map { id =>
      // Create one task per partition and compute its preferred locations
      val locs = getPreferredLocs(stage.rdd, id)
      val part = stage.rdd.partitions(id)
      // Every stage other than the finalStage has isShuffleMap set to true,
      // so a ShuffleMapTask is created
      new ShuffleMapTask(stage.id, taskBinary, part, locs)
    }
  } else {
    // Not a shuffle map stage, so this is the finalStage,
    // which is used to create ResultTasks
    val job = stage.resultOfJob.get
    partitionsToCompute.map { id =>
      val p: Int = job.partitions(id)
      val part = stage.rdd.partitions(p)
      val locs = getPreferredLocs(stage.rdd, p)
      new ResultTask(stage.id, taskBinary, part, locs, id)
    }
  }

  if (tasks.size > 0) {
    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    stage.pendingTasks ++= tasks
    logDebug("New pending tasks: " + stage.pendingTasks)
    // Finally submit the TaskSet through the taskScheduler
    taskScheduler.submitTasks(
      new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  } else {
    // Because we posted SparkListenerStageSubmitted earlier, we should post
    // SparkListenerStageCompleted here in case there are no tasks to run.
    outputCommitCoordinator.stageEnd(stage.id)
    listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
    logDebug("Stage " + stage + " is actually done; %b %d %d".format(
      stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
    runningStages -= stage
  }
}

Summary:
1. A single stage is executed by many tasks.
2. A task's preferred location is determined by cached and checkpointed data.
3. Starting from the stage's last RDD, check whether the RDD's partition has been cached or checkpointed; if it has, the preferred location is wherever that cached or checkpointed partition lives, because on that node the partition's parent RDDs do not need to be recomputed.
4. If a partition is both cached and checkpointed, the cached copy takes precedence.
5. If no preferred location is found, the placement is left to the TaskScheduler (a simplified sketch of this lookup follows the list).
6. Only the last stage's tasks are ResultTasks; every other stage consists of ShuffleMapTasks.
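The lookup described in points 2–5 can be sketched as follows. This is a simplified illustration rather than the real getPreferredLocs implementation: RDDNode, cachedOn, and checkpointedOn are made-up stand-ins for the DAGScheduler's cache-location table and the RDD's own checkpoint locations.

// Simplified model of one partition of an RDD and its narrow parents.
case class RDDNode(
    name: String,
    cachedOn: Seq[String],        // hosts where this partition is cached (hypothetical)
    checkpointedOn: Seq[String],  // hosts holding the checkpoint data, if any (hypothetical)
    narrowParents: Seq[RDDNode])

// Walk the lineage of the stage's last RDD until some location is found.
def preferredLocations(rdd: RDDNode): Seq[String] = {
  if (rdd.cachedOn.nonEmpty) rdd.cachedOn                   // 1) cache wins
  else if (rdd.checkpointedOn.nonEmpty) rdd.checkpointedOn  // 2) then checkpoint
  else rdd.narrowParents.view                               // 3) otherwise ask the narrow parents
    .map(preferredLocations)
    .find(_.nonEmpty)
    .getOrElse(Nil)                                         // 4) empty: the TaskScheduler decides
}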