追源索骥:透过源码看懂Flink核心框架的执行流程
https://www.cnblogs.com/bethunebtj/p/9168274.html
- 追源索驥:透過源碼看懂Flink核心框架的執(zhí)行流程
- 前言
- 1.從 Hello,World WordCount開始
- 1.1 flink執(zhí)行環(huán)境
- 1.2 算子(Operator)的注冊(聲明)
- 1.3 程序的執(zhí)行
- 1.3.1 本地模式下的execute方法
- 1.3.2 遠程模式(RemoteEnvironment)的execute方法
- 1.3.3 程序啟動過程
- 2.理解flink的圖結構
- 2.1 flink的三層圖結構
- 2.2 StreamGraph的生成
- 2.2.1 StreamTransformation類代表了流的轉換
- 2.2.2 StreamGraph生成函數(shù)分析
- 2.2.3 WordCount函數(shù)的StreamGraph
- 2.3 JobGraph的生成
- 2.3.1 JobGraph生成源碼
- 2.3.2 operator chain的邏輯
- 2.3.3 JobGraph的提交
- 2.4 ExecutionGraph的生成
- 3. 任務的調度與執(zhí)行
- 3.1 計算資源的調度
- 3.2 JobManager執(zhí)行job
- 3.2.1 JobManager的組件
- 3.2.2 JobManager的啟動過程
- 3.2.3 JobManager啟動Task
- 3.3 TaskManager執(zhí)行task
- 3.3.1 TaskManager的基本組件
- 3.3.2 TaskManager執(zhí)行Task
- 3.3.2.1 生成Task對象
- 3.3.2.2 運行Task對象
- 3.3.2.3 StreamTask的執(zhí)行邏輯
- 3.4 StreamTask與StreamOperator
- 4. StreamOperator的抽象與實現(xiàn)
- 4.1 數(shù)據(jù)源的邏輯——StreamSource與時間模型
- 4.2 從數(shù)據(jù)輸入到數(shù)據(jù)處理——OneInputStreamOperator & AbstractUdfStreamOperator
- 4.3 StreamSink
- 5. 為執(zhí)行保駕護航——Fault Tolerant與保證Exactly-Once語義
- 5.1 Fault Tolerant演進之路
- 5.1.1 Storm的Record acknowledgement模式
- 5.1.2 Spark streaming的micro batch模式
- 5.1.3 Google Cloud Dataflow的事務式模型
- 5.1.4 Flink的分布式快照機制
- 5.2 checkpoint的生命周期
- 5.2.1 觸發(fā)checkpoint
- 5.2.2 Task層面checkpoint的準備工作
- 5.2.3 操作符的狀態(tài)保存及barrier傳遞
- 5.3 承載checkpoint數(shù)據(jù)的抽象:State & StateBackend
- 5.1 Fault Tolerant演進之路
- 6.數(shù)據(jù)流轉——Flink的數(shù)據(jù)抽象及數(shù)據(jù)交換過程
- 6.1 flink的數(shù)據(jù)抽象
- 6.1.1 MemorySegment
- 6.1.2 ByteBuffer與NetworkBufferPool
- 6.1.3 RecordWriter與Record
- 6.2 數(shù)據(jù)流轉過程
- 6.2.1 整體過程
- 6.2.2 數(shù)據(jù)跨task傳遞
- 6.3 Credit漫談
- 6.3.1 背壓問題
- 6.3.2 使用Credit實現(xiàn)ATM網(wǎng)絡流控
- 6.1 flink的數(shù)據(jù)抽象
- 7.其他核心概念
- 7.1 EventTime時間模型
- 7.2 FLIP-6 部署及處理模型演進
- 7.2.1 現(xiàn)有模型不足
- 7.2.2 核心變更
- 7.2.3 Cluster Manager的架構
- 7.2.4 組件設計及細節(jié)
- 8.后記
前言
Flink是大數(shù)據(jù)處理領域最近很火的一個開源的分布式、高性能的流式處理框架,其對數(shù)據(jù)的處理可以達到毫秒級別。本文以一個來自官網(wǎng)的WordCount例子為引,全面闡述flink的核心架構及執(zhí)行流程,希望讀者可以借此更加深入的理解Flink邏輯。
本文跳過了一些基本概念,如果對相關概念感到迷惑,請參考官網(wǎng)文檔。另外在本文寫作過程中,Flink正式發(fā)布了其1.5 RELEASE版本,在其發(fā)布之后完成的內容將按照1.5的實現(xiàn)來組織。
1.從?Hello,World?WordCount開始
首先,我們把WordCount的例子再放一遍:
public class SocketTextStreamWordCount {public static void main(String[] args) throws Exception {if (args.length != 2){System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");return;}String hostName = args[0];Integer port = Integer.parseInt(args[1]);// set up the execution environmentfinal StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// get input dataDataStream<String> text = env.socketTextStream(hostName, port);text.flatMap(new LineSplitter()).setParallelism(1)// group by the tuple field "0" and sum up tuple field "1".keyBy(0).sum(1).setParallelism(1).print();// execute programenv.execute("Java WordCount from SocketTextStream Example");}/*** Implements the string tokenizer that splits sentences into words as a user-defined* FlatMapFunction. The function takes a line (String) and splits it into* multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).*/public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) {// normalize and split the lineString[] tokens = value.toLowerCase().split("\\W+");// emit the pairsfor (String token : tokens) {if (token.length() > 0) {out.collect(new Tuple2<String, Integer>(token, 1));}}}}}首先從命令行中獲取socket對端的ip和端口,然后啟動一個執(zhí)行環(huán)境,從socket中讀取數(shù)據(jù),split成單個單詞的流,并按單詞進行總和的計數(shù),最后打印出來。這個例子相信接觸過大數(shù)據(jù)計算或者函數(shù)式編程的人都能看懂,就不過多解釋了。
1.1 flink執(zhí)行環(huán)境
程序的啟動,從這句開始:final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()。?
這行代碼會返回一個可用的執(zhí)行環(huán)境。執(zhí)行環(huán)境是整個flink程序執(zhí)行的上下文,記錄了相關配置(如并行度等),并提供了一系列方法,如讀取輸入流的方法,以及真正開始運行整個代碼的execute方法等。對于分布式流處理程序來說,我們在代碼中定義的flatMap,keyBy等等操作,事實上可以理解為一種聲明,告訴整個程序我們采用了什么樣的算子,而真正開啟計算的代碼不在此處。由于我們是在本地運行flink程序,因此這行代碼會返回一個LocalStreamEnvironment,最后我們要調用它的execute方法來開啟真正的任務。我們先接著往下看。
1.2 算子(Operator)的注冊(聲明)
我們以flatMap為例,text.flatMap(new LineSplitter())這一句話跟蹤進去是這樣的:
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),getType(), Utils.getCallLocationName(), true);return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));}?
里面完成了兩件事,一是用反射拿到了flatMap算子的輸出類型,二是生成了一個Operator。flink流式計算的核心概念,就是將數(shù)據(jù)從輸入流一個個傳遞給Operator進行鏈式處理,最后交給輸出流的過程。對數(shù)據(jù)的每一次處理在邏輯上成為一個operator,并且為了本地化處理的效率起見,operator之間也可以串成一個chain一起處理(可以參考責任鏈模式幫助理解)。下面這張圖表明了flink是如何看待用戶的處理流程的:抽象化為一系列operator,以source開始,以sink結尾,中間的operator做的操作叫做transform,并且可以把幾個操作串在一起執(zhí)行。?
?
我們也可以更改flink的設置,要求它不要對某個操作進行chain處理,或者從某個操作開啟一個新chain等。?
上面代碼中的最后一行transform方法的作用是返回一個SingleOutputStreamOperator,它繼承了Datastream類并且定義了一些輔助方法,方便對流的操作。在返回之前,transform方法還把它注冊到了執(zhí)行環(huán)境中(后面生成執(zhí)行圖的時候還會用到它)。其他的操作,包括keyBy,sum和print,都只是不同的算子,在這里出現(xiàn)都是一樣的效果,即生成一個operator并注冊給執(zhí)行環(huán)境用于生成DAG。
1.3 程序的執(zhí)行
程序執(zhí)行即env.execute("Java WordCount from SocketTextStream Example")這行代碼。
1.3.1 本地模式下的execute方法
這行代碼主要做了以下事情:
- 生成StreamGraph。代表程序的拓撲結構,是從用戶代碼直接生成的圖。
- 生成JobGraph。這個圖是要交給flink去生成task的圖。
- 生成一系列配置
- 將JobGraph和配置交給flink集群去運行。如果不是本地運行的話,還會把jar文件通過網(wǎng)絡發(fā)給其他節(jié)點。
- 以本地模式運行的話,可以看到啟動過程,如啟動性能度量、web模塊、JobManager、ResourceManager、taskManager等等
- 啟動任務。值得一提的是在啟動任務之前,先啟動了一個用戶類加載器,這個類加載器可以用來做一些在運行時動態(tài)加載類的工作。
1.3.2 遠程模式(RemoteEnvironment)的execute方法
遠程模式的程序執(zhí)行更加有趣一點。第一步仍然是獲取StreamGraph,然后調用executeRemotely方法進行遠程執(zhí)行。?
該方法首先創(chuàng)建一個用戶代碼加載器
?
然后創(chuàng)建一系列配置,交給Client對象。Client這個詞有意思,看見它就知道這里絕對是跟遠程集群打交道的客戶端。
ClusterClient client;try {client = new StandaloneClusterClient(configuration);client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());}}try {return client.run(streamGraph, jarFiles, globalClasspaths, usercodeClassLoader).getJobExecutionResult();}?
client的run方法首先生成一個JobGraph,然后將其傳遞給JobClient。關于Client、JobClient、JobManager到底誰管誰,可以看這張圖:?
?
確切的說,JobClient負責以異步的方式和JobManager通信(Actor是scala的異步模塊),具體的通信任務由JobClientActor完成。相對應的,JobManager的通信任務也由一個Actor完成。
?
可以看到,該方法阻塞在awaitJobResult方法上,并最終返回了一個JobListeningContext,透過這個Context可以得到程序運行的狀態(tài)和結果。
1.3.3 程序啟動過程
上面提到,整個程序真正意義上開始執(zhí)行,是這里:
遠程模式和本地模式有一點不同,我們先按本地模式來調試。?
我們跟進源碼,(在本地調試模式下)會啟動一個miniCluster,然后開始執(zhí)行代碼:
?
這個方法里有一部分邏輯是與生成圖結構相關的,我們放在第二章里講;現(xiàn)在我們先接著往里跟:
//MiniCluster.java public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {checkNotNull(job, "job is null");//在這里,最終把job提交給了jobMasterfinal CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose((JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));......}?
正如我在注釋里寫的,這一段代碼核心邏輯就是調用那個submitJob方法。那么我們再接著看這個方法:
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {final DispatcherGateway dispatcherGateway;try {dispatcherGateway = getDispatcherGateway();} catch (LeaderRetrievalException | InterruptedException e) {ExceptionUtils.checkInterrupted(e);return FutureUtils.completedExceptionally(e);}// we have to allow queued scheduling in Flip-6 mode because we need to request slots// from the ResourceManagerjobGraph.setAllowQueuedScheduling(true);final CompletableFuture<Void> jarUploadFuture = uploadAndSetJarFiles(dispatcherGateway, jobGraph);final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture.thenCompose(//在這里執(zhí)行了真正的submit操作(Void ack) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout));return acknowledgeCompletableFuture.thenApply((Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));}?
這里的Dispatcher是一個接收job,然后指派JobMaster去啟動任務的類,我們可以看看它的類結構,有兩個實現(xiàn)。在本地環(huán)境下啟動的是MiniDispatcher,在集群上提交任務時,集群上啟動的是StandaloneDispatcher。?
那么這個Dispatcher又做了什么呢?它啟動了一個JobManagerRunner(這里我要吐槽Flink的命名,這個東西應該叫做JobMasterRunner才對,flink里的JobMaster和JobManager不是一個東西),委托JobManagerRunner去啟動該Job的JobMaster。我們看一下對應的代碼:
//jobManagerRunner.javaprivate void verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) throws Exception {......final CompletableFuture<Acknowledge> startFuture = jobMaster.start(new JobMasterId(leaderSessionId), rpcTimeout);......}?
然后,JobMaster經(jīng)過了一堆方法嵌套之后,執(zhí)行到了這里:
private void scheduleExecutionGraph() {checkState(jobStatusListener == null);// register self as job status change listenerjobStatusListener = new JobManagerJobStatusListener();executionGraph.registerJobStatusListener(jobStatusListener);try {//這里調用了ExecutionGraph的啟動方法executionGraph.scheduleForExecution();}catch (Throwable t) {executionGraph.failGlobal(t);}}?
我們知道,flink的框架里有三層圖結構,其中ExecutionGraph就是真正被執(zhí)行的那一層,所以到這里為止,一個任務從提交到真正執(zhí)行的流程就走完了,我們再回顧一下(順便提一下遠程提交時的流程區(qū)別):
- 客戶端代碼的execute方法執(zhí)行;
- 本地環(huán)境下,MiniCluster完成了大部分任務,直接把任務委派給了MiniDispatcher;
- 遠程環(huán)境下,啟動了一個RestClusterClient,這個類會以HTTP Rest的方式把用戶代碼提交到集群上;
- 遠程環(huán)境下,請求發(fā)到集群上之后,必然有個handler去處理,在這里是JobSubmitHandler。這個類接手了請求后,委派StandaloneDispatcher啟動job,到這里之后,本地提交和遠程提交的邏輯往后又統(tǒng)一了;
- Dispatcher接手job之后,會實例化一個JobManagerRunner,然后用這個runner啟動job;
- JobManagerRunner接下來把job交給了JobMaster去處理;
- JobMaster使用ExecutionGraph的方法啟動了整個執(zhí)行圖;整個任務就啟動起來了。
至此,第一部分就講完了。
2.理解flink的圖結構
第一部分講到,我們的主函數(shù)最后一項任務就是生成StreamGraph,然后生成JobGraph,然后以此開始調度任務運行,所以接下來我們從這里入手,繼續(xù)探索flink。
2.1 flink的三層圖結構
事實上,flink總共提供了三種圖的抽象,我們前面已經(jīng)提到了StreamGraph和JobGraph,還有一種是ExecutionGraph,是用于調度的基本數(shù)據(jù)結構。?
?
上面這張圖清晰的給出了flink各個圖的工作原理和轉換過程。其中最后一個物理執(zhí)行圖并非flink的數(shù)據(jù)結構,而是程序開始執(zhí)行后,各個task分布在不同的節(jié)點上,所形成的物理上的關系表示。
- 從JobGraph的圖里可以看到,數(shù)據(jù)從上一個operator流到下一個operator的過程中,上游作為生產者提供了IntermediateDataSet,而下游作為消費者需要JobEdge。事實上,JobEdge是一個通信管道,連接了上游生產的dataset和下游的JobVertex節(jié)點。
- 在JobGraph轉換到ExecutionGraph的過程中,主要發(fā)生了以下轉變:?
- 加入了并行度的概念,成為真正可調度的圖結構
- 生成了與JobVertex對應的ExecutionJobVertex,ExecutionVertex,與IntermediateDataSet對應的IntermediateResult和IntermediateResultPartition等,并行將通過這些類實現(xiàn)
- ExecutionGraph已經(jīng)可以用于調度任務。我們可以看到,flink根據(jù)該圖生成了一一對應的Task,每個task對應一個ExecutionGraph的一個Execution。Task用InputGate、InputChannel和ResultPartition對應了上面圖中的IntermediateResult和ExecutionEdge。
那么,flink抽象出這三層圖結構,四層執(zhí)行邏輯的意義是什么呢??
StreamGraph是對用戶邏輯的映射。JobGraph在此基礎上進行了一些優(yōu)化,比如把一部分操作串成chain以提高效率。ExecutionGraph是為了調度存在的,加入了并行處理的概念。而在此基礎上真正執(zhí)行的是Task及其相關結構。
2.2 StreamGraph的生成
在第一節(jié)的算子注冊部分,我們可以看到,flink把每一個算子transform成一個對流的轉換(比如上文中返回的SingleOutputStreamOperator是一個DataStream的子類),并且注冊到執(zhí)行環(huán)境中,用于生成StreamGraph。實際生成StreamGraph的入口是StreamGraphGenerator.generate(env, transformations)?其中的transformations是一個list,里面記錄的就是我們在transform方法中放進來的算子。
2.2.1 StreamTransformation類代表了流的轉換
StreamTransformation代表了從一個或多個DataStream生成新DataStream的操作。順便,DataStream類在內部組合了一個StreamTransformation類,實際的轉換操作均通過該類完成。?
?
我們可以看到,從source到各種map,union再到sink操作全部被映射成了StreamTransformation。?
其映射過程如下所示:?
以MapFunction為例:
- 首先,用戶代碼里定義的UDF會被當作其基類對待,然后交給StreamMap這個operator做進一步包裝。事實上,每一個Transformation都對應了一個StreamOperator。
- 由于map這個操作只接受一個輸入,所以再被進一步包裝為OneInputTransformation。
-
最后,將該transformation注冊到執(zhí)行環(huán)境中,當執(zhí)行上文提到的generate方法時,生成StreamGraph圖結構。
另外,并不是每一個 StreamTransformation 都會轉換成runtime層中的物理操作。有一些只是邏輯概念,比如union、split/select、partition等。如下圖所示的轉換樹,在運行時會優(yōu)化成下方的操作圖。?
2.2.2 StreamGraph生成函數(shù)分析
我們從StreamGraphGenerator.generate()方法往下看:
public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {return new StreamGraphGenerator(env).generateInternal(transformations);}//注意,StreamGraph的生成是從sink開始的private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {for (StreamTransformation<?> transformation: transformations) {transform(transformation);}return streamGraph;}//這個方法的核心邏輯就是判斷傳入的steamOperator是哪種類型,并執(zhí)行相應的操作,詳情見下面那一大堆if-elseprivate Collection<Integer> transform(StreamTransformation<?> transform) {if (alreadyTransformed.containsKey(transform)) {return alreadyTransformed.get(transform);}LOG.debug("Transforming " + transform);if (transform.getMaxParallelism() <= 0) {// if the max parallelism hasn't been set, then first use the job wide max parallelism// from theExecutionConfig.int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();if (globalMaxParallelismFromConfig > 0) {transform.setMaxParallelism(globalMaxParallelismFromConfig);}}// call at least once to trigger exceptions about MissingTypeInfotransform.getOutputType();Collection<Integer> transformedIds;//這里對操作符的類型進行判斷,并以此調用相應的處理邏輯.簡而言之,處理的核心無非是遞歸的將該節(jié)點和節(jié)點的上游節(jié)點加入圖if (transform instanceof OneInputTransformation<?, ?>) {transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);} else if (transform instanceof SourceTransformation<?>) {transformedIds = transformSource((SourceTransformation<?>) transform);} else if (transform instanceof SinkTransformation<?>) {transformedIds = transformSink((SinkTransformation<?>) transform);} else if (transform instanceof UnionTransformation<?>) {transformedIds = transformUnion((UnionTransformation<?>) transform);} else if (transform instanceof SplitTransformation<?>) {transformedIds = transformSplit((SplitTransformation<?>) transform);} else if (transform instanceof SelectTransformation<?>) {transformedIds = transformSelect((SelectTransformation<?>) transform);} else if (transform instanceof FeedbackTransformation<?>) {transformedIds = transformFeedback((FeedbackTransformation<?>) transform);} else if (transform instanceof CoFeedbackTransformation<?>) {transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);} else if (transform instanceof PartitionTransformation<?>) {transformedIds = transformPartition((PartitionTransformation<?>) transform);} else if (transform instanceof SideOutputTransformation<?>) {transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);} else {throw new IllegalStateException("Unknown transformation: " + transform);}//注意這里和函數(shù)開始時的方法相對應,在有向圖中要注意避免循環(huán)的產生// need this check because the iterate transformation adds itself before// transforming the feedback edgesif (!alreadyTransformed.containsKey(transform)) {alreadyTransformed.put(transform, transformedIds);}if (transform.getBufferTimeout() > 0) {streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());}if (transform.getUid() != null) {streamGraph.setTransformationUID(transform.getId(), transform.getUid());}if (transform.getUserProvidedNodeHash() != null) {streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());}if (transform.getMinResources() != null && transform.getPreferredResources() != null) {streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());}return transformedIds;}?
因為map,filter等常用操作都是OneInputStreamOperator,我們就來看看transformOneInputTransform((OneInputTransformation<?, ?>) transform)方法。
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {Collection<Integer> inputIds = transform(transform.getInput());// 在遞歸處理節(jié)點過程中,某個節(jié)點可能已經(jīng)被其他子節(jié)點先處理過了,需要跳過if (alreadyTransformed.containsKey(transform)) {return alreadyTransformed.get(transform);}//這里是獲取slotSharingGroup。這個group用來定義當前我們在處理的這個操作符可以跟什么操作符chain到一個slot里進行操作//因為有時候我們可能不滿意flink替我們做的chain聚合//一個slot就是一個執(zhí)行task的基本容器String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);//把該operator加入圖streamGraph.addOperator(transform.getId(),slotSharingGroup,transform.getOperator(),transform.getInputType(),transform.getOutputType(),transform.getName());//對于keyedStream,我們還要記錄它的keySelector方法//flink并不真正為每個keyedStream保存一個key,而是每次需要用到key的時候都使用keySelector方法進行計算//因此,我們自定義的keySelector方法需要保證冪等性//到后面介紹keyGroup的時候我們還會再次提到這一點if (transform.getStateKeySelector() != null) {TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);}streamGraph.setParallelism(transform.getId(), transform.getParallelism());streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());//為當前節(jié)點和它的依賴節(jié)點建立邊//這里可以看到之前提到的select union partition等邏輯節(jié)點被合并入edge的過程for (Integer inputId: inputIds) {streamGraph.addEdge(inputId, transform.getId(), 0);}return Collections.singleton(transform.getId());}public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {addEdgeInternal(upStreamVertexID,downStreamVertexID,typeNumber,null,new ArrayList<String>(),null);}//addEdge的實現(xiàn),會合并一些邏輯節(jié)點private void addEdgeInternal(Integer upStreamVertexID,Integer downStreamVertexID,int typeNumber,StreamPartitioner<?> partitioner,List<String> outputNames,OutputTag outputTag) {//如果輸入邊是側輸出節(jié)點,則把side的輸入邊作為本節(jié)點的輸入邊,并遞歸調用if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {int virtualId = upStreamVertexID;upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;if (outputTag == null) {outputTag = virtualSideOutputNodes.get(virtualId).f1;}addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag);//如果輸入邊是select,則把select的輸入邊作為本節(jié)點的輸入邊} else if (virtualSelectNodes.containsKey(upStreamVertexID)) {int virtualId = upStreamVertexID;upStreamVertexID = virtualSelectNodes.get(virtualId).f0;if (outputNames.isEmpty()) {// selections that happen downstream override earlier selectionsoutputNames = virtualSelectNodes.get(virtualId).f1;}addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);//如果是partition節(jié)點} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {int virtualId = upStreamVertexID;upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;if (partitioner == null) {partitioner = virtualPartitionNodes.get(virtualId).f1;}addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);} else {//正常的edge處理邏輯StreamNode upstreamNode = getStreamNode(upStreamVertexID);StreamNode downstreamNode = getStreamNode(downStreamVertexID);// If no partitioner was specified and the parallelism of upstream and downstream// operator matches use forward partitioning, use rebalance otherwise.if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {partitioner = new ForwardPartitioner<Object>();} else if (partitioner == null) {partitioner = new RebalancePartitioner<Object>();}if (partitioner instanceof ForwardPartitioner) {if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {throw new UnsupportedOperationException("Forward partitioning does not allow " +"change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +" You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");}}StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag);getStreamNode(edge.getSourceId()).addOutEdge(edge);getStreamNode(edge.getTargetId()).addInEdge(edge);}}?
2.2.3 WordCount函數(shù)的StreamGraph
flink提供了一個StreamGraph可視化顯示工具,在這里?
我們可以把我們的程序的執(zhí)行計劃打印出來System.out.println(env.getExecutionPlan());?復制到這個網(wǎng)站上,點擊生成,如圖所示:?
?
可以看到,我們源程序被轉化成了4個operator。?
另外,在operator之間的連線上也顯示出了flink添加的一些邏輯流程。由于我設定了每個操作符的并行度都是1,所以在每個操作符之間都是直接FORWARD,不存在shuffle的過程。
2.3 JobGraph的生成
flink會根據(jù)上一步生成的StreamGraph生成JobGraph,然后將JobGraph發(fā)送到server端進行ExecutionGraph的解析。
2.3.1 JobGraph生成源碼
與StreamGraph類似,JobGraph的入口方法是StreamingJobGraphGenerator.createJobGraph()。我們直接來看源碼
private JobGraph createJobGraph() {// 設置啟動模式為所有節(jié)點均在一開始就啟動jobGraph.setScheduleMode(ScheduleMode.EAGER);// 為每個節(jié)點生成hash idMap<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);// 為了保持兼容性創(chuàng)建的hashList<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());for (StreamGraphHasher hasher : legacyStreamGraphHashers) {legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));}Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();//生成jobvertex,串成chain等//這里的邏輯大致可以理解為,挨個遍歷節(jié)點,如果該節(jié)點是一個chain的頭節(jié)點,就生成一個JobVertex,如果不是頭節(jié)點,就要把自身配置并入頭節(jié)點,然后把頭節(jié)點和自己的出邊相連;對于不能chain的節(jié)點,當作只有頭節(jié)點處理即可setChaining(hashes, legacyHashes, chainedOperatorHashes);//設置輸入邊edgesetPhysicalEdges();//設置slot共享groupsetSlotSharing();//配置檢查點configureCheckpointing();// 如果有之前的緩存文件的配置的話,重新讀入for (Tuple2<String, DistributedCache.DistributedCacheEntry> e : streamGraph.getEnvironment().getCachedFiles()) {DistributedCache.writeFileInfoToConfig(e.f0, e.f1, jobGraph.getJobConfiguration());}// 傳遞執(zhí)行環(huán)境配置try {jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());}catch (IOException e) {throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +"This indicates that non-serializable types (like custom serializers) were registered");}return jobGraph;}?
2.3.2 operator chain的邏輯
為了更高效地分布式執(zhí)行,Flink會盡可能地將operator的subtask鏈接(chain)在一起形成task。每個task在一個線程中執(zhí)行。將operators鏈接成task是非常有效的優(yōu)化:它能減少線程之間的切換,減少消息的序列化/反序列化,減少數(shù)據(jù)在緩沖區(qū)的交換,減少了延遲的同時提高整體的吞吐量。
?
上圖中將KeyAggregation和Sink兩個operator進行了合并,因為這兩個合并后并不會改變整體的拓撲結構。但是,并不是任意兩個 operator 就能 chain 一起的,其條件還是很苛刻的:
- 上下游的并行度一致
- 下游節(jié)點的入度為1 (也就是說下游節(jié)點沒有來自其他節(jié)點的輸入)
- 上下游節(jié)點都在同一個 slot group 中(下面會解釋 slot group)
- 下游節(jié)點的 chain 策略為 ALWAYS(可以與上下游鏈接,map、flatmap、filter等默認是ALWAYS)
- 上游節(jié)點的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接,不能與上游鏈接,Source默認是HEAD)
- 兩個節(jié)點間數(shù)據(jù)分區(qū)方式是 forward(參考理解數(shù)據(jù)流的分區(qū))
- 用戶沒有禁用 chain
flink的chain邏輯是一種很常見的設計,比如spring的interceptor也是類似的實現(xiàn)方式。通過把操作符串成一個大操作符,flink避免了把數(shù)據(jù)序列化后通過網(wǎng)絡發(fā)送給其他節(jié)點的開銷,能夠大大增強效率。
2.3.3 JobGraph的提交
前面已經(jīng)提到,JobGraph的提交依賴于JobClient和JobManager之間的異步通信,如圖所示:?
?
在submitJobAndWait方法中,其首先會創(chuàng)建一個JobClientActor的ActorRef,然后向其發(fā)起一個SubmitJobAndWait消息,該消息將JobGraph的實例提交給JobClientActor。發(fā)起模式是ask,它表示需要一個應答消息。
?
該SubmitJobAndWait消息被JobClientActor接收后,最終通過調用tryToSubmitJob方法觸發(fā)真正的提交動作。當JobManager的actor接收到來自client端的請求后,會執(zhí)行一個submitJob方法,主要做以下事情:
- 向BlobLibraryCacheManager注冊該Job;
- 構建ExecutionGraph對象;
- 對JobGraph中的每個頂點進行初始化;
- 將DAG拓撲中從source開始排序,排序后的頂點集合附加到Exec> - utionGraph對象;
- 獲取檢查點相關的配置,并將其設置到ExecutionGraph對象;
- 向ExecutionGraph注冊相關的listener;
- 執(zhí)行恢復操作或者將JobGraph信息寫入SubmittedJobGraphStore以在后續(xù)用于恢復目的;
- 響應給客戶端JobSubmitSuccess消息;
- 對ExecutionGraph對象進行調度執(zhí)行;
最后,JobManger會返回消息給JobClient,通知該任務是否提交成功。
2.4 ExecutionGraph的生成
與StreamGraph和JobGraph不同,ExecutionGraph并不是在我們的客戶端程序生成,而是在服務端(JobManager處)生成的,順便flink只維護一個JobManager。其入口代碼是ExecutionGraphBuilder.buildGraph(...)?
該方法長200多行,其中一大半是checkpoiont的相關邏輯,我們暫且略過,直接看核心方法executionGraph.attachJobGraph(sortedTopology)?
因為ExecutionGraph事實上只是改動了JobGraph的每個節(jié)點,而沒有對整個拓撲結構進行變動,所以代碼里只是挨個遍歷jobVertex并進行處理:
?
至此,ExecutorGraph就創(chuàng)建完成了。
3. 任務的調度與執(zhí)行
關于flink的任務執(zhí)行架構,官網(wǎng)的這兩張圖就是最好的說明:?
?
Flink 集群啟動后,首先會啟動一個 JobManger 和多個的 TaskManager。用戶的代碼會由JobClient 提交給 JobManager,JobManager 再把來自不同用戶的任務發(fā)給 不同的TaskManager 去執(zhí)行,每個TaskManager管理著多個task,task是執(zhí)行計算的最小結構, TaskManager 將心跳和統(tǒng)計信息匯報給 JobManager。TaskManager 之間以流的形式進行數(shù)據(jù)的傳輸。上述除了task外的三者均為獨立的 JVM 進程。?
要注意的是,TaskManager和job并非一一對應的關系。flink調度的最小單元是task而非TaskManager,也就是說,來自不同job的不同task可能運行于同一個TaskManager的不同線程上。?
?
一個flink任務所有可能的狀態(tài)如上圖所示。圖上畫的很明白,就不再贅述了。
3.1 計算資源的調度
Task slot是一個TaskManager內資源分配的最小載體,代表了一個固定大小的資源子集,每個TaskManager會將其所占有的資源平分給它的slot。?
通過調整 task slot 的數(shù)量,用戶可以定義task之間是如何相互隔離的。每個 TaskManager 有一個slot,也就意味著每個task運行在獨立的 JVM 中。每個 TaskManager 有多個slot的話,也就是說多個task運行在同一個JVM中。?
而在同一個JVM進程中的task,可以共享TCP連接(基于多路復用)和心跳消息,可以減少數(shù)據(jù)的網(wǎng)絡傳輸,也能共享一些數(shù)據(jù)結構,一定程度上減少了每個task的消耗。?
每個slot可以接受單個task,也可以接受多個連續(xù)task組成的pipeline,如下圖所示,FlatMap函數(shù)占用一個taskslot,而key Agg函數(shù)和sink函數(shù)共用一個taskslot:?
?
為了達到共用slot的目的,除了可以以chain的方式pipeline算子,我們還可以允許SlotSharingGroup,如下圖所示:?
?
我們可以把不能被chain成一條的兩個操作如flatmap和key&sink放在一個TaskSlot里執(zhí)行,這樣做可以獲得以下好處:
- 共用slot使得我們不再需要計算每個任務需要的總task數(shù)目,直接取最高算子的并行度即可
- 對計算資源的利用率更高。例如,通常的輕量級操作map和重量級操作Aggregate不再分別需要一個線程,而是可以在同一個線程內執(zhí)行,而且對于slot有限的場景,我們可以增大每個task的并行度了。?
接下來我們還是用官網(wǎng)的圖來說明flink是如何重用slot的:? - TaskManager1分配一個SharedSlot0
- 把source task放入一個SimpleSlot0,再把該slot放入SharedSlot0
- 把flatmap task放入一個SimpleSlot1,再把該slot放入SharedSlot0
- 因為我們的flatmap task并行度是2,因此不能再放入SharedSlot0,所以向TaskMange21申請了一個新的SharedSlot0
- 把第二個flatmap task放進一個新的SimpleSlot,并放進TaskManager2的SharedSlot0
- 開始處理key&sink task,因為其并行度也是2,所以先把第一個task放進TaskManager1的SharedSlot
- 把第二個key&sink放進TaskManager2的SharedSlot
3.2 JobManager執(zhí)行job
JobManager負責接收 flink 的作業(yè),調度 task,收集 job 的狀態(tài)、管理 TaskManagers。被實現(xiàn)為一個 akka actor。
3.2.1 JobManager的組件
- BlobServer 是一個用來管理二進制大文件的服務,比如保存用戶上傳的jar文件,該服務會將其寫到磁盤上。還有一些相關的類,如BlobCache,用于TaskManager向JobManager下載用戶的jar文件
- InstanceManager 用來管理當前存活的TaskManager的組件,記錄了TaskManager的心跳信息等
- CompletedCheckpointStore 用于保存已完成的checkpoint相關信息,持久化到內存中或者zookeeper上
- MemoryArchivist 保存了已經(jīng)提交到flink的作業(yè)的相關信息,如JobGraph等
3.2.2 JobManager的啟動過程
先列出JobManager啟動的核心代碼
def runJobManager(configuration: Configuration,executionMode: JobManagerMode,listeningAddress: String,listeningPort: Int): Unit = {val numberProcessors = Hardware.getNumberCPUCores()val futureExecutor = Executors.newScheduledThreadPool(numberProcessors,new ExecutorThreadFactory("jobmanager-future"))val ioExecutor = Executors.newFixedThreadPool(numberProcessors,new ExecutorThreadFactory("jobmanager-io"))val timeout = AkkaUtils.getTimeout(configuration)// we have to first start the JobManager ActorSystem because this determines the port if 0// was chosen before. The method startActorSystem will update the configuration correspondingly.val jobManagerSystem = startActorSystem(configuration,listeningAddress,listeningPort)val highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(configuration,ioExecutor,AddressResolution.NO_ADDRESS_RESOLUTION)val metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration))metricRegistry.startQueryService(jobManagerSystem, null)val (_, _, webMonitorOption, _) = try {startJobManagerActors(jobManagerSystem,configuration,executionMode,listeningAddress,futureExecutor,ioExecutor,highAvailabilityServices,metricRegistry,classOf[JobManager],classOf[MemoryArchivist],Option(classOf[StandaloneResourceManager]))} catch {case t: Throwable =>futureExecutor.shutdownNow()ioExecutor.shutdownNow()throw t}// block until everything is shut downjobManagerSystem.awaitTermination()....... }?
- 配置Akka并生成ActorSystem,啟動JobManager
- 啟動HA和metric相關服務
- 在startJobManagerActors()方法中啟動JobManagerActors,以及webserver,TaskManagerActor,ResourceManager等等
- 阻塞等待終止
- 集群通過LeaderService等選出JobManager的leader
3.2.3 JobManager啟動Task
JobManager 是一個Actor,通過各種消息來完成核心邏輯:
override def handleMessage: Receive = {case GrantLeadership(newLeaderSessionID) =>log.info(s"JobManager $getAddress was granted leadership with leader session ID " +s"$newLeaderSessionID.")leaderSessionID = newLeaderSessionID.......?
有幾個比較重要的消息:
- GrantLeadership 獲得leader授權,將自身被分發(fā)到的 session id 寫到 zookeeper,并恢復所有的 jobs
- RevokeLeadership 剝奪leader授權,打斷清空所有的 job 信息,但是保留作業(yè)緩存,注銷所有的 TaskManagers
- RegisterTaskManagers 注冊 TaskManager,如果之前已經(jīng)注冊過,則只給對應的 Instance 發(fā)送消息,否則啟動注冊邏輯:在 InstanceManager 中注冊該 Instance 的信息,并停止 Instance BlobLibraryCacheManager 的端口【供下載 lib 包用】,同時使用 watch 監(jiān)聽 task manager 的存活
- SubmitJob 提交 jobGraph?
最后一項SubmintJob就是我們要關注的,從客戶端收到JobGraph,轉換為ExecutionGraph并執(zhí)行的過程。
?
首先做一些準備工作,然后獲取一個ExecutionGraph,判斷是否是恢復的job,然后將job保存下來,并且通知客戶端本地已經(jīng)提交成功了,最后如果確認本JobManager是leader,則執(zhí)行executionGraph.scheduleForExecution()方法,這個方法經(jīng)過一系列調用,把每個ExecutionVertex傳遞給了Excution類的deploy方法:
public void deploy() throws JobException {......try {// good, we are allowed to deployif (!slot.setExecutedVertex(this)) {throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);}// race double check, did we fail/cancel and do we need to release the slot?if (this.state != DEPLOYING) {slot.releaseSlot();return;}if (LOG.isInfoEnabled()) {LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getTaskNameWithSubtaskIndex(),attemptNumber, getAssignedResourceLocation().getHostname()));}final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(attemptId,slot,taskState,attemptNumber);final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);......}catch (Throwable t) {markFailed(t);ExceptionUtils.rethrow(t);}}?
我們首先生成了一個TaskDeploymentDescriptor,然后交給了taskManagerGateway.submitTask()方法執(zhí)行。接下來的部分,就屬于TaskManager的范疇了。
3.3 TaskManager執(zhí)行task
3.3.1 TaskManager的基本組件
TaskManager是flink中資源管理的基本組件,是所有執(zhí)行任務的基本容器,提供了內存管理、IO管理、通信管理等一系列功能,本節(jié)對各個模塊進行簡要介紹。?
1. MemoryManager flink并沒有把所有內存的管理都委托給JVM,因為JVM普遍存在著存儲對象密度低、大內存時GC對系統(tǒng)影響大等問題。所以flink自己抽象了一套內存管理機制,將所有對象序列化后放在自己的MemorySegment上進行管理。MemoryManger涉及內容較多,將在后續(xù)章節(jié)進行繼續(xù)剖析。?
2. IOManager flink通過IOManager管理磁盤IO的過程,提供了同步和異步兩種寫模式,又進一步區(qū)分了block、buffer和bulk三種讀寫方式。?
IOManager提供了兩種方式枚舉磁盤文件,一種是直接遍歷文件夾下所有文件,另一種是計數(shù)器方式,對每個文件名以遞增順序訪問。?
在底層,flink將文件IO抽象為FileIOChannle,封裝了底層實現(xiàn)。?
?
可以看到,flink在底層實際上都是以異步的方式進行讀寫。?
3. NetworkEnvironment 是TaskManager的網(wǎng)絡 IO 組件,包含了追蹤中間結果和數(shù)據(jù)交換的數(shù)據(jù)結構。它的構造器會統(tǒng)一將配置的內存先分配出來,抽象成 NetworkBufferPool 統(tǒng)一管理內存的申請和釋放。意思是說,在輸入和輸出數(shù)據(jù)時,不管是保留在本地內存,等待chain在一起的下個操作符進行處理,還是通過網(wǎng)絡把本操作符的計算結果發(fā)送出去,都被抽象成了NetworkBufferPool。后續(xù)我們還將對這個組件進行詳細分析。
3.3.2 TaskManager執(zhí)行Task
對于TM來說,執(zhí)行task就是把收到的TaskDeploymentDescriptor對象轉換成一個task并執(zhí)行的過程。TaskDeploymentDescriptor這個類保存了task執(zhí)行所必須的所有內容,例如序列化的算子,輸入的InputGate和輸出的ResultPartition的定義,該task要作為幾個subtask執(zhí)行等等。?
按照正常邏輯思維,很容易想到TM的submitTask方法的行為:首先是確認資源,如尋找JobManager和Blob,而后建立連接,解序列化算子,收集task相關信息,接下來就是創(chuàng)建一個新的Task對象,這個task對象就是真正執(zhí)行任務的關鍵所在。
?
如果讀者是從頭開始看這篇blog,里面有很多對象應該已經(jīng)比較明確其作用了(除了那個brVarManager,這個是管理廣播變量的,廣播變量是一類會被分發(fā)到每個任務中的共享變量)。接下來的主要任務,就是把這個task啟動起來,然后報告說已經(jīng)啟動task了:
// all good, we kick off the task, which performs its own initialization task.startTaskThread()sender ! decorateMessage(Acknowledge.get())?
3.3.2.1 生成Task對象
在執(zhí)行new Task()方法時,第一步是把構造函數(shù)里的這些變量賦值給當前task的fields。?
接下來是初始化ResultPartition和InputGate。這兩個類描述了task的輸出數(shù)據(jù)和輸入數(shù)據(jù)。
?
最后,創(chuàng)建一個Thread對象,并把自己放進該對象,這樣在執(zhí)行時,自己就有了自身的線程的引用。
3.3.2.2 運行Task對象
Task對象本身就是一個Runable,因此在其run方法里定義了運行邏輯。?
第一步是切換Task的狀態(tài):
?
接下來,就是導入用戶類加載器并加載用戶代碼。?
然后,是向網(wǎng)絡管理器注冊當前任務(flink的各個算子在運行時進行數(shù)據(jù)交換需要依賴網(wǎng)絡管理器),分配一些緩存以保存數(shù)據(jù)?
然后,讀入指定的緩存文件。?
然后,再把task創(chuàng)建時傳入的那一大堆變量用于創(chuàng)建一個執(zhí)行環(huán)境Envrionment。?
再然后,對于那些并不是第一次執(zhí)行的task(比如失敗后重啟的)要恢復其狀態(tài)。?
接下來最重要的是
方法。為什么這么說呢,因為這個方法就是用戶代碼所真正被執(zhí)行的入口。比如我們寫的什么new MapFunction()的邏輯,最終就是在這里被執(zhí)行的。這里說一下這個invokable,這是一個抽象類,提供了可以被TaskManager執(zhí)行的對象的基本抽象。?
這個invokable是在解析JobGraph的時候生成相關信息的,并在此處形成真正可執(zhí)行的對象
?
?
上圖顯示了flink提供的可被執(zhí)行的Task類型。從名字上就可以看出各個task的作用,在此不再贅述。?
接下來就是invoke方法了,因為我們的wordcount例子用了流式api,在此我們以StreamTask的invoke方法為例進行說明。
3.3.2.3 StreamTask的執(zhí)行邏輯
先上部分核心代碼:
public final void invoke() throws Exception {boolean disposed = false;try {// -------- Initialize ---------//先做一些賦值操作......// if the clock is not already set, then assign a default TimeServiceProvider//處理timerif (timerService == null) {ThreadFactory timerThreadFactory =new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName());timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);}//把之前JobGraph串起來的chain的信息形成實現(xiàn)operatorChain = new OperatorChain<>(this);headOperator = operatorChain.getHeadOperator();// task specific initialization//這個init操作的起名非常詭異,因為這里主要是處理算子采用了自定義的checkpoint檢查機制的情況,但是起了一個非常大眾臉的名字init();// save the work of reloading state, etc, if the task is already canceledif (canceled) {throw new CancelTaskException();}// -------- Invoke --------LOG.debug("Invoking {}", getName());// we need to make sure that any triggers scheduled in open() cannot be// executed before all operators are openedsynchronized (lock) {// both the following operations are protected by the lock// so that we avoid race conditions in the case that initializeState()// registers a timer, that fires before the open() is called.//初始化操作符狀態(tài),主要是一些state啥的initializeState();//對于富操作符,執(zhí)行其open操作openAllOperators();}// final check to exit early before starting to runf (canceled) {throw new CancelTaskException();}// let the task do its work//真正開始執(zhí)行的代碼isRunning = true;run();?
StreamTask.invoke()方法里,第一個值得一說的是TimerService。Flink在2015年決定向StreamTask類加入timer service的時候解釋到:
This integrates the timer as a service in StreamTask that StreamOperators can use by calling a method on the StreamingRuntimeContext. This also ensures that the timer callbacks can not be called concurrently with other methods on the StreamOperator. This behaviour is ensured by an ITCase.
第二個要注意的是chain操作。前面提到了,flink會出于優(yōu)化的角度,把一些算子chain成一個整體的算子作為一個task來執(zhí)行。比如wordcount例子中,Source和FlatMap算子就被chain在了一起。在進行chain操作的時候,會設定頭節(jié)點,并且指定輸出的RecordWriter。
接下來不出所料仍然是初始化,只不過初始化的對象變成了各個operator。如果是有checkpoint的,那就從state信息里恢復,不然就作為全新的算子處理。從源碼中可以看到,flink針對keyed算子和普通算子做了不同的處理。keyed算子在初始化時需要計算出一個group區(qū)間,這個區(qū)間的值在整個生命周期里都不會再變化,后面key就會根據(jù)hash的不同結果,分配到特定的group中去計算。順便提一句,flink的keyed算子保存的是對每個數(shù)據(jù)的key的計算方法,而非真實的key,用戶需要自己保證對每一行數(shù)據(jù)提供的keySelector的冪等性。至于為什么要用KeyGroup的設計,這就牽扯到擴容的范疇了,將在后面的章節(jié)進行講述。?
對于openAllOperators()方法,就是對各種RichOperator執(zhí)行其open方法,通常可用于在執(zhí)行計算之前加載資源。?
最后,run方法千呼萬喚始出來,該方法經(jīng)過一系列跳轉,最終調用chain上的第一個算子的run方法。在wordcount的例子中,它最終調用了SocketTextStreamFunction的run,建立socket連接并讀入文本。
3.4 StreamTask與StreamOperator
前面提到,Task對象在執(zhí)行過程中,把執(zhí)行的任務交給了StreamTask這個類去執(zhí)行。在我們的wordcount例子中,實際初始化的是OneInputStreamTask的對象(參考上面的類圖)。那么這個對象是如何執(zhí)行用戶的代碼的呢?
protected void run() throws Exception {// cache processor reference on the stack, to make the code more JIT friendlyfinal StreamInputProcessor<IN> inputProcessor = this.inputProcessor;while (running && inputProcessor.processInput()) {// all the work happens in the "processInput" method}}?
它做的,就是把任務直接交給了InputProcessor去執(zhí)行processInput方法。這是一個StreamInputProcessor的實例,該processor的任務就是處理輸入的數(shù)據(jù),包括用戶數(shù)據(jù)、watermark和checkpoint數(shù)據(jù)等。我們先來看看這個processor是如何產生的:
public void init() throws Exception {StreamConfig configuration = getConfiguration();TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());int numberOfInputs = configuration.getNumberOfInputs();if (numberOfInputs > 0) {InputGate[] inputGates = getEnvironment().getAllInputGates();inputProcessor = new StreamInputProcessor<>(inputGates,inSerializer,this,configuration.getCheckpointMode(),getCheckpointLock(),getEnvironment().getIOManager(),getEnvironment().getTaskManagerInfo().getConfiguration(),getStreamStatusMaintainer(),this.headOperator);// make sure that stream tasks report their I/O statisticsinputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());}}?
這是OneInputStreamTask的init方法,從configs里面獲取StreamOperator信息,生成自己的inputProcessor。那么inputProcessor是如何處理數(shù)據(jù)的呢?我們接著跟進源碼:
public boolean processInput() throws Exception {if (isFinished) {return false;}if (numRecordsIn == null) {numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();}//這個while是用來處理單個元素的(不要想當然以為是循環(huán)處理元素的)while (true) {//注意 1在下面//2.接下來,會利用這個反序列化器得到下一個數(shù)據(jù)記錄,并進行解析(是用戶數(shù)據(jù)還是watermark等等),然后進行對應的操作if (currentRecordDeserializer != null) {DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);if (result.isBufferConsumed()) {currentRecordDeserializer.getCurrentBuffer().recycle();currentRecordDeserializer = null;}if (result.isFullRecord()) {StreamElement recordOrMark = deserializationDelegate.getInstance();//如果元素是watermark,就準備更新當前channel的watermark值(并不是簡單賦值,因為有亂序存在),if (recordOrMark.isWatermark()) {// handle watermarkstatusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);continue;} else if (recordOrMark.isStreamStatus()) {//如果元素是status,就進行相應處理。可以看作是一個flag,標志著當前stream接下來即將沒有元素輸入(idle),或者當前即將由空閑狀態(tài)轉為有元素狀態(tài)(active)。同時,StreamStatus還對如何處理watermark有影響。通過發(fā)送status,上游的operator可以很方便的通知下游當前的數(shù)據(jù)流的狀態(tài)。// handle stream statusstatusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);continue;} else if (recordOrMark.isLatencyMarker()) {//LatencyMarker是用來衡量代碼執(zhí)行時間的。在Source處創(chuàng)建,攜帶創(chuàng)建時的時間戳,流到Sink時就可以知道經(jīng)過了多長時間// handle latency markersynchronized (lock) {streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());}continue;} else {//這里就是真正的,用戶的代碼即將被執(zhí)行的地方。從章節(jié)1到這里足足用了三萬字,有點萬里長征的感覺// now we can do the actual processingStreamRecord<IN> record = recordOrMark.asRecord();synchronized (lock) {numRecordsIn.inc();streamOperator.setKeyContextElement1(record);streamOperator.processElement(record);}return true;}}}//1.程序首先獲取下一個buffer//這一段代碼是服務于flink的FaultTorrent機制的,后面我會講到,這里只需理解到它會嘗試獲取buffer,然后賦值給當前的反序列化器final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();if (bufferOrEvent != null) {if (bufferOrEvent.isBuffer()) {currentChannel = bufferOrEvent.getChannelIndex();currentRecordDeserializer = recordDeserializers[currentChannel];currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());}else {// Event receivedfinal AbstractEvent event = bufferOrEvent.getEvent();if (event.getClass() != EndOfPartitionEvent.class) {throw new IOException("Unexpected event: " + event);}}}else {isFinished = true;if (!barrierHandler.isEmpty()) {throw new IllegalStateException("Trailing data in checkpoint barrier handler.");}return false;}}}?
到此為止,以上部分就是一個flink程序啟動后,到執(zhí)行用戶代碼之前,flink框架所做的準備工作。回顧一下:
- 啟動一個環(huán)境
- 生成StreamGraph
- 注冊和選舉JobManager
- 在各節(jié)點生成TaskManager,并根據(jù)JobGraph生成對應的Task
- 啟動各個task,準備執(zhí)行代碼
接下來,我們挑幾個Operator看看flink是如何抽象這些算子的。
4. StreamOperator的抽象與實現(xiàn)
4.1 數(shù)據(jù)源的邏輯——StreamSource與時間模型
StreamSource抽象了一個數(shù)據(jù)源,并且指定了一些如何處理數(shù)據(jù)的模式。
public class StreamSource<OUT, SRC extends SourceFunction<OUT>>extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {......public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {run(lockingObject, streamStatusMaintainer, output);}public void run(final Object lockingObject,final StreamStatusMaintainer streamStatusMaintainer,final Output<StreamRecord<OUT>> collector) throws Exception {final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();LatencyMarksEmitter latencyEmitter = null;if (getExecutionConfig().isLatencyTrackingEnabled()) {latencyEmitter = new LatencyMarksEmitter<>(getProcessingTimeService(),collector,getExecutionConfig().getLatencyTrackingInterval(),getOperatorConfig().getVertexID(),getRuntimeContext().getIndexOfThisSubtask());}final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();this.ctx = StreamSourceContexts.getSourceContext(timeCharacteristic,getProcessingTimeService(),lockingObject,streamStatusMaintainer,collector,watermarkInterval,-1);try {userFunction.run(ctx);// if we get here, then the user function either exited after being done (finite source)// or the function was canceled or stopped. For the finite source case, we should emit// a final watermark that indicates that we reached the end of event-timeif (!isCanceledOrStopped()) {ctx.emitWatermark(Watermark.MAX_WATERMARK);}} finally {// make sure that the context is closed in any casectx.close();if (latencyEmitter != null) {latencyEmitter.close();}}}......private static class LatencyMarksEmitter<OUT> {private final ScheduledFuture<?> latencyMarkTimer;public LatencyMarksEmitter(final ProcessingTimeService processingTimeService,final Output<StreamRecord<OUT>> output,long latencyTrackingInterval,final int vertexID,final int subtaskIndex) {latencyMarkTimer = processingTimeService.scheduleAtFixedRate(new ProcessingTimeCallback() {@Overridepublic void onProcessingTime(long timestamp) throws Exception {try {// ProcessingTimeService callbacks are executed under the checkpointing lockoutput.emitLatencyMarker(new LatencyMarker(timestamp, vertexID, subtaskIndex));} catch (Throwable t) {// we catch the Throwables here so that we don't trigger the processing// timer services async exception handlerLOG.warn("Error while emitting latency marker.", t);}}},0L,latencyTrackingInterval);}public void close() {latencyMarkTimer.cancel(true);}} }?
在StreamSource生成上下文之后,接下來就是把上下文交給SourceFunction去執(zhí)行:
SourceFunction是對Function的一個抽象,就好像MapFunction,KeyByFunction一樣,用戶選擇實現(xiàn)這些函數(shù),然后flink框架就能利用這些函數(shù)進行計算,完成用戶邏輯。?
我們的wordcount程序使用了flink提供的一個SocketTextStreamFunction。我們可以看一下它的實現(xiàn)邏輯,對source如何運行有一個基本的認識:
?
整段代碼里,只有collect方法有些復雜度,后面我們在講到flink的對象機制時會結合來講,此處知道collect方法會收集結果,然后發(fā)送給接收者即可。在我們的wordcount里,這個算子的接收者就是被chain在一起的flatmap算子,不記得這個示例程序的話,可以返回第一章去看一下。
4.2 從數(shù)據(jù)輸入到數(shù)據(jù)處理——OneInputStreamOperator & AbstractUdfStreamOperator
StreamSource是用來開啟整個流的算子,而承接輸入數(shù)據(jù)并進行處理的算子就是OneInputStreamOperator、TwoInputStreamOperator等。?
?
整個StreamOperator的繼承關系如上圖所示(圖很大,建議點開放大看)。?
OneInputStreamOperator這個接口的邏輯很簡單:
?
而實現(xiàn)了這個接口的StreamFlatMap算子也很簡單,沒什么可說的:
public class StreamFlatMap<IN, OUT>extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>implements OneInputStreamOperator<IN, OUT> {private static final long serialVersionUID = 1L;private transient TimestampedCollector<OUT> collector;public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {super(flatMapper);chainingStrategy = ChainingStrategy.ALWAYS;}@Overridepublic void open() throws Exception {super.open();collector = new TimestampedCollector<>(output);}@Overridepublic void processElement(StreamRecord<IN> element) throws Exception {collector.setTimestamp(element);userFunction.flatMap(element.getValue(), collector);} }?
從類圖里可以看到,flink為我們封裝了一個算子的基類AbstractUdfStreamOperator,提供了一些通用功能,比如把context賦給算子,保存快照等等,其中最為大家了解的應該是這兩個:
@Overridepublic void open() throws Exception {super.open();FunctionUtils.openFunction(userFunction, new Configuration());}@Overridepublic void close() throws Exception {super.close();functionsClosed = true;FunctionUtils.closeFunction(userFunction);}?
這兩個就是flink提供的Rich***Function系列算子的open和close方法被執(zhí)行的地方。
4.3 StreamSink
StreamSink著實沒什么可說的,邏輯很簡單,值得一提的只有兩個方法:
@Overridepublic void processElement(StreamRecord<IN> element) throws Exception {sinkContext.element = element;userFunction.invoke(element.getValue(), sinkContext);}@Overrideprotected void reportOrForwardLatencyMarker(LatencyMarker maker) {// all operators are tracking latenciesthis.latencyGauge.reportLatency(maker, true);// sinks don't forward latency markers}?
其中,processElement?是繼承自StreamOperator的方法。reportOrForwardLatencyMarker是用來計算延遲的,前面提到StreamSource會產生LateMarker,用于記錄數(shù)據(jù)計算時間,就是在這里完成了計算。
算子這部分邏輯相對簡單清晰,就講這么多吧。
5. 為執(zhí)行保駕護航——Fault Tolerant與保證Exactly-Once語義
5.1 Fault Tolerant演進之路
對于7×24小時不間斷運行的流程序來說,要保證fault tolerant是很難的,這不像是離線任務,如果失敗了只需要清空已有結果,重新跑一次就可以了。對于流任務,如果要保證能夠重新處理已處理過的數(shù)據(jù),就要把數(shù)據(jù)保存下來;而這就面臨著幾個問題:比如一是保存多久的數(shù)據(jù)?二是重復計算的數(shù)據(jù)應該怎么處理,怎么保證冪等性??
對于一個流系統(tǒng),我們有以下希望:
5.1.1 Storm的Record acknowledgement模式
storm的fault tolerant是這樣工作的:每一個被storm的operator處理的數(shù)據(jù)都會向其上一個operator發(fā)送一份應答消息,通知其已被下游處理。storm的源operator保存了所有已發(fā)送的消息的每一個下游算子的應答消息,當它收到來自sink的應答時,它就知道該消息已經(jīng)被完整處理,可以移除了。?
如果沒有收到應答,storm就會重發(fā)該消息。顯而易見,這是一種at least once的邏輯。另外,這種方式面臨著嚴重的冪等性問題,例如對一個count算子,如果count的下游算子出錯,source重發(fā)該消息,那么防止該消息被count兩遍的邏輯需要程序員自己去實現(xiàn)。最后,這樣一種處理方式非常低效,吞吐量很低。
5.1.2 Spark streaming的micro batch模式
前面提到,storm的實現(xiàn)方式就注定了與高吞吐量無緣。那么,為了提高吞吐量,把一批數(shù)據(jù)聚集在一起處理就是很自然的選擇。Spark Streaming的實現(xiàn)就是基于這樣的思路:?
我們可以在完全的連續(xù)計算與完全的分批計算中間取折中,通過控制每批計算數(shù)據(jù)的大小來控制延遲與吞吐量的制約,如果想要低延遲,就用小一點的batch,如果想要大吞吐量,就不得不忍受更高的延遲(更久的等待數(shù)據(jù)到來的時間和更多的計算),如下圖所示。?
?
以這樣的方式,可以在每個batch中做到exactly-once,但是這種方式也有其弊端:?
首先,batch的方式使得一些需要跨batch的操作變得非常困難,例如session window;用戶不得不自己想辦法去實現(xiàn)相關邏輯。?
其次,batch模式很難做好背壓。當一個batch因為種種原因處理慢了,那么下一個batch要么不得不容納更多的新來數(shù)據(jù),要么不得不堆積更多的batch,整個任務可能會被拖垮,這是一個非常致命的問題。?
最后,batch的方式基本意味著其延遲是有比較高的下限的,實時性上不好。
5.1.3 Google Cloud Dataflow的事務式模型
我們在傳統(tǒng)數(shù)據(jù)庫,如mysql中使用binlog來完成事務,這樣的思路也可以被用在實現(xiàn)exactly-once模型中。例如,我們可以log下每個數(shù)據(jù)元素每一次被處理時的結果和當時所處的操作符的狀態(tài)。這樣,當我們需要fault tolerant時,我們只需要讀一下log就可以了。這種模式規(guī)避了storm和spark所面臨的問題,并且能夠很好的實現(xiàn)exactly-once,唯一的弊端是:如何盡可能的減少log的成本?Flink給了我們答案。
5.1.4 Flink的分布式快照機制
實現(xiàn)exactly-once的關鍵是什么?是能夠準確的知道和快速記錄下來當前的operator的狀態(tài)、當前正在處理的元素(以及正處在不同算子之間傳遞的元素)。如果上面這些可以做到,那么fault tolerant無非就是從持久化存儲中讀取上次記錄的這些元信息,并且恢復到程序中。那么Flink是如何實現(xiàn)的呢?
Flink的分布式快照的核心是其輕量級異步分布式快照機制。為了實現(xiàn)這一機制,flink引入了一個概念,叫做Barrier。Barrier是一種標記,它被source產生并且插入到流數(shù)據(jù)中,被發(fā)送到下游節(jié)點。當下游節(jié)點處理到該barrier標志時,這就意味著在該barrier插入到流數(shù)據(jù)時,已經(jīng)進入系統(tǒng)的數(shù)據(jù)在當前節(jié)點已經(jīng)被處理完畢。?
如圖所示,每當一個barrier流過一個算子節(jié)點時,就說明了在該算子上,可以觸發(fā)一次檢查點,用以保存當前節(jié)點的狀態(tài)和已經(jīng)處理過的數(shù)據(jù),這就是一份快照。(在這里可以聯(lián)想一下micro-batch,把barrier想象成分割每個batch的邏輯,會好理解一點)這樣的方式下,記錄快照就像和前面提到的micro-batch一樣容易。
與此同時,該算子會向下游發(fā)送該barrier。因為數(shù)據(jù)在算子之間是按順序發(fā)送的,所以當下游節(jié)點收到該barrier時,也就意味著同樣的一批數(shù)據(jù)在下游節(jié)點上也處理完畢,可以進行一次checkpoint,保存基于該節(jié)點的一份快照,快照完成后,會通知JobMananger自己完成了這個快照。這就是分布式快照的基本含義。
再看這張圖:?
?
有時,有的算子的上游節(jié)點和下游節(jié)點都不止一個,應該怎么處理呢?如果有不止一個下游節(jié)點,就向每個下游發(fā)送barrier。同理,如果有不止一個上游節(jié)點,那么就要等到所有上游節(jié)點的同一批次的barrier到達之后,才能觸發(fā)checkpoint。因為每個節(jié)點運算速度不同,所以有的上游節(jié)點可能已經(jīng)在發(fā)下個barrier周期的數(shù)據(jù)了,有的上游節(jié)點還沒發(fā)送本次的barrier,這時候,當前算子就要緩存一下提前到來的數(shù)據(jù),等比較慢的上游節(jié)點發(fā)送barrier之后,才能處理下一批數(shù)據(jù)。
當整個程序的最后一個算子sink都收到了這個barrier,也就意味著這個barrier和上個barrier之間所夾雜的這批元素已經(jīng)全部落袋為安。這時,最后一個算子通知JobManager整個流程已經(jīng)完成,而JobManager隨后發(fā)出通知,要求所有算子刪除本次快照內容,以完成清理。這整個部分,就是Flink的兩階段提交的checkpoint過程,如下面四幅圖所示:?
總之,通過這種方式,flink實現(xiàn)了我們前面提到的六項對流處理框架的要求:exactly-once、低延遲、高吞吐、易用的模型、方便的恢復機制。
最后,貼一個美團做的flink與storm的性能對比:flink與storm的性能對比
5.2 checkpoint的生命周期
接下來,我們結合源碼來看看flink的checkpoint到底是如何實現(xiàn)其生命周期的:
由于flink提供的SocketSource并不支持checkpoint,所以這里我以FlinkKafkaConsumer010作為sourceFunction。
5.2.1 觸發(fā)checkpoint
要完成一次checkpoint,第一步必然是發(fā)起checkpoint請求。那么,這個請求是哪里發(fā)出的,怎么發(fā)出的,又由誰控制呢??
還記得如果我們要設置checkpoint的話,需要指定checkpoint間隔吧?既然是一個指定間隔觸發(fā)的功能,那應該會有類似于Scheduler的東西存在,flink里,這個負責觸發(fā)checkpoint的類是CheckpointCoordinator。
flink在提交job時,會啟動這個類的startCheckpointScheduler方法,如下所示
public void startCheckpointScheduler() {synchronized (lock) {if (shutdown) {throw new IllegalArgumentException("Checkpoint coordinator is shut down");}// make sure all prior timers are cancelledstopCheckpointScheduler();periodicScheduling = true;currentPeriodicTrigger = timer.scheduleAtFixedRate(new ScheduledTrigger(), baseInterval, baseInterval, TimeUnit.MILLISECONDS);}}private final class ScheduledTrigger implements Runnable {@Overridepublic void run() {try {triggerCheckpoint(System.currentTimeMillis(), true);}catch (Exception e) {LOG.error("Exception while triggering checkpoint.", e);}}}?
啟動之后,就會以設定好的頻率調用triggerCheckPoint()方法。這個方法太長,我大概說一下都做了什么:
- 檢查符合觸發(fā)checkpoint的條件,例如如果禁止了周期性的checkpoint,尚未達到觸發(fā)checkpoint的最小間隔等等,就直接return
- 檢查是否所有需要checkpoint和需要響應checkpoint的ACK(ack涉及到checkpoint的兩階段提交,后面會講)的task都處于running狀態(tài),否則return
- 如果都符合,那么執(zhí)行checkpointID = checkpointIdCounter.getAndIncrement();以生成一個新的id,然后生成一個PendingCheckpoint。PendingCheckpoint是一個啟動了的checkpoint,但是還沒有被確認。等到所有的task都確認了本次checkpoint,那么這個checkpoint對象將轉化為一個CompletedCheckpoint。
- 定義一個超時callback,如果checkpoint執(zhí)行了很久還沒完成,就把它取消
- 觸發(fā)MasterHooks,用戶可以定義一些額外的操作,用以增強checkpoint的功能(如準備和清理外部資源)
- 接下來是核心邏輯:
?
這里是調用了Execution的triggerCheckpoint方法,一個execution就是一個executionVertex的實際執(zhí)行者。我們看一下這個方法:
public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {final LogicalSlot slot = assignedResource;if (slot != null) {//TaskManagerGateway是用來跟taskManager進行通信的組件final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);} else {LOG.debug("The execution has no slot assigned. This indicates that the execution is " +"no longer running.");}}?
再往下跟就進入了Task類的范疇,我們將在下一小節(jié)進行解讀。本小節(jié)主要講了CheckpointCoordinator類是如何觸發(fā)一次checkpoint,從其名字也可以看出來其功能:檢查點協(xié)調器。
5.2.2 Task層面checkpoint的準備工作
先說Task類中的部分,該類創(chuàng)建了一個CheckpointMetaData的對象,并且生成了一個Runable匿名類用于執(zhí)行checkpoint,然后以異步的方式觸發(fā)了該Runable:
public void triggerCheckpointBarrier(final long checkpointID,long checkpointTimestamp,final CheckpointOptions checkpointOptions) {......Runnable runnable = new Runnable() {@Overridepublic void run() {// set safety net from the task's context for checkpointing threadLOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);try {boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);if (!success) {checkpointResponder.declineCheckpoint(getJobID(), getExecutionId(), checkpointID,new CheckpointDeclineTaskNotReadyException(taskName));}}......}};executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));}}?
上面代碼里的invokable事實上就是我們的StreamTask了。Task類實際上是將checkpoint委托給了更具體的類去執(zhí)行,而StreamTask也將委托給更具體的類,直到業(yè)務代碼。?
StreamTask是這樣實現(xiàn)的:
- 如果task還在運行,那就可以進行checkpoint。方法是先向下游所有出口廣播一個Barrier,然后觸發(fā)本task的State保存。
- 如果task結束了,那我們就要通知下游取消本次checkpoint,方法是發(fā)送一個CancelCheckpointMarker,這是類似于Barrier的另一種消息。
- 注意,從這里開始,整個執(zhí)行鏈路上開始出現(xiàn)Barrier,可以和前面講Fault Tolerant原理的地方結合看一下。
?
完成broadcastCheckpointBarrier方法后,在checkpointState()方法中,StreamTask還做了很多別的工作:
public void executeCheckpointing() throws Exception {......try {//這里,就是調用StreamOperator進行snapshotState的入口方法for (StreamOperator<?> op : allOperators) {checkpointStreamOperator(op);}// we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submitAsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(owner,operatorSnapshotsInProgress,checkpointMetaData,checkpointMetrics,startAsyncPartNano);owner.cancelables.registerCloseable(asyncCheckpointRunnable);//這里注冊了一個Runnable,在執(zhí)行完checkpoint之后向JobManager發(fā)出CompletedCheckPoint消息,這也是fault tolerant兩階段提交的一部分owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);......} }?
說到checkpoint,我們印象里最直觀的感受肯定是我們的一些做聚合的操作符的狀態(tài)保存,比如sum的和以及count的值等等。這些內容就是StreamOperator部分將要觸發(fā)保存的內容。可以看到,除了我們直觀的這些操作符的狀態(tài)保存外,flink的checkpoint做了大量的其他工作。
接下來,我們就把目光轉向操作符的checkpoint機制。
5.2.3 操作符的狀態(tài)保存及barrier傳遞
第四章時,我們已經(jīng)了解了StreamOperator的類關系,這里,我們就直接接著上一節(jié)的checkpointStreamOperator(op)方法往下講。?
順便,前面也提到了,在進行checkpoint之前,operator初始化時,會執(zhí)行一個initializeState方法,在該方法中,如果task是從失敗中恢復的話,其保存的state也會被restore進來。
傳遞barrier是在進行本operator的statesnapshot之前完成的,我們先來看看其邏輯,其實和傳遞一條數(shù)據(jù)是類似的,就是生成一個CheckpointBarrier對象,然后向每個streamOutput寫進去:
public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {try {CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);for (RecordWriterOutput<?> streamOutput : streamOutputs) {streamOutput.broadcastEvent(barrier);}}catch (InterruptedException e) {throw new IOException("Interrupted while broadcasting checkpoint barrier");}}?
下游的operator接收到本barrier,就會觸發(fā)其自身的checkpoint。
StreamTask在執(zhí)行完broadcastCheckpointBarrier之后,?
我們當前的wordcount程序里有兩個operator chain,分別是:
- kafka source -> flatmap
- keyed aggregation -> sink
我們就按這個順序來捋一下checkpoint的過程。
1.kafka source的checkpoint過程
public final void snapshotState(FunctionSnapshotContext context) throws Exception {if (!running) {LOG.debug("snapshotState() called on closed source");} else {unionOffsetStates.clear();final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;if (fetcher == null) {// the fetcher has not yet been initialized, which means we need to return the// originally restored offsets or the assigned partitionsfor (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call can happen// on this function at a time: either snapshotState() or notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);}} else {HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call can happen// on this function at a time: either snapshotState() or notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);}for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {unionOffsetStates.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));}}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// truncate the map of pending offsets to commit, to prevent infinite growthwhile (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {pendingOffsetsToCommit.remove(0);}}}}?
kafka的snapshot邏輯就是記錄一下當前消費的offsets,然后做成tuple(partitiion,offset)放進一個StateBackend里。StateBackend是flink抽象出來的一個用于保存狀態(tài)的接口。
2.FlatMap算子的checkpoint過程?
沒什么可說的,就是調用了snapshotState()方法而已。
3.本operator chain的state保存過程?
細心的同學應該注意到了,各個算子的snapshot方法只把自己的狀態(tài)保存到了StateBackend里,沒有寫入的持久化操作。這部分操作被放到了AbstractStreamOperator中,由flink統(tǒng)一負責持久化。其實不需要看源碼我們也能想出來,持久化無非就是把這些數(shù)據(jù)用一個流寫到磁盤或者別的地方,接下來我們來看看是不是這樣:
?
那么這個operatorStateBackend是怎么保存狀態(tài)的呢?- 首先把各個算子的state做了一份深拷貝;
- 然后以異步的方式執(zhí)行了一個內部類的runnable,該內部類的run方法實現(xiàn)了一個模版方法,首先打開stream,然后寫入數(shù)據(jù),然后再關閉stream。
我們來看看這個寫入數(shù)據(jù)的方法:
public SnapshotResult<OperatorStateHandle> performOperation() throws Exception {long asyncStartTime = System.currentTimeMillis();CheckpointStreamFactory.CheckpointStateOutputStream localOut = this.out;// get the registered operator state infos ...List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> operatorMetaInfoSnapshots =new ArrayList<>(registeredOperatorStatesDeepCopies.size());for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStatesDeepCopies.entrySet()) {operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());}// ... write them all in the checkpoint stream ...DataOutputView dov = new DataOutputViewStreamWrapper(localOut);OperatorBackendSerializationProxy backendSerializationProxy =new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);backendSerializationProxy.write(dov);......}?
注釋寫的很清楚,我就不多說了。
4.后繼operatorChain的checkpoint過程?
前面說到,在flink的流中,barrier流過時會觸發(fā)checkpoint。在上面第1步中,上游節(jié)點已經(jīng)發(fā)出了Barrier,所以在我們的keyed aggregation -> sink 這個operatorchain中,我們將首先捕獲這個barrier。
捕獲barrier的過程其實就是處理input數(shù)據(jù)的過程,對應著StreamInputProcessor.processInput()方法,該方法我們在第四章已經(jīng)講過,這里我們簡單回顧一下:
//每個元素都會觸發(fā)這一段邏輯,如果下一個數(shù)據(jù)是buffer,則從外圍的while循環(huán)里進入處理用戶數(shù)據(jù)的邏輯;這個方法里默默的處理了barrier的邏輯final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();if (bufferOrEvent != null) {if (bufferOrEvent.isBuffer()) {currentChannel = bufferOrEvent.getChannelIndex();currentRecordDeserializer = recordDeserializers[currentChannel];currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());}else {// Event receivedfinal AbstractEvent event = bufferOrEvent.getEvent();if (event.getClass() != EndOfPartitionEvent.class) {throw new IOException("Unexpected event: " + event);}}}?
處理barrier的過程在這段代碼里沒有體現(xiàn),因為被包含在了getNextNonBlocked()方法中,我們看下這個方法的核心邏輯:
//BarrierBuffer.getNextNonBlocked方法else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {if (!endOfStream) {// process barriers only if there is a chance of the checkpoint completingprocessBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());}}else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());}?
先提一嘴,大家還記得之前的部分也提到過CheckpointMarker吧,這里正好也對上了。
處理barrier也是個麻煩事,大家回想一下5.1節(jié)提到的屏障的原理圖,一個opertor必須收到從每個inputchannel發(fā)過來的同一序號的barrier之后才能發(fā)起本節(jié)點的checkpoint,如果有的channel的數(shù)據(jù)處理的快了,那該barrier后的數(shù)據(jù)還需要緩存起來,如果有的inputchannel被關閉了,那它就不會再發(fā)送barrier過來了:
private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {final long barrierId = receivedBarrier.getId();// fast path for single channel casesif (totalNumberOfInputChannels == 1) {if (barrierId > currentCheckpointId) {// new checkpointcurrentCheckpointId = barrierId;notifyCheckpoint(receivedBarrier);}return;}// -- general code path for multiple input channels --if (numBarriersReceived > 0) {// this is only true if some alignment is already progress and was not canceledif (barrierId == currentCheckpointId) {// regular caseonBarrier(channelIndex);}else if (barrierId > currentCheckpointId) {// we did not complete the current checkpoint, another started beforeLOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +"Skipping current checkpoint.", barrierId, currentCheckpointId);// let the task know we are not completing thisnotifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));// abort the current checkpointreleaseBlocksAndResetBarriers();// begin a the new checkpointbeginNewAlignment(barrierId, channelIndex);}else {// ignore trailing barrier from an earlier checkpoint (obsolete now)return;}}else if (barrierId > currentCheckpointId) {// first barrier of a new checkpointbeginNewAlignment(barrierId, channelIndex);}else {// either the current checkpoint was canceled (numBarriers == 0) or// this barrier is from an old subsumed checkpointreturn;}// check if we have all barriers - since canceled checkpoints always have zero barriers// this can only happen on a non canceled checkpointif (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {// actually trigger checkpointif (LOG.isDebugEnabled()) {LOG.debug("Received all barriers, triggering checkpoint {} at {}",receivedBarrier.getId(), receivedBarrier.getTimestamp());}releaseBlocksAndResetBarriers();notifyCheckpoint(receivedBarrier);}}?
總之,當收到全部的barrier之后,就會觸發(fā)notifyCheckpoint(),該方法又會調用StreamTask的triggerCheckpoint,和之前的operator是一樣的。
如果還有后續(xù)的operator的話,就是完全相同的循環(huán),不再贅述。
5.報告完成checkpoint事件?
當一個operator保存完checkpoint數(shù)據(jù)后,就會啟動一個異步對象AsyncCheckpointRunnable,用以報告該檢查點已完成,其具體邏輯在reportCompletedSnapshotStates中。這個方法把任務又最終委托給了RpcCheckpointResponder這個類:
?
從這個類也可以看出來,它的邏輯是通過rpc的方式遠程調JobManager的相關方法完成報告事件,底層也是通過akka實現(xiàn)的。?
那么,誰響應了這個rpc調用呢?是該任務的JobMaster。
?
JobMaster反手就是一巴掌就把任務又rpc給了CheckpointCoordinator.receiveAcknowledgeMessage()方法。
之前提到,coordinator在觸發(fā)checkpoint時,生成了一個PendingCheckpoint,保存了所有operator的id。
當PendingCheckpoint收到一個operator的完成checkpoint的消息時,它就把這個operator從未完成checkpoint的節(jié)點集合移動到已完成的集合。當所有的operator都報告完成了checkpoint時,CheckpointCoordinator會觸發(fā)completePendingCheckpoint()方法,該方法做了以下事情:
- 把pendinCgCheckpoint轉換為CompletedCheckpoint
- 把CompletedCheckpoint加入已完成的檢查點集合,并從未完成檢查點集合刪除該檢查點
- 再度向各個operator發(fā)出rpc,通知該檢查點已完成
本文里,收到這個遠程調用的就是那兩個operator chain,我們來看看其邏輯:
public void notifyCheckpointComplete(long checkpointId) throws Exception {synchronized (lock) {if (isRunning) {LOG.debug("Notification of complete checkpoint for task {}", getName());for (StreamOperator<?> operator : operatorChain.getAllOperators()) {if (operator != null) {operator.notifyCheckpointComplete(checkpointId);}}}else {LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());}}}?
再接下來無非就是層層通知對應的算子做出響應罷了。至此,flink的兩階段提交的checkpoint邏輯全部完成。
5.3 承載checkpoint數(shù)據(jù)的抽象:State & StateBackend
State是快照數(shù)據(jù)的載體,StateBackend是快照如何被保存的抽象。
State分為 KeyedState和OperatorState,從名字就可以看出來分別對應著keyedStream和其他的oeprator。從State由誰管理上,也可以區(qū)分為raw state和Managed state。Flink管理的就是Managed state,用戶自己管理的就是raw state。Managed State又分為ValueState、ListState、ReducingState、AggregatingState、FoldingState、MapState這么幾種,看名字知用途。
StateBackend目前提供了三個backend,MemoryStateBackend,FsStateBackend,RocksDBStateBackend,都是看名字知用途系列。
State接口、StateBackend接口及其實現(xiàn)都比較簡單,代碼就不貼了, 尤其State本質上就是一層容器封裝。
貼個別人寫的狀態(tài)管理的文章吧:詳解Flink中的狀態(tài)管理
6.數(shù)據(jù)流轉——Flink的數(shù)據(jù)抽象及數(shù)據(jù)交換過程
本章打算講一下flink底層是如何定義和在操作符之間傳遞數(shù)據(jù)的。
6.1 flink的數(shù)據(jù)抽象
6.1.1 MemorySegment
Flink作為一個高效的流框架,為了避免JVM的固有缺陷(java對象存儲密度低,FGC影響吞吐和響應等),必然走上自主管理內存的道路。
這個MemorySegment就是Flink的內存抽象。默認情況下,一個MemorySegment可以被看做是一個32kb大的內存塊的抽象。這塊內存既可以是JVM里的一個byte[],也可以是堆外內存(DirectByteBuffer)。
如果說byte[]數(shù)組和direct memory是最底層的存儲,那么memorysegment就是在其上覆蓋的一層統(tǒng)一抽象。它定義了一系列抽象方法,用于控制和底層內存的交互,如:
public abstract class MemorySegment {public abstract byte get(int index);public abstract void put(int index, byte b);public int size() ;public abstract ByteBuffer wrap(int offset, int length);...... }?
我們可以看到,它在提供了諸多直接操作內存的方法外,還提供了一個wrap()方法,將自己包裝成一個ByteBuffer,我們待會兒講這個ByteBuffer。
Flink為MemorySegment提供了兩個實現(xiàn)類:HeapMemorySegment和HybridMemorySegment。他們的區(qū)別在于前者只能分配堆內存,而后者能用來分配堆內和堆外內存。事實上,Flink框架里,只使用了后者。這是為什么呢?
如果HybridMemorySegment只能用于分配堆外內存的話,似乎更合常理。但是在JVM的世界中,如果一個方法是一個虛方法,那么每次調用時,JVM都要花時間去確定調用的到底是哪個子類實現(xiàn)的該虛方法(方法重寫機制,不明白的去看JVM的invokeVirtual指令),也就意味著每次都要去翻方法表;而如果該方法雖然是個虛方法,但實際上整個JVM里只有一個實現(xiàn)(就是說只加載了一個子類進來),那么JVM會很聰明的把它去虛化處理,這樣就不用每次調用方法時去找方法表了,能夠大大提升性能。但是只分配堆內或者堆外內存不能滿足我們的需要,所以就出現(xiàn)了HybridMemorySegment同時可以分配兩種內存的設計。
我們可以看看HybridMemorySegment的構造代碼:
HybridMemorySegment(ByteBuffer buffer, Object owner) {super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);this.offHeapBuffer = buffer;}HybridMemorySegment(byte[] buffer, Object owner) {super(buffer, owner);this.offHeapBuffer = null;}?
其中,第一個構造函數(shù)的checkBufferAndGetAddress()方法能夠得到direct buffer的內存地址,因此可以操作堆外內存。
6.1.2 ByteBuffer與NetworkBufferPool
在MemorySegment這個抽象之上,Flink在數(shù)據(jù)從operator內的數(shù)據(jù)對象在向TaskManager上轉移,預備被發(fā)給下個節(jié)點的過程中,使用的抽象或者說內存對象是Buffer。
注意,這個Buffer是個flink接口,不是java.nio提供的那個Buffer抽象類。Flink在這一層面同時使用了這兩個同名概念,用來存儲對象,直接看代碼時到處都是各種xxxBuffer很容易混淆:
- java提供的那個Buffer抽象類在這一層主要用于構建HeapByteBuffer,這個主要是當數(shù)據(jù)從jvm里的一個對象被序列化成字節(jié)數(shù)組時用的;
- Flink的這個Buffer接口主要是一種flink層面用于傳輸數(shù)據(jù)和事件的統(tǒng)一抽象,其實現(xiàn)類是NetworkBuffer,是對MemorySegment的包裝。Flink在各個TaskManager之間傳遞數(shù)據(jù)時,使用的是這一層的抽象。
因為Buffer的底層是MemorySegment,這可能不是JVM所管理的,所以為了知道什么時候一個Buffer用完了可以回收,Flink引入了引用計數(shù)的概念,當確認這個buffer沒有人引用,就可以回收這一片MemorySegment用于別的地方了(JVM的垃圾回收為啥不用引用計數(shù)?讀者思考一下):
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {private volatile int refCnt = 1;...... }?
為了方便管理NetworkBuffer,Flink提供了BufferPoolFactory,并且提供了唯一實現(xiàn)NetworkBufferPool,這是個工廠模式的應用。
NetworkBufferPool在每個TaskManager上只有一個,負責所有子task的內存管理。其實例化時就會嘗試獲取所有可由它管理的內存(對于堆內存來說,直接獲取所有內存并放入老年代,并令用戶對象只在新生代存活,可以極大程度的減少Full GC),我們看看其構造方法:
public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {......try {this.availableMemorySegments = new ArrayBlockingQueue<>(numberOfSegmentsToAllocate);}catch (OutOfMemoryError err) {throw new OutOfMemoryError("Could not allocate buffer queue of length "+ numberOfSegmentsToAllocate + " - " + err.getMessage());}try {for (int i = 0; i < numberOfSegmentsToAllocate; i++) {ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null));}}......long allocatedMb = (sizeInLong * availableMemorySegments.size()) >> 20;LOG.info("Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).",allocatedMb, availableMemorySegments.size(), segmentSize);}?
由于NetworkBufferPool只是個工廠,實際的內存池是LocalBufferPool。每個TaskManager都只有一個NetworkBufferPool工廠,但是上面運行的每個task都要有一個和其他task隔離的LocalBufferPool池,這從邏輯上很好理解。另外,NetworkBufferPool會計算自己所擁有的所有內存分片數(shù),在分配新的內存池時對每個內存池應該占有的內存分片數(shù)重分配,步驟是:
- 首先,從整個工廠管理的內存片中拿出所有的內存池所需要的最少Buffer數(shù)目總和
- 如果正好分配完,就結束
- 其次,把所有的剩下的沒分配的內存片,按照每個LocalBufferPool內存池的剩余想要容量大小進行按比例分配
- 剩余想要容量大小是這么個東西:如果該內存池至少需要3個buffer,最大需要10個buffer,那么它的剩余想要容量就是7
實現(xiàn)代碼如下:
private void redistributeBuffers() throws IOException {assert Thread.holdsLock(factoryLock);// All buffers, which are not among the required onesfinal int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;if (numAvailableMemorySegment == 0) {// in this case, we need to redistribute buffers so that every pool gets its minimumfor (LocalBufferPool bufferPool : allBufferPools) {bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());}return;}long totalCapacity = 0; // long to avoid int overflowfor (LocalBufferPool bufferPool : allBufferPools) {int excessMax = bufferPool.getMaxNumberOfMemorySegments() -bufferPool.getNumberOfRequiredMemorySegments();totalCapacity += Math.min(numAvailableMemorySegment, excessMax);}// no capacity to receive additional buffers?if (totalCapacity == 0) {return; // necessary to avoid div by zero when nothing to re-distribute}final int memorySegmentsToDistribute = MathUtils.checkedDownCast(Math.min(numAvailableMemorySegment, totalCapacity));long totalPartsUsed = 0; // of totalCapacityint numDistributedMemorySegment = 0;for (LocalBufferPool bufferPool : allBufferPools) {int excessMax = bufferPool.getMaxNumberOfMemorySegments() -bufferPool.getNumberOfRequiredMemorySegments();// shortcutif (excessMax == 0) {continue;}totalPartsUsed += Math.min(numAvailableMemorySegment, excessMax);final int mySize = MathUtils.checkedDownCast(memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment);numDistributedMemorySegment += mySize;bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + mySize);}assert (totalPartsUsed == totalCapacity);assert (numDistributedMemorySegment == memorySegmentsToDistribute);}?
接下來說說這個LocalBufferPool內存池。?
LocalBufferPool的邏輯想想無非是增刪改查,值得說的是其fields:
?
承接NetworkBufferPool的重分配方法,我們來看看LocalBufferPool的setNumBuffers()方法,代碼很短,邏輯也相當簡單,就不展開說了:
public void setNumBuffers(int numBuffers) throws IOException {synchronized (availableMemorySegments) {checkArgument(numBuffers >= numberOfRequiredMemorySegments,"Buffer pool needs at least %s buffers, but tried to set to %s",numberOfRequiredMemorySegments, numBuffers);if (numBuffers > maxNumberOfMemorySegments) {currentPoolSize = maxNumberOfMemorySegments;} else {currentPoolSize = numBuffers;}returnExcessMemorySegments();// If there is a registered owner and we have still requested more buffers than our// size, trigger a recycle via the owner.if (owner != null && numberOfRequestedMemorySegments > currentPoolSize) {owner.releaseMemory(numberOfRequestedMemorySegments - numBuffers);}}}?
6.1.3 RecordWriter與Record
我們接著往高層抽象走,剛剛提到了最底層內存抽象是MemorySegment,用于數(shù)據(jù)傳輸?shù)氖荁uffer,那么,承上啟下對接從Java對象轉為Buffer的中間對象是什么呢?是StreamRecord。
從StreamRecord<T>這個類名字就可以看出來,這個類就是個wrap,里面保存了原始的Java對象。另外,StreamRecord還保存了一個timestamp。
那么這個對象是怎么變成LocalBufferPool內存池里的一個大號字節(jié)數(shù)組的呢?借助了StreamWriter這個類。
我們直接來看把數(shù)據(jù)序列化交出去的方法:
private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {RecordSerializer<T> serializer = serializers[targetChannel];SerializationResult result = serializer.addRecord(record);while (result.isFullBuffer()) {if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {// If this was a full record, we are done. Not breaking// out of the loop at this point will lead to another// buffer request before breaking out (that would not be// a problem per se, but it can lead to stalls in the// pipeline).if (result.isFullRecord()) {break;}}BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);}checkState(!serializer.hasSerializedData(), "All data should be written at once");if (flushAlways) {targetPartition.flush(targetChannel);}}?
先說最后一行,如果配置為flushAlways,那么會立刻把元素發(fā)送出去,但是這樣吞吐量會下降;Flink的默認設置其實也不是一個元素一個元素的發(fā)送,是單獨起了一個線程,每隔固定時間flush一次所有channel,較真起來也算是mini batch了。
再說序列化那一句:SerializationResult result = serializer.addRecord(record);。在這行代碼中,Flink把對象調用該對象所屬的序列化器序列化為字節(jié)數(shù)組。
6.2 數(shù)據(jù)流轉過程
上一節(jié)講了各層數(shù)據(jù)的抽象,這一節(jié)講講數(shù)據(jù)在各個task之間exchange的過程。
6.2.1 整體過程
看這張圖:?
6.2.2 數(shù)據(jù)跨task傳遞
本節(jié)講一下算子之間具體的數(shù)據(jù)傳輸過程。也先上一張圖:?
?
數(shù)據(jù)在task之間傳遞有如下幾步:
數(shù)據(jù)在不同機器的算子之間傳遞的步驟就是以上這些。
了解了步驟之后,再來看一下部分關鍵代碼:?
首先是把數(shù)據(jù)交給recordwriter。
?
然后recordwriter把數(shù)據(jù)發(fā)送到對應的通道。
//RecordWriter.javapublic void emit(T record) throws IOException, InterruptedException {//channelselector登場了for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {sendToTarget(record, targetChannel);}}private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {//選擇序列化器并序列化數(shù)據(jù)RecordSerializer<T> serializer = serializers[targetChannel];SerializationResult result = serializer.addRecord(record);while (result.isFullBuffer()) {if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {// If this was a full record, we are done. Not breaking// out of the loop at this point will lead to another// buffer request before breaking out (that would not be// a problem per se, but it can lead to stalls in the// pipeline).if (result.isFullRecord()) {break;}}BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);//寫入channelresult = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);}checkState(!serializer.hasSerializedData(), "All data should be written at once");if (flushAlways) {targetPartition.flush(targetChannel);}}?
接下來是把數(shù)據(jù)推給底層設施(netty)的過程:
//ResultPartition.java@Overridepublic void flushAll() {for (ResultSubpartition subpartition : subpartitions) {subpartition.flush();}}//PartitionRequestQueue.javavoid notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {//這里交給了netty server線程去推ctx.executor().execute(new Runnable() {@Overridepublic void run() {ctx.pipeline().fireUserEventTriggered(reader);}});}?
netty相關的部分:
//AbstractChannelHandlerContext.javapublic ChannelHandlerContext fireUserEventTriggered(final Object event) {if (event == null) {throw new NullPointerException("event");} else {final AbstractChannelHandlerContext next = this.findContextInbound();EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeUserEventTriggered(event);} else {executor.execute(new OneTimeTask() {public void run() {next.invokeUserEventTriggered(event);}});}return this;}}?
最后真實的寫入:
//PartittionRequesetQueue.javaprivate void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {return;}// Queue an available reader for consumption. If the queue is empty,// we try trigger the actual write. Otherwise this will be handled by// the writeAndFlushNextMessageIfPossible calls.boolean triggerWrite = availableReaders.isEmpty();registerAvailableReader(reader);if (triggerWrite) {writeAndFlushNextMessageIfPossible(ctx.channel());}}private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {......next = reader.getNextBuffer();if (next == null) {if (!reader.isReleased()) {continue;}markAsReleased(reader.getReceiverId());Throwable cause = reader.getFailureCause();if (cause != null) {ErrorResponse msg = new ErrorResponse(new ProducerFailedException(cause),reader.getReceiverId());ctx.writeAndFlush(msg);}} else {// This channel was now removed from the available reader queue.// We re-add it into the queue if it is still availableif (next.moreAvailable()) {registerAvailableReader(reader);}BufferResponse msg = new BufferResponse(next.buffer(),reader.getSequenceNumber(),reader.getReceiverId(),next.buffersInBacklog());if (isEndOfPartitionEvent(next.buffer())) {reader.notifySubpartitionConsumed();reader.releaseAllResources();markAsReleased(reader.getReceiverId());}// Write and flush and wait until this is done before// trying to continue with the next buffer.channel.writeAndFlush(msg).addListener(writeListener);return;}......}?
上面這段代碼里第二個方法中調用的writeAndFlush(msg)就是真正往netty的nio通道里寫入的地方了。在這里,寫入的是一個RemoteInputChannel,對應的就是下游節(jié)點的InputGate的channels。
有寫就有讀,nio通道的另一端需要讀入buffer,代碼如下:
//CreditBasedPartitionRequestClientHandler.javaprivate void decodeMsg(Object msg) throws Throwable {final Class<?> msgClazz = msg.getClass();// ---- Buffer --------------------------------------------------------if (msgClazz == NettyMessage.BufferResponse.class) {NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);if (inputChannel == null) {bufferOrEvent.releaseBuffer();cancelRequestFor(bufferOrEvent.receiverId);return;}decodeBufferOrEvent(inputChannel, bufferOrEvent);} ......}?
插一句,Flink其實做阻塞和獲取數(shù)據(jù)的方式非常自然,利用了生產者和消費者模型,當獲取不到數(shù)據(jù)時,消費者自然阻塞;當數(shù)據(jù)被加入隊列,消費者被notify。Flink的背壓機制也是借此實現(xiàn)。
然后在這里又反序列化成StreamRecord:
//StreamElementSerializer.javapublic StreamElement deserialize(DataInputView source) throws IOException {int tag = source.readByte();if (tag == TAG_REC_WITH_TIMESTAMP) {long timestamp = source.readLong();return new StreamRecord<T>(typeSerializer.deserialize(source), timestamp);}else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {return new StreamRecord<T>(typeSerializer.deserialize(source));}else if (tag == TAG_WATERMARK) {return new Watermark(source.readLong());}else if (tag == TAG_STREAM_STATUS) {return new StreamStatus(source.readInt());}else if (tag == TAG_LATENCY_MARKER) {return new LatencyMarker(source.readLong(), new OperatorID(source.readLong(), source.readLong()), source.readInt());}else {throw new IOException("Corrupt stream, found tag: " + tag);}}?
然后再次在StreamInputProcessor.processInput()循環(huán)中得到處理。
至此,數(shù)據(jù)在跨jvm的節(jié)點之間的流轉過程就講完了。
6.3 Credit漫談
在看上一部分的代碼時,有一個小細節(jié)不知道讀者有沒有注意到,我們的數(shù)據(jù)發(fā)送端的代碼叫做PartittionRequesetQueue.java,而我們的接收端卻起了一個完全不相干的名字:CreditBasedPartitionRequestClientHandler.java。為什么前面加了CreditBased的前綴呢?
6.3.1 背壓問題
在流模型中,我們期待數(shù)據(jù)是像水流一樣平滑的流過我們的引擎,但現(xiàn)實生活不會這么美好。數(shù)據(jù)的上游可能因為各種原因數(shù)據(jù)量暴增,遠遠超出了下游的瞬時處理能力(回憶一下98年大洪水),導致系統(tǒng)崩潰。?
那么框架應該怎么應對呢?和人類處理自然災害的方式類似,我們修建了三峽大壩,當洪水來臨時把大量的水囤積在大壩里;對于Flink來說,就是在數(shù)據(jù)的接收端和發(fā)送端放置了緩存池,用以緩沖數(shù)據(jù),并且設置閘門阻止數(shù)據(jù)向下流。
那么Flink又是如何處理背壓的呢?答案也是靠這些緩沖池。?
?
這張圖說明了Flink在生產和消費數(shù)據(jù)時的大致情況。ResultPartition和InputGate在輸出和輸入數(shù)據(jù)時,都要向NetworkBufferPool申請一塊MemorySegment作為緩存池。?
接下來的情況和生產者消費者很類似。當數(shù)據(jù)發(fā)送太多,下游處理不過來了,那么首先InputChannel會被填滿,然后是InputChannel能申請到的內存達到最大,于是下游停止讀取數(shù)據(jù),上游負責發(fā)送數(shù)據(jù)的nettyServer會得到響應,停止從ResultSubPartition讀取緩存,那么ResultPartition很快也將存滿數(shù)據(jù)不能被消費,從而生產數(shù)據(jù)的邏輯被阻塞在獲取新buffer上,非常自然地形成背壓的效果。
Flink自己做了個試驗用以說明這個機制的效果:?
?
我們首先設置生產者的發(fā)送速度為60%,然后下游的算子以同樣的速度處理數(shù)據(jù)。然后我們將下游算子的處理速度降低到30%,可以看到上游的生產者的數(shù)據(jù)產生曲線幾乎與消費者同步下滑。而后當我們解除限速,整個流的速度立刻提高到了100%。
6.3.2 使用Credit實現(xiàn)ATM網(wǎng)絡流控
上文已經(jīng)提到,對于流量控制,一個樸素的思路就是在長江上建三峽鏈路上建立一個攔截的dam,如下圖所示:?
?
基于Credit的流控就是這樣一種建立在信用(消費數(shù)據(jù)的能力)上的,面向每個虛鏈路(而非端到端的)流模型,如下圖所示:?
?
首先,下游會向上游發(fā)送一條credit message,用以通知其目前的信用(可聯(lián)想信用卡的可用額度),然后上游會根據(jù)這個信用消息來決定向下游發(fā)送多少數(shù)據(jù)。當上游把數(shù)據(jù)發(fā)送給下游時,它就從下游的信用卡上劃走相應的額度(credit balance):?
?
下游總共獲得的credit數(shù)目是Buf_Alloc,已經(jīng)消費的數(shù)據(jù)是Fwd_Cnt,上游發(fā)送出來的數(shù)據(jù)是Tx_Cnt,那么剩下的那部分就是Crd_Bal:?
Crd_Bal = Buf_Alloc - ( Tx_Cnt - Fwd_Cnt )?
上面這個式子應該很好理解。
可以看到,Credit Based Flow Control的關鍵是buffer分配。這種分配可以在數(shù)據(jù)的發(fā)送端完成,也可以在接收端完成。對于下游可能有多個上游節(jié)點的情況(比如Flink),使用接收端的credit分配更加合理:?
?
上圖中,接收者可以觀察到每個上游連接的帶寬情況,而上游的節(jié)點Snd1卻不可能輕易知道發(fā)往同一個下游節(jié)點的其他Snd2的帶寬情況,從而如果在上游控制流量將會很困難,而在下游控制流量將會很方便。
因此,這就是為何Flink在接收端有一個基于Credit的Client,而不是在發(fā)送端有一個CreditServer的原因。
最后,再講一下Credit的面向虛鏈路的流設計和端到端的流設計的區(qū)別:?
?
如上圖所示,a是面向連接的流設計,b是端到端的流設計。其中,a的設計使得當下游節(jié)點3因某些情況必須緩存數(shù)據(jù)暫緩處理時,每個上游節(jié)點(1和2)都可以利用其緩存保存數(shù)據(jù);而端到端的設計b里,只有節(jié)點3的緩存才可以用于保存數(shù)據(jù)(讀者可以從如何實現(xiàn)上想想為什么)。
對流控制感興趣的讀者,可以看這篇文章:Traffic Management For High-Speed Networks。
7.其他核心概念
截至第六章,和執(zhí)行過程相關的部分就全部講完,告一段落了。第七章主要講一點雜七雜八的內容,有時間就不定期更新。
7.1 EventTime時間模型
flink有三種時間模型:ProcessingTime,EventTime和IngestionTime。?
關于時間模型看這張圖:?
?
從這張圖里可以很清楚的看到三種Time模型的區(qū)別。
- EventTime是數(shù)據(jù)被生產出來的時間,可以是比如傳感器發(fā)出信號的時間等(此時數(shù)據(jù)還沒有被傳輸給flink)。
- IngestionTime是數(shù)據(jù)進入flink的時間,也就是從Source進入flink流的時間(此時數(shù)據(jù)剛剛被傳給flink)
- ProcessingTime是針對當前算子的系統(tǒng)時間,是指該數(shù)據(jù)已經(jīng)進入某個operator時,operator所在系統(tǒng)的當前時間
例如,我在寫這段話的時間是2018年5月13日03點47分,但是我引用的這張EventTime的圖片,是2015年畫出來的,那么這張圖的EventTime是2015年,而ProcessingTime是現(xiàn)在。?
Flink官網(wǎng)對于時間戳的解釋非常詳細:點我?
Flink對于EventTime模型的實現(xiàn),依賴的是一種叫做watermark的對象。watermark是攜帶有時間戳的一個對象,會按照程序的要求被插入到數(shù)據(jù)流中,用以標志某個事件在該時間發(fā)生了。?
我再做一點簡短的說明,還是以官網(wǎng)的圖為例:?
?
對于有序到來的數(shù)據(jù),假設我們在timestamp為11的元素后加入一個watermark,時間記錄為11,則下個元素收到該watermark時,認為所有早于11的元素均已到達。這是非常理想的情況。?
?
而在現(xiàn)實生活中,經(jīng)常會遇到亂序的數(shù)據(jù)。這時,我們雖然在timestamp為7的元素后就收到了11,但是我們一直等到了收到元素12之后,才插入了watermark為11的元素。與上面的圖相比,如果我們仍然在11后就插入11的watermark,那么元素9就會被丟棄,造成數(shù)據(jù)丟失。而我們在12之后插入watermark11,就保證了9仍然會被下一個operator處理。當然,我們不可能無限制的永遠等待遲到元素,所以要在哪個元素后插入11需要根據(jù)實際場景權衡。
對于來自多個數(shù)據(jù)源的watermark,可以看這張圖:?
?
可以看到,當一個operator收到多個watermark時,它遵循最小原則(或者說最早),即算子的當前watermark是流經(jīng)該算子的最小watermark,以容許來自不同的source的亂序數(shù)據(jù)到來。?
關于事件時間模型,更多內容可以參考Stream 101?和谷歌的這篇論文:Dataflow Model paper
7.2 FLIP-6 部署及處理模型演進
就在老白寫這篇blog的時候,Flink發(fā)布了其1.5 RELEASE版本,號稱實現(xiàn)了其部署及處理模型(也就是FLIP-6),所以打算簡略地說一下FLIP-6的主要內容。
7.2.1 現(xiàn)有模型不足
1.5之前的Flink模型有很多不足,包括:
- 只能靜態(tài)分配計算資源
- 在YARN上所有的資源分配都是一碗水端平的
- 與Docker/k8s的集成非常之蠢,頗有脫褲子放屁的神韻
- JobManager沒有任務調度邏輯
- 任務在YARN上執(zhí)行結束后web dashboard就不可用
- 集群的session模式和per job模式混淆難以理解
就我個人而言,我覺得Flink有一個這里完全沒提到的不足才是最應該修改的:針對任務的完全的資源隔離。尤其是如果用Standalone集群,一個用戶的task跑掛了TaskManager,然后拖垮了整個集群的情況簡直不要太多。
7.2.2 核心變更
Single Job JobManager?
最重要的變更是一個JobManager只處理一個job。當我們生成JobGraph時就順便起一個JobManager,這顯然更加自然。
ResourceManager?
其職責包括獲取新的TM和slot,通知失敗,釋放資源以及緩存TM以用于重用等。重要的是,這個組件要能做到掛掉時不要搞垮正在運行的好好的任務。其職責和與JobManager、TaskManager的交互圖如下:?
TaskManager?
TM要與上面的兩個組件交互。與JobManager交互時,要能提供slot,要能與所有給出slot的JM交互。丟失與JM的連接時要能試圖把本TM上的slot的情況通告給新JM,如果這一步失敗,就要能重新分配slot。?
與ResourceManager交互時,要通知RM自己的資源和當前的Job分配情況,能按照RM的要求分配資源或者關閉自身。
JobManager Slot Pool?
這個pool要持有所有分配給當前job的slot資源,并且能在RM掛掉的情況下管理當前已經(jīng)持有的slot。
Dispatcher?
需要一個Job的分發(fā)器的主要原因是在有的集群環(huán)境下我們可能需要一個統(tǒng)一的提交和監(jiān)控點,以及替代之前的Standalone模式下的JobManager。將來對分發(fā)器的期望可能包括權限控制等。?
7.2.3 Cluster Manager的架構
YARN?
新的基于YARN的架構主要包括不再需要先在容器里啟動集群,然后提交任務;用戶代碼不再使用動態(tài)ClassLoader加載;不用的資源可以釋放;可以按需分配不同大小的容器等。其執(zhí)行過程如下:?
無Dispatcher時?
?
有Dispatcher時?
Mesos?
與基于YARN的模式很像,但是只有帶Dispatcher模式,因為只有這樣才能在Mesos集群里跑其RM。?
?
Mesos的Fault Tolerance是類似這樣的:?
?
必須用類似Marathon之類的技術保證Dispatcher的HA。
Standalone?
其實沒啥可說的,把以前的JobManager的職責換成現(xiàn)在的Dispatcher就行了。?
?
將來可能會實現(xiàn)一個類似于輕量級Yarn的模式。
Docker/k8s?
用戶定義好容器,至少有一個是job specific的(不然怎么啟動任務);還有用于啟動TM的,可以不是job specific的。啟動過程如下?
7.2.4 組件設計及細節(jié)
分配slot相關細節(jié)?
從新的TM取slot過程:?
從Cached TM取slot過程:?
失敗處理
TM失敗?
TM失敗時,RM要能檢測到失敗,更新自己的狀態(tài),發(fā)送消息給JM,重啟一份TM;JM要能檢測到失敗,從狀態(tài)移除失效slot,標記該TM的task為失敗,并在沒有足夠slot繼續(xù)任務時調整規(guī)模;TM自身則要能從Checkpoint恢復
RM失敗?
此時TM要能檢測到失敗,并準備向新的RM注冊自身,并且向新的RM傳遞自身的資源情況;JM要能檢測到失敗并且等待新的RM可用,重新請求需要的資源;丟失的數(shù)據(jù)要能從Container、TM等處恢復。
JM失敗?
TM釋放所有task,向新JM注冊資源,并且如果不成功,就向RM報告這些資源可用于重分配;RM坐等;JM丟失的數(shù)據(jù)從持久化存儲中獲得,已完成的checkpoints從HA恢復,從最近的checkpoint重啟task,并申請資源。
JM & RM 失敗?
TM將在一段時間內試圖把資源交給新上任的JM,如果失敗,則把資源交給新的RM
TM & RM失敗?
JM如果正在申請資源,則要等到新的RM啟動后才能獲得;JM可能需要調整其規(guī)模,因為損失了TM的slot。
8.后記
Flink是當前流處理領域的優(yōu)秀框架,其設計思想和代碼實現(xiàn)都蘊含著許多人的智慧結晶。這篇解讀花了很多時間,篇幅也寫了很長,也仍然不能能覆蓋Flink的方方面面,也肯定有很多錯誤之處,歡迎大家批評指正!Flink生態(tài)里中文資料確實不多,對Flink源碼有興趣的讀者,可以參考VinoYang的專欄,繼續(xù)學習之旅。
本文至此結束。
轉載于:https://www.cnblogs.com/davidwang456/articles/10647291.html
總結
以上是生活随笔為你收集整理的追源索骥:透过源码看懂Flink核心框架的执行流程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Flink JAR包上传和运行逻辑
- 下一篇: Spring Cloud Stream