The Way of Spark (Advanced Series), Spark Source Code Reading, Part 6: Task Submission
Task Submission
As mentioned in the previous section on stage submission, a stage is ultimately wrapped into a TaskSet and submitted via taskScheduler.submitTasks. The call looks like this:
```scala
taskScheduler.submitTasks(new TaskSet(
  tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
```

A stage consists of a series of tasks, and these tasks are wrapped into a TaskSet. The TaskSet class is defined as follows:
```scala
/**
 * A set of tasks submitted together to the low-level TaskScheduler, usually representing
 * missing partitions of a particular stage.
 */
private[spark] class TaskSet(
    val tasks: Array[Task[_]],
    val stageId: Int,
    val stageAttemptId: Int,
    val priority: Int,
    val properties: Properties) {
  val id: String = stageId + "." + stageAttemptId

  override def toString: String = "TaskSet " + id
}
```

The submitTasks method is declared in the TaskScheduler trait. At present TaskSchedulerImpl is the only implementation of TaskScheduler; its submitTasks method reads as follows:
```scala
// submitTasks in TaskSchedulerImpl
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    // Create a TaskSetManager, which schedules the tasks within this TaskSet,
    // tracking each task's execution and retrying failed tasks
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    // Register the TaskSetManager with the schedulableBuilder, which schedules all
    // TaskSets of the application, i.e. the TaskSets of the stages in the job's DAG
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  // Ask the SchedulerBackend to allocate resources for the tasks
  backend.reviveOffers()
}
```
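The schedulableBuilder used above is created according to the configured scheduling mode (FIFO by default, FAIR optionally). As a simplified sketch, the FIFO builder in the Spark 1.x source does little more than append each TaskSetManager to the root scheduling pool:

```scala
// Simplified sketch of FIFOSchedulableBuilder (see SchedulableBuilder.scala in Spark 1.x);
// the FAIR builder additionally builds a pool hierarchy from an XML configuration file
private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
  extends SchedulableBuilder with Logging {

  override def buildPools() {
    // nothing to build for FIFO scheduling
  }

  override def addTaskSetManager(manager: Schedulable, properties: Properties) {
    // every TaskSetManager is simply appended to the root pool
    rootPool.addSchedulable(manager)
  }
}
```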
SchedulerBackend has several concrete implementations, covering local mode, standalone, YARN, and Mesos deployments. We take SparkDeploySchedulerBackend, the standalone-mode backend, as an example: it extends CoarseGrainedSchedulerBackend and inherits its reviveOffers method.
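In the Spark 1.x codebase this method does little more than send a ReviveOffers message to the driver endpoint; a minimal reproduction:

```scala
// reviveOffers in CoarseGrainedSchedulerBackend: ask the DriverEndpoint, via an RPC
// message, to hand out fresh resource offers to pending tasks
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
```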
The driverEndpoint field used here is of type RpcEndpointRef:

```scala
// member variable driverEndpoint in CoarseGrainedSchedulerBackend
var driverEndpoint: RpcEndpointRef = null
```

RpcEndpointRef is defined as follows:
```scala
// RpcEndpointRef is a reference to a remote RpcEndpoint. It is an abstract class whose
// concrete implementation here is AkkaRpcEndpointRef, i.e. the underlying transport is Akka.
/**
 * A reference for a remote [[RpcEndpoint]]. [[RpcEndpointRef]] is thread-safe.
 */
private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
  extends Serializable with Logging

private[akka] class AkkaRpcEndpointRef(
    @transient defaultAddress: RpcAddress,
    @transient _actorRef: => ActorRef,
    @transient conf: SparkConf,
    @transient initInConstructor: Boolean = true)
  extends RpcEndpointRef(conf) with Logging {

  lazy val actorRef = _actorRef

  override lazy val address: RpcAddress = {
    val akkaAddress = actorRef.path.address
    RpcAddress(akkaAddress.host.getOrElse(defaultAddress.host),
      akkaAddress.port.getOrElse(defaultAddress.port))
  }

  override lazy val name: String = actorRef.path.name

  private[akka] def init(): Unit = {
    // Initialize the lazy vals
    actorRef
    address
    name
  }

  if (initInConstructor) {
    init()
  }

  override def send(message: Any): Unit = {
    actorRef ! AkkaMessage(message, false)
  }

  // remaining code omitted
}
```
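For context, driverEndpoint starts out as null and is populated in CoarseGrainedSchedulerBackend.start(), which registers a DriverEndpoint with the RpcEnv and keeps the returned reference. A simplified sketch of that step (the surrounding property handling is omitted, and details vary slightly across 1.x versions):

```scala
// Simplified sketch: register the DriverEndpoint with the RpcEnv under the name
// "CoarseGrainedScheduler" and keep the returned RpcEndpointRef in driverEndpoint
driverEndpoint = rpcEnv.setupEndpoint(
  CoarseGrainedSchedulerBackend.ENDPOINT_NAME,
  new DriverEndpoint(rpcEnv, properties))
```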
The receive method of DriverEndpoint handles the ReviveOffers message sent by driverEndpoint.send(ReviveOffers). DriverEndpoint extends the ThreadSafeRpcEndpoint trait:

```scala
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
  extends ThreadSafeRpcEndpoint with Logging
```

ThreadSafeRpcEndpoint in turn extends the RpcEndpoint trait, which declares the receive method as follows:
```scala
/**
 * Process messages from [[RpcEndpointRef.send]] or [[RpcCallContext.reply]]. If receiving an
 * unmatched message, [[SparkException]] will be thrown and sent to `onError`.
 */
def receive: PartialFunction[Any, Unit] = {
  case _ => throw new SparkException(self + " does not implement 'receive'")
}
```

DriverEndpoint overrides this receive method as follows:
```scala
override def receive: PartialFunction[Any, Unit] = {
  case StatusUpdate(executorId, taskId, state, data) =>
    scheduler.statusUpdate(taskId, state, data.value)
    if (TaskState.isFinished(state)) {
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          executorInfo.freeCores += scheduler.CPUS_PER_TASK
          makeOffers(executorId)
        case None =>
          // Ignoring the update since we don't know about the executor.
          logWarning(s"Ignored task status update ($taskId state $state) " +
            s"from unknown executor with ID $executorId")
      }
    }

  // Important: this is where the ReviveOffers message sent above is handled
  case ReviveOffers =>
    makeOffers()

  case KillTask(taskId, executorId, interruptThread) =>
    executorDataMap.get(executorId) match {
      case Some(executorInfo) =>
        executorInfo.executorEndpoint.send(KillTask(taskId, executorId, interruptThread))
      case None =>
        // Ignoring the task kill since the executor is not registered.
        logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
    }
}
```

As the code shows, the ReviveOffers message is handled by calling the makeOffers method:
```scala
// Make fake resource offers on all executors
private def makeOffers() {
  // Filter out executors under killing: only active executors can receive tasks
  val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))
  // A WorkerOffer describes the resources currently free on one executor
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  // First, TaskSchedulerImpl.resourceOffers assigns resources to the tasks;
  // then CoarseGrainedSchedulerBackend.launchTasks launches them, sending each task
  // to an Executor on a Worker node for execution
  launchTasks(scheduler.resourceOffers(workOffers))
}
```
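For reference, the WorkerOffer built above is just a small case class describing the free resources of a single executor; in the Spark 1.x source it is essentially:

```scala
/**
 * Represents free resources available on an executor.
 */
private[spark] case class WorkerOffer(executorId: String, host: String, cores: Int)
```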
關(guān)于resourceOffers方法及l(fā)aunchTasks方法的具體內(nèi)容,在后續(xù)章節(jié)中將進行進一步的解析。