3atv精品不卡视频,97人人超碰国产精品最新,中文字幕av一区二区三区人妻少妇,久久久精品波多野结衣,日韩一区二区三区精品

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

压箱底总结:流系统端到端一致性对比

發(fā)布時間:2024/4/11 windows 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 压箱底总结:流系统端到端一致性对比 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

點擊上方“朱小廝的博客”,選擇“設(shè)為星標”

回復”666“獲取公眾號專屬資料

分布式最難的2個問題:

1、Exactly Once Message processing。

2、保證消息處理順序。

我們今天著重來討論一下:

  • 為什么很難;

  • 怎么解。

前言

這篇文章可以說是作者壓箱底兒的知識總結(jié)(之一,畢竟作者學的東西很雜 ╮( ̄▽ ̄"")╭ )了,斷斷續(xù)續(xù)寫了將近三個月, 耗費了大量的精力。

本來的目的只是想對比一下各個state of art的流系統(tǒng)有什么不同, 但是寫出來之后只是亂糟糟的羅列數(shù)據(jù)和資料,像這樣列數(shù)據(jù)一樣,“介紹下這個framework這樣實現(xiàn)的,所以有這樣的特性”,“那個是那樣的”.... blablabla... 使得文章只停留于表層,這樣寫并不是一個好的筆記。

我想記錄的是更本質(zhì)和更精髓的一些東西:我想更深入的探討一個分布式系統(tǒng)的設(shè)計是被什么現(xiàn)實和本質(zhì)問題所逼迫的結(jié)果, 2個不同的設(shè)計到底是在哪個本質(zhì)問題上分道揚鑣才造成了系統(tǒng)設(shè)計如此的不同。

追尋問題和解法的前因后果,讓未來的自己一眼掃過去之后能夠自然而然的回憶和理解出: "嗯,對,在這種現(xiàn)實限制下,要達到這種效果就必須這么做", 我想從亂糟糟的觀察到的混亂表象中,抽象出流系統(tǒng)所面對的問題和解決方案的本質(zhì),這就是本文的目的,和文章中除開引用文獻之外,作者所貢獻的一些自己的思考。

就作者學習流系統(tǒng)的感受來看, 流系統(tǒng)有2個難點:

  • 第一是end to end consistency,或者說exactly once msg processing;

  • 第二是event time based window操作。

本來作者想用一篇文章同時概括和比較這2點,無奈第一點寫完, 文章已經(jīng)長度爆炸。于是分開2篇,此為上篇, 著重于從分布式系統(tǒng)的本質(zhì)問題出發(fā), 從最底層的各種"不可能", 和它們的解(比如:consensus協(xié)議)開始, 一層一層的遞進到高層的流系統(tǒng)中, 如何實現(xiàn)容錯場景下的end to end consistency,或者說exactly once msg processing。

目錄

流系統(tǒng)的具體對比在“9.流系統(tǒng)的EOMP”這一節(jié), 前邊都是準備知識... -_-

1、一些術(shù)語

2、圣光(廣告)不會告訴你的事

3、幾個事實

4、Liveness和Safety的取舍

5、絕望中的曙光

6、Zombie Fencing

7、三節(jié)點間的EOMP

8、加入節(jié)點狀態(tài)的三節(jié)點間的EOMP

9、流系統(tǒng)的EOMP

10、異步增量checkpointing

11、系統(tǒng)內(nèi)與系統(tǒng)外

12、Latency, 冪等和non-deterministic

13、REFERENCE

一些術(shù)語

1、端到端一致性end to end Consistency

一致性其實就是業(yè)務(wù)正確性, 在不同的業(yè)務(wù)場景有不同的意思, 在"流系統(tǒng)中間件"這個業(yè)務(wù)領(lǐng)域, 端到端的一致性就代表Exact once msg processing, 一個消息只被處理一次,造成一次效果。

注意: 這里的"一個消息"代表"邏輯上的一個", 即application對中間件的期待就是把此消息作為一個來處理, 而不是指消息本身的值相等。比如要求計數(shù)+1的一個消息, 消息本身的內(nèi)容可能一模一樣, 但是application發(fā)來2次相同消息的"本意"就是要計數(shù)兩次, 那么中間件就應(yīng)該處理兩次, 如果application由于超時重發(fā)了本意只想讓中間件處理一次的+1操作, 那么中間件就應(yīng)該處理一次。

中間件怎么能區(qū)分application的"本意"來決策到底處理一次還是多次, 是end to end consistency的關(guān)鍵。

2、EOMP

由于Exactly once msg processing太經(jīng)常出現(xiàn), 我們用EOMP來代替簡寫一下。

3、容錯failure tolerance

為了方便討論,后邊談到failure, 我們指的都是crash failure, 你可以想象是任何可以造成“把機器砸了然后任何本地狀態(tài)丟失(比如硬盤損壞)一樣效果的情況出現(xiàn)"。

在今天的虛擬云時代,這其實很常見,比如container或者虛擬機被resource manager突然kill掉回收了, 那么即使物理機其實沒有問題, 你的application的邏輯節(jié)點也是被完全銷毀的樣子。

容錯在end to end Consistency的語義下,是指在機器掛了,網(wǎng)絡(luò)鏈接斷開...等情況下,系統(tǒng)的運算結(jié)果和沒有任何failure發(fā)生時是一摸一樣的。

3、Effective once msg processing(應(yīng)該翻成有效一次性處理?)

后邊我們可以看到, 保證字面上的Exact once msg processing(即整個系統(tǒng)在物理意義上真的只對消息處理一次), 這在需要考慮容錯的情況下是不可能做到的。

Effective once msg processing是一個更恰當?shù)男稳?#xff0c;而所有號稱可以做到EOMP的系統(tǒng),其實都只是能做到Effective once msg processing。即:中間件, 或者說流處理framework可能在failure發(fā)生的情況下處理了多次同一個消息,但是最終的系統(tǒng)計算結(jié)果和沒有任何failure時, 一個消息真的只處理了一次時計算的結(jié)果相等。這和冪等息息相關(guān)。

4、冪等Idempotent

一個相同的操作, 無論重復多少次, 造成的效果都和只操作一次相等。比如更新一個keyValue, 無論你update多少次, 只要key和value不變,那么效果是一樣的。再比如更新計數(shù)器處理一次消息就計數(shù)器+1, 這個操作本身不冪等, 同一個消息被中間件重"發(fā)+收"兩次就會造成計數(shù)器統(tǒng)計兩次。

而如果我們的消息有id, 那么更新計數(shù)器的邏輯修改為, 把處理過的消息的id全記錄起來, 接到消息先查重, 然后才更新計數(shù)器, 那么這個"更新計數(shù)器的邏輯"就變成冪等操作了。

把本不冪等的操作轉(zhuǎn)化為冪等操作是end to end consistency的關(guān)鍵之一。

5、確定性計算deterministic

和冪等有些類似, 不過是針對一個計算,相同的input必得到相同的output, 則是一個確定性(deterministic)。

比如從一個msg里計算出一個key和一個value, 如果對同一個消息運算無數(shù)次得到的key和value都相同, 那么這個計算就是deterministic的;而如果key里加上一個當前的時鐘的字符串表示, 那么這個計算就不是確定性的, 因為如果重新計算一次這個msg, 得到的是完全不同的key。

注意1: 非確定性計算一般會導致不冪等的操作, 比如我們?nèi)绻焉线吚永锏膋eyvalue存在數(shù)據(jù)庫里, 重復處理多少次同一個msg, 我們就會重復的插入多少條數(shù)據(jù)(因為key里的時間戳字符串不同)。

注意2: 非確定性計算并非必然導致不冪等的操作,比如這個時間戳沒有添加在key里而是添加在value里, 且key總是相同的, 那么這個計算還是"非確定性"計算。但是當我們存數(shù)據(jù)的時候先查重才存keyvalue, 那么無論我們重復處理多少次同一個msg, 我們也只會成功存入第一個keyValue, 之后的keyValue都會被過濾掉。

支持非確定業(yè)務(wù)計算的同時, 還能在容錯的情況下達成端到端一致性, 是流系統(tǒng)的大難題, 甚至我們今天會提到的幾個state of art的流系統(tǒng)都未必完全支持。(好吧Spark說的就是你)

圣光(廣告)不會告訴你的事

分布式系統(tǒng)最tricky的問題就是, 問題看起來很普通很簡單。一些問題總是看起來有簡單直接的解法,而一個"簡單解"被人查出問題時,也總是看起來可以很簡單的就可以把這個挑出的edge case很簡單的解決掉。

然而我們會立刻發(fā)現(xiàn)解決這個edge case而引入的新步驟會引發(fā)新的問題... 如此循環(huán), 直到"簡單"疊加到"無法解決的復雜"。

由于人們對這些問題的"預期是簡單的", 所以很多書, online doc, 都大大簡化了對問題的描述和對問題的分析。最普遍的是對failure recovery的介紹, 一般只會簡單的寫"failure發(fā)生時, 系統(tǒng)會怎么recovery", 但是完全不提怎么檢測failure和“根本不可能完美檢測到failure”這個分布式系統(tǒng)的基本事實, 從而給了讀者“failure可以完美檢測”的錯覺。

這是因為一來說清楚各種edge case會大大增加文檔的復雜性, 另外一點是寫了讀者可能也看不明白, 還有就是廣告效應(yīng), 比如真正字面意義的exactly once msg processing是不存在的, 但是所有其他做到effective once msg processing的系統(tǒng)都說自己可以支持exactly once, 那自己也得打這個廣告不是。

還有就是語焉不詳, 比如某stream系統(tǒng)說自己可以實現(xiàn)exactly once msg delivery, 別看delivery和processing好像差不多, 這里邊的用詞藝術(shù)就有意思了,delivery是指消息只在stream里出現(xiàn)一次, 但是在stream里只出現(xiàn)一次的消息卻無法保證只被consume一次確根本不提。

再比如某serverless產(chǎn)品處理某stream的消息, 描述是保證舊的消息沒有處理之前不會處理新消息, 你會想, 簡單描述成保證消息按順序處理不是一樣么? 其實差大了去了, 前者并沒有屏蔽掉舊消息突然replay, 覆蓋掉新消息的處理結(jié)果的edge case。而這個事實甚至顛覆了很多使用這個服務(wù)的Sr. SDE的對其的認知。

沒有理解分布式系統(tǒng)的幾個簡單的本質(zhì)問題之前, 你讀文檔的理解很有可能和文檔真正精準定義的事實不符。且讀者對“系統(tǒng)保證”的理解, 往往會由于文檔"藝術(shù)"定義的誤導, 而過多的假設(shè)系統(tǒng)保證的"強", 直到被坑了去尋根問底, 才會收到"你誤讀了文檔的哪里的詳細解釋"。這是分布式系統(tǒng)"最難的地方在最普通的地方"的直接結(jié)果之一。

個人認為最好的辦法就是去理解分布式系統(tǒng)軟件算法所能達到的上限=>關(guān)于各種impossibility的結(jié)論的論文,然后去學習克服他們的方法的論文。

這樣, 我們才能從各種簡化了的 tutorials里, 從API中, 從各種云服務(wù), 框架的廣告詞背后, 發(fā)現(xiàn)“圣光不會告訴你的事", 和"這個世界的真相";(從廣告和online doc天花亂墜的描述中看到分布式系統(tǒng)設(shè)計真正的取舍, 這是區(qū)分API調(diào)包俠和分布式系統(tǒng)專家的分水嶺之一)。

而不是“簡單的信了它們的邪”。而下邊,就是學習分布式系統(tǒng),你所需要了解的最重要事實中, 和end to end consistency相關(guān)的幾個。

幾個事實

1、不存在完美的failure detector

很多關(guān)于分布式系統(tǒng)的書上都會說,當failure發(fā)生時系統(tǒng)應(yīng)該怎么做來容錯, 就好像可以準確的檢測到failure一樣。然而事實是, 在目前互聯(lián)網(wǎng)的物理實現(xiàn)上(share nothing architecture, 只靠網(wǎng)絡(luò)互聯(lián),不直接共享其他比如內(nèi)存物理硬盤等),我們無法準確的檢測到failure。

簡單來說,就是當我們發(fā)現(xiàn)一個node無反應(yīng)的時候,比如ping它,給它發(fā)消息、request、查詢,都沒有反應(yīng),我們無法知道這到底是對方已經(jīng)停止工作了,還是只是處理的很慢而已。

無法制造完美的failure detector,即使在今天也是分布式系統(tǒng)的基礎(chǔ)事實。本文無意在基礎(chǔ)事實上多費唇舌, 無法接受此事實者可以去翻相關(guān)論文。╮( ̄▽ ̄"")╭

Essentially, the impossibility results for Consensus and Atomic Broadcast stem from the inherent difficulty of determining whether a process has actually crashed or is>The fundamental reason why Consensus cannot be solved in completely asynchronous systems is the fact that, in such systems, it is impossible to reliably distinguish a process that has crashed from>A fundamental problem in distributed systems is that network partitions (split brain scenarios) and machine crashes are indistinguishable for the observer, i.e. a node can observe that there is a problem with another node, but it cannot tell if it has crashed and will never be available again or if there is a network issue that might or might not heal again after a while. Temporary and permanent failures are indistinguishable because decisions must be made in finite time, and there always exists a temporary failure that lasts longer than the time limit for the decision…

A third type of problem is if a process is unresponsive, e.g. because of overload, CPU starvation or long garbage collection pauses. This is also indistinguishable from network partitions and crashes. The only signal we have for decision is “no reply in given time for heartbeats” and this means that phenomena causing delays or lost heartbeats are indistinguishable from each other and must be handled in the same way.[29]

2、不存在完美的failure detector, 所導致的幾個顛覆你認知的問題

1)分布式共識問題"Consensus"在"不存在完美的Failure Detector的情況下"不可解, 這又叫做FLP impossibility[36]。

可以說是上世紀,奠定分布式系統(tǒng)研究基石方向的一篇論文。即: 在理論上, 在分布式環(huán)境里(更準確說應(yīng)該是異步環(huán)境里), 在最多可能出現(xiàn)一個crash failure的強假設(shè)下, 不存在任何一個算法可以保證系統(tǒng)里的所有的"正常"節(jié)點對某一信息有共識。

對于"共識"你可以理解為一個數(shù)據(jù)一摸一樣的備份在多個節(jié)點上。(那么paxos, raft這種consensus協(xié)議是怎么回事呢? 稍后會解釋)

2)在分布式環(huán)境, 連分配只增序列號這件事都很難(即不同的進程去向一個系統(tǒng)申請序列號, 從0開始不斷增加, 保證process得到的序列號不能重復)。

因為本質(zhì)上這是一個consensus問題, 后邊可以看到, 能夠分配高可用性的global序列號(epoch id), 是解決zombie leader/master/processor的問題時的一大助力。

3)在保證liveness的情況下(即檢測到失敗就在另外的機器重啟邏輯節(jié)點), 無法保證系統(tǒng)中的Singleton角色“在同一時間點”只有一個。

比如在有l(wèi)eader概念的分布式系統(tǒng)里, 要求任意時間點只有一個leader做決定, 比如HBase需要只能有一個Region Server負責某region的寫操作; 再比如kafka或者Kinesis[22]里需要只能有一個partition processor接受一個stream partition的信息并且采取行動。

而事實是, 任何云服務(wù)和現(xiàn)有實現(xiàn), 都無法在物理上保證“在同一時間點”, 真的只有一個這樣的邏輯角色存在于機群中; 這就牽涉到一個概念=> Zombie Process。

4)Zombie process

由于沒有完美的failure detector, 所以即使幾率再低, 只要時間夠長, 需要failure detection的用例夠多, 系統(tǒng)不可避免會錯誤的判斷把一個并沒有真正crash掉的process當作死掉了。

而如果系統(tǒng)需要保持高可用性,需要在檢測到crash的時候,在新的機器上啟動此process繼續(xù)處理,那么當failure detector出錯,則會發(fā)生新老process共同工作的問題,此時,這個老的process就是zombie process。

嚴重注意,在分布式系統(tǒng)里,我們需要單一責任的一個節(jié)點/processor/role來做決策或者處理信息時,我們要么不保護系統(tǒng)的高可用性(機器掛了就停止服務(wù)),要么解決zombie process會帶來的問題。

高可用性的系統(tǒng)中, zombie無法消除。這關(guān)系到分布式系統(tǒng)設(shè)計里的一個核心問題:liveness和safety的取舍。

Liveness和Safety的取舍

1、在缺乏完美的failure detector的情況下, 對方遲遲不回信息(ping它也不回), 不發(fā)heartbeat, 那么本機只有2個選擇:?

  • 認為對方還沒有crash, 持續(xù)等待;

  • 認為其crash掉了, 進行failover處理。

選擇1傷害系統(tǒng)的liveness, 因為如果對方真的掛了,我們會無限等待下去, 系統(tǒng)或者計算就無法進行下去。選擇2傷害系統(tǒng)的safety, 因為如果對方其實沒有crash, 那我們就需要處理可能出現(xiàn)的重發(fā)去重, 或者zombie問題, 即系統(tǒng)的邏輯節(jié)點的“角色唯一性“就會被破壞掉了。

越好的liveness要求越快的響應(yīng)速度, 而“100%的safety“的意義, 則因系統(tǒng)的具體功能的不同而不同, 但一般都要求系統(tǒng)做決定要小心謹慎, 不能放過一個edge case, 窮盡所有必要的檢查來保證"系統(tǒng)不允許出現(xiàn)的行為絕對不會發(fā)生"。在consensus的語義下來說, safety就是絕對不能向外發(fā)出不一致的決定(比如向A說決定是X, 后來向B說決定是Y)。

可以看到, 系統(tǒng)的edge case越多, safety越難保證, 而edge cases的全集只是可能發(fā)生的情況的集合, 而某一次運行只會發(fā)生一種情況(且大概率是正常情況)。

如果系統(tǒng)不檢查最難分辨最耗時的幾種小概率發(fā)生的edge case, 那么系統(tǒng)大概率(甚至極大概率)也可以完美運轉(zhuǎn)毫無問題幾個月, 運氣好甚至幾年。這樣降低了系統(tǒng)的safety(不再是100%), 但是提高了系統(tǒng)的響應(yīng)速度(由于是概率上會出問題, 所以即使降低了safety保證, 也不是說就一定會出問題, 只是你把系統(tǒng)的正確性交給了運氣和命運)。

而如果系統(tǒng)保證檢查所有的edge case, 但是系統(tǒng)99.9999%的概率都不會進入一些edge cases, 那么這些檢查就會阻礙正常情況的運算速度。Liveness和Safety, 這是分布式系統(tǒng)設(shè)計的最基本取舍之一。

而FLP則干脆說: 在分布式consensus這個問題里, 如果你想要獲得100%的系統(tǒng)safety, 那么你絕對無法保證系統(tǒng)liveness, 即:系統(tǒng)總是存在活鎖的可能性, 算法設(shè)計只能減小這個可能性, 而無法絕對消除它。

2、更多的safety VS. liveness 取舍的例子:

Kubernetes StatefulSet, 簡單說是可以給容器(pod/container)指定一個名字的, 且保證全cluster總是只有一個容器可以有這個名字, 這樣application就可以通過這個保證來指定機群中的邏輯角色, 且用這個邏輯容器中保存一些狀態(tài)。(一般的replicaSet會load balance連接或請求到背后不同的節(jié)點, 你的一個請求要求在server本地存一些狀態(tài), 下一個請求未必還會到同一個server)

When a stateful pod instance dies (or the node it’s running on fails), the pod instance needs to be resurrected on another node, but the new instance needs to get the same name, network identity, and state as the one it’s replacing. This is what happens when the pods are managed through a StatefulSet. [37]

Kubernetes StatefulSet在liveness和safety里選擇了safety, 當statefulSet所在的的物理節(jié)點"掛了"之后, kubernetes默認不會重啟這個pod到其他節(jié)點去, 因為它無法確定這個物理節(jié)點到底死沒死, 為了保證safety它選擇放棄了liveness, 即系統(tǒng)無法自愈, StatefulSet提供所提供的服務(wù)不可用, 直到靠人干預來解決問題。

([38] P305: 10.5. Understanding how StatefulSets deal with node failures)?

Node fail cause daemon of Kubelet could not tell state of pod on the node….StatefulSet guarantees that there will never be two pods running with the same identity and storage...

Akka Cluster也做了相同的選擇, 在cluster membership管理中,有一個auto-downing的配置, 如果你打開它, 那么cluster就會完全相信Akka的failure detection而自動把unreachable的機器從cluster中刪去, 這意味著一些在這個unreachable節(jié)點上的Actor會自動在其他節(jié)點重啟。

Akka Cluster的文檔中, auto-downing是強烈不推薦使用的[38], 這是由于Akka Cluster提供的很多feature要求角色的絕對單一性, 比如singleton role這個功能, 在保證“cluster里只有這一個節(jié)點扮演這個actor"(safety), 和保證"cluster里總要有一個節(jié)點扮演這個actor"(liveness) 中, 選擇了safety, 即保證at most one actor存在于cluster中, 一旦次actor的節(jié)點變成unreachable(比如機器真的掛了), 那么Akka也無能為力, 只能傻等這個節(jié)點回來或者人來干預決策:

The default setting in Akka Cluster is to not remove unreachable nodes automatically and the recommendation is that the decision of what to do should be taken by a human operator or an external monitoring system. [29]

一個商用的Akka拓展(Akka Split Brain Resolver)提供了一些智能點的解決方案(基于quorum), 有興趣的同學可以看引用文檔[29]。

This is a feature of the Lightbend Reactive Platform. that is exclusively available for Lightbend customers.[29]

3、為什么Kubernetes和Akka不能同時保證safety和liveness呢?

