Spark技术内幕: Task向Executor提交的源代码解析
在上文《Spark技術內幕:Stage劃分及提交源代碼分析》中,我們分析了Stage的生成和提交。可是Stage的提交,僅僅是DAGScheduler完畢了對DAG的劃分,生成了一個計算拓撲,即須要依照順序計算的Stage,Stage中包括了能夠以partition為單位并行計算的Task。我們并沒有分析Stage中得Task是怎樣生成而且終于提交到Executor中去的。
這就是本文的主題。
從org.apache.spark.scheduler.DAGScheduler#submitMissingTasks開始,分析Stage是怎樣生成TaskSet的。
假設一個Stage的全部的parent stage都已經計算完畢或者存在于cache中。那么他會調用submitMissingTasks來提交該Stage所包括的Tasks。
org.apache.spark.scheduler.DAGScheduler#submitMissingTasks的計算流程例如以下:
pipeline。能夠稱為大數據處理的基石。僅僅有數據進行pipeline處理,才干將其放到集群中去執行。
對于一個task來說,它從數據源獲得邏輯。然后依照拓撲順序,順序執行(實際上是調用rdd的compute)。
TaskSet是一個數據結構,存儲了這一組task:private[spark] class TaskSet(val tasks: Array[Task[_]],val stageId: Int,val attempt: Int,val priority: Int,val properties: Properties) {val id: String = stageId + "." + attemptoverride def toString: String = "TaskSet " + id }管理調度這個TaskSet的時org.apache.spark.scheduler.TaskSetManager。TaskSetManager會負責task的失敗重試。跟蹤每一個task的執行狀態。處理locality-aware的調用。具體的調用堆棧例如以下:
首先看一下org.apache.spark.executor.Executor#launchTask: def launchTask(context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {val tr = new TaskRunner(context, taskId, taskName, serializedTask)runningTasks.put(taskId, tr)threadPool.execute(tr) // 開始在executor中執行}
TaskRunner會從序列化的task中反序列化得到task。這個須要看?org.apache.spark.executor.Executor.TaskRunner#run 的實現:task.run(taskId.toInt)。而task.run的實現是: final def run(attemptId: Long): T = {context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)context.taskMetrics.hostname = Utils.localHostName()taskThread = Thread.currentThread()if (_killed) {kill(interruptThread = false)}runTask(context)}
對于原來提到的兩種Task,即
override def runTask(context: TaskContext): U = {// Deserialize the RDD and the func using the broadcast variables.val ser = SparkEnv.get.closureSerializer.newInstance()val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)metrics = Some(context.taskMetrics)try {func(context, rdd.iterator(partition, context))} finally {context.markTaskCompleted()}}
而org.apache.spark.scheduler.ShuffleMapTask#runTask則是寫shuffle的結果。
override def runTask(context: TaskContext): MapStatus = {// Deserialize the RDD using the broadcast variable.val ser = SparkEnv.get.closureSerializer.newInstance()val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)//此處的taskBinary即為在org.apache.spark.scheduler.DAGScheduler#submitMissingTasks序列化的task的廣播變量取得的metrics = Some(context.taskMetrics)var writer: ShuffleWriter[Any, Any] = nulltry {val manager = SparkEnv.get.shuffleManagerwriter = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) // 將rdd計算的結果寫入memory或者diskreturn writer.stop(success = true).get} catch {case e: Exception =>if (writer != null) {writer.stop(success = false)}throw e} finally {context.markTaskCompleted()}}
這兩個task都不要依照拓撲順序調用rdd的compute來完畢對partition的計算。不同的是ShuffleMapTask須要shuffle write。以供child stage讀取shuffle的結果。
對于這兩個task都用到的taskBinary,即為在org.apache.spark.scheduler.DAGScheduler#submitMissingTasks序列化的task的廣播變量取得的。
通過上述幾篇博文,實際上我們已經粗略的分析了從用戶定義SparkContext開始。集群是假設為每一個Application分配Executor的,回想一下這個序列圖:
還有就是用戶觸發某個action,集群是怎樣生成DAG,假設將DAG劃分為能夠成Stage,已經Stage是怎樣將這些能夠pipeline執行的task提交到Executor去執行的。當然了,具體細節還是很值得推敲的。
以后的每一個周末。都會奉上某個細節的實現。
歇息了。明天又會開始忙碌的一周。轉載于:https://www.cnblogs.com/llguanli/p/8601055.html
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的Spark技术内幕: Task向Executor提交的源代码解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: bl小说里面有个机器人管家_机器人也有攀
- 下一篇: 安全可靠的透明加密软件