spark 调度模块详解及源码分析
spark 調(diào)度模塊詳解及源碼分析
@(SPARK)[spark]
- spark 調(diào)度模塊詳解及源碼分析
 - 一概述
- 一三個(gè)主要的類
- 1class DAGScheduler
 - 2trait TaskScheduler
 - 3trait SchedulerBackend
 
 - 二基本流程
 - 三TaskScheduler SchedulerBackend
 
 - 一三個(gè)主要的類
 - 二DAGScheduler
- 一用戶代碼中創(chuàng)建SparkContext對(duì)象SparkContext中創(chuàng)建DAGScheduler與TaskSchedulerTaskSchedulerBackend對(duì)象
- 1用戶代碼中創(chuàng)建SparkContext對(duì)象
 - 2SparkContext源碼簡(jiǎn)單分析
 - 3SparkContext創(chuàng)建DAGScheduler與TaskSchedulerTaskSchedulerBackend對(duì)象
 - 4createTaskScheduler創(chuàng)建TaskSchedulerTaskSchedulerBackend對(duì)象的具體過(guò)程
 
 - 二用戶代碼創(chuàng)建各種transformation與至少一個(gè)action這個(gè)action會(huì)通過(guò)SparkContextrunJob調(diào)用DAGSchedulerrunJob
- 1textFile
 - 2filter
 - 3count
 
 - 三步驟三DAGScheduler提交作業(yè)劃分stage并生成最終的TaskSet
- 1創(chuàng)建DAGScheduler對(duì)象的詳細(xì)實(shí)現(xiàn)
 - 2作業(yè)的提交
 - 3stage的劃分
 - 4任務(wù)的生成
 
 
 - 一用戶代碼中創(chuàng)建SparkContext對(duì)象SparkContext中創(chuàng)建DAGScheduler與TaskSchedulerTaskSchedulerBackend對(duì)象
 - 三TaskScheduler TaskSchedulerBackend
 
一、概述
通過(guò)spark-submit向集群提交應(yīng)用后,spark就開始了調(diào)度的過(guò)程,其調(diào)度模塊主要包括2部分: 
 * DAGScheduler:負(fù)責(zé)將用戶提交的計(jì)算任務(wù)分割成不同的stage。 
 * TaskScheduler & SchedulerBackend:負(fù)責(zé)將stage中的task提交到集群中。 
 先看一下2個(gè)核心的圖: 
  
 
(一)三個(gè)主要的類
與調(diào)度模塊相關(guān)的三個(gè)主要的類均位于org.apache.spark.scheduler,包括:
1、class DAGScheduler
負(fù)責(zé)分析用戶提交的應(yīng)用,并根據(jù)計(jì)算任務(wù)的依賴關(guān)系建立DAG,然后將DAG劃分成不同的Stage,其中每個(gè)stage由可以并發(fā)執(zhí)行的一組task構(gòu)成,這些task的邏輯完全相同,只是作用于不同的數(shù)據(jù)。 
 DAGScheduler在不同的資源管理框架下的實(shí)現(xiàn)是完全相同的。
2、trait TaskScheduler
TaskScheduler從DAGScheduler中接收不同的stage的任務(wù),并且向集群調(diào)度這些任務(wù)。 
 yarn-cluster的具體實(shí)現(xiàn)為:YarnClusterScheduler 
 yarn-client的具體實(shí)現(xiàn)為:YarnScheduler
3、trait SchedulerBackend
SchedulerBackend向當(dāng)前等待分配資源的task分配計(jì)算資源,并且在分配的executor中啟動(dòng)task。 
 yarn-cluster的具體實(shí)現(xiàn)為:YarnClusterSchedulerBackend。 
 yarn-client的具體實(shí)現(xiàn)為:YarnClientSchedulerBackend
(二)基本流程
 
 圖片來(lái)源于spark內(nèi)幕技術(shù)P45.
(三)TaskScheduler & SchedulerBackend
 
 圖片來(lái)源于spark內(nèi)幕技術(shù)P44. 
 TaskScheduler側(cè)重于調(diào)度,而SchedulerBackend是實(shí)際運(yùn)行。