這是因為這兩個作為比較底層的平臺, 他們需要對上層提供非常大的自由性, 而不能限制上層的活動。比如kubernetes沒有規(guī)定用戶不能在pod上跑某種程序, Akka也沒有規(guī)定用戶不能寫某種actor的code。

這樣, 在不限制自己處理能力的同時要保證任何行為都看起來exactly happen once(因為語義上singleton節(jié)點只有一個, 那么就不能讓用戶寫的任意單線程程序出現(xiàn)多節(jié)點平行執(zhí)行的外部效果), 而這對中間件來說是不可能的, 這就引出了另外一篇論文: end to end argument[27], 作者已經(jīng)寫過一篇文章詳細介紹end to end argument(阿萊克西斯:End to End Argument(可能是最重要的系統(tǒng)設(shè)計論文)), 這里不在贅述。

后邊我們可以看到Flink, Spark等流系統(tǒng)為了保證exactly once msg processing需要怎樣和end to end argument 搏斗。

4、可以同時保證safety和liveness么?

取決于具體情況下對safety和liveness的具體要求, 在流處理的情況下, 至少本文提到的4種流系統(tǒng)都給出了自己的解。請耐心往下閱讀。

絕望中的曙光

1、可解也不可解的分布式consensus

由于異步環(huán)境下, 釘死了我們不可能有一個完美不犯錯的failure detector。這篇著名的論文Unreliable Failure Detectors for Reliable Distributed Systems[30] 詳細描述了即使我們用一個不準確的failure detector, 也可以解決consensus的方法。

但是它并沒有推翻FLP impossibility的結(jié)論:Consensus還是并非絕對可解。但是, 如果我們對需要consensus的計算加一個限制,則Consensus可解。

這個限制是: 計算和通訊只需要在"安全時間"內(nèi)完成即可, 對[30]提供的算法來講, 提供consensus的系統(tǒng)需要在這段時間內(nèi)"正確識別crash"即可,也就是說(1)識別出真正掛掉的node, 和(2)不要懷疑正確的node。

怎么理解呢, 這兩個看似對立的概念:?

  • (1)consensus的有解(比如paxos協(xié)議)是對的;

  • (2)consensus的無解證明:FLP impossibility也是對的。

要準確且簡單的解釋為什么它們都是對的有點難, 推薦還是看論文。但是用比喻來解釋的話, 根據(jù)[30], Consensus算法可以看作這樣一個東西, 當系統(tǒng)出現(xiàn)crash, failure detector判斷錯誤,或者網(wǎng)絡(luò)突然延遲...等時候, 算法會進入某種循環(huán)而不會輕易作出決定。

for example, there is a crash that no process ever detects, and all correct processes are repeatedly (and forever) falsely suspected — the application may lose liveness but not safety [31]

而只要滿足必要的條件時(計算和通訊只需要在"安全時間"內(nèi)完成), 系統(tǒng)則可以跳出循環(huán)讓機群達成一致[30,31]。

(1) There is a time after which every process that crashes is always suspected by some correct process.?

(2) There is a time after which some correct process is never suspected by any correct process.?

The two properties of <>W0 state that eventually something must hold forever; this may appear too strong a requirement to implement in practice. However, when solving a problem that “terminates,” such as Consensus, it is not really required that the properties hold forever, but merely that they hold for a sufficiently long time, that is, long enough for the algorithm that uses the failure detector to achieve its goal.

而FLP impossibility則可以理解為挑刺兒的說, 那這個條件永遠無法出現(xiàn)呢? 你的算法就活鎖了呀(丟失liveness)。

幸運的是, 在現(xiàn)實世界, 我們總可以對消息傳遞和處理來估計一個上限, 你可以理解為,只要消息處理總是在這個上限之內(nèi)完成,那么consensus總是可以實現(xiàn), 而消息處理的時間即使偶爾超過了這個上限, 我們的consensus協(xié)議也會進入安全循環(huán)自我保護, 從而不會破壞系統(tǒng)的safety, 而系統(tǒng)總是可以再次回歸平穩(wěn)(處理時間回歸上限之內(nèi))。

而FLP則是像說: 你無法證明系統(tǒng)總是可以回歸平穩(wěn) (確實無法證明, 因為FLP的前提是異步模型, 而我們的真實世界更像是介于異步和同步模型之間的半同步模型, 我們只能說極大概率系統(tǒng)可以"回歸平穩(wěn)", 而無法證明它的絕對保證; =>可以絕對保證"上限"的模型一般稱為同步模型)。

其實用paxos來模擬出FLP的活鎖的例子也很簡單, 你把節(jié)點間對leader的heartbeat timeout時間設(shè)為0.001ms, 那么所有的節(jié)點都會忙著說服別的其他節(jié)點自己才是leader(因為太短的保活時間, 除了自己, 節(jié)點總是會認為其他的任意節(jié)點是leader時, leader死了), 那么系統(tǒng)就會進入活鎖, 永遠無法前進達成cluster內(nèi)的consensus, 系統(tǒng)喪失liveness。

Zombie Fencing

即使consensus問題解決了, zombie節(jié)點也還是大問題, kubernetes和Akka可以選擇避開zombie, 損失liveness。

然而對于絕大多數(shù)分布式系統(tǒng)來說, 是必須直面zombie節(jié)點這個問題的,比如各種分布式系統(tǒng)的master節(jié)點, 如果master掛了整個系統(tǒng)不在另外的機器重啟master,整個系統(tǒng)就可能變?yōu)椴豢捎谩?/p>

再比如kafka和Kinesis的單一partition只能有一個consumer, 如果這個msg consumer掛了不自動重啟, 對消息的處理就會完全停止。

zombie是最容易被忽視的問題, 比如, 即使我們有了paxos, raft, zookeeper這種consensus工具可以幫我們做leader election, 也不要以為你的系統(tǒng)中不會同時有2個leader做決策了。

這是因為先一代的leader可能突然失去任何對外通信,或者cpu資源被其他進程吃光, 或者各種edge case影響, 使得其他節(jié)點無法和其通信, 新的leader被選出, 而老的leader其實還沒死, 如果老的leader在失去cpu之前的最后一件事是去寫只有l(wèi)eader才能寫的數(shù)據(jù)庫, 那么當它突然獲得cpu時間且網(wǎng)絡(luò)恢復正常, 那么這個以為自己還是leader的zombie leader就會出乎意料的去寫數(shù)據(jù)庫。

這曾經(jīng)是HBase的一個重大bug[39, Leader Election and External Resources P105]。

Apache HBase, one of the early adopters of ZooKeeper, ran into this problem in the field. HBase has region servers that manage regions of a database table. The data is stored on a distributed file system, HDFS, and a region server has exclusive access to the files that correspond to its region. HBase ensures that only one region server is active at a time for a particular region by using leader election through ZooKeeper...?

The region server is written in Java and has a large memory footprint. When available memory starts getting low, Java starts periodically running garbage collection to find memory no longer in use and free it for future allocations. Unfortunately, when collecting lots of memory, a long stop-the-world garbage collection cycle will occasionally run, pausing the process for extended periods of time. The HBase community found that sometimes this stop-the-world time period would be tens of seconds, which would cause ZooKeeper to consider the region server as dead. When the garbage collection finished and the region server continued processing, sometimes the first thing it would do would be to make changes to the distributed file system. This would end up corrupting data being managed by the new region server that had replaced the region server that had been given up for dead.

(解釋不動了, 大家看英文吧...)

其實對付zombie已經(jīng)是分布式系統(tǒng)的共識了,也有很多標準的解法,以至于各個論文都不會太仔細的去描述, 這里簡單介紹幾種方法:

zombie fencing設(shè)計的關(guān)鍵點在于如何阻止已經(jīng)“成為zombie的自己”搞亂正常的“下一代的自己”的狀態(tài)。畢竟無論是zombie還是新的要取代可能死掉的上一代的"下一代", 大家跑的都是相同的代碼邏輯,也就是說這同一段代碼, zombie來跑就"不能過:"(比如不能對系統(tǒng)的狀態(tài)造成影響), 但是"下一代"來跑, 就可以正常工作。這一般需要滿足以下幾點:

1)如何正確區(qū)分“正常的下一代”(由于懷疑當前的節(jié)點已經(jīng)死掉了,所以重新創(chuàng)建和啟動的新一代邏輯節(jié)點)和“zombie”(懷疑錯誤,當前節(jié)點并沒有死掉,但是新一代節(jié)點已經(jīng)創(chuàng)建并啟動,當前節(jié)點成為大家都以為死掉但是還活著的zombie)一般需要一個多機復制且穩(wěn)定自增的epoch number來確定新老邏輯節(jié)點。

這個epoch number要在分布式環(huán)境中穩(wěn)定自增,一般只能通過consensus協(xié)議來實現(xiàn)。否則要分配新一代epoch number時,由于管理epoch number的機群的failover造成分配了一個老的epoch number給新啟動的“下一代”,那么zombie反而會有更大的epoch number,這就會造成整個系統(tǒng)的狀態(tài)混亂。

怎樣的混亂在介紹完zombie fencing之后就顯而易見了(因為所有其他節(jié)點都以為zombie死掉了, 把所有的最新操作和狀態(tài)發(fā)給新節(jié)點,但是新節(jié)點卻有一個比zombie還小的epoch number, 從而被zombie fence掉, 而不是自己可以fence zombie)

2)會被zombie影響的系統(tǒng)需要特殊設(shè)計使得:當“新一代”注冊后就拒絕“老一代的任何請求”,特別是寫入請求。也就是具體的利用epoch number的zombie fencing的實現(xiàn); 一般需要具體情況具體分析。

如果被影響的系統(tǒng)是自己的一個microservice,那么可以隨意設(shè)計協(xié)議來驗證一個請求所攜帶的epoch number是不是最新的。而當這個被影響的系統(tǒng)是一個外部系統(tǒng), 比如是業(yè)務(wù)系統(tǒng)需要用到的一個數(shù)據(jù)庫,由于你沒法改數(shù)據(jù)庫的代碼和數(shù)據(jù)庫client與server之間的協(xié)議, 那么就要利數(shù)據(jù)庫所提供的功能或者說它的協(xié)議來設(shè)計application層級的zombie fencing協(xié)議。

比如對提供test and set,compare and swap的kv數(shù)據(jù)庫來說,application設(shè)計自己的業(yè)務(wù)表時,要求所有的表都必須有一個epoch字段,而所有的寫入都必須用test and set操作來檢測當前epoch字段是否比要寫入的請求的epoch字段大或相等, 否則拒絕寫入。這樣, 只有"下一代"可以更改zombie寫入的數(shù)據(jù), 而zombie永遠無法更改"下一代"插入或者更新過的數(shù)據(jù)。

另一方面,很多時候"下一代"需要讀取上一代的信息,繼承上一代的數(shù)據(jù),然后繼續(xù)上一代的工作。那么如果剛讀取完數(shù)據(jù),zombie就改變了數(shù)據(jù),那么"下一代"對于當前系統(tǒng)狀態(tài)的判斷就會出差錯。

一個general的解決的方法也很簡單,要讀先寫,“下一代”開始工作前, 如果要先讀入數(shù)據(jù)了解“系統(tǒng)的當前狀態(tài)”,必須先改變數(shù)據(jù)的epoch number為自己的epoch number(當然要遵從只增更改原則test and set,如果發(fā)現(xiàn)當前數(shù)據(jù)的epoch number已經(jīng)比自己的epoch number還大了,則說明自己也已經(jīng)是zombie了,更新的"下下一代"已經(jīng)開始工作), 更改數(shù)據(jù)的epoch number成功之后,再讀入數(shù)據(jù),就可以保證比自己老的zombie絕對不可能再更改這個數(shù)據(jù),而現(xiàn)在讀入的數(shù)據(jù)可以體現(xiàn)系統(tǒng)的最新狀態(tài),從而完成對"老一代"數(shù)據(jù)的繼承。

而在增加epoch number之前所有被寫入的數(shù)據(jù)。這里即使是"新一代"啟動之后, 讀取系統(tǒng)狀態(tài)之前被zombie寫入的數(shù)據(jù), 都可以看做老一代的合法數(shù)據(jù),只要被新一代開始工作前繼承讀入即可。我們所要避免的是"新一代" 所讀取的事實被zombie所更改。而不是在物理時間的意義上在"新一代"啟動時就立刻阻止zombie的所有系統(tǒng)改動。

zombie fencing的設(shè)計取決于分布式系統(tǒng)的具體情況,比如業(yè)務(wù)邏輯可能更改的數(shù)據(jù)范圍可能是幾百萬幾千萬的數(shù)據(jù)記錄,那么這也意味著zombie可能會修改的數(shù)據(jù)范圍非常大,那么要求"下一代"在開始工作前更改所有數(shù)據(jù)的epoch number就很不現(xiàn)實。

對于zombie的影響的耐受性也會影響zombie fencing的設(shè)計,比如如果"下一代"只需要自己所接觸的有限數(shù)據(jù)在特定時刻之后不被zombie影響就能正確工作, 那么只要在"下一代"需要接觸特定數(shù)據(jù)時才更改此數(shù)據(jù)的epoch number來屏蔽zombie即可,那么即使業(yè)務(wù)可能修改的數(shù)據(jù)范圍很大,簡單的更改數(shù)據(jù)的epoch number也還是可以接受的解決方案。

最糟糕的情況,如果"zombie"可能會插入新的數(shù)據(jù), 而"下一代"的正常工作需要不能有非法的新數(shù)據(jù)插入(比如下一代開始工作前先統(tǒng)計所有資源的個數(shù),然后開始基于這個事實和"只有自己才能更改資源"的假設(shè),作出各種決策。

而此時zombie突然插入了一條新資源記錄或者資源使用記錄...),如果"新一代"完全無法預測zombie會插入什么記錄,要阻止zombie隨意插入數(shù)據(jù),“新一代”就只能在利用predicate lock來防止新紀錄插入,且不說很多數(shù)據(jù)庫根本不支持“鎖住不存在的數(shù)據(jù)”的predicate lock,就算支持此功能的數(shù)據(jù)庫也很有可能是使用表級鎖來鎖住整張表。

如果數(shù)據(jù)表設(shè)計成了需要共享給多個節(jié)點使用(比如一張資源表,不同的singleton worker負責維護不同的資源范圍),那么表級鎖就會妨礙其他worker的工作。

zombie fencing的設(shè)計在于如何引入簡單的fencing點, 對"新一代"暢通無阻,但是卻可以阻止zombie的異常活動, 如果協(xié)議設(shè)計使得"新一代"可以很容易制造這個fencing點, 則"新一代"在啟動或者需要的時候加入fencing點即可。

比如前邊說的任何數(shù)據(jù)都要附帶一個epoch值,任何數(shù)據(jù)寫入都要用test and set來對比數(shù)據(jù)的當前epoch值和請求的epoch值。

對于上文的隨機插入的業(yè)務(wù)需求, 可以要求業(yè)務(wù)邏輯插入任何數(shù)據(jù)之前,先在一個注冊表的屬于自己epoch的一行里記錄自己要寫的數(shù)據(jù)的id, 且在記錄的時候用test and set來檢測自己這一行數(shù)據(jù)的active值是否被更改為disable了。

這樣就相當于引入了一個更簡單的fencing點,因為"下一代"只要在注冊表里把所有上一代的記錄寫為disable, 就可以阻止zombie的未來任何活動,但是此時無法阻止zombie的最后一個注冊的數(shù)據(jù)插入, 但是"下一代"可以簡單的讀注冊表得知這個數(shù)據(jù)的id, 從而對這個"最后的zombie寫入"采取相應(yīng)的策略(繼承,刪除, 甚至fencing, 比如這個id并不存在,那么無法得知是zombie真的在寫之前死了所以永遠不會插入這個記錄了,還是zombie只是卡了, 那么"下一代"可以用自己的epoch和zombie注冊的id先插入一條記錄來占位,這樣無論zombie是真的死了還是卡了,都無法再寫入這個數(shù)據(jù)了)。這樣,我們就引入了一個連數(shù)據(jù)插入都可以fencing的fencing點。

Zombie fencing一般都是以上這些套路, 用consensus協(xié)議確定epoch number區(qū)分"下一代"還是zombie,這個epoch number一般也可以稱為fencing token, 通過把fencing token分發(fā)給需要拒絕zombie的service,把fencing token和需要保護的數(shù)據(jù)(防止被zombie修改)存在一起。

所以一般論文[7, 26]里只會簡單的提到epoch或者sequencer等概念,基本都是zombie fencing的fencing token。

三節(jié)點間的EOMP

三點為 (上游/input提供端)=> (當前計算節(jié)點/計算結(jié)果發(fā)送端) =>(下游/計算接收端)

如果我們考慮必須保證系統(tǒng)的高可用性,即檢測到任意process的failure,都會由一個(絕對不死)的高可用的JobManager或者MasterNode,來重啟(可能在另外的node)這個process, 所以我們定義這種即使所在的host掛掉, 也會不斷重新在其他host上重啟的process為邏輯process。這時我們要面臨幾種可能造成inconsistent的情況:

"計算接受端"沒有成功ack"計算結(jié)果發(fā)送端"的消息,一般表現(xiàn)為發(fā)送端的等待ack 超時。根據(jù)之前的討論,接收端有可能把消息處理完畢了(ack的消息丟失,或者剛處理完消息還沒發(fā)ack就掛了…等情況),也可能沒有處理完畢(沒接到或剛接到消息就掛了…等情況)。

這種情況發(fā)送端可以重發(fā)信息, 而發(fā)送端是需要“上游input提供端”提供某種數(shù)據(jù)然后進行某種計算后產(chǎn)生的這個消息/計算結(jié)果(設(shè)為outputA), 那么"計算結(jié)果發(fā)送端"有兩個策略:

策略1: 利用存儲計算結(jié)果來盡量避免重算

要實現(xiàn)上下游exact once processing,需要實現(xiàn)4個條件:

  • 結(jié)果高可用;

  • 下游去重;

  • 上游可以replay;

  • 記錄上游進度。

1)要求結(jié)果高可用, 應(yīng)對timeout時, “下游計算接收端”其實并沒有成功處理"計算結(jié)果發(fā)送端"的計算結(jié)果的情況(比如下游掛了), 這時"計算結(jié)果發(fā)送端"可以把計算的結(jié)果存儲在高可用的DataStore里(比如DynamoDB,Spanner…或者自己維護的多備份數(shù)據(jù)庫)。

這樣超時只要重發(fā)這個計算結(jié)果即可, 自己甚至可以開始去做別的事情, 比如處理和計算下一個來自“上游/input提供端“的event, 而已經(jīng)被“下游計算接收端”ack的"計算結(jié)果"則可以清理,一般由異步的garbage collection清理掉。

注意, 由于存在存儲失敗的可能性, 或者剛計算完結(jié)果還沒來得及存儲就掛掉重啟的可能,我們無法真的保證避免重算,詳見:無法避免的重算的例子。

2)下游去重,應(yīng)對timeout時下游其實已經(jīng)處理完畢消息的情況

①一般的解決方案:當邏輯接收端不固定, 比如發(fā)送端要根據(jù)計算出來的outputA的某key字段把不同的key發(fā)送給負責不同key range(也就是partition)的多個"下游計算接收端"。

只需要一個sequenceId就可以實現(xiàn)接收端的消息去重。接收端和發(fā)送端各維護一個partition level的sequenceId即可。這樣當發(fā)送端收到當前message sequenceId(假設(shè)為n)的Ack才發(fā)下一個sequenceId為n+1的信息,否則就無限重試。而接收端則根據(jù)收到的消息的id是不是已經(jīng)處理過的最大id+1來判斷是這是不是下一個message。

②Google MillWheel的特例:Google MillWheel做出了一個很有意思的選擇,發(fā)送端完全不維護sequenceId,而是為每一個發(fā)出的message生成一個全局唯一的id,下游則需要記住"所有"見過的id來去重,但是這樣會造成大量查詢io和存儲cost,所以需要另外的方案來解決性能和下游沒有無限的存儲所以"不可能記住所有id"的問題。這個例子比較特殊,有興趣的同學可以查閱[4,7]

③要求觸發(fā)本次計算的“上游input提供端”可以replay input event,否則剛接到event還沒計算就掛掉重啟, 則event丟失。

無法避免的重算:任何時候計算沒完成,或者計算完成后但是成功儲存前(a.結(jié)果高可用的需求), 計算節(jié)點fail掉重啟, 我們都需要replay上次計算過的input event,所以由于計算結(jié)果都還沒存成功,所以從物理上講, 此時我們還是重算了的; 所以即使我們采用把計算結(jié)果記錄下來的策略, 我們無法從物理意義上真正避免重算, 我們避免的是有多個"重復的"成功計算結(jié)果提交給下游。

