Spark任务调度流程及调度策略分析
Spark任務調度
TaskScheduler調度入口:
(1)?????? CoarseGrainedSchedulerBackend 在啟動時會創建DriverEndPoint. 而DriverEndPoint中存在一定時任務,每隔一定時間(spark.scheduler.revive.interval, 默認為1s)進行一次調度(給自身發送ReviveOffers消息, 進行調用makeOffers進行調度)。代碼如下所示
override def onStart() {// Periodically revive offers to allow delay scheduling to workval reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")reviveThread.scheduleAtFixedRate(new Runnable {override def run(): Unit = Utils.tryLogNonFatalError {Option(self).foreach(_.send(ReviveOffers))}}, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)}?
(2)當Executor執行完成已分配任務時,會向Driver發送StatusUpdate消息,當Driver接收到消后會調用 makeOffers(executorId)方法,進行任務調度, CoarseGrainedExecutorBackend 狀態變化時向Driver (DriverEndPoint)向送StatusUpdate消息
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {val msg = StatusUpdate(executorId, taskId, state, data)driver match {case Some(driverRef) => driverRef.send(msg)case None => logWarning(s"Drop $msg because has not yet connected to driver")}}Dirver接收到StatusUpdate消息時將會觸發設調度(makeOffers),為完成任務的Executor分配任務。
override def receive: PartialFunction[Any, Unit] = {case StatusUpdate(executorId, taskId, state, data) =>scheduler.statusUpdate(taskId, state, data.value)if (TaskState.isFinished(state)) {executorDataMap.get(executorId) match {case Some(executorInfo) =>executorInfo.freeCores += scheduler.CPUS_PER_TASKmakeOffers(executorId)case None =>// Ignoring the update since we don't know about the executor.logWarning(s"Ignored task status update ($taskId state $state) " +s"from unknown executor with ID $executorId")}}case ReviveOffers =>makeOffers()case KillTask(taskId, executorId, interruptThread) =>executorDataMap.get(executorId) match {case Some(executorInfo) =>executorInfo.executorEndpoint.send(KillTask(taskId, executorId, interruptThread))case None =>// Ignoring the task kill since the executor is not registered.logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")}}
其中makeOffers方法,會調用TaskSchedulerImpl中的resourceOffers方法,依其的調度策略為Executor分配適合的任務。具體代碼如下:
a、為所有資源分配任務
// Make fake resource offers on all executorsprivate def makeOffers() {// Filter out executors under killingval activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))val workOffers = activeExecutors.map { case (id, executorData) =>new WorkerOffer(id, executorData.executorHost, executorData.freeCores)}.toSeqlaunchTasks(scheduler.resourceOffers(workOffers))}b、為單個executor分配任務
// Make fake resource offers on just one executorprivate def makeOffers(executorId: String) {// Filter out executors under killingif (!executorsPendingToRemove.contains(executorId)) {val executorData = executorDataMap(executorId)val workOffers = Seq(new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))launchTasks(scheduler.resourceOffers(workOffers))}}分配完任務后,向Executor發送LaunchTask指令,啟動任務,執行用戶邏輯代碼
// Launch tasks returned by a set of resource offersprivate def launchTasks(tasks: Seq[Seq[TaskDescription]]) {for (task <- tasks.flatten) {val serializedTask = ser.serialize(task)if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>try {var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +"spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +"spark.akka.frameSize or using broadcast variables for large values."msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,AkkaUtils.reservedSizeBytes)taskSetMgr.abort(msg)} catch {case e: Exception => logError("Exception in error callback", e)}}}else {val executorData = executorDataMap(task.executorId)executorData.freeCores -= scheduler.CPUS_PER_TASKexecutorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))}}} View Code?
Spark任務調度策略
ò? FIFO
FIFO(先進先出)方式調度Job,如下圖所示,每個Job被切分成多個Stage.第一個Job優先獲取所有可用資源,接下來第二個Job再獲取剩余可用資源。(每個Stage對應一個TaskSetManager)
?
ò? FAIR
FAIR共享模式調度下,Spark以在多Job之間輪詢方式為任務分配資源,所有的任務擁有大致相當的優先級來共享集群的資源。FAIR調度模型如下圖:
?
?
下面從源碼的角度對調度策略進行說明:
當觸發調度時,會調用TaskSchedulerImpl的resourceOffers方法,方法中會依照調度策略選出要執行的TaskSet, 然后取出適合(考慮本地性)的task交由Executor執行, 其代碼如下:
/*** Called by cluster manager to offer resources on slaves. We respond by asking our active task* sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so* that tasks are balanced across the cluster.*/def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {// Mark each slave as alive and remember its hostname// Also track if new executor is addedvar newExecAvail = falsefor (o <- offers) {executorIdToHost(o.executorId) = o.hostactiveExecutorIds += o.executorIdif (!executorsByHost.contains(o.host)) {executorsByHost(o.host) = new HashSet[String]()executorAdded(o.executorId, o.host)newExecAvail = true}for (rack <- getRackForHost(o.host)) {hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host}}// Randomly shuffle offers to avoid always placing tasks on the same set of workers.val shuffledOffers = Random.shuffle(offers)// Build a list of tasks to assign to each worker.val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))val availableCpus = shuffledOffers.map(o => o.cores).toArrayval sortedTaskSets = rootPool.getSortedTaskSetQueuefor (taskSet <- sortedTaskSets) {logDebug("parentName: %s, name: %s, runningTasks: %s".format(taskSet.parent.name, taskSet.name, taskSet.runningTasks))if (newExecAvail) {taskSet.executorAdded()}}// Take each TaskSet in our scheduling order, and then offer it each node in increasing order// of locality levels so that it gets a chance to launch local tasks on all of them.// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANYvar launchedTask = falsefor (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {do {launchedTask = resourceOfferSingleTaskSet(taskSet, maxLocality, shuffledOffers, availableCpus, tasks)} while (launchedTask)}if (tasks.size > 0) {hasLaunchedTask = true}return tasks} View Code
經過分析可知,通過rootPool.getSortedTaskSetQueue對隊列中的TaskSet進行排序,getSortedTaskSetQueue的具體實現如下:
由上述代碼可知,其通過算法做為比較器對taskSet進行排序, 其中調度算法有FIFO和FAIR兩種,下面分別進行介紹。
FIFO
???????? 優先級(Priority): 在DAGscheduler創建TaskSet時使用JobId做為優先級的值。
FIFO調度算法實現如下所示
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {val priority1 = s1.priorityval priority2 = s2.priorityvar res = math.signum(priority1 - priority2)if (res == 0) {val stageId1 = s1.stageIdval stageId2 = s2.stageIdres = math.signum(stageId1 - stageId2)}if (res < 0) {true} else {false}} } View Code?由源碼可知,FIFO依據JobId進行挑選較小值。因為越早提交的作業,JobId越小。
對同一個作業(Job)來說越先生成的Stage,其StageId越小,有依賴關系的多個Stage之間,DAGScheduler會控制Stage是否會被提交到調度隊列中(若其依賴的Stage未執行完前,此Stage不會被提交),其調度順序可通過此來保證。但若某Job中有兩個無入度的Stage的話,則先調度StageId小的Stage.
Fair
??? Fair調度隊列相比FIFO較復雜,其可存在多個調度隊列,且隊列呈樹型結構(現階段Spark的Fair調度只支持兩層樹結構),每用戶可以使用sc.setLocalProperty(“spark.scheduler.pool”, “poolName”)來指定要加入的隊列,默認情況下會加入到buildDefaultPool。每個隊列中還可指定自己內部的調度策略,且Fair還存在一些特殊的屬性:
schedulingMode: 設置調度池的調度模式FIFO或FAIR, 默認為FIFO.
minShare:最少資源保證量,當一個隊列最少資源未滿足時,它將優先于其它同級隊列獲取資源。
weight: 在一個隊列內部分配資源時,默認情況下,采用公平輪詢的方法將資源分配給各個應用程序,而該參數則將打破這種平衡。例如,如果用戶配置一個指定調度池權重為2, 那么這個調度池將會獲得相對于權重為1的調度池2倍的資源。
以上參數,可通過conf/fairscheduler.xml文件配置調度池的屬性。
Fair調度算法實現如下所示:
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {val minShare1 = s1.minShareval minShare2 = s2.minShareval runningTasks1 = s1.runningTasksval runningTasks2 = s2.runningTasksval s1Needy = runningTasks1 < minShare1val s2Needy = runningTasks2 < minShare2val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDoubleval minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDoubleval taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDoubleval taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDoublevar compare: Int = 0if (s1Needy && !s2Needy) {return true} else if (!s1Needy && s2Needy) {return false} else if (s1Needy && s2Needy) {compare = minShareRatio1.compareTo(minShareRatio2)} else {compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)}if (compare < 0) {true} else if (compare > 0) {false} else {s1.name < s2.name}} } View Code由原碼可知,未滿足minShare規定份額的資源的隊列或任務集先執行;如果所有均不滿足minShare的話,則選擇缺失比率小的先調度;如果均不滿足,則按執行權重比進行選擇,先調度執行權重比小的。如果執行權重也相同的話則會選擇StageId小的進行調度(name=“TaskSet_”+ taskSet.stageId.toString)。
以此為標準將所有TaskSet進行排序, 然后選出優先級最高的進行調度。
Spark 任務調度之任務本地性
當選出TaskSet后,將按本地性從中挑選適合Executor的任務,在Executor上執行。
(詳細見http://www.cnblogs.com/barrenlake/p/4550800.html一小節相關內容)
?
文章地址: http://www.cnblogs.com/barrenlake/p/4891589.html
?
?
轉載于:https://www.cnblogs.com/barrenlake/p/4891589.html
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的Spark任务调度流程及调度策略分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: bootstrap中图片的一些小事情
- 下一篇: jdbc链接数据库mysql