每個(gè)SchedulerBackend都會(huì)對(duì)應(yīng)一個(gè)唯一的TaskScheduler。注意圖中的TaskScheduler & SchedulerBackend都會(huì)有針對(duì)于yarn的特定實(shí)現(xiàn)。
二、DAGScheduler
DAGScheduler在不同的資源管理框架下的實(shí)現(xiàn)是完全相同的。因?yàn)镈AGScheduler實(shí)現(xiàn)的功能是將DAG劃分為不同的stage,這是根據(jù)寬依賴進(jìn)行劃分的,每個(gè)寬依賴均會(huì)調(diào)用shuffle,以此作為一個(gè)新的stage。這與具體的資源管理框架無(wú)關(guān)。
每個(gè)stage由可以并發(fā)執(zhí)行的一組task構(gòu)成,這些task的執(zhí)行邏輯完全相同,只是作用于不同的數(shù)據(jù)。
DAGScheduler與TaskScheduler都是在SparkContext創(chuàng)建的時(shí)候創(chuàng)建的。其中TaskScheduler是通過(guò)SparkContext#createTaskScheduler創(chuàng)建的,而DAGScheduler是直接調(diào)用它的構(gòu)造函數(shù)創(chuàng)建的。只不過(guò),DAGScheduler保存了TaskScheduler的引用,因此需要先創(chuàng)建TaskScheduler。
================================= 
 步驟一:用戶代碼中創(chuàng)建SparkContext對(duì)象,SparkContext中創(chuàng)建DAGScheduler與TaskScheduler/TaskSchedulerBackend對(duì)象 
 步驟二:用戶代碼構(gòu)建各種tranformation及至少一個(gè)action,這個(gè)action會(huì)通過(guò)SparkContext#runJob調(diào)用DAGScheduler#runJob 
 步驟三:DAGScheduler提交作業(yè)到一隊(duì)列,handleJobSubmitted從這個(gè)隊(duì)列取出作業(yè),劃分stage,并開始生成最終的TaskSet,調(diào)用submitTasks()向TaskScheduler提交任務(wù)
(一)用戶代碼中創(chuàng)建SparkContext對(duì)象,SparkContext中創(chuàng)建DAGScheduler與TaskScheduler/TaskSchedulerBackend對(duì)象
1、用戶代碼中創(chuàng)建SparkContext對(duì)象
下面我們從一個(gè)簡(jiǎn)單的程進(jìn)行出發(fā),分析它的進(jìn)行過(guò)程。程序如下:
package com.lujinhong.sparkdemoimport org.apache.spark.SparkContext object GrepWord {def grepCountLog(sc:SparkContext,path: String, keyWord: String) {println("grep " + keyWord + " in " + path + ", the lineCount is: ")val all = sc.textFile(path)val ret = all.filter(line => line.contains(keyWord))println(ret.count)}def main(args: Array[String]) {val sc = new SparkContext();grepCountLog(sc,"/src/20151201", "\"keyword\": \"20302\"");} }上面的代碼很簡(jiǎn)單,指定一個(gè)目錄,搜索這個(gè)目錄中的文件有多少個(gè)keyword。分為三步:讀入文件,過(guò)濾關(guān)鍵字,count。 
 我們這里主要分析yarn-cluster/client的模式,根據(jù)前面的分析,我們向YARN提交應(yīng)用后,YARN會(huì)返回分配資源,然后啟動(dòng)AM。在AM中的driver會(huì)開始執(zhí)行用戶的代碼,開始進(jìn)行調(diào)度。詳細(xì)分析請(qǐng)見: 
 http://blog.csdn.net/lujinhong2/article/details/50344095
那我們這里就從用戶的代碼開始繼續(xù)往下分析。
用戶代碼中開始的時(shí)候必須首先創(chuàng)建一個(gè)SparkContext,我們看一下SparkContext的代碼,以及它被創(chuàng)建時(shí)執(zhí)行了哪些操作。
 
 先看一下官方的一個(gè)圖。DriverProgram就是用戶提交的程序,在用戶代碼中創(chuàng)建一個(gè)SparkContext的對(duì)象。SparkContext是所有Spark應(yīng)用的入口,它負(fù)責(zé)和整個(gè)集群的交互,包括創(chuàng)建RDD,累積器、廣播變量等。
每個(gè)JVM中只能有一個(gè)SparkContext,在創(chuàng)建一個(gè)新的Context前,你必須先stop()舊的。這個(gè)限制可能會(huì)在以后去掉,見SPARK-2243。
2、SparkContext源碼簡(jiǎn)單分析
SparkContext完成了以下幾個(gè)主要的功能: 
 (1)創(chuàng)建RDD,通過(guò)類似textFile等的方法。 
 (2)與資源管理器交互,通過(guò)runJob等方法啟動(dòng)應(yīng)用。 
 (3)創(chuàng)建DAGScheduler、TaskScheduler等。
