3atv精品不卡视频,97人人超碰国产精品最新,中文字幕av一区二区三区人妻少妇,久久久精品波多野结衣,日韩一区二区三区精品

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

追源索骥:透过源码看懂Flink核心框架的执行流程

發(fā)布時間:2025/4/5 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 追源索骥:透过源码看懂Flink核心框架的执行流程 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

https://www.cnblogs.com/bethunebtj/p/9168274.html

  • 追源索驥:透過源碼看懂Flink核心框架的執(zhí)行流程
    • 前言
    • 1.從 Hello,World WordCount開始
      • 1.1 flink執(zhí)行環(huán)境
      • 1.2 算子(Operator)的注冊(聲明)
      • 1.3 程序的執(zhí)行
        • 1.3.1 本地模式下的execute方法
        • 1.3.2 遠程模式(RemoteEnvironment)的execute方法
        • 1.3.3 程序啟動過程
    • 2.理解flink的圖結構
      • 2.1 flink的三層圖結構
      • 2.2 StreamGraph的生成
        • 2.2.1 StreamTransformation類代表了流的轉換
        • 2.2.2 StreamGraph生成函數(shù)分析
        • 2.2.3 WordCount函數(shù)的StreamGraph
      • 2.3 JobGraph的生成
        • 2.3.1 JobGraph生成源碼
        • 2.3.2 operator chain的邏輯
        • 2.3.3 JobGraph的提交
      • 2.4 ExecutionGraph的生成
    • 3. 任務的調度與執(zhí)行
      • 3.1 計算資源的調度
      • 3.2 JobManager執(zhí)行job
        • 3.2.1 JobManager的組件
        • 3.2.2 JobManager的啟動過程
        • 3.2.3 JobManager啟動Task
      • 3.3 TaskManager執(zhí)行task
        • 3.3.1 TaskManager的基本組件
        • 3.3.2 TaskManager執(zhí)行Task
        • 3.3.2.1 生成Task對象
        • 3.3.2.2 運行Task對象
        • 3.3.2.3 StreamTask的執(zhí)行邏輯
      • 3.4 StreamTask與StreamOperator
    • 4. StreamOperator的抽象與實現(xiàn)
      • 4.1 數(shù)據(jù)源的邏輯——StreamSource與時間模型
      • 4.2 從數(shù)據(jù)輸入到數(shù)據(jù)處理——OneInputStreamOperator & AbstractUdfStreamOperator
      • 4.3 StreamSink
    • 5. 為執(zhí)行保駕護航——Fault Tolerant與保證Exactly-Once語義
      • 5.1 Fault Tolerant演進之路
        • 5.1.1 Storm的Record acknowledgement模式
        • 5.1.2 Spark streaming的micro batch模式
        • 5.1.3 Google Cloud Dataflow的事務式模型
        • 5.1.4 Flink的分布式快照機制
      • 5.2 checkpoint的生命周期
        • 5.2.1 觸發(fā)checkpoint
        • 5.2.2 Task層面checkpoint的準備工作
        • 5.2.3 操作符的狀態(tài)保存及barrier傳遞
      • 5.3 承載checkpoint數(shù)據(jù)的抽象:State & StateBackend
    • 6.數(shù)據(jù)流轉——Flink的數(shù)據(jù)抽象及數(shù)據(jù)交換過程
      • 6.1 flink的數(shù)據(jù)抽象
        • 6.1.1 MemorySegment
        • 6.1.2 ByteBuffer與NetworkBufferPool
        • 6.1.3 RecordWriter與Record
      • 6.2 數(shù)據(jù)流轉過程
        • 6.2.1 整體過程
        • 6.2.2 數(shù)據(jù)跨task傳遞
      • 6.3 Credit漫談
        • 6.3.1 背壓問題
        • 6.3.2 使用Credit實現(xiàn)ATM網(wǎng)絡流控
    • 7.其他核心概念
      • 7.1 EventTime時間模型
      • 7.2 FLIP-6 部署及處理模型演進
        • 7.2.1 現(xiàn)有模型不足
        • 7.2.2 核心變更
        • 7.2.3 Cluster Manager的架構
        • 7.2.4 組件設計及細節(jié)
    • 8.后記

前言

Flink是大數(shù)據(jù)處理領域最近很火的一個開源的分布式、高性能的流式處理框架,其對數(shù)據(jù)的處理可以達到毫秒級別。本文以一個來自官網(wǎng)的WordCount例子為引,全面闡述flink的核心架構及執(zhí)行流程,希望讀者可以借此更加深入的理解Flink邏輯。

本文跳過了一些基本概念,如果對相關概念感到迷惑,請參考官網(wǎng)文檔。另外在本文寫作過程中,Flink正式發(fā)布了其1.5 RELEASE版本,在其發(fā)布之后完成的內容將按照1.5的實現(xiàn)來組織。

1.從?Hello,World?WordCount開始

首先,我們把WordCount的例子再放一遍:

public class SocketTextStreamWordCount {public static void main(String[] args) throws Exception {if (args.length != 2){System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");return;}String hostName = args[0];Integer port = Integer.parseInt(args[1]);// set up the execution environmentfinal StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// get input dataDataStream<String> text = env.socketTextStream(hostName, port);text.flatMap(new LineSplitter()).setParallelism(1)// group by the tuple field "0" and sum up tuple field "1".keyBy(0).sum(1).setParallelism(1).print();// execute programenv.execute("Java WordCount from SocketTextStream Example");}/*** Implements the string tokenizer that splits sentences into words as a user-defined* FlatMapFunction. The function takes a line (String) and splits it into* multiple pairs in the form of "(word,1)" (Tuple2&lt;String, Integer&gt;).*/public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) {// normalize and split the lineString[] tokens = value.toLowerCase().split("\\W+");// emit the pairsfor (String token : tokens) {if (token.length() > 0) {out.collect(new Tuple2<String, Integer>(token, 1));}}}}}

首先從命令行中獲取socket對端的ip和端口,然后啟動一個執(zhí)行環(huán)境,從socket中讀取數(shù)據(jù),split成單個單詞的流,并按單詞進行總和的計數(shù),最后打印出來。這個例子相信接觸過大數(shù)據(jù)計算或者函數(shù)式編程的人都能看懂,就不過多解釋了。

1.1 flink執(zhí)行環(huán)境

程序的啟動,從這句開始:final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()。?
這行代碼會返回一個可用的執(zhí)行環(huán)境。執(zhí)行環(huán)境是整個flink程序執(zhí)行的上下文,記錄了相關配置(如并行度等),并提供了一系列方法,如讀取輸入流的方法,以及真正開始運行整個代碼的execute方法等。對于分布式流處理程序來說,我們在代碼中定義的flatMap,keyBy等等操作,事實上可以理解為一種聲明,告訴整個程序我們采用了什么樣的算子,而真正開啟計算的代碼不在此處。由于我們是在本地運行flink程序,因此這行代碼會返回一個LocalStreamEnvironment,最后我們要調用它的execute方法來開啟真正的任務。我們先接著往下看。

1.2 算子(Operator)的注冊(聲明)

我們以flatMap為例,text.flatMap(new LineSplitter())這一句話跟蹤進去是這樣的:

public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),getType(), Utils.getCallLocationName(), true);return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));}

?

里面完成了兩件事,一是用反射拿到了flatMap算子的輸出類型,二是生成了一個Operator。flink流式計算的核心概念,就是將數(shù)據(jù)從輸入流一個個傳遞給Operator進行鏈式處理,最后交給輸出流的過程。對數(shù)據(jù)的每一次處理在邏輯上成為一個operator,并且為了本地化處理的效率起見,operator之間也可以串成一個chain一起處理(可以參考責任鏈模式幫助理解)。下面這張圖表明了flink是如何看待用戶的處理流程的:抽象化為一系列operator,以source開始,以sink結尾,中間的operator做的操作叫做transform,并且可以把幾個操作串在一起執(zhí)行。?
?
我們也可以更改flink的設置,要求它不要對某個操作進行chain處理,或者從某個操作開啟一個新chain等。?
上面代碼中的最后一行transform方法的作用是返回一個SingleOutputStreamOperator,它繼承了Datastream類并且定義了一些輔助方法,方便對流的操作。在返回之前,transform方法還把它注冊到了執(zhí)行環(huán)境中(后面生成執(zhí)行圖的時候還會用到它)。其他的操作,包括keyBy,sum和print,都只是不同的算子,在這里出現(xiàn)都是一樣的效果,即生成一個operator并注冊給執(zhí)行環(huán)境用于生成DAG。

1.3 程序的執(zhí)行

程序執(zhí)行即env.execute("Java WordCount from SocketTextStream Example")這行代碼。

1.3.1 本地模式下的execute方法

這行代碼主要做了以下事情:

  • 生成StreamGraph。代表程序的拓撲結構,是從用戶代碼直接生成的圖。
  • 生成JobGraph。這個圖是要交給flink去生成task的圖。
  • 生成一系列配置
  • 將JobGraph和配置交給flink集群去運行。如果不是本地運行的話,還會把jar文件通過網(wǎng)絡發(fā)給其他節(jié)點。
  • 以本地模式運行的話,可以看到啟動過程,如啟動性能度量、web模塊、JobManager、ResourceManager、taskManager等等
  • 啟動任務。值得一提的是在啟動任務之前,先啟動了一個用戶類加載器,這個類加載器可以用來做一些在運行時動態(tài)加載類的工作。

1.3.2 遠程模式(RemoteEnvironment)的execute方法

遠程模式的程序執(zhí)行更加有趣一點。第一步仍然是獲取StreamGraph,然后調用executeRemotely方法進行遠程執(zhí)行。?
該方法首先創(chuàng)建一個用戶代碼加載器

ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths, getClass().getClassLoader());

?

然后創(chuàng)建一系列配置,交給Client對象。Client這個詞有意思,看見它就知道這里絕對是跟遠程集群打交道的客戶端。

ClusterClient client;try {client = new StandaloneClusterClient(configuration);client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());}}try {return client.run(streamGraph, jarFiles, globalClasspaths, usercodeClassLoader).getJobExecutionResult();}

?

client的run方法首先生成一個JobGraph,然后將其傳遞給JobClient。關于Client、JobClient、JobManager到底誰管誰,可以看這張圖:?
?
確切的說,JobClient負責以異步的方式和JobManager通信(Actor是scala的異步模塊),具體的通信任務由JobClientActor完成。相對應的,JobManager的通信任務也由一個Actor完成。

JobListeningContext jobListeningContext = submitJob(actorSystem,config,highAvailabilityServices,jobGraph,timeout,sysoutLogUpdates, classLoader);return awaitJobResult(jobListeningContext);

?

可以看到,該方法阻塞在awaitJobResult方法上,并最終返回了一個JobListeningContext,透過這個Context可以得到程序運行的狀態(tài)和結果。

1.3.3 程序啟動過程

