QMQ顺序消息设计与实现
背景
在MQ里,順序消息的意思是消費消息的順序和消息發送時(單機發送)的順序保持一致。比如ProducerA按照順序發送msga, msgb, msgc三條消息,那么consumer消費的時候也應該按照msga, msgb, msgc來消費。
對于順序消息,在我們實際使用中發現,大部分業務系統并不需要或者并不依賴MQ提供的順序機制,這些業務本身往往就能處理無序的消息,比如很多系統中都有狀態機,是否消費消息必須根據狀態機當前的狀態。
但是在一些場景中順序消息也有其必要性:比如日志收集和依賴binlog同步驅動業務等。就這兩個場景而言,同樣是順序消息但對順序的需求卻不一定一樣:比如日志收集中我們一般認為對順序的要求比較弱,即絕大多數時是有序即可,遇到一些極端情況,比如Server宕機,容量調整的時候我們可以暫時容忍一些無序。但是對于一個依賴MySQL binlog同步來驅動的業務,短暫的無序都將會導致整個業務的錯亂。
分析現有的一些MQ后發現,它們并不能在所有情況下提供可靠的順序支持。現在市面上的MQ基本上都是以partition - based模型來提供順序支持。我們以Kafka為例:topic分為一個或多個partition,partition可以理解為一個順序文件,producer發送消息的時候,按照一定的策略選擇partition,比如partition = hash(key) % partition num來選擇該消息發送給哪個partition,那么具有相同key的消息就會落到相同的partition上,而consumer消費的時候一個consumer獨占地綁定在一個partition上。這樣一來,消息就是順序消費的了:
但是這種模型存在一些問題:
partition的個數就是消費的并行度,那么如果現在consumer處理不過來需要增加consumer則需要對應地增加partition。而根據上面的描述partition的個數一旦改變,則順序將無法保證(partition = hash(key) % partition num 公式里partition num發生了改變,則選擇的partition也會發生變化)。
所以我們一般在業務上線之前,就要做出合理的容量規劃,預先創建出足夠的partition,但有的時候容量規劃是困難的,實踐中往往是預先分配大量的partition,比如幾百甚至幾千,然而大量的partition對性能以及運維都帶來麻煩。
擴容partition后,如果高峰期已過,想進行縮容則基本上不可行(比如Kafka就不允許減少partition),除了縮容帶來順序變化外,還有一點是怎么保證被縮容的partition上的消息已經完全消費完成了呢?
partition的移動問題,partition如果分配在某臺broker上之后再移動就很麻煩,一旦這臺broker容量不足,需要進行負載均衡就很困難了,這可能需要在不同的機器上傳輸大量的數據。
對可用性的挑戰,順序發送的時候某個key的消息必須總是發送給指定的partition的,如果一旦某臺server掛掉,或者正常的停機維護,那么位于這臺server的partition就不能收消息了,但是也不能發送給其他partition,否則順序就會錯亂。
雖然我們可以通過多副本機制(Replication)來確保即使該partition所在機器出現故障時候仍然有其他副本提供服務,但是一般選舉出一個新的副本通常需要花費幾秒到幾分鐘不等(比如早期的Kafka版本Leader遷移是串行執行的,在分區特別多的時候,選舉出新的leader可能需要分鐘級時間),在此期間發送到該partition的所有消息都無法發送。
堆積問題,如果預分配時候的partition過少,這個時候堆積了大量的消息,那么即使擴容也沒有辦法了:
所以我們認為現有的一些所謂順序消息機制并不是簡單可依賴的。你以為MQ給你提供了順序保障,但實際上在一些時候并不是這樣,那么這個時候使用方為了應對這種異常情況就需要做出各種應對措施,增加了使用的復雜度。而我們希望提供一種簡單可依賴的順序消息,也就是使用方可以放心的將順序保證交給MQ。
方案設計
首先我們來分析無法保證順序的根源是什么。我們選擇partition所使用的公式是 partition = hash(key) % partition num。正是因為partition num發生了變化導致公式的結果發生了變化,進而打破了順序保證。
其實對于這個公式我們可能并不陌生,除了在MQ中使用,我們在數據庫分庫分表中往往也有這種套路。
在數據庫分庫分表中我們會通過一個分區鍵計算其分區,然后得到表名或庫名(如下偽代碼所示,user_id是分表鍵,總共分為100張表):
而且在分庫分表中前期因為業務量不大,我們往往不會分很多庫(或者我們也分了多個庫,但是這些庫都落在相同的機器上),但是為了后期添加分庫方便(擴容)我們會預先分出很多表。比如我們前期分成100張表,但是這100張表都在相同的庫里,待到業務增長之后,單庫無法支撐,我們會將100張表劃分到不同的DB里。
比如我們將表0 - 50落在DB1, 50 - 100落到DB2,這樣我們的處理能力就翻倍了,但是因為程序里還是按照100進行分表的,所以對應用沒有感知。
這種機制相當于引入了一個中間層,程序面對的是的分表,最后這個表是落在什么DB上通過中間層進行映射過去就可以了。
那么其實我們是可以借鑒這種思路應用在MQ的擴容縮容中的。為此我們引入了logic partition的概念。也就是Producer發送消息的時候,我們并不決定它發送到哪個具體的Server上的具體的partition里(后文將其稱之為物理partition, physical partition)。我們只是先得到logic partition,使用這個計算公式: logic partition = hash(key) % logic partition num。而logic partition num我們會固定住,永不改變。比如我們將logic partition num固定為1000。但是這里跟分庫分表中的分1000張表不同,logic partition僅僅是邏輯上的,不存在任何存儲實體,所以即使分配的再大也沒有性能上的開銷。計算得到logic partition后,我們根據logic partition的映射再來決定該消息應該落到具體哪個physical partition上。我們會根據logic partition的范圍進行映射,比如logic partition 0 - 500 映射到 physcial partition 1上,500 - 1000 映射到physcial partition 2上。
接下來我們來看看這種措施如何應對本文開頭所提出的一序列問題呢:
擴容 在這里擴容其實就是對physical partition的分裂過程。比如開始時我們創建了兩個分區: physical partition 1, physical partition 2,因為消費不過來,我們要將physcial partiton 1擴容,那么我們將會得到 logic partiton 0 - 250 映射到physical partition 3,logic partition 250 - 500 映射到physical partition 4(注:范圍的分裂不一定是平均的,比如我們也可以按照[0 - 200)和[200 - 500)進行劃分 )。
縮容 縮容其實就是對physical partition的合并過程,我們將physical partiton 3和physical partition 4合并得到physical partition 5。那么現在logic partiton 0 - 500就映射到physical partition 5。
負載均衡 負載均衡其實就是logic partition到physical partiton的重新映射過程。也就是原來0 - 500 映射到 physical partition 5,現在我們將其映射到physical partition 6,而physical partition 6可以分配在一臺空閑的Server上。不僅如此,重新映射也可以解決可用性問題:一臺server停機維護時將落在上面的logic partition進行重新映射,分配到另外一臺Server上即可,這樣我們就可以打造Always writtable ordered message queue。
這里借鑒分庫分表中的預先分表的方法,提出logic partition的抽象層解決物理partition擴容縮容時無法保證順序的問題。但是實際實現時候我們會發現MQ的這種logic partition分法要比數據庫中分表復雜得多。因為MQ是的消費是持續性的,也就是我可以讀取歷史數據。數據庫中分庫分表一旦調整之后,那么它呈現的就是最終視圖,而MQ里昨天我們可能還只有一個physical partition,今天我們劃分為兩個,那么我們消費昨天的數據和今天的數據的時候如何進行無縫的切換呢?
我們先簡單總結一下上面對擴容縮容移動的描述:
-
擴容即對physical partition按照logic partition的范圍進行分裂的過程
-
縮容即按照logic partition的范圍對physical partition進行合并的過程
-
移動即改變logic partition與physical partition的映射的過程
雖然我們從Database的分庫分表思想中學習到了logic partition,但是Message Queue和Database究竟是兩種不同的模型。在DB里,reader是無狀態的,也就是每次讀取傳入的查詢條件都是獨立的。而MQ的reader(consumer)當前的讀取位置(offset)是依賴上次的讀取位置,一旦partition發生改變,則這個offset將無法繼續保持,那消費就會錯亂了,順序也無從談起。另外因為數據量太大,我們在執行擴容縮容移動的時候并不想對數據進行移動。
接下來以實際的例子來進行說明,下面是一個擴容的實例。order.changed這個主題,原來分配了P1, P2兩個分區,現在因為容量不夠,需要對P2進行擴容(分裂)。也就是將physical partition P2進行分裂,分裂成P3, P4兩個分區。分裂的原則是按照logic partition的范圍進行,logic partition [500, 1000)原來映射到P1,現在logic partition [500, 750)映射到P3, [750, 1000)映射到P4。也就是分裂以后producer發送新的消息就會按照新的映射關系將消息append到P1, P3或P4,P2不再接收新的消息了。
接下來具體描述一下實現步驟。在QMQ里有個metaserver的組件,它管理所有元數據信息,比如某topic分配到哪些partition上(我們將其稱之為路由):
metaserver還管理partition分配在哪些server上,以及logic partition與physical partition的映射關系。
在需要對P2進行分裂的時候,metaserver會發送一條消息給P2所在的server,這條消息會被append到P2上,該消息稱之為指令消息(command message),對客戶端不可見,也就是業務代碼不會消費到這條消息。P2收到這條指令消息后將不再接收新的消息了,所有業務消息均被拒絕,那么這條指令消息就是P2上的最后一條消息,相當將P2關閉了。
metaserver發送完指令消息后會變更對應topic的路由信息:
注意看上面的表格的特點,這個路由信息表與眾不同的地方在于它有一個version字段。對于producer而言它總是獲取最新版本的路由信息,也就是路由發生變更后,producer就會獲得更高版本的路由信息,然后向這些分區上發送消息。
但是對于consumer來講,它必須將前面的消息消費完成才能消費后面的,否則順序就亂了。比如前面分裂的示例,P2分裂為P3, P4了,這個時候P3, P4并不是立即對consumer可見的(只要對consumer不可見,就沒有consumer來消費它)。只有當consumer消費到指令消息時,才會觸發consumer的路由變更。并且指令消息里攜帶了路由的版本信息,假設路由已經發生了多次變更,consumer消費到某個指令消息的時候,只會將consumer的路由變更到該指令的下一個版本,而不會跳到其他版本,這里觸發路由變更的時候會使用樂觀鎖去更新版本(偽代碼):
總結起來就是producer總是使用最新版本的路由,而consumer使用指定版本的路由,路由的版本由指令消息進行同步。
其實這個流程中最有趣的不是擴容(分裂)和縮容(合并),而是移動。比如我們現在發現P4分區所在機器負載比較高或磁盤就要滿了,現在給集群加了幾臺機器,怎么做能在繼續保持順序的基礎上又能將負載分散過去呢?那么只需要發送一個移動的指令消息給P4,然后P4就會關閉,然后變更路由,order.changed的路由現在是P1, P3, P5,這次路由變更分區的個數沒有發生改變,改變的只是logic partition和physical partition的映射關系:
因為P5是新分區,所以他可以分配在新機器上了。而且這個特性可以用在提高順序消息的可用性上,比如需要對某臺server停機,那么我們只需要對其上面所有分區發送移動指令即可。
另外,在實現的時候我們還增加了如下約束條件:
-
版本必須是連續遞增的
-
每次只能執行一項變更,比如只能對一個partition分裂,不能對多個partition進行分裂
-
對logic partition范圍的每次操作必須是連續的,比如合并的時候只能將[0, 100) 與[100, 200)合并,而不能將[0, 100)與[200, 300)合并
-
路由變更必須是本次變更分區所有的消費者都確認執行到指令消息才能觸發。比如將多個分區合并的時候,必須是這幾個分區都消費到了指令消息的時候觸發。
總結
上面以示例的方式描述了QMQ如何進行擴容(分裂),那么只需要按照這個步驟進行,consumer在沒有將更早的消息消費完成的情況下就不會拿到更新的路由。
至于如何確保順序的消費這些分區的消息那就跟其他MQ一樣了,只需要將分區分配給指定的consumer實例,只允許指定的實例獨占消費該分區即可。
QMQ是去哪兒網開源的分布式消息中間件,在去哪兒網內部應用十分廣泛,提供了很獨特的存儲模型,延時消息,事務消息等。點擊原文鏈接就會跳到github地址(https://github.com/qunarcorp/qmq),歡迎給我們提交PR, Star。
總結
以上是生活随笔為你收集整理的QMQ顺序消息设计与实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 互联网项目中MySQL应该选什么事务隔离
- 下一篇: 无责任书评:每个Java程序员都应该深入