3、SparkContext創(chuàng)建DAGScheduler與TaskScheduler/TaskSchedulerBackend對(duì)象
創(chuàng)建一個(gè)SparkContext,只需要一個(gè)SparkConf參數(shù),表示一些配置項(xiàng)。如果未指定參數(shù),則會(huì)創(chuàng)建一個(gè)默認(rèn)的SparkConf,如我們代碼中的:
val sc = new SparkContext();創(chuàng)建的代碼為:
def this() = this(new SparkConf())在SparkContext的一個(gè)try模塊中,會(huì)進(jìn)行一些初始化的工作,其中一部分是創(chuàng)建了DAGScheduler與TaskScheduler/TaskSchedulerBackend對(duì)象。
// Create and start the scheduler val (sched, ts) = SparkContext.createTaskScheduler(this, master) _schedulerBackend = sched _taskScheduler = ts _dagScheduler = new DAGScheduler(this)// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's constructor _taskScheduler.start()其中TaskScheduler/TaskSchedulerBackend對(duì)象通過(guò)createTaskScheduler()方法進(jìn)行創(chuàng)建,而DAGScheduler對(duì)象直接使用構(gòu)建函數(shù)創(chuàng)建。
4、createTaskScheduler:創(chuàng)建TaskScheduler/TaskSchedulerBackend對(duì)象的具體過(guò)程
SparkContext通過(guò)createTaskScheduler來(lái)同時(shí)創(chuàng)建TaskScheduler/TaskSchedulerBackend對(duì)象。它接收的參數(shù)mater是一個(gè)url,或者一個(gè)yarn-cluster等的字符串,指明了使用哪種運(yùn)行模式。
createTaskScheduler函數(shù)中主要就是match各種master,然后創(chuàng)建相應(yīng)的TaskScheduler/TaskSchedulerBackend對(duì)象。
我們先看一下yarn-cluster的:
case "yarn-standalone" | "yarn-cluster" =>if (master == "yarn-standalone") {logWarning("\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.")}val scheduler = try {val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")val cons = clazz.getConstructor(classOf[SparkContext])cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]} catch {// TODO: Enumerate the exact reasons why it can fail// But irrespective of it, it means we cannot proceed !case e: Exception => {throw new SparkException("YARN mode not available ?", e)}}val backend = try {val clazz =Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]} catch {case e: Exception => {throw new SparkException("YARN mode not available ?", e)}}scheduler.initialize(backend)(backend, scheduler)TaskScheduler的實(shí)現(xiàn)類為:
org.apache.spark.scheduler.cluster.YarnClusterSchedulerTaskSchedulerBacked的實(shí)現(xiàn)類為:
org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend其中后者需要前者來(lái)創(chuàng)建:
scheduler.initialize(backend)最后返回一個(gè)元組:
(backend, scheduler)再看看yarn-client的:
case "yarn-client" =>val scheduler = try {val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler")val cons = clazz.getConstructor(classOf[SparkContext])cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]} catch {case e: Exception => {throw new SparkException("YARN mode not available ?", e)}}val backend = try {val clazz =Utils.classForName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]} catch {case e: Exception => {throw new SparkException("YARN mode not available ?", e)}}scheduler.initialize(backend)(backend, scheduler)TaskScheduler的實(shí)現(xiàn)類為:
org.apache.spark.scheduler.cluster.YarnSchedulerTaskSchedulerBacked的實(shí)現(xiàn)類為:
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend(二)用戶代碼創(chuàng)建各種transformation與至少一個(gè)action,這個(gè)action會(huì)通過(guò)SparkContext#runJob調(diào)用DAGScheduler#runJob
在用戶代碼中創(chuàng)建了一個(gè)SparkContext對(duì)象后,就可以開始創(chuàng)建RDD,轉(zhuǎn)換RDD等了。我們的代碼只有3行:
val all = sc.textFile(path) val ret = all.filter(line => line.contains(keyWord)) println(ret.count)1、textFile
我們先看一下如何創(chuàng)建一個(gè)RDD:
def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {assertNotStopped()hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],minPartitions).map(pair => pair._2.toString)}讀取一個(gè)hadoop支持的類型的文件,返回一個(gè)String類型的RDD。 
 它接收2個(gè)參數(shù),一個(gè)是文件路徑,一個(gè)是最小分區(qū)數(shù),默認(rèn)為:
