Understanding Spark's two scheduling modes: FIFO and FAIR
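As covered earlier, a submitted application is split by the DAGScheduler into jobs, stages and tasks, which are finally submitted to the TaskScheduler. When the TaskScheduler is set up, Spark creates two objects based on the master URL, a TaskScheduler and a SchedulerBackend, and also initializes a scheduling pool.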
1. Comparing the scheduling pools
The root pool is initialized according to the scheduling mode:
```scala
def initialize(backend: SchedulerBackend): Unit = {
  this.backend = backend
  // temporarily set rootPool name to empty
  // note that the root pool starts with minShare = 0 and weight = 0
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
    }
  }
  schedulableBuilder.buildPools()
}
```
FIFO mode
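The mode is chosen with spark.scheduler.mode, which can be FIFO or FAIR; the default is FIFO.
FIFO mode adds nothing of its own: FIFOSchedulableBuilder's buildPools() is a no-op, so no sub-pools are created, and its addTaskSetManager simply adds every TaskSetManager directly to rootPool.
Put simply, the default FIFO mode does essentially nothing extra.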
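To make the FIFO path concrete, here is a minimal standalone Scala sketch of a simplified model, not Spark's real classes: `Mode`, `ModeSketch.parseMode`, `SketchPool` and `FifoBuilderSketch` are hypothetical names. It mirrors the points above: spark.scheduler.mode falls back to FIFO, and the FIFO builder creates no sub-pools, so every task set lands directly in the root pool in submission order. Upper-casing the setting before matching is an assumption about how the value is normalized.

```scala
import java.util.Locale
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified model: these are NOT Spark's real classes.
sealed trait Mode
case object FIFO extends Mode
case object FAIR extends Mode

object ModeSketch {
  // Reads "spark.scheduler.mode", defaulting to FIFO; the upper-casing is an
  // assumption about how the value is normalized.
  def parseMode(conf: Map[String, String]): Mode =
    conf.getOrElse("spark.scheduler.mode", "FIFO").toUpperCase(Locale.ROOT) match {
      case "FIFO" => FIFO
      case "FAIR" => FAIR
      case other  => throw new IllegalArgumentException(s"Unrecognized scheduling mode: $other")
    }
}

// A pool that keeps its children in insertion (submission) order.
class SketchPool(val name: String) {
  val children: ArrayBuffer[String] = ArrayBuffer.empty[String]
  def addSchedulable(s: String): Unit = children += s
}

// FIFO builder: buildPools() creates nothing, and every task set goes
// straight into the root pool.
class FifoBuilderSketch(val rootPool: SketchPool) {
  def buildPools(): Unit = ()
  def addTaskSetManager(taskSetName: String): Unit = rootPool.addSchedulable(taskSetName)
}
```

Calling `buildPools()` leaves the root pool empty; each `addTaskSetManager` call just appends to it.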
FAIR mode
FAIR mode overrides buildPools. It reads the default fairscheduler.xml from the classpath (normally $SPARK_HOME/conf/fairscheduler.xml), or a user-defined file given by spark.scheduler.allocation.file.
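For each pool, the file configures:
- name: the name of the scheduling pool
- schedulingMode: the scheduling mode inside the pool (only FIFO or FAIR)
- minShare: the minimum share of CPU cores the pool should get
- weight: the weight of the pool relative to other pools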
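The sketch below shows what a file with these four settings might look like and how each field maps to a value. It is a hypothetical parser built on the JDK's `javax.xml.parsers` DOM API, not Spark's own `buildFairSchedulerPool`; `PoolConf` and `FairXmlSketch` are made-up names, and the fallback values (FIFO, minShare 0, weight 1) are assumed to match the defaults FairSchedulableBuilder applies when an element is missing.

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import java.util.Locale
import javax.xml.parsers.DocumentBuilderFactory
import org.w3c.dom.Element

// Hypothetical record for one <pool> entry; not Spark's Pool class.
case class PoolConf(name: String, schedulingMode: String, minShare: Int, weight: Int)

object FairXmlSketch {
  // Assumed defaults for missing elements: FIFO, minShare 0, weight 1.
  val DefaultMode = "FIFO"
  val DefaultMinShare = 0
  val DefaultWeight = 1

  // Trimmed text of the first element with this tag under the pool, if any.
  private def childText(pool: Element, tag: String): Option[String] = {
    val nodes = pool.getElementsByTagName(tag)
    if (nodes.getLength > 0) Some(nodes.item(0).getTextContent.trim) else None
  }

  def parse(xml: String): List[PoolConf] = {
    val doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
      .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
    val pools = doc.getElementsByTagName("pool")
    (0 until pools.getLength).toList.map { i =>
      val p = pools.item(i).asInstanceOf[Element]
      PoolConf(
        name = p.getAttribute("name"),
        schedulingMode = childText(p, "schedulingMode")
          .map(_.toUpperCase(Locale.ROOT)).getOrElse(DefaultMode),
        minShare = childText(p, "minShare").map(_.toInt).getOrElse(DefaultMinShare),
        weight = childText(p, "weight").map(_.toInt).getOrElse(DefaultWeight))
    }
  }

  // Illustrative allocation file: "test" omits schedulingMode and minShare.
  val sample: String =
    """<?xml version="1.0"?>
      |<allocations>
      |  <pool name="production">
      |    <schedulingMode>FAIR</schedulingMode>
      |    <weight>1</weight>
      |    <minShare>2</minShare>
      |  </pool>
      |  <pool name="test">
      |    <weight>2</weight>
      |  </pool>
      |</allocations>""".stripMargin
}
```

Parsing the sample yields `production` as FAIR with minShare 2 and weight 1, while `test` falls back to FIFO and minShare 0 with weight 2.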
```scala
override def buildPools(): Unit = {
  var is: Option[InputStream] = None
  try {
    is = Option {
      schedulerAllocFile.map { f =>
        new FileInputStream(f)
      }.getOrElse {
        Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
      }
    }
    is.foreach { i => buildFairSchedulerPool(i) }
  } finally {
    is.foreach(_.close())
  }
  // finally create "default" pool
  buildDefaultPool()
}
```
It also overrides addTaskSetManager:
```scala
override def addTaskSetManager(manager: Schedulable, properties: Properties): Unit = {
  var poolName = DEFAULT_POOL_NAME
  var parentPool = rootPool.getSchedulableByName(poolName)
  if (properties != null) {
    poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
    parentPool = rootPool.getSchedulableByName(poolName)
    if (parentPool == null) {
      // we will create a new pool that user has configured in app
      // instead of being defined in xml file
      parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
        DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
      rootPool.addSchedulable(parentPool)
      logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
        poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
    }
  }
  parentPool.addSchedulable(manager)
  logInfo("Added task set " + manager.name + " tasks to pool " + poolName)
}
```
Together, these two methods put the pools from the configuration file, plus the "default" pool, into rootPool, and then store each TaskSetManager in the child pool of rootPool named by the job's spark.scheduler.pool property (FAIR_SCHEDULER_PROPERTIES). If the property is absent the "default" pool is used, and if it names a pool that is not in the file, a new pool with default settings is created on the fly.
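The routing performed by buildPools and addTaskSetManager can be modelled without Spark. This is a simplified sketch under stated assumptions: `RoutedPool` and `FairRoutingSketch` are hypothetical names, the XML pools are passed in directly instead of being parsed, the pool name is read from the spark.scheduler.pool property, and both the "default" pool and ad-hoc pools get FIFO / minShare 0 / weight 1.

```scala
import java.util.Properties
import scala.collection.mutable

// Hypothetical simplified pool; not Spark's Pool class.
class RoutedPool(val name: String, val mode: String, val minShare: Int, val weight: Int) {
  val taskSets: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]
}

class FairRoutingSketch {
  val PoolProperty = "spark.scheduler.pool"
  val DefaultPoolName = "default"
  // Children of the root pool, keyed by pool name.
  val pools: mutable.LinkedHashMap[String, RoutedPool] = mutable.LinkedHashMap.empty

  // Like buildPools(): register the configured pools, then ensure "default" exists.
  def buildPools(fromXml: Seq[RoutedPool]): Unit = {
    fromXml.foreach(p => pools(p.name) = p)
    if (!pools.contains(DefaultPoolName))
      pools(DefaultPoolName) = new RoutedPool(DefaultPoolName, "FIFO", 0, 1)
  }

  // Like addTaskSetManager(): route by property, creating unknown pools with defaults.
  def addTaskSetManager(taskSet: String, properties: Properties): RoutedPool = {
    val poolName =
      if (properties == null) DefaultPoolName
      else properties.getProperty(PoolProperty, DefaultPoolName)
    val pool = pools.getOrElseUpdate(poolName, new RoutedPool(poolName, "FIFO", 0, 1))
    pool.taskSets += taskSet
    pool
  }
}
```

In a real application the property is usually set per thread with `sc.setLocalProperty("spark.scheduler.pool", "production")` before submitting a job, so jobs from that thread go to the `production` pool.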
2. Comparing the scheduling algorithms
Besides building different pools, the two modes also use different scheduling algorithms.
The Pool class itself picks its ordering algorithm according to its schedulingMode:
```scala
var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
  schedulingMode match {
    case SchedulingMode.FAIR =>
      new FairSchedulingAlgorithm()
    case SchedulingMode.FIFO =>
      new FIFOSchedulingAlgorithm()
  }
}
```
FIFO mode
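FIFO ordering is easy to understand. It first compares priority (for a TaskSetManager this is the ID of the job that submitted the task set); if the priorities are equal, it compares stageId. The smaller value runs first.
This makes sense: when the DAGScheduler builds stages recursively, parent stages are created before their children and get smaller IDs, so the stage with the smaller ID is usually at the bottom of the recursion and is the first to be submitted to the pool.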
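The FIFO rule can be tried out on plain data. In this sketch `FifoItem` and `FifoOrderSketch` are hypothetical stand-ins for Schedulable, with `priority` playing the role of the job ID; sorting with `sortWith(comparator)` is an assumed simplification of how a pool orders its queue.

```scala
// Hypothetical stand-in for a schedulable task set; priority plays the role of jobId.
case class FifoItem(name: String, priority: Int, stageId: Int)

object FifoOrderSketch {
  // Same rule as FIFOSchedulingAlgorithm: lower priority first, then lower stageId.
  def comparator(s1: FifoItem, s2: FifoItem): Boolean = {
    val byPriority = math.signum(s1.priority - s2.priority)
    val res = if (byPriority != 0) byPriority else math.signum(s1.stageId - s2.stageId)
    res < 0
  }

  def order(items: Seq[FifoItem]): Seq[String] = items.sortWith(comparator).map(_.name)
}
```

Because priority is compared first, every stage of an earlier job is ordered ahead of any stage of a later job, even one with a smaller stageId.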
```scala
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    // s1 goes first when its (priority, stageId) is smaller
    res < 0
  }
}
```
FAIR mode
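FAIR mode is a little more involved, but still fairly easy to follow. Given two schedulables s1 and s2:
1. First check whether each one is "needy", meaning its runningTasks is below its minShare. A needy schedulable always goes ahead of one that is not needy.
2. If both are needy, compare runningTasks / minShare (with minShare treated as at least 1); the smaller ratio goes first.
3. If neither is needy, compare runningTasks / weight; the smaller ratio goes first, so with the same number of running tasks the one with the larger weight wins.
4. If the ratios are equal, compare the names as strings; the lexicographically smaller name goes first.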
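The four rules can be checked on plain data with a small standalone sketch. `FairItem` and `FairOrderSketch` are hypothetical stand-ins for a Pool or TaskSetManager; the logic follows FairSchedulingAlgorithm, with `java.lang.Double.compare` used in place of `compareTo` for the ratio comparison.

```scala
// Hypothetical stand-in for a Pool or TaskSetManager in FAIR mode.
case class FairItem(name: String, runningTasks: Int, minShare: Int, weight: Int)

object FairOrderSketch {
  def comparator(s1: FairItem, s2: FairItem): Boolean = {
    // Rule 1: "needy" means fewer running tasks than the guaranteed minShare.
    val s1Needy = s1.runningTasks < s1.minShare
    val s2Needy = s2.runningTasks < s2.minShare
    // Rule 2 ratio: minShare is floored at 1 to avoid dividing by zero.
    val minShareRatio1 = s1.runningTasks.toDouble / math.max(s1.minShare, 1.0)
    val minShareRatio2 = s2.runningTasks.toDouble / math.max(s2.minShare, 1.0)
    // Rule 3 ratio: running tasks per unit of weight.
    val weightRatio1 = s1.runningTasks.toDouble / s1.weight.toDouble
    val weightRatio2 = s2.runningTasks.toDouble / s2.weight.toDouble
    if (s1Needy && !s2Needy) true
    else if (!s1Needy && s2Needy) false
    else {
      val compare =
        if (s1Needy && s2Needy) java.lang.Double.compare(minShareRatio1, minShareRatio2)
        else java.lang.Double.compare(weightRatio1, weightRatio2)
      // Rule 4: fall back to the name when the ratios tie.
      if (compare != 0) compare < 0 else s1.name < s2.name
    }
  }

  def order(items: Seq[FairItem]): Seq[String] = items.sortWith(comparator).map(_.name)
}
```

With `b` (0 of minShare 3) and `a` (1 of minShare 2) both needy, `b` goes first; `d` and `c` both run 4 tasks, but `d` has weight 4 versus 2, so `d` precedes `c`.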
```scala
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    var compare: Int = 0
    if (s1Needy && !s2Needy) {
      return true
    } else if (!s1Needy && s2Needy) {
      return false
    } else if (s1Needy && s2Needy) {
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }
    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      s1.name < s2.name
    }
  }
}
```
Summary: this was a quick look at Spark's scheduling modes. I had barely used them in my own jobs before, and did not expect Spark to have this hidden feature.