而當計算不是deterministic的, 這多個“重復的”計算結(jié)果可能是不同的值發(fā)送給不同的下游((比如按照計算結(jié)果的key發(fā)送給下游不同的partition)。那么下游就會處理同一個event所產(chǎn)生的本應(yīng)只有一個的計算結(jié)果兩次,且由于非確定性計算的原因,這兩個計算結(jié)果不一樣。這就會造成event不是EOMP的問題。(不僅在物理上計算了2次, 在效果上也影響了2次下游的計算, 打破的effective process once的要求)

④要求記錄event處理的進度, 并保證儲存計算結(jié)果不出現(xiàn)重復。記錄event處理的進度, 使得trigger本次計算的"event"可以被屏蔽(比如, ack“上游input提供端”, 告知其input event處理完畢, 可以發(fā)下一個了), 來避免計算的re-trigger; 這要求以下策略2選一:

  • 記錄event處理的進度, 和把計算結(jié)果存在高可用存儲里的操作是一個原子操作, 要么一起成功, 要么一起失敗; 這種策略可以保證當計算結(jié)果儲存下來, 此計算不會replay了;

  • 或者存儲計算結(jié)果是一個冪等操作,那么可以先存計算結(jié)果,再記錄event處理進度,一旦計算計算結(jié)果成功但是記錄event處理進度失敗,重新計算上游的同一個event并儲存計算結(jié)果也不會引起問題。

否則要么計算沒存event就被屏蔽掉了, 要么多次計算結(jié)果存儲在DataStore里造成下游的重復信息。注意, 此時下游是無法分辨這是重復信息的, 因為這是datastore里的"2條的記錄", 將會獲得不同的message id。

冪等和end2end argument: 所以實現(xiàn)原子操作就不需要冪等了么? 是也不是, 在業(yè)務(wù)層是的, 比如要實現(xiàn)業(yè)務(wù)層的冪等,我們可以在存計算結(jié)果到datastore里的時候把一個與觸發(fā)本次計算的event的唯一id記錄在一起,這樣我們每次存的時候就可以使用樂觀鎖的方式test-and-set, 來保證如果這個id在數(shù)據(jù)庫里沒有才插入。(取決于業(yè)務(wù),我們也可以用這個id當主key來,那么即使多次寫入同樣的內(nèi)容也沒關(guān)系=>要求計算是deterministic的)?

如果我們保證觸發(fā)計算的event的"屏蔽"和計算結(jié)果的儲存是一個原子操作,那么我們就不需要上邊這種復雜的存儲策略,因為一旦計算結(jié)果存儲成功,觸發(fā)計算的event必定被"屏蔽"掉了, 而如果沒存儲成功, 則event一定會replay來重試。

然而在傳輸層卻不是的,比如儲存數(shù)據(jù)庫的tcp有可能丟包重發(fā),依靠tcp的傳輸層id自動去重,實現(xiàn)tcp的冪等。

策略2: 完全依賴重算。

高可靠重發(fā)的問題是,所有信息都必須先記錄在高可用性的DataStore里, 相對于重新計算,重發(fā)需要的網(wǎng)絡(luò)IO, 存儲,狀態(tài)管理的cost是很高的。

而如果觸發(fā)計算的event可以replay的話(其實不管重算還是不重算,為了防止“剛接到event, 計算節(jié)點就掛掉的情況”, 我們都要求event可以replay), 我們就可以選擇重算然后重發(fā)來代替存儲計算結(jié)果的重發(fā);重算需要2個條件:

  • 計算需要是 deterministic 的,用完全一樣的數(shù)據(jù),必須算出完全相同的結(jié)果,否則,當計算結(jié)果所需要發(fā)送的邏輯下游是由計算結(jié)果所決定的情況下(比如按照計算結(jié)果的key發(fā)送給下游不同的partition) 那么non-deterministic的重算有可能把計算結(jié)果發(fā)給不同的下游,這樣如果重算發(fā)生時,下游(假設(shè)是節(jié)點A)其實已經(jīng)成功處理完畢重算前上游發(fā)送的信息, 只是ACK丟失, 那么重算的結(jié)果卻發(fā)送給了另外一個(節(jié)點B), 那么就會造成一個event造成了2個下游effect的結(jié)果, 引起一個event造成2次下游影響的結(jié)果, 違反EOMP的原則;

  • 重算之前, 狀態(tài)需要rollback到?jīng)]有計算之前, 否則會影響需要狀態(tài)的計算的結(jié)果正確性,如果狀態(tài)更新非冪等,本次計算所做的狀態(tài)更新也會更新多次;詳見"加入節(jié)點狀態(tài)的三節(jié)點間的EOMP"。

(在多節(jié)點流計算里,要求上游可以重發(fā),意味著上游把計算結(jié)果存下來了,或者上游可以重算,如果上游需要重算,那么上游需要上游的上游重發(fā),那么上游的上游可以用儲存的結(jié)果重發(fā)或者重算。。。以此類推)

(2種策略其實都有可能造成重算,也都對event replay有需求。為什么還要浪費資源去存儲計算結(jié)果呢?這里邊的重要區(qū)別是,當儲存結(jié)束時,對觸發(fā)本次計算的上游event的依賴結(jié)束了,而不穩(wěn)定的下游不會造成額外的重算, 和對上游, 上游的上游....計算的"鏈式反應(yīng)", 詳見流的EOMP中的討論)

加入節(jié)點狀態(tài)的三節(jié)點間的EOMP

帶狀態(tài)的計算, 比如流計算的某中間節(jié)點需要統(tǒng)計總共都收到多少信息了, 每次從上游收到新信息, 都把自己統(tǒng)計的當前歷史信息總數(shù)更新并發(fā)往下游節(jié)點, 那么這個"系統(tǒng)的歷史信息"就是這個"統(tǒng)計消息總數(shù)"的邏輯節(jié)點的狀態(tài)。

由于狀態(tài)也需要高保活,所以它也一定需要儲存在遠端dataStore里, 這樣儲存狀態(tài)的遠端datastore就相當于一個特殊的下游。不同點在于, 當采用策略2:重算, 而不存儲中間計算結(jié)果的話, 重算時則需要datastore可以把它所記錄的狀態(tài)rollback到最初剛開始處理此event的那個點。

這里我們只能rollback, 而不能只是依靠冪等來保證“狀態(tài)的更新是exactly once”的原因是, 節(jié)點在處理任意消息時的狀態(tài)也和當前信息的數(shù)據(jù)一樣是本次計算的input, 而更新后的狀態(tài)則是本次消息處理的output, 如果重算時不rollback節(jié)點的狀態(tài), 那么我們就會用一個被本消息"影響過"的狀態(tài)來進行計算, 而這是會違反exactly once msg processing語義的。

比如節(jié)點的本地狀態(tài)是上次收到的信息的數(shù)據(jù)上記錄的時間戳, 節(jié)點的運算是計算2個event數(shù)據(jù)之間的時間戳差距。假設(shè)eventA發(fā)生在時刻0, eventB發(fā)生在時刻10, 那么eventB引發(fā)的計算應(yīng)往下游發(fā)送10, 并把節(jié)點的本地狀態(tài)更新為10, 此時如果eventB的這個計算需要重算, 但是我們不rollback狀態(tài)10回到0的話, eventB重算所得的結(jié)果就會變成0。

注意: 由于state更新也是處理event的"下游", 那么計算過程中的所有狀態(tài)更新都可以算作“計算結(jié)果”的一部分, 所以當我們需要儲存計算結(jié)果時,則需要把:

  • 狀態(tài)更新儲存回高可靠的statestore里;

  • 記錄event處理進度;

  • 把計算結(jié)果存在高可用存儲里。

這3個操作作為一個原子操作(以后我們稱之為"原子完成"來省略篇幅); 而任何時候需要重算的話, 狀態(tài)必須恢復到處理event之前的樣子。

加入state,我們需要把(d. 要求記錄event處理的進度, 并保證儲存計算結(jié)果不出現(xiàn)重復, 更改為 (d+. 要求記錄event處理的進度, 并保證儲存計算結(jié)果和state的更新不出現(xiàn)重復。

并加入要求(e. state需要在replay 上游event的時候rollback到處理event之前時的狀態(tài)。

這些要求稍有抽象,讓我們看一下流系統(tǒng)一般怎么達成這些要求。

流系統(tǒng)的EOMP

考慮一個多節(jié)點的流系統(tǒng),如果我們把上游所發(fā)來的計算結(jié)果當成前邊所說的“觸發(fā)計算的event”,而自己的發(fā)給下游的計算結(jié)果msg作為觸發(fā)下游計算的event。那么我們就可以用上邊的模型保證兩兩節(jié)點之間的exact once msg processing,從而最終實現(xiàn)端到端的exact once msg processing; 這就是Google MillWheel(DataFlow) 和Kafka Stream的解決方案。

他們都選擇把每個節(jié)點的計算結(jié)果儲存起來,并保證即使non-deterministic的計算, 也只有一次的計算會起作用, 而不會出現(xiàn)(策略2-1中提到的non-deterministic造成的不一致)。他們的區(qū)別是:

  • 如何實現(xiàn)state和;

  • 如何實現(xiàn)接收端去重;

  • 如何實現(xiàn)“原子完成”

1、Google MillWheel(DataFlow)

1)每個節(jié)點維護一個用來去重的"已處理msgId"集, 從上游收到新msg之后, 檢查去重 (b.下游去重)

2)開始計算, 所有的狀態(tài)更新寫在本地, 由于一個狀態(tài)只有一個更新者(本計算), 所以可以在本地維護一個狀態(tài)的view, 所有的更新只更新本地的view而暫時不commit到"remote的高可用DataStore", MillWheel用的BigTable。

3)計算完畢后, (1).所有的要發(fā)送的計算結(jié)果,(有一些可能是在計算過程中產(chǎn)生并要求發(fā)送的, 都會cache起來), (2)所有的state的所有更新, (3) 引發(fā)計算的msgId, 會用一個atomic write寫在BigTable里。(a.要求結(jié)果高可用, d+.要求記錄event處理的進度, 并保證儲存計算結(jié)果和state的更新不出現(xiàn)重復)

4)當commit成功之后, ACK上游, 而由于上游也采用commit計算結(jié)果到BigTable里的策略,且只有當自己(這里)發(fā)出的消息ACK之后, 才會允許 garbage collection回收計算結(jié)果占用的存儲, 所以在收到ACK之前, 上游的計算結(jié)果, 也就是當前計算所需要的msg, 都可以重發(fā),直至本節(jié)點計算成功且commit結(jié)果 (c. 要求觸發(fā)本次計算的event可以replay)

5)一旦計算過程中failure發(fā)生(比如機器掛了), 會在另外的host上重啟本process節(jié)點,從BigTable恢復本地state和"用來去重的已處理msgId集", 由于上次計算的結(jié)果還沒有commit, 所以滿足(e. state需要在replay event的時候rollback到處理event之前時的狀態(tài))

5)新啟動的運算節(jié)點在load本地狀態(tài)之前先用自己的sequencer廢掉現(xiàn)存的sequencer, 這樣BigTable就可以block zombie計算節(jié)點的寫。

2、Kafka Stream

Kafka Stream是建立在kafka分布式隊列功能上個一個library, 所以在介紹kafka Stream之前, 我們先來講一下Kafka

3、簡單介紹Kafka Topic

Kafka的topic可以看作一個partition的queue, 通過發(fā)給topic時指定partition(或者用一個partitioner 比如按key做hash來指定使用那個partition), 不同的key的消息可以發(fā)送到不同的partition, 相同key的message則可以保證發(fā)送到同一個partition去, topic里的信息可以靠一個確定的index來訪問, 就好像一個數(shù)據(jù)庫一樣,所以只要在data retention到期之前,consumer都可以用同一個index來訪問之前已經(jīng)訪問過的數(shù)據(jù)。

4、Kafka Transactional Messaging

前邊說過, Kafka Stream是建立在kafka分布式隊列功能上個一個library, 主要依靠kafka的Transactional Messaging來實現(xiàn)end2end exactly once msg processing。

Transactional Messaging是指用戶可以通過類似以下code來定義哪些對kafka topic的寫屬于一個transaction, 并進一步保證tx的atomic和Isolation。

producer.initTransactions();

?try {?

? ? // called? right before sending any records?

? ? producer.beginTransaction();

? ? //sending some messages...

? ? // when done? sending, commit the transaction?

? ? producer.commitTransaction();

} catch? (Exception e) {

? ? ?producer.close();

} catch? (KafkaException e) {

? ? producer.abortTransaction();??

}?

Kafka transaction保證了, beginTransactions之后的, 所有往不同Kafka topic里發(fā)送的消息, 要么在commitTransaction之后才能被read-committed的consumer看到, 要么由于close或者failure而全部作廢, 從而不為read-committed的consumer所見。

而kafka stream通過用kafka本身的分布式queue的功能來實現(xiàn)了state和記錄處理event進度的功能,因為:

  • 所有的要發(fā)送的計算結(jié)果(由于可以允許計算發(fā)不同消息給多個下游,所以可能發(fā)給不同的topic和partition);

  • 記錄input event stream的消費進度;

  • 所有的state的所有更新。

這3點, Kafka Stream都是用寫消息到kafka topic里實現(xiàn)的。

1)自不必說,本來就是往topic里寫數(shù)據(jù)。

2)其實是寫當前consume position的topic;。(注意Kafka Stream的上下游消息傳遞考的是一個中間隱藏的Kafka topic, 上游往這個topic寫, 下游從這個topic讀, 上游不需要下游的ack,只要往topic里寫成功即可, 也不需要管下游已經(jīng)處理到那里了。

而下游則需要維護自己"處理到那里了"這個信息,儲存在consume position的topic, 這樣比如機器掛掉需要在另外的host上重啟計算節(jié)點,則計算節(jié)點可以從記錄consume position的topic里讀出自己處理到那里然后從失敗的地方重洗開始。

3)其實是寫一個內(nèi)部隱藏的state的change log的topic,和一個本地key value表(也就是本計算節(jié)點的state)。failover的時候, 之前的"本地"表丟失沒關(guān)系, 可以沖change log里恢復出失敗前確定commit的所有state。

(1)(2)(3)的topic都只是普通的Kafka topic。只不過(2)(3)由Kafka Stream自動創(chuàng)建和維護(一部分用來支持高層API的(1)也是自動創(chuàng)建)

  • 開始計算時, 在從上游的topic里拿msg之前, Kafka Stream會啟動一個tx, 然后開始才開始計算, 此時tx coordinator會分配一個新的epoch id給這個producer并且以后跟tx coordinator通訊都要附帶這個epochId;

  • Kafka Stream的計算節(jié)點的上游信息都來自kafka topic的分布式partition queue, 且只接收commit之后的record, 在queue里的record都有確定的某種sequenceId, 所以只要計算節(jié)點記錄好自己當前處理的sequenceId, 處理完一個信息就更新自己的sequenceId到下一個record, 且commit到可靠dataStore里, 就絕對不會重復處理上游event, 而只要沒有commit這個位置則可以無數(shù)次replay當前的record; (b.下游去重, c. 要求觸發(fā)本次計算的event可以replay);

  • 在tx內(nèi)部,每從上游topic里讀一條信息就寫一條信息到記錄consume position的topic里, 每一個state的更改都會更新到本地的state(是一張表)里,且同時寫在隱藏的changelog里; 計算過程中需要往下游發(fā)信息則寫與下游聯(lián)系的topic;

  • 計算結(jié)束后, commit本次的tx, 由Kafka Transactional Messaging來保證本次tx里發(fā)生的所有(1)往下游發(fā)的消息, (2) 記錄input event stream的消費進度,(3)所有的state的所有更新是一個原子操作, 由于結(jié)果都成功寫入kafka topic,所以達到計算結(jié)果的高可用性 (a.要求結(jié)果高可用, d+.要求記錄event處理的進度, 并保證儲存計算結(jié)果和state的更新不出現(xiàn)重復);

  • 計算過程中出現(xiàn)failure(比如機器掛了), 那么當計算重啟,會重新運行initTransactions來注冊tx, 此時tx coordinator會分配一個新的epoch id給此producer, 并且從此以后拒絕老的epoch id的任何commit信息來防止zombie的計算節(jié)點; tx coordinator同時roll back(如果上一個tx已經(jīng)在prepare_commit狀態(tài), 繼續(xù)完成transaction, 具體看下邊Transactional Messaging這個章節(jié)); 如果rollback,那么input的處理進度, 狀態(tài)的更改和往下游發(fā)送的信息都會rollback, 那么計算可以重新開始,就好像沒有上次fail的失敗一樣; 如果上一個tx已經(jīng)prepare_commit, 那么完成所有信息的commit; 此時當initTransactions返回,當前計算會接著上一個tx完成的進度繼續(xù)計算;(e. state需要在replay event的時候rollback到處理event之前時的狀態(tài))

Idempotent producer

冪等producer主要解決這么一個問題: Kafka的消息producer, 也就是往Kafka發(fā)消息的client 如果不冪等, 那么因為Kafka的接受消息的broker和producer之間在什么是“重復信息”上沒有共識的話,則broker無法分辨兩個前后一模一樣的消息, 到底是producer的本意就是要發(fā)兩次,還是由于producer的重發(fā)(比如:producer在收到broker的"接受成功"的ack之前就掛了,所以不知道之前的消息有沒有成功被broker接收, 因此重啟后重發(fā)了此信息)。此時broker只能選擇接受消息,這就造成了同一個消息的多次接受。

同時我們也要解決zombie producer的問題: 如果我們保證producer高可用, 重啟我們認為fail掉的producer, 那么其實沒死的zombie producer的信息則會造成,重復且亂序的發(fā)布消息。(由于zombie的存在, 會有2個producer同時發(fā)布我們以為只有一個producer會按順序發(fā)布的消息,這樣就無法保證順序: 比如zombie在發(fā)送A, B, C...的時候, 新啟動的producer也開始發(fā)送A, B, C... )

Kafka的解法:

10用一個producer指定的固定不變的transactional.id(非自增id,叫producerName可能更好)來識別可能會在不同機器上重啟的同一個logical producer; 相當于給producer起了一個logical name。

2)注冊transaction.id來開始session, 而在session里此tx發(fā)來的消息都可以通過維護一個sequenceid來dedup。

3)非正常結(jié)束tx的話, 比如機器掛了, producer重啟, 那么就會再次注冊自己的transaction.id, 則標志前一個session失效, 而所有屬于上一個session的信息全部作廢(具體看下一節(jié)Atomic and Isolation), 這樣就可以做到producer的zombie fencing

(Further, since each new instance of a producer is assigned a new, unique, PID, we can only guarantee idempotent production within a single producer session ---- Idempotent Producer Guarantees [26])

Atomic and Isolation

1)Producer Zombie fencing: 注冊transaction.id會申請高可靠epoch id, broker和tx coordinator可以依此fencing zombie的任何寫操作 (e.g. tx coordinator關(guān)閉tx);. Zombie fencing in https://www.confluent.io/blog/transactions-apache-kafka/

2)多個Tx coordinator跑在kafka broker里, 寫是按照tx.id hash給不同的Tx coordinator, 每一個tx coordintor負責subset的transactionlog的partition。

這樣保證同一個logic produce啟動的tx必定連接同一個tx coordinator。tx coordinator保證所有的狀態(tài)都在的高可用高一致性的寫在tx log里。(且用queue zombie fencing來保護自己的狀態(tài)一致性, Discussion on Coordinator Fencing in [26]) (Discussion on Transaction-based Fencing, => 如果zombie不跟coordinator再聯(lián)系,那么可以一直跟broker發(fā)垃圾信息... P39in [26])

3)Producer注冊新的tx之后,在給任意topic的任意partition發(fā)消息之前,先跟tx coordinator注冊這個partition。

4)當寫完畢,producer給tx coordinator發(fā)commit,tx coordinator執(zhí)行2PC,在transaction log里寫prepare_commit, 這樣就一定會commit了,因為producer 通知commit就代表所有的寫已經(jīng)寫成功了, 這一步其實只是把決定記下來。

5)Tx coordinator聯(lián)系所有的注冊過的topic的partition,寫一個commit marker進去。

6)當所有的marker寫完,在transaction log里記錄commit complete。

7)注意:當在第一步tx coordinator在發(fā)現(xiàn)新的重復transaction.id來注冊時,會檢查有沒有相同的transaction.id下未關(guān)閉的tx,有的話發(fā)起rollback,先在transaction log里記下rollback的決定,然后聯(lián)系所有的注冊過的topic的partition, 寫入一個ABORT marker。

而如果此tx的狀態(tài)已經(jīng)時prepare_commit了,那么有可能tx coordinator在下邊第6步聯(lián)系所有partition來commit中間掛掉了,那么要接著完成這個commit過程;即roll forward而不是roll back。

8)Read_commit等級的consumer需要等待transaction有結(jié)果,consumer library讀到任何與Transactional Messaging相關(guān)的信息,就開始進入cache階段,并不會運行任何consumer端的計算,只有當讀到commit mark,則把cache住的record依次交給consumer端的計算,而當讀到ABORT mark,則把相關(guān)tx的record全部filter掉。注意: pending的tx會block所有Read_commit等級的consumer對topic的讀。

在保證兩兩節(jié)點之間的EOMP來實現(xiàn)整個流的EOMP的模型里,如果我們某一個或多個節(jié)點的狀態(tài)和計算結(jié)果根本不記錄在高可用DataStore里,我們還是可以實現(xiàn)EOMP, 我們只需要(1)replay這個節(jié)點的上游來重算這個節(jié)點的狀態(tài)和發(fā)給下游的計算結(jié)果, (2)下游去重。

如果上游也沒計算結(jié)果記, 那么replay上游的上游即可, 如果上游的上游也沒記....一直追溯到記錄了計算結(jié)果的上游節(jié)點即可。

如果一直都沒有failure,那么比起Dataflow和Kafka Stream那種記錄所有計算結(jié)果的模型 我們少記錄一些額外的計算結(jié)果和狀態(tài)就減少了很多系統(tǒng)負載; 這就是重算與記錄計算結(jié)果模型的結(jié)合。

重算與記錄計算結(jié)果的結(jié)合

考慮 A,B,C, D 4個節(jié)點, A的計算結(jié)果傳給B, 而B則把一部分計算結(jié)果給C一部分給D, 如果B沒有記錄自己的output, 則Cfail掉之后需要replay上游的input,這就需要B的一些重算來重新制造C所需要的input, 即使B的input(即A)記錄了所有的計算結(jié)果, 我們還需要"恰巧可以產(chǎn)生這些歷史計算結(jié)果的"B的歷史狀態(tài),才能重算出C所需要的input。(所以B必須保存歷史狀態(tài)或者用某種方法重建自己的歷史狀態(tài)才能保證可以重算C所需要的input)

