spark 源码分析之二十 -- Stage的提交
引言
上篇?spark 源碼分析之十九 -- DAG的生成和Stage的劃分?中,主要介紹了下圖中的前兩個(gè)階段DAG的構(gòu)建和Stage的劃分。
本篇文章主要剖析,Stage是如何提交的。
rdd的依賴關(guān)系構(gòu)成了DAG,DAGScheduler根據(jù)shuffle依賴關(guān)系將DAG圖劃分為一個(gè)一個(gè)小的stage。具體可以看?spark 源碼分析之十九 -- DAG的生成和Stage的劃分?做進(jìn)一步了解。
緊接上篇文章
上篇文章中,DAGScheduler的handleJobSubmitted方法我們只剖析了stage的生成部分,下面我們看一下stage的提交部分源碼。
提交Stage的思路
首先構(gòu)造ActiveJob對(duì)象,其次清除緩存的block location信息,然后記錄jobId和job對(duì)象的映射關(guān)系到j(luò)obIdToActiveJob map集合中,并且將該jobId記錄到活動(dòng)的job集合中。
獲取到Job所有的stage的唯一標(biāo)識(shí),并且根據(jù)唯一標(biāo)識(shí)來(lái)獲取stage對(duì)象,并且調(diào)用其lastestInfo方法獲取其StageInfo對(duì)象。
然后進(jìn)一步封裝成?SparkListenerJobStart 事件對(duì)象,并post到 listenerBus中,listenerBus 是一個(gè)?LiveListenerBus 對(duì)象,其內(nèi)部封裝了四個(gè)消息隊(duì)列組成的集合,具體可以看?spark 源碼分析之三 -- LiveListenerBus介紹?文章做進(jìn)一步了解。
最后調(diào)用submitStage 方法執(zhí)行Stage的提交。
先來(lái)看一下ActiveJob的說(shuō)明。
ActiveJob
類說(shuō)明
A running job in the DAGScheduler. Jobs can be of two types: a result job, which computes a ResultStage to execute an action, or a map-stage job, which computes the map outputs for a ShuffleMapStage before any downstream stages are submitted. The latter is used for adaptive query planning, to look at map output statistics before submitting later stages. We distinguish between these two types of jobs using the finalStage field of this class. Jobs are only tracked for "leaf" stages that clients directly submitted, through DAGScheduler's submitJob or submitMapStage methods. However, either type of job may cause the execution of other earlier stages (for RDDs in the DAG it depends on), and multiple jobs may share some of these previous stages. These dependencies are managed inside DAGScheduler.
它代表了正運(yùn)行在DAGScheduler中的一個(gè)job,job有兩種類型:result job,其通過(guò)計(jì)算一個(gè)ResultStage來(lái)執(zhí)行一個(gè)action操作;map-stage job,它在下游的stage提交之前,為ShuffleMapStage計(jì)算map的輸出。
構(gòu)造方法
finalStages是這個(gè)job的最后一個(gè)stage。
提交Stage前的準(zhǔn)備
直接先來(lái)看submitStage方法,如下:
思路: 首先先獲取可能丟失的父stage信息,如果該stage的父stage被遺漏了,則遞歸調(diào)用查看其爺爺stage是否被遺漏。
查找遺漏父Stage
getMissingParentStages方法如下:
思路:不斷創(chuàng)建父stage,可以看上篇文章?spark 源碼分析之十九 -- DAG的生成和Stage的劃分?做進(jìn)一步了解。
提交Stage
submitMissingTasks方法過(guò)于長(zhǎng),為方便分析,按功能大致分為如下部分:
獲取Stage需要計(jì)算的partition信息
org.apache.spark.scheduler.ResultStage#findMissingPartitions 方法如下:
?org.apache.spark.scheduler.ShuffleMapStage#findMissingPartitions 方法如下:
org.apache.spark.MapOutputTrackerMaster#findMissingPartitions 方法如下:
將stage和分區(qū)記錄到OutputCommitCoordinator中
?OutputCommitCoordinator 的 stageStart實(shí)現(xiàn)如下:
本質(zhì)上就是把它放入到一個(gè)map中了。
?
獲取分區(qū)的優(yōu)先位置
?
思路:根據(jù)stage的RDD和分區(qū)id獲取到其rdd中的分區(qū)的優(yōu)先位置。
下面看一下?getPreferredLocs 方法:
?
?
注釋中說(shuō)到,它是線程安全的,下面看一下,它是如何實(shí)現(xiàn)的,即 getPrefferredLocsInternal 方法。
這個(gè)方法中提到四種情況:
1. 如果之前獲取到過(guò),那么直接返回Nil即可。
2. 如果之前已經(jīng)緩存在內(nèi)存中,直接從緩存的內(nèi)存句柄中取出返回即可。
3. 如果RDD對(duì)應(yīng)的是HDFS輸入的文件等,則使用RDD記錄的優(yōu)先位置。
4. 如果上述三種情況都不滿足,且是narrowDependency,則調(diào)用該方法,獲取子RDDpartition對(duì)應(yīng)的父RDD的partition的優(yōu)先位置。
下面仔細(xì)說(shuō)一下中間兩種情況。
從緩存中取
getCacheLocs 方法如下:
思路:先查看rdd的存儲(chǔ)級(jí)別,如果沒(méi)有存儲(chǔ)級(jí)別,則直接返回Nil,否則根據(jù)RDD和分區(qū)id組成BlockId集合,請(qǐng)求存儲(chǔ)系統(tǒng)中的BlockManager來(lái)獲取block的位置,然后轉(zhuǎn)換為T(mén)askLocation信息返回。
獲取RDD的優(yōu)先位置
RDD的?preferredLocations 方法如下:
思路:先從checkpoint中找,如果checkpoint中沒(méi)有,則返回默認(rèn)的為Nil。
?
返回對(duì)象是TaskLocation對(duì)象,做一下簡(jiǎn)單的說(shuō)明。
TaskLocation
類說(shuō)明
A location where a task should run. This can either be a host or a (host, executorID) pair. In the latter case, we will prefer to launch the task on that executorID, but our next level of preference will be executors on the same host if this is not possible.
它有三個(gè)子類,如下:
這三個(gè)類定義如下:
很簡(jiǎn)單,不做過(guò)多說(shuō)明。
TaskLocation伴隨對(duì)象如下,現(xiàn)在用的方法是第二種 apply 方法:
創(chuàng)建新的StageInfo?
對(duì)應(yīng)方法如下:
org.apache.spark.scheduler.Stage#makeNewStageAttempt 方法如下:
很簡(jiǎn)單,主要是調(diào)用了StageInfo的fromStage方法。
先來(lái)看Stage類。
StageInfo
StageInfo封裝了關(guān)于Stage的一些信息,用于調(diào)度和SparkListener傳遞stage信息。
其伴生對(duì)象如下:
廣播要執(zhí)行task函數(shù)
對(duì)應(yīng)源碼如下:
通過(guò)broadcast機(jī)制,將數(shù)據(jù)廣播到spark集群中的driver和各個(gè)executor中。關(guān)于broadcast的實(shí)現(xiàn)細(xì)節(jié),可以查看?spark 源碼分析之十四 -- broadcast 是如何實(shí)現(xiàn)的?做進(jìn)一步了解。
生成Task集合
根據(jù)stage的類型生成不同的類型Task。關(guān)于過(guò)多Task 的內(nèi)容,在階段四進(jìn)行剖析。
TaskScheduler提交TaskSet
對(duì)應(yīng)代碼如下:
其中taskScheduler是 TaskSchedulerImpl,它是TaskScheduler的唯一子類實(shí)現(xiàn)。它負(fù)責(zé)task的調(diào)度。
org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks方法實(shí)現(xiàn)如下:
其中 createTaskSetManager 方法如下:
SchedulableBuilder類是構(gòu)建Schedulable樹(shù)的接口。
schedulableBuilder 定義如下:
其中schedulingMode 可以通過(guò)參數(shù)?spark.scheduler.mode 來(lái)調(diào)整,默認(rèn)為FIFO。
schedulableBuilder 初始化如下:
schedulableBuilder的 addTaskSetManager (FIFO)方法如下:
即調(diào)用了內(nèi)部Pool對(duì)象的addSchedulable 方法:
?
?
關(guān)于更多TaskSetManager的內(nèi)容,將在階段四進(jìn)行剖析。
backend是一個(gè)?SchedulerBackend 實(shí)例。在SparkContetx的初始化過(guò)程中調(diào)用 createTaskScheduler 初始化 backend,具體可以看?spark 源碼分析之四 -- TaskScheduler的創(chuàng)建和啟動(dòng)過(guò)程?做深入了解。
在yarn 模式下,它有兩個(gè)實(shí)現(xiàn)yarn-client 模式下的?org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend實(shí)現(xiàn) 和 yarn-cluster 模式下的?org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend 實(shí)現(xiàn)。
這兩個(gè)類在spark 項(xiàng)目的 resource-managers 目錄下的 yarn 目錄下定義實(shí)現(xiàn),當(dāng)然它也支持 kubernetes 和 mesos,不做過(guò)多說(shuō)明。
這兩個(gè)類的繼承關(guān)系如下:
?
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#reviveOffers 實(shí)現(xiàn)如下:
發(fā)送ReviveOffers 請(qǐng)求給driver。
driver端的?CoarseGrainedSchedulerBackend 的 receive 方法有如下事件處理分支:
其內(nèi)部經(jīng)過(guò)一系列RPC過(guò)程,關(guān)于 RPC 可以看?spark 源碼分析之十二--Spark RPC剖析之Spark RPC總結(jié)?做進(jìn)一步了解。
即會(huì)調(diào)用driver端的makeOffsers方法,如下:
總結(jié)
本篇文章剖析了從DAGScheduler生成的Stage是如何被提交給TaskScheduler,以及TaskScheduler是如何把TaskSet提交給ResourceManager的。
下面就是task的運(yùn)行部分了,下篇文章對(duì)其做詳細(xì)介紹。跟task執(zhí)行關(guān)系很密切的TaskSchedulerBackend、Task等內(nèi)容,也將在下篇文章做更詳細(xì)的說(shuō)明。
轉(zhuǎn)載于:https://www.cnblogs.com/johnny666888/p/11251642.html
總結(jié)
以上是生活随笔為你收集整理的spark 源码分析之二十 -- Stage的提交的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 房车是什么?
- 下一篇: 车头插红旗的车是什么背景