压箱底总结:流系统端到端一致性对比
點擊上方“朱小廝的博客”,選擇“設(shè)為星標”
回復”666“獲取公眾號專屬資料
分布式最難的2個問題:
1、Exactly Once Message processing。
2、保證消息處理順序。
我們今天著重來討論一下:
為什么很難;
怎么解。
前言
這篇文章可以說是作者壓箱底兒的知識總結(jié)(之一,畢竟作者學的東西很雜 ╮( ̄▽ ̄"")╭ )了,斷斷續(xù)續(xù)寫了將近三個月, 耗費了大量的精力。
本來的目的只是想對比一下各個state of art的流系統(tǒng)有什么不同, 但是寫出來之后只是亂糟糟的羅列數(shù)據(jù)和資料,像這樣列數(shù)據(jù)一樣,“介紹下這個framework這樣實現(xiàn)的,所以有這樣的特性”,“那個是那樣的”.... blablabla... 使得文章只停留于表層,這樣寫并不是一個好的筆記。
我想記錄的是更本質(zhì)和更精髓的一些東西:我想更深入的探討一個分布式系統(tǒng)的設(shè)計是被什么現(xiàn)實和本質(zhì)問題所逼迫的結(jié)果, 2個不同的設(shè)計到底是在哪個本質(zhì)問題上分道揚鑣才造成了系統(tǒng)設(shè)計如此的不同。
追尋問題和解法的前因后果,讓未來的自己一眼掃過去之后能夠自然而然的回憶和理解出: "嗯,對,在這種現(xiàn)實限制下,要達到這種效果就必須這么做", 我想從亂糟糟的觀察到的混亂表象中,抽象出流系統(tǒng)所面對的問題和解決方案的本質(zhì),這就是本文的目的,和文章中除開引用文獻之外,作者所貢獻的一些自己的思考。
就作者學習流系統(tǒng)的感受來看, 流系統(tǒng)有2個難點:
第一是end to end consistency,或者說exactly once msg processing;
第二是event time based window操作。
本來作者想用一篇文章同時概括和比較這2點,無奈第一點寫完, 文章已經(jīng)長度爆炸。于是分開2篇,此為上篇, 著重于從分布式系統(tǒng)的本質(zhì)問題出發(fā), 從最底層的各種"不可能", 和它們的解(比如:consensus協(xié)議)開始, 一層一層的遞進到高層的流系統(tǒng)中, 如何實現(xiàn)容錯場景下的end to end consistency,或者說exactly once msg processing。
目錄
流系統(tǒng)的具體對比在“9.流系統(tǒng)的EOMP”這一節(jié), 前邊都是準備知識... -_-
1、一些術(shù)語
2、圣光(廣告)不會告訴你的事
3、幾個事實
4、Liveness和Safety的取舍
5、絕望中的曙光
6、Zombie Fencing
7、三節(jié)點間的EOMP
8、加入節(jié)點狀態(tài)的三節(jié)點間的EOMP
9、流系統(tǒng)的EOMP
10、異步增量checkpointing
11、系統(tǒng)內(nèi)與系統(tǒng)外
12、Latency, 冪等和non-deterministic
13、REFERENCE
一些術(shù)語
1、端到端一致性end to end Consistency
一致性其實就是業(yè)務(wù)正確性, 在不同的業(yè)務(wù)場景有不同的意思, 在"流系統(tǒng)中間件"這個業(yè)務(wù)領(lǐng)域, 端到端的一致性就代表Exact once msg processing, 一個消息只被處理一次,造成一次效果。
注意: 這里的"一個消息"代表"邏輯上的一個", 即application對中間件的期待就是把此消息作為一個來處理, 而不是指消息本身的值相等。比如要求計數(shù)+1的一個消息, 消息本身的內(nèi)容可能一模一樣, 但是application發(fā)來2次相同消息的"本意"就是要計數(shù)兩次, 那么中間件就應(yīng)該處理兩次, 如果application由于超時重發(fā)了本意只想讓中間件處理一次的+1操作, 那么中間件就應(yīng)該處理一次。
中間件怎么能區(qū)分application的"本意"來決策到底處理一次還是多次, 是end to end consistency的關(guān)鍵。
2、EOMP
由于Exactly once msg processing太經(jīng)常出現(xiàn), 我們用EOMP來代替簡寫一下。
3、容錯failure tolerance
為了方便討論,后邊談到failure, 我們指的都是crash failure, 你可以想象是任何可以造成“把機器砸了然后任何本地狀態(tài)丟失(比如硬盤損壞)一樣效果的情況出現(xiàn)"。
在今天的虛擬云時代,這其實很常見,比如container或者虛擬機被resource manager突然kill掉回收了, 那么即使物理機其實沒有問題, 你的application的邏輯節(jié)點也是被完全銷毀的樣子。
容錯在end to end Consistency的語義下,是指在機器掛了,網(wǎng)絡(luò)鏈接斷開...等情況下,系統(tǒng)的運算結(jié)果和沒有任何failure發(fā)生時是一摸一樣的。
3、Effective once msg processing(應(yīng)該翻成有效一次性處理?)
后邊我們可以看到, 保證字面上的Exact once msg processing(即整個系統(tǒng)在物理意義上真的只對消息處理一次), 這在需要考慮容錯的情況下是不可能做到的。
Effective once msg processing是一個更恰當?shù)男稳?#xff0c;而所有號稱可以做到EOMP的系統(tǒng),其實都只是能做到Effective once msg processing。即:中間件, 或者說流處理framework可能在failure發(fā)生的情況下處理了多次同一個消息,但是最終的系統(tǒng)計算結(jié)果和沒有任何failure時, 一個消息真的只處理了一次時計算的結(jié)果相等。這和冪等息息相關(guān)。
4、冪等Idempotent
一個相同的操作, 無論重復多少次, 造成的效果都和只操作一次相等。比如更新一個keyValue, 無論你update多少次, 只要key和value不變,那么效果是一樣的。再比如更新計數(shù)器處理一次消息就計數(shù)器+1, 這個操作本身不冪等, 同一個消息被中間件重"發(fā)+收"兩次就會造成計數(shù)器統(tǒng)計兩次。
而如果我們的消息有id, 那么更新計數(shù)器的邏輯修改為, 把處理過的消息的id全記錄起來, 接到消息先查重, 然后才更新計數(shù)器, 那么這個"更新計數(shù)器的邏輯"就變成冪等操作了。
把本不冪等的操作轉(zhuǎn)化為冪等操作是end to end consistency的關(guān)鍵之一。
5、確定性計算deterministic
和冪等有些類似, 不過是針對一個計算,相同的input必得到相同的output, 則是一個確定性(deterministic)。
比如從一個msg里計算出一個key和一個value, 如果對同一個消息運算無數(shù)次得到的key和value都相同, 那么這個計算就是deterministic的;而如果key里加上一個當前的時鐘的字符串表示, 那么這個計算就不是確定性的, 因為如果重新計算一次這個msg, 得到的是完全不同的key。
注意1: 非確定性計算一般會導致不冪等的操作, 比如我們?nèi)绻焉线吚永锏膋eyvalue存在數(shù)據(jù)庫里, 重復處理多少次同一個msg, 我們就會重復的插入多少條數(shù)據(jù)(因為key里的時間戳字符串不同)。
注意2: 非確定性計算并非必然導致不冪等的操作,比如這個時間戳沒有添加在key里而是添加在value里, 且key總是相同的, 那么這個計算還是"非確定性"計算。但是當我們存數(shù)據(jù)的時候先查重才存keyvalue, 那么無論我們重復處理多少次同一個msg, 我們也只會成功存入第一個keyValue, 之后的keyValue都會被過濾掉。
支持非確定業(yè)務(wù)計算的同時, 還能在容錯的情況下達成端到端一致性, 是流系統(tǒng)的大難題, 甚至我們今天會提到的幾個state of art的流系統(tǒng)都未必完全支持。(好吧Spark說的就是你)
圣光(廣告)不會告訴你的事
分布式系統(tǒng)最tricky的問題就是, 問題看起來很普通很簡單。一些問題總是看起來有簡單直接的解法,而一個"簡單解"被人查出問題時,也總是看起來可以很簡單的就可以把這個挑出的edge case很簡單的解決掉。
然而我們會立刻發(fā)現(xiàn)解決這個edge case而引入的新步驟會引發(fā)新的問題... 如此循環(huán), 直到"簡單"疊加到"無法解決的復雜"。
由于人們對這些問題的"預期是簡單的", 所以很多書, online doc, 都大大簡化了對問題的描述和對問題的分析。最普遍的是對failure recovery的介紹, 一般只會簡單的寫"failure發(fā)生時, 系統(tǒng)會怎么recovery", 但是完全不提怎么檢測failure和“根本不可能完美檢測到failure”這個分布式系統(tǒng)的基本事實, 從而給了讀者“failure可以完美檢測”的錯覺。
這是因為一來說清楚各種edge case會大大增加文檔的復雜性, 另外一點是寫了讀者可能也看不明白, 還有就是廣告效應(yīng), 比如真正字面意義的exactly once msg processing是不存在的, 但是所有其他做到effective once msg processing的系統(tǒng)都說自己可以支持exactly once, 那自己也得打這個廣告不是。
還有就是語焉不詳, 比如某stream系統(tǒng)說自己可以實現(xiàn)exactly once msg delivery, 別看delivery和processing好像差不多, 這里邊的用詞藝術(shù)就有意思了,delivery是指消息只在stream里出現(xiàn)一次, 但是在stream里只出現(xiàn)一次的消息卻無法保證只被consume一次確根本不提。
再比如某serverless產(chǎn)品處理某stream的消息, 描述是保證舊的消息沒有處理之前不會處理新消息, 你會想, 簡單描述成保證消息按順序處理不是一樣么? 其實差大了去了, 前者并沒有屏蔽掉舊消息突然replay, 覆蓋掉新消息的處理結(jié)果的edge case。而這個事實甚至顛覆了很多使用這個服務(wù)的Sr. SDE的對其的認知。
沒有理解分布式系統(tǒng)的幾個簡單的本質(zhì)問題之前, 你讀文檔的理解很有可能和文檔真正精準定義的事實不符。且讀者對“系統(tǒng)保證”的理解, 往往會由于文檔"藝術(shù)"定義的誤導, 而過多的假設(shè)系統(tǒng)保證的"強", 直到被坑了去尋根問底, 才會收到"你誤讀了文檔的哪里的詳細解釋"。這是分布式系統(tǒng)"最難的地方在最普通的地方"的直接結(jié)果之一。
個人認為最好的辦法就是去理解分布式系統(tǒng)軟件算法所能達到的上限=>關(guān)于各種impossibility的結(jié)論的論文,然后去學習克服他們的方法的論文。
這樣, 我們才能從各種簡化了的 tutorials里, 從API中, 從各種云服務(wù), 框架的廣告詞背后, 發(fā)現(xiàn)“圣光不會告訴你的事", 和"這個世界的真相";(從廣告和online doc天花亂墜的描述中看到分布式系統(tǒng)設(shè)計真正的取舍, 這是區(qū)分API調(diào)包俠和分布式系統(tǒng)專家的分水嶺之一)。
而不是“簡單的信了它們的邪”。而下邊,就是學習分布式系統(tǒng),你所需要了解的最重要事實中, 和end to end consistency相關(guān)的幾個。
幾個事實
1、不存在完美的failure detector
很多關(guān)于分布式系統(tǒng)的書上都會說,當failure發(fā)生時系統(tǒng)應(yīng)該怎么做來容錯, 就好像可以準確的檢測到failure一樣。然而事實是, 在目前互聯(lián)網(wǎng)的物理實現(xiàn)上(share nothing architecture, 只靠網(wǎng)絡(luò)互聯(lián),不直接共享其他比如內(nèi)存物理硬盤等),我們無法準確的檢測到failure。
簡單來說,就是當我們發(fā)現(xiàn)一個node無反應(yīng)的時候,比如ping它,給它發(fā)消息、request、查詢,都沒有反應(yīng),我們無法知道這到底是對方已經(jīng)停止工作了,還是只是處理的很慢而已。
無法制造完美的failure detector,即使在今天也是分布式系統(tǒng)的基礎(chǔ)事實。本文無意在基礎(chǔ)事實上多費唇舌, 無法接受此事實者可以去翻相關(guān)論文。╮( ̄▽ ̄"")╭
Essentially, the impossibility results for Consensus and Atomic Broadcast stem from the inherent difficulty of determining whether a process has actually crashed or is>The fundamental reason why Consensus cannot be solved in completely asynchronous systems is the fact that, in such systems, it is impossible to reliably distinguish a process that has crashed from>A fundamental problem in distributed systems is that network partitions (split brain scenarios) and machine crashes are indistinguishable for the observer, i.e. a node can observe that there is a problem with another node, but it cannot tell if it has crashed and will never be available again or if there is a network issue that might or might not heal again after a while. Temporary and permanent failures are indistinguishable because decisions must be made in finite time, and there always exists a temporary failure that lasts longer than the time limit for the decision…
A third type of problem is if a process is unresponsive, e.g. because of overload, CPU starvation or long garbage collection pauses. This is also indistinguishable from network partitions and crashes. The only signal we have for decision is “no reply in given time for heartbeats” and this means that phenomena causing delays or lost heartbeats are indistinguishable from each other and must be handled in the same way.[29]
2、不存在完美的failure detector, 所導致的幾個顛覆你認知的問題
1)分布式共識問題"Consensus"在"不存在完美的Failure Detector的情況下"不可解, 這又叫做FLP impossibility[36]。
可以說是上世紀,奠定分布式系統(tǒng)研究基石方向的一篇論文。即: 在理論上, 在分布式環(huán)境里(更準確說應(yīng)該是異步環(huán)境里), 在最多可能出現(xiàn)一個crash failure的強假設(shè)下, 不存在任何一個算法可以保證系統(tǒng)里的所有的"正常"節(jié)點對某一信息有共識。
對于"共識"你可以理解為一個數(shù)據(jù)一摸一樣的備份在多個節(jié)點上。(那么paxos, raft這種consensus協(xié)議是怎么回事呢? 稍后會解釋)
2)在分布式環(huán)境, 連分配只增序列號這件事都很難(即不同的進程去向一個系統(tǒng)申請序列號, 從0開始不斷增加, 保證process得到的序列號不能重復)。
因為本質(zhì)上這是一個consensus問題, 后邊可以看到, 能夠分配高可用性的global序列號(epoch id), 是解決zombie leader/master/processor的問題時的一大助力。
3)在保證liveness的情況下(即檢測到失敗就在另外的機器重啟邏輯節(jié)點), 無法保證系統(tǒng)中的Singleton角色“在同一時間點”只有一個。
比如在有l(wèi)eader概念的分布式系統(tǒng)里, 要求任意時間點只有一個leader做決定, 比如HBase需要只能有一個Region Server負責某region的寫操作; 再比如kafka或者Kinesis[22]里需要只能有一個partition processor接受一個stream partition的信息并且采取行動。
而事實是, 任何云服務(wù)和現(xiàn)有實現(xiàn), 都無法在物理上保證“在同一時間點”, 真的只有一個這樣的邏輯角色存在于機群中; 這就牽涉到一個概念=> Zombie Process。
4)Zombie process
由于沒有完美的failure detector, 所以即使幾率再低, 只要時間夠長, 需要failure detection的用例夠多, 系統(tǒng)不可避免會錯誤的判斷把一個并沒有真正crash掉的process當作死掉了。
而如果系統(tǒng)需要保持高可用性,需要在檢測到crash的時候,在新的機器上啟動此process繼續(xù)處理,那么當failure detector出錯,則會發(fā)生新老process共同工作的問題,此時,這個老的process就是zombie process。
嚴重注意,在分布式系統(tǒng)里,我們需要單一責任的一個節(jié)點/processor/role來做決策或者處理信息時,我們要么不保護系統(tǒng)的高可用性(機器掛了就停止服務(wù)),要么解決zombie process會帶來的問題。
高可用性的系統(tǒng)中, zombie無法消除。這關(guān)系到分布式系統(tǒng)設(shè)計里的一個核心問題:liveness和safety的取舍。
Liveness和Safety的取舍
1、在缺乏完美的failure detector的情況下, 對方遲遲不回信息(ping它也不回), 不發(fā)heartbeat, 那么本機只有2個選擇:?
認為對方還沒有crash, 持續(xù)等待;
認為其crash掉了, 進行failover處理。
選擇1傷害系統(tǒng)的liveness, 因為如果對方真的掛了,我們會無限等待下去, 系統(tǒng)或者計算就無法進行下去。選擇2傷害系統(tǒng)的safety, 因為如果對方其實沒有crash, 那我們就需要處理可能出現(xiàn)的重發(fā)去重, 或者zombie問題, 即系統(tǒng)的邏輯節(jié)點的“角色唯一性“就會被破壞掉了。
越好的liveness要求越快的響應(yīng)速度, 而“100%的safety“的意義, 則因系統(tǒng)的具體功能的不同而不同, 但一般都要求系統(tǒng)做決定要小心謹慎, 不能放過一個edge case, 窮盡所有必要的檢查來保證"系統(tǒng)不允許出現(xiàn)的行為絕對不會發(fā)生"。在consensus的語義下來說, safety就是絕對不能向外發(fā)出不一致的決定(比如向A說決定是X, 后來向B說決定是Y)。
可以看到, 系統(tǒng)的edge case越多, safety越難保證, 而edge cases的全集只是可能發(fā)生的情況的集合, 而某一次運行只會發(fā)生一種情況(且大概率是正常情況)。
如果系統(tǒng)不檢查最難分辨最耗時的幾種小概率發(fā)生的edge case, 那么系統(tǒng)大概率(甚至極大概率)也可以完美運轉(zhuǎn)毫無問題幾個月, 運氣好甚至幾年。這樣降低了系統(tǒng)的safety(不再是100%), 但是提高了系統(tǒng)的響應(yīng)速度(由于是概率上會出問題, 所以即使降低了safety保證, 也不是說就一定會出問題, 只是你把系統(tǒng)的正確性交給了運氣和命運)。
而如果系統(tǒng)保證檢查所有的edge case, 但是系統(tǒng)99.9999%的概率都不會進入一些edge cases, 那么這些檢查就會阻礙正常情況的運算速度。Liveness和Safety, 這是分布式系統(tǒng)設(shè)計的最基本取舍之一。
而FLP則干脆說: 在分布式consensus這個問題里, 如果你想要獲得100%的系統(tǒng)safety, 那么你絕對無法保證系統(tǒng)liveness, 即:系統(tǒng)總是存在活鎖的可能性, 算法設(shè)計只能減小這個可能性, 而無法絕對消除它。
2、更多的safety VS. liveness 取舍的例子:
Kubernetes StatefulSet, 簡單說是可以給容器(pod/container)指定一個名字的, 且保證全cluster總是只有一個容器可以有這個名字, 這樣application就可以通過這個保證來指定機群中的邏輯角色, 且用這個邏輯容器中保存一些狀態(tài)。(一般的replicaSet會load balance連接或請求到背后不同的節(jié)點, 你的一個請求要求在server本地存一些狀態(tài), 下一個請求未必還會到同一個server)
When a stateful pod instance dies (or the node it’s running on fails), the pod instance needs to be resurrected on another node, but the new instance needs to get the same name, network identity, and state as the one it’s replacing. This is what happens when the pods are managed through a StatefulSet. [37]
Kubernetes StatefulSet在liveness和safety里選擇了safety, 當statefulSet所在的的物理節(jié)點"掛了"之后, kubernetes默認不會重啟這個pod到其他節(jié)點去, 因為它無法確定這個物理節(jié)點到底死沒死, 為了保證safety它選擇放棄了liveness, 即系統(tǒng)無法自愈, StatefulSet提供所提供的服務(wù)不可用, 直到靠人干預來解決問題。
([38] P305: 10.5. Understanding how StatefulSets deal with node failures)?
Node fail cause daemon of Kubelet could not tell state of pod on the node….StatefulSet guarantees that there will never be two pods running with the same identity and storage...
Akka Cluster也做了相同的選擇, 在cluster membership管理中,有一個auto-downing的配置, 如果你打開它, 那么cluster就會完全相信Akka的failure detection而自動把unreachable的機器從cluster中刪去, 這意味著一些在這個unreachable節(jié)點上的Actor會自動在其他節(jié)點重啟。
Akka Cluster的文檔中, auto-downing是強烈不推薦使用的[38], 這是由于Akka Cluster提供的很多feature要求角色的絕對單一性, 比如singleton role這個功能, 在保證“cluster里只有這一個節(jié)點扮演這個actor"(safety), 和保證"cluster里總要有一個節(jié)點扮演這個actor"(liveness) 中, 選擇了safety, 即保證at most one actor存在于cluster中, 一旦次actor的節(jié)點變成unreachable(比如機器真的掛了), 那么Akka也無能為力, 只能傻等這個節(jié)點回來或者人來干預決策:
The default setting in Akka Cluster is to not remove unreachable nodes automatically and the recommendation is that the decision of what to do should be taken by a human operator or an external monitoring system. [29]
一個商用的Akka拓展(Akka Split Brain Resolver)提供了一些智能點的解決方案(基于quorum), 有興趣的同學可以看引用文檔[29]。
This is a feature of the Lightbend Reactive Platform. that is exclusively available for Lightbend customers.[29]
3、為什么Kubernetes和Akka不能同時保證safety和liveness呢?
這是因為這兩個作為比較底層的平臺, 他們需要對上層提供非常大的自由性, 而不能限制上層的活動。比如kubernetes沒有規(guī)定用戶不能在pod上跑某種程序, Akka也沒有規(guī)定用戶不能寫某種actor的code。
這樣, 在不限制自己處理能力的同時要保證任何行為都看起來exactly happen once(因為語義上singleton節(jié)點只有一個, 那么就不能讓用戶寫的任意單線程程序出現(xiàn)多節(jié)點平行執(zhí)行的外部效果), 而這對中間件來說是不可能的, 這就引出了另外一篇論文: end to end argument[27], 作者已經(jīng)寫過一篇文章詳細介紹end to end argument(阿萊克西斯:End to End Argument(可能是最重要的系統(tǒng)設(shè)計論文)), 這里不在贅述。
后邊我們可以看到Flink, Spark等流系統(tǒng)為了保證exactly once msg processing需要怎樣和end to end argument 搏斗。
4、可以同時保證safety和liveness么?
取決于具體情況下對safety和liveness的具體要求, 在流處理的情況下, 至少本文提到的4種流系統(tǒng)都給出了自己的解。請耐心往下閱讀。
絕望中的曙光
1、可解也不可解的分布式consensus
由于異步環(huán)境下, 釘死了我們不可能有一個完美不犯錯的failure detector。這篇著名的論文Unreliable Failure Detectors for Reliable Distributed Systems[30] 詳細描述了即使我們用一個不準確的failure detector, 也可以解決consensus的方法。
但是它并沒有推翻FLP impossibility的結(jié)論:Consensus還是并非絕對可解。但是, 如果我們對需要consensus的計算加一個限制,則Consensus可解。
這個限制是: 計算和通訊只需要在"安全時間"內(nèi)完成即可, 對[30]提供的算法來講, 提供consensus的系統(tǒng)需要在這段時間內(nèi)"正確識別crash"即可,也就是說(1)識別出真正掛掉的node, 和(2)不要懷疑正確的node。
怎么理解呢, 這兩個看似對立的概念:?
(1)consensus的有解(比如paxos協(xié)議)是對的;
(2)consensus的無解證明:FLP impossibility也是對的。
要準確且簡單的解釋為什么它們都是對的有點難, 推薦還是看論文。但是用比喻來解釋的話, 根據(jù)[30], Consensus算法可以看作這樣一個東西, 當系統(tǒng)出現(xiàn)crash, failure detector判斷錯誤,或者網(wǎng)絡(luò)突然延遲...等時候, 算法會進入某種循環(huán)而不會輕易作出決定。
for example, there is a crash that no process ever detects, and all correct processes are repeatedly (and forever) falsely suspected — the application may lose liveness but not safety [31]
而只要滿足必要的條件時(計算和通訊只需要在"安全時間"內(nèi)完成), 系統(tǒng)則可以跳出循環(huán)讓機群達成一致[30,31]。
(1) There is a time after which every process that crashes is always suspected by some correct process.?
(2) There is a time after which some correct process is never suspected by any correct process.?
The two properties of <>W0 state that eventually something must hold forever; this may appear too strong a requirement to implement in practice. However, when solving a problem that “terminates,” such as Consensus, it is not really required that the properties hold forever, but merely that they hold for a sufficiently long time, that is, long enough for the algorithm that uses the failure detector to achieve its goal.
而FLP impossibility則可以理解為挑刺兒的說, 那這個條件永遠無法出現(xiàn)呢? 你的算法就活鎖了呀(丟失liveness)。
幸運的是, 在現(xiàn)實世界, 我們總可以對消息傳遞和處理來估計一個上限, 你可以理解為,只要消息處理總是在這個上限之內(nèi)完成,那么consensus總是可以實現(xiàn), 而消息處理的時間即使偶爾超過了這個上限, 我們的consensus協(xié)議也會進入安全循環(huán)自我保護, 從而不會破壞系統(tǒng)的safety, 而系統(tǒng)總是可以再次回歸平穩(wěn)(處理時間回歸上限之內(nèi))。
而FLP則是像說: 你無法證明系統(tǒng)總是可以回歸平穩(wěn) (確實無法證明, 因為FLP的前提是異步模型, 而我們的真實世界更像是介于異步和同步模型之間的半同步模型, 我們只能說極大概率系統(tǒng)可以"回歸平穩(wěn)", 而無法證明它的絕對保證; =>可以絕對保證"上限"的模型一般稱為同步模型)。
其實用paxos來模擬出FLP的活鎖的例子也很簡單, 你把節(jié)點間對leader的heartbeat timeout時間設(shè)為0.001ms, 那么所有的節(jié)點都會忙著說服別的其他節(jié)點自己才是leader(因為太短的保活時間, 除了自己, 節(jié)點總是會認為其他的任意節(jié)點是leader時, leader死了), 那么系統(tǒng)就會進入活鎖, 永遠無法前進達成cluster內(nèi)的consensus, 系統(tǒng)喪失liveness。
Zombie Fencing
即使consensus問題解決了, zombie節(jié)點也還是大問題, kubernetes和Akka可以選擇避開zombie, 損失liveness。
然而對于絕大多數(shù)分布式系統(tǒng)來說, 是必須直面zombie節(jié)點這個問題的,比如各種分布式系統(tǒng)的master節(jié)點, 如果master掛了整個系統(tǒng)不在另外的機器重啟master,整個系統(tǒng)就可能變?yōu)椴豢捎谩?/p>
再比如kafka和Kinesis的單一partition只能有一個consumer, 如果這個msg consumer掛了不自動重啟, 對消息的處理就會完全停止。
zombie是最容易被忽視的問題, 比如, 即使我們有了paxos, raft, zookeeper這種consensus工具可以幫我們做leader election, 也不要以為你的系統(tǒng)中不會同時有2個leader做決策了。
這是因為先一代的leader可能突然失去任何對外通信,或者cpu資源被其他進程吃光, 或者各種edge case影響, 使得其他節(jié)點無法和其通信, 新的leader被選出, 而老的leader其實還沒死, 如果老的leader在失去cpu之前的最后一件事是去寫只有l(wèi)eader才能寫的數(shù)據(jù)庫, 那么當它突然獲得cpu時間且網(wǎng)絡(luò)恢復正常, 那么這個以為自己還是leader的zombie leader就會出乎意料的去寫數(shù)據(jù)庫。
這曾經(jīng)是HBase的一個重大bug[39, Leader Election and External Resources P105]。
Apache HBase, one of the early adopters of ZooKeeper, ran into this problem in the field. HBase has region servers that manage regions of a database table. The data is stored on a distributed file system, HDFS, and a region server has exclusive access to the files that correspond to its region. HBase ensures that only one region server is active at a time for a particular region by using leader election through ZooKeeper...?
The region server is written in Java and has a large memory footprint. When available memory starts getting low, Java starts periodically running garbage collection to find memory no longer in use and free it for future allocations. Unfortunately, when collecting lots of memory, a long stop-the-world garbage collection cycle will occasionally run, pausing the process for extended periods of time. The HBase community found that sometimes this stop-the-world time period would be tens of seconds, which would cause ZooKeeper to consider the region server as dead. When the garbage collection finished and the region server continued processing, sometimes the first thing it would do would be to make changes to the distributed file system. This would end up corrupting data being managed by the new region server that had replaced the region server that had been given up for dead.
(解釋不動了, 大家看英文吧...)
其實對付zombie已經(jīng)是分布式系統(tǒng)的共識了,也有很多標準的解法,以至于各個論文都不會太仔細的去描述, 這里簡單介紹幾種方法:
zombie fencing設(shè)計的關(guān)鍵點在于如何阻止已經(jīng)“成為zombie的自己”搞亂正常的“下一代的自己”的狀態(tài)。畢竟無論是zombie還是新的要取代可能死掉的上一代的"下一代", 大家跑的都是相同的代碼邏輯,也就是說這同一段代碼, zombie來跑就"不能過:"(比如不能對系統(tǒng)的狀態(tài)造成影響), 但是"下一代"來跑, 就可以正常工作。這一般需要滿足以下幾點:
1)如何正確區(qū)分“正常的下一代”(由于懷疑當前的節(jié)點已經(jīng)死掉了,所以重新創(chuàng)建和啟動的新一代邏輯節(jié)點)和“zombie”(懷疑錯誤,當前節(jié)點并沒有死掉,但是新一代節(jié)點已經(jīng)創(chuàng)建并啟動,當前節(jié)點成為大家都以為死掉但是還活著的zombie)一般需要一個多機復制且穩(wěn)定自增的epoch number來確定新老邏輯節(jié)點。
這個epoch number要在分布式環(huán)境中穩(wěn)定自增,一般只能通過consensus協(xié)議來實現(xiàn)。否則要分配新一代epoch number時,由于管理epoch number的機群的failover造成分配了一個老的epoch number給新啟動的“下一代”,那么zombie反而會有更大的epoch number,這就會造成整個系統(tǒng)的狀態(tài)混亂。
怎樣的混亂在介紹完zombie fencing之后就顯而易見了(因為所有其他節(jié)點都以為zombie死掉了, 把所有的最新操作和狀態(tài)發(fā)給新節(jié)點,但是新節(jié)點卻有一個比zombie還小的epoch number, 從而被zombie fence掉, 而不是自己可以fence zombie)
2)會被zombie影響的系統(tǒng)需要特殊設(shè)計使得:當“新一代”注冊后就拒絕“老一代的任何請求”,特別是寫入請求。也就是具體的利用epoch number的zombie fencing的實現(xiàn); 一般需要具體情況具體分析。
如果被影響的系統(tǒng)是自己的一個microservice,那么可以隨意設(shè)計協(xié)議來驗證一個請求所攜帶的epoch number是不是最新的。而當這個被影響的系統(tǒng)是一個外部系統(tǒng), 比如是業(yè)務(wù)系統(tǒng)需要用到的一個數(shù)據(jù)庫,由于你沒法改數(shù)據(jù)庫的代碼和數(shù)據(jù)庫client與server之間的協(xié)議, 那么就要利數(shù)據(jù)庫所提供的功能或者說它的協(xié)議來設(shè)計application層級的zombie fencing協(xié)議。
比如對提供test and set,compare and swap的kv數(shù)據(jù)庫來說,application設(shè)計自己的業(yè)務(wù)表時,要求所有的表都必須有一個epoch字段,而所有的寫入都必須用test and set操作來檢測當前epoch字段是否比要寫入的請求的epoch字段大或相等, 否則拒絕寫入。這樣, 只有"下一代"可以更改zombie寫入的數(shù)據(jù), 而zombie永遠無法更改"下一代"插入或者更新過的數(shù)據(jù)。
另一方面,很多時候"下一代"需要讀取上一代的信息,繼承上一代的數(shù)據(jù),然后繼續(xù)上一代的工作。那么如果剛讀取完數(shù)據(jù),zombie就改變了數(shù)據(jù),那么"下一代"對于當前系統(tǒng)狀態(tài)的判斷就會出差錯。
一個general的解決的方法也很簡單,要讀先寫,“下一代”開始工作前, 如果要先讀入數(shù)據(jù)了解“系統(tǒng)的當前狀態(tài)”,必須先改變數(shù)據(jù)的epoch number為自己的epoch number(當然要遵從只增更改原則test and set,如果發(fā)現(xiàn)當前數(shù)據(jù)的epoch number已經(jīng)比自己的epoch number還大了,則說明自己也已經(jīng)是zombie了,更新的"下下一代"已經(jīng)開始工作), 更改數(shù)據(jù)的epoch number成功之后,再讀入數(shù)據(jù),就可以保證比自己老的zombie絕對不可能再更改這個數(shù)據(jù),而現(xiàn)在讀入的數(shù)據(jù)可以體現(xiàn)系統(tǒng)的最新狀態(tài),從而完成對"老一代"數(shù)據(jù)的繼承。
而在增加epoch number之前所有被寫入的數(shù)據(jù)。這里即使是"新一代"啟動之后, 讀取系統(tǒng)狀態(tài)之前被zombie寫入的數(shù)據(jù), 都可以看做老一代的合法數(shù)據(jù),只要被新一代開始工作前繼承讀入即可。我們所要避免的是"新一代" 所讀取的事實被zombie所更改。而不是在物理時間的意義上在"新一代"啟動時就立刻阻止zombie的所有系統(tǒng)改動。
zombie fencing的設(shè)計取決于分布式系統(tǒng)的具體情況,比如業(yè)務(wù)邏輯可能更改的數(shù)據(jù)范圍可能是幾百萬幾千萬的數(shù)據(jù)記錄,那么這也意味著zombie可能會修改的數(shù)據(jù)范圍非常大,那么要求"下一代"在開始工作前更改所有數(shù)據(jù)的epoch number就很不現(xiàn)實。
對于zombie的影響的耐受性也會影響zombie fencing的設(shè)計,比如如果"下一代"只需要自己所接觸的有限數(shù)據(jù)在特定時刻之后不被zombie影響就能正確工作, 那么只要在"下一代"需要接觸特定數(shù)據(jù)時才更改此數(shù)據(jù)的epoch number來屏蔽zombie即可,那么即使業(yè)務(wù)可能修改的數(shù)據(jù)范圍很大,簡單的更改數(shù)據(jù)的epoch number也還是可以接受的解決方案。
最糟糕的情況,如果"zombie"可能會插入新的數(shù)據(jù), 而"下一代"的正常工作需要不能有非法的新數(shù)據(jù)插入(比如下一代開始工作前先統(tǒng)計所有資源的個數(shù),然后開始基于這個事實和"只有自己才能更改資源"的假設(shè),作出各種決策。
而此時zombie突然插入了一條新資源記錄或者資源使用記錄...),如果"新一代"完全無法預測zombie會插入什么記錄,要阻止zombie隨意插入數(shù)據(jù),“新一代”就只能在利用predicate lock來防止新紀錄插入,且不說很多數(shù)據(jù)庫根本不支持“鎖住不存在的數(shù)據(jù)”的predicate lock,就算支持此功能的數(shù)據(jù)庫也很有可能是使用表級鎖來鎖住整張表。
如果數(shù)據(jù)表設(shè)計成了需要共享給多個節(jié)點使用(比如一張資源表,不同的singleton worker負責維護不同的資源范圍),那么表級鎖就會妨礙其他worker的工作。
zombie fencing的設(shè)計在于如何引入簡單的fencing點, 對"新一代"暢通無阻,但是卻可以阻止zombie的異常活動, 如果協(xié)議設(shè)計使得"新一代"可以很容易制造這個fencing點, 則"新一代"在啟動或者需要的時候加入fencing點即可。
比如前邊說的任何數(shù)據(jù)都要附帶一個epoch值,任何數(shù)據(jù)寫入都要用test and set來對比數(shù)據(jù)的當前epoch值和請求的epoch值。
對于上文的隨機插入的業(yè)務(wù)需求, 可以要求業(yè)務(wù)邏輯插入任何數(shù)據(jù)之前,先在一個注冊表的屬于自己epoch的一行里記錄自己要寫的數(shù)據(jù)的id, 且在記錄的時候用test and set來檢測自己這一行數(shù)據(jù)的active值是否被更改為disable了。
這樣就相當于引入了一個更簡單的fencing點,因為"下一代"只要在注冊表里把所有上一代的記錄寫為disable, 就可以阻止zombie的未來任何活動,但是此時無法阻止zombie的最后一個注冊的數(shù)據(jù)插入, 但是"下一代"可以簡單的讀注冊表得知這個數(shù)據(jù)的id, 從而對這個"最后的zombie寫入"采取相應(yīng)的策略(繼承,刪除, 甚至fencing, 比如這個id并不存在,那么無法得知是zombie真的在寫之前死了所以永遠不會插入這個記錄了,還是zombie只是卡了, 那么"下一代"可以用自己的epoch和zombie注冊的id先插入一條記錄來占位,這樣無論zombie是真的死了還是卡了,都無法再寫入這個數(shù)據(jù)了)。這樣,我們就引入了一個連數(shù)據(jù)插入都可以fencing的fencing點。
Zombie fencing一般都是以上這些套路, 用consensus協(xié)議確定epoch number區(qū)分"下一代"還是zombie,這個epoch number一般也可以稱為fencing token, 通過把fencing token分發(fā)給需要拒絕zombie的service,把fencing token和需要保護的數(shù)據(jù)(防止被zombie修改)存在一起。
所以一般論文[7, 26]里只會簡單的提到epoch或者sequencer等概念,基本都是zombie fencing的fencing token。
三節(jié)點間的EOMP
三點為 (上游/input提供端)=> (當前計算節(jié)點/計算結(jié)果發(fā)送端) =>(下游/計算接收端)
如果我們考慮必須保證系統(tǒng)的高可用性,即檢測到任意process的failure,都會由一個(絕對不死)的高可用的JobManager或者MasterNode,來重啟(可能在另外的node)這個process, 所以我們定義這種即使所在的host掛掉, 也會不斷重新在其他host上重啟的process為邏輯process。這時我們要面臨幾種可能造成inconsistent的情況:
"計算接受端"沒有成功ack"計算結(jié)果發(fā)送端"的消息,一般表現(xiàn)為發(fā)送端的等待ack 超時。根據(jù)之前的討論,接收端有可能把消息處理完畢了(ack的消息丟失,或者剛處理完消息還沒發(fā)ack就掛了…等情況),也可能沒有處理完畢(沒接到或剛接到消息就掛了…等情況)。
這種情況發(fā)送端可以重發(fā)信息, 而發(fā)送端是需要“上游input提供端”提供某種數(shù)據(jù)然后進行某種計算后產(chǎn)生的這個消息/計算結(jié)果(設(shè)為outputA), 那么"計算結(jié)果發(fā)送端"有兩個策略:
策略1: 利用存儲計算結(jié)果來盡量避免重算
要實現(xiàn)上下游exact once processing,需要實現(xiàn)4個條件:
結(jié)果高可用;
下游去重;
上游可以replay;
記錄上游進度。
1)要求結(jié)果高可用, 應(yīng)對timeout時, “下游計算接收端”其實并沒有成功處理"計算結(jié)果發(fā)送端"的計算結(jié)果的情況(比如下游掛了), 這時"計算結(jié)果發(fā)送端"可以把計算的結(jié)果存儲在高可用的DataStore里(比如DynamoDB,Spanner…或者自己維護的多備份數(shù)據(jù)庫)。
這樣超時只要重發(fā)這個計算結(jié)果即可, 自己甚至可以開始去做別的事情, 比如處理和計算下一個來自“上游/input提供端“的event, 而已經(jīng)被“下游計算接收端”ack的"計算結(jié)果"則可以清理,一般由異步的garbage collection清理掉。
注意, 由于存在存儲失敗的可能性, 或者剛計算完結(jié)果還沒來得及存儲就掛掉重啟的可能,我們無法真的保證避免重算,詳見:無法避免的重算的例子。
2)下游去重,應(yīng)對timeout時下游其實已經(jīng)處理完畢消息的情況
①一般的解決方案:當邏輯接收端不固定, 比如發(fā)送端要根據(jù)計算出來的outputA的某key字段把不同的key發(fā)送給負責不同key range(也就是partition)的多個"下游計算接收端"。
只需要一個sequenceId就可以實現(xiàn)接收端的消息去重。接收端和發(fā)送端各維護一個partition level的sequenceId即可。這樣當發(fā)送端收到當前message sequenceId(假設(shè)為n)的Ack才發(fā)下一個sequenceId為n+1的信息,否則就無限重試。而接收端則根據(jù)收到的消息的id是不是已經(jīng)處理過的最大id+1來判斷是這是不是下一個message。
②Google MillWheel的特例:Google MillWheel做出了一個很有意思的選擇,發(fā)送端完全不維護sequenceId,而是為每一個發(fā)出的message生成一個全局唯一的id,下游則需要記住"所有"見過的id來去重,但是這樣會造成大量查詢io和存儲cost,所以需要另外的方案來解決性能和下游沒有無限的存儲所以"不可能記住所有id"的問題。這個例子比較特殊,有興趣的同學可以查閱[4,7]
③要求觸發(fā)本次計算的“上游input提供端”可以replay input event,否則剛接到event還沒計算就掛掉重啟, 則event丟失。
無法避免的重算:任何時候計算沒完成,或者計算完成后但是成功儲存前(a.結(jié)果高可用的需求), 計算節(jié)點fail掉重啟, 我們都需要replay上次計算過的input event,所以由于計算結(jié)果都還沒存成功,所以從物理上講, 此時我們還是重算了的; 所以即使我們采用把計算結(jié)果記錄下來的策略, 我們無法從物理意義上真正避免重算, 我們避免的是有多個"重復的"成功計算結(jié)果提交給下游。
而當計算不是deterministic的, 這多個“重復的”計算結(jié)果可能是不同的值發(fā)送給不同的下游((比如按照計算結(jié)果的key發(fā)送給下游不同的partition)。那么下游就會處理同一個event所產(chǎn)生的本應(yīng)只有一個的計算結(jié)果兩次,且由于非確定性計算的原因,這兩個計算結(jié)果不一樣。這就會造成event不是EOMP的問題。(不僅在物理上計算了2次, 在效果上也影響了2次下游的計算, 打破的effective process once的要求)
④要求記錄event處理的進度, 并保證儲存計算結(jié)果不出現(xiàn)重復。記錄event處理的進度, 使得trigger本次計算的"event"可以被屏蔽(比如, ack“上游input提供端”, 告知其input event處理完畢, 可以發(fā)下一個了), 來避免計算的re-trigger; 這要求以下策略2選一:
記錄event處理的進度, 和把計算結(jié)果存在高可用存儲里的操作是一個原子操作, 要么一起成功, 要么一起失敗; 這種策略可以保證當計算結(jié)果儲存下來, 此計算不會replay了;
或者存儲計算結(jié)果是一個冪等操作,那么可以先存計算結(jié)果,再記錄event處理進度,一旦計算計算結(jié)果成功但是記錄event處理進度失敗,重新計算上游的同一個event并儲存計算結(jié)果也不會引起問題。
否則要么計算沒存event就被屏蔽掉了, 要么多次計算結(jié)果存儲在DataStore里造成下游的重復信息。注意, 此時下游是無法分辨這是重復信息的, 因為這是datastore里的"2條的記錄", 將會獲得不同的message id。
冪等和end2end argument: 所以實現(xiàn)原子操作就不需要冪等了么? 是也不是, 在業(yè)務(wù)層是的, 比如要實現(xiàn)業(yè)務(wù)層的冪等,我們可以在存計算結(jié)果到datastore里的時候把一個與觸發(fā)本次計算的event的唯一id記錄在一起,這樣我們每次存的時候就可以使用樂觀鎖的方式test-and-set, 來保證如果這個id在數(shù)據(jù)庫里沒有才插入。(取決于業(yè)務(wù),我們也可以用這個id當主key來,那么即使多次寫入同樣的內(nèi)容也沒關(guān)系=>要求計算是deterministic的)?
如果我們保證觸發(fā)計算的event的"屏蔽"和計算結(jié)果的儲存是一個原子操作,那么我們就不需要上邊這種復雜的存儲策略,因為一旦計算結(jié)果存儲成功,觸發(fā)計算的event必定被"屏蔽"掉了, 而如果沒存儲成功, 則event一定會replay來重試。
然而在傳輸層卻不是的,比如儲存數(shù)據(jù)庫的tcp有可能丟包重發(fā),依靠tcp的傳輸層id自動去重,實現(xiàn)tcp的冪等。
策略2: 完全依賴重算。
高可靠重發(fā)的問題是,所有信息都必須先記錄在高可用性的DataStore里, 相對于重新計算,重發(fā)需要的網(wǎng)絡(luò)IO, 存儲,狀態(tài)管理的cost是很高的。
而如果觸發(fā)計算的event可以replay的話(其實不管重算還是不重算,為了防止“剛接到event, 計算節(jié)點就掛掉的情況”, 我們都要求event可以replay), 我們就可以選擇重算然后重發(fā)來代替存儲計算結(jié)果的重發(fā);重算需要2個條件:
計算需要是 deterministic 的,用完全一樣的數(shù)據(jù),必須算出完全相同的結(jié)果,否則,當計算結(jié)果所需要發(fā)送的邏輯下游是由計算結(jié)果所決定的情況下(比如按照計算結(jié)果的key發(fā)送給下游不同的partition) 那么non-deterministic的重算有可能把計算結(jié)果發(fā)給不同的下游,這樣如果重算發(fā)生時,下游(假設(shè)是節(jié)點A)其實已經(jīng)成功處理完畢重算前上游發(fā)送的信息, 只是ACK丟失, 那么重算的結(jié)果卻發(fā)送給了另外一個(節(jié)點B), 那么就會造成一個event造成了2個下游effect的結(jié)果, 引起一個event造成2次下游影響的結(jié)果, 違反EOMP的原則;
重算之前, 狀態(tài)需要rollback到?jīng)]有計算之前, 否則會影響需要狀態(tài)的計算的結(jié)果正確性,如果狀態(tài)更新非冪等,本次計算所做的狀態(tài)更新也會更新多次;詳見"加入節(jié)點狀態(tài)的三節(jié)點間的EOMP"。
(在多節(jié)點流計算里,要求上游可以重發(fā),意味著上游把計算結(jié)果存下來了,或者上游可以重算,如果上游需要重算,那么上游需要上游的上游重發(fā),那么上游的上游可以用儲存的結(jié)果重發(fā)或者重算。。。以此類推)
(2種策略其實都有可能造成重算,也都對event replay有需求。為什么還要浪費資源去存儲計算結(jié)果呢?這里邊的重要區(qū)別是,當儲存結(jié)束時,對觸發(fā)本次計算的上游event的依賴結(jié)束了,而不穩(wěn)定的下游不會造成額外的重算, 和對上游, 上游的上游....計算的"鏈式反應(yīng)", 詳見流的EOMP中的討論)
加入節(jié)點狀態(tài)的三節(jié)點間的EOMP
帶狀態(tài)的計算, 比如流計算的某中間節(jié)點需要統(tǒng)計總共都收到多少信息了, 每次從上游收到新信息, 都把自己統(tǒng)計的當前歷史信息總數(shù)更新并發(fā)往下游節(jié)點, 那么這個"系統(tǒng)的歷史信息"就是這個"統(tǒng)計消息總數(shù)"的邏輯節(jié)點的狀態(tài)。
由于狀態(tài)也需要高保活,所以它也一定需要儲存在遠端dataStore里, 這樣儲存狀態(tài)的遠端datastore就相當于一個特殊的下游。不同點在于, 當采用策略2:重算, 而不存儲中間計算結(jié)果的話, 重算時則需要datastore可以把它所記錄的狀態(tài)rollback到最初剛開始處理此event的那個點。
這里我們只能rollback, 而不能只是依靠冪等來保證“狀態(tài)的更新是exactly once”的原因是, 節(jié)點在處理任意消息時的狀態(tài)也和當前信息的數(shù)據(jù)一樣是本次計算的input, 而更新后的狀態(tài)則是本次消息處理的output, 如果重算時不rollback節(jié)點的狀態(tài), 那么我們就會用一個被本消息"影響過"的狀態(tài)來進行計算, 而這是會違反exactly once msg processing語義的。
比如節(jié)點的本地狀態(tài)是上次收到的信息的數(shù)據(jù)上記錄的時間戳, 節(jié)點的運算是計算2個event數(shù)據(jù)之間的時間戳差距。假設(shè)eventA發(fā)生在時刻0, eventB發(fā)生在時刻10, 那么eventB引發(fā)的計算應(yīng)往下游發(fā)送10, 并把節(jié)點的本地狀態(tài)更新為10, 此時如果eventB的這個計算需要重算, 但是我們不rollback狀態(tài)10回到0的話, eventB重算所得的結(jié)果就會變成0。
注意: 由于state更新也是處理event的"下游", 那么計算過程中的所有狀態(tài)更新都可以算作“計算結(jié)果”的一部分, 所以當我們需要儲存計算結(jié)果時,則需要把:
狀態(tài)更新儲存回高可靠的statestore里;
記錄event處理進度;
把計算結(jié)果存在高可用存儲里。
這3個操作作為一個原子操作(以后我們稱之為"原子完成"來省略篇幅); 而任何時候需要重算的話, 狀態(tài)必須恢復到處理event之前的樣子。
加入state,我們需要把(d. 要求記錄event處理的進度, 并保證儲存計算結(jié)果不出現(xiàn)重復, 更改為 (d+. 要求記錄event處理的進度, 并保證儲存計算結(jié)果和state的更新不出現(xiàn)重復。
并加入要求(e. state需要在replay 上游event的時候rollback到處理event之前時的狀態(tài)。
這些要求稍有抽象,讓我們看一下流系統(tǒng)一般怎么達成這些要求。
流系統(tǒng)的EOMP
考慮一個多節(jié)點的流系統(tǒng),如果我們把上游所發(fā)來的計算結(jié)果當成前邊所說的“觸發(fā)計算的event”,而自己的發(fā)給下游的計算結(jié)果msg作為觸發(fā)下游計算的event。那么我們就可以用上邊的模型保證兩兩節(jié)點之間的exact once msg processing,從而最終實現(xiàn)端到端的exact once msg processing; 這就是Google MillWheel(DataFlow) 和Kafka Stream的解決方案。
他們都選擇把每個節(jié)點的計算結(jié)果儲存起來,并保證即使non-deterministic的計算, 也只有一次的計算會起作用, 而不會出現(xiàn)(策略2-1中提到的non-deterministic造成的不一致)。他們的區(qū)別是:
如何實現(xiàn)state和;
如何實現(xiàn)接收端去重;
如何實現(xiàn)“原子完成”
1、Google MillWheel(DataFlow)
1)每個節(jié)點維護一個用來去重的"已處理msgId"集, 從上游收到新msg之后, 檢查去重 (b.下游去重)
2)開始計算, 所有的狀態(tài)更新寫在本地, 由于一個狀態(tài)只有一個更新者(本計算), 所以可以在本地維護一個狀態(tài)的view, 所有的更新只更新本地的view而暫時不commit到"remote的高可用DataStore", MillWheel用的BigTable。
3)計算完畢后, (1).所有的要發(fā)送的計算結(jié)果,(有一些可能是在計算過程中產(chǎn)生并要求發(fā)送的, 都會cache起來), (2)所有的state的所有更新, (3) 引發(fā)計算的msgId, 會用一個atomic write寫在BigTable里。(a.要求結(jié)果高可用, d+.要求記錄event處理的進度, 并保證儲存計算結(jié)果和state的更新不出現(xiàn)重復)
4)當commit成功之后, ACK上游, 而由于上游也采用commit計算結(jié)果到BigTable里的策略,且只有當自己(這里)發(fā)出的消息ACK之后, 才會允許 garbage collection回收計算結(jié)果占用的存儲, 所以在收到ACK之前, 上游的計算結(jié)果, 也就是當前計算所需要的msg, 都可以重發(fā),直至本節(jié)點計算成功且commit結(jié)果 (c. 要求觸發(fā)本次計算的event可以replay)
5)一旦計算過程中failure發(fā)生(比如機器掛了), 會在另外的host上重啟本process節(jié)點,從BigTable恢復本地state和"用來去重的已處理msgId集", 由于上次計算的結(jié)果還沒有commit, 所以滿足(e. state需要在replay event的時候rollback到處理event之前時的狀態(tài))
5)新啟動的運算節(jié)點在load本地狀態(tài)之前先用自己的sequencer廢掉現(xiàn)存的sequencer, 這樣BigTable就可以block zombie計算節(jié)點的寫。
2、Kafka Stream
Kafka Stream是建立在kafka分布式隊列功能上個一個library, 所以在介紹kafka Stream之前, 我們先來講一下Kafka
3、簡單介紹Kafka Topic
Kafka的topic可以看作一個partition的queue, 通過發(fā)給topic時指定partition(或者用一個partitioner 比如按key做hash來指定使用那個partition), 不同的key的消息可以發(fā)送到不同的partition, 相同key的message則可以保證發(fā)送到同一個partition去, topic里的信息可以靠一個確定的index來訪問, 就好像一個數(shù)據(jù)庫一樣,所以只要在data retention到期之前,consumer都可以用同一個index來訪問之前已經(jīng)訪問過的數(shù)據(jù)。
4、Kafka Transactional Messaging
前邊說過, Kafka Stream是建立在kafka分布式隊列功能上個一個library, 主要依靠kafka的Transactional Messaging來實現(xiàn)end2end exactly once msg processing。
Transactional Messaging是指用戶可以通過類似以下code來定義哪些對kafka topic的寫屬于一個transaction, 并進一步保證tx的atomic和Isolation。
producer.initTransactions();
?try {?
? ? // called? right before sending any records?
? ? producer.beginTransaction();
? ? //sending some messages...
? ? // when done? sending, commit the transaction?
? ? producer.commitTransaction();
} catch? (Exception e) {
? ? ?producer.close();
} catch? (KafkaException e) {
? ? producer.abortTransaction();??
}?
Kafka transaction保證了, beginTransactions之后的, 所有往不同Kafka topic里發(fā)送的消息, 要么在commitTransaction之后才能被read-committed的consumer看到, 要么由于close或者failure而全部作廢, 從而不為read-committed的consumer所見。
而kafka stream通過用kafka本身的分布式queue的功能來實現(xiàn)了state和記錄處理event進度的功能,因為:
所有的要發(fā)送的計算結(jié)果(由于可以允許計算發(fā)不同消息給多個下游,所以可能發(fā)給不同的topic和partition);
記錄input event stream的消費進度;
所有的state的所有更新。
這3點, Kafka Stream都是用寫消息到kafka topic里實現(xiàn)的。
1)自不必說,本來就是往topic里寫數(shù)據(jù)。
2)其實是寫當前consume position的topic;。(注意Kafka Stream的上下游消息傳遞考的是一個中間隱藏的Kafka topic, 上游往這個topic寫, 下游從這個topic讀, 上游不需要下游的ack,只要往topic里寫成功即可, 也不需要管下游已經(jīng)處理到那里了。
而下游則需要維護自己"處理到那里了"這個信息,儲存在consume position的topic, 這樣比如機器掛掉需要在另外的host上重啟計算節(jié)點,則計算節(jié)點可以從記錄consume position的topic里讀出自己處理到那里然后從失敗的地方重洗開始。
3)其實是寫一個內(nèi)部隱藏的state的change log的topic,和一個本地key value表(也就是本計算節(jié)點的state)。failover的時候, 之前的"本地"表丟失沒關(guān)系, 可以沖change log里恢復出失敗前確定commit的所有state。
(1)(2)(3)的topic都只是普通的Kafka topic。只不過(2)(3)由Kafka Stream自動創(chuàng)建和維護(一部分用來支持高層API的(1)也是自動創(chuàng)建)
開始計算時, 在從上游的topic里拿msg之前, Kafka Stream會啟動一個tx, 然后開始才開始計算, 此時tx coordinator會分配一個新的epoch id給這個producer并且以后跟tx coordinator通訊都要附帶這個epochId;
Kafka Stream的計算節(jié)點的上游信息都來自kafka topic的分布式partition queue, 且只接收commit之后的record, 在queue里的record都有確定的某種sequenceId, 所以只要計算節(jié)點記錄好自己當前處理的sequenceId, 處理完一個信息就更新自己的sequenceId到下一個record, 且commit到可靠dataStore里, 就絕對不會重復處理上游event, 而只要沒有commit這個位置則可以無數(shù)次replay當前的record; (b.下游去重, c. 要求觸發(fā)本次計算的event可以replay);
在tx內(nèi)部,每從上游topic里讀一條信息就寫一條信息到記錄consume position的topic里, 每一個state的更改都會更新到本地的state(是一張表)里,且同時寫在隱藏的changelog里; 計算過程中需要往下游發(fā)信息則寫與下游聯(lián)系的topic;
計算結(jié)束后, commit本次的tx, 由Kafka Transactional Messaging來保證本次tx里發(fā)生的所有(1)往下游發(fā)的消息, (2) 記錄input event stream的消費進度,(3)所有的state的所有更新是一個原子操作, 由于結(jié)果都成功寫入kafka topic,所以達到計算結(jié)果的高可用性 (a.要求結(jié)果高可用, d+.要求記錄event處理的進度, 并保證儲存計算結(jié)果和state的更新不出現(xiàn)重復);
計算過程中出現(xiàn)failure(比如機器掛了), 那么當計算重啟,會重新運行initTransactions來注冊tx, 此時tx coordinator會分配一個新的epoch id給此producer, 并且從此以后拒絕老的epoch id的任何commit信息來防止zombie的計算節(jié)點; tx coordinator同時roll back(如果上一個tx已經(jīng)在prepare_commit狀態(tài), 繼續(xù)完成transaction, 具體看下邊Transactional Messaging這個章節(jié)); 如果rollback,那么input的處理進度, 狀態(tài)的更改和往下游發(fā)送的信息都會rollback, 那么計算可以重新開始,就好像沒有上次fail的失敗一樣; 如果上一個tx已經(jīng)prepare_commit, 那么完成所有信息的commit; 此時當initTransactions返回,當前計算會接著上一個tx完成的進度繼續(xù)計算;(e. state需要在replay event的時候rollback到處理event之前時的狀態(tài))
Idempotent producer
冪等producer主要解決這么一個問題: Kafka的消息producer, 也就是往Kafka發(fā)消息的client 如果不冪等, 那么因為Kafka的接受消息的broker和producer之間在什么是“重復信息”上沒有共識的話,則broker無法分辨兩個前后一模一樣的消息, 到底是producer的本意就是要發(fā)兩次,還是由于producer的重發(fā)(比如:producer在收到broker的"接受成功"的ack之前就掛了,所以不知道之前的消息有沒有成功被broker接收, 因此重啟后重發(fā)了此信息)。此時broker只能選擇接受消息,這就造成了同一個消息的多次接受。
同時我們也要解決zombie producer的問題: 如果我們保證producer高可用, 重啟我們認為fail掉的producer, 那么其實沒死的zombie producer的信息則會造成,重復且亂序的發(fā)布消息。(由于zombie的存在, 會有2個producer同時發(fā)布我們以為只有一個producer會按順序發(fā)布的消息,這樣就無法保證順序: 比如zombie在發(fā)送A, B, C...的時候, 新啟動的producer也開始發(fā)送A, B, C... )
Kafka的解法:
10用一個producer指定的固定不變的transactional.id(非自增id,叫producerName可能更好)來識別可能會在不同機器上重啟的同一個logical producer; 相當于給producer起了一個logical name。
2)注冊transaction.id來開始session, 而在session里此tx發(fā)來的消息都可以通過維護一個sequenceid來dedup。
3)非正常結(jié)束tx的話, 比如機器掛了, producer重啟, 那么就會再次注冊自己的transaction.id, 則標志前一個session失效, 而所有屬于上一個session的信息全部作廢(具體看下一節(jié)Atomic and Isolation), 這樣就可以做到producer的zombie fencing
(Further, since each new instance of a producer is assigned a new, unique, PID, we can only guarantee idempotent production within a single producer session ---- Idempotent Producer Guarantees [26])
Atomic and Isolation
1)Producer Zombie fencing: 注冊transaction.id會申請高可靠epoch id, broker和tx coordinator可以依此fencing zombie的任何寫操作 (e.g. tx coordinator關(guān)閉tx);. Zombie fencing in https://www.confluent.io/blog/transactions-apache-kafka/
2)多個Tx coordinator跑在kafka broker里, 寫是按照tx.id hash給不同的Tx coordinator, 每一個tx coordintor負責subset的transactionlog的partition。
這樣保證同一個logic produce啟動的tx必定連接同一個tx coordinator。tx coordinator保證所有的狀態(tài)都在的高可用高一致性的寫在tx log里。(且用queue zombie fencing來保護自己的狀態(tài)一致性, Discussion on Coordinator Fencing in [26]) (Discussion on Transaction-based Fencing, => 如果zombie不跟coordinator再聯(lián)系,那么可以一直跟broker發(fā)垃圾信息... P39in [26])
3)Producer注冊新的tx之后,在給任意topic的任意partition發(fā)消息之前,先跟tx coordinator注冊這個partition。
4)當寫完畢,producer給tx coordinator發(fā)commit,tx coordinator執(zhí)行2PC,在transaction log里寫prepare_commit, 這樣就一定會commit了,因為producer 通知commit就代表所有的寫已經(jīng)寫成功了, 這一步其實只是把決定記下來。
5)Tx coordinator聯(lián)系所有的注冊過的topic的partition,寫一個commit marker進去。
6)當所有的marker寫完,在transaction log里記錄commit complete。
7)注意:當在第一步tx coordinator在發(fā)現(xiàn)新的重復transaction.id來注冊時,會檢查有沒有相同的transaction.id下未關(guān)閉的tx,有的話發(fā)起rollback,先在transaction log里記下rollback的決定,然后聯(lián)系所有的注冊過的topic的partition, 寫入一個ABORT marker。
而如果此tx的狀態(tài)已經(jīng)時prepare_commit了,那么有可能tx coordinator在下邊第6步聯(lián)系所有partition來commit中間掛掉了,那么要接著完成這個commit過程;即roll forward而不是roll back。
8)Read_commit等級的consumer需要等待transaction有結(jié)果,consumer library讀到任何與Transactional Messaging相關(guān)的信息,就開始進入cache階段,并不會運行任何consumer端的計算,只有當讀到commit mark,則把cache住的record依次交給consumer端的計算,而當讀到ABORT mark,則把相關(guān)tx的record全部filter掉。注意: pending的tx會block所有Read_commit等級的consumer對topic的讀。
在保證兩兩節(jié)點之間的EOMP來實現(xiàn)整個流的EOMP的模型里,如果我們某一個或多個節(jié)點的狀態(tài)和計算結(jié)果根本不記錄在高可用DataStore里,我們還是可以實現(xiàn)EOMP, 我們只需要(1)replay這個節(jié)點的上游來重算這個節(jié)點的狀態(tài)和發(fā)給下游的計算結(jié)果, (2)下游去重。
如果上游也沒計算結(jié)果記, 那么replay上游的上游即可, 如果上游的上游也沒記....一直追溯到記錄了計算結(jié)果的上游節(jié)點即可。
如果一直都沒有failure,那么比起Dataflow和Kafka Stream那種記錄所有計算結(jié)果的模型 我們少記錄一些額外的計算結(jié)果和狀態(tài)就減少了很多系統(tǒng)負載; 這就是重算與記錄計算結(jié)果模型的結(jié)合。
重算與記錄計算結(jié)果的結(jié)合
考慮 A,B,C, D 4個節(jié)點, A的計算結(jié)果傳給B, 而B則把一部分計算結(jié)果給C一部分給D, 如果B沒有記錄自己的output, 則Cfail掉之后需要replay上游的input,這就需要B的一些重算來重新制造C所需要的input, 即使B的input(即A)記錄了所有的計算結(jié)果, 我們還需要"恰巧可以產(chǎn)生這些歷史計算結(jié)果的"B的歷史狀態(tài),才能重算出C所需要的input。(所以B必須保存歷史狀態(tài)或者用某種方法重建自己的歷史狀態(tài)才能保證可以重算C所需要的input)
如果C的狀態(tài)也丟失了, 那么對上游的負擔則更重些, B需要重新計算來提供所有的歷史計算結(jié)果(即C的所有歷史input)來讓C重建自己的歷史狀態(tài)。
可以看到, 任意一個節(jié)點的某狀態(tài)S(n+1)是:
上一個歷史狀態(tài)S(n), 和;
從歷史狀態(tài)S0建立開始所接收到的信息M(n)。
同時作為輸入而得到的輸出; 而這個過程中又會向下游發(fā)出一些計算結(jié)果O(n+1)。
所以M(n) + S(n) => S(n+1) + O(n+1), 當下游crash重啟需要O(n+1)時, 我們則有2種選擇:
1、記錄O(n+1);
2、記錄O(n+1)但是記住, O(n+1)是根據(jù)什么數(shù)據(jù)生成的。
1是記錄計算結(jié)果, 2是重算。兩者并用的好處在于, 1可以異步batch進行而不需要節(jié)點必須等待O(n+1)記錄成功才往下游發(fā)送O(n+1)。而2保證了當1還沒有完成時, 系統(tǒng)也有足夠的信息可以重建O(n+1)。
這是一個鏈式反應(yīng), 當重算需要M(n)和S(n)時, 而如果M(n)并沒有存則需要上游重算M(n), 上游還沒存這些重算M(n)的信息則需要replay上游的上游來重算這些信息,這就是所謂的鏈式反應(yīng)...。最極端的情況是什么都沒存,那么需要從頭開始跑我們的stream程序。
可以看到, 如果沒有存中間計算結(jié)果或者狀態(tài), 那么當這個數(shù)據(jù)被下游重算需要的時候, 需要我們重算這個數(shù)據(jù), 這就會產(chǎn)生對上游的計算結(jié)果或者狀態(tài)的需求, 這就要求我們?nèi)绻淮嫦逻@些數(shù)據(jù), 我們就需要記住計算這個數(shù)據(jù)的數(shù)據(jù)依賴圖, 所以要么把"中間"數(shù)據(jù)和狀態(tài)存起來待用, 要么記住他們的數(shù)據(jù)依賴圖。
而這些記錄的中間結(jié)果只有當對其的所有依賴從計算圖中消失時, 我們才可以垃圾回收/刪除這些數(shù)據(jù)(比如所有基于某狀態(tài)的計算結(jié)果都已經(jīng)存下來了, 那么這個狀態(tài)的數(shù)據(jù)就可以刪除, 再比如某計算結(jié)果所引發(fā)的下游計算結(jié)果和狀態(tài)都已經(jīng)存下來了, 那么此計算結(jié)果的數(shù)據(jù)就可以刪除了),從而不會造成儲存數(shù)據(jù)爆炸。
這, 也就是Spark Streaming的解法。
Spark
Spark有三種Stream...
(1)快要被deprecate掉的DStreaming [10, 14]
(2)新一代為了彌補和Flink之間差距的, 支持event time的Structural Streaming(可惜還是有很多不足, 具體的不同和哪里有不足, 要留到對比各個系統(tǒng)對event time和windows操作的支持的對比, 也就是下篇來詳細描述了) [12,13]
(3)實驗中的Continuous Streaming(Spark Continuous Processing) [11, 20]
(3)還在實驗狀態(tài), 基本上是把底層都改掉來使用了和Flink相同的Chamdy-Lamport算法[20], 但是貌似還有很多問題需要解決所以目前不支持EOMP, 這里不多聊了。
根據(jù)Structural Streaming的論文[12], (2)和(1)使用了相似的方法來保證EOMP, 但是其實作者發(fā)現(xiàn)(2)比起(1)還是有一些性能上的改進[21],但是總體原則還是和(1)類似的利用一個重算關(guān)系圖lineage來維護各個狀態(tài)計算結(jié)果的依賴關(guān)系, 通過異步的checkpoint來截斷l(xiāng)ineage也就是各個節(jié)點狀態(tài)和計算結(jié)果復雜的關(guān)系(比如一個數(shù)據(jù)如果已經(jīng)checkpoint了, 那么它所依賴的所有狀態(tài)和計算結(jié)果都可以在關(guān)系圖里刪去, 因為replay如果依賴于這個數(shù)據(jù), 那么使用它的checkpoint即可, 而不需要知道這個數(shù)據(jù)是怎么算出來的, 如果還沒checkpoint成功, 則需要根據(jù)數(shù)據(jù)依賴圖來重算這個數(shù)據(jù)), 像這樣利用checkpoint, 就可以防止lineage無限增長。
但是維護關(guān)系圖需要利用micro-batch來平衡"關(guān)系維護"造成的cost, 否則每一條信息的process都產(chǎn)生一個新狀態(tài)和新計算結(jié)果的話, 關(guān)系圖會爆炸式增長(用micro-batch, 可能1000條信息會積累起來當作"一個信息"發(fā)給下游, 只需要在關(guān)系圖里記錄一個batch-id即可, 而不是1000個msg id, 對與狀態(tài)來說也是這樣,處理1000個msg之前的狀態(tài)分配一個id, 處理這1000個信息之后的狀態(tài)一個id, 而不需要記錄1000個狀態(tài)id, 同時他們之間的聯(lián)系線也從1000條降低為1條。這樣就大大減小了關(guān)系圖維護的負擔。
但這樣造成的結(jié)果是micro-batch會造成很高的端到端處理的latency, 因為micro-batch里的第一條信息要等待micro-batch里的最后一條信息來了之后才能一起傳給下游。
而這個等待是疊加的,當stream的層數(shù)越深,每一層的micro-batch的第一條信息都需要等待最后一條信息被處理完畢,相比在每一層都毫不等待,micro-batch造成的額外latency就會疊加式的增高。
注意, Spark Structured Stream提供了一種continuous mode[11,12,13,20]來替代micro-batch,解決了latency的問題,但是目前支持的operator很少,且不能做到exact once msg processing, 這里不多加討論了(不過將來有望做成和flink一樣的模式, 畢竟也用的Chandy-Lamport Distributed Snapshot algorithm) : Note that any time you switch to continuous mode, you will get at-least-once fault-tolerance guarantees.[13]
spark的micro-batch會造成嚴重的latency問題, 而Dataflow和Kafka Stream的方案要求記錄每一個計算結(jié)果, 則會在大大增加系統(tǒng)負擔的同時也會有不小的latency附加。那么有沒有一種方法可以不記錄所有中間計算結(jié)果, 并且也不使用micro-batch呢?
我們來看看flink的藝術(shù);
Flink
如果我們不儲存流系統(tǒng)中間節(jié)點的計算結(jié)果在高可用DataStore里, 也不想維護復雜的數(shù)據(jù)依賴圖(需要micro-batch的根源), 那么當一個節(jié)點fail掉需要replay上游的input的時候,上游就必定需要replay自己的上游,且自己的狀態(tài)要rollback到?jīng)]有接收這些要replay的消息之前的狀態(tài);對上游的上游就有相同的要求,那么最終所有節(jié)點的上游最終會歸向數(shù)據(jù)源節(jié)點,并要求"重新replay"。總而言之2個要求:
數(shù)據(jù)源節(jié)點可以replay, 并產(chǎn)生層層的蝴蝶效應(yīng)的"每個節(jié)點對上游要求的replay";
所有的計算節(jié)點的狀態(tài),要恢復到?jīng)]有接收到"上游所replay的消息"之前的樣子(所以replay后可以回到現(xiàn)在的狀態(tài),且重新生成下游所需要的input, 即當前節(jié)點在處理這些replay消息時產(chǎn)生的計算結(jié)果集)。
全局一致點和全局一致狀態(tài)集
為了方便討論,我們定義2里所提到的global的狀態(tài)為一個全局穩(wěn)定點; 顯然,如果我們一條消息一條消息的處理,數(shù)據(jù)源節(jié)點等待直到所有流節(jié)點處理完這條消息所產(chǎn)生的蝴蝶效應(yīng)信息之后,才發(fā)出下一個消息B0,那么在消息B要發(fā)出但是沒發(fā)出之前,所有的節(jié)點的狀態(tài)就滿足我們對全局穩(wěn)定點的需要。
比如當我們持續(xù)處理B1,B2,B3...B100, 這時一個節(jié)點fail掉了,那么我們只要流系統(tǒng)的所有節(jié)點rollback他們的狀態(tài)到發(fā)出B0前的"全局穩(wěn)定點", 整個系統(tǒng)的計算和狀態(tài)就會干凈的回道任何節(jié)點都不曾被任何B0-B100所影響的狀態(tài), 那么此時從數(shù)據(jù)源節(jié)點replay B0, B1, B2... B100 成功, 這些消息就"exactly once process"掉了。所以,我們找到了第一個不需要micro-batch, 也不需要記錄中間節(jié)點計算結(jié)果,就能實現(xiàn)EOMP的方法:
每n條信息, 或者每一段時間, 數(shù)據(jù)源節(jié)點(或者流系統(tǒng)的第一個入口節(jié)點)停止向下游發(fā)送任何信息, 直到所有節(jié)點報告說有關(guān)這條信息的所有派生信息(由于這條信息引起的第一個計算節(jié)點的計算結(jié)果會發(fā)送給它的下游, 下游的計算結(jié)果又會發(fā)送給它的下游...等等這些都是派生信息)都已經(jīng)處理完畢, 此時把所有節(jié)點的狀態(tài)checkpointing在高可用DataStore里, 建立一個全局穩(wěn)定狀態(tài)集(由流系統(tǒng)中每個計算節(jié)點各自的全局穩(wěn)定狀態(tài)所組成), 數(shù)據(jù)源才開始繼續(xù)發(fā)送信息...這樣, 任意的節(jié)點fail掉, 我們只要在別的機器上重啟這個計算節(jié)點并download之前checkpoint的狀態(tài),流系統(tǒng)的所有節(jié)點也rollback到上一個全局穩(wěn)定狀態(tài)即可, 由于數(shù)據(jù)源發(fā)送數(shù)據(jù)的進度也屬于全局穩(wěn)定狀態(tài)集中的一員, 所以當數(shù)據(jù)源rollback自己的狀態(tài),則可以開始replay 全局穩(wěn)定點checkpoint之后才發(fā)送的信息,而此時所有節(jié)點都已經(jīng)rollback到一個"從沒見過這些信息和它們的派生信息"的狀態(tài)了,整個系統(tǒng)就好像從來沒有見過這些信息一樣, 從而實現(xiàn)即使failure發(fā)生,我們的系統(tǒng)也可以實現(xiàn)EOMP。
更進一步, 我們來看如何不停住數(shù)據(jù)源的信息接收,我們所需要處理的問題。
1)任意時間點的全局狀態(tài),都不是全局穩(wěn)定點: 如果所有節(jié)點都不等待后續(xù)節(jié)點有沒有處理完信息, 那么任意時間點, 在流的中間節(jié)點建立全局穩(wěn)定狀態(tài)的時候 ,流上游的節(jié)點已經(jīng)開始處理新的信息, 它們的全局穩(wěn)定狀態(tài)早已被新的信息所影響了, 而下游可能還沒收到建立全局穩(wěn)定狀態(tài)所需要的信息。
2)隨意指定的全局穩(wěn)定狀態(tài)集可能根本不存在, 比如數(shù)據(jù)源連續(xù)給A和B發(fā)出x,y兩條信息, 而A和B則需要把計算結(jié)果都發(fā)送給C,如果我們想定義全局穩(wěn)定狀態(tài)為所有節(jié)點"處理完x相關(guān)的消息之后", 但是"處理完y相關(guān)的信息之前"的狀態(tài)。
那么考慮這樣一個運行順序: A處理完x向C發(fā)出x-A, B處理完x, y后向C發(fā)出x-B, y-B, 然而由于網(wǎng)絡(luò)和處理速度的因素, C在還沒有收到x-A的情況下就處理完了x-B, y-B, 所以C的一個"干凈"的從未被y信息影響的狀態(tài),但包含了所有需要的x信息的穩(wěn)定狀態(tài), 在C的狀態(tài)變遷過程中是從來不存在的 (即, 處理完x-A和x-B,但是沒有處理y-B時的狀態(tài))。
問題1意味著我們不能用物理時間來建立全局一致狀態(tài)集, 那么既然流的不同節(jié)點接收到數(shù)據(jù)源任意消息x的派生消息的時間不同, 那么只要我們能讓所有節(jié)點分清哪些是x的消息和派生消息, 哪些是x之后的消息和派生消息, 所有節(jié)點就可以在處理完x的派生消息之后把本地狀態(tài)復制一份儲存在高可用DataStore里, 作為全局一致狀態(tài)集的一員。
問題2意味著即使允許計算節(jié)點連續(xù)處理input而不必等待所有下游建立好全局一致狀態(tài)才發(fā)下一個計算結(jié)果, 計算節(jié)點也不能盲目的不加考慮的處理上游信息, 我們要使得計算節(jié)點的狀態(tài)變遷過程中, 至少全局一致狀態(tài)是可以出現(xiàn)的。
Flink的解法就是由一個高可用的coordinator連續(xù)發(fā)出不同的stage barrier(比如先給所有src發(fā)1,然后1分鐘后發(fā)2,2分鐘后發(fā)3..... 如此增長), 夾雜在發(fā)給數(shù)據(jù)源發(fā)出的數(shù)據(jù)流里, 所有的節(jié)點都必須忠實的轉(zhuǎn)發(fā)這個stage barrier, 這樣所有的節(jié)點的:
input都分為了接收到某barrier(設(shè)為barrier-a)之前的信息和收到barrier-a之后的信息,;
所有的發(fā)給下游的計算結(jié)果也分為自己發(fā)出barrier-a之前的信息和發(fā)出barrier-a之后的信息;
所有的狀態(tài)變遷也分為,用所有接收到barrier-a之前的信息, 所建立的狀態(tài), 和收到barrier-a之后被新的信息影響了的狀態(tài)。
那么如果所有節(jié)點都遵循2個原則:
只用"接收到barrier-a之前的所有信息", 來建立自己的本地狀態(tài),并備份在高可用DataStore里;
只使用"接收到barrier-a之前的所有信息"來計算結(jié)果并發(fā)送給下游之后, 才轉(zhuǎn)發(fā)barrier-a; 然后才開始處理"接收到barrier-a之后的信息"; 這樣就保證了自己在往下游發(fā)送barrier-a之前所發(fā)的所有計算結(jié)果, 都沒有被自己所收到的barrier-a之后的新消息所影響(自己發(fā)送的barrier-a之前的計算結(jié)果只和自己接收的barrier-a前的input集合相關(guān))。
而當所有的節(jié)點都保證"自己發(fā)送的barrier-a之前的計算結(jié)果只和自己接收的barrier-a前的input集合相關(guān)", barrier-a就成了系統(tǒng)系統(tǒng)的分隔點,而所有節(jié)點遵循原則-1所建立的本地狀態(tài)備份, 也絕對沒有被數(shù)據(jù)源發(fā)出的在barrier-a之后的信息和它們的派生信息所影響; 而這些所有本地狀態(tài)備份的全集,則組成了全局一致狀態(tài)集。
一個細節(jié), 當一個節(jié)點只有一個input channel的時候, 只要按順序處理input信息即可; 而當一個節(jié)點有多于一個input channel的時候, 一個input channel的barrier-a已經(jīng)接收到, 但是其他channel的barrier-a還沒有收到怎么辦呢?
從收到barrier-a的channel接收新的信息并處理可行么? 顯然不行, 這樣違反了原則-1, 因為"barrier-a之前的信息全集"還沒有湊齊(其他channel的barrier-a還沒有收到), 此時如果處理了任何屬于barrier-a后的"新"信息, 我們就再也無法在狀態(tài)變遷中得到一個"干凈"不受barrier-a后的"新"信息所影響的狀態(tài)了, 這意味著我們必須block 這個已經(jīng)收到barrier-a的channel;
我們可以向下游轉(zhuǎn)發(fā)barrier-a么? 顯然也不行, 這樣違反了原則-2, 理由相同, 我們還沒有收到"barrier-a之前的信息全集", 而從其他channel收到barrier-a之前還收到其他信息的話, 它們所產(chǎn)生的計算結(jié)果也必須在轉(zhuǎn)發(fā)barrier-a之前發(fā)送。
由1,2就很清楚可以推理出flink的算法了:
收到任意input channel 的barrier-a之后, block此channel;
收到所有input channel的barrier-a之后, 把當前狀態(tài)checkpoint并備份到高可用的DataStore里; (這里可以做到異步checkpoint并不會影響latency, 詳細介紹看后邊的異步checkpointing這一節(jié));
收到所有input channel的barrier-a之后, 并且處理完所有此前收到的信息并向下游發(fā)送計算結(jié)果完畢后, 向所有和自己相連的下游轉(zhuǎn)發(fā)barrier-a;
當所有節(jié)點都備份完成,我們就得到了一個全局一致狀態(tài)集, 或者說全局一致狀態(tài)快照; 系統(tǒng)的穩(wěn)定點就進步到了barrier-a, 如果下一個barrier是barrier-b, 那么在得到barrier-b的全局一致狀態(tài)集之前, 如果系統(tǒng)出現(xiàn)failure, 我們就可以通過重啟所有計算節(jié)點的方式, 讓所有節(jié)點reload barrier-a所記錄的狀態(tài)集, 從而實現(xiàn)把所有節(jié)點的狀態(tài)rollback到"上一個全局一致"的狀態(tài), 使得流系統(tǒng)可以重置到好像根本沒有看到過任何barrier-a到barrier-b之間的信息的一樣, 然后重跑這段信息;
通過干凈的rollback了可能造成的重復處理的痕跡, 使得所有信息的效果都只發(fā)生了一次, 所以我們得到了一個端到端的EOMP系統(tǒng)。
異步checkpoint可以使得, checkpoint本身不會block流本身的計算,增量checkpoint避免了,每次一點小變動都需要checkpoint全部的state,可以節(jié)省計算機資源(比如網(wǎng)絡(luò)壓力)
flink和spark這種需要checkpoint的系統(tǒng)都可以做到異步增量checkpoint, 且這個技術(shù)也很成熟了, 本文只選flink的方法[35]來簡單說明一下 , Spark的可以看[21]
1、Flink的異步增量checkpointing
Flink使用RocksDB 作為本地狀態(tài)儲存, RocksDB本質(zhì)上就是一個LSM tree, 對狀態(tài)的寫會寫在內(nèi)存的memtable, 一般是一個linked hashmap, 寫到一定大小就存到硬盤里變成sstable(sorted-string-table), 不再更改。
此后會開一個新的memtable來接受新的寫。這樣會按歷史時間來生成很多小文件, 讀的時候先讀memtable,如果里邊有想要的key對應(yīng)的value,必定是最新的,否則按歷史時間順來查sstable(sstable有自己的cache, 所以未必需要讀硬盤)。
對于flink來說, 當需要checkpoint的時候, 只需要把當時的memtable寫在硬盤里即可, 這是唯一一個需要block住當前計算的操作, 此后也只需要把從上個checkpoint開始, 新生成的sstable異步發(fā)送到高可用的遠程文件系統(tǒng)即可(比如S3, HDFS)。這樣就做到了異步(發(fā)送到高可用datastore是異步執(zhí)行的),和增量(只發(fā)送新增文件)。
注意, 由于太多的小文件的sstable會造成讀的性能問題, 所以RocksDB需要異步的compact這些小文件到一個大文件, 對此flink也需要做出一些應(yīng)對, 詳見[35], 例子給的非常清楚,這里不再贅述。
系統(tǒng)內(nèi)與系統(tǒng)外以上的討論都是關(guān)于中間件內(nèi)部如何實現(xiàn)EOMP, 但是由于end to end argument的影響, 中間件提供的保證再多, 沒有source的支持, 它也無法區(qū)分source(流系統(tǒng)的event來源)發(fā)來的2個內(nèi)容一樣的event, 到底是"同一個"信息的重發(fā), 還是"本意"就是想要中間件處理兩次的兩個"不同"event; 對sink(流系統(tǒng)計算結(jié)果的去處)來說,由于failure造成的重算,zombie的存在, 則需要sink能夠"融入"到流系統(tǒng)的EOMP體系中去。
對于source的要求基本就是重發(fā)和對消息提供能區(qū)分到底是不是一個event的eventId,一般就是Kafka那樣就OK, 比較簡單就不多討論了; 這里著重聊一下sink; Sink主要有兩種手段來配合流系統(tǒng)中間件的EOMP, 冪等和2階段提交(2PC)
1、冪等Sink
最簡單的來配合流系統(tǒng)EOMP的策略就是冪等, 由于是外部系統(tǒng), 所以重用我們的"兩節(jié)點EOMP模型"基本不可能, 因為基本不可能用一個tx來把要寫外部系統(tǒng)的操作和記錄已經(jīng)處理過這個操作用一個原子tx來commit, 這也是流系統(tǒng)為什么要支持2PC的原因。
由于冪等保證對同一個計算結(jié)果寫多次和寫一次一樣, 所以無論是什么流系統(tǒng), 無論系統(tǒng)是重算型, 還是記錄計算結(jié)果來避免重算型, 冪等的sink都可以很好的支持; 所以Dataflow/Spark/Kafka Stream都是靠冪等的sink來實現(xiàn)EOMP。
冪等的問題在于無法應(yīng)對需要重算, 且計算可以是non-deterministic的情況, 詳見: 后邊(Latency, 冪等和non-deterministic)一節(jié)的討論; 這也是Spark Streaming, 使用冪等sink的Flink無法支持non-deterministic計算的本質(zhì)原因。
相比之下, dataflow總是記錄計算結(jié)果來避免重算(即使重算也只會有一次重算的結(jié)果會影響下游), Kafka Stream支持tx可以保證只有一次計算結(jié)果可以被commit到Kafka Stream里, 如果sink也只讀committed上游kafka stream, 則可以保證即使計算是non-deterministic的, 也只會有唯一commit的計算結(jié)果被讀到(其他的計算結(jié)果沒有commit marker而被Kafka data comsume API忽略)從而影響sink的外部系統(tǒng)。
而Flink的2PC sink也做到了重算會直接導致sink的外部系統(tǒng)可以配合flink的global rollback, 所以只會有一次的計算結(jié)果被外部系統(tǒng)接受(commit)。
所以Spark Stream在4個流系統(tǒng)里, 是唯一一個完全無法支持non-deterministic計算的流系統(tǒng)。
2、Flink獨特的2PC Sink
2PC對很多熟悉數(shù)據(jù)庫的人來說應(yīng)該是臭名昭著了, 這是很復雜和很容易造成問題而需要極力避免的東西、但是時代在變化, 2PC在新時代也有了彌補自己問題的很多解法了,這里簡單介紹一下。
2PC協(xié)議由一個coordinator,和很多參與2PC的異構(gòu)系統(tǒng)組成,發(fā)起2PC的時候 coodinator要求所有人pre-commit,這是2PC的第一個P(phase),如果所有tx參與者都可以pre-commit并告知coordinator,則coordinator告訴所有人commit,否則告訴所有人abort,這是2PC的第二個P(phrase)
2PC最大的問題是它是一個blocking協(xié)議,blocking的點在于當coordinator和某一個2PC的參與者A掛了,其他參與者無法作出任何決定,只能等待coordinator或者死掉的那個參與者A上線,因為這時所有其他參與者都無法判斷以下兩種情況到底那種發(fā)生了,從而無法決定到底是commit還是abort。
coodinator已經(jīng)收到了所有人的pre-commit并告知參與者A commit,A commit后就掛了;
A并不能pre-commit,但是coodinator在告訴所有人需要abort之前就掛了。
在情況1. 所有其他參與者都應(yīng)該commit,在情況2,所有其他參與者都應(yīng)該abort;由于無法辨別到底是情況1. 還是2. 所有其他參與者必須block等待,這對很多數(shù)據(jù)庫來說意味著為此tx加的鎖都不能放掉,從而影響數(shù)據(jù)庫的其他不參與2PC的操作,甚至鎖死整個數(shù)據(jù)庫。而如果coordinator或者參與者A無法再上線或者狀態(tài)丟失,則需要非常復雜的人工操作來解決其他參與者應(yīng)該如何決策的問題。
雖然2PC有各種問題, 但是在consensus協(xié)議早已經(jīng)成功分布式系統(tǒng)的基石, 各種開源和標準實現(xiàn)可以被輕松獲得的今天, 用consensus協(xié)議來彌補2PC的問題已經(jīng)成為一個"已經(jīng)解決的問題", 如[25]4.2 The Paxos Commit Algorithm 中所說:
We could make that fault-tolerant by simply using a consensus algorithm to choose the committed/aborted decision, letting the TM be the client that proposes the consensus value…
解決2PC問題的關(guān)鍵在于保持coordinator狀態(tài)的高可用性, 那么只要coordinator保證把commit或者abort的決定記錄在一個consensus cluster里即可,比如etcd或者zookeeper,這樣coordintor死了,重啟從consensus cluster里恢復狀態(tài)重新告知所有參與者到底應(yīng)該commit還是abort即可; 這也是為什么各種流行的分布式系統(tǒng)實現(xiàn)分布式tx都是用2PC的原因, 比如dynamoDB, Spanner, Flink, Kafka...
3、Flink的2PC Sink
2PC的第一個P的關(guān)鍵在于所有tx參與者在不知道其他參與者狀態(tài)的情況下,承諾未來一定可以前進commit成功或者干凈的回退abort。當前的tx參與者準備好了,且同意commit,2PC的第二個P的關(guān)鍵點在于整體系統(tǒng)的”唯一決定”統(tǒng)一的推進或者回退各個參與者的狀態(tài)。
而Flink的global state其實可以看做一個2PC,當一個節(jié)點收到所有的上游的barrier-n時,這個“契機”可以看做收到了coordinator的可不可以precommit的問詢,而當localstate已經(jīng)在remote 存好之后,當前節(jié)點就可以告訴coordinator它準備好了,這可以看做回復precommit(如果此節(jié)點在發(fā)給precommit)。
而當所有的節(jié)點都通知coordinator“準備好了”之后,coordinator就可以記錄下barrier-n的global state完整checkpoint的這個事實,這相當于一個不需要發(fā)給tx參與者的commit。
這是由于當failover的時候,是由coordinator告訴所有節(jié)點應(yīng)該從哪個checkpoint點來恢復本地狀態(tài),所以各個節(jié)點的localstate到底是commit了還是rollback了,可以完全由“有沒有記錄下barrier-n的global state完整checkpoint成功”這個metadata推算出來,所以也就不需要單獨給各個節(jié)點發(fā)commit/abort信息來讓各個節(jié)點commit或者abort了。
當系統(tǒng)狀態(tài)只涉及到flink的內(nèi)部狀態(tài)時(flink提供的stateApi所提供的statestore), 如果一個某節(jié)點X在回復precommit之后掛了,coordinator還是可以選擇commit,因為組成global state的節(jié)點X的local state已經(jīng)完整的存儲在remote的datastore里了。
但是如果涉及到外部狀態(tài),比如sink需要把計算結(jié)果存儲到一個非flink控制的數(shù)據(jù)庫中去時,flink的sink節(jié)點就相當于這個外部數(shù)據(jù)庫的client,需要連接外部數(shù)據(jù)庫并把數(shù)據(jù)存入外部數(shù)據(jù)庫;要使得外部數(shù)據(jù)庫的狀態(tài)和flink的狀態(tài)保持一致,則需要sink把外部數(shù)據(jù)庫的狀態(tài)引入到flink global state的2PC里,而coordinator在決定commit或者abort的時候,必須通知sink來執(zhí)行外部狀態(tài)的commit或者abort,因為coordinator是不知道外部狀態(tài)到底是什么,也無法簡單的用通知sink從不同的globalstate點恢復來代替2PC的commit/abort通知。
同時sink收到barrier-n時,sink要保證外部數(shù)據(jù)庫里與barrier-(n-1)到barrier-n之間信息相關(guān)的數(shù)據(jù)更改,處于一種“在任何情況下都一定可以commit成功,但是還沒有真的commit,所以外部數(shù)據(jù)庫的消費者不可見這些狀態(tài),且可以rollback的,可進可退的狀態(tài)”。[40]給出了如何用文件實現(xiàn)的一個例子;我這里給出一個如何使用支持transaction的數(shù)據(jù)庫的例子。
首先為了避免產(chǎn)生歧義, 我們定義:
1)flink-precommit ack為 barrier-n流到各個節(jié)點(包括sink), 各個節(jié)點完成local snapshot checkpoint后發(fā)給coordinator的ack, sink則是完成“某個操作”后發(fā)給coordinator的ack, 這個操作需要把外部系統(tǒng)(比如數(shù)據(jù)庫)置于一種, 保證任何情況下都可以服從coordinator的最終決定的狀態(tài), 一個既可以commit(如果coordinator最終決定commit), 又可以rollback(如果coordinator決定abort)的狀態(tài), 且數(shù)據(jù)不為外部系統(tǒng)的consumer所見。
2)定義flink-commit為coordinator收到所有人的pre-commit ack后的的最終commit決定。
3)定義db-commit就是普通的外部數(shù)據(jù)庫的commit。
①當程序開始,sink立刻開一個外部數(shù)據(jù)庫的transaction,當sink收到上游的所有的barrier-1,則立刻db-commit當前transaction然后回復coordinator flink-precommit成功(flink-precommit ack),因為此時如果不db-commit,一旦回復coordinator flink-precommit之后,這個sink掛了,那么外部數(shù)據(jù)庫一般就會自動rollback;此時就算sink在其他機器上重啟,我們也丟失了所有要最終flink-commit的數(shù)據(jù); 而如果這個sink的crash是發(fā)生在coordinator收到所有節(jié)點的flink-precommit ack并最終決定flink-commit之后, 所有其他節(jié)點(比如另外一個sink)的狀態(tài)可能都commit了(所以無法簡單rollback); 而只有此sink的所有數(shù)據(jù)都無法恢復, 這就破壞了global consistency。
②但是上邊我們在flink-precommit階段就db-commit了外部數(shù)據(jù)庫的transaction; 這時會有兩個問題:?
第一, 我們暴露了只是應(yīng)該precommit的數(shù)據(jù)(這些數(shù)據(jù)不應(yīng)被數(shù)據(jù)庫的外部consumer所見);
第二, 如果有一個其他節(jié)點不同意commit而發(fā)給coordinator abort的決定, 那么coordinator則會決定abort, 所以我們的sink則需要服從rollback的決定, 但是我們已經(jīng)db-commit了的數(shù)據(jù), 而一般數(shù)據(jù)庫都不支持rollback已經(jīng)commit的數(shù)據(jù), 這就造成了問題。
為了解決這兩個問題, 這時我們需要設(shè)計一個和外部數(shù)據(jù)庫的數(shù)據(jù)消費者的數(shù)據(jù)“屏蔽協(xié)議”。比如利用一個字段來表示當前數(shù)據(jù)只是“precommit”,所有的外部數(shù)據(jù)庫的讀寫者都應(yīng)該忽略這些數(shù)據(jù)(而只有當這個字段是committed才能讀寫)。
這樣當flink的coordinator通知flink-commit時,我們用另外一個外部數(shù)據(jù)庫的tx來把所有涉及到的precommit的數(shù)據(jù)的這個字段改為committed即可, 這就解決了第一個問題。對于第二個問題來說, 如果最終flink coordinator決定abort, 我們把此字段設(shè)為abort并利用一個異步垃圾回收的程序把所有標記為abort的數(shù)據(jù)清理掉即可。
③這樣設(shè)計的關(guān)鍵是, 即使sink precommit ack之后掛了, 我們要flink-commit的數(shù)據(jù)也不會丟。所以其實flink-precommit ack時, sink把數(shù)據(jù)寫在任何其他可以保證數(shù)據(jù)高可用的地方都行(只要sink fail掉重啟之后還能找到它), 未必需要是同一個數(shù)據(jù)庫的同一個表。如果采取這種策略, 那么在flink-commit時則需要重新把要db-commit的數(shù)據(jù)從存的地方讀出來, 然后重新寫入到真正要寫的數(shù)據(jù)庫并db-commit。
④flink提供了一個TwoPhaseCommitSinkFunction,[40]里有詳細描述如何簡單的extends這個interface來實現(xiàn)一個可以和flink的global consistency配合的sink節(jié)點的邏輯,本文不再贅述。
需要注意的一點是,當sink收到coordinator的flink-commit指令之后,運行sink的db-commit邏輯,在外部數(shù)據(jù)庫的db-commit更改完畢(比如把要commit的數(shù)據(jù)的status的值從precommit改為committed)后,但是flink記住sink已經(jīng)完成commit之前(flink在跑完sink的commit函數(shù)后會記住這個sinki已經(jīng)commit了, 所以不再重復call sink的commit, 否則flink就會一直重試commit), 此時,一旦sink掛了,那么在另外的機器重啟的sink,flink無法得知外部數(shù)據(jù)庫已經(jīng)commit成功了,所以flink會再次重試commit函數(shù)來嘗試commit。從而造成重復commit,這也是[40]中提到的commit必須設(shè)計為冪等操作的原因。
注意1: 可以使用2PC作為sink的關(guān)鍵是, 你的sink可以保證在ack pre-commit之后, 保證無論任何情況都可以成功commit; 這不是說你的sink所連接的外部系統(tǒng)支持tx就可以的, 需要application設(shè)計者根據(jù)情況具體設(shè)計。[1]的P213頁, 就描述了sink是用kafka transaction記錄計算結(jié)果到kafka,但是即使用了transaction也可能丟數(shù)據(jù)的一種edge case。而[41] Kafka 0.11 and newer=>Caveats 里也有提到。
丟失數(shù)據(jù)的原因就在于, kafka sink的默認實現(xiàn):FlinkKafkaProducer011, 在precommit的時候沒有真的commit數(shù)據(jù), 因此當kafka sink fail掉沒有及時重啟, 一旦kafka tx超時, 所有tx里的數(shù)據(jù)都會丟失, 而此時如果coordinator已經(jīng)決定commit就絕不會再重發(fā)數(shù)據(jù)(source也已經(jīng)commit發(fā)出的消息的index),從而kafka sink的此次tx的所有數(shù)據(jù)永久丟失。
這里提供的DB版本的sink實現(xiàn)思路, 在precommit階段就commit數(shù)據(jù), 來保證“無論如何數(shù)據(jù)都不會丟”, 但是用app level的flag屏蔽外部可見; 這樣做的原因就是為了克服類似kafka sink的這種缺陷。
注意2: 使用2PC Sink的Flink應(yīng)該是可以應(yīng)對non-deterministic計算的, 因為一旦failure發(fā)生, 所有之前的狀態(tài)和對sink的寫入都會被rollback; 但是這樣的話, Flink在sink端就變成了micro-batch模型, batch大小取決于發(fā)barrier的頻率; 但是即使這樣, 由于只有sink需要聚集一個batch才能做一次2PC, 但是中間節(jié)點往下游發(fā)送計算結(jié)果還是即算即發(fā)的, 所以比起Spark這種所有中間計算都是micro-batch,micro-batch造成的額外latency會疊加式的增高的模型, 端到端的latency應(yīng)該還是會要小一些。
Latency, 冪等和non-deterministic利用冪等的sink可以做到實時記錄計算結(jié)果, 達到最小的end to end latency。因為sink根本不需要等待barrier, 來一條計算結(jié)果就向外部系統(tǒng)commit一條記錄就好, 而由冪等保證了就算整個系統(tǒng)開始重算, 在sink端也會表現(xiàn)出每個source端的event只產(chǎn)生了一次效果的結(jié)果。
但是冪等是很難克服non-deterministic計算的。因為non-deterministic計算使得同一個source發(fā)出的event引起千變?nèi)f化的"蝴蝶效應(yīng)" (比如第一次計算event生成的Key是A, 第二次重算生成的Key是B, 如果下一個節(jié)點是partitionByKey, 那么這里的2次計算結(jié)果就會發(fā)送給了完全不同的下游節(jié)點, 考慮幾百次不確定計算引起的不同蝴蝶效應(yīng), 等計算結(jié)果到達各個sink節(jié)時, 計算的key和value甚至結(jié)果的個數(shù)和在sink節(jié)點的distribution都完全不同了, 那么sink也就完全無法利用冪等來屏蔽掉同一個event replay所造成的"蝴蝶效應(yīng)"了)
相比之下, 如果整個流系統(tǒng)的計算都是確定性的, 那么無論在source端replay多少次同一個event, 它所產(chǎn)生的"蝴蝶效應(yīng)"在sink端也必定相同, 則application設(shè)計者則可以很容易設(shè)計出冪等操作來屏蔽掉重復的計算結(jié)果。
如果業(yè)務(wù)里無法去除non-determnistic的計算, 那么你只能選擇Google Dataflow, KafkaStream,或者Flink+2PCSink; 而只支持冪等的Spark和利用冪等sink的Flink無法支持non-determnistic的業(yè)務(wù)計算。
REFERENCEStream Processing with Apache Flink: Fundamentals, Implementation, and Operation of Streaming Applications
Designing Data-Intensive Applications: The Big Ideas Behind Reliable, Scalable, and Maintainable Systems
Lightweight Asynchronous Snapshots for Distributed Dataflows
Streaming Systems: The What, Where, When, and How of Large-Scale Data Processing
Kafka Streams in Action: Real-time apps and microservices with the Kafka Streams API
The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing
MillWheel: Fault-Tolerant Stream Processing at Internet Scale
Distributed Snapshots: Determining Global States of Distributed Systems (Chandy-Lamport)
State Management in Apache Flink R Consistent Stateful Distributed Stream Processing
Discretized Streams: Fault-Tolerant Streaming Computation at Scale
Continuous Processing in Structured Streaming Design Sketch
Structured Streaming: A Declarative API for Real-Time Applications in Apache Spark
Structured Streaming Programming Guide 2.4.3
Spark Streaming Programming Guide2.4.3
Watermarks, Tables, Event Time, and the Dataflow Model
Kafka Streams’ Take on Watermarks and Triggers
Streams Architecture Kafka
Enabling Exactly Once in Kafka Streams
Transactions in Apache Kafka
Introducing Low-latency Continuous Processing Mode in Structured Streaming in Apache Spark 2.3
State Management in Spark Structured Streaming
Process Large DynamoDB Streams Using Multiple Amazon Kinesis Client Library (KCL) Workers
Big Data: Principles and best practices of scalable realtime data systems
Making Sense of Stream Processing
Consensus on Transaction Commit
Exactly Once Delivery and Transactional Messaging in Kafka=>docs.google.com/documen
End-to-End Arguments in System Design
Transactional Messaging in Kafka
Akka Split Brain Resolver
Unreliable Failure Detectors for Reliable Distributed Systems
The Weakest Failure Detector for Solving Consensus
Exactly once Semantics are Possible: Here’s How Kafka Does it
24/7 Spark Streaming on YARN in Production
Monitoring Back Pressure (flink)
Managing Large State in Apache Flink: An Intro to Incremental Checkpointing
Impossibility of Distributed Consensus with One Faulty Process (AKA, FLP impossibility)
Kubernetes in Action
Akka:Auto-Downing(DO NOT USE)
ZooKeeper: Distributed Process Coordination
An Overview of End-to-End Exactly-Once Processing in Apache Flink
Kafka producers and fault tolerance
想知道更多?掃描下面的二維碼關(guān)注我
加技術(shù)群入口(備注:Tech):
免費星球入口:
免費資料入口:后臺回復“666”
朕已閱?
總結(jié)
以上是生活随笔為你收集整理的压箱底总结:流系统端到端一致性对比的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 下次遇到嚣张的候选人就先这么问:系统变慢
- 下一篇: 忘掉Java并发,先听完这个故事...