如果C的狀態(tài)也丟失了, 那么對上游的負擔則更重些, B需要重新計算來提供所有的歷史計算結(jié)果(即C的所有歷史input)來讓C重建自己的歷史狀態(tài)。

可以看到, 任意一個節(jié)點的某狀態(tài)S(n+1)是:

  • 上一個歷史狀態(tài)S(n), 和;

  • 從歷史狀態(tài)S0建立開始所接收到的信息M(n)。

同時作為輸入而得到的輸出; 而這個過程中又會向下游發(fā)出一些計算結(jié)果O(n+1)。

所以M(n) + S(n) => S(n+1) + O(n+1), 當下游crash重啟需要O(n+1)時, 我們則有2種選擇:

  • 1、記錄O(n+1);

  • 2、記錄O(n+1)但是記住, O(n+1)是根據(jù)什么數(shù)據(jù)生成的。

1是記錄計算結(jié)果, 2是重算。兩者并用的好處在于, 1可以異步batch進行而不需要節(jié)點必須等待O(n+1)記錄成功才往下游發(fā)送O(n+1)。而2保證了當1還沒有完成時, 系統(tǒng)也有足夠的信息可以重建O(n+1)。

這是一個鏈式反應(yīng), 當重算需要M(n)和S(n)時, 而如果M(n)并沒有存則需要上游重算M(n), 上游還沒存這些重算M(n)的信息則需要replay上游的上游來重算這些信息,這就是所謂的鏈式反應(yīng)...。最極端的情況是什么都沒存,那么需要從頭開始跑我們的stream程序。

可以看到, 如果沒有存中間計算結(jié)果或者狀態(tài), 那么當這個數(shù)據(jù)被下游重算需要的時候, 需要我們重算這個數(shù)據(jù), 這就會產(chǎn)生對上游的計算結(jié)果或者狀態(tài)的需求, 這就要求我們?nèi)绻淮嫦逻@些數(shù)據(jù), 我們就需要記住計算這個數(shù)據(jù)的數(shù)據(jù)依賴圖, 所以要么把"中間"數(shù)據(jù)和狀態(tài)存起來待用, 要么記住他們的數(shù)據(jù)依賴圖。

而這些記錄的中間結(jié)果只有當對其的所有依賴從計算圖中消失時, 我們才可以垃圾回收/刪除這些數(shù)據(jù)(比如所有基于某狀態(tài)的計算結(jié)果都已經(jīng)存下來了, 那么這個狀態(tài)的數(shù)據(jù)就可以刪除, 再比如某計算結(jié)果所引發(fā)的下游計算結(jié)果和狀態(tài)都已經(jīng)存下來了, 那么此計算結(jié)果的數(shù)據(jù)就可以刪除了),從而不會造成儲存數(shù)據(jù)爆炸。

這, 也就是Spark Streaming的解法。

Spark

Spark有三種Stream...

(1)快要被deprecate掉的DStreaming [10, 14]

(2)新一代為了彌補和Flink之間差距的, 支持event time的Structural Streaming(可惜還是有很多不足, 具體的不同和哪里有不足, 要留到對比各個系統(tǒng)對event time和windows操作的支持的對比, 也就是下篇來詳細描述了) [12,13]

(3)實驗中的Continuous Streaming(Spark Continuous Processing) [11, 20]

(3)還在實驗狀態(tài), 基本上是把底層都改掉來使用了和Flink相同的Chamdy-Lamport算法[20], 但是貌似還有很多問題需要解決所以目前不支持EOMP, 這里不多聊了。

根據(jù)Structural Streaming的論文[12], (2)和(1)使用了相似的方法來保證EOMP, 但是其實作者發(fā)現(xiàn)(2)比起(1)還是有一些性能上的改進[21],但是總體原則還是和(1)類似的利用一個重算關(guān)系圖lineage來維護各個狀態(tài)計算結(jié)果的依賴關(guān)系, 通過異步的checkpoint來截斷l(xiāng)ineage也就是各個節(jié)點狀態(tài)和計算結(jié)果復雜的關(guān)系(比如一個數(shù)據(jù)如果已經(jīng)checkpoint了, 那么它所依賴的所有狀態(tài)和計算結(jié)果都可以在關(guān)系圖里刪去, 因為replay如果依賴于這個數(shù)據(jù), 那么使用它的checkpoint即可, 而不需要知道這個數(shù)據(jù)是怎么算出來的, 如果還沒checkpoint成功, 則需要根據(jù)數(shù)據(jù)依賴圖來重算這個數(shù)據(jù)), 像這樣利用checkpoint, 就可以防止lineage無限增長。

但是維護關(guān)系圖需要利用micro-batch來平衡"關(guān)系維護"造成的cost, 否則每一條信息的process都產(chǎn)生一個新狀態(tài)和新計算結(jié)果的話, 關(guān)系圖會爆炸式增長(用micro-batch, 可能1000條信息會積累起來當作"一個信息"發(fā)給下游, 只需要在關(guān)系圖里記錄一個batch-id即可, 而不是1000個msg id, 對與狀態(tài)來說也是這樣,處理1000個msg之前的狀態(tài)分配一個id, 處理這1000個信息之后的狀態(tài)一個id, 而不需要記錄1000個狀態(tài)id, 同時他們之間的聯(lián)系線也從1000條降低為1條。這樣就大大減小了關(guān)系圖維護的負擔。

但這樣造成的結(jié)果是micro-batch會造成很高的端到端處理的latency, 因為micro-batch里的第一條信息要等待micro-batch里的最后一條信息來了之后才能一起傳給下游。

而這個等待是疊加的,當stream的層數(shù)越深,每一層的micro-batch的第一條信息都需要等待最后一條信息被處理完畢,相比在每一層都毫不等待,micro-batch造成的額外latency就會疊加式的增高。

注意, Spark Structured Stream提供了一種continuous mode[11,12,13,20]來替代micro-batch,解決了latency的問題,但是目前支持的operator很少,且不能做到exact once msg processing, 這里不多加討論了(不過將來有望做成和flink一樣的模式, 畢竟也用的Chandy-Lamport Distributed Snapshot algorithm) : Note that any time you switch to continuous mode, you will get at-least-once fault-tolerance guarantees.[13]

spark的micro-batch會造成嚴重的latency問題, 而Dataflow和Kafka Stream的方案要求記錄每一個計算結(jié)果, 則會在大大增加系統(tǒng)負擔的同時也會有不小的latency附加。那么有沒有一種方法可以不記錄所有中間計算結(jié)果, 并且也不使用micro-batch呢?

我們來看看flink的藝術(shù);

Flink

如果我們不儲存流系統(tǒng)中間節(jié)點的計算結(jié)果在高可用DataStore里, 也不想維護復雜的數(shù)據(jù)依賴圖(需要micro-batch的根源), 那么當一個節(jié)點fail掉需要replay上游的input的時候,上游就必定需要replay自己的上游,且自己的狀態(tài)要rollback到?jīng)]有接收這些要replay的消息之前的狀態(tài);對上游的上游就有相同的要求,那么最終所有節(jié)點的上游最終會歸向數(shù)據(jù)源節(jié)點,并要求"重新replay"。總而言之2個要求:

  • 數(shù)據(jù)源節(jié)點可以replay, 并產(chǎn)生層層的蝴蝶效應(yīng)的"每個節(jié)點對上游要求的replay";

  • 所有的計算節(jié)點的狀態(tài),要恢復到?jīng)]有接收到"上游所replay的消息"之前的樣子(所以replay后可以回到現(xiàn)在的狀態(tài),且重新生成下游所需要的input, 即當前節(jié)點在處理這些replay消息時產(chǎn)生的計算結(jié)果集)。

全局一致點和全局一致狀態(tài)集

為了方便討論,我們定義2里所提到的global的狀態(tài)為一個全局穩(wěn)定點; 顯然,如果我們一條消息一條消息的處理,數(shù)據(jù)源節(jié)點等待直到所有流節(jié)點處理完這條消息所產(chǎn)生的蝴蝶效應(yīng)信息之后,才發(fā)出下一個消息B0,那么在消息B要發(fā)出但是沒發(fā)出之前,所有的節(jié)點的狀態(tài)就滿足我們對全局穩(wěn)定點的需要。

比如當我們持續(xù)處理B1,B2,B3...B100, 這時一個節(jié)點fail掉了,那么我們只要流系統(tǒng)的所有節(jié)點rollback他們的狀態(tài)到發(fā)出B0前的"全局穩(wěn)定點", 整個系統(tǒng)的計算和狀態(tài)就會干凈的回道任何節(jié)點都不曾被任何B0-B100所影響的狀態(tài), 那么此時從數(shù)據(jù)源節(jié)點replay B0, B1, B2... B100 成功, 這些消息就"exactly once process"掉了。所以,我們找到了第一個不需要micro-batch, 也不需要記錄中間節(jié)點計算結(jié)果,就能實現(xiàn)EOMP的方法:

每n條信息, 或者每一段時間, 數(shù)據(jù)源節(jié)點(或者流系統(tǒng)的第一個入口節(jié)點)停止向下游發(fā)送任何信息, 直到所有節(jié)點報告說有關(guān)這條信息的所有派生信息(由于這條信息引起的第一個計算節(jié)點的計算結(jié)果會發(fā)送給它的下游, 下游的計算結(jié)果又會發(fā)送給它的下游...等等這些都是派生信息)都已經(jīng)處理完畢, 此時把所有節(jié)點的狀態(tài)checkpointing在高可用DataStore里, 建立一個全局穩(wěn)定狀態(tài)集(由流系統(tǒng)中每個計算節(jié)點各自的全局穩(wěn)定狀態(tài)所組成), 數(shù)據(jù)源才開始繼續(xù)發(fā)送信息...這樣, 任意的節(jié)點fail掉, 我們只要在別的機器上重啟這個計算節(jié)點并download之前checkpoint的狀態(tài),流系統(tǒng)的所有節(jié)點也rollback到上一個全局穩(wěn)定狀態(tài)即可, 由于數(shù)據(jù)源發(fā)送數(shù)據(jù)的進度也屬于全局穩(wěn)定狀態(tài)集中的一員, 所以當數(shù)據(jù)源rollback自己的狀態(tài),則可以開始replay 全局穩(wěn)定點checkpoint之后才發(fā)送的信息,而此時所有節(jié)點都已經(jīng)rollback到一個"從沒見過這些信息和它們的派生信息"的狀態(tài)了,整個系統(tǒng)就好像從來沒有見過這些信息一樣, 從而實現(xiàn)即使failure發(fā)生,我們的系統(tǒng)也可以實現(xiàn)EOMP。

更進一步, 我們來看如何不停住數(shù)據(jù)源的信息接收,我們所需要處理的問題。

1)任意時間點的全局狀態(tài),都不是全局穩(wěn)定點: 如果所有節(jié)點都不等待后續(xù)節(jié)點有沒有處理完信息, 那么任意時間點, 在流的中間節(jié)點建立全局穩(wěn)定狀態(tài)的時候 ,流上游的節(jié)點已經(jīng)開始處理新的信息, 它們的全局穩(wěn)定狀態(tài)早已被新的信息所影響了, 而下游可能還沒收到建立全局穩(wěn)定狀態(tài)所需要的信息。

2)隨意指定的全局穩(wěn)定狀態(tài)集可能根本不存在, 比如數(shù)據(jù)源連續(xù)給A和B發(fā)出x,y兩條信息, 而A和B則需要把計算結(jié)果都發(fā)送給C,如果我們想定義全局穩(wěn)定狀態(tài)為所有節(jié)點"處理完x相關(guān)的消息之后", 但是"處理完y相關(guān)的信息之前"的狀態(tài)。

那么考慮這樣一個運行順序: A處理完x向C發(fā)出x-A, B處理完x, y后向C發(fā)出x-B, y-B, 然而由于網(wǎng)絡(luò)和處理速度的因素, C在還沒有收到x-A的情況下就處理完了x-B, y-B, 所以C的一個"干凈"的從未被y信息影響的狀態(tài),但包含了所有需要的x信息的穩(wěn)定狀態(tài), 在C的狀態(tài)變遷過程中是從來不存在的 (即, 處理完x-A和x-B,但是沒有處理y-B時的狀態(tài))。

問題1意味著我們不能用物理時間來建立全局一致狀態(tài)集, 那么既然流的不同節(jié)點接收到數(shù)據(jù)源任意消息x的派生消息的時間不同, 那么只要我們能讓所有節(jié)點分清哪些是x的消息和派生消息, 哪些是x之后的消息和派生消息, 所有節(jié)點就可以在處理完x的派生消息之后把本地狀態(tài)復制一份儲存在高可用DataStore里, 作為全局一致狀態(tài)集的一員。

問題2意味著即使允許計算節(jié)點連續(xù)處理input而不必等待所有下游建立好全局一致狀態(tài)才發(fā)下一個計算結(jié)果, 計算節(jié)點也不能盲目的不加考慮的處理上游信息, 我們要使得計算節(jié)點的狀態(tài)變遷過程中, 至少全局一致狀態(tài)是可以出現(xiàn)的。

Flink的解法就是由一個高可用的coordinator連續(xù)發(fā)出不同的stage barrier(比如先給所有src發(fā)1,然后1分鐘后發(fā)2,2分鐘后發(fā)3..... 如此增長), 夾雜在發(fā)給數(shù)據(jù)源發(fā)出的數(shù)據(jù)流里, 所有的節(jié)點都必須忠實的轉(zhuǎn)發(fā)這個stage barrier, 這樣所有的節(jié)點的:

  • input都分為了接收到某barrier(設(shè)為barrier-a)之前的信息和收到barrier-a之后的信息,;

  • 所有的發(fā)給下游的計算結(jié)果也分為自己發(fā)出barrier-a之前的信息和發(fā)出barrier-a之后的信息;

  • 所有的狀態(tài)變遷也分為,用所有接收到barrier-a之前的信息, 所建立的狀態(tài), 和收到barrier-a之后被新的信息影響了的狀態(tài)。

那么如果所有節(jié)點都遵循2個原則:

  • 只用"接收到barrier-a之前的所有信息", 來建立自己的本地狀態(tài),并備份在高可用DataStore里;

  • 只使用"接收到barrier-a之前的所有信息"來計算結(jié)果并發(fā)送給下游之后, 才轉(zhuǎn)發(fā)barrier-a; 然后才開始處理"接收到barrier-a之后的信息"; 這樣就保證了自己在往下游發(fā)送barrier-a之前所發(fā)的所有計算結(jié)果, 都沒有被自己所收到的barrier-a之后的新消息所影響(自己發(fā)送的barrier-a之前的計算結(jié)果只和自己接收的barrier-a前的input集合相關(guān))。

而當所有的節(jié)點都保證"自己發(fā)送的barrier-a之前的計算結(jié)果只和自己接收的barrier-a前的input集合相關(guān)", barrier-a就成了系統(tǒng)系統(tǒng)的分隔點,而所有節(jié)點遵循原則-1所建立的本地狀態(tài)備份, 也絕對沒有被數(shù)據(jù)源發(fā)出的在barrier-a之后的信息和它們的派生信息所影響; 而這些所有本地狀態(tài)備份的全集,則組成了全局一致狀態(tài)集。

一個細節(jié), 當一個節(jié)點只有一個input channel的時候, 只要按順序處理input信息即可; 而當一個節(jié)點有多于一個input channel的時候, 一個input channel的barrier-a已經(jīng)接收到, 但是其他channel的barrier-a還沒有收到怎么辦呢?

  • 從收到barrier-a的channel接收新的信息并處理可行么? 顯然不行, 這樣違反了原則-1, 因為"barrier-a之前的信息全集"還沒有湊齊(其他channel的barrier-a還沒有收到), 此時如果處理了任何屬于barrier-a后的"新"信息, 我們就再也無法在狀態(tài)變遷中得到一個"干凈"不受barrier-a后的"新"信息所影響的狀態(tài)了, 這意味著我們必須block 這個已經(jīng)收到barrier-a的channel;

  • 我們可以向下游轉(zhuǎn)發(fā)barrier-a么? 顯然也不行, 這樣違反了原則-2, 理由相同, 我們還沒有收到"barrier-a之前的信息全集", 而從其他channel收到barrier-a之前還收到其他信息的話, 它們所產(chǎn)生的計算結(jié)果也必須在轉(zhuǎn)發(fā)barrier-a之前發(fā)送。

由1,2就很清楚可以推理出flink的算法了:

  • 收到任意input channel 的barrier-a之后, block此channel;

  • 收到所有input channel的barrier-a之后, 把當前狀態(tài)checkpoint并備份到高可用的DataStore里; (這里可以做到異步checkpoint并不會影響latency, 詳細介紹看后邊的異步checkpointing這一節(jié));

  • 收到所有input channel的barrier-a之后, 并且處理完所有此前收到的信息并向下游發(fā)送計算結(jié)果完畢后, 向所有和自己相連的下游轉(zhuǎn)發(fā)barrier-a;

  • 當所有節(jié)點都備份完成,我們就得到了一個全局一致狀態(tài)集, 或者說全局一致狀態(tài)快照; 系統(tǒng)的穩(wěn)定點就進步到了barrier-a, 如果下一個barrier是barrier-b, 那么在得到barrier-b的全局一致狀態(tài)集之前, 如果系統(tǒng)出現(xiàn)failure, 我們就可以通過重啟所有計算節(jié)點的方式, 讓所有節(jié)點reload barrier-a所記錄的狀態(tài)集, 從而實現(xiàn)把所有節(jié)點的狀態(tài)rollback到"上一個全局一致"的狀態(tài), 使得流系統(tǒng)可以重置到好像根本沒有看到過任何barrier-a到barrier-b之間的信息的一樣, 然后重跑這段信息;

  • 通過干凈的rollback了可能造成的重復處理的痕跡, 使得所有信息的效果都只發(fā)生了一次, 所以我們得到了一個端到端的EOMP系統(tǒng)。

異步增量checkpointing

異步checkpoint可以使得, checkpoint本身不會block流本身的計算,增量checkpoint避免了,每次一點小變動都需要checkpoint全部的state,可以節(jié)省計算機資源(比如網(wǎng)絡(luò)壓力)

flink和spark這種需要checkpoint的系統(tǒng)都可以做到異步增量checkpoint, 且這個技術(shù)也很成熟了, 本文只選flink的方法[35]來簡單說明一下 , Spark的可以看[21]

1、Flink的異步增量checkpointing

Flink使用RocksDB 作為本地狀態(tài)儲存, RocksDB本質(zhì)上就是一個LSM tree, 對狀態(tài)的寫會寫在內(nèi)存的memtable, 一般是一個linked hashmap, 寫到一定大小就存到硬盤里變成sstable(sorted-string-table), 不再更改。

此后會開一個新的memtable來接受新的寫。這樣會按歷史時間來生成很多小文件, 讀的時候先讀memtable,如果里邊有想要的key對應(yīng)的value,必定是最新的,否則按歷史時間順來查sstable(sstable有自己的cache, 所以未必需要讀硬盤)。

對于flink來說, 當需要checkpoint的時候, 只需要把當時的memtable寫在硬盤里即可, 這是唯一一個需要block住當前計算的操作, 此后也只需要把從上個checkpoint開始, 新生成的sstable異步發(fā)送到高可用的遠程文件系統(tǒng)即可(比如S3, HDFS)。這樣就做到了異步(發(fā)送到高可用datastore是異步執(zhí)行的),和增量(只發(fā)送新增文件)。

注意, 由于太多的小文件的sstable會造成讀的性能問題, 所以RocksDB需要異步的compact這些小文件到一個大文件, 對此flink也需要做出一些應(yīng)對, 詳見[35], 例子給的非常清楚,這里不再贅述。

系統(tǒng)內(nèi)與系統(tǒng)外

以上的討論都是關(guān)于中間件內(nèi)部如何實現(xiàn)EOMP, 但是由于end to end argument的影響, 中間件提供的保證再多, 沒有source的支持, 它也無法區(qū)分source(流系統(tǒng)的event來源)發(fā)來的2個內(nèi)容一樣的event, 到底是"同一個"信息的重發(fā), 還是"本意"就是想要中間件處理兩次的兩個"不同"event; 對sink(流系統(tǒng)計算結(jié)果的去處)來說,由于failure造成的重算,zombie的存在, 則需要sink能夠"融入"到流系統(tǒng)的EOMP體系中去。

對于source的要求基本就是重發(fā)和對消息提供能區(qū)分到底是不是一個event的eventId,一般就是Kafka那樣就OK, 比較簡單就不多討論了; 這里著重聊一下sink; Sink主要有兩種手段來配合流系統(tǒng)中間件的EOMP, 冪等和2階段提交(2PC)

1、冪等Sink

最簡單的來配合流系統(tǒng)EOMP的策略就是冪等, 由于是外部系統(tǒng), 所以重用我們的"兩節(jié)點EOMP模型"基本不可能, 因為基本不可能用一個tx來把要寫外部系統(tǒng)的操作和記錄已經(jīng)處理過這個操作用一個原子tx來commit, 這也是流系統(tǒng)為什么要支持2PC的原因。

由于冪等保證對同一個計算結(jié)果寫多次和寫一次一樣, 所以無論是什么流系統(tǒng), 無論系統(tǒng)是重算型, 還是記錄計算結(jié)果來避免重算型, 冪等的sink都可以很好的支持; 所以Dataflow/Spark/Kafka Stream都是靠冪等的sink來實現(xiàn)EOMP。

冪等的問題在于無法應(yīng)對需要重算, 且計算可以是non-deterministic的情況, 詳見: 后邊(Latency, 冪等和non-deterministic)一節(jié)的討論; 這也是Spark Streaming, 使用冪等sink的Flink無法支持non-deterministic計算的本質(zhì)原因。