注意,這里使用的是min,因此如果沒(méi)指定分區(qū)數(shù)量,最大的情況下就是2個(gè)分區(qū)了,詳細(xì)分析請(qǐng)見: https://github.com/mesos/spark/pull/718
最后是調(diào)用hadoopFile來(lái)創(chuàng)建RDD的:
def hadoopFile[K, V](path: String,inputFormatClass: Class[_ <: InputFormat[K, V]],keyClass: Class[K],valueClass: Class[V],minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {assertNotStopped()// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)new HadoopRDD(this,confBroadcast,Some(setInputPathsFunc),inputFormatClass,keyClass,valueClass,minPartitions).setName(path)}2、filter
僅返回符合條件的元素組成的RDD。
3、count
count的邏輯很簡(jiǎn)單:
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sumsum用于統(tǒng)計(jì)一個(gè)集合中的元素?cái)?shù)量。 
 但它調(diào)用了SparkContext的runJob開始執(zhí)行任務(wù)了,我們分析一下這個(gè)過(guò)程。 
 SparkContext定義了多個(gè)runJob的形式,但它最后的調(diào)用為:
關(guān)鍵代碼就是一行:
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)開始調(diào)用DAGScheduler的runJob了。
(三)步驟三:DAGScheduler提交作業(yè),劃分stage,并生成最終的TaskSet
經(jīng)過(guò)上面的分析,我們知道了DAGScheduler與TaskScheduler/TaskSchedulerBackend對(duì)象的對(duì)象是如何創(chuàng)建的,并分析到了由用戶代碼出發(fā),如果至調(diào)用dagScheduler.runJob。下面我們分析一下dagScheduler.runJob完成了什么功能。
1、創(chuàng)建DAGScheduler對(duì)象的詳細(xì)實(shí)現(xiàn)
上面介紹過(guò)在SparkContext中會(huì)通過(guò)DAGScheduler的構(gòu)建函數(shù)創(chuàng)建一個(gè)DAGScheduler對(duì)象,具體是如何實(shí)現(xiàn)的呢?
class DAGScheduler(private[scheduler] val sc: SparkContext,private[scheduler] val taskScheduler: TaskScheduler,listenerBus: LiveListenerBus,mapOutputTracker: MapOutputTrackerMaster,blockManagerMaster: BlockManagerMaster,env: SparkEnv,clock: Clock = new SystemClock())extends Logging {......}看一下DAGScheduler的主構(gòu)造函數(shù)。 
 * SparkContext:就是前面創(chuàng)建的對(duì)象。 
 * taskScheduler:DAGScheduler保存一個(gè)taskScheduler,當(dāng)最后處理完生成TaskSet時(shí),需要調(diào)用submitMissingTasks,而在這個(gè)方法中會(huì)調(diào)用taskScheduler.submitTasks(),就是將TaskSet交由taskScheduler進(jìn)行下一步的處理。 
 * mapOutputTracker:是運(yùn)行在Driver端管理shuffle的中間輸出位置信息的。 
 * blockManagerMaster:也是運(yùn)行在Driver端的,它是管理整個(gè)Job的Bolck信息。
2、作業(yè)的提交
首先注意區(qū)分2個(gè)概述: 
 job: 每個(gè)action都是執(zhí)行runJob方法,可以將之視為一個(gè)job。 
 stage:在這個(gè)job內(nèi)部,會(huì)根據(jù)寬依賴,劃分成多個(gè)stage。
前面說(shuō)過(guò),用戶代碼中存在一個(gè)action時(shí),它最終會(huì)調(diào)用SparkContext#runJob(),而SparkContext#runJob()的最后一步都是調(diào)用DAGScheduler#runJob()
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)而DAGScheduler#runJob()的核心代碼為:
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)即調(diào)用submitJob方法,我們進(jìn)一步看看submitJob()
def submitJob[T, U](rdd: RDD[T],func: (TaskContext, Iterator[T]) => U,partitions: Seq[Int],callSite: CallSite,resultHandler: (Int, U) => Unit,properties: Properties): JobWaiter[U] = { .... val jobId = nextJobId.getAndIncrement() .....val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter,SerializationUtils.clone(properties)))waiter}submitJob()方法主要完成了以下3個(gè)工作: 
 * 獲取一個(gè)新的jobId 
 * 生成一個(gè)JobWaiter,它會(huì)監(jiān)聽Job的執(zhí)行狀態(tài),而Job是由多個(gè)Task組成的,因此只有當(dāng)Job的所有Task均已完成,Job才會(huì)標(biāo)記成功 
 * 最后調(diào)用eventProcessLoop.post()將Job提交到一個(gè)隊(duì)列中,等待處理。這是一個(gè)典型的生產(chǎn)者消費(fèi)者模式。這些消息都是通過(guò)handleJobSubmitted來(lái)處理。
