Understanding Spark 2.1 Core in Depth (Part 5): Principles and Source Code Analysis of Standalone Mode
Overview
The previous posts in this series all focused on Spark's scheduling. In this post we step back and look at scheduling from a more macro perspective: Spark's deployment modes. Spark can be deployed in the following modes:
- local mode
- local-cluster mode
- Standalone mode
- YARN mode
- Mesos mode
We first give a brief introduction to YARN mode, and then dive into Standalone mode.
Introduction to YARN Mode
About YARN
YARN is a resource-management and task-scheduling framework made up of three main modules: the ResourceManager (RM), the NodeManager (NM), and the ApplicationMaster (AM).
The ResourceManager monitors, allocates, and manages all cluster resources; an ApplicationMaster handles the scheduling and coordination of a single application; a NodeManager maintains an individual node.
For all applications, the RM has absolute control over resources and their allocation, while each AM negotiates resources with the RM and talks to NodeManagers to launch and monitor tasks. The relationships between these modules are shown in the accompanying figure.
YARN Cluster Mode
The flow of Spark's YARN cluster mode is as follows:
- The local YARN Client submits the app to the YARN ResourceManager.
- The YARN ResourceManager picks a YARN NodeManager and uses it to:
- create an ApplicationMaster; the SparkContext acts as the app managed by this ApplicationMaster, which creates a YarnClusterScheduler and a YarnClusterSchedulerBackend;
- pick containers in the cluster to launch CoarseGrainedExecutorBackend processes, which start the Spark executors.
- The ApplicationMaster and the CoarseGrainedExecutorBackend processes communicate through remote calls.
YARN Client Mode
The flow of Spark's YARN client mode is as follows (a minimal driver sketch follows the list):
- The SparkContext is started locally and creates a YarnClientClusterScheduler and a YarnClientClusterSchedulerBackend.
- The YarnClientClusterSchedulerBackend starts a yarn.Client and uses it to submit the app to the YARN ResourceManager.
- The YARN ResourceManager picks a YARN NodeManager, which in turn picks containers in the cluster to launch CoarseGrainedExecutorBackend processes that start the Spark executors.
- The YarnClientClusterSchedulerBackend and the CoarseGrainedExecutorBackend processes communicate through remote calls.
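The following sketch is not from the original post; it is a hypothetical client-mode driver, assuming HADOOP_CONF_DIR points at the target cluster's configuration. It only illustrates the point above: in client mode the SparkContext (and hence its scheduler backend) lives in the local JVM, while the executors run in YARN containers.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical YARN client-mode driver (illustration only).
object YarnClientExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("yarn-client-example")
      .setMaster("yarn")                        // in Spark 2.x, master "yarn" plus a deploy mode
      .set("spark.submit.deployMode", "client") // client mode: the driver stays in this JVM
      .set("spark.executor.instances", "2")     // ask the ResourceManager for two executor containers
    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 100).sum())     // work runs on executors in YARN containers
    sc.stop()
  }
}
```

In practice this configuration is usually supplied through spark-submit rather than hard-coded; the code form is used here only to make the driver/executor placement explicit.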
Introduction to Standalone Mode
Starting a Standalone Cluster
Starting the Master
master.Master
Let's first look at what the main function of the Master object does:
```scala
private[deploy] object Master extends Logging {
  val SYSTEM_NAME = "sparkMaster"
  val ENDPOINT_NAME = "Master"

  def main(argStrings: Array[String]) {
    Utils.initDaemon(log)
    // Create the SparkConf
    val conf = new SparkConf
    // Parse the arguments against the SparkConf
    val args = new MasterArguments(argStrings, conf)
    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    rpcEnv.awaitTermination()
  }

  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    // Create the Master endpoint
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
    val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
    // Return the Master's RpcEnv,
    // the web UI port,
    // and the port of the other service (the REST server)
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  }
}
```
master.MasterArguments
Next, let's see how the Master parses its arguments:
```scala
private[master] class MasterArguments(args: Array[String], conf: SparkConf) extends Logging {
  // Default settings
  var host = Utils.localHostName()
  var port = 7077
  var webUiPort = 8080
  // Spark properties file; defaults to conf/spark-defaults.conf
  var propertiesFile: String = null

  // Check environment variables
  if (System.getenv("SPARK_MASTER_IP") != null) {
    logWarning("SPARK_MASTER_IP is deprecated, please use SPARK_MASTER_HOST")
    host = System.getenv("SPARK_MASTER_IP")
  }
  if (System.getenv("SPARK_MASTER_HOST") != null) {
    host = System.getenv("SPARK_MASTER_HOST")
  }
  if (System.getenv("SPARK_MASTER_PORT") != null) {
    port = System.getenv("SPARK_MASTER_PORT").toInt
  }
  if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
    webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
  }

  parse(args.toList)

  // Load the defaults into the SparkConf
  propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)

  // The environment variable SPARK_MASTER_WEBUI_PORT
  // is overridden by the Spark property spark.master.ui.port
  if (conf.contains("spark.master.ui.port")) {
    webUiPort = conf.get("spark.master.ui.port").toInt
  }

  // Parse the command-line arguments;
  // they override both the environment variables and the Spark properties
  @tailrec
  private def parse(args: List[String]): Unit = args match {
    case ("--ip" | "-i") :: value :: tail =>
      Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
      host = value
      parse(tail)
    case ("--host" | "-h") :: value :: tail =>
      Utils.checkHost(value, "Please use hostname " + value)
      host = value
      parse(tail)
    case ("--port" | "-p") :: IntParam(value) :: tail =>
      port = value
      parse(tail)
    case "--webui-port" :: IntParam(value) :: tail =>
      webUiPort = value
      parse(tail)
    case ("--properties-file") :: value :: tail =>
      propertiesFile = value
      parse(tail)
    case ("--help") :: tail =>
      printUsageAndExit(0)
    case Nil =>
    case _ =>
      printUsageAndExit(1)
  }

  private def printUsageAndExit(exitCode: Int) {
    System.err.println(
      "Usage: Master [options]\n" +
      "\n" +
      "Options:\n" +
      "  -i HOST, --ip HOST     Hostname to listen on (deprecated, please use --host or -h) \n" +
      "  -h HOST, --host HOST   Hostname to listen on\n" +
      "  -p PORT, --port PORT   Port to listen on (default: 7077)\n" +
      "  --webui-port PORT      Port for web UI (default: 8080)\n" +
      "  --properties-file FILE Path to a custom Spark properties file.\n" +
      "                         Default is conf/spark-defaults.conf.")
    System.exit(exitCode)
  }
}
```
From the code above we can see that the precedence of these settings, from lowest to highest, is:
system environment variables < properties in spark-defaults.conf < command-line arguments < settings made in application code
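As a small illustration of the highest-priority level, a value set programmatically on a SparkConf wins over the same key coming from spark-defaults.conf or the environment. This is a minimal sketch with a hypothetical app name, not code from the post:

```scala
import org.apache.spark.SparkConf

// Minimal precedence sketch (hypothetical application).
object ConfPrecedenceDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()                  // picks up spark.* system properties / defaults
      .setAppName("conf-precedence-demo")
      .set("spark.executor.memory", "2g")       // overrides spark-defaults.conf for this key
    println(conf.get("spark.executor.memory"))  // prints "2g" regardless of the defaults file
  }
}
```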
Starting the Worker
worker.Worker
Let's first look at what the main function of the Worker object does:
```scala
private[deploy] object Worker extends Logging {
  val SYSTEM_NAME = "sparkWorker"
  val ENDPOINT_NAME = "Worker"

  def main(argStrings: Array[String]) {
    Utils.initDaemon(log)
    // Create the SparkConf
    val conf = new SparkConf
    // Parse the arguments against the SparkConf
    val args = new WorkerArguments(argStrings, conf)
    val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
      args.memory, args.masters, args.workDir, conf = conf)
    rpcEnv.awaitTermination()
  }

  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      cores: Int,
      memory: Int,
      masterUrls: Array[String],
      workDir: String,
      workerNumber: Option[Int] = None,
      conf: SparkConf = new SparkConf): RpcEnv = {
    val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
    val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
    // Create the Worker endpoint
    rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
      masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
    rpcEnv
  }

  // ...
}
```

worker.WorkerArguments
worker.WorkerArguments is similar to master.MasterArguments:
```scala
private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {
  var host = Utils.localHostName()
  var port = 0
  var webUiPort = 8081
  var cores = inferDefaultCores()
  var memory = inferDefaultMemory()
  var masters: Array[String] = null
  var workDir: String = null
  var propertiesFile: String = null

  // Check environment variables
  if (System.getenv("SPARK_WORKER_PORT") != null) {
    port = System.getenv("SPARK_WORKER_PORT").toInt
  }
  if (System.getenv("SPARK_WORKER_CORES") != null) {
    cores = System.getenv("SPARK_WORKER_CORES").toInt
  }
  if (conf.getenv("SPARK_WORKER_MEMORY") != null) {
    memory = Utils.memoryStringToMb(conf.getenv("SPARK_WORKER_MEMORY"))
  }
  if (System.getenv("SPARK_WORKER_WEBUI_PORT") != null) {
    webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT").toInt
  }
  if (System.getenv("SPARK_WORKER_DIR") != null) {
    workDir = System.getenv("SPARK_WORKER_DIR")
  }

  parse(args.toList)

  // Load the defaults into the SparkConf
  propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)

  if (conf.contains("spark.worker.ui.port")) {
    webUiPort = conf.get("spark.worker.ui.port").toInt
  }

  checkWorkerMemory()

  @tailrec
  private def parse(args: List[String]): Unit = args match {
    case ("--ip" | "-i") :: value :: tail =>
      Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
      host = value
      parse(tail)
    case ("--host" | "-h") :: value :: tail =>
      Utils.checkHost(value, "Please use hostname " + value)
      host = value
      parse(tail)
    case ("--port" | "-p") :: IntParam(value) :: tail =>
      port = value
      parse(tail)
    case ("--cores" | "-c") :: IntParam(value) :: tail =>
      cores = value
      parse(tail)
    case ("--memory" | "-m") :: MemoryParam(value) :: tail =>
      memory = value
      parse(tail)
    // Working directory
    case ("--work-dir" | "-d") :: value :: tail =>
      workDir = value
      parse(tail)
    case "--webui-port" :: IntParam(value) :: tail =>
      webUiPort = value
      parse(tail)
    case ("--properties-file") :: value :: tail =>
      propertiesFile = value
      parse(tail)
    case ("--help") :: tail =>
      printUsageAndExit(0)
    case value :: tail =>
      if (masters != null) { // Two positional arguments were given
        printUsageAndExit(1)
      }
      masters = Utils.parseStandaloneMasterUrls(value)
      parse(tail)
    case Nil =>
      if (masters == null) { // No positional argument was given
        printUsageAndExit(1)
      }
    case _ =>
      printUsageAndExit(1)
  }

  // ...
}
```

Resource Reclamation
In the overview we mentioned that "after the app finishes running, the SparkContext reclaims resources, destroys the Workers' CoarseGrainedExecutorBackend processes, and then deregisters itself." Next, let's look at how the Master and the Executors learn that an Application has exited.
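For orientation, the whole shutdown chain traced below is triggered from the application side when the driver program calls SparkContext.stop (directly or from a shutdown hook). The driver below is hypothetical and the standalone master URL is assumed; it only marks where the chain starts:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical standalone-mode driver: the explicit sc.stop() at the end is
// what kicks off the DAGScheduler -> TaskSchedulerImpl -> backend shutdown
// chain traced in the call stack below.
object ShutdownDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("shutdown-demo")
      .setMaster("spark://master-host:7077") // assumed standalone master URL
    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 10).count())
    sc.stop() // reclaim executors and deregister the application
  }
}
```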
The call stack is as follows:
- SparkContext.stop
- DAGScheduler.stop
- TaskSchedulerImpl.stop
- CoarseGrainedSchedulerBackend.stop
- CoarseGrainedSchedulerBackend.stopExecutors
- CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply
- CoarseGrainedExecutorBackend.receive
- Executor.stop
- CoarseGrainedExecutorBackend.receive
- CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply
- CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply
- CoarseGrainedSchedulerBackend.stopExecutors
- CoarseGrainedSchedulerBackend.stop
- TaskSchedulerImpl.stop
- DAGScheduler.stop
SparkContext.stop
SparkContext.stop calls DAGScheduler.stop:
```scala
// ...
if (_dagScheduler != null) {
  Utils.tryLogNonFatalError {
    _dagScheduler.stop()
  }
  _dagScheduler = null
}
// ...
```

DAGScheduler.stop
DAGScheduler.stop calls TaskSchedulerImpl.stop:
```scala
def stop() {
  // Stop the message scheduler
  messageScheduler.shutdownNow()
  // Stop the event-processing loop
  eventProcessLoop.stop()
  // Call TaskSchedulerImpl.stop
  taskScheduler.stop()
}
```

TaskSchedulerImpl.stop
TaskSchedulerImpl.stop calls CoarseGrainedSchedulerBackend.stop:
```scala
override def stop() {
  // Stop speculative execution
  speculationScheduler.shutdown()
  // Call CoarseGrainedSchedulerBackend.stop
  if (backend != null) {
    backend.stop()
  }
  // Stop the task-result getter
  if (taskResultGetter != null) {
    taskResultGetter.stop()
  }
  starvationTimer.cancel()
}
```

CoarseGrainedSchedulerBackend.stop
```scala
override def stop() {
  // Call stopExecutors()
  stopExecutors()
  try {
    if (driverEndpoint != null) {
      // Send the StopDriver message
      driverEndpoint.askWithRetry[Boolean](StopDriver)
    }
  } catch {
    case e: Exception =>
      throw new SparkException("Error stopping standalone scheduler's driver endpoint", e)
  }
}
```

CoarseGrainedSchedulerBackend.stopExecutors
Let's first look at CoarseGrainedSchedulerBackend.stopExecutors:
```scala
def stopExecutors() {
  try {
    if (driverEndpoint != null) {
      logInfo("Shutting down all executors")
      // Send the StopExecutors message
      driverEndpoint.askWithRetry[Boolean](StopExecutors)
    }
  } catch {
    case e: Exception =>
      throw new SparkException("Error asking standalone scheduler to shut down executors", e)
  }
}
```
CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply
The DriverEndpoint receives this message and replies:
```scala
case StopExecutors =>
  logInfo("Asking each executor to shut down")
  for ((_, executorData) <- executorDataMap) {
    // Send the StopExecutor message to each CoarseGrainedExecutorBackend
    executorData.executorEndpoint.send(StopExecutor)
  }
  context.reply(true)
```
CoarseGrainedExecutorBackend.receive
The CoarseGrainedExecutorBackend receives the message:
```scala
case StopExecutor =>
  stopping.set(true)
  logInfo("Driver commanded a shutdown")
  // The Executor is not shut down directly here,
  // because the Executor must first send an acknowledgement back to
  // CoarseGrainedSchedulerBackend.
  // So the strategy is to send itself a Shutdown message and handle that instead.
  self.send(Shutdown)

case Shutdown =>
  stopping.set(true)
  new Thread("CoarseGrainedExecutorBackend-stop-executor") {
    override def run(): Unit = {
      // executor.stop() will call `SparkEnv.stop()`,
      // which does not return until the RpcEnv has completely shut down.
      // However, if `executor.stop()` runs on the same thread as the RpcEnv,
      // the RpcEnv cannot finish until `executor.stop()` returns,
      // which creates a deadlock.
      // Therefore we need to start a new thread.
      executor.stop()
    }
  }.start()
```
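The deadlock described in that comment is a general single-threaded event-loop pattern rather than anything Spark-specific. The toy sketch below uses hypothetical names and is not Spark code; it only shows why a handler running on the loop thread must hand a blocking stop call off to another thread, just as the "stop-executor" thread does above:

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Toy single-threaded event loop (illustration only).
object LoopDeadlockSketch {
  private val loop = Executors.newSingleThreadExecutor()

  // Blocks until every queued task has finished -- analogous to the
  // RpcEnv shutdown waiting for in-flight message handlers.
  def stopLoop(): Unit = {
    loop.shutdown()
    loop.awaitTermination(10, TimeUnit.SECONDS)
  }

  def main(args: Array[String]): Unit = {
    loop.submit(new Runnable {
      override def run(): Unit = {
        // BAD: calling stopLoop() here would wait for this very task to finish,
        // but we *are* the loop thread, so it could never complete in time.
        // GOOD: hand the blocking call to a separate thread.
        new Thread("stop-loop") {
          override def run(): Unit = stopLoop()
        }.start()
      }
    })
  }
}
```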
Executor.stop
```scala
def stop(): Unit = {
  env.metricsSystem.report()
  // Shut down the heartbeater
  heartbeater.shutdown()
  heartbeater.awaitTermination(10, TimeUnit.SECONDS)
  // Shut down the thread pool
  threadPool.shutdown()
  if (!isLocal) {
    // Stop the SparkEnv
    env.stop()
  }
}
```

CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply
Now let's return to CoarseGrainedSchedulerBackend.stop: after stopExecutors() finishes, it sends the StopDriver message to driverEndpoint. CoarseGrainedSchedulerBackend.DriverEndpoint receives the message and replies:
```scala
case StopDriver =>
  context.reply(true)
  // Stop the driverEndpoint
  stop()
```

Summary
This post briefly introduced Spark's deployment modes and the two YARN modes, then walked through the Standalone mode source: how the Master and Worker are started, and how the executors and the driver endpoint are torn down when an application finishes and the SparkContext reclaims its resources.