相比之下, dataflow總是記錄計算結(jié)果來避免重算(即使重算也只會有一次重算的結(jié)果會影響下游), Kafka Stream支持tx可以保證只有一次計算結(jié)果可以被commit到Kafka Stream里, 如果sink也只讀committed上游kafka stream, 則可以保證即使計算是non-deterministic的, 也只會有唯一commit的計算結(jié)果被讀到(其他的計算結(jié)果沒有commit marker而被Kafka data comsume API忽略)從而影響sink的外部系統(tǒng)。

而Flink的2PC sink也做到了重算會直接導致sink的外部系統(tǒng)可以配合flink的global rollback, 所以只會有一次的計算結(jié)果被外部系統(tǒng)接受(commit)。

所以Spark Stream在4個流系統(tǒng)里, 是唯一一個完全無法支持non-deterministic計算的流系統(tǒng)。

2、Flink獨特的2PC Sink

2PC對很多熟悉數(shù)據(jù)庫的人來說應(yīng)該是臭名昭著了, 這是很復雜和很容易造成問題而需要極力避免的東西、但是時代在變化, 2PC在新時代也有了彌補自己問題的很多解法了,這里簡單介紹一下。

2PC協(xié)議由一個coordinator,和很多參與2PC的異構(gòu)系統(tǒng)組成,發(fā)起2PC的時候 coodinator要求所有人pre-commit,這是2PC的第一個P(phase),如果所有tx參與者都可以pre-commit并告知coordinator,則coordinator告訴所有人commit,否則告訴所有人abort,這是2PC的第二個P(phrase)

2PC最大的問題是它是一個blocking協(xié)議,blocking的點在于當coordinator和某一個2PC的參與者A掛了,其他參與者無法作出任何決定,只能等待coordinator或者死掉的那個參與者A上線,因為這時所有其他參與者都無法判斷以下兩種情況到底那種發(fā)生了,從而無法決定到底是commit還是abort。

  • coodinator已經(jīng)收到了所有人的pre-commit并告知參與者A commit,A commit后就掛了;

  • A并不能pre-commit,但是coodinator在告訴所有人需要abort之前就掛了。

在情況1. 所有其他參與者都應(yīng)該commit,在情況2,所有其他參與者都應(yīng)該abort;由于無法辨別到底是情況1. 還是2. 所有其他參與者必須block等待,這對很多數(shù)據(jù)庫來說意味著為此tx加的鎖都不能放掉,從而影響數(shù)據(jù)庫的其他不參與2PC的操作,甚至鎖死整個數(shù)據(jù)庫。而如果coordinator或者參與者A無法再上線或者狀態(tài)丟失,則需要非常復雜的人工操作來解決其他參與者應(yīng)該如何決策的問題。

雖然2PC有各種問題, 但是在consensus協(xié)議早已經(jīng)成功分布式系統(tǒng)的基石, 各種開源和標準實現(xiàn)可以被輕松獲得的今天, 用consensus協(xié)議來彌補2PC的問題已經(jīng)成為一個"已經(jīng)解決的問題", 如[25]4.2 The Paxos Commit Algorithm 中所說:

We could make that fault-tolerant by simply using a consensus algorithm to choose the committed/aborted decision, letting the TM be the client that proposes the consensus value…

解決2PC問題的關(guān)鍵在于保持coordinator狀態(tài)的高可用性, 那么只要coordinator保證把commit或者abort的決定記錄在一個consensus cluster里即可,比如etcd或者zookeeper,這樣coordintor死了,重啟從consensus cluster里恢復狀態(tài)重新告知所有參與者到底應(yīng)該commit還是abort即可; 這也是為什么各種流行的分布式系統(tǒng)實現(xiàn)分布式tx都是用2PC的原因, 比如dynamoDB, Spanner, Flink, Kafka...

3、Flink的2PC Sink

2PC的第一個P的關(guān)鍵在于所有tx參與者在不知道其他參與者狀態(tài)的情況下,承諾未來一定可以前進commit成功或者干凈的回退abort。當前的tx參與者準備好了,且同意commit,2PC的第二個P的關(guān)鍵點在于整體系統(tǒng)的”唯一決定”統(tǒng)一的推進或者回退各個參與者的狀態(tài)。

而Flink的global state其實可以看做一個2PC,當一個節(jié)點收到所有的上游的barrier-n時,這個“契機”可以看做收到了coordinator的可不可以precommit的問詢,而當localstate已經(jīng)在remote 存好之后,當前節(jié)點就可以告訴coordinator它準備好了,這可以看做回復precommit(如果此節(jié)點在發(fā)給precommit)。

而當所有的節(jié)點都通知coordinator“準備好了”之后,coordinator就可以記錄下barrier-n的global state完整checkpoint的這個事實,這相當于一個不需要發(fā)給tx參與者的commit。

這是由于當failover的時候,是由coordinator告訴所有節(jié)點應(yīng)該從哪個checkpoint點來恢復本地狀態(tài),所以各個節(jié)點的localstate到底是commit了還是rollback了,可以完全由“有沒有記錄下barrier-n的global state完整checkpoint成功”這個metadata推算出來,所以也就不需要單獨給各個節(jié)點發(fā)commit/abort信息來讓各個節(jié)點commit或者abort了。

當系統(tǒng)狀態(tài)只涉及到flink的內(nèi)部狀態(tài)時(flink提供的stateApi所提供的statestore), 如果一個某節(jié)點X在回復precommit之后掛了,coordinator還是可以選擇commit,因為組成global state的節(jié)點X的local state已經(jīng)完整的存儲在remote的datastore里了。

但是如果涉及到外部狀態(tài),比如sink需要把計算結(jié)果存儲到一個非flink控制的數(shù)據(jù)庫中去時,flink的sink節(jié)點就相當于這個外部數(shù)據(jù)庫的client,需要連接外部數(shù)據(jù)庫并把數(shù)據(jù)存入外部數(shù)據(jù)庫;要使得外部數(shù)據(jù)庫的狀態(tài)和flink的狀態(tài)保持一致,則需要sink把外部數(shù)據(jù)庫的狀態(tài)引入到flink global state的2PC里,而coordinator在決定commit或者abort的時候,必須通知sink來執(zhí)行外部狀態(tài)的commit或者abort,因為coordinator是不知道外部狀態(tài)到底是什么,也無法簡單的用通知sink從不同的globalstate點恢復來代替2PC的commit/abort通知。

同時sink收到barrier-n時,sink要保證外部數(shù)據(jù)庫里與barrier-(n-1)到barrier-n之間信息相關(guān)的數(shù)據(jù)更改,處于一種“在任何情況下都一定可以commit成功,但是還沒有真的commit,所以外部數(shù)據(jù)庫的消費者不可見這些狀態(tài),且可以rollback的,可進可退的狀態(tài)”。[40]給出了如何用文件實現(xiàn)的一個例子;我這里給出一個如何使用支持transaction的數(shù)據(jù)庫的例子。

首先為了避免產(chǎn)生歧義, 我們定義:

1)flink-precommit ack為 barrier-n流到各個節(jié)點(包括sink), 各個節(jié)點完成local snapshot checkpoint后發(fā)給coordinator的ack, sink則是完成“某個操作”后發(fā)給coordinator的ack, 這個操作需要把外部系統(tǒng)(比如數(shù)據(jù)庫)置于一種, 保證任何情況下都可以服從coordinator的最終決定的狀態(tài), 一個既可以commit(如果coordinator最終決定commit), 又可以rollback(如果coordinator決定abort)的狀態(tài), 且數(shù)據(jù)不為外部系統(tǒng)的consumer所見。

2)定義flink-commit為coordinator收到所有人的pre-commit ack后的的最終commit決定。

3)定義db-commit就是普通的外部數(shù)據(jù)庫的commit。

①當程序開始,sink立刻開一個外部數(shù)據(jù)庫的transaction,當sink收到上游的所有的barrier-1,則立刻db-commit當前transaction然后回復coordinator flink-precommit成功(flink-precommit ack),因為此時如果不db-commit,一旦回復coordinator flink-precommit之后,這個sink掛了,那么外部數(shù)據(jù)庫一般就會自動rollback;此時就算sink在其他機器上重啟,我們也丟失了所有要最終flink-commit的數(shù)據(jù); 而如果這個sink的crash是發(fā)生在coordinator收到所有節(jié)點的flink-precommit ack并最終決定flink-commit之后, 所有其他節(jié)點(比如另外一個sink)的狀態(tài)可能都commit了(所以無法簡單rollback); 而只有此sink的所有數(shù)據(jù)都無法恢復, 這就破壞了global consistency。

②但是上邊我們在flink-precommit階段就db-commit了外部數(shù)據(jù)庫的transaction; 這時會有兩個問題:?

  • 第一, 我們暴露了只是應(yīng)該precommit的數(shù)據(jù)(這些數(shù)據(jù)不應(yīng)被數(shù)據(jù)庫的外部consumer所見);

  • 第二, 如果有一個其他節(jié)點不同意commit而發(fā)給coordinator abort的決定, 那么coordinator則會決定abort, 所以我們的sink則需要服從rollback的決定, 但是我們已經(jīng)db-commit了的數(shù)據(jù), 而一般數(shù)據(jù)庫都不支持rollback已經(jīng)commit的數(shù)據(jù), 這就造成了問題。

為了解決這兩個問題, 這時我們需要設(shè)計一個和外部數(shù)據(jù)庫的數(shù)據(jù)消費者的數(shù)據(jù)“屏蔽協(xié)議”。比如利用一個字段來表示當前數(shù)據(jù)只是“precommit”,所有的外部數(shù)據(jù)庫的讀寫者都應(yīng)該忽略這些數(shù)據(jù)(而只有當這個字段是committed才能讀寫)。

這樣當flink的coordinator通知flink-commit時,我們用另外一個外部數(shù)據(jù)庫的tx來把所有涉及到的precommit的數(shù)據(jù)的這個字段改為committed即可, 這就解決了第一個問題。對于第二個問題來說, 如果最終flink coordinator決定abort, 我們把此字段設(shè)為abort并利用一個異步垃圾回收的程序把所有標記為abort的數(shù)據(jù)清理掉即可。

③這樣設(shè)計的關(guān)鍵是, 即使sink precommit ack之后掛了, 我們要flink-commit的數(shù)據(jù)也不會丟。所以其實flink-precommit ack時, sink把數(shù)據(jù)寫在任何其他可以保證數(shù)據(jù)高可用的地方都行(只要sink fail掉重啟之后還能找到它), 未必需要是同一個數(shù)據(jù)庫的同一個表。如果采取這種策略, 那么在flink-commit時則需要重新把要db-commit的數(shù)據(jù)從存的地方讀出來, 然后重新寫入到真正要寫的數(shù)據(jù)庫并db-commit。

④flink提供了一個TwoPhaseCommitSinkFunction,[40]里有詳細描述如何簡單的extends這個interface來實現(xiàn)一個可以和flink的global consistency配合的sink節(jié)點的邏輯,本文不再贅述。

需要注意的一點是,當sink收到coordinator的flink-commit指令之后,運行sink的db-commit邏輯,在外部數(shù)據(jù)庫的db-commit更改完畢(比如把要commit的數(shù)據(jù)的status的值從precommit改為committed)后,但是flink記住sink已經(jīng)完成commit之前(flink在跑完sink的commit函數(shù)后會記住這個sinki已經(jīng)commit了, 所以不再重復call sink的commit, 否則flink就會一直重試commit), 此時,一旦sink掛了,那么在另外的機器重啟的sink,flink無法得知外部數(shù)據(jù)庫已經(jīng)commit成功了,所以flink會再次重試commit函數(shù)來嘗試commit。從而造成重復commit,這也是[40]中提到的commit必須設(shè)計為冪等操作的原因。

注意1: 可以使用2PC作為sink的關(guān)鍵是, 你的sink可以保證在ack pre-commit之后, 保證無論任何情況都可以成功commit; 這不是說你的sink所連接的外部系統(tǒng)支持tx就可以的, 需要application設(shè)計者根據(jù)情況具體設(shè)計。[1]的P213頁, 就描述了sink是用kafka transaction記錄計算結(jié)果到kafka,但是即使用了transaction也可能丟數(shù)據(jù)的一種edge case。而[41] Kafka 0.11 and newer=>Caveats 里也有提到。

丟失數(shù)據(jù)的原因就在于, kafka sink的默認實現(xiàn):FlinkKafkaProducer011, 在precommit的時候沒有真的commit數(shù)據(jù), 因此當kafka sink fail掉沒有及時重啟, 一旦kafka tx超時, 所有tx里的數(shù)據(jù)都會丟失, 而此時如果coordinator已經(jīng)決定commit就絕不會再重發(fā)數(shù)據(jù)(source也已經(jīng)commit發(fā)出的消息的index),從而kafka sink的此次tx的所有數(shù)據(jù)永久丟失。

這里提供的DB版本的sink實現(xiàn)思路, 在precommit階段就commit數(shù)據(jù), 來保證“無論如何數(shù)據(jù)都不會丟”, 但是用app level的flag屏蔽外部可見; 這樣做的原因就是為了克服類似kafka sink的這種缺陷。

注意2: 使用2PC Sink的Flink應(yīng)該是可以應(yīng)對non-deterministic計算的, 因為一旦failure發(fā)生, 所有之前的狀態(tài)和對sink的寫入都會被rollback; 但是這樣的話, Flink在sink端就變成了micro-batch模型, batch大小取決于發(fā)barrier的頻率; 但是即使這樣, 由于只有sink需要聚集一個batch才能做一次2PC, 但是中間節(jié)點往下游發(fā)送計算結(jié)果還是即算即發(fā)的, 所以比起Spark這種所有中間計算都是micro-batch,micro-batch造成的額外latency會疊加式的增高的模型, 端到端的latency應(yīng)該還是會要小一些。

Latency, 冪等和non-deterministic

利用冪等的sink可以做到實時記錄計算結(jié)果, 達到最小的end to end latency。因為sink根本不需要等待barrier, 來一條計算結(jié)果就向外部系統(tǒng)commit一條記錄就好, 而由冪等保證了就算整個系統(tǒng)開始重算, 在sink端也會表現(xiàn)出每個source端的event只產(chǎn)生了一次效果的結(jié)果。

但是冪等是很難克服non-deterministic計算的。因為non-deterministic計算使得同一個source發(fā)出的event引起千變?nèi)f化的"蝴蝶效應(yīng)" (比如第一次計算event生成的Key是A, 第二次重算生成的Key是B, 如果下一個節(jié)點是partitionByKey, 那么這里的2次計算結(jié)果就會發(fā)送給了完全不同的下游節(jié)點, 考慮幾百次不確定計算引起的不同蝴蝶效應(yīng), 等計算結(jié)果到達各個sink節(jié)時, 計算的key和value甚至結(jié)果的個數(shù)和在sink節(jié)點的distribution都完全不同了, 那么sink也就完全無法利用冪等來屏蔽掉同一個event replay所造成的"蝴蝶效應(yīng)"了)

相比之下, 如果整個流系統(tǒng)的計算都是確定性的, 那么無論在source端replay多少次同一個event, 它所產(chǎn)生的"蝴蝶效應(yīng)"在sink端也必定相同, 則application設(shè)計者則可以很容易設(shè)計出冪等操作來屏蔽掉重復的計算結(jié)果。

如果業(yè)務(wù)里無法去除non-determnistic的計算, 那么你只能選擇Google Dataflow, KafkaStream,或者Flink+2PCSink; 而只支持冪等的Spark和利用冪等sink的Flink無法支持non-determnistic的業(yè)務(wù)計算。

REFERENCE

  • Stream Processing with Apache Flink: Fundamentals, Implementation, and Operation of Streaming Applications

  • Designing Data-Intensive Applications: The Big Ideas Behind Reliable, Scalable, and Maintainable Systems

  • Lightweight Asynchronous Snapshots for Distributed Dataflows

  • Streaming Systems: The What, Where, When, and How of Large-Scale Data Processing

  • Kafka Streams in Action: Real-time apps and microservices with the Kafka Streams API

  • The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing

  • MillWheel: Fault-Tolerant Stream Processing at Internet Scale

  • Distributed Snapshots: Determining Global States of Distributed Systems (Chandy-Lamport)

  • State Management in Apache Flink R Consistent Stateful Distributed Stream Processing

  • Discretized Streams: Fault-Tolerant Streaming Computation at Scale

  • Continuous Processing in Structured Streaming Design Sketch

  • Structured Streaming: A Declarative API for Real-Time Applications in Apache Spark

  • Structured Streaming Programming Guide 2.4.3

  • Spark Streaming Programming Guide2.4.3

  • Watermarks, Tables, Event Time, and the Dataflow Model

  • Kafka Streams’ Take on Watermarks and Triggers

  • Streams Architecture Kafka

  • Enabling Exactly Once in Kafka Streams

  • Transactions in Apache Kafka

  • Introducing Low-latency Continuous Processing Mode in Structured Streaming in Apache Spark 2.3

  • State Management in Spark Structured Streaming

  • Process Large DynamoDB Streams Using Multiple Amazon Kinesis Client Library (KCL) Workers

  • Big Data: Principles and best practices of scalable realtime data systems

  • Making Sense of Stream Processing

  • Consensus on Transaction Commit

  • Exactly Once Delivery and Transactional Messaging in Kafka=>docs.google.com/documen

  • End-to-End Arguments in System Design

  • Transactional Messaging in Kafka

  • Akka Split Brain Resolver

  • Unreliable Failure Detectors for Reliable Distributed Systems

  • The Weakest Failure Detector for Solving Consensus

  • Exactly once Semantics are Possible: Here’s How Kafka Does it

  • 24/7 Spark Streaming on YARN in Production

  • Monitoring Back Pressure (flink)

  • Managing Large State in Apache Flink: An Intro to Incremental Checkpointing

  • Impossibility of Distributed Consensus with One Faulty Process (AKA, FLP impossibility)

  • Kubernetes in Action

  • Akka:Auto-Downing(DO NOT USE)

  • ZooKeeper: Distributed Process Coordination

  • An Overview of End-to-End Exactly-Once Processing in Apache Flink

  • Kafka producers and fault tolerance

作者丨阿萊克西斯來源丨h(huán)ttps://zhuanlan.zhihu.com/p/77677075

想知道更多?描下面的二維碼關(guān)注我


加技術(shù)群入口(備注:Tech):

免費星球入口:

免費資料入口:后臺回復“666”

朕已閱?

總結(jié)

以上是生活随笔為你收集整理的压箱底总结:流系统端到端一致性对比的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

