全面升级 —— Apache RocketMQ 5.0 SDK 的新面貌
作者 | 凌楚
引言
長久以來,RocketMQ 易于部署、高性能、高可用的架構,支撐了數十年來集團內外海量的業務場景。時至今日,為了迎接如今云原生時代的新挑戰,我們重磅推出了 RocketMQ 5.0 新架構。
在 5.0 新架構中,我們更新了整個 RocketMQ 的網絡拓撲模型,著眼于將更上層的業務邏輯從 broker 中剝離到無狀態的 proxy ,這樣獨立的計算節點可以無損地承擔日后的升級發布任務,與此同時將 broker 解放出來承擔純粹的存儲任務,為未來打造更強的消息存儲引擎做好鋪墊。通信層方面,出于標準化,多語言的考慮我們摒棄了 RocketMQ 使用多年的 RemotingCommand 協議,采用了 gRPC 來實現客戶端與服務端之間的通信邏輯。
針對于用戶側,我們希望盡可能少的叨擾客戶進行升級,維持邏輯輕量,易于維護,可觀測性良好,能夠可以達到“一次性把事情做對”。
目前,保證了接口完全兼容的,基于 RocketMQ 5.0 的商業化版本 Java SDK 已經在公有云 release 完成,開源版本也即將 release。SDK 將同時支持云上 proxy 架構的云上版本和開源版本的 Broker。下面將展開敘述 RocketMQ 5.0 新架構下的 SDK 做了哪些迭代與演進。
全面異步化
1、異步的初衷
由于涉及諸多的網絡 IO,因此 RocketMQ 對消息發送開放了同步和異步兩套 API 提供給用戶使用。舊有架構從 API 針對于同步和異步維護了兩套類似的業務邏輯,非常不利于迭代。考慮到這一點,此次新架構 SDK 希望在底層就可以將它們統一起來。
以消息發送為例,一個完整的消息發送鏈路包括獲取:
其中從遠端獲取 topic 對應的路由是一個重 IO 操作,而發送消息本身也是一個重 IO 操作。在以往的發送實現中,即使是異步發送,對于路由的獲取也是同步的,路由的獲取本身并沒有計入用戶的發送耗時中,用戶本身是可以自主設置消息發送的超時時間的,而由于本身消息的發送是同步的,無法做到超時時間的精準控制,而在使用異步 Future 之后,可以非常方便地通過控制 Future 的超時時間來做到。
2、異步統一所有實現
本質上 RocketMQ 里所有的重 IO 操作都可以通過異步來進行統一。得益于 gRPC 本身提供了基于 Future 的 stub,我們將網絡層的 Future 一層層串聯到最終的業務層。當用戶需要同步 API 時,則進行同步等待;當用戶需要異步 API 時,則在最外層的 Future 添加回調進行監聽。
實際上基于 Future 設計的思想是貫穿整個客戶端實現的。譬如,消息消費也是通過唯一的基于 Future 的實現來完成的:
針對于順序消息消費失敗這種需要本地 suspend 一段時間重新投遞的情況,消費接口增加了延時參數。然而無論是普通消息還是順序消息,都只會返回含有消費狀態的 Future 。上層再針對含有消費狀態的 Future 來進行消息的 ACK/NACK 。特別地,針對于服務端向客戶端投遞特定消息進行消費驗證的場景,也是調用當前 Future 接口,再對消費結果進行包裝向服務端響應消費結果。
RocketMQ 本身的發送和消費過程中充斥著大量的異步邏輯,使用 Future 使得大量的接口實現得到了精簡和統一。尤其在我們的基于 gRPC 新架構協議的 IDL 中,為了保持簡單全部都是使用 unary rpc(非流式),使得我們全部可以使用 gRPC 的 Future stub 來完成通信請求。
可觀測性增強
上面這張圖來自于 Peter Bourgon 2017 年的一篇重要博文,系統且詳細地闡述了 metrics、tracing 和 logging 三者之間的特征與定義,以及他們之間的關聯。
- Metrics:具體聚合同類數據的統計信息,用于預警和監控。
- Tracing:關聯和分析同一個調用鏈上的元數據,判斷具體調用鏈上的異常和阻塞行為。
- Logging:記錄離散的事件來分析程序的行為。
云原生時代,可觀測性是云產品的核心競爭力之一。因而可觀測性增強的基調是整個新架構開發之初就已經確定的。舊有架構客戶端邏輯復雜的同時,可觀測性的缺失也導致我們在面臨客戶工單時更加缺乏足夠直觀簡便的手段,因此新架構中我們圍繞 Tracing、Logging 和 Metrics 這三個重要方面進行了全方位的可觀察性提升。
1、全鏈路 Tracing
Tracing 體現在消息中間件中,最基本的,即對每條消息本身的發送、拉取、消費、ACK/NACK、事務提交、存儲、刪除等過程進行全生命周期的監控記錄,在 RocketMQ 中最基本的實現就是消息軌跡。
舊有的消息軌跡采用私有協議進行編解碼,對于消息生命周期的觀測也僅限于發送、消費和事務相關等階段。沒有和開源規范進行統一,也不具備消息自身的軌跡和用戶鏈路的 trace 共享上下文的能力。
新的實現中,擁抱了最新的 CNCF OpenTelemetry 社區協議規范,在客戶端中嵌入了一個 OTLP exporter 將 tracing 數據批量發送至 proxy,proxy 側的方案則比較多樣了,既可以本身作為一個 collector 將數據進行整合,也可以轉發至其他的 collector,proxy 側也會有相對應的 tracing 數據,會和客戶端上報來的 tracing 數據合并進行處理。
由于采用開源標準的 OTLP exporter 和協議,使得用戶自己定義對應的 collector 地址成為可能。在商業版本中我們將用戶客戶端的 tracing 數據和服務端的 tracing 數據進行收集整合后進行托管存儲,開源版本中用戶也可以自定義自己的 collector 地址將 tracing 數據上報到自己的平臺進行分析和處理。
針對于整個消息的生命周期,我們重新設計了所有的 span 拓撲模型。以最簡單的消息發送、接受、消費和 ACK/NACK 過程為例:
其中:
- Prod :Produce, 表示消息的發送,即起始時間為消息開始發送,結束時間為收到消息發送結果(消息內部重試會單獨進行記錄一條 span);
- Recv :Receive,表示消息的接收,即起始時間為客戶端發起接受消息的請求,結束時間為收到對應的響應;
- Await :表示消息到達客戶端直到消息開始被消費;
- Proc :Process,表示消息的消費過程;
- Ack/Nack 表示消息被 Ack/Nack 的過程。
這個過程,各個 Span 之間的關系如下:
商業版的 ONS 在管控側也對新版本 Trace 進行了支持,針對于用戶關心的消息生產耗時、具體消費狀況、消費耗時、等待耗時,消費次數等給出了更加詳盡的展示。
通過 SLS 的 trace 服務觀察生產者和消費者 span 的拓撲關系(link 關系沒有進行展示,因此圖中沒有 receive 相關的 span):
OpenTelemetry 關于 messaging span 相關的 specification 也在社區不斷迭代,這涉及到具體的 tracing 拓撲,span 屬性定義(即 attribute semantic conventions)等等。我們也在第一時間將 RocketMQ 相關的內容向社區 OpenTelemetry specification 發起了初步的?Pull Request,并得到了社區的收錄和肯定。也得益于 OpenTelemetry specification 詳盡和規范的定義,我們在 tracing 數據增加了包括且不限于程序運行時、操作系統環境和版本等(即 resource semantic conventions)大量有利于線上問題發現和排查的信息。
關于 tracing context propagation ,我們采用了 W3C 的標準對 trace context 進行序列化和反序列化在客戶端和服務端之間來回傳遞,在下個版本中也會提供讓用戶自定義 trace context 的接口,使得用戶可以很方便地關聯 RocketMQ 和自己的 tracing 數據。
新架構中我們針對于消息生命周期的不同節點,暴露了很多 hook point ,tracing 的邏輯也基于這些 hook point 進行實現,因此也能保持相對獨立。在完整的新架構推向開源之后,整個 tracing 的相關邏輯也會被抽取成專門的 instrumentation library 貢獻給 openTelemetry 社區。
2、準確多樣的 Metrics
Tracing 更多地是從調用鏈的角度去觀察消息的走向,更多的時候對于有共性的數據,我們希望可以有聚合好的 Metrics ?和對應 dashboard 可以從更加宏觀的角度來進行觀測。如果說 tracing 可以幫助更好更快地發現問題和定位問題,那么 Metrics 則提供了重要的多維觀察和預警手段。
在收集到足夠多的 tracing 數據之后,服務端會對這些數據進行二次聚合,計算得出用戶發送、等待以及消費時間等數據的百分位數,對很多毛刺問題能很好地做出判斷。
3、規范化的 Logging
我們在開發實踐中嚴格地按照 Trace、Debug、Info、Warn、Error 的級別進行日志內容的定義,譬如 Trace 級別就會對每個 RPC 請求和響應,每條消息從進入客戶端到進行記錄,Error 級別的日志一旦被打印,必然是值得我們和客戶關注的。在去除大量冗余信息的同時,關鍵節點,譬如負載均衡,發送失敗重試等關鍵鏈路也補全了大量信息,單行日志的信息密度大大增加。
另外,關于日志模塊的實現,RocketMQ 原本是自行開發的,相比較于 logback,log4j2 等外部實現而言,功能相對單一,二次開發成本也相對較高。選型時沒有使用 logback 根本上其實只是想要避免與用戶日志模塊沖突的問題,在調研了諸多方案之后,選擇了 shade logback 的方式進行了替換。這里的 shade 不僅僅只是替換了包名和坐標,同時也修改了 logback 官方的日志配置文件名和諸多內部環境參數。
比如默認配置文件:
| 庫 | 默認配置文件 |
| standard logback | logback.groovy/logback-test.xml/logback.xml |
| logback for rocketmq | rocketmq.logback.groovy/rocketmq.logback-test.xml/rocketmq.logback.xml |
如果用戶在引用 rocketmq 的同時自己也引入了 logback ,完整的配置文件和環境參數的隔離保證了兩者是相互獨立的。特別的,由于新架構 SDK 中引入了 gRPC,我們將 gRPC 基于 JUL 的日志橋接到了 slf4j ,并通過 logback 進行輸出。
// redirect JUL logging to slf4j.// see https://github.com/grpc/grpc-java/issues/1577SLF4JBridgeHandler.removeHandlersForRootLogger();SLF4JBridgeHandler.install();消費模型的更新
RocketMQ 舊有架構的消費模型是非常復雜的。topic 中的消息本身按照 MessageQueue 進行存儲,消費時客戶端按 MessageQueue 對消息進行拉取、緩存和投遞。
ProcessQueue 與 RocketMQ 中的?MessageQueue 一一對應,也基本上是客戶端消費端邏輯中最為復雜的結構之一。在舊架構的客戶端中,拉取到消息之后會先將消息緩存到 ProcessQueue 中,當需要消費時,會從 ProcessQueue 中取出對應的消息進行消費,當消費成功之后再將消息從 ProcessQueue 中 remove 走。其中重試消息的發送,位點的更新在這個過程中穿插。
1、設計思路
在新客戶端中, pop 消費模式的引入使得單獨處理重試消息和位點更新的邏輯被去除。用戶的消費行為變為
因為拉取到的消息在客戶端內存是會先進行緩存,因此還要在消費和拉取的過程中計算消息緩存的大小來對程序進行保護,因此新客戶端中每個 ProcessQueue 分別維護了兩個隊列:cached messages 和 pending messages 。消息在到達客戶端之后會先放在 cached messages 里,準備消費時會從 cached messages 移動到 pending messages 中,當消息消費結束并被 Ack 之后則會從 pending messages 中移除。
新架構的客戶端精簡了 ProcessQueue 的實現,封裝性也做到了更好。對于消費者而言,最為核心的接口其實只有四個。
public interface ProcessQueue { /** * Try to take messages from cache except FIFO messages. * * @param batchMaxSize max batch size to take messages. * @return messages which have been taken. */ List<MessageExt> tryTakeMessages(int batchMaxSize); /** * Erase messages which haven been taken except FIFO messages. * * @param messageExtList messages to erase. * @param status consume status. */ void eraseMessages(List<MessageExt> messageExtList, ConsumeStatus status); /** * Try to take FIFO message from cache. * * @return message which has been taken, or {@link Optional#absent()} if no message. */ Optional<MessageExt> tryTakeFifoMessage(); /** * Erase FIFO message which has been taken. * * @param messageExt message to erase. * @param status consume status. */ void eraseFifoMessage(MessageExt messageExt, ConsumeStatus status);}對于普通消費者(非順序消費)而言,ProcessQueue#tryTakeMessages 將從 Cached messages 中取出消息(取出之后消息會自動從 Cached messages 移動至Pending messages),當消息消費結束之后再攜帶好對應的消費結果去調用ProcessQueue#eraseMessages ,對于順序消費者而言,唯一不同的是對應的方法調用替換成ProcessQueue#tryTakeFifoMessage?和ProcessQueue#eraseFifoMessage 。
而 ProcessQueue#tryTakeMessages?和 ProcessQueue#tryTakeFifoMessage 本身已經包含了消費限流和順序消費時為了保證順序對隊列上鎖的邏輯,即做到了:一旦?ProcessQueue#tryTakeMessages/ProcessQueue#tryTakeFifoMessage 可以取到消息,那么消息一定是滿足被消費條件的。當消費者獲取到消費結果之后,再帶上消費結果執行ProcessQueue#eraseMessage?和ProcessQueue#eraseFifoMessage ,erase 的過程會完成消息的 ACK/NACK 和順序消費時隊列解鎖的邏輯。
簡化之后,上層的消費邏輯基本上只需要負責往消費線程中提交消費任務即可了,任何說得上是 'Process' 的邏輯都在新的 ProcessQueue 完成了閉環。
兼容性與質量保障
整個新架構的 SDK 依賴了protocol buffers, gRPC-java, openTelemetry-java 等諸多類庫。在簡化 RocketMQ 本身代碼的同時也帶來了一些兼容性問題。RocketMQ 本身保持著對 Java 1.6 的兼容性,然而:
- gRPC-java 在 2018 年的 1.15 版本之后不再支持 Java1.6;
- openTelemetry-java 只支持 Java8 及以上版本。
在此期間,我們也調研了 AWS、Azure 等友商相關 SDK 的現狀,發現放棄對 Java 1.6 的支持已經是業內標準做法。但囿于老客戶固守 Java 1.6 的情況,我們也進行了一些改造:
- 對 protocol buffers 的代碼進行了 Java 1.6 的等義替換,并通過了 protocol buffers 所有的單測;
- 對 gRPC 的代碼進行了 Java 1.6 的等義替換,并通過了 gRPC 所有的單測;
- 對于 openTelemetry ,在進行等義替換的同時進行了大量的功能性測試;
單測方面,目前客戶端保證了 75% 以上的行覆蓋率,不過相比較優秀的開源項目還有比較長的距離,這一點我們也會在后續的迭代中不斷完善。
最后
RocketMQ 5.0 是自開源以來架構升級最大的一次版本,具體實現過程還有非常多的細節沒有披露,礙于篇幅無法面面俱到,后續開源過程中也歡迎大家在社區中提出更多更寶貴的意見。
相關鏈接
- Pull Requesthttps://github.com/open-telemetry/opentelemetry-specification/pull/1904
- Metrics, Tracing, and Logginghttps://peter.bourgon.org/blog/2017/02/21/metrics-tracing-and-logging.html
- Apache rocketmq apishttps://github.com/apache/rocketmq-apis
- OpenTelemetry specificationhttps://github.com/open-telemetry/opentelemetry-specification
阿里巴巴云原生消息中間件團隊招聘中,強烈歡迎大家自薦和推薦!
有意者請聯系:
@凌楚 (yangkun.ayk@alibaba-inc.com)?
@塵央 (xinyuzhou.zxy@alibaba-inc.com)
點擊下方鏈接,查看更多招聘詳情!
https://job.alibaba.com/zhaopin/position_detail.htm?spm=a2obv.11410903.0.0.674944f6oxzDCj&positionId=134677
原文鏈接:https://developer.aliyun.com/article/797655?
版權聲明:本文內容由阿里云實名注冊用戶自發貢獻,版權歸原作者所有,阿里云開發者社區不擁有其著作權,亦不承擔相應法律責任。具體規則請查看《阿里云開發者社區用戶服務協議》和《阿里云開發者社區知識產權保護指引》。如果您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將立刻刪除涉嫌侵權內容。總結
以上是生活随笔為你收集整理的全面升级 —— Apache RocketMQ 5.0 SDK 的新面貌的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MSE | 阿里巴巴云原生网关三位一体的
- 下一篇: 低代码这么火,它的人才认证你考了吗?