Spark资源调度分配
1、任務調度與資源調度
任務調度:是指通過DAGScheduler,TaskScheduler,SchedulerBackend等進行的作業調度。
資源調度:是指應用程序獲取資源。
任務調度是在資源調度的基礎上,沒有資源調度,那么任務調度就沒有任何意義了。
2、分配Driver(只對cluster模式有效)
Spark的Driver的運行有2種模式,一種是Client模式(Driver程序運行在客戶端,適用于交互、調試,希望立即看到app的輸出),一種是cluster模式(真正的Driver就會在worker中的一臺機器上,在哪臺有Master決定)。
schedule的調用時機:每次有新的應用程序提交或者集群資源發生改變的時候(包括Executor的增加或減少,Worker的增加或減少)。
資源調度肯定是在master里面,因為master負責資源調度,每次資源變動或者注冊程序,或新任務提交等一大堆的東西都會產生schedule()的調用,如RegisterApplication里面最后一行就是schedule()
進入schedule方法里面,為我們當前等待的應用程序調度當前可用的資源(每當一個新的應用程度進來的時候這個方法都會被調用)。或者說資源的可用狀況改變,例如說那個executor掛掉了,或者worker掛掉了,或者新增加了機器,
(1)首先是判斷master的狀態,如果不是ALIVE的狀態,這個資源調度就無從談起,直接返回,也就是說Standby 的master不會進行application資源的調用。所以master必須是ALIVE的方式采用進行資源的調度。
(2)Random.shuffle()是將worker打亂打亂有利于負載均衡,workers是HashSet的數據結構里面是WorkInfo,WorkInfo里面的內容是注冊的時候把這些信息注冊進來的。shuffle方法中,打亂之前要判斷一下要工作就必須要讓worker的state是ALIVE的,所以就判斷所有Worker中哪些是ALIVE級別的,ALIVE才能參與資源的分配工作
(3)在shuffle方法中,他的隨機打亂是首先構建一個ArrayBuffer,把所有的worker都放進去,這個函數內部有定義了swap函數(這個函數是將兩個索引上的位置進行交換)。然后從ArrayBuffer中最后一個元素開始一直到2就是第3個索引,每次都減1.nextInt(n)是取出0到n-1里面的一個整數。然后將取出的索引與n-1索引位置的替換,這樣的話順序就特別的亂。
(4)然后將這個shuffledAlivedWorkers的長度賦值到numWorkerAlive,并定義變量curPos
(5)遍歷waitingDrivers隊列,也就是等待被調度的Driver隊列,使其的launched值為false,定義numWorkerVisited的值為0。waitingDrivers就是一個ArrayBuffer里面存放DriverInfo,DriverInfo里面的DriverDescription里面有supervise是因為如果是cluster模式,submit的時候有指定supervise在driver掛掉的時候會自動重啟。
注意:循環遍歷等待啟動的driver(cluster模式才有driver等待啟動,如果是client模式是不需要等待啟動driver的因為你提交driver就啟動了)。
(6)進行循環判斷,條件為狀態為Alive的worker數量大于0并且是沒有被調度的launched=fasle,那么將這些worker進行shuffle,被調用的worker的數量(numWorkerVisited)的值加1,進一步判斷,如果worker的剩余內存大于等于driver的進行調度所需的內存(driver.desc.mem)并且worker的剩余CPU數量大于等于driver調度所需的CPU數量(driver.desc.cores),(DriverInfo里面的DriverDescription中有當前Driver啟動是需要的內存和cpu等要求的內容)。調用launchDriver方法,將這個Driver從driver的等待隊列移除,并設置launched的值為true。curPos= (curPos + 1) % numWorkersAlive是將指針指向下一個worker。
(7)符合要求之后launchDriver,launch到循環時候的一個worker中去了,而這個worker是Shuffle之后的for循環隨機產生的一個worker,因為每次都調用Shuffle所以順序不一樣,所以我們的driver放到這個worker上,這就保證負載均衡
launchDriver中首先先打印一個log,然后就是worker.addDrievr(driver)這個worker是當前master中對這個集群元數據的一個描述,需要保存元數據現在還是在master上。然后driver.worker = Some(worker),driver說明自己在哪個worker上,相互引用。然后就是關鍵點了worker.endpoint.send(LaunchDriver(driver.id, driver.desc)),這個就是發遠程消息給worker,消息通信遠程rpc。所以master發指令給worker,讓遠程的worker具體啟動executor。啟動之后driver的state就變成running了。
注意:所有schedule的時候首先就是進行所有driver級別的launch,這說明一件事情要現有driver才有其他的。
3、為Application分配資源
(1)在schedule中繼續,最后一行是startExecutorsOnWorkers,為Application分配資源并啟動Executors。
調度和啟動Executor在Worker上為我們具體的當前程序。默認使用FIFO的方式,就是先滿足第一個應用程序,再滿足第二個應用程序...。Spark默認為應用程序啟動Executor的方式是FIFO的方式,也就是所有提交的應用程序都是放在調度的等待隊列中的,先進先出,只有滿足了前面應用程序的資源分配的基礎上才能滿足下一個應用程序資源的分配。
我們現在是Master類中,Master直接調這個方法,但是具體在哪些Executor上啟動executor還不知道。for循環waitingApps,要求app.coresLeft>0
equestedCores:總共需要的核數(--total-executor-cores指定,默認Int的最大值)
coresGranted:已經分配的值
假設整個程序要求1000個core,但是現在可用的只有100個,不能立即滿足,所以可能在等待隊列中,我們要看一下coresLeft(還需要的cores),如果>0就就還需要調度了,如果不大于0也就是不需要core就不會為應用程序分配executor
進入到for循環中這個是應用程序在提交的時候會配置很多參數,這里說每個executor需要的core有多少。過濾出worker是ALIVE的狀態(才能分配executor),然后worker的內存要大于配置內存,然后進行排序,誰的core多就排在前面。排序后就產生可用的useableWorkers。
現在不應該考慮數據本地性,因為不是資源分配的內容,這個不是job調度,還沒到計算,不考慮數據本地性。
(2)進入到scheduleExecutorsOnWorkers方法中
這個注釋的意思:具體調度在這個worker上啟動executor;
返回的是一個數組包含到每個worker上的賦值的具體的cores;
有2種啟動worker的方式,第一個方式是spread out嘗試把一個應用程序運行在盡可能多的worker上(我們把executor運行在盡可能多的node上是更符合數據本地性的表現,做基礎建設的時候是這么考慮的,因為數據有可能在所有的worker上,這其實也有風險也可能有500臺機器,數據存在200臺機器上,但是這個是默認的情況,默認層面上從資源調度的層面上考慮最大化的數據本地性,調度層面上必須決定這個代碼具體運行在哪幾臺機器上 。這個數據本地性只是順便帶來的,因為這樣更好的響應了并發處理能力,不是考慮數據的本地性,所以是潛在的數據本地性);
第二種方式是把我們當前應用程序運行在盡可能少的worker上。在為每一個executor分配多少個core是可配置的,可以在submit的時候設置。在一個worker下面可能會有多個executor,前提是worker上有足夠的cores和memory的時候。否則的話默認情況下分配一個executor把當前worker上所有的cores都拿走了。
一次在我們的executor上分配多個core是非常重要的,我們的集群現在4個worker,每個worker有16個cores。我們的用戶要求3個executor,配置的時候最大可以48個,每個executor16個。
大致意思是說有兩種分配模型,第一種是將executors分配到盡可能多的workers上;第二種與第一種相反。默認使用的是第一種模型,這種模型更加符合數據的本地性原則,為每個Executor分配的cores的個數是可以進行配置的(spark-submit 或者 spark-env.sh),如果設置了,多個executors可能會被分配在一個worker上(前提是該worker擁有足夠的cores和memory),否則每個executor會充分利用worker上的cores,這種情況下一個executor會被分配在一個worker上。具體在集群上分配cores的時候會盡可能的滿足我們的要求,如果需要的cores的個數大于workers中空閑的cores的個數,那么就先分配空閑的cores,盡可能的去滿足要求。
spreadOutApps是讓我們的應用程序盡可能多的運行在所有的node上
下面具體進入到scheduleExecutorsOnWorkers代碼中
(3)回到startExecutorsOnWorkers方法中現在還沒有真正的發生調度,在這里獲得元數據信息之后,前面決定了在哪些機器上分配多少個Executor,每個Executor上分配多少個cores,下面就開始循環找出可用的workers
這里就具體直接分配了,就是先決定后分配
分配肯定是遠程通信,循環遍歷要分配個數的executor,把要分配的executor元數據信息交給addExecutor沒后發送給worker
4、總結
(1)Master在接收搭配RegisterApplication之后創建一個applicationInfo對象 ,然后registerApplication方法中將application加入到隊列,持久化application ,發送RegisteredApplication消息到ClientEndPoint,ClientPoint接收到消息后設置registered=true ,最后schedule()進行資源分配;
(2)在schedule()中,先打亂可以使用的workers,主要看在這個worker上內存和cpu的cores滿不滿足啟動一個driver,滿足的話則在這個worker上啟動一個driver,直到找到一個合適的worker,然后launchDriver啟動Driver,然后調用startExecutorsOnWorkers;
(3)在startExecutorsOnWorkers 中,首先篩選出可以使用的workers,主要看內存和cpu ,然后調用shceduleExecutorsOnWorkers得到要分配的信息 ,shceduleExecutorsOnWorkers方法中主要是①獲取當前的app還有多少cpuCore沒有被分配 ②篩選出可以用來啟動executor的workers saber在篩選出來的worker上面進行executor的分配的信息的記載在assignedCores中 并返回;
(4)根據上面得到的要分配的信息,調用allocateWorkerResourceToExecutors()在每一個worker上面分配資源啟動executor 。在allocateWorkerResourceToExecutors方法中主要是①計算在當前的worker上啟動幾個executor ②計算在當前的worker上一個executor會分配幾個cpuCores ③調用launchExecutor進行啟動executor
總結
以上是生活随笔為你收集整理的Spark资源调度分配的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 签到 数据库php,php与数据库的连接
- 下一篇: Hive高级操作