上面提到,整個程序真正意義上開始執(zhí)行,是這里:

  • env.execute("Java WordCount from SocketTextStream Example");
  • 遠程模式和本地模式有一點不同,我們先按本地模式來調試。?
    我們跟進源碼,(在本地調試模式下)會啟動一個miniCluster,然后開始執(zhí)行代碼:

    // LocalStreamEnvironment.java@Overridepublic JobExecutionResult execute(String jobName) throws Exception {//生成各種圖結構......try {//啟動集群,包括啟動JobMaster,進行l(wèi)eader選舉等等miniCluster.start();configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());//提交任務到JobMasterreturn miniCluster.executeJobBlocking(jobGraph);}finally {transformations.clear();miniCluster.close();}}

    ?

    這個方法里有一部分邏輯是與生成圖結構相關的,我們放在第二章里講;現(xiàn)在我們先接著往里跟:

    //MiniCluster.java public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {checkNotNull(job, "job is null");//在這里,最終把job提交給了jobMasterfinal CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose((JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));......}

    ?

    正如我在注釋里寫的,這一段代碼核心邏輯就是調用那個submitJob方法。那么我們再接著看這個方法:

    public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {final DispatcherGateway dispatcherGateway;try {dispatcherGateway = getDispatcherGateway();} catch (LeaderRetrievalException | InterruptedException e) {ExceptionUtils.checkInterrupted(e);return FutureUtils.completedExceptionally(e);}// we have to allow queued scheduling in Flip-6 mode because we need to request slots// from the ResourceManagerjobGraph.setAllowQueuedScheduling(true);final CompletableFuture<Void> jarUploadFuture = uploadAndSetJarFiles(dispatcherGateway, jobGraph);final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture.thenCompose(//在這里執(zhí)行了真正的submit操作(Void ack) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout));return acknowledgeCompletableFuture.thenApply((Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));}

    ?

    這里的Dispatcher是一個接收job,然后指派JobMaster去啟動任務的類,我們可以看看它的類結構,有兩個實現(xiàn)。在本地環(huán)境下啟動的是MiniDispatcher,在集群上提交任務時,集群上啟動的是StandaloneDispatcher。?

    那么這個Dispatcher又做了什么呢?它啟動了一個JobManagerRunner(這里我要吐槽Flink的命名,這個東西應該叫做JobMasterRunner才對,flink里的JobMaster和JobManager不是一個東西),委托JobManagerRunner去啟動該Job的JobMaster。我們看一下對應的代碼:

    //jobManagerRunner.javaprivate void verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) throws Exception {......final CompletableFuture<Acknowledge> startFuture = jobMaster.start(new JobMasterId(leaderSessionId), rpcTimeout);......}

    ?

    然后,JobMaster經(jīng)過了一堆方法嵌套之后,執(zhí)行到了這里:

    private void scheduleExecutionGraph() {checkState(jobStatusListener == null);// register self as job status change listenerjobStatusListener = new JobManagerJobStatusListener();executionGraph.registerJobStatusListener(jobStatusListener);try {//這里調用了ExecutionGraph的啟動方法executionGraph.scheduleForExecution();}catch (Throwable t) {executionGraph.failGlobal(t);}}

    ?

    我們知道,flink的框架里有三層圖結構,其中ExecutionGraph就是真正被執(zhí)行的那一層,所以到這里為止,一個任務從提交到真正執(zhí)行的流程就走完了,我們再回顧一下(順便提一下遠程提交時的流程區(qū)別):

    • 客戶端代碼的execute方法執(zhí)行;
    • 本地環(huán)境下,MiniCluster完成了大部分任務,直接把任務委派給了MiniDispatcher;
    • 遠程環(huán)境下,啟動了一個RestClusterClient,這個類會以HTTP Rest的方式把用戶代碼提交到集群上;
    • 遠程環(huán)境下,請求發(fā)到集群上之后,必然有個handler去處理,在這里是JobSubmitHandler。這個類接手了請求后,委派StandaloneDispatcher啟動job,到這里之后,本地提交和遠程提交的邏輯往后又統(tǒng)一了;
    • Dispatcher接手job之后,會實例化一個JobManagerRunner,然后用這個runner啟動job;
    • JobManagerRunner接下來把job交給了JobMaster去處理;
    • JobMaster使用ExecutionGraph的方法啟動了整個執(zhí)行圖;整個任務就啟動起來了。

    至此,第一部分就講完了。

    2.理解flink的圖結構

    第一部分講到,我們的主函數(shù)最后一項任務就是生成StreamGraph,然后生成JobGraph,然后以此開始調度任務運行,所以接下來我們從這里入手,繼續(xù)探索flink。

    2.1 flink的三層圖結構

    事實上,flink總共提供了三種圖的抽象,我們前面已經(jīng)提到了StreamGraph和JobGraph,還有一種是ExecutionGraph,是用于調度的基本數(shù)據(jù)結構。?
    ?
    上面這張圖清晰的給出了flink各個圖的工作原理和轉換過程。其中最后一個物理執(zhí)行圖并非flink的數(shù)據(jù)結構,而是程序開始執(zhí)行后,各個task分布在不同的節(jié)點上,所形成的物理上的關系表示。

    • 從JobGraph的圖里可以看到,數(shù)據(jù)從上一個operator流到下一個operator的過程中,上游作為生產者提供了IntermediateDataSet,而下游作為消費者需要JobEdge。事實上,JobEdge是一個通信管道,連接了上游生產的dataset和下游的JobVertex節(jié)點。
    • 在JobGraph轉換到ExecutionGraph的過程中,主要發(fā)生了以下轉變:?
      • 加入了并行度的概念,成為真正可調度的圖結構
      • 生成了與JobVertex對應的ExecutionJobVertex,ExecutionVertex,與IntermediateDataSet對應的IntermediateResult和IntermediateResultPartition等,并行將通過這些類實現(xiàn)
    • ExecutionGraph已經(jīng)可以用于調度任務。我們可以看到,flink根據(jù)該圖生成了一一對應的Task,每個task對應一個ExecutionGraph的一個Execution。Task用InputGate、InputChannel和ResultPartition對應了上面圖中的IntermediateResult和ExecutionEdge。

    那么,flink抽象出這三層圖結構,四層執(zhí)行邏輯的意義是什么呢??
    StreamGraph是對用戶邏輯的映射。JobGraph在此基礎上進行了一些優(yōu)化,比如把一部分操作串成chain以提高效率。ExecutionGraph是為了調度存在的,加入了并行處理的概念。而在此基礎上真正執(zhí)行的是Task及其相關結構。

    2.2 StreamGraph的生成

    在第一節(jié)的算子注冊部分,我們可以看到,flink把每一個算子transform成一個對流的轉換(比如上文中返回的SingleOutputStreamOperator是一個DataStream的子類),并且注冊到執(zhí)行環(huán)境中,用于生成StreamGraph。實際生成StreamGraph的入口是StreamGraphGenerator.generate(env, transformations)?其中的transformations是一個list,里面記錄的就是我們在transform方法中放進來的算子。

    2.2.1 StreamTransformation類代表了流的轉換

    StreamTransformation代表了從一個或多個DataStream生成新DataStream的操作。順便,DataStream類在內部組合了一個StreamTransformation類,實際的轉換操作均通過該類完成。?
    ?
    我們可以看到,從source到各種map,union再到sink操作全部被映射成了StreamTransformation。?
    其映射過程如下所示:?

    以MapFunction為例:

    • 首先,用戶代碼里定義的UDF會被當作其基類對待,然后交給StreamMap這個operator做進一步包裝。事實上,每一個Transformation都對應了一個StreamOperator。
    • 由于map這個操作只接受一個輸入,所以再被進一步包裝為OneInputTransformation。
    • 最后,將該transformation注冊到執(zhí)行環(huán)境中,當執(zhí)行上文提到的generate方法時,生成StreamGraph圖結構。

      另外,并不是每一個 StreamTransformation 都會轉換成runtime層中的物理操作。有一些只是邏輯概念,比如union、split/select、partition等。如下圖所示的轉換樹,在運行時會優(yōu)化成下方的操作圖。?

    2.2.2 StreamGraph生成函數(shù)分析

    我們從StreamGraphGenerator.generate()方法往下看:

    public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {return new StreamGraphGenerator(env).generateInternal(transformations);}//注意,StreamGraph的生成是從sink開始的private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {for (StreamTransformation<?> transformation: transformations) {transform(transformation);}return streamGraph;}//這個方法的核心邏輯就是判斷傳入的steamOperator是哪種類型,并執(zhí)行相應的操作,詳情見下面那一大堆if-elseprivate Collection<Integer> transform(StreamTransformation<?> transform) {if (alreadyTransformed.containsKey(transform)) {return alreadyTransformed.get(transform);}LOG.debug("Transforming " + transform);if (transform.getMaxParallelism() <= 0) {// if the max parallelism hasn't been set, then first use the job wide max parallelism// from theExecutionConfig.int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();if (globalMaxParallelismFromConfig > 0) {transform.setMaxParallelism(globalMaxParallelismFromConfig);}}// call at least once to trigger exceptions about MissingTypeInfotransform.getOutputType();Collection<Integer> transformedIds;//這里對操作符的類型進行判斷,并以此調用相應的處理邏輯.簡而言之,處理的核心無非是遞歸的將該節(jié)點和節(jié)點的上游節(jié)點加入圖if (transform instanceof OneInputTransformation<?, ?>) {transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);} else if (transform instanceof SourceTransformation<?>) {transformedIds = transformSource((SourceTransformation<?>) transform);} else if (transform instanceof SinkTransformation<?>) {transformedIds = transformSink((SinkTransformation<?>) transform);} else if (transform instanceof UnionTransformation<?>) {transformedIds = transformUnion((UnionTransformation<?>) transform);} else if (transform instanceof SplitTransformation<?>) {transformedIds = transformSplit((SplitTransformation<?>) transform);} else if (transform instanceof SelectTransformation<?>) {transformedIds = transformSelect((SelectTransformation<?>) transform);} else if (transform instanceof FeedbackTransformation<?>) {transformedIds = transformFeedback((FeedbackTransformation<?>) transform);} else if (transform instanceof CoFeedbackTransformation<?>) {transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);} else if (transform instanceof PartitionTransformation<?>) {transformedIds = transformPartition((PartitionTransformation<?>) transform);} else if (transform instanceof SideOutputTransformation<?>) {transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);} else {throw new IllegalStateException("Unknown transformation: " + transform);}//注意這里和函數(shù)開始時的方法相對應,在有向圖中要注意避免循環(huán)的產生// need this check because the iterate transformation adds itself before// transforming the feedback edgesif (!alreadyTransformed.containsKey(transform)) {alreadyTransformed.put(transform, transformedIds);}if (transform.getBufferTimeout() > 0) {streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());}if (transform.getUid() != null) {streamGraph.setTransformationUID(transform.getId(), transform.getUid());}if (transform.getUserProvidedNodeHash() != null) {streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());}if (transform.getMinResources() != null && transform.getPreferredResources() != null) {streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());}return transformedIds;}

    ?

    因為map,filter等常用操作都是OneInputStreamOperator,我們就來看看transformOneInputTransform((OneInputTransformation<?, ?>) transform)方法。

    private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {Collection<Integer> inputIds = transform(transform.getInput());// 在遞歸處理節(jié)點過程中,某個節(jié)點可能已經(jīng)被其他子節(jié)點先處理過了,需要跳過if (alreadyTransformed.containsKey(transform)) {return alreadyTransformed.get(transform);}//這里是獲取slotSharingGroup。這個group用來定義當前我們在處理的這個操作符可以跟什么操作符chain到一個slot里進行操作//因為有時候我們可能不滿意flink替我們做的chain聚合//一個slot就是一個執(zhí)行task的基本容器String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);//把該operator加入圖streamGraph.addOperator(transform.getId(),slotSharingGroup,transform.getOperator(),transform.getInputType(),transform.getOutputType(),transform.getName());//對于keyedStream,我們還要記錄它的keySelector方法//flink并不真正為每個keyedStream保存一個key,而是每次需要用到key的時候都使用keySelector方法進行計算//因此,我們自定義的keySelector方法需要保證冪等性//到后面介紹keyGroup的時候我們還會再次提到這一點if (transform.getStateKeySelector() != null) {TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);}streamGraph.setParallelism(transform.getId(), transform.getParallelism());streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());//為當前節(jié)點和它的依賴節(jié)點建立邊//這里可以看到之前提到的select union partition等邏輯節(jié)點被合并入edge的過程for (Integer inputId: inputIds) {streamGraph.addEdge(inputId, transform.getId(), 0);}return Collections.singleton(transform.getId());}public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {addEdgeInternal(upStreamVertexID,downStreamVertexID,typeNumber,null,new ArrayList<String>(),null);}//addEdge的實現(xiàn),會合并一些邏輯節(jié)點private void addEdgeInternal(Integer upStreamVertexID,Integer downStreamVertexID,int typeNumber,StreamPartitioner<?> partitioner,List<String> outputNames,OutputTag outputTag) {//如果輸入邊是側輸出節(jié)點,則把side的輸入邊作為本節(jié)點的輸入邊,并遞歸調用if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {int virtualId = upStreamVertexID;upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;if (outputTag == null) {outputTag = virtualSideOutputNodes.get(virtualId).f1;}addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag);//如果輸入邊是select,則把select的輸入邊作為本節(jié)點的輸入邊} else if (virtualSelectNodes.containsKey(upStreamVertexID)) {int virtualId = upStreamVertexID;upStreamVertexID = virtualSelectNodes.get(virtualId).f0;if (outputNames.isEmpty()) {// selections that happen downstream override earlier selectionsoutputNames = virtualSelectNodes.get(virtualId).f1;}addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);//如果是partition節(jié)點} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {int virtualId = upStreamVertexID;upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;if (partitioner == null) {partitioner = virtualPartitionNodes.get(virtualId).f1;}addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);} else {//正常的edge處理邏輯StreamNode upstreamNode = getStreamNode(upStreamVertexID);StreamNode downstreamNode = getStreamNode(downStreamVertexID);// If no partitioner was specified and the parallelism of upstream and downstream// operator matches use forward partitioning, use rebalance otherwise.if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {partitioner = new ForwardPartitioner<Object>();} else if (partitioner == null) {partitioner = new RebalancePartitioner<Object>();}if (partitioner instanceof ForwardPartitioner) {if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {throw new UnsupportedOperationException("Forward partitioning does not allow " +"change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +" You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");}}StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag);getStreamNode(edge.getSourceId()).addOutEdge(edge);getStreamNode(edge.getTargetId()).addInEdge(edge);}}

    ?

    2.2.3 WordCount函數(shù)的StreamGraph

    flink提供了一個StreamGraph可視化顯示工具,在這里?
    我們可以把我們的程序的執(zhí)行計劃打印出來System.out.println(env.getExecutionPlan());?復制到這個網(wǎng)站上,點擊生成,如圖所示:?
    ?
    可以看到,我們源程序被轉化成了4個operator。?
    另外,在operator之間的連線上也顯示出了flink添加的一些邏輯流程。由于我設定了每個操作符的并行度都是1,所以在每個操作符之間都是直接FORWARD,不存在shuffle的過程。

    2.3 JobGraph的生成

    flink會根據(jù)上一步生成的StreamGraph生成JobGraph,然后將JobGraph發(fā)送到server端進行ExecutionGraph的解析。

    2.3.1 JobGraph生成源碼

    與StreamGraph類似,JobGraph的入口方法是StreamingJobGraphGenerator.createJobGraph()。我們直接來看源碼

    private JobGraph createJobGraph() {// 設置啟動模式為所有節(jié)點均在一開始就啟動jobGraph.setScheduleMode(ScheduleMode.EAGER);// 為每個節(jié)點生成hash idMap<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);// 為了保持兼容性創(chuàng)建的hashList<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());for (StreamGraphHasher hasher : legacyStreamGraphHashers) {legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));}Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();//生成jobvertex,串成chain等//這里的邏輯大致可以理解為,挨個遍歷節(jié)點,如果該節(jié)點是一個chain的頭節(jié)點,就生成一個JobVertex,如果不是頭節(jié)點,就要把自身配置并入頭節(jié)點,然后把頭節(jié)點和自己的出邊相連;對于不能chain的節(jié)點,當作只有頭節(jié)點處理即可setChaining(hashes, legacyHashes, chainedOperatorHashes);//設置輸入邊edgesetPhysicalEdges();//設置slot共享groupsetSlotSharing();//配置檢查點configureCheckpointing();// 如果有之前的緩存文件的配置的話,重新讀入for (Tuple2<String, DistributedCache.DistributedCacheEntry> e : streamGraph.getEnvironment().getCachedFiles()) {DistributedCache.writeFileInfoToConfig(e.f0, e.f1, jobGraph.getJobConfiguration());}// 傳遞執(zhí)行環(huán)境配置try {jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());}catch (IOException e) {throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +"This indicates that non-serializable types (like custom serializers) were registered");}return jobGraph;}

    ?

    2.3.2 operator chain的邏輯

    為了更高效地分布式執(zhí)行,Flink會盡可能地將operator的subtask鏈接(chain)在一起形成task。每個task在一個線程中執(zhí)行。將operators鏈接成task是非常有效的優(yōu)化:它能減少線程之間的切換,減少消息的序列化/反序列化,減少數(shù)據(jù)在緩沖區(qū)的交換,減少了延遲的同時提高整體的吞吐量。

    ?
    上圖中將KeyAggregation和Sink兩個operator進行了合并,因為這兩個合并后并不會改變整體的拓撲結構。但是,并不是任意兩個 operator 就能 chain 一起的,其條件還是很苛刻的:

    • 上下游的并行度一致
    • 下游節(jié)點的入度為1 (也就是說下游節(jié)點沒有來自其他節(jié)點的輸入)
    • 上下游節(jié)點都在同一個 slot group 中(下面會解釋 slot group)
    • 下游節(jié)點的 chain 策略為 ALWAYS(可以與上下游鏈接,map、flatmap、filter等默認是ALWAYS)
    • 上游節(jié)點的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接,不能與上游鏈接,Source默認是HEAD)
    • 兩個節(jié)點間數(shù)據(jù)分區(qū)方式是 forward(參考理解數(shù)據(jù)流的分區(qū))
    • 用戶沒有禁用 chain

    flink的chain邏輯是一種很常見的設計,比如spring的interceptor也是類似的實現(xiàn)方式。通過把操作符串成一個大操作符,flink避免了把數(shù)據(jù)序列化后通過網(wǎng)絡發(fā)送給其他節(jié)點的開銷,能夠大大增強效率。

    2.3.3 JobGraph的提交

    前面已經(jīng)提到,JobGraph的提交依賴于JobClient和JobManager之間的異步通信,如圖所示:?
    ?
    在submitJobAndWait方法中,其首先會創(chuàng)建一個JobClientActor的ActorRef,然后向其發(fā)起一個SubmitJobAndWait消息,該消息將JobGraph的實例提交給JobClientActor。發(fā)起模式是ask,它表示需要一個應答消息。

    Future<Object> future = Patterns.ask(jobClientActor, new JobClientMessages.SubmitJobAndWait(jobGraph), new Timeout(AkkaUtils.INF_TIMEOUT())); answer = Await.result(future, AkkaUtils.INF_TIMEOUT());

    ?

    該SubmitJobAndWait消息被JobClientActor接收后,最終通過調用tryToSubmitJob方法觸發(fā)真正的提交動作。當JobManager的actor接收到來自client端的請求后,會執(zhí)行一個submitJob方法,主要做以下事情:

    • 向BlobLibraryCacheManager注冊該Job;
    • 構建ExecutionGraph對象;
    • 對JobGraph中的每個頂點進行初始化;
    • 將DAG拓撲中從source開始排序,排序后的頂點集合附加到Exec> - utionGraph對象;
    • 獲取檢查點相關的配置,并將其設置到ExecutionGraph對象;
    • 向ExecutionGraph注冊相關的listener;
    • 執(zhí)行恢復操作或者將JobGraph信息寫入SubmittedJobGraphStore以在后續(xù)用于恢復目的;
    • 響應給客戶端JobSubmitSuccess消息;
    • 對ExecutionGraph對象進行調度執(zhí)行;

    最后,JobManger會返回消息給JobClient,通知該任務是否提交成功。

    2.4 ExecutionGraph的生成

    與StreamGraph和JobGraph不同,ExecutionGraph并不是在我們的客戶端程序生成,而是在服務端(JobManager處)生成的,順便flink只維護一個JobManager。其入口代碼是ExecutionGraphBuilder.buildGraph(...)?
    該方法長200多行,其中一大半是checkpoiont的相關邏輯,我們暫且略過,直接看核心方法executionGraph.attachJobGraph(sortedTopology)?
    因為ExecutionGraph事實上只是改動了JobGraph的每個節(jié)點,而沒有對整個拓撲結構進行變動,所以代碼里只是挨個遍歷jobVertex并進行處理:

    for (JobVertex jobVertex : topologiallySorted) {if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {this.isStoppable = false;}//在這里生成ExecutionGraph的每個節(jié)點//首先是進行了一堆賦值,將任務信息交給要生成的圖節(jié)點,以及設定并行度等等//然后是創(chuàng)建本節(jié)點的IntermediateResult,根據(jù)本節(jié)點的下游節(jié)點的個數(shù)確定創(chuàng)建幾份//最后是根據(jù)設定好的并行度創(chuàng)建用于執(zhí)行task的ExecutionVertex//如果job有設定inputsplit的話,這里還要指定inputsplitsExecutionJobVertex ejv = new ExecutionJobVertex(this,jobVertex,1,rpcCallTimeout,globalModVersion,createTimestamp);//這里要處理所有的JobEdge//對每個edge,獲取對應的intermediateResult,并記錄到本節(jié)點的輸入上//最后,把每個ExecutorVertex和對應的IntermediateResult關聯(lián)起來ejv.connectToPredecessors(this.intermediateResults);ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);if (previousTask != null) {throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",jobVertex.getID(), ejv, previousTask));}for (IntermediateResult res : ejv.getProducedDataSets()) {IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);if (previousDataSet != null) {throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",res.getId(), res, previousDataSet));}}this.verticesInCreationOrder.add(ejv);this.numVerticesTotal += ejv.getParallelism();newExecJobVertices.add(ejv);}

    ?

    至此,ExecutorGraph就創(chuàng)建完成了。

    3. 任務的調度與執(zhí)行

    關于flink的任務執(zhí)行架構,官網(wǎng)的這兩張圖就是最好的說明:?
    ?
    Flink 集群啟動后,首先會啟動一個 JobManger 和多個的 TaskManager。用戶的代碼會由JobClient 提交給 JobManager,JobManager 再把來自不同用戶的任務發(fā)給 不同的TaskManager 去執(zhí)行,每個TaskManager管理著多個task,task是執(zhí)行計算的最小結構, TaskManager 將心跳和統(tǒng)計信息匯報給 JobManager。TaskManager 之間以流的形式進行數(shù)據(jù)的傳輸。上述除了task外的三者均為獨立的 JVM 進程。?
    要注意的是,TaskManager和job并非一一對應的關系。flink調度的最小單元是task而非TaskManager,也就是說,來自不同job的不同task可能運行于同一個TaskManager的不同線程上。?
    ?
    一個flink任務所有可能的狀態(tài)如上圖所示。圖上畫的很明白,就不再贅述了。

    3.1 計算資源的調度

    Task slot是一個TaskManager內資源分配的最小載體,代表了一個固定大小的資源子集,每個TaskManager會將其所占有的資源平分給它的slot。?
    通過調整 task slot 的數(shù)量,用戶可以定義task之間是如何相互隔離的。每個 TaskManager 有一個slot,也就意味著每個task運行在獨立的 JVM 中。每個 TaskManager 有多個slot的話,也就是說多個task運行在同一個JVM中。?
    而在同一個JVM進程中的task,可以共享TCP連接(基于多路復用)和心跳消息,可以減少數(shù)據(jù)的網(wǎng)絡傳輸,也能共享一些數(shù)據(jù)結構,一定程度上減少了每個task的消耗。?
    每個slot可以接受單個task,也可以接受多個連續(xù)task組成的pipeline,如下圖所示,FlatMap函數(shù)占用一個taskslot,而key Agg函數(shù)和sink函數(shù)共用一個taskslot:?
    ?
    為了達到共用slot的目的,除了可以以chain的方式pipeline算子,我們還可以允許SlotSharingGroup,如下圖所示:?
    ?
    我們可以把不能被chain成一條的兩個操作如flatmap和key&sink放在一個TaskSlot里執(zhí)行,這樣做可以獲得以下好處:

    • 共用slot使得我們不再需要計算每個任務需要的總task數(shù)目,直接取最高算子的并行度即可
    • 對計算資源的利用率更高。例如,通常的輕量級操作map和重量級操作Aggregate不再分別需要一個線程,而是可以在同一個線程內執(zhí)行,而且對于slot有限的場景,我們可以增大每個task的并行度了。?
      接下來我們還是用官網(wǎng)的圖來說明flink是如何重用slot的:?

    • TaskManager1分配一個SharedSlot0
    • 把source task放入一個SimpleSlot0,再把該slot放入SharedSlot0
    • 把flatmap task放入一個SimpleSlot1,再把該slot放入SharedSlot0
    • 因為我們的flatmap task并行度是2,因此不能再放入SharedSlot0,所以向TaskMange21申請了一個新的SharedSlot0
    • 把第二個flatmap task放進一個新的SimpleSlot,并放進TaskManager2的SharedSlot0
    • 開始處理key&sink task,因為其并行度也是2,所以先把第一個task放進TaskManager1的SharedSlot
    • 把第二個key&sink放進TaskManager2的SharedSlot

    3.2 JobManager執(zhí)行job

    JobManager負責接收 flink 的作業(yè),調度 task,收集 job 的狀態(tài)、管理 TaskManagers。被實現(xiàn)為一個 akka actor。

    3.2.1 JobManager的組件

    • BlobServer 是一個用來管理二進制大文件的服務,比如保存用戶上傳的jar文件,該服務會將其寫到磁盤上。還有一些相關的類,如BlobCache,用于TaskManager向JobManager下載用戶的jar文件
    • InstanceManager 用來管理當前存活的TaskManager的組件,記錄了TaskManager的心跳信息等
    • CompletedCheckpointStore 用于保存已完成的checkpoint相關信息,持久化到內存中或者zookeeper上
    • MemoryArchivist 保存了已經(jīng)提交到flink的作業(yè)的相關信息,如JobGraph等

    3.2.2 JobManager的啟動過程

    先列出JobManager啟動的核心代碼

    def runJobManager(configuration: Configuration,executionMode: JobManagerMode,listeningAddress: String,listeningPort: Int): Unit = {val numberProcessors = Hardware.getNumberCPUCores()val futureExecutor = Executors.newScheduledThreadPool(numberProcessors,new ExecutorThreadFactory("jobmanager-future"))val ioExecutor = Executors.newFixedThreadPool(numberProcessors,new ExecutorThreadFactory("jobmanager-io"))val timeout = AkkaUtils.getTimeout(configuration)// we have to first start the JobManager ActorSystem because this determines the port if 0// was chosen before. The method startActorSystem will update the configuration correspondingly.val jobManagerSystem = startActorSystem(configuration,listeningAddress,listeningPort)val highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(configuration,ioExecutor,AddressResolution.NO_ADDRESS_RESOLUTION)val metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration))metricRegistry.startQueryService(jobManagerSystem, null)val (_, _, webMonitorOption, _) = try {startJobManagerActors(jobManagerSystem,configuration,executionMode,listeningAddress,futureExecutor,ioExecutor,highAvailabilityServices,metricRegistry,classOf[JobManager],classOf[MemoryArchivist],Option(classOf[StandaloneResourceManager]))} catch {case t: Throwable =>futureExecutor.shutdownNow()ioExecutor.shutdownNow()throw t}// block until everything is shut downjobManagerSystem.awaitTermination()....... }

    ?

    • 配置Akka并生成ActorSystem,啟動JobManager
    • 啟動HA和metric相關服務
    • 在startJobManagerActors()方法中啟動JobManagerActors,以及webserver,TaskManagerActor,ResourceManager等等
    • 阻塞等待終止
    • 集群通過LeaderService等選出JobManager的leader

    3.2.3 JobManager啟動Task

    JobManager 是一個Actor,通過各種消息來完成核心邏輯:

    override def handleMessage: Receive = {case GrantLeadership(newLeaderSessionID) =>log.info(s"JobManager $getAddress was granted leadership with leader session ID " +s"$newLeaderSessionID.")leaderSessionID = newLeaderSessionID.......

    ?

    有幾個比較重要的消息:

    • GrantLeadership 獲得leader授權,將自身被分發(fā)到的 session id 寫到 zookeeper,并恢復所有的 jobs
    • RevokeLeadership 剝奪leader授權,打斷清空所有的 job 信息,但是保留作業(yè)緩存,注銷所有的 TaskManagers
    • RegisterTaskManagers 注冊 TaskManager,如果之前已經(jīng)注冊過,則只給對應的 Instance 發(fā)送消息,否則啟動注冊邏輯:在 InstanceManager 中注冊該 Instance 的信息,并停止 Instance BlobLibraryCacheManager 的端口【供下載 lib 包用】,同時使用 watch 監(jiān)聽 task manager 的存活
    • SubmitJob 提交 jobGraph?
      最后一項SubmintJob就是我們要關注的,從客戶端收到JobGraph,轉換為ExecutionGraph并執(zhí)行的過程。
    private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = {......executionGraph = ExecutionGraphBuilder.buildGraph(executionGraph,jobGraph,flinkConfiguration,futureExecutor,ioExecutor,scheduler,userCodeLoader,checkpointRecoveryFactory,Time.of(timeout.length, timeout.unit),restartStrategy,jobMetrics,numSlots,blobServer,log.logger)......if (leaderElectionService.hasLeadership) {log.info(s"Scheduling job $jobId ($jobName).")executionGraph.scheduleForExecution()} else {self ! decorateMessage(RemoveJob(jobId, removeJobFromStateBackend = false))log.warn(s"Submitted job $jobId, but not leader. The other leader needs to recover " +"this. I am not scheduling the job for execution.")...... }

    ?

    首先做一些準備工作,然后獲取一個ExecutionGraph,判斷是否是恢復的job,然后將job保存下來,并且通知客戶端本地已經(jīng)提交成功了,最后如果確認本JobManager是leader,則執(zhí)行executionGraph.scheduleForExecution()方法,這個方法經(jīng)過一系列調用,把每個ExecutionVertex傳遞給了Excution類的deploy方法:

    public void deploy() throws JobException {......try {// good, we are allowed to deployif (!slot.setExecutedVertex(this)) {throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);}// race double check, did we fail/cancel and do we need to release the slot?if (this.state != DEPLOYING) {slot.releaseSlot();return;}if (LOG.isInfoEnabled()) {LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getTaskNameWithSubtaskIndex(),attemptNumber, getAssignedResourceLocation().getHostname()));}final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(attemptId,slot,taskState,attemptNumber);final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);......}catch (Throwable t) {markFailed(t);ExceptionUtils.rethrow(t);}}

    ?

    我們首先生成了一個TaskDeploymentDescriptor,然后交給了taskManagerGateway.submitTask()方法執(zhí)行。接下來的部分,就屬于TaskManager的范疇了。

    3.3 TaskManager執(zhí)行task

    3.3.1 TaskManager的基本組件

    TaskManager是flink中資源管理的基本組件,是所有執(zhí)行任務的基本容器,提供了內存管理、IO管理、通信管理等一系列功能,本節(jié)對各個模塊進行簡要介紹。?
    1. MemoryManager flink并沒有把所有內存的管理都委托給JVM,因為JVM普遍存在著存儲對象密度低、大內存時GC對系統(tǒng)影響大等問題。所以flink自己抽象了一套內存管理機制,將所有對象序列化后放在自己的MemorySegment上進行管理。MemoryManger涉及內容較多,將在后續(xù)章節(jié)進行繼續(xù)剖析。?
    2. IOManager flink通過IOManager管理磁盤IO的過程,提供了同步和異步兩種寫模式,又進一步區(qū)分了block、buffer和bulk三種讀寫方式。?
    IOManager提供了兩種方式枚舉磁盤文件,一種是直接遍歷文件夾下所有文件,另一種是計數(shù)器方式,對每個文件名以遞增順序訪問。?
    在底層,flink將文件IO抽象為FileIOChannle,封裝了底層實現(xiàn)。?
    ?
    可以看到,flink在底層實際上都是以異步的方式進行讀寫。?
    3. NetworkEnvironment 是TaskManager的網(wǎng)絡 IO 組件,包含了追蹤中間結果和數(shù)據(jù)交換的數(shù)據(jù)結構。它的構造器會統(tǒng)一將配置的內存先分配出來,抽象成 NetworkBufferPool 統(tǒng)一管理內存的申請和釋放。意思是說,在輸入和輸出數(shù)據(jù)時,不管是保留在本地內存,等待chain在一起的下個操作符進行處理,還是通過網(wǎng)絡把本操作符的計算結果發(fā)送出去,都被抽象成了NetworkBufferPool。后續(xù)我們還將對這個組件進行詳細分析。

    3.3.2 TaskManager執(zhí)行Task

    對于TM來說,執(zhí)行task就是把收到的TaskDeploymentDescriptor對象轉換成一個task并執(zhí)行的過程。TaskDeploymentDescriptor這個類保存了task執(zhí)行所必須的所有內容,例如序列化的算子,輸入的InputGate和輸出的ResultPartition的定義,該task要作為幾個subtask執(zhí)行等等。?
    按照正常邏輯思維,很容易想到TM的submitTask方法的行為:首先是確認資源,如尋找JobManager和Blob,而后建立連接,解序列化算子,收集task相關信息,接下來就是創(chuàng)建一個新的Task對象,這個task對象就是真正執(zhí)行任務的關鍵所在。

    val task = new Task(jobInformation,taskInformation,tdd.getExecutionAttemptId,tdd.getAllocationId,tdd.getSubtaskIndex,tdd.getAttemptNumber,tdd.getProducedPartitions,tdd.getInputGates,tdd.getTargetSlotNumber,tdd.getTaskStateHandles,memoryManager,ioManager,network,bcVarManager,taskManagerConnection,inputSplitProvider,checkpointResponder,blobCache,libCache,fileCache,config,taskMetricGroup,resultPartitionConsumableNotifier,partitionStateChecker,context.dispatcher)

    ?

    如果讀者是從頭開始看這篇blog,里面有很多對象應該已經(jīng)比較明確其作用了(除了那個brVarManager,這個是管理廣播變量的,廣播變量是一類會被分發(fā)到每個任務中的共享變量)。接下來的主要任務,就是把這個task啟動起來,然后報告說已經(jīng)啟動task了:

    // all good, we kick off the task, which performs its own initialization task.startTaskThread()sender ! decorateMessage(Acknowledge.get())

    ?

    3.3.2.1 生成Task對象

    在執(zhí)行new Task()方法時,第一步是把構造函數(shù)里的這些變量賦值給當前task的fields。?
    接下來是初始化ResultPartition和InputGate。這兩個類描述了task的輸出數(shù)據(jù)和輸入數(shù)據(jù)。

    for (ResultPartitionDeploymentDescriptor desc: resultPartitionDeploymentDescriptors) {ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), executionId);this.producedPartitions[counter] = new ResultPartition(taskNameWithSubtaskAndId,this,jobId,partitionId,desc.getPartitionType(),desc.getNumberOfSubpartitions(),desc.getMaxParallelism(),networkEnvironment.getResultPartitionManager(),resultPartitionConsumableNotifier,ioManager,desc.sendScheduleOrUpdateConsumersMessage()); //為每個partition初始化對應的writer writers[counter] = new ResultPartitionWriter(producedPartitions[counter]);++counter; }// Consumed intermediate result partitions this.inputGates = new SingleInputGate[inputGateDeploymentDescriptors.size()]; this.inputGatesById = new HashMap<>();counter = 0;for (InputGateDeploymentDescriptor inputGateDeploymentDescriptor: inputGateDeploymentDescriptors) {SingleInputGate gate = SingleInputGate.create(taskNameWithSubtaskAndId,jobId,executionId,inputGateDeploymentDescriptor,networkEnvironment,this,metricGroup.getIOMetricGroup());inputGates[counter] = gate;inputGatesById.put(gate.getConsumedResultId(), gate);++counter; }

    ?

    最后,創(chuàng)建一個Thread對象,并把自己放進該對象,這樣在執(zhí)行時,自己就有了自身的線程的引用。

    3.3.2.2 運行Task對象

    Task對象本身就是一個Runable,因此在其run方法里定義了運行邏輯。?
    第一步是切換Task的狀態(tài):

    while (true) {ExecutionState current = this.executionState;如果當前的執(zhí)行狀態(tài)為CREATED,則將其設置為DEPLOYING狀態(tài)if (current == ExecutionState.CREATED) {if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {// success, we can start our workbreak;}}//如果當前執(zhí)行狀態(tài)為FAILED,則發(fā)出通知并退出run方法else if (current == ExecutionState.FAILED) {// we were immediately failed. tell the TaskManager that we reached our final statenotifyFinalState();if (metrics != null) {metrics.close();}return;}//如果當前執(zhí)行狀態(tài)為CANCELING,則將其修改為CANCELED狀態(tài),并退出runelse if (current == ExecutionState.CANCELING) {if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {// we were immediately canceled. tell the TaskManager that we reached our final statenotifyFinalState();if (metrics != null) {metrics.close();}return;}}//否則說明發(fā)生了異常else {if (metrics != null) {metrics.close();}throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');}}

    ?

    接下來,就是導入用戶類加載器并加載用戶代碼。?
    然后,是向網(wǎng)絡管理器注冊當前任務(flink的各個算子在運行時進行數(shù)據(jù)交換需要依賴網(wǎng)絡管理器),分配一些緩存以保存數(shù)據(jù)?
    然后,讀入指定的緩存文件。?
    然后,再把task創(chuàng)建時傳入的那一大堆變量用于創(chuàng)建一個執(zhí)行環(huán)境Envrionment。?
    再然后,對于那些并不是第一次執(zhí)行的task(比如失敗后重啟的)要恢復其狀態(tài)。?
    接下來最重要的是

  • invokable.invoke();
  • 方法。為什么這么說呢,因為這個方法就是用戶代碼所真正被執(zhí)行的入口。比如我們寫的什么new MapFunction()的邏輯,最終就是在這里被執(zhí)行的。這里說一下這個invokable,這是一個抽象類,提供了可以被TaskManager執(zhí)行的對象的基本抽象。?
    這個invokable是在解析JobGraph的時候生成相關信息的,并在此處形成真正可執(zhí)行的對象

    // now load the task's invokable code //通過反射生成對象 invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);

    ?

    ?
    上圖顯示了flink提供的可被執(zhí)行的Task類型。從名字上就可以看出各個task的作用,在此不再贅述。?
    接下來就是invoke方法了,因為我們的wordcount例子用了流式api,在此我們以StreamTask的invoke方法為例進行說明。

    3.3.2.3 StreamTask的執(zhí)行邏輯

    先上部分核心代碼:

    public final void invoke() throws Exception {boolean disposed = false;try {// -------- Initialize ---------//先做一些賦值操作......// if the clock is not already set, then assign a default TimeServiceProvider//處理timerif (timerService == null) {ThreadFactory timerThreadFactory =new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName());timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);}//把之前JobGraph串起來的chain的信息形成實現(xiàn)operatorChain = new OperatorChain<>(this);headOperator = operatorChain.getHeadOperator();// task specific initialization//這個init操作的起名非常詭異,因為這里主要是處理算子采用了自定義的checkpoint檢查機制的情況,但是起了一個非常大眾臉的名字init();// save the work of reloading state, etc, if the task is already canceledif (canceled) {throw new CancelTaskException();}// -------- Invoke --------LOG.debug("Invoking {}", getName());// we need to make sure that any triggers scheduled in open() cannot be// executed before all operators are openedsynchronized (lock) {// both the following operations are protected by the lock// so that we avoid race conditions in the case that initializeState()// registers a timer, that fires before the open() is called.//初始化操作符狀態(tài),主要是一些state啥的initializeState();//對于富操作符,執(zhí)行其open操作openAllOperators();}// final check to exit early before starting to runf (canceled) {throw new CancelTaskException();}// let the task do its work//真正開始執(zhí)行的代碼isRunning = true;run();

    ?

    StreamTask.invoke()方法里,第一個值得一說的是TimerService。Flink在2015年決定向StreamTask類加入timer service的時候解釋到:

    This integrates the timer as a service in StreamTask that StreamOperators can use by calling a method on the StreamingRuntimeContext. This also ensures that the timer callbacks can not be called concurrently with other methods on the StreamOperator. This behaviour is ensured by an ITCase.

    第二個要注意的是chain操作。前面提到了,flink會出于優(yōu)化的角度,把一些算子chain成一個整體的算子作為一個task來執(zhí)行。比如wordcount例子中,Source和FlatMap算子就被chain在了一起。在進行chain操作的時候,會設定頭節(jié)點,并且指定輸出的RecordWriter。

    接下來不出所料仍然是初始化,只不過初始化的對象變成了各個operator。如果是有checkpoint的,那就從state信息里恢復,不然就作為全新的算子處理。從源碼中可以看到,flink針對keyed算子和普通算子做了不同的處理。keyed算子在初始化時需要計算出一個group區(qū)間,這個區(qū)間的值在整個生命周期里都不會再變化,后面key就會根據(jù)hash的不同結果,分配到特定的group中去計算。順便提一句,flink的keyed算子保存的是對每個數(shù)據(jù)的key的計算方法,而非真實的key,用戶需要自己保證對每一行數(shù)據(jù)提供的keySelector的冪等性。至于為什么要用KeyGroup的設計,這就牽扯到擴容的范疇了,將在后面的章節(jié)進行講述。?
    對于openAllOperators()方法,就是對各種RichOperator執(zhí)行其open方法,通常可用于在執(zhí)行計算之前加載資源。?
    最后,run方法千呼萬喚始出來,該方法經(jīng)過一系列跳轉,最終調用chain上的第一個算子的run方法。在wordcount的例子中,它最終調用了SocketTextStreamFunction的run,建立socket連接并讀入文本。

    3.4 StreamTask與StreamOperator

    前面提到,Task對象在執(zhí)行過程中,把執(zhí)行的任務交給了StreamTask這個類去執(zhí)行。在我們的wordcount例子中,實際初始化的是OneInputStreamTask的對象(參考上面的類圖)。那么這個對象是如何執(zhí)行用戶的代碼的呢?

    protected void run() throws Exception {// cache processor reference on the stack, to make the code more JIT friendlyfinal StreamInputProcessor<IN> inputProcessor = this.inputProcessor;while (running && inputProcessor.processInput()) {// all the work happens in the "processInput" method}}

    ?

    它做的,就是把任務直接交給了InputProcessor去執(zhí)行processInput方法。這是一個StreamInputProcessor的實例,該processor的任務就是處理輸入的數(shù)據(jù),包括用戶數(shù)據(jù)、watermark和checkpoint數(shù)據(jù)等。我們先來看看這個processor是如何產生的:

    public void init() throws Exception {StreamConfig configuration = getConfiguration();TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());int numberOfInputs = configuration.getNumberOfInputs();if (numberOfInputs > 0) {InputGate[] inputGates = getEnvironment().getAllInputGates();inputProcessor = new StreamInputProcessor<>(inputGates,inSerializer,this,configuration.getCheckpointMode(),getCheckpointLock(),getEnvironment().getIOManager(),getEnvironment().getTaskManagerInfo().getConfiguration(),getStreamStatusMaintainer(),this.headOperator);// make sure that stream tasks report their I/O statisticsinputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());}}

    ?

    這是OneInputStreamTask的init方法,從configs里面獲取StreamOperator信息,生成自己的inputProcessor。那么inputProcessor是如何處理數(shù)據(jù)的呢?我們接著跟進源碼:

    public boolean processInput() throws Exception {if (isFinished) {return false;}if (numRecordsIn == null) {numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();}//這個while是用來處理單個元素的(不要想當然以為是循環(huán)處理元素的)while (true) {//注意 1在下面//2.接下來,會利用這個反序列化器得到下一個數(shù)據(jù)記錄,并進行解析(是用戶數(shù)據(jù)還是watermark等等),然后進行對應的操作if (currentRecordDeserializer != null) {DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);if (result.isBufferConsumed()) {currentRecordDeserializer.getCurrentBuffer().recycle();currentRecordDeserializer = null;}if (result.isFullRecord()) {StreamElement recordOrMark = deserializationDelegate.getInstance();//如果元素是watermark,就準備更新當前channel的watermark值(并不是簡單賦值,因為有亂序存在),if (recordOrMark.isWatermark()) {// handle watermarkstatusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);continue;} else if (recordOrMark.isStreamStatus()) {//如果元素是status,就進行相應處理。可以看作是一個flag,標志著當前stream接下來即將沒有元素輸入(idle),或者當前即將由空閑狀態(tài)轉為有元素狀態(tài)(active)。同時,StreamStatus還對如何處理watermark有影響。通過發(fā)送status,上游的operator可以很方便的通知下游當前的數(shù)據(jù)流的狀態(tài)。// handle stream statusstatusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);continue;} else if (recordOrMark.isLatencyMarker()) {//LatencyMarker是用來衡量代碼執(zhí)行時間的。在Source處創(chuàng)建,攜帶創(chuàng)建時的時間戳,流到Sink時就可以知道經(jīng)過了多長時間// handle latency markersynchronized (lock) {streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());}continue;} else {//這里就是真正的,用戶的代碼即將被執(zhí)行的地方。從章節(jié)1到這里足足用了三萬字,有點萬里長征的感覺// now we can do the actual processingStreamRecord<IN> record = recordOrMark.asRecord();synchronized (lock) {numRecordsIn.inc();streamOperator.setKeyContextElement1(record);streamOperator.processElement(record);}return true;}}}//1.程序首先獲取下一個buffer//這一段代碼是服務于flink的FaultTorrent機制的,后面我會講到,這里只需理解到它會嘗試獲取buffer,然后賦值給當前的反序列化器final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();if (bufferOrEvent != null) {if (bufferOrEvent.isBuffer()) {currentChannel = bufferOrEvent.getChannelIndex();currentRecordDeserializer = recordDeserializers[currentChannel];currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());}else {// Event receivedfinal AbstractEvent event = bufferOrEvent.getEvent();if (event.getClass() != EndOfPartitionEvent.class) {throw new IOException("Unexpected event: " + event);}}}else {isFinished = true;if (!barrierHandler.isEmpty()) {throw new IllegalStateException("Trailing data in checkpoint barrier handler.");}return false;}}}

    ?

    到此為止,以上部分就是一個flink程序啟動后,到執(zhí)行用戶代碼之前,flink框架所做的準備工作。回顧一下:

    • 啟動一個環(huán)境
    • 生成StreamGraph
    • 注冊和選舉JobManager
    • 在各節(jié)點生成TaskManager,并根據(jù)JobGraph生成對應的Task
    • 啟動各個task,準備執(zhí)行代碼

    接下來,我們挑幾個Operator看看flink是如何抽象這些算子的。

    4. StreamOperator的抽象與實現(xiàn)

    4.1 數(shù)據(jù)源的邏輯——StreamSource與時間模型

    StreamSource抽象了一個數(shù)據(jù)源,并且指定了一些如何處理數(shù)據(jù)的模式。

    public class StreamSource<OUT, SRC extends SourceFunction<OUT>>extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {......public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {run(lockingObject, streamStatusMaintainer, output);}public void run(final Object lockingObject,final StreamStatusMaintainer streamStatusMaintainer,final Output<StreamRecord<OUT>> collector) throws Exception {final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();LatencyMarksEmitter latencyEmitter = null;if (getExecutionConfig().isLatencyTrackingEnabled()) {latencyEmitter = new LatencyMarksEmitter<>(getProcessingTimeService(),collector,getExecutionConfig().getLatencyTrackingInterval(),getOperatorConfig().getVertexID(),getRuntimeContext().getIndexOfThisSubtask());}final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();this.ctx = StreamSourceContexts.getSourceContext(timeCharacteristic,getProcessingTimeService(),lockingObject,streamStatusMaintainer,collector,watermarkInterval,-1);try {userFunction.run(ctx);// if we get here, then the user function either exited after being done (finite source)// or the function was canceled or stopped. For the finite source case, we should emit// a final watermark that indicates that we reached the end of event-timeif (!isCanceledOrStopped()) {ctx.emitWatermark(Watermark.MAX_WATERMARK);}} finally {// make sure that the context is closed in any casectx.close();if (latencyEmitter != null) {latencyEmitter.close();}}}......private static class LatencyMarksEmitter<OUT> {private final ScheduledFuture<?> latencyMarkTimer;public LatencyMarksEmitter(final ProcessingTimeService processingTimeService,final Output<StreamRecord<OUT>> output,long latencyTrackingInterval,final int vertexID,final int subtaskIndex) {latencyMarkTimer = processingTimeService.scheduleAtFixedRate(new ProcessingTimeCallback() {@Overridepublic void onProcessingTime(long timestamp) throws Exception {try {// ProcessingTimeService callbacks are executed under the checkpointing lockoutput.emitLatencyMarker(new LatencyMarker(timestamp, vertexID, subtaskIndex));} catch (Throwable t) {// we catch the Throwables here so that we don't trigger the processing// timer services async exception handlerLOG.warn("Error while emitting latency marker.", t);}}},0L,latencyTrackingInterval);}public void close() {latencyMarkTimer.cancel(true);}} }

    ?

    在StreamSource生成上下文之后,接下來就是把上下文交給SourceFunction去執(zhí)行:

  • userFunction.run(ctx);
  • SourceFunction是對Function的一個抽象,就好像MapFunction,KeyByFunction一樣,用戶選擇實現(xiàn)這些函數(shù),然后flink框架就能利用這些函數(shù)進行計算,完成用戶邏輯。?
    我們的wordcount程序使用了flink提供的一個SocketTextStreamFunction。我們可以看一下它的實現(xiàn)邏輯,對source如何運行有一個基本的認識:

    public void run(SourceContext<String> ctx) throws Exception {final StringBuilder buffer = new StringBuilder();long attempt = 0;while (isRunning) {try (Socket socket = new Socket()) {currentSocket = socket;LOG.info("Connecting to server socket " + hostname + ':' + port);socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));char[] cbuf = new char[8192];int bytesRead;//核心邏輯就是一直讀inputSocket,然后交給collect方法while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {buffer.append(cbuf, 0, bytesRead);int delimPos;while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {String record = buffer.substring(0, delimPos);// truncate trailing carriage returnif (delimiter.equals("\n") && record.endsWith("\r")) {record = record.substring(0, record.length() - 1);}//讀到數(shù)據(jù)后,把數(shù)據(jù)交給collect方法,collect方法負責把數(shù)據(jù)交到合適的位置(如發(fā)布為br變量,或者交給下個operator,或者通過網(wǎng)絡發(fā)出去)ctx.collect(record);buffer.delete(0, delimPos + delimiter.length());}}}// if we dropped out of this loop due to an EOF, sleep and retryif (isRunning) {attempt++;if (maxNumRetries == -1 || attempt < maxNumRetries) {LOG.warn("Lost connection to server socket. Retrying in " + delayBetweenRetries + " msecs...");Thread.sleep(delayBetweenRetries);}else {// this should probably be here, but some examples expect simple exists of the stream source// throw new EOFException("Reached end of stream and reconnects are not enabled.");break;}}}// collect trailing dataif (buffer.length() > 0) {ctx.collect(buffer.toString());}}

    ?

    整段代碼里,只有collect方法有些復雜度,后面我們在講到flink的對象機制時會結合來講,此處知道collect方法會收集結果,然后發(fā)送給接收者即可。在我們的wordcount里,這個算子的接收者就是被chain在一起的flatmap算子,不記得這個示例程序的話,可以返回第一章去看一下。

    4.2 從數(shù)據(jù)輸入到數(shù)據(jù)處理——OneInputStreamOperator & AbstractUdfStreamOperator

    StreamSource是用來開啟整個流的算子,而承接輸入數(shù)據(jù)并進行處理的算子就是OneInputStreamOperator、TwoInputStreamOperator等。?
    ?
    整個StreamOperator的繼承關系如上圖所示(圖很大,建議點開放大看)。?
    OneInputStreamOperator這個接口的邏輯很簡單:

    public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {/*** Processes one element that arrived at this operator.* This method is guaranteed to not be called concurrently with other methods of the operator.*/void processElement(StreamRecord<IN> element) throws Exception;/*** Processes a {@link Watermark}.* This method is guaranteed to not be called concurrently with other methods of the operator.** @see org.apache.flink.streaming.api.watermark.Watermark*/void processWatermark(Watermark mark) throws Exception;void processLatencyMarker(LatencyMarker latencyMarker) throws Exception; }

    ?

    而實現(xiàn)了這個接口的StreamFlatMap算子也很簡單,沒什么可說的:

    public class StreamFlatMap<IN, OUT>extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>implements OneInputStreamOperator<IN, OUT> {private static final long serialVersionUID = 1L;private transient TimestampedCollector<OUT> collector;public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {super(flatMapper);chainingStrategy = ChainingStrategy.ALWAYS;}@Overridepublic void open() throws Exception {super.open();collector = new TimestampedCollector<>(output);}@Overridepublic void processElement(StreamRecord<IN> element) throws Exception {collector.setTimestamp(element);userFunction.flatMap(element.getValue(), collector);} }

    ?

    從類圖里可以看到,flink為我們封裝了一個算子的基類AbstractUdfStreamOperator,提供了一些通用功能,比如把context賦給算子,保存快照等等,其中最為大家了解的應該是這兩個:

    @Overridepublic void open() throws Exception {super.open();FunctionUtils.openFunction(userFunction, new Configuration());}@Overridepublic void close() throws Exception {super.close();functionsClosed = true;FunctionUtils.closeFunction(userFunction);}

    ?

    這兩個就是flink提供的Rich***Function系列算子的open和close方法被執(zhí)行的地方。

    4.3 StreamSink

    StreamSink著實沒什么可說的,邏輯很簡單,值得一提的只有兩個方法:

    @Overridepublic void processElement(StreamRecord<IN> element) throws Exception {sinkContext.element = element;userFunction.invoke(element.getValue(), sinkContext);}@Overrideprotected void reportOrForwardLatencyMarker(LatencyMarker maker) {// all operators are tracking latenciesthis.latencyGauge.reportLatency(maker, true);// sinks don't forward latency markers}

    ?

    其中,processElement?是繼承自StreamOperator的方法。reportOrForwardLatencyMarker是用來計算延遲的,前面提到StreamSource會產生LateMarker,用于記錄數(shù)據(jù)計算時間,就是在這里完成了計算。

    算子這部分邏輯相對簡單清晰,就講這么多吧。

    5. 為執(zhí)行保駕護航——Fault Tolerant與保證Exactly-Once語義

    5.1 Fault Tolerant演進之路

    對于7×24小時不間斷運行的流程序來說,要保證fault tolerant是很難的,這不像是離線任務,如果失敗了只需要清空已有結果,重新跑一次就可以了。對于流任務,如果要保證能夠重新處理已處理過的數(shù)據(jù),就要把數(shù)據(jù)保存下來;而這就面臨著幾個問題:比如一是保存多久的數(shù)據(jù)?二是重復計算的數(shù)據(jù)應該怎么處理,怎么保證冪等性??
    對于一個流系統(tǒng),我們有以下希望:

  • 最好能做到exactly-once
  • 處理延遲越低越好
  • 吞吐量越高越好
  • 計算模型應當足夠簡單易用,又具有足夠的表達力
  • 從錯誤恢復的開銷越低越好
  • 足夠的流控制能力(背壓能力)
  • 5.1.1 Storm的Record acknowledgement模式

    storm的fault tolerant是這樣工作的:每一個被storm的operator處理的數(shù)據(jù)都會向其上一個operator發(fā)送一份應答消息,通知其已被下游處理。storm的源operator保存了所有已發(fā)送的消息的每一個下游算子的應答消息,當它收到來自sink的應答時,它就知道該消息已經(jīng)被完整處理,可以移除了。?
    如果沒有收到應答,storm就會重發(fā)該消息。顯而易見,這是一種at least once的邏輯。另外,這種方式面臨著嚴重的冪等性問題,例如對一個count算子,如果count的下游算子出錯,source重發(fā)該消息,那么防止該消息被count兩遍的邏輯需要程序員自己去實現(xiàn)。最后,這樣一種處理方式非常低效,吞吐量很低。

    5.1.2 Spark streaming的micro batch模式

    前面提到,storm的實現(xiàn)方式就注定了與高吞吐量無緣。那么,為了提高吞吐量,把一批數(shù)據(jù)聚集在一起處理就是很自然的選擇。Spark Streaming的實現(xiàn)就是基于這樣的思路:?
    我們可以在完全的連續(xù)計算與完全的分批計算中間取折中,通過控制每批計算數(shù)據(jù)的大小來控制延遲與吞吐量的制約,如果想要低延遲,就用小一點的batch,如果想要大吞吐量,就不得不忍受更高的延遲(更久的等待數(shù)據(jù)到來的時間和更多的計算),如下圖所示。?
    ?
    以這樣的方式,可以在每個batch中做到exactly-once,但是這種方式也有其弊端:?
    首先,batch的方式使得一些需要跨batch的操作變得非常困難,例如session window;用戶不得不自己想辦法去實現(xiàn)相關邏輯。?
    其次,batch模式很難做好背壓。當一個batch因為種種原因處理慢了,那么下一個batch要么不得不容納更多的新來數(shù)據(jù),要么不得不堆積更多的batch,整個任務可能會被拖垮,這是一個非常致命的問題。?
    最后,batch的方式基本意味著其延遲是有比較高的下限的,實時性上不好。

    5.1.3 Google Cloud Dataflow的事務式模型

    我們在傳統(tǒng)數(shù)據(jù)庫,如mysql中使用binlog來完成事務,這樣的思路也可以被用在實現(xiàn)exactly-once模型中。例如,我們可以log下每個數(shù)據(jù)元素每一次被處理時的結果和當時所處的操作符的狀態(tài)。這樣,當我們需要fault tolerant時,我們只需要讀一下log就可以了。這種模式規(guī)避了storm和spark所面臨的問題,并且能夠很好的實現(xiàn)exactly-once,唯一的弊端是:如何盡可能的減少log的成本?Flink給了我們答案。

    5.1.4 Flink的分布式快照機制

    實現(xiàn)exactly-once的關鍵是什么?是能夠準確的知道和快速記錄下來當前的operator的狀態(tài)、當前正在處理的元素(以及正處在不同算子之間傳遞的元素)。如果上面這些可以做到,那么fault tolerant無非就是從持久化存儲中讀取上次記錄的這些元信息,并且恢復到程序中。那么Flink是如何實現(xiàn)的呢?

    Flink的分布式快照的核心是其輕量級異步分布式快照機制。為了實現(xiàn)這一機制,flink引入了一個概念,叫做Barrier。Barrier是一種標記,它被source產生并且插入到流數(shù)據(jù)中,被發(fā)送到下游節(jié)點。當下游節(jié)點處理到該barrier標志時,這就意味著在該barrier插入到流數(shù)據(jù)時,已經(jīng)進入系統(tǒng)的數(shù)據(jù)在當前節(jié)點已經(jīng)被處理完畢。?

    如圖所示,每當一個barrier流過一個算子節(jié)點時,就說明了在該算子上,可以觸發(fā)一次檢查點,用以保存當前節(jié)點的狀態(tài)和已經(jīng)處理過的數(shù)據(jù),這就是一份快照。(在這里可以聯(lián)想一下micro-batch,把barrier想象成分割每個batch的邏輯,會好理解一點)這樣的方式下,記錄快照就像和前面提到的micro-batch一樣容易。

    與此同時,該算子會向下游發(fā)送該barrier。因為數(shù)據(jù)在算子之間是按順序發(fā)送的,所以當下游節(jié)點收到該barrier時,也就意味著同樣的一批數(shù)據(jù)在下游節(jié)點上也處理完畢,可以進行一次checkpoint,保存基于該節(jié)點的一份快照,快照完成后,會通知JobMananger自己完成了這個快照。這就是分布式快照的基本含義。

    再看這張圖:?
    ?
    有時,有的算子的上游節(jié)點和下游節(jié)點都不止一個,應該怎么處理呢?如果有不止一個下游節(jié)點,就向每個下游發(fā)送barrier。同理,如果有不止一個上游節(jié)點,那么就要等到所有上游節(jié)點的同一批次的barrier到達之后,才能觸發(fā)checkpoint。因為每個節(jié)點運算速度不同,所以有的上游節(jié)點可能已經(jīng)在發(fā)下個barrier周期的數(shù)據(jù)了,有的上游節(jié)點還沒發(fā)送本次的barrier,這時候,當前算子就要緩存一下提前到來的數(shù)據(jù),等比較慢的上游節(jié)點發(fā)送barrier之后,才能處理下一批數(shù)據(jù)。

    當整個程序的最后一個算子sink都收到了這個barrier,也就意味著這個barrier和上個barrier之間所夾雜的這批元素已經(jīng)全部落袋為安。這時,最后一個算子通知JobManager整個流程已經(jīng)完成,而JobManager隨后發(fā)出通知,要求所有算子刪除本次快照內容,以完成清理。這整個部分,就是Flink的兩階段提交的checkpoint過程,如下面四幅圖所示:?

    總之,通過這種方式,flink實現(xiàn)了我們前面提到的六項對流處理框架的要求:exactly-once、低延遲、高吞吐、易用的模型、方便的恢復機制。

    最后,貼一個美團做的flink與storm的性能對比:flink與storm的性能對比

    5.2 checkpoint的生命周期

    接下來,我們結合源碼來看看flink的checkpoint到底是如何實現(xiàn)其生命周期的:

    由于flink提供的SocketSource并不支持checkpoint,所以這里我以FlinkKafkaConsumer010作為sourceFunction。

    5.2.1 觸發(fā)checkpoint

    要完成一次checkpoint,第一步必然是發(fā)起checkpoint請求。那么,這個請求是哪里發(fā)出的,怎么發(fā)出的,又由誰控制呢??
    還記得如果我們要設置checkpoint的話,需要指定checkpoint間隔吧?既然是一個指定間隔觸發(fā)的功能,那應該會有類似于Scheduler的東西存在,flink里,這個負責觸發(fā)checkpoint的類是CheckpointCoordinator。

    flink在提交job時,會啟動這個類的startCheckpointScheduler方法,如下所示

    public void startCheckpointScheduler() {synchronized (lock) {if (shutdown) {throw new IllegalArgumentException("Checkpoint coordinator is shut down");}// make sure all prior timers are cancelledstopCheckpointScheduler();periodicScheduling = true;currentPeriodicTrigger = timer.scheduleAtFixedRate(new ScheduledTrigger(), baseInterval, baseInterval, TimeUnit.MILLISECONDS);}}private final class ScheduledTrigger implements Runnable {@Overridepublic void run() {try {triggerCheckpoint(System.currentTimeMillis(), true);}catch (Exception e) {LOG.error("Exception while triggering checkpoint.", e);}}}

    ?

    啟動之后,就會以設定好的頻率調用triggerCheckPoint()方法。這個方法太長,我大概說一下都做了什么:

    • 檢查符合觸發(fā)checkpoint的條件,例如如果禁止了周期性的checkpoint,尚未達到觸發(fā)checkpoint的最小間隔等等,就直接return
    • 檢查是否所有需要checkpoint和需要響應checkpoint的ACK(ack涉及到checkpoint的兩階段提交,后面會講)的task都處于running狀態(tài),否則return
    • 如果都符合,那么執(zhí)行checkpointID = checkpointIdCounter.getAndIncrement();以生成一個新的id,然后生成一個PendingCheckpoint。PendingCheckpoint是一個啟動了的checkpoint,但是還沒有被確認。等到所有的task都確認了本次checkpoint,那么這個checkpoint對象將轉化為一個CompletedCheckpoint。
    • 定義一個超時callback,如果checkpoint執(zhí)行了很久還沒完成,就把它取消
    • 觸發(fā)MasterHooks,用戶可以定義一些額外的操作,用以增強checkpoint的功能(如準備和清理外部資源)
    • 接下來是核心邏輯:
    // send the messages to the tasks that trigger their checkpointfor (Execution execution: executions) {execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);}

    ?

    這里是調用了Execution的triggerCheckpoint方法,一個execution就是一個executionVertex的實際執(zhí)行者。我們看一下這個方法:

    public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {final LogicalSlot slot = assignedResource;if (slot != null) {//TaskManagerGateway是用來跟taskManager進行通信的組件final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);} else {LOG.debug("The execution has no slot assigned. This indicates that the execution is " +"no longer running.");}}

    ?

    再往下跟就進入了Task類的范疇,我們將在下一小節(jié)進行解讀。本小節(jié)主要講了CheckpointCoordinator類是如何觸發(fā)一次checkpoint,從其名字也可以看出來其功能:檢查點協(xié)調器。

    5.2.2 Task層面checkpoint的準備工作

    先說Task類中的部分,該類創(chuàng)建了一個CheckpointMetaData的對象,并且生成了一個Runable匿名類用于執(zhí)行checkpoint,然后以異步的方式觸發(fā)了該Runable:

    public void triggerCheckpointBarrier(final long checkpointID,long checkpointTimestamp,final CheckpointOptions checkpointOptions) {......Runnable runnable = new Runnable() {@Overridepublic void run() {// set safety net from the task's context for checkpointing threadLOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);try {boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);if (!success) {checkpointResponder.declineCheckpoint(getJobID(), getExecutionId(), checkpointID,new CheckpointDeclineTaskNotReadyException(taskName));}}......}};executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));}}

    ?

    上面代碼里的invokable事實上就是我們的StreamTask了。Task類實際上是將checkpoint委托給了更具體的類去執(zhí)行,而StreamTask也將委托給更具體的類,直到業(yè)務代碼。?
    StreamTask是這樣實現(xiàn)的:

    • 如果task還在運行,那就可以進行checkpoint。方法是先向下游所有出口廣播一個Barrier,然后觸發(fā)本task的State保存。
    • 如果task結束了,那我們就要通知下游取消本次checkpoint,方法是發(fā)送一個CancelCheckpointMarker,這是類似于Barrier的另一種消息。
    • 注意,從這里開始,整個執(zhí)行鏈路上開始出現(xiàn)Barrier,可以和前面講Fault Tolerant原理的地方結合看一下。
    private boolean performCheckpoint(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,CheckpointMetrics checkpointMetrics) throws Exception {synchronized (lock) {if (isRunning) {operatorChain.broadcastCheckpointBarrier(checkpointMetaData.getCheckpointId(),checkpointMetaData.getTimestamp(),checkpointOptions);checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);return true;}else {......}}}

    ?

    完成broadcastCheckpointBarrier方法后,在checkpointState()方法中,StreamTask還做了很多別的工作:

    public void executeCheckpointing() throws Exception {......try {//這里,就是調用StreamOperator進行snapshotState的入口方法for (StreamOperator<?> op : allOperators) {checkpointStreamOperator(op);}// we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submitAsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(owner,operatorSnapshotsInProgress,checkpointMetaData,checkpointMetrics,startAsyncPartNano);owner.cancelables.registerCloseable(asyncCheckpointRunnable);//這里注冊了一個Runnable,在執(zhí)行完checkpoint之后向JobManager發(fā)出CompletedCheckPoint消息,這也是fault tolerant兩階段提交的一部分owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);......} }

    ?

    說到checkpoint,我們印象里最直觀的感受肯定是我們的一些做聚合的操作符的狀態(tài)保存,比如sum的和以及count的值等等。這些內容就是StreamOperator部分將要觸發(fā)保存的內容。可以看到,除了我們直觀的這些操作符的狀態(tài)保存外,flink的checkpoint做了大量的其他工作。

    接下來,我們就把目光轉向操作符的checkpoint機制。

    5.2.3 操作符的狀態(tài)保存及barrier傳遞

    第四章時,我們已經(jīng)了解了StreamOperator的類關系,這里,我們就直接接著上一節(jié)的checkpointStreamOperator(op)方法往下講。?
    順便,前面也提到了,在進行checkpoint之前,operator初始化時,會執(zhí)行一個initializeState方法,在該方法中,如果task是從失敗中恢復的話,其保存的state也會被restore進來。

    傳遞barrier是在進行本operator的statesnapshot之前完成的,我們先來看看其邏輯,其實和傳遞一條數(shù)據(jù)是類似的,就是生成一個CheckpointBarrier對象,然后向每個streamOutput寫進去:

    public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {try {CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);for (RecordWriterOutput<?> streamOutput : streamOutputs) {streamOutput.broadcastEvent(barrier);}}catch (InterruptedException e) {throw new IOException("Interrupted while broadcasting checkpoint barrier");}}

    ?

    下游的operator接收到本barrier,就會觸發(fā)其自身的checkpoint。

    StreamTask在執(zhí)行完broadcastCheckpointBarrier之后,?
    我們當前的wordcount程序里有兩個operator chain,分別是:

    • kafka source -> flatmap
    • keyed aggregation -> sink

    我們就按這個順序來捋一下checkpoint的過程。

    1.kafka source的checkpoint過程

    public final void snapshotState(FunctionSnapshotContext context) throws Exception {if (!running) {LOG.debug("snapshotState() called on closed source");} else {unionOffsetStates.clear();final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;if (fetcher == null) {// the fetcher has not yet been initialized, which means we need to return the// originally restored offsets or the assigned partitionsfor (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call can happen// on this function at a time: either snapshotState() or notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);}} else {HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call can happen// on this function at a time: either snapshotState() or notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);}for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {unionOffsetStates.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));}}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// truncate the map of pending offsets to commit, to prevent infinite growthwhile (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {pendingOffsetsToCommit.remove(0);}}}}

    ?

    kafka的snapshot邏輯就是記錄一下當前消費的offsets,然后做成tuple(partitiion,offset)放進一個StateBackend里。StateBackend是flink抽象出來的一個用于保存狀態(tài)的接口。

    2.FlatMap算子的checkpoint過程?
    沒什么可說的,就是調用了snapshotState()方法而已。

    3.本operator chain的state保存過程?
    細心的同學應該注意到了,各個算子的snapshot方法只把自己的狀態(tài)保存到了StateBackend里,沒有寫入的持久化操作。這部分操作被放到了AbstractStreamOperator中,由flink統(tǒng)一負責持久化。其實不需要看源碼我們也能想出來,持久化無非就是把這些數(shù)據(jù)用一個流寫到磁盤或者別的地方,接下來我們來看看是不是這樣:

    //還是AbstractStreamOperator.java的snapshotState方法if (null != operatorStateBackend) {snapshotInProgress.setOperatorStateManagedFuture(operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));}

    ?

    那么這個operatorStateBackend是怎么保存狀態(tài)的呢?
    • 首先把各個算子的state做了一份深拷貝;
    • 然后以異步的方式執(zhí)行了一個內部類的runnable,該內部類的run方法實現(xiàn)了一個模版方法,首先打開stream,然后寫入數(shù)據(jù),然后再關閉stream。

    我們來看看這個寫入數(shù)據(jù)的方法:

    public SnapshotResult<OperatorStateHandle> performOperation() throws Exception {long asyncStartTime = System.currentTimeMillis();CheckpointStreamFactory.CheckpointStateOutputStream localOut = this.out;// get the registered operator state infos ...List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> operatorMetaInfoSnapshots =new ArrayList<>(registeredOperatorStatesDeepCopies.size());for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStatesDeepCopies.entrySet()) {operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());}// ... write them all in the checkpoint stream ...DataOutputView dov = new DataOutputViewStreamWrapper(localOut);OperatorBackendSerializationProxy backendSerializationProxy =new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);backendSerializationProxy.write(dov);......}

    ?

    注釋寫的很清楚,我就不多說了。

    4.后繼operatorChain的checkpoint過程?
    前面說到,在flink的流中,barrier流過時會觸發(fā)checkpoint。在上面第1步中,上游節(jié)點已經(jīng)發(fā)出了Barrier,所以在我們的keyed aggregation -> sink 這個operatorchain中,我們將首先捕獲這個barrier。

    捕獲barrier的過程其實就是處理input數(shù)據(jù)的過程,對應著StreamInputProcessor.processInput()方法,該方法我們在第四章已經(jīng)講過,這里我們簡單回顧一下:

    //每個元素都會觸發(fā)這一段邏輯,如果下一個數(shù)據(jù)是buffer,則從外圍的while循環(huán)里進入處理用戶數(shù)據(jù)的邏輯;這個方法里默默的處理了barrier的邏輯final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();if (bufferOrEvent != null) {if (bufferOrEvent.isBuffer()) {currentChannel = bufferOrEvent.getChannelIndex();currentRecordDeserializer = recordDeserializers[currentChannel];currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());}else {// Event receivedfinal AbstractEvent event = bufferOrEvent.getEvent();if (event.getClass() != EndOfPartitionEvent.class) {throw new IOException("Unexpected event: " + event);}}}

    ?

    處理barrier的過程在這段代碼里沒有體現(xiàn),因為被包含在了getNextNonBlocked()方法中,我們看下這個方法的核心邏輯:

    //BarrierBuffer.getNextNonBlocked方法else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {if (!endOfStream) {// process barriers only if there is a chance of the checkpoint completingprocessBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());}}else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());}

    ?

    先提一嘴,大家還記得之前的部分也提到過CheckpointMarker吧,這里正好也對上了。

    處理barrier也是個麻煩事,大家回想一下5.1節(jié)提到的屏障的原理圖,一個opertor必須收到從每個inputchannel發(fā)過來的同一序號的barrier之后才能發(fā)起本節(jié)點的checkpoint,如果有的channel的數(shù)據(jù)處理的快了,那該barrier后的數(shù)據(jù)還需要緩存起來,如果有的inputchannel被關閉了,那它就不會再發(fā)送barrier過來了:

    private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {final long barrierId = receivedBarrier.getId();// fast path for single channel casesif (totalNumberOfInputChannels == 1) {if (barrierId > currentCheckpointId) {// new checkpointcurrentCheckpointId = barrierId;notifyCheckpoint(receivedBarrier);}return;}// -- general code path for multiple input channels --if (numBarriersReceived > 0) {// this is only true if some alignment is already progress and was not canceledif (barrierId == currentCheckpointId) {// regular caseonBarrier(channelIndex);}else if (barrierId > currentCheckpointId) {// we did not complete the current checkpoint, another started beforeLOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +"Skipping current checkpoint.", barrierId, currentCheckpointId);// let the task know we are not completing thisnotifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));// abort the current checkpointreleaseBlocksAndResetBarriers();// begin a the new checkpointbeginNewAlignment(barrierId, channelIndex);}else {// ignore trailing barrier from an earlier checkpoint (obsolete now)return;}}else if (barrierId > currentCheckpointId) {// first barrier of a new checkpointbeginNewAlignment(barrierId, channelIndex);}else {// either the current checkpoint was canceled (numBarriers == 0) or// this barrier is from an old subsumed checkpointreturn;}// check if we have all barriers - since canceled checkpoints always have zero barriers// this can only happen on a non canceled checkpointif (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {// actually trigger checkpointif (LOG.isDebugEnabled()) {LOG.debug("Received all barriers, triggering checkpoint {} at {}",receivedBarrier.getId(), receivedBarrier.getTimestamp());}releaseBlocksAndResetBarriers();notifyCheckpoint(receivedBarrier);}}

    ?

    總之,當收到全部的barrier之后,就會觸發(fā)notifyCheckpoint(),該方法又會調用StreamTask的triggerCheckpoint,和之前的operator是一樣的。

    如果還有后續(xù)的operator的話,就是完全相同的循環(huán),不再贅述。

    5.報告完成checkpoint事件?
    當一個operator保存完checkpoint數(shù)據(jù)后,就會啟動一個異步對象AsyncCheckpointRunnable,用以報告該檢查點已完成,其具體邏輯在reportCompletedSnapshotStates中。這個方法把任務又最終委托給了RpcCheckpointResponder這個類:

    checkpointResponder.acknowledgeCheckpoint(jobId,executionAttemptID,checkpointId,checkpointMetrics,acknowledgedState);

    ?

    從這個類也可以看出來,它的邏輯是通過rpc的方式遠程調JobManager的相關方法完成報告事件,底層也是通過akka實現(xiàn)的。?
    那么,誰響應了這個rpc調用呢?是該任務的JobMaster。

    //JobMaster.javapublic void acknowledgeCheckpoint(final JobID jobID,final ExecutionAttemptID executionAttemptID,final long checkpointId,final CheckpointMetrics checkpointMetrics,final TaskStateSnapshot checkpointState) {final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(jobID,executionAttemptID,checkpointId,checkpointMetrics,checkpointState);if (checkpointCoordinator != null) {getRpcService().execute(() -> {try {checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);} catch (Throwable t) {log.warn("Error while processing checkpoint acknowledgement message");}});} else {log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator",jobGraph.getJobID());}}

    ?

    JobMaster反手就是一巴掌就把任務又rpc給了CheckpointCoordinator.receiveAcknowledgeMessage()方法。

    之前提到,coordinator在觸發(fā)checkpoint時,生成了一個PendingCheckpoint,保存了所有operator的id。

    當PendingCheckpoint收到一個operator的完成checkpoint的消息時,它就把這個operator從未完成checkpoint的節(jié)點集合移動到已完成的集合。當所有的operator都報告完成了checkpoint時,CheckpointCoordinator會觸發(fā)completePendingCheckpoint()方法,該方法做了以下事情:

    • 把pendinCgCheckpoint轉換為CompletedCheckpoint
    • 把CompletedCheckpoint加入已完成的檢查點集合,并從未完成檢查點集合刪除該檢查點
    • 再度向各個operator發(fā)出rpc,通知該檢查點已完成

    本文里,收到這個遠程調用的就是那兩個operator chain,我們來看看其邏輯:

    public void notifyCheckpointComplete(long checkpointId) throws Exception {synchronized (lock) {if (isRunning) {LOG.debug("Notification of complete checkpoint for task {}", getName());for (StreamOperator<?> operator : operatorChain.getAllOperators()) {if (operator != null) {operator.notifyCheckpointComplete(checkpointId);}}}else {LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());}}}

    ?

    再接下來無非就是層層通知對應的算子做出響應罷了。

    至此,flink的兩階段提交的checkpoint邏輯全部完成。

    5.3 承載checkpoint數(shù)據(jù)的抽象:State & StateBackend

    State是快照數(shù)據(jù)的載體,StateBackend是快照如何被保存的抽象。

    State分為 KeyedState和OperatorState,從名字就可以看出來分別對應著keyedStream和其他的oeprator。從State由誰管理上,也可以區(qū)分為raw state和Managed state。Flink管理的就是Managed state,用戶自己管理的就是raw state。Managed State又分為ValueState、ListState、ReducingState、AggregatingState、FoldingState、MapState這么幾種,看名字知用途。

    StateBackend目前提供了三個backend,MemoryStateBackend,FsStateBackend,RocksDBStateBackend,都是看名字知用途系列。

    State接口、StateBackend接口及其實現(xiàn)都比較簡單,代碼就不貼了, 尤其State本質上就是一層容器封裝。

    貼個別人寫的狀態(tài)管理的文章吧:詳解Flink中的狀態(tài)管理

    6.數(shù)據(jù)流轉——Flink的數(shù)據(jù)抽象及數(shù)據(jù)交換過程

    本章打算講一下flink底層是如何定義和在操作符之間傳遞數(shù)據(jù)的。

    6.1 flink的數(shù)據(jù)抽象

    6.1.1 MemorySegment

    Flink作為一個高效的流框架,為了避免JVM的固有缺陷(java對象存儲密度低,FGC影響吞吐和響應等),必然走上自主管理內存的道路。

    這個MemorySegment就是Flink的內存抽象。默認情況下,一個MemorySegment可以被看做是一個32kb大的內存塊的抽象。這塊內存既可以是JVM里的一個byte[],也可以是堆外內存(DirectByteBuffer)。

    如果說byte[]數(shù)組和direct memory是最底層的存儲,那么memorysegment就是在其上覆蓋的一層統(tǒng)一抽象。它定義了一系列抽象方法,用于控制和底層內存的交互,如:

    public abstract class MemorySegment {public abstract byte get(int index);public abstract void put(int index, byte b);public int size() ;public abstract ByteBuffer wrap(int offset, int length);...... }

    ?

    我們可以看到,它在提供了諸多直接操作內存的方法外,還提供了一個wrap()方法,將自己包裝成一個ByteBuffer,我們待會兒講這個ByteBuffer。

    Flink為MemorySegment提供了兩個實現(xiàn)類:HeapMemorySegment和HybridMemorySegment。他們的區(qū)別在于前者只能分配堆內存,而后者能用來分配堆內和堆外內存。事實上,Flink框架里,只使用了后者。這是為什么呢?

    如果HybridMemorySegment只能用于分配堆外內存的話,似乎更合常理。但是在JVM的世界中,如果一個方法是一個虛方法,那么每次調用時,JVM都要花時間去確定調用的到底是哪個子類實現(xiàn)的該虛方法(方法重寫機制,不明白的去看JVM的invokeVirtual指令),也就意味著每次都要去翻方法表;而如果該方法雖然是個虛方法,但實際上整個JVM里只有一個實現(xiàn)(就是說只加載了一個子類進來),那么JVM會很聰明的把它去虛化處理,這樣就不用每次調用方法時去找方法表了,能夠大大提升性能。但是只分配堆內或者堆外內存不能滿足我們的需要,所以就出現(xiàn)了HybridMemorySegment同時可以分配兩種內存的設計。

    我們可以看看HybridMemorySegment的構造代碼:

    HybridMemorySegment(ByteBuffer buffer, Object owner) {super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);this.offHeapBuffer = buffer;}HybridMemorySegment(byte[] buffer, Object owner) {super(buffer, owner);this.offHeapBuffer = null;}

    ?

    其中,第一個構造函數(shù)的checkBufferAndGetAddress()方法能夠得到direct buffer的內存地址,因此可以操作堆外內存。

    6.1.2 ByteBuffer與NetworkBufferPool

    在MemorySegment這個抽象之上,Flink在數(shù)據(jù)從operator內的數(shù)據(jù)對象在向TaskManager上轉移,預備被發(fā)給下個節(jié)點的過程中,使用的抽象或者說內存對象是Buffer。

    注意,這個Buffer是個flink接口,不是java.nio提供的那個Buffer抽象類。Flink在這一層面同時使用了這兩個同名概念,用來存儲對象,直接看代碼時到處都是各種xxxBuffer很容易混淆:

    • java提供的那個Buffer抽象類在這一層主要用于構建HeapByteBuffer,這個主要是當數(shù)據(jù)從jvm里的一個對象被序列化成字節(jié)數(shù)組時用的;
    • Flink的這個Buffer接口主要是一種flink層面用于傳輸數(shù)據(jù)和事件的統(tǒng)一抽象,其實現(xiàn)類是NetworkBuffer,是對MemorySegment的包裝。Flink在各個TaskManager之間傳遞數(shù)據(jù)時,使用的是這一層的抽象。

    因為Buffer的底層是MemorySegment,這可能不是JVM所管理的,所以為了知道什么時候一個Buffer用完了可以回收,Flink引入了引用計數(shù)的概念,當確認這個buffer沒有人引用,就可以回收這一片MemorySegment用于別的地方了(JVM的垃圾回收為啥不用引用計數(shù)?讀者思考一下):

    public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {private volatile int refCnt = 1;...... }

    ?

    為了方便管理NetworkBuffer,Flink提供了BufferPoolFactory,并且提供了唯一實現(xiàn)NetworkBufferPool,這是個工廠模式的應用。

    NetworkBufferPool在每個TaskManager上只有一個,負責所有子task的內存管理。其實例化時就會嘗試獲取所有可由它管理的內存(對于堆內存來說,直接獲取所有內存并放入老年代,并令用戶對象只在新生代存活,可以極大程度的減少Full GC),我們看看其構造方法:

    public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {......try {this.availableMemorySegments = new ArrayBlockingQueue<>(numberOfSegmentsToAllocate);}catch (OutOfMemoryError err) {throw new OutOfMemoryError("Could not allocate buffer queue of length "+ numberOfSegmentsToAllocate + " - " + err.getMessage());}try {for (int i = 0; i < numberOfSegmentsToAllocate; i++) {ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null));}}......long allocatedMb = (sizeInLong * availableMemorySegments.size()) >> 20;LOG.info("Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).",allocatedMb, availableMemorySegments.size(), segmentSize);}

    ?

    由于NetworkBufferPool只是個工廠,實際的內存池是LocalBufferPool。每個TaskManager都只有一個NetworkBufferPool工廠,但是上面運行的每個task都要有一個和其他task隔離的LocalBufferPool池,這從邏輯上很好理解。另外,NetworkBufferPool會計算自己所擁有的所有內存分片數(shù),在分配新的內存池時對每個內存池應該占有的內存分片數(shù)重分配,步驟是:

    • 首先,從整個工廠管理的內存片中拿出所有的內存池所需要的最少Buffer數(shù)目總和
    • 如果正好分配完,就結束
    • 其次,把所有的剩下的沒分配的內存片,按照每個LocalBufferPool內存池的剩余想要容量大小進行按比例分配
    • 剩余想要容量大小是這么個東西:如果該內存池至少需要3個buffer,最大需要10個buffer,那么它的剩余想要容量就是7

    實現(xiàn)代碼如下:

    private void redistributeBuffers() throws IOException {assert Thread.holdsLock(factoryLock);// All buffers, which are not among the required onesfinal int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;if (numAvailableMemorySegment == 0) {// in this case, we need to redistribute buffers so that every pool gets its minimumfor (LocalBufferPool bufferPool : allBufferPools) {bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());}return;}long totalCapacity = 0; // long to avoid int overflowfor (LocalBufferPool bufferPool : allBufferPools) {int excessMax = bufferPool.getMaxNumberOfMemorySegments() -bufferPool.getNumberOfRequiredMemorySegments();totalCapacity += Math.min(numAvailableMemorySegment, excessMax);}// no capacity to receive additional buffers?if (totalCapacity == 0) {return; // necessary to avoid div by zero when nothing to re-distribute}final int memorySegmentsToDistribute = MathUtils.checkedDownCast(Math.min(numAvailableMemorySegment, totalCapacity));long totalPartsUsed = 0; // of totalCapacityint numDistributedMemorySegment = 0;for (LocalBufferPool bufferPool : allBufferPools) {int excessMax = bufferPool.getMaxNumberOfMemorySegments() -bufferPool.getNumberOfRequiredMemorySegments();// shortcutif (excessMax == 0) {continue;}totalPartsUsed += Math.min(numAvailableMemorySegment, excessMax);final int mySize = MathUtils.checkedDownCast(memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment);numDistributedMemorySegment += mySize;bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + mySize);}assert (totalPartsUsed == totalCapacity);assert (numDistributedMemorySegment == memorySegmentsToDistribute);}

    ?

    接下來說說這個LocalBufferPool內存池。?
    LocalBufferPool的邏輯想想無非是增刪改查,值得說的是其fields:

    /** 該內存池需要的最少內存片數(shù)目*/private final int numberOfRequiredMemorySegments;/*** 當前已經(jīng)獲得的內存片中,還沒有寫入數(shù)據(jù)的空白內存片*/private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();/*** 注冊的所有監(jiān)控buffer可用性的監(jiān)聽器*/private final ArrayDeque<BufferListener> registeredListeners = new ArrayDeque<>();/** 能給內存池分配的最大分片數(shù)*/private final int maxNumberOfMemorySegments;/** 當前內存池大小 */private int currentPoolSize;/*** 所有經(jīng)由NetworkBufferPool分配的,被本內存池引用到的(非直接獲得的)分片數(shù)*/private int numberOfRequestedMemorySegments;

    ?

    承接NetworkBufferPool的重分配方法,我們來看看LocalBufferPool的setNumBuffers()方法,代碼很短,邏輯也相當簡單,就不展開說了:

    public void setNumBuffers(int numBuffers) throws IOException {synchronized (availableMemorySegments) {checkArgument(numBuffers >= numberOfRequiredMemorySegments,"Buffer pool needs at least %s buffers, but tried to set to %s",numberOfRequiredMemorySegments, numBuffers);if (numBuffers > maxNumberOfMemorySegments) {currentPoolSize = maxNumberOfMemorySegments;} else {currentPoolSize = numBuffers;}returnExcessMemorySegments();// If there is a registered owner and we have still requested more buffers than our// size, trigger a recycle via the owner.if (owner != null && numberOfRequestedMemorySegments > currentPoolSize) {owner.releaseMemory(numberOfRequestedMemorySegments - numBuffers);}}}

    ?

    6.1.3 RecordWriter與Record

    我們接著往高層抽象走,剛剛提到了最底層內存抽象是MemorySegment,用于數(shù)據(jù)傳輸?shù)氖荁uffer,那么,承上啟下對接從Java對象轉為Buffer的中間對象是什么呢?是StreamRecord。

    從StreamRecord<T>這個類名字就可以看出來,這個類就是個wrap,里面保存了原始的Java對象。另外,StreamRecord還保存了一個timestamp。

    那么這個對象是怎么變成LocalBufferPool內存池里的一個大號字節(jié)數(shù)組的呢?借助了StreamWriter這個類。

    我們直接來看把數(shù)據(jù)序列化交出去的方法:

    private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {RecordSerializer<T> serializer = serializers[targetChannel];SerializationResult result = serializer.addRecord(record);while (result.isFullBuffer()) {if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {// If this was a full record, we are done. Not breaking// out of the loop at this point will lead to another// buffer request before breaking out (that would not be// a problem per se, but it can lead to stalls in the// pipeline).if (result.isFullRecord()) {break;}}BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);}checkState(!serializer.hasSerializedData(), "All data should be written at once");if (flushAlways) {targetPartition.flush(targetChannel);}}

    ?

    先說最后一行,如果配置為flushAlways,那么會立刻把元素發(fā)送出去,但是這樣吞吐量會下降;Flink的默認設置其實也不是一個元素一個元素的發(fā)送,是單獨起了一個線程,每隔固定時間flush一次所有channel,較真起來也算是mini batch了。

    再說序列化那一句:SerializationResult result = serializer.addRecord(record);。在這行代碼中,Flink把對象調用該對象所屬的序列化器序列化為字節(jié)數(shù)組。

    6.2 數(shù)據(jù)流轉過程

    上一節(jié)講了各層數(shù)據(jù)的抽象,這一節(jié)講講數(shù)據(jù)在各個task之間exchange的過程。

    6.2.1 整體過程

    看這張圖:?

  • 第一步必然是準備一個ResultPartition;
  • 通知JobMaster;
  • JobMaster通知下游節(jié)點;如果下游節(jié)點尚未部署,則部署之;
  • 下游節(jié)點向上游請求數(shù)據(jù)
  • 開始傳輸數(shù)據(jù)
  • 6.2.2 數(shù)據(jù)跨task傳遞

    本節(jié)講一下算子之間具體的數(shù)據(jù)傳輸過程。也先上一張圖:?
    ?
    數(shù)據(jù)在task之間傳遞有如下幾步:

  • 數(shù)據(jù)在本operator處理完后,交給RecordWriter。每條記錄都要選擇一個下游節(jié)點,所以要經(jīng)過ChannelSelector。
  • 每個channel都有一個serializer(我認為這應該是為了避免多線程寫的麻煩),把這條Record序列化為ByteBuffer
  • 接下來數(shù)據(jù)被寫入ResultPartition下的各個subPartition里,此時該數(shù)據(jù)已經(jīng)存入DirectBuffer(MemorySegment)
  • 單獨的線程控制數(shù)據(jù)的flush速度,一旦觸發(fā)flush,則通過Netty的nio通道向對端寫入
  • 對端的netty client接收到數(shù)據(jù),decode出來,把數(shù)據(jù)拷貝到buffer里,然后通知InputChannel
  • 有可用的數(shù)據(jù)時,下游算子從阻塞醒來,從InputChannel取出buffer,再解序列化成record,交給算子執(zhí)行用戶代碼
  • 數(shù)據(jù)在不同機器的算子之間傳遞的步驟就是以上這些。

    了解了步驟之后,再來看一下部分關鍵代碼:?
    首先是把數(shù)據(jù)交給recordwriter。

    //RecordWriterOutput.java@Overridepublic void collect(StreamRecord<OUT> record) {if (this.outputTag != null) {// we are only responsible for emitting to the main inputreturn;}//這里可以看到把記錄交給了recordwriterpushToRecordWriter(record);}

    ?

    然后recordwriter把數(shù)據(jù)發(fā)送到對應的通道。

    //RecordWriter.javapublic void emit(T record) throws IOException, InterruptedException {//channelselector登場了for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {sendToTarget(record, targetChannel);}}private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {//選擇序列化器并序列化數(shù)據(jù)RecordSerializer<T> serializer = serializers[targetChannel];SerializationResult result = serializer.addRecord(record);while (result.isFullBuffer()) {if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {// If this was a full record, we are done. Not breaking// out of the loop at this point will lead to another// buffer request before breaking out (that would not be// a problem per se, but it can lead to stalls in the// pipeline).if (result.isFullRecord()) {break;}}BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);//寫入channelresult = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);}checkState(!serializer.hasSerializedData(), "All data should be written at once");if (flushAlways) {targetPartition.flush(targetChannel);}}

    ?

    接下來是把數(shù)據(jù)推給底層設施(netty)的過程:

    //ResultPartition.java@Overridepublic void flushAll() {for (ResultSubpartition subpartition : subpartitions) {subpartition.flush();}}//PartitionRequestQueue.javavoid notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {//這里交給了netty server線程去推ctx.executor().execute(new Runnable() {@Overridepublic void run() {ctx.pipeline().fireUserEventTriggered(reader);}});}

    ?

    netty相關的部分:

    //AbstractChannelHandlerContext.javapublic ChannelHandlerContext fireUserEventTriggered(final Object event) {if (event == null) {throw new NullPointerException("event");} else {final AbstractChannelHandlerContext next = this.findContextInbound();EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeUserEventTriggered(event);} else {executor.execute(new OneTimeTask() {public void run() {next.invokeUserEventTriggered(event);}});}return this;}}

    ?

    最后真實的寫入:

    //PartittionRequesetQueue.javaprivate void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {return;}// Queue an available reader for consumption. If the queue is empty,// we try trigger the actual write. Otherwise this will be handled by// the writeAndFlushNextMessageIfPossible calls.boolean triggerWrite = availableReaders.isEmpty();registerAvailableReader(reader);if (triggerWrite) {writeAndFlushNextMessageIfPossible(ctx.channel());}}private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {......next = reader.getNextBuffer();if (next == null) {if (!reader.isReleased()) {continue;}markAsReleased(reader.getReceiverId());Throwable cause = reader.getFailureCause();if (cause != null) {ErrorResponse msg = new ErrorResponse(new ProducerFailedException(cause),reader.getReceiverId());ctx.writeAndFlush(msg);}} else {// This channel was now removed from the available reader queue.// We re-add it into the queue if it is still availableif (next.moreAvailable()) {registerAvailableReader(reader);}BufferResponse msg = new BufferResponse(next.buffer(),reader.getSequenceNumber(),reader.getReceiverId(),next.buffersInBacklog());if (isEndOfPartitionEvent(next.buffer())) {reader.notifySubpartitionConsumed();reader.releaseAllResources();markAsReleased(reader.getReceiverId());}// Write and flush and wait until this is done before// trying to continue with the next buffer.channel.writeAndFlush(msg).addListener(writeListener);return;}......}

    ?

    上面這段代碼里第二個方法中調用的writeAndFlush(msg)就是真正往netty的nio通道里寫入的地方了。在這里,寫入的是一個RemoteInputChannel,對應的就是下游節(jié)點的InputGate的channels。

    有寫就有讀,nio通道的另一端需要讀入buffer,代碼如下:

    //CreditBasedPartitionRequestClientHandler.javaprivate void decodeMsg(Object msg) throws Throwable {final Class<?> msgClazz = msg.getClass();// ---- Buffer --------------------------------------------------------if (msgClazz == NettyMessage.BufferResponse.class) {NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);if (inputChannel == null) {bufferOrEvent.releaseBuffer();cancelRequestFor(bufferOrEvent.receiverId);return;}decodeBufferOrEvent(inputChannel, bufferOrEvent);} ......}

    ?

    插一句,Flink其實做阻塞和獲取數(shù)據(jù)的方式非常自然,利用了生產者和消費者模型,當獲取不到數(shù)據(jù)時,消費者自然阻塞;當數(shù)據(jù)被加入隊列,消費者被notify。Flink的背壓機制也是借此實現(xiàn)。

    然后在這里又反序列化成StreamRecord:

    //StreamElementSerializer.javapublic StreamElement deserialize(DataInputView source) throws IOException {int tag = source.readByte();if (tag == TAG_REC_WITH_TIMESTAMP) {long timestamp = source.readLong();return new StreamRecord<T>(typeSerializer.deserialize(source), timestamp);}else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {return new StreamRecord<T>(typeSerializer.deserialize(source));}else if (tag == TAG_WATERMARK) {return new Watermark(source.readLong());}else if (tag == TAG_STREAM_STATUS) {return new StreamStatus(source.readInt());}else if (tag == TAG_LATENCY_MARKER) {return new LatencyMarker(source.readLong(), new OperatorID(source.readLong(), source.readLong()), source.readInt());}else {throw new IOException("Corrupt stream, found tag: " + tag);}}

    ?

    然后再次在StreamInputProcessor.processInput()循環(huán)中得到處理。

    至此,數(shù)據(jù)在跨jvm的節(jié)點之間的流轉過程就講完了。

    6.3 Credit漫談

    在看上一部分的代碼時,有一個小細節(jié)不知道讀者有沒有注意到,我們的數(shù)據(jù)發(fā)送端的代碼叫做PartittionRequesetQueue.java,而我們的接收端卻起了一個完全不相干的名字:CreditBasedPartitionRequestClientHandler.java。為什么前面加了CreditBased的前綴呢?

    6.3.1 背壓問題

    在流模型中,我們期待數(shù)據(jù)是像水流一樣平滑的流過我們的引擎,但現(xiàn)實生活不會這么美好。數(shù)據(jù)的上游可能因為各種原因數(shù)據(jù)量暴增,遠遠超出了下游的瞬時處理能力(回憶一下98年大洪水),導致系統(tǒng)崩潰。?
    那么框架應該怎么應對呢?和人類處理自然災害的方式類似,我們修建了三峽大壩,當洪水來臨時把大量的水囤積在大壩里;對于Flink來說,就是在數(shù)據(jù)的接收端和發(fā)送端放置了緩存池,用以緩沖數(shù)據(jù),并且設置閘門阻止數(shù)據(jù)向下流。

    那么Flink又是如何處理背壓的呢?答案也是靠這些緩沖池。?
    ?
    這張圖說明了Flink在生產和消費數(shù)據(jù)時的大致情況。ResultPartition和InputGate在輸出和輸入數(shù)據(jù)時,都要向NetworkBufferPool申請一塊MemorySegment作為緩存池。?
    接下來的情況和生產者消費者很類似。當數(shù)據(jù)發(fā)送太多,下游處理不過來了,那么首先InputChannel會被填滿,然后是InputChannel能申請到的內存達到最大,于是下游停止讀取數(shù)據(jù),上游負責發(fā)送數(shù)據(jù)的nettyServer會得到響應,停止從ResultSubPartition讀取緩存,那么ResultPartition很快也將存滿數(shù)據(jù)不能被消費,從而生產數(shù)據(jù)的邏輯被阻塞在獲取新buffer上,非常自然地形成背壓的效果。

    Flink自己做了個試驗用以說明這個機制的效果:?
    ?
    我們首先設置生產者的發(fā)送速度為60%,然后下游的算子以同樣的速度處理數(shù)據(jù)。然后我們將下游算子的處理速度降低到30%,可以看到上游的生產者的數(shù)據(jù)產生曲線幾乎與消費者同步下滑。而后當我們解除限速,整個流的速度立刻提高到了100%。

    6.3.2 使用Credit實現(xiàn)ATM網(wǎng)絡流控

    上文已經(jīng)提到,對于流量控制,一個樸素的思路就是在長江上建三峽鏈路上建立一個攔截的dam,如下圖所示:?
    ?
    基于Credit的流控就是這樣一種建立在信用(消費數(shù)據(jù)的能力)上的,面向每個虛鏈路(而非端到端的)流模型,如下圖所示:?
    ?
    首先,下游會向上游發(fā)送一條credit message,用以通知其目前的信用(可聯(lián)想信用卡的可用額度),然后上游會根據(jù)這個信用消息來決定向下游發(fā)送多少數(shù)據(jù)。當上游把數(shù)據(jù)發(fā)送給下游時,它就從下游的信用卡上劃走相應的額度(credit balance):?
    ?
    下游總共獲得的credit數(shù)目是Buf_Alloc,已經(jīng)消費的數(shù)據(jù)是Fwd_Cnt,上游發(fā)送出來的數(shù)據(jù)是Tx_Cnt,那么剩下的那部分就是Crd_Bal:?
    Crd_Bal = Buf_Alloc - ( Tx_Cnt - Fwd_Cnt )?
    上面這個式子應該很好理解。

    可以看到,Credit Based Flow Control的關鍵是buffer分配。這種分配可以在數(shù)據(jù)的發(fā)送端完成,也可以在接收端完成。對于下游可能有多個上游節(jié)點的情況(比如Flink),使用接收端的credit分配更加合理:?
    ?
    上圖中,接收者可以觀察到每個上游連接的帶寬情況,而上游的節(jié)點Snd1卻不可能輕易知道發(fā)往同一個下游節(jié)點的其他Snd2的帶寬情況,從而如果在上游控制流量將會很困難,而在下游控制流量將會很方便。

    因此,這就是為何Flink在接收端有一個基于Credit的Client,而不是在發(fā)送端有一個CreditServer的原因。

    最后,再講一下Credit的面向虛鏈路的流設計和端到端的流設計的區(qū)別:?
    ?
    如上圖所示,a是面向連接的流設計,b是端到端的流設計。其中,a的設計使得當下游節(jié)點3因某些情況必須緩存數(shù)據(jù)暫緩處理時,每個上游節(jié)點(1和2)都可以利用其緩存保存數(shù)據(jù);而端到端的設計b里,只有節(jié)點3的緩存才可以用于保存數(shù)據(jù)(讀者可以從如何實現(xiàn)上想想為什么)。

    對流控制感興趣的讀者,可以看這篇文章:Traffic Management For High-Speed Networks。

    7.其他核心概念

    截至第六章,和執(zhí)行過程相關的部分就全部講完,告一段落了。第七章主要講一點雜七雜八的內容,有時間就不定期更新。

    7.1 EventTime時間模型

    flink有三種時間模型:ProcessingTime,EventTime和IngestionTime。?
    關于時間模型看這張圖:?
    ?
    從這張圖里可以很清楚的看到三種Time模型的區(qū)別。

    • EventTime是數(shù)據(jù)被生產出來的時間,可以是比如傳感器發(fā)出信號的時間等(此時數(shù)據(jù)還沒有被傳輸給flink)。
    • IngestionTime是數(shù)據(jù)進入flink的時間,也就是從Source進入flink流的時間(此時數(shù)據(jù)剛剛被傳給flink)
    • ProcessingTime是針對當前算子的系統(tǒng)時間,是指該數(shù)據(jù)已經(jīng)進入某個operator時,operator所在系統(tǒng)的當前時間

    例如,我在寫這段話的時間是2018年5月13日03點47分,但是我引用的這張EventTime的圖片,是2015年畫出來的,那么這張圖的EventTime是2015年,而ProcessingTime是現(xiàn)在。?
    Flink官網(wǎng)對于時間戳的解釋非常詳細:點我?
    Flink對于EventTime模型的實現(xiàn),依賴的是一種叫做watermark的對象。watermark是攜帶有時間戳的一個對象,會按照程序的要求被插入到數(shù)據(jù)流中,用以標志某個事件在該時間發(fā)生了。?
    我再做一點簡短的說明,還是以官網(wǎng)的圖為例:?
    ?
    對于有序到來的數(shù)據(jù),假設我們在timestamp為11的元素后加入一個watermark,時間記錄為11,則下個元素收到該watermark時,認為所有早于11的元素均已到達。這是非常理想的情況。?
    ?
    而在現(xiàn)實生活中,經(jīng)常會遇到亂序的數(shù)據(jù)。這時,我們雖然在timestamp為7的元素后就收到了11,但是我們一直等到了收到元素12之后,才插入了watermark為11的元素。與上面的圖相比,如果我們仍然在11后就插入11的watermark,那么元素9就會被丟棄,造成數(shù)據(jù)丟失。而我們在12之后插入watermark11,就保證了9仍然會被下一個operator處理。當然,我們不可能無限制的永遠等待遲到元素,所以要在哪個元素后插入11需要根據(jù)實際場景權衡。

    對于來自多個數(shù)據(jù)源的watermark,可以看這張圖:?
    ?
    可以看到,當一個operator收到多個watermark時,它遵循最小原則(或者說最早),即算子的當前watermark是流經(jīng)該算子的最小watermark,以容許來自不同的source的亂序數(shù)據(jù)到來。?
    關于事件時間模型,更多內容可以參考Stream 101?和谷歌的這篇論文:Dataflow Model paper

    7.2 FLIP-6 部署及處理模型演進

    就在老白寫這篇blog的時候,Flink發(fā)布了其1.5 RELEASE版本,號稱實現(xiàn)了其部署及處理模型(也就是FLIP-6),所以打算簡略地說一下FLIP-6的主要內容。

    7.2.1 現(xiàn)有模型不足

    1.5之前的Flink模型有很多不足,包括:

    • 只能靜態(tài)分配計算資源
    • 在YARN上所有的資源分配都是一碗水端平的
    • 與Docker/k8s的集成非常之蠢,頗有脫褲子放屁的神韻
    • JobManager沒有任務調度邏輯
    • 任務在YARN上執(zhí)行結束后web dashboard就不可用
    • 集群的session模式和per job模式混淆難以理解

    就我個人而言,我覺得Flink有一個這里完全沒提到的不足才是最應該修改的:針對任務的完全的資源隔離。尤其是如果用Standalone集群,一個用戶的task跑掛了TaskManager,然后拖垮了整個集群的情況簡直不要太多。

    7.2.2 核心變更

    Single Job JobManager?
    最重要的變更是一個JobManager只處理一個job。當我們生成JobGraph時就順便起一個JobManager,這顯然更加自然。

    ResourceManager?
    其職責包括獲取新的TM和slot,通知失敗,釋放資源以及緩存TM以用于重用等。重要的是,這個組件要能做到掛掉時不要搞垮正在運行的好好的任務。其職責和與JobManager、TaskManager的交互圖如下:?

    TaskManager?
    TM要與上面的兩個組件交互。與JobManager交互時,要能提供slot,要能與所有給出slot的JM交互。丟失與JM的連接時要能試圖把本TM上的slot的情況通告給新JM,如果這一步失敗,就要能重新分配slot。?
    與ResourceManager交互時,要通知RM自己的資源和當前的Job分配情況,能按照RM的要求分配資源或者關閉自身。

    JobManager Slot Pool?
    這個pool要持有所有分配給當前job的slot資源,并且能在RM掛掉的情況下管理當前已經(jīng)持有的slot。

    Dispatcher?
    需要一個Job的分發(fā)器的主要原因是在有的集群環(huán)境下我們可能需要一個統(tǒng)一的提交和監(jiān)控點,以及替代之前的Standalone模式下的JobManager。將來對分發(fā)器的期望可能包括權限控制等。?

    7.2.3 Cluster Manager的架構

    YARN?
    新的基于YARN的架構主要包括不再需要先在容器里啟動集群,然后提交任務;用戶代碼不再使用動態(tài)ClassLoader加載;不用的資源可以釋放;可以按需分配不同大小的容器等。其執(zhí)行過程如下:?
    無Dispatcher時?
    ?
    有Dispatcher時?

    Mesos?
    與基于YARN的模式很像,但是只有帶Dispatcher模式,因為只有這樣才能在Mesos集群里跑其RM。?
    ?
    Mesos的Fault Tolerance是類似這樣的:?
    ?
    必須用類似Marathon之類的技術保證Dispatcher的HA。

    Standalone?
    其實沒啥可說的,把以前的JobManager的職責換成現(xiàn)在的Dispatcher就行了。?
    ?
    將來可能會實現(xiàn)一個類似于輕量級Yarn的模式。

    Docker/k8s?
    用戶定義好容器,至少有一個是job specific的(不然怎么啟動任務);還有用于啟動TM的,可以不是job specific的。啟動過程如下?

    7.2.4 組件設計及細節(jié)

    分配slot相關細節(jié)?
    從新的TM取slot過程:?

    從Cached TM取slot過程:?

    失敗處理

  • TM失敗?
    TM失敗時,RM要能檢測到失敗,更新自己的狀態(tài),發(fā)送消息給JM,重啟一份TM;JM要能檢測到失敗,從狀態(tài)移除失效slot,標記該TM的task為失敗,并在沒有足夠slot繼續(xù)任務時調整規(guī)模;TM自身則要能從Checkpoint恢復

  • RM失敗?
    此時TM要能檢測到失敗,并準備向新的RM注冊自身,并且向新的RM傳遞自身的資源情況;JM要能檢測到失敗并且等待新的RM可用,重新請求需要的資源;丟失的數(shù)據(jù)要能從Container、TM等處恢復。

  • JM失敗?
    TM釋放所有task,向新JM注冊資源,并且如果不成功,就向RM報告這些資源可用于重分配;RM坐等;JM丟失的數(shù)據(jù)從持久化存儲中獲得,已完成的checkpoints從HA恢復,從最近的checkpoint重啟task,并申請資源。

  • JM & RM 失敗?
    TM將在一段時間內試圖把資源交給新上任的JM,如果失敗,則把資源交給新的RM

  • TM & RM失敗?
    JM如果正在申請資源,則要等到新的RM啟動后才能獲得;JM可能需要調整其規(guī)模,因為損失了TM的slot。

  • 8.后記

    Flink是當前流處理領域的優(yōu)秀框架,其設計思想和代碼實現(xiàn)都蘊含著許多人的智慧結晶。這篇解讀花了很多時間,篇幅也寫了很長,也仍然不能能覆蓋Flink的方方面面,也肯定有很多錯誤之處,歡迎大家批評指正!Flink生態(tài)里中文資料確實不多,對Flink源碼有興趣的讀者,可以參考VinoYang的專欄,繼續(xù)學習之旅。

    本文至此結束。

    轉載于:https://www.cnblogs.com/davidwang456/articles/10647291.html

    總結

    以上是生活随笔為你收集整理的追源索骥:透过源码看懂Flink核心框架的执行流程的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內容還不錯,歡迎將生活随笔推薦給好友。

    日日麻批免费40分钟无码 | 天天拍夜夜添久久精品大 | 伊人色综合久久天天小片 | 久久久婷婷五月亚洲97号色 | 少妇无套内谢久久久久 | 一本加勒比波多野结衣 | 亚洲国产欧美在线成人 | 欧美变态另类xxxx | 亚洲狠狠色丁香婷婷综合 | 午夜免费福利小电影 | 2020久久香蕉国产线看观看 | 一区二区传媒有限公司 | 美女扒开屁股让男人桶 | 夜精品a片一区二区三区无码白浆 | 日日橹狠狠爱欧美视频 | 国产熟妇另类久久久久 | 日本欧美一区二区三区乱码 | 国产精品人人爽人人做我的可爱 | 性欧美大战久久久久久久 | 国产成人无码一二三区视频 | 亚洲乱码日产精品bd | 亚洲另类伦春色综合小说 | 性欧美熟妇videofreesex | 日本va欧美va欧美va精品 | 色一情一乱一伦一区二区三欧美 | 久久久久人妻一区精品色欧美 | 亚洲国产日韩a在线播放 | 国产精品久久久av久久久 | 暴力强奷在线播放无码 | 国产亲子乱弄免费视频 | 久久亚洲中文字幕精品一区 | 日韩人妻系列无码专区 | 四虎永久在线精品免费网址 | 欧美熟妇另类久久久久久多毛 | 一个人看的视频www在线 | 午夜免费福利小电影 | 国产亚洲精品久久久久久大师 | 啦啦啦www在线观看免费视频 | 欧美自拍另类欧美综合图片区 | 性做久久久久久久久 | 国产手机在线αⅴ片无码观看 | 国产亚洲日韩欧美另类第八页 | 午夜无码区在线观看 | 丰满护士巨好爽好大乳 | 久久zyz资源站无码中文动漫 | 国产午夜精品一区二区三区嫩草 | 精品久久8x国产免费观看 | 国产口爆吞精在线视频 | 国产激情综合五月久久 | 国产在线aaa片一区二区99 | 欧美日韩亚洲国产精品 | 久久99精品国产.久久久久 | 熟妇人妻激情偷爽文 | 乌克兰少妇性做爰 | 亚洲理论电影在线观看 | 欧美三级不卡在线观看 | 日本爽爽爽爽爽爽在线观看免 | 精品厕所偷拍各类美女tp嘘嘘 | 久久国产精品偷任你爽任你 | 国产在线一区二区三区四区五区 | 亚洲精品一区二区三区大桥未久 | 牲欲强的熟妇农村老妇女视频 | 免费观看黄网站 | 国产av一区二区三区最新精品 | 欧美freesex黑人又粗又大 | 人人妻人人澡人人爽人人精品浪潮 | a在线观看免费网站大全 | 丝袜足控一区二区三区 | 国产电影无码午夜在线播放 | 国产女主播喷水视频在线观看 | 午夜福利试看120秒体验区 | 小泽玛莉亚一区二区视频在线 | 日本成熟视频免费视频 | 学生妹亚洲一区二区 | 天堂无码人妻精品一区二区三区 | 天堂а√在线中文在线 | 久久综合激激的五月天 | 性生交片免费无码看人 | 国产成人精品必看 | 亚洲日本在线电影 | 狠狠色欧美亚洲狠狠色www | 捆绑白丝粉色jk震动捧喷白浆 | 色 综合 欧美 亚洲 国产 | 国产精品理论片在线观看 | 亚洲精品国产精品乱码视色 | 亚洲精品久久久久久久久久久 | 丝袜人妻一区二区三区 | 欧美35页视频在线观看 | 日日麻批免费40分钟无码 | 亚洲精品午夜国产va久久成人 | 日韩视频 中文字幕 视频一区 | 狠狠综合久久久久综合网 | 精品水蜜桃久久久久久久 | 免费人成网站视频在线观看 | 国产成人无码av在线影院 | 亚洲国产精品久久人人爱 | 久久久久久久久888 | 精品人人妻人人澡人人爽人人 | 人妻少妇精品无码专区二区 | 成人无码精品一区二区三区 | 亚洲国产精品久久人人爱 | 日韩少妇白浆无码系列 | 激情内射日本一区二区三区 | 丰满岳乱妇在线观看中字无码 | 日本精品人妻无码免费大全 | 国产亚洲精品久久久久久久 | 中文字幕人妻无码一区二区三区 | 牲欲强的熟妇农村老妇女 | 两性色午夜免费视频 | www国产亚洲精品久久久日本 | 无码纯肉视频在线观看 | 亚洲色偷偷偷综合网 | 夜夜夜高潮夜夜爽夜夜爰爰 | 东京一本一道一二三区 | 无码国产激情在线观看 | 色综合久久久无码网中文 | 亚洲熟女一区二区三区 | 给我免费的视频在线观看 | 国产激情艳情在线看视频 | 亚洲高清偷拍一区二区三区 | 亚洲男人av香蕉爽爽爽爽 | 亚洲 激情 小说 另类 欧美 | 内射后入在线观看一区 | 国产精品久久久久7777 | 精品久久久无码人妻字幂 | 成熟人妻av无码专区 | 一本无码人妻在中文字幕免费 | 美女毛片一区二区三区四区 | 无遮挡啪啪摇乳动态图 | 国精产品一区二区三区 | 西西人体www44rt大胆高清 | 综合激情五月综合激情五月激情1 | 1000部夫妻午夜免费 | 中文字幕人妻无码一夲道 | 亚洲精品一区二区三区婷婷月 | 人人澡人人妻人人爽人人蜜桃 | 亚洲中文字幕无码中字 | 久久人人爽人人爽人人片av高清 | 内射白嫩少妇超碰 | 国产精品a成v人在线播放 | 精品久久8x国产免费观看 | 丰满人妻被黑人猛烈进入 | 性欧美熟妇videofreesex | 欧美野外疯狂做受xxxx高潮 | 在线 国产 欧美 亚洲 天堂 | 伊人久久大香线蕉亚洲 | 欧美第一黄网免费网站 | 人妻互换免费中文字幕 | 欧美性生交活xxxxxdddd | 未满成年国产在线观看 | 色综合久久中文娱乐网 | 欧美老熟妇乱xxxxx | 久久精品国产亚洲精品 | 成人亚洲精品久久久久软件 | 亚洲精品美女久久久久久久 | 俄罗斯老熟妇色xxxx | 扒开双腿吃奶呻吟做受视频 | 亚洲第一网站男人都懂 | 99久久精品日本一区二区免费 | 人人妻人人澡人人爽欧美一区 | 在线播放亚洲第一字幕 | 久久国语露脸国产精品电影 | 真人与拘做受免费视频 | 欧美变态另类xxxx | 乱中年女人伦av三区 | 2019午夜福利不卡片在线 | 成 人 网 站国产免费观看 | 亚洲欧美精品伊人久久 | 99久久婷婷国产综合精品青草免费 | 精品无码国产一区二区三区av | 国产猛烈高潮尖叫视频免费 | 国产亚洲视频中文字幕97精品 | 中文字幕乱码人妻无码久久 | 国模大胆一区二区三区 | 久久久国产一区二区三区 | 精品成人av一区二区三区 | 国产黑色丝袜在线播放 | 97夜夜澡人人爽人人喊中国片 | 亚洲色偷偷偷综合网 | 内射老妇bbwx0c0ck | 久久亚洲日韩精品一区二区三区 | 久久久www成人免费毛片 | 国产人成高清在线视频99最全资源 | 亚洲成在人网站无码天堂 | 精品人人妻人人澡人人爽人人 | 丰满少妇高潮惨叫视频 | 色 综合 欧美 亚洲 国产 | 亚洲 欧美 激情 小说 另类 | 国产两女互慰高潮视频在线观看 | 免费观看黄网站 | 97人妻精品一区二区三区 | 亚洲一区二区三区无码久久 | 亚洲精品国偷拍自产在线观看蜜桃 | 玩弄少妇高潮ⅹxxxyw | 日韩av激情在线观看 | 久久天天躁夜夜躁狠狠 | 中文亚洲成a人片在线观看 | 精品无码国产自产拍在线观看蜜 | 又色又爽又黄的美女裸体网站 | 国产69精品久久久久app下载 | 国产av久久久久精东av | 131美女爱做视频 | 国精品人妻无码一区二区三区蜜柚 | 色综合久久久无码中文字幕 | 国产九九九九九九九a片 | 国产精品第一国产精品 | 国内精品一区二区三区不卡 | 在线天堂新版最新版在线8 | 久久精品丝袜高跟鞋 | aa片在线观看视频在线播放 | 日本高清一区免费中文视频 | 久久亚洲中文字幕无码 | 99re在线播放 | 少妇无码av无码专区在线观看 | 天干天干啦夜天干天2017 | 男人扒开女人内裤强吻桶进去 | 精品欧美一区二区三区久久久 | 水蜜桃亚洲一二三四在线 | 国内综合精品午夜久久资源 | 国产精品99久久精品爆乳 | 国产亲子乱弄免费视频 | 无码国产色欲xxxxx视频 | 国产午夜福利100集发布 | 青草青草久热国产精品 | 中文毛片无遮挡高清免费 | 妺妺窝人体色www在线小说 | 人人澡人摸人人添 | 荫蒂被男人添的好舒服爽免费视频 | 麻豆av传媒蜜桃天美传媒 | 国产成人精品久久亚洲高清不卡 | 377p欧洲日本亚洲大胆 | 无码国模国产在线观看 | 人人妻在人人 | 国产国语老龄妇女a片 | 夜精品a片一区二区三区无码白浆 | 国产精品久久久 | 国产激情无码一区二区 | 免费观看激色视频网站 | 一本一道久久综合久久 | 最近免费中文字幕中文高清百度 | 东京热男人av天堂 | 乱人伦中文视频在线观看 | 久久久久久久人妻无码中文字幕爆 | 沈阳熟女露脸对白视频 | 人人爽人人爽人人片av亚洲 | 日日干夜夜干 | 5858s亚洲色大成网站www | 久久www免费人成人片 | 亚洲精品综合五月久久小说 | 清纯唯美经典一区二区 | 欧美xxxxx精品 | 亚洲国产一区二区三区在线观看 | 扒开双腿疯狂进出爽爽爽视频 | 久久99精品国产麻豆蜜芽 | 国产片av国语在线观看 | 亚洲国产欧美在线成人 | 中文字幕亚洲情99在线 | 日韩视频 中文字幕 视频一区 | 精品一区二区三区波多野结衣 | 老太婆性杂交欧美肥老太 | 伊在人天堂亚洲香蕉精品区 | 一个人免费观看的www视频 | 亚洲中文字幕乱码av波多ji | 日韩精品无码免费一区二区三区 | 国产精品99久久精品爆乳 | 日本乱偷人妻中文字幕 | 人人妻人人澡人人爽欧美一区九九 | 亚洲熟妇色xxxxx欧美老妇y | 思思久久99热只有频精品66 | 乱人伦人妻中文字幕无码久久网 | 国产精品成人av在线观看 | 女人被爽到呻吟gif动态图视看 | 少妇被黑人到高潮喷出白浆 | 久久久久人妻一区精品色欧美 | 国产午夜精品一区二区三区嫩草 | 中文字幕亚洲情99在线 | 国产成人精品三级麻豆 | 大肉大捧一进一出好爽视频 | 亚洲 欧美 激情 小说 另类 | 欧美性猛交xxxx富婆 | 成人无码视频免费播放 | 国产成人综合色在线观看网站 | yw尤物av无码国产在线观看 | 最新国产麻豆aⅴ精品无码 | 综合激情五月综合激情五月激情1 | 精品一二三区久久aaa片 | 97夜夜澡人人双人人人喊 | 少妇无码av无码专区在线观看 | 亚洲中文字幕久久无码 | 日韩亚洲欧美中文高清在线 | 97夜夜澡人人爽人人喊中国片 | 婷婷五月综合激情中文字幕 | 久久这里只有精品视频9 | 西西人体www44rt大胆高清 | 水蜜桃色314在线观看 | 成年美女黄网站色大免费全看 | 色婷婷综合中文久久一本 | 97资源共享在线视频 | 欧美激情综合亚洲一二区 | 久精品国产欧美亚洲色aⅴ大片 | 午夜福利不卡在线视频 | 午夜精品一区二区三区在线观看 | 国产免费久久久久久无码 | 人妻少妇精品无码专区动漫 | 亚洲色偷偷偷综合网 | 精品熟女少妇av免费观看 | 乌克兰少妇性做爰 | 午夜男女很黄的视频 | 日产国产精品亚洲系列 | 日本精品人妻无码77777 天堂一区人妻无码 | 日本免费一区二区三区最新 | 精品厕所偷拍各类美女tp嘘嘘 | 最新版天堂资源中文官网 | 亚洲欧美国产精品久久 | 日本精品高清一区二区 | 爱做久久久久久 | 国产精品沙发午睡系列 | 女人高潮内射99精品 | 未满小14洗澡无码视频网站 | 国产亚洲精品久久久久久 | 白嫩日本少妇做爰 | 少妇高潮一区二区三区99 | 精品人妻中文字幕有码在线 | 人人爽人人澡人人高潮 | 国产无遮挡又黄又爽免费视频 | 免费人成网站视频在线观看 | 人妻插b视频一区二区三区 | av无码电影一区二区三区 | 天天爽夜夜爽夜夜爽 | 性生交大片免费看女人按摩摩 | 正在播放老肥熟妇露脸 | 久久精品国产99久久6动漫 | 在线欧美精品一区二区三区 | 亚洲乱码国产乱码精品精 | 丰满人妻被黑人猛烈进入 | 青春草在线视频免费观看 | 99久久婷婷国产综合精品青草免费 | 粉嫩少妇内射浓精videos | 18黄暴禁片在线观看 | 国产成人精品视频ⅴa片软件竹菊 | 久久这里只有精品视频9 | 中文字幕无线码免费人妻 | 亚洲爆乳精品无码一区二区三区 | 国产一区二区不卡老阿姨 | 日韩无码专区 | 人人爽人人爽人人片av亚洲 | 国产精品毛片一区二区 | 无码成人精品区在线观看 | 久久久久成人片免费观看蜜芽 | 亚洲 日韩 欧美 成人 在线观看 | 国产真实乱对白精彩久久 | 国产又爽又猛又粗的视频a片 | 女人高潮内射99精品 | 亚洲国产成人av在线观看 | 国产亚洲精品久久久久久国模美 | 中文字幕中文有码在线 | 亚洲一区二区三区含羞草 | 少妇性l交大片 | 国产精品久久国产三级国 | 丝袜美腿亚洲一区二区 | 老头边吃奶边弄进去呻吟 | 欧美日韩一区二区免费视频 | 最新国产乱人伦偷精品免费网站 | 成人性做爰aaa片免费看不忠 | аⅴ资源天堂资源库在线 | 麻豆精产国品 | 日本爽爽爽爽爽爽在线观看免 | aⅴ亚洲 日韩 色 图网站 播放 | 成人女人看片免费视频放人 | 国产精品无码一区二区三区不卡 | 无码人妻久久一区二区三区不卡 | 国产一区二区不卡老阿姨 | 九九久久精品国产免费看小说 | 成人aaa片一区国产精品 | 成 人 网 站国产免费观看 | 奇米影视7777久久精品人人爽 | 中文亚洲成a人片在线观看 | 成人精品一区二区三区中文字幕 | 精品国产一区二区三区四区在线看 | 人人妻人人澡人人爽欧美精品 | 久久国产精品精品国产色婷婷 | 精品亚洲韩国一区二区三区 | 十八禁视频网站在线观看 | 国内少妇偷人精品视频免费 | 国产精品人妻一区二区三区四 | 水蜜桃色314在线观看 | 亚洲色在线无码国产精品不卡 | 国产特级毛片aaaaaa高潮流水 | 亚洲区小说区激情区图片区 | 久久国产精品二国产精品 | 97se亚洲精品一区 | 久久亚洲中文字幕精品一区 | 国产农村妇女aaaaa视频 撕开奶罩揉吮奶头视频 | 啦啦啦www在线观看免费视频 | 男女爱爱好爽视频免费看 | 国产尤物精品视频 | 九月婷婷人人澡人人添人人爽 | 久久亚洲中文字幕精品一区 | 国产精品亚洲а∨无码播放麻豆 | 波多野结衣av一区二区全免费观看 | 国产热a欧美热a在线视频 | 亚洲无人区一区二区三区 | 国产三级精品三级男人的天堂 | 性色欲网站人妻丰满中文久久不卡 | 久久精品人妻少妇一区二区三区 | 午夜无码人妻av大片色欲 | 青春草在线视频免费观看 | 对白脏话肉麻粗话av | 色偷偷人人澡人人爽人人模 | 国产精品美女久久久久av爽李琼 | 无码成人精品区在线观看 | 人人超人人超碰超国产 | 高潮毛片无遮挡高清免费 | 大肉大捧一进一出视频出来呀 | 欧美成人免费全部网站 | 国内精品人妻无码久久久影院蜜桃 | 又大又硬又爽免费视频 | 亚无码乱人伦一区二区 | 国产精品.xx视频.xxtv | 欧美黑人巨大xxxxx | 久久亚洲中文字幕精品一区 | 东京无码熟妇人妻av在线网址 | 女人和拘做爰正片视频 | 67194成是人免费无码 | 麻豆av传媒蜜桃天美传媒 | 国产在线无码精品电影网 | 国产成人无码av一区二区 | 无码毛片视频一区二区本码 | 中文精品久久久久人妻不卡 | 国产超级va在线观看视频 | 又紧又大又爽精品一区二区 | 99久久人妻精品免费二区 | 大屁股大乳丰满人妻 | 牛和人交xxxx欧美 | 55夜色66夜色国产精品视频 | 老司机亚洲精品影院无码 | 18禁黄网站男男禁片免费观看 | 麻豆国产丝袜白领秘书在线观看 | 色婷婷av一区二区三区之红樱桃 | 又大又黄又粗又爽的免费视频 | 久久久精品456亚洲影院 | 国产精品无码永久免费888 | 樱花草在线社区www | 久久久久成人精品免费播放动漫 | 玩弄少妇高潮ⅹxxxyw | 亚洲色在线无码国产精品不卡 | 中文字幕+乱码+中文字幕一区 | 国产乱人伦av在线无码 | 无码人妻丰满熟妇区五十路百度 | 国产精品国产自线拍免费软件 | 初尝人妻少妇中文字幕 | 一二三四在线观看免费视频 | 曰本女人与公拘交酡免费视频 | 女人和拘做爰正片视频 | 狠狠综合久久久久综合网 | 亚洲综合无码久久精品综合 | 女人被爽到呻吟gif动态图视看 | 亚洲精品中文字幕 | 亚洲国产高清在线观看视频 | 国产人妻精品午夜福利免费 | 东京热无码av男人的天堂 | 欧美老妇与禽交 | 高潮毛片无遮挡高清免费视频 | 中文字幕+乱码+中文字幕一区 | 久久无码专区国产精品s | 少妇被黑人到高潮喷出白浆 | 无码av免费一区二区三区试看 | 精品国产一区二区三区av 性色 | 蜜臀av在线播放 久久综合激激的五月天 | 强伦人妻一区二区三区视频18 | 亚洲中文无码av永久不收费 | 亚洲日韩乱码中文无码蜜桃臀网站 | 亚洲精品一区二区三区在线观看 | 色诱久久久久综合网ywww | aa片在线观看视频在线播放 | 成人精品视频一区二区三区尤物 | 国产亚洲人成a在线v网站 | 欧美野外疯狂做受xxxx高潮 | 成人免费视频视频在线观看 免费 | 久久亚洲精品中文字幕无男同 | 欧洲vodafone精品性 | 国产卡一卡二卡三 | 熟女少妇人妻中文字幕 | 性欧美熟妇videofreesex | 人人妻人人澡人人爽精品欧美 | 亚洲熟妇自偷自拍另类 | 色窝窝无码一区二区三区色欲 | 人人妻人人澡人人爽欧美一区九九 | 亚洲中文字幕无码中文字在线 | 少妇高潮喷潮久久久影院 | 99精品无人区乱码1区2区3区 | 日日夜夜撸啊撸 | 狠狠躁日日躁夜夜躁2020 | 伦伦影院午夜理论片 | 国产在线精品一区二区高清不卡 | 国产69精品久久久久app下载 | 亚洲欧美精品伊人久久 | 亚洲精品国产精品乱码不卡 | 欧美激情一区二区三区成人 | 成人性做爰aaa片免费看 | 国产婷婷色一区二区三区在线 | 老司机亚洲精品影院无码 | 老熟女乱子伦 | 国产精品手机免费 | 伊人久久婷婷五月综合97色 | 青青久在线视频免费观看 | 国产精品香蕉在线观看 | 男女下面进入的视频免费午夜 | 免费乱码人妻系列无码专区 | 成人精品天堂一区二区三区 | 国产亚洲精品久久久久久国模美 | 亚洲精品国产精品乱码不卡 | aⅴ在线视频男人的天堂 | 国产精品无套呻吟在线 | 国内综合精品午夜久久资源 | 午夜福利试看120秒体验区 | 国产精品无码成人午夜电影 | 欧洲欧美人成视频在线 | 国产精品人妻一区二区三区四 | 无码人中文字幕 | 377p欧洲日本亚洲大胆 | 奇米影视7777久久精品人人爽 | √8天堂资源地址中文在线 | 十八禁真人啪啪免费网站 | 国产精品久久久久久久9999 | 日本熟妇人妻xxxxx人hd | 欧美丰满少妇xxxx性 | 人妻体内射精一区二区三四 | 欧美午夜特黄aaaaaa片 | 人妻少妇精品久久 | 伊人久久大香线焦av综合影院 | 亚洲人亚洲人成电影网站色 | 免费国产成人高清在线观看网站 | 久久人人爽人人人人片 | 国产舌乚八伦偷品w中 | 国产高清av在线播放 | 无码人妻丰满熟妇区毛片18 | 性史性农村dvd毛片 | 少妇无码一区二区二三区 | 麻豆果冻传媒2021精品传媒一区下载 | 99久久人妻精品免费一区 | 国产精品永久免费视频 | 亚洲成a人一区二区三区 | 夜精品a片一区二区三区无码白浆 | 成人av无码一区二区三区 | 色婷婷综合激情综在线播放 | 香港三级日本三级妇三级 | 久久亚洲a片com人成 | 亚洲熟悉妇女xxx妇女av | 少妇厨房愉情理9仑片视频 | 国产精品嫩草久久久久 | 国产精品内射视频免费 | 无码人妻出轨黑人中文字幕 | 国产人妻久久精品二区三区老狼 | 波多野结衣av一区二区全免费观看 | 成年美女黄网站色大免费视频 | 亚洲综合久久一区二区 | 少妇被粗大的猛进出69影院 | 澳门永久av免费网站 | 性欧美疯狂xxxxbbbb | 一区二区三区乱码在线 | 欧洲 | 思思久久99热只有频精品66 | 国产在线精品一区二区三区直播 | 麻豆国产97在线 | 欧洲 | 日韩少妇内射免费播放 | 久久国产自偷自偷免费一区调 | 国产精品美女久久久网av | 精品少妇爆乳无码av无码专区 | а天堂中文在线官网 | 狠狠色噜噜狠狠狠狠7777米奇 | 超碰97人人做人人爱少妇 | 日日天干夜夜狠狠爱 | 亚洲综合色区中文字幕 | 天下第一社区视频www日本 | 激情国产av做激情国产爱 | 欧美真人作爱免费视频 | 亚洲熟女一区二区三区 | 日韩精品乱码av一区二区 | 国产激情综合五月久久 | 在线精品亚洲一区二区 | 精品国产av色一区二区深夜久久 | 中文字幕乱妇无码av在线 | 亚洲午夜久久久影院 | 天下第一社区视频www日本 | 蜜桃视频插满18在线观看 | 四虎国产精品一区二区 | 色 综合 欧美 亚洲 国产 | 人人爽人人澡人人高潮 | 久久午夜无码鲁丝片秋霞 | 亚洲成a人片在线观看日本 | 97夜夜澡人人双人人人喊 | 亚洲日韩一区二区三区 | 亚洲の无码国产の无码步美 | 亚洲人亚洲人成电影网站色 | 丰满少妇弄高潮了www | 亚洲 高清 成人 动漫 | 综合人妻久久一区二区精品 | 欧洲极品少妇 | 少妇被粗大的猛进出69影院 | av无码久久久久不卡免费网站 | 亚洲国产成人a精品不卡在线 | 国产一区二区不卡老阿姨 | 国产av久久久久精东av | 青青草原综合久久大伊人精品 | 午夜时刻免费入口 | 牲欲强的熟妇农村老妇女视频 | 国产av一区二区精品久久凹凸 | 精品偷拍一区二区三区在线看 | 夜夜躁日日躁狠狠久久av | 国产 精品 自在自线 | 青青草原综合久久大伊人精品 | 国产精品va在线播放 | 国产精品久久久久久亚洲影视内衣 | 国产精品无码一区二区三区不卡 | 成人无码精品一区二区三区 | 无码人妻精品一区二区三区下载 | 国产一区二区三区四区五区加勒比 | 久久久中文久久久无码 | 一本大道伊人av久久综合 | 国产成人无码午夜视频在线观看 | 国产两女互慰高潮视频在线观看 | 国产黑色丝袜在线播放 | 久9re热视频这里只有精品 | 亚洲自偷精品视频自拍 | 国产成人午夜福利在线播放 | 亚洲国产欧美日韩精品一区二区三区 | 精品久久久中文字幕人妻 | 无遮挡啪啪摇乳动态图 | 99riav国产精品视频 | 欧美国产日韩亚洲中文 | 亚洲成a人片在线观看无码3d | 日本一区二区三区免费播放 | 久久99热只有频精品8 | 人妻少妇精品视频专区 | 成人精品天堂一区二区三区 | 性欧美熟妇videofreesex | 日韩精品一区二区av在线 | 国产一区二区不卡老阿姨 | 欧美色就是色 | 成熟女人特级毛片www免费 | 任你躁国产自任一区二区三区 | 国产亚洲欧美日韩亚洲中文色 | 亚洲 a v无 码免 费 成 人 a v | 中文无码成人免费视频在线观看 | av无码久久久久不卡免费网站 | 四虎影视成人永久免费观看视频 | 中文字幕无码日韩专区 | 成年美女黄网站色大免费视频 | 久久久久国色av免费观看性色 | 久久久精品欧美一区二区免费 | 又大又黄又粗又爽的免费视频 | 精品国产国产综合精品 | 国产精品二区一区二区aⅴ污介绍 | 国产亚洲精品精品国产亚洲综合 | 国产性生大片免费观看性 | 日本一区二区三区免费播放 | 欧美野外疯狂做受xxxx高潮 | 国产精品无码mv在线观看 | 日韩精品乱码av一区二区 | 国产免费观看黄av片 | 亚洲 a v无 码免 费 成 人 a v | 在线观看欧美一区二区三区 | 午夜嘿嘿嘿影院 | 在线a亚洲视频播放在线观看 | 无码午夜成人1000部免费视频 | 波多野结衣一区二区三区av免费 | 中文字幕无码视频专区 | 精品一区二区不卡无码av | 人人澡人摸人人添 | 99久久99久久免费精品蜜桃 | 无码国产色欲xxxxx视频 | 日日摸天天摸爽爽狠狠97 | 成人aaa片一区国产精品 | 国产精品无码mv在线观看 | 久在线观看福利视频 | 蜜桃臀无码内射一区二区三区 | 天堂а√在线地址中文在线 | 国产熟妇高潮叫床视频播放 | 久热国产vs视频在线观看 | 人人澡人人妻人人爽人人蜜桃 | 2019午夜福利不卡片在线 | 国产精品久久久久无码av色戒 | 老司机亚洲精品影院 | 久久久久人妻一区精品色欧美 | 在线看片无码永久免费视频 | 无码av中文字幕免费放 | 人人超人人超碰超国产 | 久久久久久a亚洲欧洲av冫 | 亚洲中文字幕在线观看 | 性色欲情网站iwww九文堂 | 无码人妻黑人中文字幕 | 小sao货水好多真紧h无码视频 | 日日摸夜夜摸狠狠摸婷婷 | 天下第一社区视频www日本 | 又色又爽又黄的美女裸体网站 | 国产69精品久久久久app下载 | 亚洲理论电影在线观看 | 国产另类ts人妖一区二区 | 人妻天天爽夜夜爽一区二区 | 国产亚洲精品久久久久久国模美 | 久久综合九色综合欧美狠狠 | 未满成年国产在线观看 | 99精品久久毛片a片 | 精品人妻中文字幕有码在线 | 日本熟妇人妻xxxxx人hd | 中国女人内谢69xxxxxa片 | 国产亚洲精品精品国产亚洲综合 | 亚洲理论电影在线观看 | 亚洲欧美中文字幕5发布 | 免费人成网站视频在线观看 | 天下第一社区视频www日本 | 国产网红无码精品视频 | 国产激情一区二区三区 | 国产激情无码一区二区app | 婷婷丁香五月天综合东京热 | 国产精品99久久精品爆乳 | 婷婷色婷婷开心五月四房播播 | 中文字幕无码日韩欧毛 | www国产精品内射老师 | 玩弄人妻少妇500系列视频 | 在线播放无码字幕亚洲 | 亚洲成a人一区二区三区 | 亚洲色在线无码国产精品不卡 | 伊在人天堂亚洲香蕉精品区 | 午夜精品久久久内射近拍高清 | 老熟女乱子伦 | 一个人免费观看的www视频 | 亚洲乱码国产乱码精品精 | 久久综合久久自在自线精品自 | 亚洲色在线无码国产精品不卡 | 麻豆人妻少妇精品无码专区 | 精品久久久久久人妻无码中文字幕 | 内射老妇bbwx0c0ck | 久久久久国色av免费观看性色 | 欧美日韩一区二区免费视频 | 成人免费视频视频在线观看 免费 | 色 综合 欧美 亚洲 国产 | 日韩亚洲欧美精品综合 | 又粗又大又硬又长又爽 | 精品欧美一区二区三区久久久 | 亚洲天堂2017无码中文 | 国产精品无码永久免费888 | 清纯唯美经典一区二区 | 久久综合激激的五月天 | 中文字幕无线码免费人妻 | 欧美丰满熟妇xxxx | 精品国产成人一区二区三区 | 欧美国产日韩亚洲中文 | 国产偷抇久久精品a片69 | 成人动漫在线观看 | 欧美一区二区三区 | 午夜精品一区二区三区的区别 | 亚洲精品一区二区三区在线观看 | 5858s亚洲色大成网站www | 午夜精品久久久内射近拍高清 | 色情久久久av熟女人妻网站 | 国产麻豆精品一区二区三区v视界 | 精品厕所偷拍各类美女tp嘘嘘 | 国产精品久久国产精品99 | 天天av天天av天天透 | 亚洲码国产精品高潮在线 | 国产两女互慰高潮视频在线观看 | 99久久久无码国产aaa精品 | 久久精品国产日本波多野结衣 | 国产成人无码av在线影院 | 无套内谢的新婚少妇国语播放 | 久久久久99精品国产片 | 亚洲中文字幕成人无码 | 成人免费视频一区二区 | 2020久久香蕉国产线看观看 | 人人爽人人澡人人人妻 | 久久aⅴ免费观看 | 超碰97人人射妻 | 中文字幕av无码一区二区三区电影 | 亚洲狠狠婷婷综合久久 | 国产亚洲精品久久久久久国模美 | 中文字幕久久久久人妻 | 亚洲狠狠婷婷综合久久 | 亚洲精品国偷拍自产在线麻豆 | 九九在线中文字幕无码 | 国产成人亚洲综合无码 | 学生妹亚洲一区二区 | 色窝窝无码一区二区三区色欲 | 日韩在线不卡免费视频一区 | 亚洲精品国偷拍自产在线麻豆 | 又大又黄又粗又爽的免费视频 | 成年美女黄网站色大免费全看 | 扒开双腿吃奶呻吟做受视频 | 激情五月综合色婷婷一区二区 | 亚洲国精产品一二二线 | 7777奇米四色成人眼影 | 亚洲国产精品久久久天堂 | 97夜夜澡人人爽人人喊中国片 | 国产午夜无码视频在线观看 | 久久午夜无码鲁丝片午夜精品 | 国产午夜无码精品免费看 | 激情内射日本一区二区三区 | 国内精品久久毛片一区二区 | 好男人www社区 | 大肉大捧一进一出好爽视频 | 3d动漫精品啪啪一区二区中 | 在线观看欧美一区二区三区 | 日本va欧美va欧美va精品 | 国产高潮视频在线观看 | 国产综合色产在线精品 | 亚洲一区二区三区播放 | 狂野欧美性猛xxxx乱大交 | 影音先锋中文字幕无码 | 久久亚洲日韩精品一区二区三区 | 3d动漫精品啪啪一区二区中 | 久热国产vs视频在线观看 | 熟女少妇人妻中文字幕 | 免费观看激色视频网站 | 无码人妻av免费一区二区三区 | 亚无码乱人伦一区二区 | 领导边摸边吃奶边做爽在线观看 | 久久人妻内射无码一区三区 | 特级做a爰片毛片免费69 | 国产一区二区三区四区五区加勒比 | 久久久久免费看成人影片 | 国产精华av午夜在线观看 | 日本精品人妻无码77777 天堂一区人妻无码 | 亚洲欧洲中文日韩av乱码 | 亚洲精品一区二区三区在线观看 | 人人妻人人澡人人爽人人精品浪潮 | 欧洲精品码一区二区三区免费看 | 久久 国产 尿 小便 嘘嘘 | 老头边吃奶边弄进去呻吟 | 女人被爽到呻吟gif动态图视看 | 久久人人爽人人人人片 | 最新国产麻豆aⅴ精品无码 | 精品少妇爆乳无码av无码专区 | 天堂一区人妻无码 | 国产精品久久久久久久影院 | 亚洲人成网站色7799 | 任你躁在线精品免费 | 亚洲经典千人经典日产 | 日本丰满护士爆乳xxxx | 色窝窝无码一区二区三区色欲 | 日本欧美一区二区三区乱码 | 一区二区三区高清视频一 | 中文精品无码中文字幕无码专区 | 国产av无码专区亚洲a∨毛片 | 久久久久亚洲精品男人的天堂 | 亚洲成av人综合在线观看 | 久久99精品久久久久久 | 亚洲 高清 成人 动漫 | 国产成人精品一区二区在线小狼 | 久久国产精品二国产精品 | 99精品无人区乱码1区2区3区 | 午夜免费福利小电影 | 亚洲色大成网站www国产 | 日本一卡二卡不卡视频查询 | 人妻插b视频一区二区三区 | 风流少妇按摩来高潮 | 在线а√天堂中文官网 | 丰满岳乱妇在线观看中字无码 | 少妇一晚三次一区二区三区 | 任你躁国产自任一区二区三区 | 亚洲一区二区三区含羞草 | 国产超碰人人爽人人做人人添 | 国产免费久久久久久无码 | 精品久久综合1区2区3区激情 | 蜜桃视频插满18在线观看 | 欧美zoozzooz性欧美 | 无码纯肉视频在线观看 | 精品欧美一区二区三区久久久 | 亚洲欧美国产精品专区久久 | 狂野欧美性猛xxxx乱大交 | 老子影院午夜精品无码 | 四虎国产精品免费久久 | 内射爽无广熟女亚洲 | 免费国产成人高清在线观看网站 | 女人色极品影院 | 18无码粉嫩小泬无套在线观看 | 久久久久久亚洲精品a片成人 | 巨爆乳无码视频在线观看 | 欧美精品在线观看 | 中国大陆精品视频xxxx | 一区二区三区乱码在线 | 欧洲 | 亚洲va中文字幕无码久久不卡 | 丰满人妻一区二区三区免费视频 | 欧美喷潮久久久xxxxx | 色欲综合久久中文字幕网 | 日本丰满护士爆乳xxxx | 日本一卡2卡3卡4卡无卡免费网站 国产一区二区三区影院 | 中文字幕乱码人妻无码久久 | 夜夜高潮次次欢爽av女 | 日韩亚洲欧美中文高清在线 | 国产精品人人爽人人做我的可爱 | 亚洲综合色区中文字幕 | 无码帝国www无码专区色综合 | 精品水蜜桃久久久久久久 | 成人精品天堂一区二区三区 | 激情内射日本一区二区三区 | 亚洲欧美国产精品专区久久 | 一区二区三区高清视频一 | 无码人妻久久一区二区三区不卡 | 曰韩无码二三区中文字幕 | 亚欧洲精品在线视频免费观看 | 极品嫩模高潮叫床 | 曰韩少妇内射免费播放 | 国产精品久久久久久久影院 | 999久久久国产精品消防器材 | 熟妇激情内射com | 无码人中文字幕 | 日日夜夜撸啊撸 | 麻花豆传媒剧国产免费mv在线 | 任你躁在线精品免费 | 蜜桃臀无码内射一区二区三区 | 久久人人爽人人爽人人片av高清 | 日本爽爽爽爽爽爽在线观看免 | 亚洲精品国产品国语在线观看 | 国产另类ts人妖一区二区 | 十八禁真人啪啪免费网站 | 人人妻人人藻人人爽欧美一区 | 日本一区二区更新不卡 | √天堂资源地址中文在线 | 免费人成在线观看网站 | 男人扒开女人内裤强吻桶进去 | 国内丰满熟女出轨videos | 人妻少妇精品久久 | 男人的天堂2018无码 | 午夜精品久久久久久久 | 噜噜噜亚洲色成人网站 | 六月丁香婷婷色狠狠久久 | 精品国产一区二区三区av 性色 | 久久久久久久人妻无码中文字幕爆 | 国产精品对白交换视频 | 日本免费一区二区三区最新 | 亚洲熟悉妇女xxx妇女av | 日本大乳高潮视频在线观看 | 亚洲国产精华液网站w | 超碰97人人做人人爱少妇 | 亚洲人成无码网www | 亚洲男人av香蕉爽爽爽爽 | 狂野欧美激情性xxxx | 色综合久久网 | 久久97精品久久久久久久不卡 | 欧美日本精品一区二区三区 | 亚洲爆乳大丰满无码专区 | 初尝人妻少妇中文字幕 | 精品无码成人片一区二区98 | 亚洲人成无码网www | 亚洲精品一区二区三区在线 | 一个人看的www免费视频在线观看 | 日韩 欧美 动漫 国产 制服 | 日日摸天天摸爽爽狠狠97 | 少妇太爽了在线观看 | 99久久婷婷国产综合精品青草免费 | 国产精品无码久久av | 日本又色又爽又黄的a片18禁 | 乱人伦人妻中文字幕无码久久网 | 在教室伦流澡到高潮hnp视频 | 乌克兰少妇性做爰 | 亚洲人成网站在线播放942 | 精品国产av色一区二区深夜久久 | 亚洲熟妇色xxxxx欧美老妇 | 一个人免费观看的www视频 | 高潮毛片无遮挡高清免费 | 全黄性性激高免费视频 | 一本色道婷婷久久欧美 | 国产精品人人妻人人爽 | 精品厕所偷拍各类美女tp嘘嘘 | 亚洲精品成a人在线观看 | 综合人妻久久一区二区精品 | 欧美三级不卡在线观看 | 波多野结衣高清一区二区三区 | 无码国模国产在线观看 | 蜜桃av抽搐高潮一区二区 | 无码av最新清无码专区吞精 | 久久99精品久久久久婷婷 | 中文字幕无码av激情不卡 | 亚洲精品国产精品乱码不卡 | 色综合久久久无码中文字幕 | 无码福利日韩神码福利片 | 天天躁夜夜躁狠狠是什么心态 | 四十如虎的丰满熟妇啪啪 | 国产成人午夜福利在线播放 | 在线观看国产午夜福利片 | 欧洲欧美人成视频在线 | 国产成人无码av在线影院 | 人妻无码αv中文字幕久久琪琪布 | 日本一卡2卡3卡4卡无卡免费网站 国产一区二区三区影院 | 人人爽人人爽人人片av亚洲 | 国产成人精品三级麻豆 | 18精品久久久无码午夜福利 | 亚洲日韩av一区二区三区四区 | 综合网日日天干夜夜久久 | 日产国产精品亚洲系列 | 美女扒开屁股让男人桶 | 99久久久国产精品无码免费 | 欧美老妇交乱视频在线观看 | 午夜男女很黄的视频 | 99er热精品视频 | 特级做a爰片毛片免费69 | 欧美亚洲日韩国产人成在线播放 | 少妇厨房愉情理9仑片视频 | 日韩精品无码一本二本三本色 | 欧美性色19p | 在线亚洲高清揄拍自拍一品区 | 国产精品人人爽人人做我的可爱 | 无码人妻出轨黑人中文字幕 | 少妇高潮喷潮久久久影院 | 国产人妖乱国产精品人妖 | 人人爽人人爽人人片av亚洲 | 无人区乱码一区二区三区 | 久久午夜夜伦鲁鲁片无码免费 | 秋霞成人午夜鲁丝一区二区三区 | 久久久久久久久蜜桃 | 久久99精品国产麻豆蜜芽 | 国产av一区二区精品久久凹凸 | 人人妻人人澡人人爽欧美一区 | 高潮毛片无遮挡高清免费 | 国产在线精品一区二区高清不卡 | 亚洲第一网站男人都懂 | 人妻少妇精品无码专区动漫 | 欧洲精品码一区二区三区免费看 | 在教室伦流澡到高潮hnp视频 | 精品无码一区二区三区的天堂 | 亚洲一区二区三区在线观看网站 | 少妇性荡欲午夜性开放视频剧场 | 国产精品亚洲а∨无码播放麻豆 | 国产做国产爱免费视频 | 2020久久超碰国产精品最新 | 99在线 | 亚洲 | 无码人妻久久一区二区三区不卡 | 国产精品久久久久无码av色戒 | 国产精品爱久久久久久久 | 日本大乳高潮视频在线观看 | 天堂一区人妻无码 | 欧美 日韩 亚洲 在线 | 亚洲精品中文字幕 | 美女黄网站人色视频免费国产 | 特黄特色大片免费播放器图片 | 久久人人爽人人人人片 | 欧美三级不卡在线观看 | 久久久婷婷五月亚洲97号色 | 中文字幕无码乱人伦 | 国产手机在线αⅴ片无码观看 | 久久久久se色偷偷亚洲精品av | 国产在线精品一区二区三区直播 | 娇妻被黑人粗大高潮白浆 | 妺妺窝人体色www婷婷 | 性色欲情网站iwww九文堂 | 无码一区二区三区在线 | 丰满妇女强制高潮18xxxx | 内射巨臀欧美在线视频 | 久久久久久a亚洲欧洲av冫 | 18黄暴禁片在线观看 | 六月丁香婷婷色狠狠久久 | 成年美女黄网站色大免费全看 | 丰满少妇女裸体bbw | 国产性猛交╳xxx乱大交 国产精品久久久久久无码 欧洲欧美人成视频在线 | 国产超级va在线观看视频 | 377p欧洲日本亚洲大胆 | 欧洲极品少妇 | 欧美日韩色另类综合 | 天堂无码人妻精品一区二区三区 | 成人欧美一区二区三区 | 精品日本一区二区三区在线观看 | 纯爱无遮挡h肉动漫在线播放 | 久久精品99久久香蕉国产色戒 | 国产人妻人伦精品1国产丝袜 | 激情五月综合色婷婷一区二区 | 亚洲爆乳无码专区 | 亚洲日本一区二区三区在线 | 男人的天堂2018无码 | 狠狠噜狠狠狠狠丁香五月 | 亚洲精品鲁一鲁一区二区三区 | 夜夜夜高潮夜夜爽夜夜爰爰 | 久久天天躁夜夜躁狠狠 | yw尤物av无码国产在线观看 | 老司机亚洲精品影院 | 强辱丰满人妻hd中文字幕 | 麻豆精品国产精华精华液好用吗 | 久久午夜无码鲁丝片 | 丰满少妇熟乱xxxxx视频 | 国产成人午夜福利在线播放 | 亚洲中文字幕无码中文字在线 | 精品厕所偷拍各类美女tp嘘嘘 | 青草青草久热国产精品 | 无码av免费一区二区三区试看 | 久久综合九色综合欧美狠狠 | 精品偷拍一区二区三区在线看 | 国产婷婷色一区二区三区在线 | 婷婷综合久久中文字幕蜜桃三电影 | 久久国产精品精品国产色婷婷 | 色婷婷香蕉在线一区二区 | 国产精品多人p群无码 | 国产三级久久久精品麻豆三级 | 人妻与老人中文字幕 | 亚洲欧美日韩成人高清在线一区 | 国产精品久久久久9999小说 | 无码人妻少妇伦在线电影 | 日本www一道久久久免费榴莲 | 亚洲精品国产精品乱码不卡 | 亚洲精品中文字幕 | 日本高清一区免费中文视频 | 精品久久久无码中文字幕 | 少妇人妻大乳在线视频 | 国产成人精品一区二区在线小狼 | 久久精品成人欧美大片 | 国产免费久久精品国产传媒 | 我要看www免费看插插视频 | 无遮挡啪啪摇乳动态图 | 欧美 丝袜 自拍 制服 另类 | 玩弄人妻少妇500系列视频 | 精品国产av色一区二区深夜久久 | 久久久久成人片免费观看蜜芽 | 蜜桃视频插满18在线观看 | 国产亚洲美女精品久久久2020 | 4hu四虎永久在线观看 | 国产人妻大战黑人第1集 | 又紧又大又爽精品一区二区 | 精品久久久无码中文字幕 | 人妻少妇精品无码专区动漫 | 中文无码伦av中文字幕 | 亚洲va中文字幕无码久久不卡 | 亚洲精品中文字幕 | 丝袜人妻一区二区三区 | 四虎国产精品免费久久 | 色综合久久88色综合天天 | 全黄性性激高免费视频 | 中文字幕av伊人av无码av | 麻豆人妻少妇精品无码专区 | 欧美黑人性暴力猛交喷水 | 精品无码成人片一区二区98 | 亚洲一区二区观看播放 | 亚洲s色大片在线观看 | 亚洲精品综合一区二区三区在线 | 欧美性猛交xxxx富婆 | 无码成人精品区在线观看 | 熟女少妇在线视频播放 | 亚洲国产精华液网站w | 久久综合色之久久综合 | 沈阳熟女露脸对白视频 | 色 综合 欧美 亚洲 国产 | 色偷偷av老熟女 久久精品人妻少妇一区二区三区 | 久久久久久久女国产乱让韩 | 曰本女人与公拘交酡免费视频 | 麻豆国产97在线 | 欧洲 | 欧美阿v高清资源不卡在线播放 | 香蕉久久久久久av成人 | 丰满人妻一区二区三区免费视频 | 精品无人区无码乱码毛片国产 | 天天拍夜夜添久久精品大 | 日韩人妻无码中文字幕视频 | 色综合久久久无码网中文 | 无码精品国产va在线观看dvd | 亚洲一区二区三区 | 十八禁真人啪啪免费网站 | 日本一本二本三区免费 | 国产热a欧美热a在线视频 | 又大又紧又粉嫩18p少妇 | 99久久久国产精品无码免费 | 亚洲一区二区三区 | 综合网日日天干夜夜久久 | 亚洲爆乳无码专区 | 国产成人无码av一区二区 | 欧美成人高清在线播放 | 欧美 日韩 人妻 高清 中文 | 国产亚洲视频中文字幕97精品 | 福利一区二区三区视频在线观看 | 国内精品久久久久久中文字幕 | 亚洲熟妇色xxxxx欧美老妇y | 伊人久久大香线蕉午夜 | 欧美精品一区二区精品久久 | 人妻无码αv中文字幕久久琪琪布 | 久久久精品欧美一区二区免费 | 无码国模国产在线观看 | 99精品国产综合久久久久五月天 | 国内少妇偷人精品视频免费 | 少妇人妻av毛片在线看 | 欧美日本日韩 | 少妇无码av无码专区在线观看 | 无码成人精品区在线观看 | 国产精品久久精品三级 | 大色综合色综合网站 | 亚洲一区二区三区含羞草 | 亚洲中文字幕在线观看 | 四虎国产精品一区二区 | 色欲av亚洲一区无码少妇 | 精品欧洲av无码一区二区三区 | 永久黄网站色视频免费直播 | 亚洲综合精品香蕉久久网 | 亚洲欧美色中文字幕在线 | 黑森林福利视频导航 | 精品久久久久香蕉网 | 国产精品嫩草久久久久 | 国产激情无码一区二区app | 亚洲а∨天堂久久精品2021 | 久久无码专区国产精品s | 蜜桃av抽搐高潮一区二区 | 欧美人与动性行为视频 | 青青青爽视频在线观看 | 中文字幕+乱码+中文字幕一区 | 国产国产精品人在线视 | 久久久久久久女国产乱让韩 | 男人的天堂2018无码 | 亚洲精品一区二区三区婷婷月 | 高潮喷水的毛片 | 久久国产自偷自偷免费一区调 | 欧美人与牲动交xxxx | 国产午夜视频在线观看 | 综合人妻久久一区二区精品 | 久精品国产欧美亚洲色aⅴ大片 | 国产色精品久久人妻 | 少妇久久久久久人妻无码 | 在线亚洲高清揄拍自拍一品区 | 欧美zoozzooz性欧美 | 欧美精品一区二区精品久久 | 久久精品国产大片免费观看 | 免费人成在线观看网站 | 色婷婷综合激情综在线播放 | 国产超级va在线观看视频 | 国产偷自视频区视频 | 精品 日韩 国产 欧美 视频 | 久久久久久av无码免费看大片 | 任你躁国产自任一区二区三区 | 免费看男女做好爽好硬视频 | 久久人人爽人人爽人人片av高清 | 成年美女黄网站色大免费全看 | 欧美freesex黑人又粗又大 | 男女超爽视频免费播放 | 台湾无码一区二区 | 欧美成人午夜精品久久久 | 波多野结衣一区二区三区av免费 | 大地资源网第二页免费观看 | 成人一区二区免费视频 | 在线 国产 欧美 亚洲 天堂 | 日韩欧美中文字幕公布 | 欧美丰满老熟妇xxxxx性 | 亚洲精品一区国产 | 天天拍夜夜添久久精品大 | 久久久久免费看成人影片 | a在线观看免费网站大全 | 伊人久久婷婷五月综合97色 | 亚洲娇小与黑人巨大交 | 麻豆国产人妻欲求不满谁演的 | 国产超碰人人爽人人做人人添 | 久久综合色之久久综合 | 欧美熟妇另类久久久久久不卡 | 国产在线无码精品电影网 | 国产熟妇另类久久久久 | 国产精品无码久久av | 国产偷国产偷精品高清尤物 | 国色天香社区在线视频 | 精品国产福利一区二区 | 无码播放一区二区三区 | 日本大香伊一区二区三区 | 无码乱肉视频免费大全合集 | 日本熟妇乱子伦xxxx | 亚洲国产欧美国产综合一区 | 日韩亚洲欧美中文高清在线 | 日本大香伊一区二区三区 | 日本欧美一区二区三区乱码 | 熟女少妇人妻中文字幕 | 人妻少妇精品无码专区动漫 | 亚洲 a v无 码免 费 成 人 a v | 夜夜躁日日躁狠狠久久av | 日韩欧美群交p片內射中文 | 高潮毛片无遮挡高清免费 | 青春草在线视频免费观看 | 老太婆性杂交欧美肥老太 | 综合激情五月综合激情五月激情1 | 国产av一区二区三区最新精品 | 亚洲a无码综合a国产av中文 | 扒开双腿疯狂进出爽爽爽视频 | 夜夜夜高潮夜夜爽夜夜爰爰 | 午夜精品久久久内射近拍高清 | 亚洲人成网站免费播放 | 老司机亚洲精品影院无码 | 亚洲经典千人经典日产 | 亚洲娇小与黑人巨大交 | 无码毛片视频一区二区本码 | аⅴ资源天堂资源库在线 | 欧美色就是色 | 一本大道久久东京热无码av | 在线а√天堂中文官网 | 97精品国产97久久久久久免费 | 亚洲成av人在线观看网址 | 国产尤物精品视频 | 美女极度色诱视频国产 | ass日本丰满熟妇pics | 中文字幕日产无线码一区 | 欧美性生交活xxxxxdddd | 成人性做爰aaa片免费看不忠 | 无码纯肉视频在线观看 | 成人无码精品1区2区3区免费看 | 人人妻人人澡人人爽欧美精品 | 亚洲熟悉妇女xxx妇女av | 美女极度色诱视频国产 | 婷婷综合久久中文字幕蜜桃三电影 | 人人超人人超碰超国产 | 色欲人妻aaaaaaa无码 | 国精产品一品二品国精品69xx | 成人试看120秒体验区 | 亚洲一区二区三区四区 | 人妻人人添人妻人人爱 | 九一九色国产 | 色婷婷久久一区二区三区麻豆 | 妺妺窝人体色www婷婷 | 亚洲欧美综合区丁香五月小说 | 日本熟妇人妻xxxxx人hd | 激情爆乳一区二区三区 | 国产美女精品一区二区三区 | 牲交欧美兽交欧美 | 2019午夜福利不卡片在线 | 亚洲 a v无 码免 费 成 人 a v | 久久久中文字幕日本无吗 | 97久久精品无码一区二区 | 久久午夜无码鲁丝片秋霞 | 国产真实乱对白精彩久久 | 性生交大片免费看女人按摩摩 | 色偷偷人人澡人人爽人人模 | 国产色视频一区二区三区 | 麻豆蜜桃av蜜臀av色欲av | 亚洲人成无码网www | 色综合久久久久综合一本到桃花网 | 鲁一鲁av2019在线 | 欧美日韩久久久精品a片 | 久久精品人人做人人综合试看 | 精品无码国产一区二区三区av | www成人国产高清内射 | 亚洲日本va中文字幕 | aⅴ亚洲 日韩 色 图网站 播放 | 国产精品永久免费视频 | 日韩精品无码一本二本三本色 | 亚洲精品久久久久久一区二区 | 久久人人97超碰a片精品 | 呦交小u女精品视频 | 荫蒂添的好舒服视频囗交 | 天天爽夜夜爽夜夜爽 | 性色欲情网站iwww九文堂 | 欧美猛少妇色xxxxx | 国产无套粉嫩白浆在线 | 国产另类ts人妖一区二区 | 欧美丰满老熟妇xxxxx性 | 伦伦影院午夜理论片 | 亚洲精品成a人在线观看 | 天天躁日日躁狠狠躁免费麻豆 | 少妇邻居内射在线 | 国产av无码专区亚洲awww | 天天拍夜夜添久久精品 | 国产人成高清在线视频99最全资源 | 毛片内射-百度 | 搡女人真爽免费视频大全 | 国产国产精品人在线视 | 欧美自拍另类欧美综合图片区 | 国产9 9在线 | 中文 | 天天躁夜夜躁狠狠是什么心态 | 亚洲乱亚洲乱妇50p | 色婷婷av一区二区三区之红樱桃 | 日韩精品成人一区二区三区 | 欧美日韩视频无码一区二区三 | 亚洲大尺度无码无码专区 | 啦啦啦www在线观看免费视频 | 国产午夜无码精品免费看 | 精品欧美一区二区三区久久久 | 亚洲爆乳大丰满无码专区 | 最近的中文字幕在线看视频 | 国产亚av手机在线观看 | 97久久国产亚洲精品超碰热 | 夜先锋av资源网站 | 久久亚洲a片com人成 | 国产精品a成v人在线播放 | 色综合久久久久综合一本到桃花网 | 久久精品国产99久久6动漫 | 久精品国产欧美亚洲色aⅴ大片 | 1000部夫妻午夜免费 | 日日碰狠狠躁久久躁蜜桃 | 久久久久亚洲精品男人的天堂 | 清纯唯美经典一区二区 | 无套内谢的新婚少妇国语播放 | 色窝窝无码一区二区三区色欲 | 乌克兰少妇xxxx做受 | 一本久久a久久精品vr综合 | 亚洲欧美综合区丁香五月小说 | 久久国产精品精品国产色婷婷 | 久久精品人人做人人综合 | 装睡被陌生人摸出水好爽 | 国产成人综合在线女婷五月99播放 | 无码国产色欲xxxxx视频 | 久久久中文久久久无码 | 亚洲精品美女久久久久久久 | 中文字幕无码av波多野吉衣 | 国内揄拍国内精品人妻 | 人妻人人添人妻人人爱 | 精品国产成人一区二区三区 | www国产精品内射老师 | 少女韩国电视剧在线观看完整 | 中文字幕乱码人妻无码久久 | 日韩成人一区二区三区在线观看 | 久久久精品人妻久久影视 | 色诱久久久久综合网ywww | 18黄暴禁片在线观看 | 国产卡一卡二卡三 | 久久这里只有精品视频9 | 搡女人真爽免费视频大全 | 中文字幕精品av一区二区五区 | 97久久精品无码一区二区 | 国产熟女一区二区三区四区五区 | 强伦人妻一区二区三区视频18 | а天堂中文在线官网 | 搡女人真爽免费视频大全 | 亚洲 欧美 激情 小说 另类 | 乱人伦中文视频在线观看 | 少妇性l交大片 | 久久精品女人天堂av免费观看 | 中文字幕无码免费久久9一区9 | 永久免费观看国产裸体美女 | 少妇愉情理伦片bd | 国产日产欧产精品精品app | 又大又黄又粗又爽的免费视频 | 宝宝好涨水快流出来免费视频 | 欧美xxxxx精品 | аⅴ资源天堂资源库在线 | 精品国偷自产在线 | 国产 浪潮av性色四虎 | 日韩无套无码精品 | 亚洲热妇无码av在线播放 | 欧美一区二区三区 | 人人妻人人澡人人爽人人精品浪潮 | 日韩人妻少妇一区二区三区 | 内射爽无广熟女亚洲 | 成人一区二区免费视频 | 国产特级毛片aaaaaa高潮流水 | 久久久久成人精品免费播放动漫 | 熟妇人妻中文av无码 | 无码免费一区二区三区 | a国产一区二区免费入口 | 亚洲成a人片在线观看无码3d | 女人和拘做爰正片视频 | 青草青草久热国产精品 | 桃花色综合影院 | 亚洲一区二区三区国产精华液 | 激情亚洲一区国产精品 | 国产精品高潮呻吟av久久 | 国产精品美女久久久久av爽李琼 | 在线观看国产一区二区三区 | 天天av天天av天天透 | 天海翼激烈高潮到腰振不止 | 一本无码人妻在中文字幕免费 | 久久99热只有频精品8 | 九九热爱视频精品 | 亚洲一区二区三区四区 | 麻花豆传媒剧国产免费mv在线 | 亚洲理论电影在线观看 | 国产香蕉97碰碰久久人人 | 丰满少妇人妻久久久久久 | 人妻天天爽夜夜爽一区二区 | 日韩亚洲欧美中文高清在线 | 国产熟妇另类久久久久 | 日韩少妇白浆无码系列 | 日本免费一区二区三区最新 | 人人澡人人透人人爽 | 18禁止看的免费污网站 | 波多野结衣乳巨码无在线观看 | 国产日产欧产精品精品app | 18黄暴禁片在线观看 | 色欲av亚洲一区无码少妇 | 狠狠色色综合网站 | 亚洲熟妇自偷自拍另类 | 欧美日本免费一区二区三区 | 97资源共享在线视频 | 国产无套粉嫩白浆在线 | 中文字幕无线码免费人妻 | 女人被男人躁得好爽免费视频 | 国产成人无码av片在线观看不卡 | 中文字幕无码免费久久9一区9 | 荫蒂添的好舒服视频囗交 | 精品成人av一区二区三区 | 人妻少妇精品无码专区二区 | 亚洲日韩精品欧美一区二区 | 国产精品-区区久久久狼 | 亚洲一区二区三区国产精华液 | 国产xxx69麻豆国语对白 | 中文字幕 亚洲精品 第1页 | 乱人伦人妻中文字幕无码久久网 | 亚洲s码欧洲m码国产av | 久久视频在线观看精品 | 纯爱无遮挡h肉动漫在线播放 | 欧美日韩一区二区综合 | 国产精品美女久久久网av | 曰本女人与公拘交酡免费视频 | 人妻少妇精品视频专区 | 亚洲中文字幕无码中文字在线 | 精品国产aⅴ无码一区二区 | 国产在线精品一区二区三区直播 | 国产精品va在线观看无码 | 麻豆果冻传媒2021精品传媒一区下载 | www国产亚洲精品久久网站 | а天堂中文在线官网 | 国产精品理论片在线观看 | 国产99久久精品一区二区 | 成人一区二区免费视频 | 中文字幕无码日韩欧毛 | 精品人妻中文字幕有码在线 | 一本加勒比波多野结衣 | 国产成人综合在线女婷五月99播放 | 麻豆人妻少妇精品无码专区 | 乱中年女人伦av三区 | 高清国产亚洲精品自在久久 | 中文无码成人免费视频在线观看 | 大屁股大乳丰满人妻 | 国产日产欧产精品精品app | 成 人 免费观看网站 | 人人妻人人澡人人爽欧美一区九九 | 日本va欧美va欧美va精品 | 国产成人无码专区 | 伊人久久大香线蕉午夜 | 婷婷综合久久中文字幕蜜桃三电影 | 亚洲精品久久久久久一区二区 | 成在人线av无码免费 | 内射白嫩少妇超碰 | 无遮无挡爽爽免费视频 | 一本色道婷婷久久欧美 | 少妇一晚三次一区二区三区 | 人妻少妇精品无码专区二区 | 亚洲日韩av一区二区三区四区 | 日本丰满熟妇videos | 大肉大捧一进一出好爽视频 | 377p欧洲日本亚洲大胆 | 欧美日韩色另类综合 | 一本大道久久东京热无码av | 88国产精品欧美一区二区三区 | 久久成人a毛片免费观看网站 | 中国女人内谢69xxxxxa片 | 性生交大片免费看l | 偷窥村妇洗澡毛毛多 | 最新国产麻豆aⅴ精品无码 | 亚洲va中文字幕无码久久不卡 | 色综合久久88色综合天天 | 欧美刺激性大交 | 亚洲色欲色欲天天天www | 一区二区三区乱码在线 | 欧洲 | 国产亚洲精品久久久久久久久动漫 | 亚洲s码欧洲m码国产av | 狂野欧美性猛交免费视频 | 久久无码专区国产精品s | 大肉大捧一进一出好爽视频 | 久久久精品国产sm最大网站 | 97精品国产97久久久久久免费 | 国产亚洲日韩欧美另类第八页 | 九九综合va免费看 | 国精产品一品二品国精品69xx | 欧洲熟妇精品视频 | 精品一区二区三区无码免费视频 | 中文字幕精品av一区二区五区 | 久久午夜无码鲁丝片 | 亚洲欧美色中文字幕在线 | 亚洲国产高清在线观看视频 | 青青久在线视频免费观看 | 国产内射爽爽大片视频社区在线 | 中国女人内谢69xxxx | 国产精品久久久一区二区三区 | 色综合久久久久综合一本到桃花网 | 国产午夜精品一区二区三区嫩草 | 久久人人爽人人人人片 | 美女黄网站人色视频免费国产 | 国产精品鲁鲁鲁 | 国产精品无码一区二区三区不卡 | 香港三级日本三级妇三级 | 红桃av一区二区三区在线无码av | 男人和女人高潮免费网站 | 无码人妻精品一区二区三区不卡 | 色综合久久网 | 亚洲啪av永久无码精品放毛片 | 国产成人综合在线女婷五月99播放 | 无码精品国产va在线观看dvd | 精品人妻人人做人人爽 | 曰本女人与公拘交酡免费视频 | 亚洲中文字幕无码中字 | 伊人久久大香线焦av综合影院 | 18禁黄网站男男禁片免费观看 | 免费观看又污又黄的网站 | 日韩在线不卡免费视频一区 | 久久精品人人做人人综合 | 国产成人一区二区三区别 | 少妇愉情理伦片bd | 一区二区三区乱码在线 | 欧洲 | 欧美日韩亚洲国产精品 | 无码人妻av免费一区二区三区 | 18无码粉嫩小泬无套在线观看 | 黑人粗大猛烈进出高潮视频 | 无码一区二区三区在线观看 | 麻豆果冻传媒2021精品传媒一区下载 | 国产人妻人伦精品1国产丝袜 | 亚洲国产高清在线观看视频 | 无码乱肉视频免费大全合集 | 午夜熟女插插xx免费视频 | 大色综合色综合网站 | 亚洲中文字幕久久无码 | 亚洲成av人在线观看网址 | 东京无码熟妇人妻av在线网址 | 国产亚洲精品久久久闺蜜 | av无码电影一区二区三区 | 免费国产黄网站在线观看 | 精品国产一区av天美传媒 | 久久亚洲日韩精品一区二区三区 | 日本大乳高潮视频在线观看 | 日本一卡2卡3卡四卡精品网站 | 激情内射日本一区二区三区 | 亚洲国产午夜精品理论片 | 亚洲啪av永久无码精品放毛片 | 亚洲国产成人a精品不卡在线 | 欧美日韩综合一区二区三区 | 东北女人啪啪对白 | 国产超级va在线观看视频 | 色窝窝无码一区二区三区色欲 | 麻花豆传媒剧国产免费mv在线 | 亚洲精品一区二区三区婷婷月 | 色欲综合久久中文字幕网 | a在线观看免费网站大全 | 精品日本一区二区三区在线观看 | 久久久婷婷五月亚洲97号色 | 蜜桃视频插满18在线观看 | 成人欧美一区二区三区黑人免费 | 丰满少妇熟乱xxxxx视频 | aⅴ亚洲 日韩 色 图网站 播放 | 日韩在线不卡免费视频一区 | 色妞www精品免费视频 | 日本精品人妻无码77777 天堂一区人妻无码 | a片免费视频在线观看 | 国产亚洲日韩欧美另类第八页 | 国产尤物精品视频 | 在线播放免费人成毛片乱码 | 欧美国产日韩亚洲中文 | 国产艳妇av在线观看果冻传媒 | 扒开双腿吃奶呻吟做受视频 | 无码人妻丰满熟妇区五十路百度 | 亚洲一区二区三区香蕉 | 亚洲国精产品一二二线 | 99riav国产精品视频 | 欧洲美熟女乱又伦 | 最近免费中文字幕中文高清百度 | 国内综合精品午夜久久资源 | 色妞www精品免费视频 | 性欧美牲交xxxxx视频 | 亚洲一区二区三区含羞草 | 国产色精品久久人妻 | 国产精品人人妻人人爽 | 国产9 9在线 | 中文 | 亚洲午夜福利在线观看 | 亚洲精品一区二区三区大桥未久 | 综合激情五月综合激情五月激情1 | 一本久久a久久精品vr综合 | 未满小14洗澡无码视频网站 | 欧美性生交活xxxxxdddd | 国产成人无码av片在线观看不卡 | www国产亚洲精品久久久日本 | 久久国产精品偷任你爽任你 | 精品夜夜澡人妻无码av蜜桃 | 亚洲一区av无码专区在线观看 | 欧美丰满熟妇xxxx性ppx人交 | v一区无码内射国产 | 久久国产精品精品国产色婷婷 | 国产精品嫩草久久久久 | 中文字幕人妻无码一区二区三区 | 色综合久久网 | 国产在线精品一区二区三区直播 | 网友自拍区视频精品 | 又色又爽又黄的美女裸体网站 | 少妇无套内谢久久久久 | 一区二区三区高清视频一 | 男人的天堂2018无码 | 中文无码成人免费视频在线观看 | 性欧美疯狂xxxxbbbb | 日本护士毛茸茸高潮 | 国产超碰人人爽人人做人人添 | 亚洲国产一区二区三区在线观看 | 东京热一精品无码av | 国产suv精品一区二区五 | 97久久国产亚洲精品超碰热 | 亚洲成av人综合在线观看 |