spark提交应用的全流程分析
spark提交應用的全流程分析
@(SPARK)[spark]
本文分析一下spark的應用通過spark-submit后,如何提交到集群中并開始運行。
先介紹一下spark從提交到運行的全流程,下面再詳細分析。
- 1、用戶通過spark-submit腳本提交應用。
 - 2、spark-submit根據用戶代碼及配置確定使用哪個資源管理器,以及在合適的位置啟動driver。
 - 3、driver與集群管理器(如YARN)通信,申請資源以啟動executor。
 - 4、集群管理器啟動executor。
 - 5、driver進程執行用戶的代碼,根據程序中定義的transformation和action,進行stage的劃分,然后以task的形式發送到executor。(通過DAGScheduler劃分stage,通過TaskScheduler和TaskSchedulerBackend來真正申請資源運行task)
 - 6、task在executor中進行計算并保存結果。
 - 7、如果driver中的main()方法執行完成退出,或者調用了SparkContext#stop(),driver會終止executor進程,并且通過集群管理器釋放資源。
 
一、提交前準備
(一)腳本調用
1、spark-submit
spark通過spark-submit腳本來向集群提交應用,舉個例子:
/home/hadoop/spark/bin/spark-submit --master yarn-client --num-executors 10 --class com.lujinhong.spark.ml.TrainModel myusml-0.0.1-SNAPSHOT.jar args1 args2 args3我們看看spark-submit腳本,很簡單,只有3行:
SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"# disable randomized hash for string in Python 3.3+ export PYTHONHASHSEED=0exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"其實就是調用spark-class這個腳本。
2、spark-class
spark-class完成了配置的加載:
. "$SPARK_HOME"/bin/load-spark-env.sh以及調用上面說的SparkSubmit類
3、load-spark-env
有興趣的可以看看如何加載配置,主要是spark-evn.sh文件,以及scala的版本等。
(二)SparkSubmit
1、main函數
很簡單,appArgs解釋命令行中的參數,然后判斷action是什么,并執行相應的操作。
def main(args: Array[String]): Unit = {val appArgs = new SparkSubmitArguments(args)if (appArgs.verbose) {// scalastyle:off printlnprintStream.println(appArgs)// scalastyle:on println}appArgs.action match {case SparkSubmitAction.SUBMIT => submit(appArgs)case SparkSubmitAction.KILL => kill(appArgs)case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)}}verbose是一個布爾值,用于確定是否打印一些JVM的信息,默認為false。 
 action的默認值是submit,我們這里也只分析submit的過程,因此下面將進入submit函數看目的地。
2、submit(args: SparkSubmitArguments): Unit
submit函數先是將參數轉化為一個4元組的形式:
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)然后就使用這些參數調用runMain函數了:
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)其它內容分別考慮了使用proxy以及standalone的情形。下一步:runMain函數。
3、runMain函數
runMain函數開始執行Client類中的main函數了。
首先是一大堆的環境變量及參數的加載,判斷類是否存在等,最后的目的是執行Client類中的main函數。
找到主類:
mainClass = Utils.classForName(childMainClass)然后是主函數:
val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)最后調用main方法:
mainMethod.invoke(null, childArgs.toArray)那mainClass是哪個類呢?對于yarn-cluster來說,是:
if (isYarnCluster) childMainClass = "org.apache.spark.deploy.yarn.Client"如果是yarn-client,是:
// In client mode, launch the application main class directly // In addition, add the main application jar and any added jars (if any) to the classpath if (deployMode == CLIENT) childMainClass = args.mainClass即,client類就是用戶定義的主類,直接開始運行主類即可。
二、提交應用
(一)yarn-cluster方式
我們先看一下yarn-cluster方式,由上面的分析可知,yarn-cluster使用的是org.apache.spark.deploy.yarn.Client這個類進行任務提交,先看一下流程圖: 
  
 圖片來自于spark技術內幕P84,下同。 
 先說一下總體的流程步驟: 
 ====================================== 
 步驟一:Client類提交應用到YARN ResourceManager,向RM申請資源作為AM 
 步驟二:在申請到的機器中啟動driver,注冊成為AM,并調用用戶代碼,然后創建SparkContext。(driver是一個邏輯概念,并不實際存在,通過抽象出driver這一層,所有的運行模式都可以說是在driver中調用用戶代碼了) 
 步驟三:SparkContext中創建DAGScheduler與YarnClusterScheduler與YarnClusterSchedulerBackend。當在用戶代碼中遇到action時,即會調用DAGScheduler的runJob,任務開始調度執行。 
 步驟四:YarnClusterSchedulerBackend在NodeManager上啟動Executor 
 步驟五:Executor啟動Task,開始執行任務 
 ====================================== 
 簡單的說就是: 
 向RM申請資源建立driver——->在driver中執行用戶代碼,并創建AM——->遇到action時調用runJob——->開始調度、執行的過程了 
 因此3個比較復雜的流程分別為: 
 * 1、如何向YARN中申請資源,這涉及YARN的源碼 
 * 2、如何調度,涉及DAGScheduler、YarnClusterScheduler與YarnClusterSchedulerBackend 
 * 3、如何執行任務,涉及Executor與Task。這3個部分會有專門的章節來討論,我們這里先把整個流程理順。