国产成人一区二区三区在线观看 | 久久国产自偷自偷免费一区调 | 丝袜足控一区二区三区 | a国产一区二区免费入口 | 亚洲国产精品久久久久久 | 久久人人爽人人人人片 | 国产精品成人av在线观看 | 狠狠噜狠狠狠狠丁香五月 | 国产国语老龄妇女a片 | 少妇厨房愉情理9仑片视频 | 全黄性性激高免费视频 | 色综合天天综合狠狠爱 | 亚洲欧美日韩国产精品一区二区 | 日韩av无码中文无码电影 | 暴力强奷在线播放无码 | 国产av剧情md精品麻豆 | 国产亚洲精品久久久久久久 | 亚洲欧美色中文字幕在线 | 精品乱码久久久久久久 | 日韩精品无码一本二本三本色 | 欧美日韩久久久精品a片 | 国精产品一品二品国精品69xx | 精品国产麻豆免费人成网站 | 久久久久成人精品免费播放动漫 | 国产午夜精品一区二区三区嫩草 | 三级4级全黄60分钟 | 亚洲人成影院在线无码按摩店 | 国产精品久久久久久亚洲影视内衣 | 国产精品爱久久久久久久 | 亚洲一区二区三区偷拍女厕 | 人妻少妇被猛烈进入中文字幕 | 亚洲日韩精品欧美一区二区 | 久久午夜夜伦鲁鲁片无码免费 | 国产精品久久久久无码av色戒 | 麻豆蜜桃av蜜臀av色欲av | 日欧一片内射va在线影院 | 无码av岛国片在线播放 | 中文字幕中文有码在线 | 久久综合网欧美色妞网 | 国产区女主播在线观看 | 网友自拍区视频精品 | 久久亚洲精品中文字幕无男同 | 色一情一乱一伦一视频免费看 | 国产人妻精品午夜福利免费 | 日本丰满护士爆乳xxxx | 亚洲成a人片在线观看无码 | 中文精品无码中文字幕无码专区 | 久久精品女人天堂av免费观看 | 中文字幕乱码人妻无码久久 | 亚洲一区二区三区在线观看网站 | 俺去俺来也www色官网 | 97精品人妻一区二区三区香蕉 | 天下第一社区视频www日本 | 国产成人无码午夜视频在线观看 | 未满成年国产在线观看 | 中文字幕无码乱人伦 | 亚洲国产精品成人久久蜜臀 | 两性色午夜免费视频 | 亚洲国产精品成人久久蜜臀 | 久久国产36精品色熟妇 | 亚洲人成影院在线观看 | 久久国产精品萌白酱免费 | 性欧美大战久久久久久久 | 奇米综合四色77777久久 东京无码熟妇人妻av在线网址 | 成人亚洲精品久久久久 | 4hu四虎永久在线观看 | 日日摸日日碰夜夜爽av | 给我免费的视频在线观看 | 国产情侣作爱视频免费观看 | 精品偷拍一区二区三区在线看 | 97精品人妻一区二区三区香蕉 | 久久精品成人欧美大片 | 成年美女黄网站色大免费视频 | 精品国产精品久久一区免费式 | 性色av无码免费一区二区三区 | 国产精品无码一区二区三区不卡 | 亚洲精品成a人在线观看 | 国产香蕉尹人视频在线 | 欧美老人巨大xxxx做受 | 55夜色66夜色国产精品视频 | 鲁一鲁av2019在线 | 中文字幕乱码亚洲无线三区 | 在线观看国产午夜福利片 | 国产香蕉97碰碰久久人人 | 日韩欧美群交p片內射中文 | 中文字幕 人妻熟女 | 无码毛片视频一区二区本码 | 中文字幕乱码亚洲无线三区 | 未满小14洗澡无码视频网站 | 99久久99久久免费精品蜜桃 | 亚洲一区二区三区播放 | 国产精品亚洲а∨无码播放麻豆 | 国产在线精品一区二区三区直播 | 国产熟妇高潮叫床视频播放 | 亚洲精品鲁一鲁一区二区三区 | 中文字幕无码av激情不卡 | 好男人社区资源 | 成熟人妻av无码专区 | 色综合久久久无码中文字幕 | 亚洲一区二区三区偷拍女厕 | 成人av无码一区二区三区 | 日韩人妻无码一区二区三区久久99 | 国产亚洲精品久久久ai换 | 国产特级毛片aaaaaaa高清 | 色综合久久88色综合天天 | 亚洲精品综合一区二区三区在线 | 玩弄人妻少妇500系列视频 | 久久久av男人的天堂 | 国产97人人超碰caoprom | 亚洲欧美日韩国产精品一区二区 | 中文字幕人妻丝袜二区 | 鲁鲁鲁爽爽爽在线视频观看 | 亚洲乱亚洲乱妇50p | 日韩精品久久久肉伦网站 | 2020久久香蕉国产线看观看 | 又大又黄又粗又爽的免费视频 | 婷婷六月久久综合丁香 | 国产精品第一国产精品 | 久久亚洲精品中文字幕无男同 | 亚洲国产午夜精品理论片 | 人妻aⅴ无码一区二区三区 | 无码人妻久久一区二区三区不卡 | 在线观看国产午夜福利片 | 亚洲精品国产精品乱码不卡 | 精品水蜜桃久久久久久久 | 亚洲国产欧美国产综合一区 | 欧洲极品少妇 | 少妇无码一区二区二三区 | 国产午夜亚洲精品不卡下载 | 亚洲狠狠色丁香婷婷综合 | 国产精品va在线播放 | 欧美黑人性暴力猛交喷水 | 中文毛片无遮挡高清免费 | 无码人妻精品一区二区三区不卡 | 国产精品二区一区二区aⅴ污介绍 | 色妞www精品免费视频 | 久久精品国产日本波多野结衣 | 99国产精品白浆在线观看免费 | 中文字幕 人妻熟女 | 亚洲一区二区观看播放 | 免费无码午夜福利片69 | 久久综合色之久久综合 | 免费人成在线视频无码 | 久久精品国产99久久6动漫 | 亚洲理论电影在线观看 | 亚洲精品一区二区三区在线 | 亚洲日韩av一区二区三区中文 | 久久精品99久久香蕉国产色戒 | 夜夜躁日日躁狠狠久久av | 中文字幕人妻无码一夲道 | 午夜嘿嘿嘿影院 | 日韩精品成人一区二区三区 | 午夜肉伦伦影院 | 久久国产精品二国产精品 | 久久国语露脸国产精品电影 | 国产性生交xxxxx无码 | 亚洲国精产品一二二线 | 久久久久亚洲精品中文字幕 | 大地资源网第二页免费观看 | 国产精品福利视频导航 | 亚洲午夜无码久久 | 亚洲 日韩 欧美 成人 在线观看 | 丰满人妻被黑人猛烈进入 | 一本大道伊人av久久综合 | 成人动漫在线观看 | 欧美精品在线观看 | 亚洲欧美国产精品专区久久 | 亚洲欧美日韩成人高清在线一区 | 四虎永久在线精品免费网址 | 动漫av一区二区在线观看 | 999久久久国产精品消防器材 | 国产成人精品三级麻豆 | 久久综合激激的五月天 | 国产成人一区二区三区别 | 国产精品丝袜黑色高跟鞋 | 色综合久久久无码中文字幕 | 奇米影视7777久久精品人人爽 | 国产精品久久久久久久影院 | 亚洲成a人片在线观看无码 | 黑人大群体交免费视频 | 初尝人妻少妇中文字幕 | 99久久婷婷国产综合精品青草免费 | 亚洲欧洲日本无在线码 | 性色欲网站人妻丰满中文久久不卡 | 国产女主播喷水视频在线观看 | 国产午夜无码视频在线观看 | 999久久久国产精品消防器材 | 人人爽人人澡人人人妻 | 国产精品久久福利网站 | 欧美激情一区二区三区成人 | 国产精品亚洲五月天高清 | 国产无遮挡又黄又爽免费视频 | 黑森林福利视频导航 | 久久精品国产日本波多野结衣 | 一个人看的视频www在线 | 欧美大屁股xxxxhd黑色 | 久久综合久久自在自线精品自 | 一个人免费观看的www视频 | 久久精品人妻少妇一区二区三区 | 国产精品a成v人在线播放 | 免费国产成人高清在线观看网站 | 欧美日韩色另类综合 | 四虎国产精品免费久久 | 国产性生交xxxxx无码 | 伊人久久大香线焦av综合影院 | 欧美精品免费观看二区 | 久久99久久99精品中文字幕 | 大色综合色综合网站 | 日日摸夜夜摸狠狠摸婷婷 | 国产av一区二区精品久久凹凸 | 国产精品久久久av久久久 | 精品人人妻人人澡人人爽人人 | 鲁鲁鲁爽爽爽在线视频观看 | 国产成人精品视频ⅴa片软件竹菊 | 天堂久久天堂av色综合 | 中文字幕乱码人妻二区三区 | 欧美日韩久久久精品a片 | 日本护士毛茸茸高潮 | 少妇性荡欲午夜性开放视频剧场 | 国产成人人人97超碰超爽8 | 国产精品久久久久影院嫩草 | 俄罗斯老熟妇色xxxx | 丰满人妻翻云覆雨呻吟视频 | 台湾无码一区二区 | 性史性农村dvd毛片 | 伊人色综合久久天天小片 | 青草视频在线播放 | 日韩欧美群交p片內射中文 | 亚洲精品成a人在线观看 | 最新版天堂资源中文官网 | 成人免费视频视频在线观看 免费 | 国产成人综合美国十次 | 久久无码专区国产精品s | 亚洲天堂2017无码 | a片免费视频在线观看 | 夜先锋av资源网站 | 亚洲色欲色欲欲www在线 | 一区二区三区乱码在线 | 欧洲 | 大色综合色综合网站 | 久久精品国产亚洲精品 | 精品乱码久久久久久久 | 欧洲美熟女乱又伦 | 中文字幕无码免费久久9一区9 | 日本www一道久久久免费榴莲 | 中文字幕乱码人妻二区三区 | 国产激情精品一区二区三区 | 国产精品久久久久无码av色戒 | 亚洲日韩一区二区三区 | 国产口爆吞精在线视频 | 伊人久久大香线蕉亚洲 | 激情人妻另类人妻伦 | 免费看少妇作爱视频 | 久久无码中文字幕免费影院蜜桃 | 女人被爽到呻吟gif动态图视看 | 99er热精品视频 | 日本一卡2卡3卡4卡无卡免费网站 国产一区二区三区影院 | 思思久久99热只有频精品66 | 亚洲va中文字幕无码久久不卡 | 久久熟妇人妻午夜寂寞影院 | 欧美猛少妇色xxxxx | 思思久久99热只有频精品66 | 无码免费一区二区三区 | 人人超人人超碰超国产 | 欧美日韩色另类综合 | 国精品人妻无码一区二区三区蜜柚 | 亚洲人成网站色7799 | 久久精品国产一区二区三区肥胖 | 久久久中文久久久无码 | 国产午夜无码视频在线观看 | 少妇被粗大的猛进出69影院 | 亚洲 另类 在线 欧美 制服 | 亚洲一区二区观看播放 | 无码av免费一区二区三区试看 | 人妻插b视频一区二区三区 | 精品久久久久久亚洲精品 | 在线观看免费人成视频 | 精品无码一区二区三区的天堂 | 东京热无码av男人的天堂 | 日本免费一区二区三区最新 | 荫蒂被男人添的好舒服爽免费视频 | 又粗又大又硬又长又爽 | 狂野欧美性猛xxxx乱大交 | 欧美日韩一区二区综合 | 无套内谢的新婚少妇国语播放 | 久久亚洲中文字幕无码 | 国产无遮挡吃胸膜奶免费看 | 偷窥村妇洗澡毛毛多 | 亚洲国产欧美日韩精品一区二区三区 | 亲嘴扒胸摸屁股激烈网站 | 2020久久超碰国产精品最新 | www国产亚洲精品久久网站 | 无码毛片视频一区二区本码 | 精品乱子伦一区二区三区 | 精品久久久久久人妻无码中文字幕 | 中文字幕无码乱人伦 | 久久这里只有精品视频9 | 久久熟妇人妻午夜寂寞影院 | 中文字幕无码av激情不卡 | 一本色道婷婷久久欧美 | 欧美精品一区二区精品久久 | 亚洲国精产品一二二线 | www成人国产高清内射 | 九九久久精品国产免费看小说 | 老熟女重囗味hdxx69 | 性欧美牲交xxxxx视频 | 国产真人无遮挡作爱免费视频 | 亚洲人成影院在线无码按摩店 | 精品国产一区二区三区四区 | av无码不卡在线观看免费 | 又大又硬又爽免费视频 | 亚洲精品国产精品乱码不卡 | 国产精品久久久久久久影院 | 狠狠综合久久久久综合网 | 久久天天躁狠狠躁夜夜免费观看 | 久热国产vs视频在线观看 | 亚洲中文字幕在线无码一区二区 | 人妻少妇精品视频专区 | 国产午夜福利亚洲第一 | 国产精品丝袜黑色高跟鞋 | 亚洲色成人中文字幕网站 | 久久99精品久久久久久动态图 | 人人妻人人澡人人爽人人精品浪潮 | 狠狠色噜噜狠狠狠狠7777米奇 | 亚洲 a v无 码免 费 成 人 a v | 亚洲一区二区三区含羞草 | 欧美日韩综合一区二区三区 | 中文字幕中文有码在线 | 国产精品久久久av久久久 | 粉嫩少妇内射浓精videos | 国产无套内射久久久国产 | 清纯唯美经典一区二区 | 樱花草在线播放免费中文 | 红桃av一区二区三区在线无码av | 中文字幕乱码人妻二区三区 | 无码av免费一区二区三区试看 | 国产精品久久国产精品99 | www一区二区www免费 | 国产无套内射久久久国产 | 最近的中文字幕在线看视频 | 波多野结衣aⅴ在线 | 精品乱子伦一区二区三区 | 成人女人看片免费视频放人 | 免费无码午夜福利片69 | 国产高清不卡无码视频 | 亚洲伊人久久精品影院 | 中文字幕无码日韩专区 | 国产色精品久久人妻 | 澳门永久av免费网站 | 国产人妻精品一区二区三区 | 国产真实乱对白精彩久久 | 夜夜影院未满十八勿进 | 一区二区三区乱码在线 | 欧洲 | 粉嫩少妇内射浓精videos | 亚洲中文字幕无码中字 | 国产成人亚洲综合无码 | 人人超人人超碰超国产 | 色综合久久久久综合一本到桃花网 | 无码一区二区三区在线观看 | 野狼第一精品社区 | 国产成人午夜福利在线播放 | 亚洲熟妇色xxxxx亚洲 | 粗大的内捧猛烈进出视频 | 欧美熟妇另类久久久久久不卡 | 中文字幕日产无线码一区 | 国产特级毛片aaaaaa高潮流水 | √8天堂资源地址中文在线 | 国产绳艺sm调教室论坛 | 久久久婷婷五月亚洲97号色 | √天堂资源地址中文在线 | 免费无码午夜福利片69 | 成人影院yy111111在线观看 | 国产人成高清在线视频99最全资源 | 国精品人妻无码一区二区三区蜜柚 | 国产成人午夜福利在线播放 | 午夜精品一区二区三区的区别 | 日日摸夜夜摸狠狠摸婷婷 | 亚洲va欧美va天堂v国产综合 | 日韩精品无码一本二本三本色 | 青草青草久热国产精品 | 中文字幕av日韩精品一区二区 | 无遮挡国产高潮视频免费观看 | 久久亚洲a片com人成 | 国产无遮挡又黄又爽又色 | 亚洲天堂2017无码中文 | 亚洲国产成人a精品不卡在线 | 欧洲美熟女乱又伦 | √天堂资源地址中文在线 | 国产片av国语在线观看 | 青草视频在线播放 | 网友自拍区视频精品 | 日韩人妻无码中文字幕视频 | 成人影院yy111111在线观看 | 中文字幕 亚洲精品 第1页 | 欧洲精品码一区二区三区免费看 | 国产亚洲精品久久久闺蜜 | 日本一卡2卡3卡4卡无卡免费网站 国产一区二区三区影院 | 国产精品资源一区二区 | 小sao货水好多真紧h无码视频 | 久久99久久99精品中文字幕 | 欧美丰满老熟妇xxxxx性 | 水蜜桃色314在线观看 | 综合激情五月综合激情五月激情1 | 日本va欧美va欧美va精品 | 欧美人与善在线com | 成人免费视频视频在线观看 免费 | 亚洲精品久久久久中文第一幕 | 小sao货水好多真紧h无码视频 | 国产无遮挡又黄又爽免费视频 | 国产农村妇女aaaaa视频 撕开奶罩揉吮奶头视频 | 亚洲日韩av片在线观看 | 99久久久无码国产aaa精品 | 欧美 日韩 人妻 高清 中文 | 久久国产自偷自偷免费一区调 | 国产熟女一区二区三区四区五区 | 亚洲 a v无 码免 费 成 人 a v | 日本大香伊一区二区三区 | 日日躁夜夜躁狠狠躁 | 亚洲人亚洲人成电影网站色 | 国产无套内射久久久国产 | 国产精品亚洲五月天高清 | 人妻尝试又大又粗久久 | 久久久久av无码免费网 | 中文无码成人免费视频在线观看 | 男女猛烈xx00免费视频试看 | 国产麻豆精品一区二区三区v视界 | 亚洲 欧美 激情 小说 另类 | 亚洲色大成网站www | 无遮无挡爽爽免费视频 | 无码人妻丰满熟妇区五十路百度 | 高潮毛片无遮挡高清免费 | 综合网日日天干夜夜久久 | 久久久中文字幕日本无吗 | 无套内谢的新婚少妇国语播放 | 亚洲国产欧美日韩精品一区二区三区 | 人妻天天爽夜夜爽一区二区 | 天下第一社区视频www日本 | 少妇无码吹潮 | 免费观看又污又黄的网站 | ass日本丰满熟妇pics | 少妇人妻av毛片在线看 | 国产av久久久久精东av | 人妻有码中文字幕在线 | 自拍偷自拍亚洲精品10p | 无码帝国www无码专区色综合 | 蜜臀av在线观看 在线欧美精品一区二区三区 | 色窝窝无码一区二区三区色欲 | 久久伊人色av天堂九九小黄鸭 | 精品国产精品久久一区免费式 | www国产亚洲精品久久网站 | 国产婷婷色一区二区三区在线 | 老司机亚洲精品影院无码 | 欧美高清在线精品一区 | 国产在线一区二区三区四区五区 | 国产亚洲精品久久久久久国模美 | 国产熟妇另类久久久久 | 久久久久久亚洲精品a片成人 | 婷婷六月久久综合丁香 | 天堂亚洲免费视频 | 国产又爽又黄又刺激的视频 | 少妇高潮一区二区三区99 | 无码纯肉视频在线观看 | 亚洲性无码av中文字幕 | 色欲综合久久中文字幕网 | 澳门永久av免费网站 | 男女猛烈xx00免费视频试看 | 亚洲自偷精品视频自拍 | 欧美黑人乱大交 | 精品无码av一区二区三区 | 国产无遮挡又黄又爽又色 | 亚洲国产精品久久久天堂 | 精品国产一区av天美传媒 | aⅴ亚洲 日韩 色 图网站 播放 | 成人性做爰aaa片免费看不忠 | 精品乱子伦一区二区三区 | 999久久久国产精品消防器材 | 亚洲人成人无码网www国产 | 纯爱无遮挡h肉动漫在线播放 | 国产乱人伦app精品久久 国产在线无码精品电影网 国产国产精品人在线视 | 精品国产福利一区二区 | 欧美国产日韩亚洲中文 | 欧美日韩亚洲国产精品 | 色五月五月丁香亚洲综合网 | 国产精品毛多多水多 | 午夜无码人妻av大片色欲 | 波多野结衣一区二区三区av免费 | 精品日本一区二区三区在线观看 | a片免费视频在线观看 | a片免费视频在线观看 | 国产人成高清在线视频99最全资源 | 97久久精品无码一区二区 | 男女猛烈xx00免费视频试看 | 日韩成人一区二区三区在线观看 | 国产9 9在线 | 中文 | 97精品人妻一区二区三区香蕉 | 麻豆国产人妻欲求不满谁演的 | 亚洲人成网站免费播放 | 人人澡人人透人人爽 | 麻豆国产丝袜白领秘书在线观看 | 国产精品人妻一区二区三区四 | 狠狠色噜噜狠狠狠狠7777米奇 | 水蜜桃色314在线观看 | 又黄又爽又色的视频 | 亚洲一区二区观看播放 | 99麻豆久久久国产精品免费 | 国产精品自产拍在线观看 | 午夜精品久久久久久久 | 国产免费观看黄av片 | 国产成人无码a区在线观看视频app | 亚洲区小说区激情区图片区 | 亚洲爆乳精品无码一区二区三区 | 亚洲日本在线电影 | 内射白嫩少妇超碰 | 日本一卡2卡3卡4卡无卡免费网站 国产一区二区三区影院 | 东京无码熟妇人妻av在线网址 | 欧美精品一区二区精品久久 | 久久久久久久人妻无码中文字幕爆 | 两性色午夜视频免费播放 | 一本色道久久综合狠狠躁 | 国产精品理论片在线观看 | 亚洲欧美国产精品久久 | 精品久久久久久人妻无码中文字幕 | 久在线观看福利视频 | 日韩欧美中文字幕公布 | 蜜臀aⅴ国产精品久久久国产老师 | 青草视频在线播放 | 久久精品丝袜高跟鞋 | 日本欧美一区二区三区乱码 | 乱中年女人伦av三区 | 精品亚洲韩国一区二区三区 | 久久人人爽人人爽人人片ⅴ | 蜜臀aⅴ国产精品久久久国产老师 | 亚洲日韩一区二区三区 | 老司机亚洲精品影院无码 | 少妇性俱乐部纵欲狂欢电影 | √天堂中文官网8在线 | 国产成人无码区免费内射一片色欲 | 伊人久久婷婷五月综合97色 | 97夜夜澡人人爽人人喊中国片 | 国产精品香蕉在线观看 | 久久精品国产一区二区三区肥胖 | 国产精品久久久久7777 | 亚洲日韩中文字幕在线播放 | 久久亚洲国产成人精品性色 | 呦交小u女精品视频 | 国精产品一区二区三区 | 丝袜人妻一区二区三区 | 免费人成在线视频无码 | 青青青爽视频在线观看 | 国产av一区二区三区最新精品 | 无码av岛国片在线播放 | 国产三级精品三级男人的天堂 | 久久精品国产日本波多野结衣 | 少妇高潮一区二区三区99 | 国产亚洲精品久久久久久大师 | 久久国产精品偷任你爽任你 | 国产又爽又猛又粗的视频a片 | 成人免费视频一区二区 | 久久久久久久久蜜桃 | 俺去俺来也在线www色官网 | 色噜噜亚洲男人的天堂 | 免费中文字幕日韩欧美 | 国产特级毛片aaaaaaa高清 | 久久久久免费看成人影片 | 久久久www成人免费毛片 | 熟妇女人妻丰满少妇中文字幕 | 色妞www精品免费视频 | 国产人妻人伦精品1国产丝袜 | 欧美熟妇另类久久久久久不卡 | 亚洲熟妇自偷自拍另类 | 欧美亚洲日韩国产人成在线播放 | 日韩精品乱码av一区二区 | 67194成是人免费无码 | 一本精品99久久精品77 | 成人精品一区二区三区中文字幕 | 午夜精品一区二区三区在线观看 | 啦啦啦www在线观看免费视频 | 亚洲精品综合一区二区三区在线 | 久久久av男人的天堂 | 久久久久亚洲精品中文字幕 | 久久人妻内射无码一区三区 | 欧美成人高清在线播放 | 日日天干夜夜狠狠爱 | 中文字幕乱码人妻无码久久 | 少女韩国电视剧在线观看完整 | 欧美野外疯狂做受xxxx高潮 | 性色av无码免费一区二区三区 | 久久亚洲国产成人精品性色 | 精品一区二区三区无码免费视频 | 精品久久久无码人妻字幂 | 亚洲精品国偷拍自产在线观看蜜桃 | 免费无码一区二区三区蜜桃大 | 爱做久久久久久 | 性欧美videos高清精品 | 国产乱人伦av在线无码 | 无码国模国产在线观看 | 鲁大师影院在线观看 | 国内少妇偷人精品视频 | 欧美兽交xxxx×视频 | 亚洲成a人片在线观看无码3d | 搡女人真爽免费视频大全 | 国内少妇偷人精品视频 | 伦伦影院午夜理论片 | 色综合久久网 | 国产美女极度色诱视频www | 天堂一区人妻无码 | 少妇高潮一区二区三区99 | 少妇性俱乐部纵欲狂欢电影 | 国产亚洲精品久久久久久国模美 | 亚洲国产精品一区二区美利坚 | 亚洲欧美精品伊人久久 | 捆绑白丝粉色jk震动捧喷白浆 | 亚洲日韩av片在线观看 | 亚洲色在线无码国产精品不卡 | 丝袜人妻一区二区三区 | 国产成人无码专区 | 377p欧洲日本亚洲大胆 | 人妻体内射精一区二区三四 | 女人被男人躁得好爽免费视频 | 久久精品成人欧美大片 | 欧美国产日韩亚洲中文 | 97精品国产97久久久久久免费 | 国产午夜亚洲精品不卡下载 | 性欧美疯狂xxxxbbbb | 国产精品国产自线拍免费软件 | 丝袜足控一区二区三区 | 中文精品无码中文字幕无码专区 | 精品久久久无码人妻字幂 | 中文字幕人妻无码一区二区三区 | 国产区女主播在线观看 | 中文字幕日韩精品一区二区三区 | 理论片87福利理论电影 | 欧美日韩人成综合在线播放 | 激情亚洲一区国产精品 | 国产精品对白交换视频 | 狠狠色丁香久久婷婷综合五月 | 亚洲一区二区三区在线观看网站 | 无码任你躁久久久久久久 | 国产精品亚洲а∨无码播放麻豆 | 18禁止看的免费污网站 | 少妇被粗大的猛进出69影院 | 男女猛烈xx00免费视频试看 | 久久久久免费看成人影片 | 水蜜桃色314在线观看 | 亚洲自偷自拍另类第1页 | 精品亚洲成av人在线观看 | 精品无人区无码乱码毛片国产 | 久久久久亚洲精品男人的天堂 | 中文字幕无码日韩专区 | 亚洲国产精品无码久久久久高潮 | 成人精品视频一区二区三区尤物 | 欧美性生交活xxxxxdddd | 人妻中文无码久热丝袜 | 欧洲欧美人成视频在线 | 88国产精品欧美一区二区三区 | 精品熟女少妇av免费观看 | 国产成人午夜福利在线播放 | ass日本丰满熟妇pics | 天堂а√在线地址中文在线 | 国产乱人偷精品人妻a片 | 精品一区二区三区波多野结衣 | 亚洲成a人片在线观看日本 | 国产成人一区二区三区别 | 波多野结衣av在线观看 | 曰韩少妇内射免费播放 | 夜精品a片一区二区三区无码白浆 | 狠狠噜狠狠狠狠丁香五月 | av无码电影一区二区三区 | 夜精品a片一区二区三区无码白浆 | 扒开双腿疯狂进出爽爽爽视频 | 国产 浪潮av性色四虎 | av在线亚洲欧洲日产一区二区 | 亚洲欧美国产精品久久 | 色五月丁香五月综合五月 | 99久久99久久免费精品蜜桃 | 欧美成人高清在线播放 | 亚洲国产欧美国产综合一区 | a国产一区二区免费入口 | 天天摸天天透天天添 | 亚洲欧洲中文日韩av乱码 | 日日摸夜夜摸狠狠摸婷婷 | 老熟女重囗味hdxx69 | 少妇一晚三次一区二区三区 | 国产疯狂伦交大片 | 一个人看的www免费视频在线观看 | 正在播放老肥熟妇露脸 | 高清不卡一区二区三区 | 亚洲精品国产第一综合99久久 | 国产69精品久久久久app下载 | 久久精品99久久香蕉国产色戒 | 久久久久久av无码免费看大片 | 国产乱人伦app精品久久 国产在线无码精品电影网 国产国产精品人在线视 | 激情国产av做激情国产爱 | 国产莉萝无码av在线播放 | 超碰97人人射妻 | 曰韩少妇内射免费播放 | 狠狠色噜噜狠狠狠7777奇米 | 中文字幕日产无线码一区 | 亚洲国产精品无码一区二区三区 | 日本精品人妻无码77777 天堂一区人妻无码 | 中国女人内谢69xxxx | 狠狠综合久久久久综合网 | 久久国产36精品色熟妇 | 野狼第一精品社区 | 亚洲精品午夜国产va久久成人 | 青青草原综合久久大伊人精品 | 熟妇人妻无乱码中文字幕 | 国产真人无遮挡作爱免费视频 | 精品一二三区久久aaa片 | 熟妇女人妻丰满少妇中文字幕 | 玩弄中年熟妇正在播放 | 亚洲色www成人永久网址 | 国产精品亚洲а∨无码播放麻豆 | 午夜丰满少妇性开放视频 | 好男人社区资源 | 国产精华av午夜在线观看 | 久久久精品456亚洲影院 | 高潮喷水的毛片 | 7777奇米四色成人眼影 | 日本一卡2卡3卡四卡精品网站 | 在线视频网站www色 | v一区无码内射国产 | 日韩 欧美 动漫 国产 制服 | 久久精品国产大片免费观看 | 中文亚洲成a人片在线观看 | 亚洲国产日韩a在线播放 | 无人区乱码一区二区三区 | 日日躁夜夜躁狠狠躁 | 日韩在线不卡免费视频一区 | 丁香花在线影院观看在线播放 | 国产综合色产在线精品 | 国产精品欧美成人 | 99国产欧美久久久精品 | 国产电影无码午夜在线播放 | 欧美性黑人极品hd | 国产色视频一区二区三区 | 性做久久久久久久久 | 性色欲网站人妻丰满中文久久不卡 | 国产精品爱久久久久久久 | 成人无码视频在线观看网站 | 亚洲中文字幕无码中文字在线 | 国产综合久久久久鬼色 | 亚洲精品一区二区三区在线 | 精品偷拍一区二区三区在线看 | 在线播放免费人成毛片乱码 | 欧洲熟妇精品视频 | 色一情一乱一伦一区二区三欧美 | 国产乱人伦app精品久久 国产在线无码精品电影网 国产国产精品人在线视 | 亚洲精品午夜无码电影网 | 一本加勒比波多野结衣 | 色婷婷香蕉在线一区二区 | 国产精品免费大片 | 国产又爽又猛又粗的视频a片 | 国产亚av手机在线观看 | 国产高潮视频在线观看 | 天干天干啦夜天干天2017 | 丰满人妻精品国产99aⅴ | 国产亲子乱弄免费视频 | 国产熟妇另类久久久久 | 亚洲日本va午夜在线电影 | 漂亮人妻洗澡被公强 日日躁 | 日韩人妻无码一区二区三区久久99 | 亚洲人成网站免费播放 | 偷窥村妇洗澡毛毛多 | а天堂中文在线官网 | 荫蒂添的好舒服视频囗交 | 色情久久久av熟女人妻网站 | 2020最新国产自产精品 | 亚洲一区二区三区偷拍女厕 | 精品aⅴ一区二区三区 | 色妞www精品免费视频 | 国产精品va在线播放 | 久久久婷婷五月亚洲97号色 | 无码国内精品人妻少妇 | 又大又紧又粉嫩18p少妇 | 色老头在线一区二区三区 | 成人亚洲精品久久久久软件 | 国产午夜福利100集发布 | 中文字幕av日韩精品一区二区 | 精品一区二区三区无码免费视频 | 高清无码午夜福利视频 | 成在人线av无码免观看麻豆 | 国产艳妇av在线观看果冻传媒 | 亚洲人亚洲人成电影网站色 | 国产偷抇久久精品a片69 | 少妇无码一区二区二三区 | 国产熟妇高潮叫床视频播放 | 国产网红无码精品视频 | 精品久久久久香蕉网 | 国产成人无码av片在线观看不卡 | 暴力强奷在线播放无码 | 丰满人妻一区二区三区免费视频 | ass日本丰满熟妇pics | 无码人妻av免费一区二区三区 | 亚洲国产精品久久久天堂 | 欧美阿v高清资源不卡在线播放 | 亚洲精品无码人妻无码 | 亚洲国产精品成人久久蜜臀 | 亚洲色成人中文字幕网站 | 国产无遮挡吃胸膜奶免费看 | 亚洲欧美中文字幕5发布 | 性欧美熟妇videofreesex | 欧美一区二区三区视频在线观看 | 日日摸夜夜摸狠狠摸婷婷 | 亚洲国产精品无码久久久久高潮 | 日韩成人一区二区三区在线观看 | 亚洲国产精品无码久久久久高潮 | 色欲久久久天天天综合网精品 | 亚洲一区二区三区四区 | 18禁黄网站男男禁片免费观看 | 亚洲国产一区二区三区在线观看 | 日本爽爽爽爽爽爽在线观看免 | 久久人人爽人人爽人人片av高清 | 国产极品美女高潮无套在线观看 | 成人精品视频一区二区三区尤物 | 久久国语露脸国产精品电影 | 精品乱码久久久久久久 | 国产午夜福利亚洲第一 | 久久精品成人欧美大片 | 久久综合九色综合97网 | 亚洲国产午夜精品理论片 | 漂亮人妻洗澡被公强 日日躁 | 久久精品一区二区三区四区 | 国产两女互慰高潮视频在线观看 | 无码毛片视频一区二区本码 | 东京热男人av天堂 | 露脸叫床粗话东北少妇 | 两性色午夜免费视频 | 国产精品99久久精品爆乳 | 亚洲色偷偷偷综合网 | 国产精品久久久久无码av色戒 | 又粗又大又硬毛片免费看 | 美女毛片一区二区三区四区 | 伊人色综合久久天天小片 | 日韩在线不卡免费视频一区 | 波多野结衣乳巨码无在线观看 | 俺去俺来也在线www色官网 | 亚洲中文字幕在线无码一区二区 | 人人超人人超碰超国产 | 亚洲呦女专区 | 任你躁在线精品免费 | 婷婷丁香六月激情综合啪 | 亚洲一区二区观看播放 | 国产美女极度色诱视频www | 在线欧美精品一区二区三区 | 女人高潮内射99精品 | 国产sm调教视频在线观看 | 1000部啪啪未满十八勿入下载 | 成人av无码一区二区三区 | 天天爽夜夜爽夜夜爽 | 国产精品自产拍在线观看 | 亚洲综合无码一区二区三区 | 久久无码人妻影院 | 国产精品久久精品三级 | 黑人巨大精品欧美一区二区 | 久久综合狠狠综合久久综合88 | 久久久久久久女国产乱让韩 | 欧美熟妇另类久久久久久多毛 | 久久精品中文字幕大胸 | 欧美亚洲国产一区二区三区 | 自拍偷自拍亚洲精品被多人伦好爽 | 久久这里只有精品视频9 | 乱人伦人妻中文字幕无码久久网 | 少妇性l交大片欧洲热妇乱xxx | 天堂а√在线地址中文在线 | 久久伊人色av天堂九九小黄鸭 | 亚洲人成网站色7799 | 国产精品无码一区二区三区不卡 | 亚洲日本va中文字幕 | 亚洲欧美日韩国产精品一区二区 | 乱人伦中文视频在线观看 | 亚洲啪av永久无码精品放毛片 | 99riav国产精品视频 | 乱码av麻豆丝袜熟女系列 | 国产亚洲精品久久久久久 | 亚洲の无码国产の无码步美 | 俄罗斯老熟妇色xxxx | 思思久久99热只有频精品66 | 亚洲aⅴ无码成人网站国产app | 亚洲精品国产第一综合99久久 | 好爽又高潮了毛片免费下载 | 欧美国产日韩久久mv | 中文字幕久久久久人妻 | 人妻与老人中文字幕 | 久久www免费人成人片 | 美女黄网站人色视频免费国产 | 国产一区二区三区影院 | 狠狠cao日日穞夜夜穞av | 日日躁夜夜躁狠狠躁 | 日韩成人一区二区三区在线观看 | 小sao货水好多真紧h无码视频 | 久久国产36精品色熟妇 | 欧美性生交xxxxx久久久 | 在线精品亚洲一区二区 | 国产成人无码av片在线观看不卡 | 国产偷自视频区视频 | 精品夜夜澡人妻无码av蜜桃 | 黄网在线观看免费网站 | 亚洲精品一区二区三区在线 | 亚洲欧美日韩成人高清在线一区 | 午夜熟女插插xx免费视频 | 欧美喷潮久久久xxxxx | 国产av久久久久精东av | 婷婷综合久久中文字幕蜜桃三电影 | 国产三级久久久精品麻豆三级 | 无码人妻精品一区二区三区下载 | 国产成人午夜福利在线播放 | 沈阳熟女露脸对白视频 | 国产精品99久久精品爆乳 | 国产精品久久久久久久9999 | 国产精品理论片在线观看 | 精品无码国产自产拍在线观看蜜 | 女高中生第一次破苞av | 婷婷六月久久综合丁香 | 日日碰狠狠躁久久躁蜜桃 | 国产精品久久久久无码av色戒 | 国产人妻大战黑人第1集 | 国产精品美女久久久网av | 未满小14洗澡无码视频网站 | 免费人成网站视频在线观看 | 日日麻批免费40分钟无码 | 99麻豆久久久国产精品免费 | 成人aaa片一区国产精品 | 爱做久久久久久 | 久久综合网欧美色妞网 | 99riav国产精品视频 | 欧美性黑人极品hd | 亚洲日本在线电影 | 国内精品久久久久久中文字幕 | 中国女人内谢69xxxxxa片 | 图片区 小说区 区 亚洲五月 | 97久久超碰中文字幕 | 特大黑人娇小亚洲女 | 无码一区二区三区在线 | 国产亚洲精品久久久ai换 | 欧美高清在线精品一区 | 在线播放无码字幕亚洲 | 国产成人午夜福利在线播放 | 曰韩无码二三区中文字幕 | 午夜无码区在线观看 | 永久黄网站色视频免费直播 | 99久久久国产精品无码免费 | 日日噜噜噜噜夜夜爽亚洲精品 | 精品亚洲成av人在线观看 | 永久免费观看美女裸体的网站 | 国产内射老熟女aaaa | 国产国产精品人在线视 | 无遮无挡爽爽免费视频 | 国内综合精品午夜久久资源 | 亚洲区小说区激情区图片区 | 国产精品亚洲lv粉色 | 天堂无码人妻精品一区二区三区 | 自拍偷自拍亚洲精品被多人伦好爽 | 扒开双腿吃奶呻吟做受视频 | 国产性生交xxxxx无码 | 动漫av一区二区在线观看 | 东京热男人av天堂 | 久久无码专区国产精品s | 国产成人久久精品流白浆 | 狠狠躁日日躁夜夜躁2020 | 国产精品久久久 | 一本久久a久久精品亚洲 | 欧美一区二区三区 | 亚洲中文字幕av在天堂 | 亚洲中文字幕无码中文字在线 | 中文字幕av日韩精品一区二区 | 国产成人无码av片在线观看不卡 | 又大又紧又粉嫩18p少妇 | 国产亚洲tv在线观看 | 色欲久久久天天天综合网精品 | 色综合久久久久综合一本到桃花网 | 曰韩无码二三区中文字幕 | 欧美阿v高清资源不卡在线播放 | 一区二区传媒有限公司 | 中文字幕av伊人av无码av | 人妻互换免费中文字幕 | 亚洲国产精品久久久久久 | 色婷婷久久一区二区三区麻豆 | 精品国产一区二区三区四区在线看 | 久久国产精品偷任你爽任你 | 欧美激情一区二区三区成人 | 国产av剧情md精品麻豆 | yw尤物av无码国产在线观看 | 午夜精品一区二区三区的区别 | 欧美freesex黑人又粗又大 | 亚洲欧美国产精品专区久久 | 色妞www精品免费视频 | 色综合天天综合狠狠爱 | 亚无码乱人伦一区二区 | 99久久人妻精品免费一区 | 一二三四在线观看免费视频 | 成年女人永久免费看片 | 亚洲色www成人永久网址 | 午夜不卡av免费 一本久久a久久精品vr综合 | 午夜精品久久久久久久 | 国产无套内射久久久国产 | 日本一卡2卡3卡四卡精品网站 | 东京热一精品无码av | 亚洲成av人在线观看网址 | 荫蒂被男人添的好舒服爽免费视频 | 亚洲精品久久久久avwww潮水 | 亚洲一区二区三区在线观看网站 | 亚洲精品无码国产 | 乱人伦中文视频在线观看 | 免费无码的av片在线观看 | 男女爱爱好爽视频免费看 | 国产精品永久免费视频 | 中文字幕乱妇无码av在线 | 亚洲欧美精品aaaaaa片 | 色诱久久久久综合网ywww | 成人免费视频视频在线观看 免费 | 女人被爽到呻吟gif动态图视看 | 欧洲极品少妇 | 精品国精品国产自在久国产87 | 自拍偷自拍亚洲精品被多人伦好爽 | 久久天天躁狠狠躁夜夜免费观看 | 国内少妇偷人精品视频 | 国精产品一品二品国精品69xx | 亚洲欧美国产精品久久 | 中文毛片无遮挡高清免费 | 欧美丰满老熟妇xxxxx性 | 国产乱人伦偷精品视频 | 中文字幕+乱码+中文字幕一区 | 欧美一区二区三区视频在线观看 | 一本久道高清无码视频 | 男女猛烈xx00免费视频试看 | 蜜桃臀无码内射一区二区三区 | 国产精品理论片在线观看 | 久久国产精品萌白酱免费 | 好男人www社区 | 亚洲日韩av一区二区三区四区 | 精品国偷自产在线视频 | 无码人妻av免费一区二区三区 | 久久伊人色av天堂九九小黄鸭 | 国产免费久久精品国产传媒 | 女人被男人爽到呻吟的视频 | 狠狠色噜噜狠狠狠7777奇米 | 亚洲欧洲日本无在线码 | 人妻aⅴ无码一区二区三区 | 天天拍夜夜添久久精品大 | 人妻有码中文字幕在线 | 欧美亚洲国产一区二区三区 | 国产精品嫩草久久久久 | 大胆欧美熟妇xx | 高潮毛片无遮挡高清免费视频 | 日韩精品a片一区二区三区妖精 | 国产真人无遮挡作爱免费视频 | 亚洲精品国偷拍自产在线观看蜜桃 | 少妇人妻偷人精品无码视频 | 日韩精品无码免费一区二区三区 | 久久久久久av无码免费看大片 | 午夜精品一区二区三区的区别 | 日本一区二区三区免费高清 | 精品无人区无码乱码毛片国产 | 久久久久人妻一区精品色欧美 | 亚洲国产av精品一区二区蜜芽 | 欧美成人午夜精品久久久 | 人人妻人人澡人人爽精品欧美 | 亚洲天堂2017无码中文 | 麻豆精品国产精华精华液好用吗 | 国产手机在线αⅴ片无码观看 | 性欧美熟妇videofreesex | 日韩精品a片一区二区三区妖精 | 无套内谢老熟女 | 国产热a欧美热a在线视频 | 中文字幕无码av波多野吉衣 | 又大又硬又黄的免费视频 | 300部国产真实乱 | 荫蒂添的好舒服视频囗交 | 桃花色综合影院 | 少妇性l交大片 | 天天做天天爱天天爽综合网 | 无码人中文字幕 | 色综合视频一区二区三区 | 一个人看的www免费视频在线观看 | 波多野42部无码喷潮在线 | 性欧美牲交在线视频 | 精品一区二区不卡无码av | 一个人看的视频www在线 | 久久无码中文字幕免费影院蜜桃 | 67194成是人免费无码 | 国产综合色产在线精品 | 少妇太爽了在线观看 | 精品一二三区久久aaa片 | 久久综合网欧美色妞网 | 亚洲熟女一区二区三区 | 亚洲狠狠婷婷综合久久 | 国产在热线精品视频 | 午夜成人1000部免费视频 | 亚洲gv猛男gv无码男同 | 日本精品高清一区二区 | 特大黑人娇小亚洲女 | 国内精品人妻无码久久久影院蜜桃 | 国产无套内射久久久国产 | 亚洲综合在线一区二区三区 | 欧美性猛交xxxx富婆 | 精品久久久久香蕉网 | 亚洲精品www久久久 | 亚洲色www成人永久网址 | 人人爽人人澡人人高潮 | 国产欧美亚洲精品a | 内射爽无广熟女亚洲 | 成在人线av无码免费 | 精品国精品国产自在久国产87 | 亚洲人亚洲人成电影网站色 | 中文字幕人妻无码一区二区三区 | 亚洲啪av永久无码精品放毛片 | 婷婷五月综合缴情在线视频 | 国产精品久久国产精品99 | 97资源共享在线视频 | 成人免费无码大片a毛片 | 三上悠亚人妻中文字幕在线 | 综合人妻久久一区二区精品 | 日本熟妇人妻xxxxx人hd | 色综合久久久无码网中文 | 内射欧美老妇wbb | 又大又紧又粉嫩18p少妇 | 麻豆av传媒蜜桃天美传媒 | 亚洲理论电影在线观看 | 亚洲色成人中文字幕网站 | 中文字幕乱码人妻二区三区 | 中文字幕 亚洲精品 第1页 | 免费看男女做好爽好硬视频 | 久久99精品久久久久婷婷 | 日本一本二本三区免费 | 一本精品99久久精品77 | 日韩精品乱码av一区二区 | 国产两女互慰高潮视频在线观看 | 亚洲国产一区二区三区在线观看 | 无码人妻丰满熟妇区五十路百度 | 亚洲一区二区三区四区 | 天堂а√在线地址中文在线 | 97se亚洲精品一区 | 欧美阿v高清资源不卡在线播放 | 人妻少妇精品久久 | 高潮毛片无遮挡高清免费视频 | 精品日本一区二区三区在线观看 | 国产人妻大战黑人第1集 | 久久熟妇人妻午夜寂寞影院 | 国产极品美女高潮无套在线观看 | 亚洲精品国产a久久久久久 | 六月丁香婷婷色狠狠久久 | 精品久久久中文字幕人妻 | 天堂亚洲2017在线观看 | 久久久久久久久888 | 激情内射日本一区二区三区 | 夜精品a片一区二区三区无码白浆 | 亚洲精品成人av在线 | 久久伊人色av天堂九九小黄鸭 | 5858s亚洲色大成网站www | 97人妻精品一区二区三区 | 最近的中文字幕在线看视频 | 东京一本一道一二三区 | 日本大乳高潮视频在线观看 | 大肉大捧一进一出视频出来呀 | 欧美丰满老熟妇xxxxx性 | 欧美人与物videos另类 | 最近的中文字幕在线看视频 | 亚洲欧美中文字幕5发布 | 成人片黄网站色大片免费观看 | 性欧美大战久久久久久久 | v一区无码内射国产 | 天天摸天天碰天天添 | 狠狠色欧美亚洲狠狠色www | 学生妹亚洲一区二区 | 国产精品丝袜黑色高跟鞋 | a片在线免费观看 | 国产精品va在线播放 | 午夜免费福利小电影 | 性欧美熟妇videofreesex | 精品夜夜澡人妻无码av蜜桃 | 国产明星裸体无码xxxx视频 | 亲嘴扒胸摸屁股激烈网站 | 亚洲欧美日韩国产精品一区二区 | 久久久久亚洲精品中文字幕 | 精品人人妻人人澡人人爽人人 | 亚洲大尺度无码无码专区 | 野外少妇愉情中文字幕 | 在线观看国产午夜福利片 | 在线精品国产一区二区三区 | 亚洲天堂2017无码 | 国产口爆吞精在线视频 | 无码帝国www无码专区色综合 | 日韩精品乱码av一区二区 | 十八禁真人啪啪免费网站 | 国产后入清纯学生妹 | www国产亚洲精品久久网站 | 亚洲自偷自偷在线制服 | 国产成人无码av片在线观看不卡 | 精品无码国产一区二区三区av | 国产一精品一av一免费 | 99er热精品视频 | 国产特级毛片aaaaaa高潮流水 | 亚洲熟熟妇xxxx | 欧美精品免费观看二区 | 乌克兰少妇性做爰 | av无码久久久久不卡免费网站 | 成人精品视频一区二区三区尤物 | 日本乱人伦片中文三区 | 色婷婷av一区二区三区之红樱桃 | 国产精品免费大片 | 丰满少妇弄高潮了www | 一本无码人妻在中文字幕免费 | 国产成人无码区免费内射一片色欲 | 国产午夜视频在线观看 | 久久久久se色偷偷亚洲精品av | 日韩欧美群交p片內射中文 | 撕开奶罩揉吮奶头视频 | 精品国产一区二区三区四区 | 77777熟女视频在线观看 а天堂中文在线官网 | 精品欧美一区二区三区久久久 | 欧美老熟妇乱xxxxx | 色综合久久久无码中文字幕 | 成人精品视频一区二区 | 亚洲欧洲中文日韩av乱码 | 亚洲综合精品香蕉久久网 | 国产激情一区二区三区 | 又湿又紧又大又爽a视频国产 | 欧美第一黄网免费网站 | 亚洲精品久久久久avwww潮水 | 国产成人无码区免费内射一片色欲 | 4hu四虎永久在线观看 | 76少妇精品导航 | 娇妻被黑人粗大高潮白浆 | 欧美丰满老熟妇xxxxx性 | 999久久久国产精品消防器材 | 我要看www免费看插插视频 | 日本精品人妻无码77777 天堂一区人妻无码 | 亚洲精品国偷拍自产在线观看蜜桃 | 国产av一区二区三区最新精品 | 国产人妻人伦精品1国产丝袜 | 国产精品久久久av久久久 | 日韩人妻少妇一区二区三区 | 男人的天堂av网站 | 国产成人亚洲综合无码 | 国产精品va在线播放 | 欧美性生交xxxxx久久久 | 日韩人妻无码一区二区三区久久99 | 中文字幕乱码亚洲无线三区 | 成人性做爰aaa片免费看不忠 | 国产熟妇另类久久久久 | 亚洲日韩av一区二区三区四区 | 亚洲人交乣女bbw | 思思久久99热只有频精品66 | 亚洲精品久久久久久久久久久 | 无码纯肉视频在线观看 | 久久无码专区国产精品s | 野外少妇愉情中文字幕 | 野狼第一精品社区 | 亚洲国产欧美日韩精品一区二区三区 | 中国大陆精品视频xxxx | 大乳丰满人妻中文字幕日本 | 久久无码专区国产精品s | 久久99精品国产.久久久久 | 久久精品女人天堂av免费观看 | 国产精品久久久久无码av色戒 | 亚洲成a人片在线观看无码 | 国产成人精品三级麻豆 | 内射爽无广熟女亚洲 | 亚洲精品一区三区三区在线观看 | 精品熟女少妇av免费观看 | 国产午夜视频在线观看 | 国产亚洲视频中文字幕97精品 | 女人被男人躁得好爽免费视频 | 欧美高清在线精品一区 | 国产激情综合五月久久 | 撕开奶罩揉吮奶头视频 | 亚洲精品综合一区二区三区在线 | 99久久无码一区人妻 | 久久无码中文字幕免费影院蜜桃 | 男女猛烈xx00免费视频试看 | 亚洲色成人中文字幕网站 | 无码吃奶揉捏奶头高潮视频 | 国精品人妻无码一区二区三区蜜柚 | 亚洲人亚洲人成电影网站色 | aa片在线观看视频在线播放 | 久久精品一区二区三区四区 | 无码福利日韩神码福利片 | 一本久道久久综合婷婷五月 | 亚洲无人区午夜福利码高清完整版 | 蜜桃无码一区二区三区 | 天海翼激烈高潮到腰振不止 | 国产精品国产三级国产专播 | 国产精品99爱免费视频 | 国产精品嫩草久久久久 | 亚洲第一无码av无码专区 | 少妇愉情理伦片bd | 乱人伦中文视频在线观看 | 久久久成人毛片无码 | 国产人成高清在线视频99最全资源 | 男女爱爱好爽视频免费看 | 在线a亚洲视频播放在线观看 | 丰满肥臀大屁股熟妇激情视频 | 欧美亚洲日韩国产人成在线播放 | 日韩人妻无码中文字幕视频 | 欧美 亚洲 国产 另类 | 国色天香社区在线视频 | 午夜精品久久久久久久久 | 又粗又大又硬毛片免费看 | 亚洲爆乳精品无码一区二区三区 | 亚洲一区二区三区香蕉 | 欧美日韩视频无码一区二区三 | 久久五月精品中文字幕 | 四虎影视成人永久免费观看视频 | 国产亲子乱弄免费视频 | 亚洲の无码国产の无码影院 | 国内少妇偷人精品视频 | 内射老妇bbwx0c0ck | 国产亲子乱弄免费视频 | 久久精品国产99久久6动漫 | 色五月五月丁香亚洲综合网 | 欧美日韩在线亚洲综合国产人 | 国产亲子乱弄免费视频 | 久久综合给久久狠狠97色 | 亚洲色在线无码国产精品不卡 | 四虎国产精品一区二区 | 精品国产乱码久久久久乱码 | 亚洲成a人一区二区三区 | 国产色精品久久人妻 | 亚洲乱码日产精品bd | 女人被爽到呻吟gif动态图视看 | 任你躁国产自任一区二区三区 | 人妻熟女一区 | 国产艳妇av在线观看果冻传媒 | 成人欧美一区二区三区黑人免费 | 老子影院午夜精品无码 | 东京热一精品无码av | 久久国语露脸国产精品电影 | 激情五月综合色婷婷一区二区 | 99久久人妻精品免费二区 | 欧美zoozzooz性欧美 | 亚洲人成网站色7799 | 欧美freesex黑人又粗又大 | 精品水蜜桃久久久久久久 | 国产97色在线 | 免 | 97无码免费人妻超级碰碰夜夜 | 免费视频欧美无人区码 | 无套内谢老熟女 | 国产97人人超碰caoprom | 无码av免费一区二区三区试看 | 成熟妇人a片免费看网站 | 色欲综合久久中文字幕网 | 欧美三级a做爰在线观看 | 久久精品中文字幕一区 | 四虎永久在线精品免费网址 | 亚洲午夜福利在线观看 | 丰满人妻被黑人猛烈进入 | 久久亚洲a片com人成 | 亚洲国产精品毛片av不卡在线 | 国产成人久久精品流白浆 | 国模大胆一区二区三区 | 色狠狠av一区二区三区 | 国产性猛交╳xxx乱大交 国产精品久久久久久无码 欧洲欧美人成视频在线 | 亚洲精品国产精品乱码不卡 | 一二三四在线观看免费视频 | 日本护士xxxxhd少妇 | 美女毛片一区二区三区四区 | 无人区乱码一区二区三区 | 亚洲一区二区三区四区 | 四虎国产精品免费久久 | 97久久精品无码一区二区 | 国产超碰人人爽人人做人人添 | 丰满少妇女裸体bbw | 色一情一乱一伦一区二区三欧美 | 日日摸日日碰夜夜爽av | 人妻aⅴ无码一区二区三区 | 精品水蜜桃久久久久久久 | 久久精品人妻少妇一区二区三区 | 国产乱人伦app精品久久 国产在线无码精品电影网 国产国产精品人在线视 | 最新国产乱人伦偷精品免费网站 | 国产精品高潮呻吟av久久 | 波多野42部无码喷潮在线 | 狠狠cao日日穞夜夜穞av | 国产成人无码av片在线观看不卡 | 国产精品人人妻人人爽 | 免费无码午夜福利片69 | 久久精品国产一区二区三区 | 99麻豆久久久国产精品免费 | 九九综合va免费看 | 狠狠色色综合网站 | 欧美自拍另类欧美综合图片区 | 中文字幕av无码一区二区三区电影 | 久久综合给合久久狠狠狠97色 | 久久99精品国产.久久久久 | 99riav国产精品视频 | 正在播放东北夫妻内射 | а天堂中文在线官网 | 色一情一乱一伦一区二区三欧美 | 天天拍夜夜添久久精品 | 在线天堂新版最新版在线8 | 免费无码的av片在线观看 | 亚欧洲精品在线视频免费观看 | 久久精品国产精品国产精品污 | 牲交欧美兽交欧美 | 欧美 日韩 亚洲 在线 | 四虎国产精品一区二区 | 两性色午夜视频免费播放 | 在线播放亚洲第一字幕 | 97夜夜澡人人爽人人喊中国片 | 精品人人妻人人澡人人爽人人 | 亚洲国产精品成人久久蜜臀 | 亚洲伊人久久精品影院 | aa片在线观看视频在线播放 | 国产极品美女高潮无套在线观看 | 亚洲一区二区三区香蕉 | 人人妻人人澡人人爽欧美一区九九 | 宝宝好涨水快流出来免费视频 | 久久99精品国产麻豆蜜芽 | 亚洲自偷精品视频自拍 | 噜噜噜亚洲色成人网站 | 奇米影视7777久久精品 | 亚洲成色在线综合网站 | 久久久久亚洲精品男人的天堂 | 国产亚洲日韩欧美另类第八页 | 无码成人精品区在线观看 | 自拍偷自拍亚洲精品被多人伦好爽 | 国产精品亚洲а∨无码播放麻豆 | 色窝窝无码一区二区三区色欲 | 98国产精品综合一区二区三区 | 2020最新国产自产精品 | 国产高清av在线播放 | 久久国产精品精品国产色婷婷 | 人人爽人人澡人人高潮 | 欧美丰满少妇xxxx性 | 亚洲色大成网站www国产 | 人妻体内射精一区二区三四 | 日韩av无码一区二区三区 | 亚欧洲精品在线视频免费观看 | 99久久精品无码一区二区毛片 | 亚洲人成影院在线观看 | 大肉大捧一进一出视频出来呀 | 岛国片人妻三上悠亚 | 18黄暴禁片在线观看 | 久久久久99精品成人片 | 一个人看的www免费视频在线观看 | 亚洲欧洲中文日韩av乱码 | 成人影院yy111111在线观看 | 国产xxx69麻豆国语对白 | 午夜福利一区二区三区在线观看 | 麻豆国产人妻欲求不满谁演的 | 色欲人妻aaaaaaa无码 | 精品国产一区二区三区四区 | 精品国产一区二区三区四区 | 欧美黑人乱大交 | 亚洲一区二区三区播放 | 无码人妻丰满熟妇区毛片18 | 高潮毛片无遮挡高清免费视频 | 欧美色就是色 | 色五月五月丁香亚洲综合网 | 久久人人爽人人爽人人片ⅴ | 一个人看的视频www在线 | 成人动漫在线观看 | 欧美精品一区二区精品久久 | 2020久久香蕉国产线看观看 | 国产精品18久久久久久麻辣 | 久久国产精品萌白酱免费 | 亚洲 高清 成人 动漫 | 久久国产自偷自偷免费一区调 | 自拍偷自拍亚洲精品10p | 伊在人天堂亚洲香蕉精品区 | 骚片av蜜桃精品一区 | 国内精品人妻无码久久久影院 | 蜜桃av蜜臀av色欲av麻 999久久久国产精品消防器材 | 婷婷六月久久综合丁香 | 国产午夜手机精彩视频 | 国产精品沙发午睡系列 | 强奷人妻日本中文字幕 | 亚洲精品美女久久久久久久 | 中文亚洲成a人片在线观看 | 色婷婷综合中文久久一本 | 亚洲第一无码av无码专区 | 中文字幕乱码人妻二区三区 | 亚洲码国产精品高潮在线 | 国产精品丝袜黑色高跟鞋 | 国产九九九九九九九a片 | 精品乱码久久久久久久 | 国产精品美女久久久网av | 性色欲情网站iwww九文堂 | 中文字幕日韩精品一区二区三区 | 久久99精品久久久久久动态图 | 色综合久久久无码中文字幕 | 宝宝好涨水快流出来免费视频 | 高清无码午夜福利视频 | 曰韩无码二三区中文字幕 | 国产精品igao视频网 | 美女黄网站人色视频免费国产 | 亚洲大尺度无码无码专区 | 精品国产麻豆免费人成网站 | www国产精品内射老师 | 久久精品视频在线看15 | 粉嫩少妇内射浓精videos | 成在人线av无码免观看麻豆 | 性欧美大战久久久久久久 | 激情国产av做激情国产爱 | 国产人妻精品一区二区三区不卡 | 亚洲伊人久久精品影院 | 日本高清一区免费中文视频 | 国内精品久久毛片一区二区 | 中文字幕乱妇无码av在线 | 国产免费无码一区二区视频 | 天天拍夜夜添久久精品大 | 荡女精品导航 | 日日夜夜撸啊撸 | 狠狠色噜噜狠狠狠狠7777米奇 | 久久久中文久久久无码 | 99er热精品视频 | 少妇一晚三次一区二区三区 | 亚洲人成人无码网www国产 | 小泽玛莉亚一区二区视频在线 | 一本大道久久东京热无码av | 国产亚洲日韩欧美另类第八页 | 高潮毛片无遮挡高清免费视频 | 无遮挡啪啪摇乳动态图 | 亚洲中文字幕乱码av波多ji | 久久亚洲中文字幕无码 | 人妻aⅴ无码一区二区三区 | 国产午夜视频在线观看 | 久久国内精品自在自线 | 亚洲熟妇色xxxxx欧美老妇 | 一个人免费观看的www视频 | 亚洲一区二区三区无码久久 | 国产av无码专区亚洲awww | 少妇厨房愉情理9仑片视频 | 成人性做爰aaa片免费看 | 国产精品丝袜黑色高跟鞋 | 无码国产乱人伦偷精品视频 | 久久亚洲中文字幕精品一区 | 国产成人精品优优av | 国产9 9在线 | 中文 | 中文字幕无码人妻少妇免费 | 国产婷婷色一区二区三区在线 | 国产精品无套呻吟在线 | 成人精品一区二区三区中文字幕 | 亚洲日韩av一区二区三区中文 | 午夜福利一区二区三区在线观看 | 午夜精品久久久久久久久 | 亚洲欧美日韩成人高清在线一区 | 狠狠色噜噜狠狠狠7777奇米 | 中文字幕+乱码+中文字幕一区 | 免费无码午夜福利片69 | 六月丁香婷婷色狠狠久久 | 少妇被粗大的猛进出69影院 | 中文无码精品a∨在线观看不卡 | 黑人大群体交免费视频 | 亚洲第一无码av无码专区 | 水蜜桃亚洲一二三四在线 | 亚洲色欲久久久综合网东京热 | 乌克兰少妇xxxx做受 | 嫩b人妻精品一区二区三区 | 国产乱人伦av在线无码 | 精品偷自拍另类在线观看 | 超碰97人人射妻 | 狠狠色噜噜狠狠狠狠7777米奇 | 日本熟妇人妻xxxxx人hd | 国产精品久久国产精品99 | 亚洲精品中文字幕乱码 | 亚洲综合无码一区二区三区 | 日韩欧美中文字幕公布 | 国产精品高潮呻吟av久久4虎 | 欧美xxxx黑人又粗又长 | 欧美自拍另类欧美综合图片区 | 人人澡人人透人人爽 | 野狼第一精品社区 | 人妻有码中文字幕在线 | 精品国偷自产在线 | 亚洲精品国产第一综合99久久 | 亚洲熟熟妇xxxx | 国产婷婷色一区二区三区在线 | 久久久久久久人妻无码中文字幕爆 | 国产精品永久免费视频 | 国产黄在线观看免费观看不卡 | 久久精品中文字幕大胸 | 亚洲色欲久久久综合网东京热 | 天天拍夜夜添久久精品大 | 日本又色又爽又黄的a片18禁 | 性做久久久久久久免费看 | а√资源新版在线天堂 | 亚洲啪av永久无码精品放毛片 | 亚洲国产高清在线观看视频 | 99久久精品午夜一区二区 | 无遮挡啪啪摇乳动态图 | 亚洲一区av无码专区在线观看 | 国产在线精品一区二区三区直播 | 欧美日韩一区二区三区自拍 | 亚洲s码欧洲m码国产av | 内射老妇bbwx0c0ck | 国产9 9在线 | 中文 | 亚洲日韩乱码中文无码蜜桃臀网站 | 国产三级久久久精品麻豆三级 | 精品一二三区久久aaa片 | 又大又紧又粉嫩18p少妇 | 日日天日日夜日日摸 | 日韩精品a片一区二区三区妖精 | 精品人妻人人做人人爽 | 无码av中文字幕免费放 | 亚洲男人av香蕉爽爽爽爽 | 中文字幕无码视频专区 | 少妇激情av一区二区 | 亚洲人成网站在线播放942 | 麻豆国产人妻欲求不满 | 亚洲男人av天堂午夜在 | 国产免费久久精品国产传媒 | 日日碰狠狠躁久久躁蜜桃 | 西西人体www44rt大胆高清 | 久激情内射婷内射蜜桃人妖 | 性生交大片免费看女人按摩摩 | 久青草影院在线观看国产 | 色一情一乱一伦一区二区三欧美 | 国产 精品 自在自线 | 少女韩国电视剧在线观看完整 | 久久久精品成人免费观看 | 蜜桃视频韩日免费播放 | 国产人妻久久精品二区三区老狼 | 国产成人精品视频ⅴa片软件竹菊 | 无码av岛国片在线播放 | 日本一区二区三区免费高清 | 国产女主播喷水视频在线观看 | 牛和人交xxxx欧美 | 丰满妇女强制高潮18xxxx | 99er热精品视频 | 国产黄在线观看免费观看不卡 | 免费人成在线视频无码 | 亚洲精品久久久久久久久久久 | 人妻无码αv中文字幕久久琪琪布 | 国产肉丝袜在线观看 | 粗大的内捧猛烈进出视频 | √天堂中文官网8在线 | 高潮毛片无遮挡高清免费视频 | 久久久www成人免费毛片 | 亚洲日本va午夜在线电影 | 欧美日本精品一区二区三区 | 美女扒开屁股让男人桶 | 国产人成高清在线视频99最全资源 | 免费无码肉片在线观看 | 熟女体下毛毛黑森林 | 中文亚洲成a人片在线观看 | 学生妹亚洲一区二区 | 久久99精品久久久久久动态图 | 亚洲精品国偷拍自产在线麻豆 | 国产又爽又黄又刺激的视频 | 亚洲 日韩 欧美 成人 在线观看 | 久久国产精品二国产精品 | 成熟妇人a片免费看网站 | 小泽玛莉亚一区二区视频在线 | 久热国产vs视频在线观看 | 青春草在线视频免费观看 | 久久99精品久久久久久动态图 | 无码精品人妻一区二区三区av | 国产亚洲欧美日韩亚洲中文色 | 中文字幕无码免费久久9一区9 | 亚洲日韩av片在线观看 | 伊人久久大香线蕉亚洲 | 中文字幕+乱码+中文字幕一区 | 亚洲日韩乱码中文无码蜜桃臀网站 | 性啪啪chinese东北女人 | 欧美日韩久久久精品a片 | 国产香蕉尹人视频在线 | 亚洲国产欧美国产综合一区 | 亚洲狠狠色丁香婷婷综合 | 麻豆果冻传媒2021精品传媒一区下载 | 野外少妇愉情中文字幕 | 欧美日韩在线亚洲综合国产人 | 香港三级日本三级妇三级 | 国内少妇偷人精品视频免费 | 天天躁日日躁狠狠躁免费麻豆 | 男女爱爱好爽视频免费看 | 伊人色综合久久天天小片 | 黑人巨大精品欧美黑寡妇 | 中文字幕人妻无码一区二区三区 | 久久午夜无码鲁丝片秋霞 | 精品欧洲av无码一区二区三区 | 国内精品久久毛片一区二区 | 国产综合在线观看 | 小sao货水好多真紧h无码视频 | 国产亚洲精品久久久ai换 | 国产香蕉尹人视频在线 | 中文字幕人成乱码熟女app |