簡(jiǎn)單看一下handleJobSubmitted是如何被調(diào)用的。 
 首先是DAGSchedulerEventProcessLoop#onReceive調(diào)用doOnReceive:
DAGSchedulerEventProcessLoop是EventLoop的子類,它重寫了EventLoop的onReceive方法。以后再分析這個(gè)EventLoop。
然后,doOnReceive會(huì)調(diào)用handleJobSubmitted。
3、stage的劃分
剛才說(shuō)到handleJobSubmitted會(huì)從eventProcessLoop中取出Job來(lái)進(jìn)行處理,處理的第一步就是將Job劃分成不同的stage。handleJobSubmitted主要2個(gè)工作,一是進(jìn)行stage的劃分,這是這部分要介紹的內(nèi)容;二是創(chuàng)建一個(gè)activeJob,并生成一個(gè)任務(wù),這在下一小節(jié)介紹。
private[scheduler] def handleJobSubmitted(jobId: Int,finalRDD: RDD[_],func: (TaskContext, Iterator[_]) => _,partitions: Array[Int],callSite: CallSite,listener: JobListener,properties: Properties) {...finalStage = newResultStage(finalRDD, partitions.length, jobId, callSite).....activeJobs += job......submitStage(finalStage)}submitWaitingStages()}newResultStage()經(jīng)過(guò)多層調(diào)用后,最終會(huì)調(diào)用getParentStages()。 
 因?yàn)槭菑淖罱K的stage往回推算的,這需要計(jì)算最終stage所依賴的各個(gè)stage。
4、任務(wù)的生成
回到handleJobSubmitted中的代碼:
submitStage(finalStage)submitStage會(huì)提交finalStage,如果這個(gè)stage的某些parentStage未提交,則遞歸調(diào)用submitStage(),直至所有的stage均已計(jì)算完成。
submitStage()會(huì)調(diào)用submitMissingTasks():
submitMissingTasks(stage, jobId.get)而submitMissingTasks()會(huì)完成DAGScheduler最后的工作:它判斷出哪些Partition需要計(jì)算,為每個(gè)Partition生成Task,然后這些Task就會(huì)封閉到TaskSet,最后提交給TaskScheduler進(jìn)行處理。
taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))stage.latestInfo.submissionTime = Some(clock.getTimeMillis())三、TaskScheduler && TaskSchedulerBackend
上文分析到在DAGScheduler中最終會(huì)執(zhí)行taskScheduler.submitTasks()方法,我們先簡(jiǎn)單看一下從這里開始往下的執(zhí)行邏輯:
(1)taskScheduler#submitTasks() 
 (2) schedulableBuilder#addTaskSetManager() 
 (3)CoarseGrainedSchedulerBackend#reviveOffers() 
 (4)CoarseGrainedSchedulerBackend#makeOffers() 
 (5)TaskSchedulerImpl#resourceOffers 
 (6)CoarseGrainedSchedulerBackend#launchTasks 
 (7)executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
步驟一、二中主要將這組任務(wù)的TaskSet加入到一個(gè)TaskSetManager中。TaskSetManager會(huì)根據(jù)數(shù)據(jù)就近原則為task分配計(jì)算資源,監(jiān)控task的執(zhí)行狀態(tài)等,比如失敗重試,推測(cè)執(zhí)行等。 
 步驟三、四邏輯較為簡(jiǎn)單。 
 步驟五為每個(gè)task具體分配資源,它的輸入是一個(gè)Executor的列表,輸出是TaskDescription的二維數(shù)組。TaskDescription包含了TaskID, Executor ID和task執(zhí)行的依賴信息等。 
 步驟六、七就是將任務(wù)真正的發(fā)送到executor中執(zhí)行了,并等待executor的狀態(tài)返回。
總結(jié)
以上是生活随笔為你收集整理的spark 调度模块详解及源码分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
                            
                        - 上一篇: 遍历HashMap的最佳方法
 - 下一篇: 朴素贝叶斯原理及实现