调度流程图_Flink 实现Locality 模式调度
背景
在計(jì)算與存儲(chǔ)一體化的情況,spark任務(wù)在調(diào)度task時(shí)會(huì)優(yōu)先將其調(diào)度在數(shù)據(jù)所在的節(jié)點(diǎn)上或者相同的rack上,這樣可以減少數(shù)據(jù)在不同節(jié)點(diǎn)或者不同rack上移動(dòng)所帶來的性能消耗;目前在Flink on yarn模式下,TaskExecutor的資源位置完全由yarn自主控制的,那么就可能會(huì)造成任務(wù)所在的節(jié)點(diǎn)與kafka數(shù)據(jù)所在的節(jié)點(diǎn)不在同一個(gè)機(jī)房,從而產(chǎn)生跨機(jī)房的流量消耗,在這樣的一個(gè)環(huán)境背景下,需要將任務(wù)調(diào)度在數(shù)據(jù)所在機(jī)房,以減少流量消耗。(注:基于Flink-1.10.1)
Flink on Yarn調(diào)度流程
在Flink-1.9版本以前使用的調(diào)度模式是LAZY_FROM_SOURCES即以source-vertex為起始節(jié)點(diǎn)開始調(diào)度,當(dāng)有數(shù)據(jù)輸出到下游節(jié)點(diǎn)時(shí)開始調(diào)度下游的vertex,以這種方式部署所有的vertex;在1.9及1.9版本以后使用EAGER調(diào)度模式即會(huì)立刻調(diào)度所有的vertex。下面看一下具體的調(diào)度流程圖:任務(wù)調(diào)度與部署是在JobMaster中通過DefaultScheduler完成,其會(huì)首先為所有的ExecutionVertex向SlotPoo(1)l申請(qǐng)資源然后部署,SlotPool會(huì)向ResourceManager中SlotManager(2)申請(qǐng)資源,如果沒有可用的資源,那么就會(huì)向Yarn申請(qǐng)一個(gè)Container(3),待yarn分配了資源之后,回調(diào)給YarnResourceManager,進(jìn)而啟動(dòng)TaskExecutor(4),TaskExecutor啟動(dòng)之后就會(huì)向YarnResourceManager匯報(bào)其資源情況(5),在YarnResourceManager進(jìn)行資源匹配之后就會(huì)向TaskExecutor申請(qǐng)資源(6),然后TaskExecutor會(huì)將自身的資源分配給SlotPool(7), 最后告知給DefaultScheduler(8)將任務(wù)部署到對(duì)應(yīng)的TaskExecutor上。至此完成一次完整的任務(wù)調(diào)度過程。
在SlotPool向SlotManager申請(qǐng)資源前,會(huì)生成一個(gè)AllocationId的唯一標(biāo)識(shí)(資源ID),并且在申請(qǐng)的時(shí)候會(huì)將這個(gè)標(biāo)識(shí)一起攜帶過去,當(dāng)TaskExecutor向YarnResourceManager匯報(bào)自身資源情況時(shí),在YarnResourceManager中會(huì)做一個(gè)資源請(qǐng)求(攜帶AllocationId)與實(shí)際資源匹配的過程,主要是通過資源大小(cpu、內(nèi)存)匹配,匹配成功之后YarnResourceManager會(huì)向TaskExecutor發(fā)送一個(gè)申請(qǐng)slot請(qǐng)求(攜帶AllocationId),待請(qǐng)求成功之后TaskExecutor會(huì)將資源分配給對(duì)應(yīng)的AllocationId的請(qǐng)求(7),完成資源匹配過程。
Locality 調(diào)度實(shí)現(xiàn)分析
通常Flink與kafka是部署在不同的集群上,這里所說的Locality僅僅是實(shí)現(xiàn)rack級(jí)別的調(diào)度,即將任務(wù)調(diào)度在kafka對(duì)應(yīng)分區(qū)數(shù)據(jù)所在的rack上,為了實(shí)現(xiàn)此功能,分為以下幾個(gè)步驟:
1)數(shù)據(jù)分配:Flink每一個(gè)Source-Task拉取partition是按照一定規(guī)則進(jìn)行分配的,為了實(shí)現(xiàn)相同rack的partition在同一個(gè)task,因此需要改變其分配策略;為了保證每一個(gè)rack的數(shù)據(jù)都被消費(fèi)到,需要對(duì)source并行度進(jìn)行擴(kuò)張,以前可能一個(gè)task消費(fèi)所有rack的數(shù)據(jù),現(xiàn)在需要每一個(gè)rack上的數(shù)據(jù)都有對(duì)應(yīng)的task去拉取數(shù)據(jù)
實(shí)現(xiàn):在flink-conf.yaml 中配置yarn集群機(jī)器分布情況,包括ip以及對(duì)應(yīng)的rack信息,那么任務(wù)啟動(dòng)會(huì)獲取這些信息;在StreamGraphGenerator中的transformSource方法提前生成每個(gè)source-task消費(fèi)的對(duì)應(yīng)topic與partition信息,以及其需要調(diào)度到的rack信息。這里主要說明一下目前的分配策略:
例如:有a,b,c 三個(gè)rack, topic1對(duì)應(yīng)partition:[0,1,2,3,4,5], 可通過KafkaConsumer的partitionsFor方法獲取對(duì)應(yīng)的partition信息,parition的分布情況是:a ->[0,1],b->[2,3],c->[4,5]如果設(shè)置的并行度為:1 ,則分配規(guī)則是:task0(a)->[0,1],task1(b)->[2,3],task2(c)->[4,5]如果設(shè)置的并行度為:4 ,則分配規(guī)則是:task0(a)->[0],task1(b)->[2],task2(c)->[4],task3(a)->[1],task4(b)->[3],task5(c)->[5]注:task0 表示下標(biāo)為0的task擴(kuò)充規(guī)則是:userSourceParallelism%numRack==0?userSourceParallelism:(1+userSourceParallelism/numRack)*numRack, 即生成的并行度是rack個(gè)數(shù)的整數(shù)倍。
生成的配置放在ExecutionConfig中的GlobalParameters中,實(shí)際效果圖:
代表著下標(biāo)為0的task消費(fèi)partition-2,同時(shí)部署在rack-a中的機(jī)器上,下標(biāo)為1的task消費(fèi)partition-1,同時(shí)部署在rack-b的機(jī)器上,下標(biāo)為2的task消費(fèi)partition-0,同時(shí)部署在rack-c中的機(jī)器上。
2)資源申請(qǐng):默認(rèn)情況下在Flink向Yarn申請(qǐng)資源是不攜帶任何NodeManager信息的,通常需要向yarn申請(qǐng)資源的流程是當(dāng)遇到新的Source-Task時(shí)才會(huì)去走這個(gè)流程(根據(jù)slot-shared機(jī)制),因此只需要在Source對(duì)應(yīng)的ExecutionVetex上打上對(duì)應(yīng)的rack標(biāo)簽即可,將這個(gè)rack一直傳遞到Y(jié)arnResourceManager端,然后獲取該rack對(duì)應(yīng)的機(jī)器,從這些機(jī)器上申請(qǐng)資源。
實(shí)現(xiàn):在申請(qǐng)資源前會(huì)給ExecutionVertex配置相關(guān)的資源信息,在ExecutionVertexSchedulingRequirementsMapper.getPhysicalSlotResourceProfile中完成,因此在這里對(duì)ExecutionVertex的資源信息打上rack信息
boolean hasNoConnectedInputs=executionVertex.getJobVertex().getJobVertex().hasNoConnectedInputs(); if(hasNoConnectedInputs){ try{ int index=executionVertex.getParallelSubtaskIndex(); ExecutionConfig executionConfig=executionVertex.getJobVertex().getJobVertex().getJobGraph().getExecutionConfig(); Map map=executionConfig.getGlobalJobParameters().toMap(); String index2Zone=map.get("index2Zone"); String zone=""; ObjectMapper objectMapper=new ObjectMapper(); //index 表示該ExecutionVertext的下標(biāo)Index zone=objectMapper.readTree(index2Zone).findValue(String.valueOf(index)).asText(); //賦予區(qū)域信息 ResourceProfile resourceProfile1=resourceProfile.copy2ZoneUnknown(resourceProfile,zone); LOG.debug("vertexName:{},ResourceProfile:{}",executionVertex.getJobVertex().getName(),resourceProfile1); return resourceProfile1; }catch (Throwable e){ LOG.error("parse resourceProfile error:{}",e); } }在這里重新定義了ResourceProfile,賦予了其rack信息,ResourceProfile會(huì)一直傳遞到Y(jié)arnResourceManager資源申請(qǐng)端:
public CollectionstartNewWorker(ResourceProfile resourceProfile) { if (!resourceProfilesPerWorker.iterator().next().isMatching(resourceProfile)) { return Collections.emptyList(); } //zone 表示 rack信息 String zone=resourceProfile.getZone(); if(zone!=null){ requestYarnContainer(zone); }else{ requestYarnContainer(); } return resourceProfilesPerWorker; }重新定義了requestYarnContainer流程,使請(qǐng)求包含rack信息:AMRMClient.ContainerRequest getContainerRequest(String zone) { String[] ipList= ResourceManager.ZONE_IPS.get(zone).split(",");//獲取該rack下的所有iplist LOG.debug("request slot from [{}] for zone [{}]",ipList,zone); AMRMClient.ContainerRequest request= new AMRMClient.ContainerRequest( getContainerResource(), ipList, null, RM_REQUEST_PRIORITY,false);//false:RelaxLocality表示不允許資源降級(jí)申請(qǐng),一定要使其分布在指定的機(jī)器上 containerRequestList.add(request); return request; }由于yarn返回的是一個(gè)滿足請(qǐng)求的一個(gè)資源集合,因此需要在滿足的集合中做資源過濾,將多余資源返回給yarn,因此在回調(diào)方法onContainersAllocated中:
public void onContainersAllocated(List containers) { runAsync(() -> { log.info("Received {} containers with {} pending container requests.", containers.size(), numPendingContainerRequests); //final Collection pendingReques ts = getPendingRequests(); //請(qǐng)求到的host List requestedHost=new ArrayList<>(); containers.stream().map(container -> container.getNodeId().getHost()).forEach(requestedHost::add); //獲取滿足匹配的請(qǐng)求 final Collection pendingRequests=containerRequestList.stream().map(containerRequest -> Tuple2.of(containerRequest.getNodes(),containerRequest)) .filter(tuple2-> requestedHost.stream().filter(host->tuple2.f0.contains(host)) .count()>0 ) .map(map->map.f1).collect(Collectors.toList()); int matchRequest=pendingRequests.size(); log.info("recevied container size : {}, matching request:{}",containers.size(),matchRequest); final Iterator pendingRequestsIterator = pendingRequests.iterator(); // number of allocated containers can be larger than the number of pending container requests //final int numAcceptedContainers = Math.min(containers.size(), numPendingContainerRequests); final int numAcceptedContainers = Math.min(matchRequest, numPendingContainerRequests); final List requiredContainers = containers.subList(0, numAcceptedContainers); final List excessContainers = containers.subList(numAcceptedContainers, containers.size()); for (int i = 0; i < requiredContainers.size(); i++) { //removeContainerRequest(pendingRequestsIterator.next()); AMRMClient.ContainerRequest needRemoveRequest=pendingRequestsIterator.next(); containerRequestList.remove(needRemoveRequest); removeContainerRequest(needRemoveRequest); } //返回多余的資源 excessContainers.forEach(this::returnExcessContainer); requiredContainers.forEach(this::startTaskExecutorInContainer); // if we are waiting for no further containers, we can go to the // regular heartbeat interval if (numPendingContainerRequests <= 0) { resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis); } }); }3) 資源匹配:默認(rèn)情況下,在YarnResourceManager中做分配到的資源與申請(qǐng)的資源匹配時(shí)是按照大小進(jìn)行的,因此需要改為按照rack進(jìn)行匹配
實(shí)現(xiàn):匹配的流程在SlotManager.findExactlyMatchingPendingTaskManagerSlot中:
private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(ResourceProfile resourceProfile,String zone) { for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) { LOG.info("zone:{},request_zone:{}",zone,pendingTaskManagerSlot.getResourceProfile().getZone()); /** * 區(qū)域匹配 */ if(zone.equals(pendingTaskManagerSlot.getResourceProfile().getZone())){ LOG.debug("get resource zone:{},resourceProfile:{}",zone,pendingTaskManagerSlot.getResourceProfile()); return pendingTaskManagerSlot; }完成了這個(gè)資源匹配過程,并且在后續(xù)的流程中由AllocationId完成資源與具體的ExecutionVertex請(qǐng)求匹配,就可以將ExecutionVertex部署到匹配的機(jī)器上。
4) 指定source的消費(fèi)數(shù)據(jù):在數(shù)據(jù)分配中已經(jīng)將每個(gè)task消費(fèi)的數(shù)據(jù)指定好了,因此在source端只需要獲取對(duì)應(yīng)的分區(qū)信息即可,同時(shí)需要放棄默認(rèn)的分配策略
實(shí)現(xiàn):FlinkKafkaConsumerBase.open 中:
final List allPartitions = new ArrayList<>(); //從配置里面獲取 Map globalMaps=getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap(); String index2TopicPartitionsStr=globalMaps.get("index2TopicPartitions"); ObjectMapper objectMapper=new ObjectMapper(); JsonNode rootNode=objectMapper.readTree(index2TopicPartitionsStr); JsonNode topicPartitionNode=rootNode.findValue(String.valueOf(getRuntimeContext().getIndexOfThisSubtask())); topicPartitionNode.fieldNames().forEachRemaining(topic->{ JsonNode partitionsNode=topicPartitionNode.findValue(topic); partitionsNode.iterator().forEachRemaining(jsonNode -> { allPartitions.add(new KafkaTopicPartition(topic,jsonNode.asInt())); }); }); allPartitions.stream().forEach(x->{ LOG.debug("consumer topic:{}, partition:{}",x.getTopic(),x.getPartition()); });allPartitions 就代表了該task需要消費(fèi)的數(shù)據(jù)。
至此整個(gè)流程完成。
總結(jié)
在實(shí)現(xiàn)該方案前,也做過在任務(wù)調(diào)度后直接在FlinkKafkaConsumerBase中自定義partition的分配,即根據(jù)機(jī)器的所在rack去獲取對(duì)應(yīng)的rack上的數(shù)據(jù),但是經(jīng)常會(huì)出現(xiàn)有數(shù)據(jù)的rack上沒有對(duì)應(yīng)的rack任務(wù),只能做降級(jí)處理,將這些rack上的分區(qū)數(shù)據(jù)分配給其他rack上的任務(wù),仍然會(huì)有部分的數(shù)據(jù)跨機(jī)房拉取,流量成本消耗縮減效果并不好,因此才做了這個(gè)Locality的方案,由于涉及的內(nèi)容比較多,本文只提供了一個(gè)實(shí)現(xiàn)的思路與關(guān)鍵的部分代碼。目前的實(shí)現(xiàn)方案仍然存在以下幾個(gè)限制:
? 1.一個(gè)任務(wù)只能消費(fèi)一個(gè)kafka集群的數(shù)據(jù),由于slot-share機(jī)制,不同的JobVertext可以分配到同一個(gè)Slot上,如果有多個(gè)kafka集群的話,source就會(huì)對(duì)應(yīng)多個(gè)JobVertex,那么在后續(xù)的JobVertext在申請(qǐng)資源的時(shí)候就會(huì)尋找前面已經(jīng)申請(qǐng)到資源的JobVertext,很有可能會(huì)匹配到其他的rack的資源,目前并未對(duì)這塊進(jìn)行改造。
?? 2.一個(gè)TaskExecutor只分配一個(gè)Slot,如果有多個(gè)slot的話,第一次申請(qǐng)后,后續(xù)SlotPool向YarnResourceManager申請(qǐng)資源時(shí),直接發(fā)現(xiàn)有可用的Slot就會(huì)直接分配,很有可能會(huì)匹配到其他的rack的資源,目前并未對(duì)這塊進(jìn)行改造。
? ?3.如果topic的partition在rack分配不均勻,可能會(huì)造成流量?jī)A斜,因此需要在topic創(chuàng)建中做好partition的分布。
?? 4.由于source-vertext的擴(kuò)充,會(huì)導(dǎo)致需要的資源變多,因此需要在cpu/內(nèi)存與流量成本消耗之間權(quán)衡。
目前在使用上主要是針對(duì)大的topic采取該方案,流量成本也有很顯著的縮減效果,后續(xù)會(huì)對(duì)以上問題進(jìn)行優(yōu)化。總結(jié)
以上是生活随笔為你收集整理的调度流程图_Flink 实现Locality 模式调度的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 画一个圆角多边形_CAD零基础教程,矩形
- 下一篇: 虚拟机的分类_「面试必备」Java虚拟机