下面按按被調用的類來詳細分析一下:
1、Client
Client類作為向YARN提交應用的客戶端
步驟一:Client類提交應用到YARN ResourceManager,向RM申請資源作為 AM
(1)main函數 
 我們從main函數開始入手:
將不關鍵代碼去掉后,就剩下一行,它調用run方法,繼續看run方法
(2)run方法 
 好吧,它的主要內容也只是一行:
它調用了submitApplication方法。
(3)submitApplication方法
def submitApplication(): ApplicationId = {var appId: ApplicationId = nulltry {// Setup the credentials before doing anything else,// so we have don't have issues at any point.setupCredentials()yarnClient.init(yarnConf)yarnClient.start()// Get a new application from our RMval newApp = yarnClient.createApplication()val newAppResponse = newApp.getNewApplicationResponse()appId = newAppResponse.getApplicationId()// Verify whether the cluster has enough resources for our AMverifyClusterResources(newAppResponse)// Set up the appropriate contexts to launch our AMval containerContext = createContainerLaunchContext(newAppResponse)val appContext = createApplicationSubmissionContext(newApp, containerContext)// Finally, submit and monitor the applicationyarnClient.submitApplication(appContext)appId}在submitApplication方法中,先對yarnClient進行了初始化,并從RM中申請到一個application,設置合適的AM(見下一點),最后就向RM提交應用了,并返回應用的ID。
(4)createContainerLaunchContext方法 
 上面在啟動一個應用前,調用了createContainerLaunchContext方法,用于指定的appContext使用哪個AM:
上面代碼中指定了當yarn-cluster模式和yarn-client時,分別使用哪個類作為AM。
當向RM提交應用后,RM就會開始啟動AM。YARN中啟動AM的源碼分析以后再補充。
步驟二:在申請到的機器中啟動driver,注冊成為AM,并調用用戶代碼,然后創建SparkContext。
2、ApplicationMaster
(1)main函數 
 當RM啟動AM后,AM就開始執行main函數了
關鍵是調用了run方法,我們繼續看run方法。
(2)run方法 
 先是設置了一些參數,并加載yarn的配置文件。然后設置了一些鉤子 
 最后關鍵是執行了這2個方法:
分別對應yarn-cluster模式和yarn-client模式。
(3)runDriver方法 
 定義了如何啟動driver,這也是yarn-cluster和yarn-client最大的區別,前者在yarn分配一臺機器啟動driver,并注冊成為AM,而后者在本地上啟動driver,再注冊成為AM。
startUserApplication主要執行了調用用戶的代碼,以及創建了一個spark driver的進程。 
 Start the user class, which contains the spark driver, in a separate Thread.
registerAM向RM中正式注冊AM。有了AM以后,用戶代碼就可以執行了,開始將任務切分、調度、執行。我們繼續往下看。
然后,用戶代碼中的action會調用SparkContext的runJob,SparkContext中有很多個runJob,但最后都是調用DAGScheduler的runJob
步驟三:SparkContext中創建DAGScheduler與YarnClusterScheduler與YarnClusterSchedulerBackend
val (sched, ts) = SparkContext.createTaskScheduler(this, master) _schedulerBackend = sched_taskScheduler = ts _dagScheduler = new DAGScheduler(this)然后調用DAGScheduler的runJob: 
 * Run a function on a given set of partitions in an RDD and pass the results to the given handler function. This is the main entry point for all actions in Spark.*
至此,應用就正式提交到集群準備運行了。
然后就開始DAGScheduler調用YarnClusterScheduler,YarnClusterScheduler調用YarnClusterSchedulerBackend,Executor啟動Task開始執行任務的具體流程了。* 這些內容在之后的專題中詳細分析。*
步驟四:YarnClusterSchedulerBackend在NodeManager上啟動Executor
步驟五:Executor啟動Task
(二)yarn-cluster方式
yarn-client的流程與yarn-cluster類似,主要區別在于它在本地運行driver,而cluster是在AM上運行driver。 
 先看一下流程圖: 
 
1、區別一:主類入口不同
如果是yarn-client,是:
// In client mode, launch the application main class directly // In addition, add the main application jar and any added jars (if any) to the classpath if (deployMode == CLIENT) childMainClass = args.mainClass即,client類就是用戶定義的主類,直接開始運行主類即可。 
 cluster是在專門的Client類中開始執行的,而yarn-client是在用戶代碼中開始執行的。
2、啟動driver的方式不一樣
client模式將在本機啟動進程,并注冊成為AM。
if (isClusterMode) {runDriver(securityMgr)} else {runExecutorLauncher(securityMgr)}private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {val port = sparkConf.getInt("spark.yarn.am.port", 0)rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr)val driverRef = waitForSparkDriver()addAmIpFilter()registerAM(rpcEnv, driverRef, sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)// In client mode the actor will stop the reporter thread.reporterThread.join()}總結
以上是生活随笔為你收集整理的spark提交应用的全流程分析的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: 朴素贝叶斯原理及实现
 - 下一篇: spark RDD详解及源码分析