Stage划分和Task最佳位置
目錄
?
1、Job Stage劃分
2、Task最佳位置
3、總結
3.1 Stage劃分總結:
3.2 Task最佳位置總結:
1、Job Stage劃分
Spark Application中因為不同的Action觸發眾多的Job,也就是說一個Application中可以有很多的Job,每個Job是由是由一個或者多個Stage構成的,后面的Stage依賴于前面的Stage,也就是說只有前面依賴的Stage計算完畢后,后面的Stage才會運行。而Stage劃分的依據就是寬依賴。下面以RDD的collect方法為例:
(1)他是一個action會觸發一個具體的作業runJob
(2)runJob有很多重載方法,不斷地往里調用,最后交給dagScheduler的runJob,在dagScheduler的runJob交給了submitJob,后面還有一個等待作業結果看成功還是失敗,會有相應的動作。
(3)在submitJob中首先看一下分區長度,是因為要進行計算,這個肯定是RDD導致的action他要校驗一下是不是在運行的時候相應的Partition存在。
eventProcessLoop調用post的時候有個Jobsubmitted的參數,他是一個case class,因為一個application中可能有很多的Job,不同的job的Jobsubmitted實例不一樣所以不能用case object。他里面封裝了job的id,最后一個RDD,具體對RDD操作的函數,有哪些Partition要被計算,監聽作業狀態等。
他的核心就是將Jobsubmitted交給eventProcessLoop。他是通過post方法post給eventProcessLoop,這個post其實就是發往EventLoop里面的eventQueue
(4)發現在EventLoop里面開辟了一個線程,他是setDaemon方式作為后臺線程,因為要在后臺做不斷的循環(如果是前臺線程的話對垃圾回收是有影響的),在run方法里面會不斷的循環我們的消息隊列,從eventQueue(是一個LinkedBlockingDeque,我們可以往他里面信息)中獲得消息,調用了onReceive,發現在里面沒有具體的實現所以在DAGSchedulerEventProcessLoop中對onReceive進行了實現,這里就收到了DAGSchedulerEvent,這里面再調用doOnReceive。doOnReceive收到信息就開始處理
(5)接下來就是HandleJobSubmited。這個時候Stage就開始了。我們知道最后一個Stage一定是ResultStage,前面所有的Stage都是ShuffleMapStage。
(6)發現有個getOrCreateParentStages的方法,開始創建ResultStage的父stage,里面有多個嵌套獲取shuffle依賴和循環創建shuffleMapStage,若沒有shuffle,操作則返回空list
進入到創建父Stage的方法getOrCreateParentStages,這里僅僅是抽取當前RDD的shuffle依賴,shuffleMapStage,如果不是shuffleDependency就繼續抽取父RDD,迭代遍歷一直到抽取出為止或者沒有
進入getOrCreateShuffleMapStage方法中,進行匹配能不能取到ParentStage的值,當沒有parentStage的時候會返回空,能取到就返回stage,ShuffleMapStage是根據遍歷出的ShuffleDependencies一次次創建出來的
進入createShuffleMapStage方法 此方法是遞歸循環創建shuffleMapStage的過程
這個時候ShuffleMapStage已經創建完成了,并不是一次就創建完成,而是遇見shuffle的時候會由下往上遞歸創建ShuffleMapStage
(7)構建完所有的ShuffleMapStage后,將其作為參數創建ResultStage
(8)最后將Stage和id關聯,更新job所有的Stage,并將Stage返回出去。
(9)回到handleJobsubmited方法中,finalStage構建完之后,新建一個ActiveJob保存了當前job的一些信息,打印一堆日志之類。getMissingParentStages(finalStage)根據finalStage,剛才找父Stage的時候如果有的話直接返回,如果沒有的話就會創建,所以如果曾經有就不需要再去做。listenerBus.post監聽事件,最后submitStage(finalStage)。
首先獲得id,如果jobId是defined的話再次getMissingParentStages(stage)獲得missing的stage之后判斷一下是否為空,如果為空的話就submitMissingTasks(stage, jobId.get)個就是沒有前置性的Tasks,也就是沒有父Stage。在這個底層其實是DAGScheduler把這個處理的過程交給具體的TaskScheduler去處理
2、Task最佳位置
(1)在handleJobsubmited方法中最后是最后調用submitStage,在他里面會調用submitMissingTasks
(2)這里面有很多代碼,我們要關心Stage本身的算法以及Task任務本地性把當前的Stage加進去,然后對Stage進行判斷,一種是ShuffleMapStage,一種是ResultStage。繼續往下走會看到taskIdToLocations這是關鍵的代碼,taskIdToLocations是一個Map
partitionsToCompute這里面獲得是具體的要計算的PartitionID,我們我們這邊看到的map里面的id是Partition的id。這里面匿名函數,產生的是tuple根據Partition的id。后面toMap就是Partition的id和TaskLocation的位置。
(3)進入到getPreferredLocs(stage.rdd, id),進來的是RDD,PartitionID返回的是一個集合。
再進入getPreferredLocsInternal
visited: HashSet[(RDD[_], Int)]這個HashSet開始是空,所以直接傳進來一個new HashSet,然后判斷visited如果已經有的話,那么添加就不成功,那么就是已經計算了數據本地性了,就返回Nil。
下面的cached就是已經在DAGScheduler的內存數據結構中了。進入getCacheLocs,這邊返回的是序列,cacheLocs是一個HashMap,這包含了每個RDD的Partition的id以及id對應的taskLocation,這個包含了Stage本身也包含了Stage內部任務的本地性
(4)回到getPreferredLocsInternal中,上面是看一下DAGScheduler中有沒有緩存根據Partition而保存的數據本地性的內容,如果不為空的話就把內容返回。然后調用下面的getpreferdLocations(如果自定義一個RDD的話是一定要寫這個方法的)
(5)最后判斷一下如果是窄依賴的話就自己調用自己
3、總結
3.1 Stage劃分總結:
(1)Action觸發Job,開始逆向分析job執行過程Action中利用SparkContext runJob路由到dagScheduler.runJob(rdd,func,分區數,其他),提交Job作業;
(2)DAGScheduler的runJob中調用submitJob并返回監聽waiter,生命周期內監聽Job狀態;
(3)在submitJob內部,將該獲取到的Job(已有JobId),插入到名為eventProcessLoop的LinkedBlockingDeque結構的事件處理隊列中;
(4)eventProcessLoop放入新事件后,調起底層的DAGSchedulerEventProcessLoop的onReceive方法;
(5)執行doOnReceive,根據DAGSchedulerEvent的具體類型如JobSubmitted事件或者MapStageSubmitted事件,調取具體的Submitted handle函數提交具體的Job;
(6)以JobSubmitted為例,在handleJobSubmitted內部,返回從ResultStage 建立stage 建立finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite),finalStage激活Job val job = new ActiveJob(jobId, finalStage, callSite, listener, properties),同時開始逆向構建缺失的stage;
(7)DAG構建完畢,提交stage,submitStage(finalStage),submitStage中stage提交為tasks,submitMissingTasks(),submitMissingTasks,根據ShuffleMapStage還是ResultStage創建 ShuffleMapTask 或 ResultTask。
(7)taskScheduler.submitTasks()開始調起具體的task
3.2 Task最佳位置總結:
(1)在劃分Stage的時候submitMissingTasks方法中會有一個taskIdToLocations的屬性,他的結構為 Map[Int, Seq[TaskLocation]],他保存的就是PartitionID及其對應的最佳位置
(2)在對taskIdToLocations賦值的時候會調用getPreferredLocs方法,再路由到getPreferredLocsInternal返回最佳位置Seq[TaskLocation]
(3)在getPreferredLocsInternal方法中
①判斷rdd的partition是否被訪問過,如果被訪問過,則什么都不做
②然后判斷DAGScheduler的內存中是否cache了在當前Paritition的信息,如果有的話直接返回
③如果沒有cache,則調用rdd.getPreferredLocations方法,獲取RDD partition的最佳位置
④遍歷RDD的依賴,如果有窄依賴,遍歷父依賴的partition,對遍歷到的每個partition,遞歸調用getPreferredLocsInternal方法
即從第一個窄依賴的第一個partition開始,然后將每個partition的最佳位置,添加到序列中,最后返回所有partition的最佳位置序列
注意:DAGScheduler計算數據本地性的時候借助了RDD自身的getPreferredLocations中的數據,因為getPreferredLocations中表明了每個Partition的數據本地性,雖然當前Partition可能被persist或者checkpoint,但是persist或者checkpoint默認情況下肯定是和getPreferredLocations中的Partition的數據本地性是一致的,所以這就極大的簡化Task數據本地性算法的實現和效率的優化。
總結
以上是生活随笔為你收集整理的Stage划分和Task最佳位置的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 刷机后如何升级android,手机系统怎
- 下一篇: c语言结构内部定义指针,C语言知识补漏(