Spark源码分析 -- SchedulableBuilder
SchedulableBuilder就是對Scheduleable tree的封裝, 
在Pool層面(中間節點), 完成對TaskSet的調度(FIFO, FAIR) 
在TaskSetManager 層面(葉子節點), 完成對TaskSet中task的調度(locality)以及track(retry)
TaskSetManager
用于封裝TaskSet, 主要提供對單個TaskSet內部的tasks的track和schedule 
所以主要的接口, 
resourceOffer, 對于一個resource offer, 如何schedule一個task來執行 
statusUpdate, 對于task狀態的track
?
ClusterTaskSetManager
ClusterScheduler上對于TaskSetManager的實現
1 addPendingTask 
locality, 在schedule時候需要考慮, 應該優先執行盡可能近的task 
所有未被執行的tasks, 都是pending task, 并且是安裝不同locality粒度存儲在hashmap中的 
pendingTasksForExecutor, hashmap, 每個executor被指定的task 
pendingTasksForHost,? hashmap, 每個instance被指定的task 
pendingTasksForRack, hashmap, 每個機架被指定的task 
pendingTasksWithNoPrefs, ArrayBuffer, 沒有locality preferences的tasks, 隨便在那邊執行 
allPendingTasks, ArrayBuffer, 所有的pending task 
speculatableTasks, 重復的task, 熟悉hadoop的應該容易理解 
可以繼續看下addPendingTask, 如何把task加到各個list上去
addPendingTask(index: Int, readding: Boolean = false) 
兩個參數, 
index, task的index, 用于從taskset中取得task 
readding, 表示是否新的task, 因為當executor失敗的時候, 也需要把task重新再加到各個list中, list中有重復的task是沒有關系的, 因為選取task的時候會自動忽略已經run的task
?
2 resourceOffer 
解決如何在taskset內部schedule一個task, 主要需要考慮的是locality, 直接看注釋 
其中比較意思的是, 對currentLocalityIndex的維護 
初始時為0, PROCESS_LOCAL, 只能選擇PendingTasksForExecutor 
每次調用resourceOffer, 都會計算和前一次task launch之間的時間間隔, 如果超時(各個locality的超時時間不同), currentLocalityIndex會加1, 即不斷的放寬 
而代表前一次的lastLaunchTime, 只有在resourceOffer中成功的findTask時會被更新, 所以邏輯就是優先選擇更local的task, 但當findTask總失敗時, 說明需要放寬 
但是放寬后, 當有比較local的task被選中時, 這個currentLocalityIndex還會縮小, 因為每次都會把tasklocality賦值給currentLocality
?
3 statusUpdate 
應對statusUpdate, 主要是通過在clusterScheduler中注冊的listener通知DAGScheduler 
當然對于失敗的task, 還要再加到pending list里面去
// Set of pending tasks for each executor. These collections are actually// treated as stacks, in which new tasks are added to the end of the// ArrayBuffer and removed from the end. This makes it faster to detect// tasks that repeatedly fail because whenever a task failed, it is put// back at the head of the stack. They are also only cleaned up lazily;// when a task is launched, it remains in all the pending lists except// the one that it was launched from, but gets removed from them later.private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]// Set of pending tasks for each host. Similar to pendingTasksForExecutor,// but at host level.private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]// Set of pending tasks for each rack -- similar to the above.private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]// Set containing pending tasks with no locality preferences.val pendingTasksWithNoPrefs = new ArrayBuffer[Int]// Set containing all pending tasks (also used as a stack, as above).val allPendingTasks = new ArrayBuffer[Int]// Tasks that can be speculated. Since these will be a small fraction of total// tasks, we'll just hold them in a HashSet.val speculatableTasks = new HashSet[Int] ? // Figure out which locality levels we have in our TaskSet, so we can do delay schedulingval myLocalityLevels = computeValidLocalityLevels() // 當前TaskSet里面的task locality有哪些val localityWaits = myLocalityLevels.map(getLocalityWait) // 每個locality level默認的等待時間(從配置讀)// Delay scheduling variables: we keep track of our current locality level and the time we// last launched a task at that level, and move up a level when localityWaits[curLevel] expires.// We then move down if we manage to launch a "more local" task.var currentLocalityIndex = 0 // 當前myLocalityLevels中的index, 從0開始, 從最小的開始schedulevar lastLaunchTime = clock.getTime() // 記錄最后launch task的時間, 用于后面會算超時, 如果發生超時, currentLocalityIndex+1 ? /*** Add a task to all the pending-task lists that it should be on. If readding is set, we are* re-adding the task so only include it in each list if it's not already there.*/private def addPendingTask(index: Int, readding: Boolean = false) {// Utility method that adds `index` to a list only if readding=false or it's not already theredef addTo(list: ArrayBuffer[Int]) {if (!readding || !list.contains(index)) { // 新的的task或在該list里面沒有list += index}}var hadAliveLocations = falsefor (loc <- tasks(index).preferredLocations) {for (execId <- loc.executorId) {if (sched.isExecutorAlive(execId)) {addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer)) // 首先加到相應的executor列表中hadAliveLocations = true}}if (sched.hasExecutorsAliveOnHost(loc.host)) {addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer)) // 加到host的列表中 for (rack <- sched.getRackForHost(loc.host)) {addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer)) // 加到Rack的列表中}hadAliveLocations = true}}if (!hadAliveLocations) { // 如果上面的選擇都失敗了, 或本來就沒有preferred locations, 那就加到pendingTasksWithNoPrefs中// Even though the task might've had preferred locations, all of those hosts or executors// are dead; put it in the no-prefs list so we can schedule it elsewhere right away.addTo(pendingTasksWithNoPrefs)}if (!readding) { // 對于新的task, 需要加到allPendingTasks中allPendingTasks += index // No point scanning this whole list to find the old task there}} ? /*** Dequeue a pending task for a given node and return its index and locality level.* Only search for tasks matching the given locality constraint.*/private def findTask(execId: String, host: String, locality: TaskLocality.Value): Option[(Int, TaskLocality.Value)] ={// 先從Executor for (index <- findTaskFromList(getPendingTasksForExecutor(execId))) { // findTaskFromList, Dequeue a pending task from the given list and return its index.return Some((index, TaskLocality.PROCESS_LOCAL))}// Node, 需要先check localityif (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) { // locality >= TaskLocality.NODE_LOCAL for (index <- findTaskFromList(getPendingTasksForHost(host))) {return Some((index, TaskLocality.NODE_LOCAL))}}// Rack, 需要先check locality if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {for {rack <- sched.getRackForHost(host)index <- findTaskFromList(getPendingTasksForRack(rack))} {return Some((index, TaskLocality.RACK_LOCAL))}}// Look for no-pref tasks after rack-local tasks since they can run anywhere.for (index <- findTaskFromList(pendingTasksWithNoPrefs)) {return Some((index, TaskLocality.PROCESS_LOCAL))}if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {for (index <- findTaskFromList(allPendingTasks)) {return Some((index, TaskLocality.ANY))}}// Finally, if all else has failed, find a speculative taskreturn findSpeculativeTask(execId, host, locality)}
?
/*** Respond to an offer of a single executor from the scheduler by finding a task*/override def resourceOffer(execId: String,host: String,availableCpus: Int,maxLocality: TaskLocality.TaskLocality): Option[TaskDescription] ={if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) { // 前提是task沒有執行完和有足夠的available cores(>1)val curTime = clock.getTime()var allowedLocality = getAllowedLocalityLevel(curTime) // 取到當前allowed LocalityLevelif (allowedLocality > maxLocality) { // 不能超出作為參數傳入的maxLocality, 調用者限定allowedLocality = maxLocality // We're not allowed to search for farther-away tasks}findTask(execId, host, allowedLocality) match { // 調用findTask, 并對返回值進行case, findTask邏輯很簡單就是依次從不同的locality中取taskcase Some((index, taskLocality)) => {// Found a task; do some bookkeeping and return a task descriptionval task = tasks(index)val taskId = sched.newTaskId()// Figure out whether this should count as a preferred launchlogInfo("Starting task %s:%d as TID %s on executor %s: %s (%s)".format(taskSet.id, index, taskId, execId, host, taskLocality))// Do various bookkeepingcopiesRunning(index) += 1val info = new TaskInfo(taskId, index, curTime, execId, host, taskLocality)taskInfos(taskId) = infotaskAttempts(index) = info :: taskAttempts(index)// Update our locality level for delay schedulingcurrentLocalityIndex = getLocalityIndex(taskLocality) // 用當前Task的locality來更新currentLocalityIndex, 這里index有可能會減少, 因為taskLocality <= currentLocality lastLaunchTime = curTime // 更新lastLaunchTime // Serialize and return the taskval startTime = clock.getTime()// We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here// we assume the task can be serialized without exceptions.val serializedTask = Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)val timeTaken = clock.getTime() - startTimeincreaseRunningTasks(1)logInfo("Serialized task %s:%d as %d bytes in %d ms".format(taskSet.id, index, serializedTask.limit, timeTaken))val taskName = "task %s:%d".format(taskSet.id, index)if (taskAttempts(index).size == 1)taskStarted(task,info)return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask)) // 最終返回schedule得到的那個task}case _ =>}}return None} ? /*** Get the level we can launch tasks according to delay scheduling, based on current wait time.*/private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {while (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex) && // 發生超時currentLocalityIndex < myLocalityLevels.length - 1){// Jump to the next locality level, and remove our waiting time for the current one since// we don't want to count it again on the next onelastLaunchTime += localityWaits(currentLocalityIndex)currentLocalityIndex += 1 // currentLocalityIndex 加 1}myLocalityLevels(currentLocalityIndex)} ? /*** Find the index in myLocalityLevels for a given locality. This is also designed to work with* localities that are not in myLocalityLevels (in case we somehow get those) by returning the* next-biggest level we have. Uses the fact that the last value in myLocalityLevels is ANY.*/def getLocalityIndex(locality: TaskLocality.TaskLocality): Int = { // 查詢locality在myLocalityLevels中的indexvar index = 0while (locality > myLocalityLevels(index)) {index += 1}index} /*** Compute the locality levels used in this TaskSet. Assumes that all tasks have already been* added to queues using addPendingTask.*/ // 僅僅從各個pending list中看看當前的taskset中的task有哪些preference locality, 從小到大 private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY}val levels = new ArrayBuffer[TaskLocality.TaskLocality]if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0) {levels += PROCESS_LOCAL}if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0) {levels += NODE_LOCAL}if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0) {levels += RACK_LOCAL}levels += ANYlogDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))levels.toArray} /** Called by cluster scheduler when one of our tasks changes state */override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {SparkEnv.set(env)state match {case TaskState.FINISHED =>taskFinished(tid, state, serializedData)case TaskState.LOST =>taskLost(tid, state, serializedData)case TaskState.FAILED =>taskLost(tid, state, serializedData)case TaskState.KILLED =>taskLost(tid, state, serializedData)case _ =>}} def taskStarted(task: Task[_], info: TaskInfo) {sched.listener.taskStarted(task, info) } }?
Pool
一種對schedulableQueue的抽象, 什么是schedulable? 
注釋說的, 包含Pools and TaskSetManagers, 這里設計有問題, 你會發現Pools和TaskSetManagers的核心接口完全不同, 雖然TaskSetManagers里面也實現了這些接口, 但都是meanless的 
簡單理解成, 作者想要統一對待, 泛化Pools和TaskSetManagers, 所以這樣做了
所以對于Pool, 可以理解為TaskSetManagers的容器, 當然由于Pool本身也是Schedulable, 所以容器里面也可以放Pool 
核心接口getSortedTaskSetQueue, 通過配置不同的SchedulingAlgorithm來調度TaskSetManagers(或pool)
所以注意那些FIFO或FAIR都是用來調度TaskSet的, 所以Spark調度的基礎是stage
/*** An interface for schedulable entities.* there are two type of Schedulable entities(Pools and TaskSetManagers)*/ private[spark] trait Schedulable {var parent: Schedulable// child queuesdef schedulableQueue: ArrayBuffer[Schedulable]def schedulingMode: SchedulingModedef weight: Intdef minShare: Intdef runningTasks: Intdef priority: Intdef stageId: Intdef name: Stringdef increaseRunningTasks(taskNum: Int): Unitdef decreaseRunningTasks(taskNum: Int): Unitdef addSchedulable(schedulable: Schedulable): Unitdef removeSchedulable(schedulable: Schedulable): Unitdef getSchedulableByName(name: String): Schedulabledef executorLost(executorId: String, host: String): Unitdef checkSpeculatableTasks(): Booleandef getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager]def hasPendingTasks(): Boolean }?
package org.apache.spark.scheduler.cluster //An Schedulable entity that represent collection of Pools or TaskSetManagers private[spark] class Pool(val poolName: String,val schedulingMode: SchedulingMode,initMinShare: Int,initWeight: Int)extends Schedulablewith Logging {var schedulableQueue = new ArrayBuffer[Schedulable] // 用于buffer Schedulable, TaskSetManagervar schedulableNameToSchedulable = new HashMap[String, Schedulable]var priority = 0var stageId = 0var name = poolNamevar parent:Schedulable = nullvar taskSetSchedulingAlgorithm: SchedulingAlgorithm = { // SchedulingAlgorithm其實就是定義comparator,后面好將TaskSet排序schedulingMode match {case SchedulingMode.FAIR => new FairSchedulingAlgorithm() // Faircase SchedulingMode.FIFO =>new FIFOSchedulingAlgorithm() // FIFO}}override def addSchedulable(schedulable: Schedulable) { // 增加一個TaskSetManagerschedulableQueue += schedulableschedulableNameToSchedulable(schedulable.name) = schedulableschedulable.parent= this}override def removeSchedulable(schedulable: Schedulable) { // 刪除一個TaskSetManager schedulableQueue -= schedulableschedulableNameToSchedulable -= schedulable.name}override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = { // 返回排過序的TaskSetManager列表var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator) // sortWith for (schedulable <- sortedSchedulableQueue) {sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue() // 這里的schedulable有可能也是pool, 所以需要遞歸調用}return sortedTaskSetQueue} }?
SchedulableBuilder
上面說了Pool里面可以是TaskSetManagers也可以是pool, 這樣是不是可以形成tree 
SchedulableBuilder就是對Schedulable Tree的封裝, 通過TaskSetManagers(葉節點)和pools(中間節點), 來生成Schedulable Tree 
這里只列出最簡單的FIFO, 看不出tree的感覺 
對于FIFO很簡單, 直接使用一個Pool就可以, 把所有的TaskSet使用addSchedulable加進去, 然后排序讀出來即可
這里沒有列出Fair的實現, 比較復雜, 后面再分析吧
/*** An interface to build Schedulable tree* buildPools: build the tree nodes(pools)* addTaskSetManager: build the leaf nodes(TaskSetManagers)*/ private[spark] trait SchedulableBuilder {def buildPools()def addTaskSetManager(manager: Schedulable, properties: Properties) }private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)extends SchedulableBuilder with Logging {override def buildPools() {// nothing}override def addTaskSetManager(manager: Schedulable, properties: Properties) {rootPool.addSchedulable(manager)} }轉載于:https://www.cnblogs.com/fxjwind/p/3507307.html
總結
以上是生活随笔為你收集整理的Spark源码分析 -- SchedulableBuilder的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: Java-eclipse快捷键及设置
- 下一篇: 《设计模式》-模板模式
