快手基于RocketMQ的在线消息系统建设实践
作者:黃理
黃理,10多年軟件開發和架構經驗,熱衷于代碼和性能優化,開發和參與過多個開源項目。曾在淘寶任業務架構師多年,當前在快手負責在線消息系統建設工作。
?
為什么建設在線消息系統
在引入RocketMQ之前,快手已經在大量的使用Kafka了,但并非所有情況下Kafka都是最合適的,比如以下場景:
- 業務希望個別消費失敗以后可以重試,并且不堵塞后續其它消息的消費。
- 業務希望消息可以延遲一段時間再投遞。
- 業務需要發送的時候保證數據庫操作和消息發送是一致的(也就是事務發送)。
- 為了排查問題,有的時候業務需要一定的單個消息查詢能力。
為了應對以上這類場景,我們需要建設一個主要面向在線業務的消息系統,作為Kafka的補充。在考察的一些消息中間件中,RocketMQ和業務需求匹配度比較高,同時部署結構簡單,使用的公司也比較多,于是最后我們就采用了RocketMQ。
部署模式和落地策略
在一個已有的體系內落地一個開軟軟件,通常大概有兩種方式:
?
方式一,在開源軟件的基礎上做深度修改,很容易實現公司內需要的定制功能。但和社區開源版本分道揚鑣,以后如何升級?
方式二,盡量不修改社區版本(或減少不兼容的修改),而是在它的外圍或者上層進一步包裝來實現公司內部需要的定制功能。
?
注:上圖方式一的圖畫的比較極端,實際上很多公司是方式一、方式二結合的。
?
我們選擇了方式二。最早的時候,我們使用的是4.5.2版本,后來社區4.7版本大幅減小了同步復制的延遲,正好我們的部署模式就是同步復制,于是就很輕松的升級了4.7系列,享受了新版本的紅利。
?
在部署集群的時候,還會面臨很多部署策略的選擇:
???????大集群 vs 小集群
???????選擇副本數
???????同步刷盤 vs 異步刷盤
???????同步復制? vs 異步復制
???????SSD vs 機械硬盤
大集群會有更好的性能彈性,而小集群具有更好的隔離型,此外小集群可以不需要跨可用區/IDC部署,所以會有更好的健壯性,我們非??粗胤€定性,因此選擇了小集群。集群同步復制異步刷盤,首選SSD。
客戶端封裝策略
如上所述,我們沒有在Rocketmq里面做深度修改,所以需要提供一個SDK來提供公司內的需要的定制功能,這個SDK大概是這樣的:
??
對外只提供最基本的API,所有訪問必須經過我們提供的接口。簡潔的API就像冰山的一個角,除了對外的簡單接口,下面所有的東西都可以升級更換,而不會破壞兼容性。
?
業務開發起來也很簡單,只要需要提供Topic(全局唯一)和Group就可以生產和消費,不用提供環境、Name Server地址等。SDK內部會根據Topic解析出集群Name Server的地址,然后連接相應的集群。生產環境和測試環境環境會解析出不同的地址,從而實現了隔離。
?
上圖分為3層,第二層是通用的,第三層才對應具體的MQ實現,因此,理論上可以更換為其它消息中間件,而客戶端程序不需要修改。
?
SDK內部集成了熱變更機制,可以在不重啟client的情況下做動態配置,比如下發路由策略(更換集群name server的地址,或者連接到別的集群去),Client的線程數、超時時間等。通過maven強制更新機制,可以保證業務使用的SDK基本上是最新的。
集群負載均衡 & 機房災備
所有的Topic默認都分配到兩個可用區,生產者和消費者會同時連接至少兩個獨立集群(分布在不同的可用區),如下圖:
?
生產者同時連接兩個集群,如果可用區A出現故障,流量就會自動切換到可用區B的集群2去。我們開發了一個小組件來實現自適應的集群負載均衡,它包含以下能力:
???????千萬級OPS
???????靈活的權重調整策略
???????健康檢查支持/事件通知
???????并發度控制(自動降低響應慢的服務器的請求數)
???????資源優先級(類似Envoy,實現本地機房優先,或是被調服務器很多的時候選取一個子集來調用)
???????自動優先級管理
???????增量熱變更
實際上它并不僅僅用于消息生產者,而是一個通用的主調方負載均衡類庫,可以在github上找到:
https://github.com/PhantomThief/simple-failover-java
核心的SimpleFailover接口和PriorityFailover類沒有傳遞第三方依賴,非常容易整合。
多樣的消息功能
延遲消息
延遲消息是非常重要的業務功能,不過RocketMQ內置的延遲消息只能支持幾個固定的延遲級別,所以我們又開發了單獨的Delay Server來調度延遲消息:
上圖這個結構沒有直接將延遲消息發到Delay Server,而是更換Topic以后存入RocketMQ。這樣的好處是可以復用現有的消息發送接口(以及上面的所有擴展能力)。對業務來說,只需要在構造消息的時候額外指定一個延遲時間字段即可,其它用法都不變。
事務消息
RocketMQ 4.3版本以后支持了事務消息,可以保證本地事務和消費發送同時成功或者失敗,對于一些業務場景很有幫助。事務消息的用法和原理有很多資料,這里就不細述了。但關于事務消息的實踐網上資料較少,我們可以給出一些建議。
?
首先,事務消息功能一直在不斷完善,應該使用最新的版本,至少是4.6.1以后的版本,可以避免很多問題。
?
其次,事務消息性能是不如普通消息的,它在內部實際上會生成3個消息(一階段1個,二階段2個),所以性能大約只有普通消息的1/3,如果事務消息量大的話,要做好容量規劃?;夭檎{度線程也只有1個,不要用極限壓力去考驗它。
?
最后有一些參數注意事項。在broker的配置中:
- transientStorePoolEnable這個參數必須保持默認值false,否則會有嚴重的問題。
- endTransactionThreadPoolNums是事務消息二階段處理線程大小,sendMessageThreadPoolNums則指定一階段處理線程池大小。如果二階段的處理速度跟不上一階段,就會造成二階段消息丟失導致大量回查,所以建議endTransactionThreadPoolNums應該大于sendMessageThreadPoolNums,建議至少4倍。
- useReentrantLockWhenPutMessage設置為true(默認值是false),以免線程搶鎖出現嚴重的不公平,導致二階段處理線程長時間搶不到鎖。
- transactionTimeOut默認值6秒太短了,如果事務執行時間超過6秒,就可能導致消息丟失。建議改到1分鐘左右。
?
生產者client也有一個注意事項,如果有多組broker,并且是2副本(有1個Slave),應該打開retryAnotherBrokerWhenNotStoreOK,以免某個Slave出現故障以后,大量消息發送失敗。
?
分布式對賬監控
除了比較一些常規的監控手段以外,我們開發了一個監控程序做分布式對賬。可以發現我們的集群以及我們提供的SDK是否有異常。
?
?
具體做法是在每個Broker上都建立一個監控專用的Topic,監控程序使用我們自己提供的SDK框架來連接集群(就像我們的業務用戶那樣),監控生產者會給每個集群發送少量消息。然后檢查發送是否成功:
| 發送成功 | 成功 | 
| 刷盤超時 | |
| Slave超時 | |
| Slave不可用 | |
| 發送失敗 | 具體錯誤碼 | 
生產者只對這些結果進行打點,不判斷是否正常,具體到監控(或者演練)場景可以配置不同的報警規則。
?
消費者收到了消息會通過TCP旁路Ack生產者,生產者這邊會做分布式對賬,將對賬結果打點:
- 收到消息
- 消息丟失(或超時未收到消息)
- 重復收到消息
- 消息生成到最終消費的時間差
- Ack生產者失敗(由消費者打點)
同樣監控程序只負責打點,報警規則可另外配置。
?
這套機制也可以用于分布式性能壓測和故障演練。在做壓測的時候,每個消息都Ack的話,對生產者的內存壓力很大,因為它發出去的消息,需要在內存中保留一段時間(直到到達這個消息的對賬時間),這段時間消費者Ack或者重復Ack都需要記錄。所以我們實現了按比例抽樣對賬的功能,開啟以后只有需要對賬的消息才會在內存中保留一段時間。
?
順便說一下,我們做壓測時,合格的標準是異步生產不失敗、消費不延遲、每一個消息都不丟失。這樣做是為了保證壓測時能給出更加準確的,可供線上系統參考的性能數字,而不是制造理想條件,追求一個大的數字。比如異步生產比同步生產更脆弱(壓測client如果同步生產,broker抖動的時候,同步client會被堵塞導致發送速度降低,于是降低了broker壓力,消息發送不容易失敗,但是會看到發送速率在波動),更貼近生產環境的實際情況,我們就選擇異步生產來評估。
?
性能優化
Broker默認的參數在我們的場景下(SSD、同步復制、異步刷盤)不是最優的,有的參數也許在大多數場景下都不是最優的。我們列出一些重要的參數,供大家參考:
?
| 參數 | 默認值 | 說明 | 
| flushCommitLogTimed | False | 默認值不合理,異步刷盤這個參數應該設置成true,導致頻繁刷盤,對性能影響極大 | 
| deleteWhen | 04 | 幾點刪除過期文件的時間,刪除文件時有很多磁盤讀,這個默認值是合理的,有條件的話還是建議低峰刪除 | 
| sendMessageThreadPoolNums | 1 | 處理生產消息的線程數,這個線程干的事情很多,建議設置為2~4,但太多也沒有什么用。因為最終寫commit log的時候只有一個線程能拿到鎖。 | 
| useReentrantLockWhenPutMessage | False | 如果前一個參數設置比較大,這個最好設置為True,避免高負載下自旋鎖空轉消耗CPU。 | 
| sendThreadPoolQueueCapacity | 10000 | 處理生產消息的隊列大小,默認值可能有點小,比如5萬TPS(異步發送)的情況下,卡200ms就會爆。設置比較小的數字可能是擔心有大量大消息撐爆內存(比如100K的話,1萬個的消息大概占用1G內存,也還好),具體可以自己算,如果都是小消息,可以把這個數字改大。可以修改broker參數限制client發送大消息。 | 
| brokerFastFailureEnable | True | Broker端快速失敗(限流),和下面兩個參數配合。這個機制可能有爭議,client設置了超時時間,如果client還愿意等,并且sendThreadPoolQueue還沒有滿,不應該失敗,sendThreadPoolQueue滿了自然會拒絕新的請求。但如果client設置的超時時間很短,沒有這個機制可能導致消息重復。可以自行決定是否開啟。理想情況下,能根據client設置的超時時間來清理隊列是最好的。 | 
| waitTimeMillsInSendQueue | 200 | 200ms很容易導致發送失敗,建議改大,比如1000 | 
| osPageCacheBusyTimeOutMills | 1000 | Page cache超時時間,如果內存比較多,比如32G以上,建議改大點 | 
?
?
得益于簡單、幾乎0依賴的部署模式,使得我們部署小集群的成本非常低;不對社區版本進行魔改,保證我們可以及時升級;統一SDK入口方便集群維護和功能升級;通過復合小集群+自動負載均衡實現多機房多活;充分利用RocketMQ的功能比如事務消息、延遲消息(增強)來滿足業務的多樣性需求;通過自動的分布式對賬,對每一個Broker以及我們的SDK進行正確性監控;本問也進行了一些性能參數的分享,但寫的比較簡單,基本只說了怎么調,但沒能細說為什么,以后我們會另寫文章詳述。目前RocketMQ已經應用在公司在大多數業務線,期待將來會有更好的發展!
?
掃碼了解更多技術干貨與客戶案例:
原文鏈接:https://developer.aliyun.com/article/782584?
版權聲明:本文內容由阿里云實名注冊用戶自發貢獻,版權歸原作者所有,阿里云開發者社區不擁有其著作權,亦不承擔相應法律責任。具體規則請查看《阿里云開發者社區用戶服務協議》和《阿里云開發者社區知識產權保護指引》。如果您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將立刻刪除涉嫌侵權內容。總結
以上是生活随笔為你收集整理的快手基于RocketMQ的在线消息系统建设实践的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: 【直播提醒】荷小鱼:K12 在线教育应用
- 下一篇: 世纪联华的 Serverless 之路
