提升不止一点点,Dubbo 3.0 预览版详细解读
Dubbo 自 2011 年 10 月 27 日開源后,已被許多非阿里系的公司使用,其中既有當當網、網易考拉等互聯網公司,也不乏中國人壽、青島海爾等大型傳統企業。更多用戶信息,可以訪問Dubbo @GitHub,issue#1012: Wanted: who's using dubbo。
自去年 12 月開始,Dubbo 3.0 便已正式進入開發階段,并備受社區和廣大 Dubbo 用戶的關注,本文將為您詳細解讀 3.0 預覽版的新特性和新功能。
下面先解答一下兩個有意思的與 Dubbo 相關的疑問。
- 為什么 Dubbo 一開源就是 2.0 版本?之前是否存在 1.0 版本?
筆者曾做過 Dubbo 協議的適配兼容,Dubbo 確實存在過 1.x 版本,而且從協議設計和模型設計上都與 2.0 的開源版本協議是完全不一樣的。下圖是關于 Dubbo 的發展路徑:
- 阿里內部正在使用 Dubbo 開源版本嗎?
是的,非常確定,當前開源版本的 Dubbo 在阿里巴巴被廣泛使用,而阿里的電商核心部門是用的 HSF2.2 版本,這個版本是兼容了 Dubbo 使用方式和 Remoting 協議。當然,我們現在正在做 HSF2.2 的升級,直接依賴開源版本的 Dubbo 來做內核的統一。所以,Dubbo 是得到大規模線上系統驗證的分布式服務框架,這一點毋容置疑。
Dubbo 3.0 預覽版的要點
Dubbo 3.0 在設計和功能上的新增支持和改進,主要是以下四方面:
- Dubbo 內核之 Filter 鏈的異步化
這里要指出的是,3.0 中規劃的異步去阻塞和 2.7 中提供的異步是兩個層面的特性。2.7 中的異步是建立在傳統 RPC 中 request – response 會話模型上的,而 3.0 中的異步將會從通訊協議層面由下向上構建,關注的是跨進程、全鏈路的異步問題。通過底層協議開始支持 streaming 方式,不單單可以支持多種會話模型,還可以在協議層面開始支持反壓、限流等特性,使得整個分布式體系更具有彈性。綜上所述,2.7 關注的異步更局限在點對點的異步(一個 consumer 調用一個 provider),3.0 關注的異步化,寬度上則關注整個調用鏈上的異步,高度上則向上又可以包裝成 Rx 的編程模型。有趣的是,Spring 5.0 發布了對 Flux 的支持,隨后開始解決跨進程的異步問題。
- 功能方面是 reactive(響應式)支持
最近幾年,?reactive programming這個詞語的熱度迅速提升,Wikipedia 上的 reactive programming 解釋是 reactive programming is a programming paradigm oriented around data flows and the propagation of change. Dubbo3.0會實現Reactive Stream 的 rx 接口,從而能讓用戶享受到RP帶來的響應性提升,甚至面向 RP 的架構升級。當然,我們希望 reactive 不單單能夠帶來事件(event)驅動的應用集成方式的升級,也希望在 Load Balance(選擇最優的服務節點),fault tolerance(限流降級時最好做到自適應)等方面發揮其積極價值。
- 云原生/ ServiceMesh 方向的探索
我們定下的策略是進入 Envoy 社區來實現 Dubbo 融入 mesh 的理念思想,目前 Dubbo 協議已經被 Envoy 支持。當然,Dubbo Mesh 離真正可用還有很長一段距離,其在選址、負載均衡和服務治理方面的工作需要繼續在數據面建設,另外,控制面板的建設在社區也沒有提上日程。
- 融合并支持阿里內部
Dubbo 3.0 定下了內外融合的策略,也就是說 3.0 的核心最終會在阿里巴巴的生產系統中部署,相信通過大流量、大規模的考驗,Dubbo 用戶可以獲得一個性能、穩定、服務治理實踐各方面俱佳的核心,用戶在生產系統中采用 3.0 也會更加放心。這一點也是 Dubbo 3.0 最重要的使命。
Filter 鏈的異步化設計
Dubbo 最強大的一處設計是其在 Filter 鏈上的抽象設計,通過其擴展機制的開放性支持,用戶可以對 Dubbo 做功能增強,并允許各個擴展點被定制來是否保留。
Dubbo 的 Filter 定義如下:
@SPIpublic interface Filter {/*** do invoke filter.* <p>* <code>* // before filter* Result result = invoker.invoke(invocation);* // after filter* return result;* </code>** @param invoker service* @param invocation invocation.* @return invoke result.* @throws RpcException* @see org.apache.dubbo.rpc.Invoker#invoke(Invocation)*/Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;}按照“調用一個遠程服務的方法就像調用本地的方法一樣”這種說法,這個直接返回 Result 響應的方式是非常好的,用起來是簡單直接,問題是時代變換到了需要關注體驗,需要走 Reactive 響應式的時代,也回到基本點:invoke一個 invocation 需要經過網絡在不同的進程處理,天然就是異步的過程,也就是發送請求(invocation)與接收響應(Result)本身是兩個不同的事件,是需要兩個過程方法來在 Filter 鏈處理。那么如何改造這個關鍵的 SPI 呢?有兩種方案:
第一種,把 invoke 的返回值改成 CompletableFuture, 好處是一目了然,Result 不在建議同步獲取了;但基礎接口的簽名一改會導致代碼改造量巨大,同時也會讓原有的 SPI 擴展不在支持。
第二種,Result 接口直接繼承 CompletationStage,是代表了響應的異步計算。這樣能進避免第一種的劣勢。所以,3.0.0 Preview 版本對內部調用鏈路實現做了一次重構:基于 CompletableFuture 實現了框架內部的全異步調用,而在外圍編程上,同時支持同步、異步調用模式。
值得注意的是,此次重構僅限于框架內部實現,對使用方沒有任何影響即接口上保持完全兼容。要了解 Dubbo 異步 API 如何使用,請參考《如何基于 Dubbo 實現全異步的調用鏈》,這篇文章將著重對實現思路和原理做一些簡單介紹。此次重構的要點有:
- 框架內部采用全異步調用模型,僅在外圍做同步、異步適配;
- 內置Filter鏈支持異步回調;
基本工作流程
首先我們來看一個通用的跨網絡異步調用的線程模型:
通信框架異步發送請求消息,請求消息發送成功后,返回代表業務結果的 CompletableFuture 給業務線程。之后對于 Future 的處理,根據調用類型會有所區別:
接下來具體看一下一次異步 Dubbo RPC 請求的調用流程:
- 如果方法是 CompletableFuture 簽名,則返回 Future;
- 如果方法是普通同步簽名,則返回對象默認值,Future 可通過 RpcContext 拿到;
6. 調用方在拿到代表異步業務結果的 Future 后,可選擇注冊回調監聽器,以監聽真正的業務結果返回。
同步調用和異步調用基本上是一致的,并且也是走的回調模式,只是在鏈路返回之前做了一次阻塞 get 調用,以確保在收到實際結果時再返回。Filter 在注冊 Listener 時由于 Future 已處于 complete 狀態,因此會同時觸發回調 onResponse()/onError()。
關于流程圖中提到的 Result,Result 在 Dubbo 的一次 RPC 調用中代表返回結果,在 3.0 中 Result 自身增加了代表狀態的接口,類似 Future 現在 Result 可以代表一次未完成的調用。
要讓 Result 具備代表異步返回結果的能力,有兩中方式來實現:
1. Result is a Future,在 Java 8 中更合理的方式是繼承 CompletionStage 接口。
public interface Result extends CompletionStage {}2. 讓 Result 實例持有 Future 實例,與 1 的區別即是設計中選用“繼承”還是“組合”。
public class AsyncRpcResult implements Result {private CompletableFuture<RpcResult> resultFuture;}同時,為了讓 Result 更直觀的體現其異步結果的特性,也為了方便面向 Result 接口編程,我們可以考慮為Result增加一些異步接口:
public interface Result extends Serializable {Result thenApplyWithContext(Function<Result, Result> fn);<U> CompletableFuture<U> thenApply(Function<Result, ? extends U> fn);Result get() throws InterruptedException, ExecutionException;}Filter SPI
Filter 是 Dubbo 預置的攔截器擴展 SPI,用來做請求的預處理、結果的后處理,框架本身內置了一些攔截器實現,而從用戶層面,我相信這個 SPI 也應該是被擴展最多的一個。在 3.0 版本中,Filter 回歸單一職責的設計模式,將回調接口單獨提取到 Listener 中。
@SPIpublic interface Filter {Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;interface Listener {void onResponse(Result result, Invoker<?> invoker, Invocation invocation);void onError(Throwable t, Invoker<?> invoker, Invocation invocation);}}以上是 Filter 的 SPI 定義,Filter 的核心定義中只有一個 invoke() 方法用來傳遞調用請求。
同時,增加了一個新的回調接口 Listener,每個 Filter 實現可以定義自己的 Listenr 回調器,從而實現對返回結果的異步監聽,參考以下是為 MonitorFilter 增加的 Listener 回調實現:
class MonitorListener implements Listener {@Overridepublic void onResponse(Result result, Invoker<?> invoker, Invocation invocation) {if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) {collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), false);getConcurrent(invoker, invocation).decrementAndGet(); // count down}}@Overridepublic void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) {collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), true);getConcurrent(invoker, invocation).decrementAndGet(); // count down}}}泛化調用異步接口支持
為了更直觀的做異步調用,泛化接口新增了?CompletableFuture<Object>$invokeAsync(Stringmethod,String[]parameterTypes,Object[]args)接口:
public interface GenericService {/*** Generic invocation** @param method Method name, e.g. findPerson. If there are overridden methods, parameter info is* required, e.g. findPerson(java.lang.String)* @param parameterTypes Parameter types* @param args Arguments* @return invocation return value* @throws GenericException potential exception thrown from the invocation*/Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException;default CompletableFuture<Object> $invokeAsync(String method, String[] parameterTypes, Object[] args) throws GenericException {Object object = $invoke(method, parameterTypes, args);if (object instanceof CompletableFuture) {return (CompletableFuture<Object>) object;}return CompletableFuture.completedFuture(object);}}這樣,當我們想做異步調用時,就可以直接這樣使用:
CompletableFuture<Object> genericService.$invokeAsync(method, parameterTypes, args);更具體用例請參見《泛化調用示例》
https://github.com/apache/incubator-dubbo-samples/tree/3.x/dubbo-samples-generic/dubbo-samples-generic-call
異步與性能
組要注意的是,框架內部的異步實現本身并不能提高單次調用的性能,相反,由于線程切換和回調邏輯的存在,異步反而可能會導致單次調用性能的下降,但是異步帶來的優勢是能減少對資源的占用,提升整個系統的并發程度和吞吐量,這點對于 RPC 這種需要處理網絡延遲的場景非常適用。更多關于異步化設計的好處,請參考其他異步化原理介紹相關文章。
響應式編程支持
響應式編程讓開發者更方便地編寫高性能的異步代碼,很可惜,在之前很長一段時間里,dubbo 并不支持響應式編程,簡單來說,dubbo 不支持在 rpc 調用時使用 Mono/Flux 這種流對象(reative-stream 里流的概念),給用戶使用帶來了不便。(關于響應式編程更詳細的信息請參見這里:http://reactivex.io/)。
RSocket 是一個開源的支持 reactive-stream 語義的網絡通信協議,他將 reative 語義的復雜邏輯封裝起來了,使得上層可以方便實現網絡程序。(RSocket詳細資料請參見這里:http://rsocket.io/)。
dubbo 在 3.0.0-SNAPSHOT 版本里基于 RSocket 對響應式編程進行了簡單的支持,用戶可以在請求參數和返回值里使用 Mono 和 Flux 類型的對象。下面我們給出使用范例,(范例源碼可以在這里獲取:https://github.com/apache/incubator-dubbo-samples/tree/3.x/dubbo-samples-rsocket)。
首先定義接口如下:
public interface DemoService {Mono<String> requestMonoWithMonoArg(Mono<String> m1, Mono<String> m2);Flux<String> requestFluxWithFluxArg(Flux<String> f1, Flux<String> f2);}然后實現該 demo 接口:
public class DemoServiceImpl implements DemoService {@Overridepublic Mono<String> requestMonoWithMonoArg(Mono<String> m1, Mono<String> m2) {return m1.zipWith(m2, new BiFunction<String, String, String>() {@Overridepublic String apply(String s, String s2) {return s+" "+s2;}});}@Overridepublic Flux<String> requestFluxWithFluxArg(Flux<String> f1, Flux<String> f2) {return f1.zipWith(f2, new BiFunction<String, String, String>() {@Overridepublic String apply(String s, String s2) {return s+" "+s2;}});}}然后配置并啟動服務端,注意協議名字填寫 rsocket:
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"xmlns="http://www.springframework.org/schema/beans"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd"><!-- provider's application name, used for tracing dependency relationship --><dubbo:application name="demo-provider"/><!-- use registry center to export service --><dubbo:registry address="zookeeper://127.0.0.1:2181"/><!-- use dubbo protocol to export service on port 20880 --><dubbo:protocol name="rsocket" port="20890"/><!-- service implementation, as same as regular local bean --><bean id="demoService" class="org.apache.dubbo.samples.basic.impl.DemoServiceImpl"/><!-- declare the service interface to be exported --><dubbo:service interface="org.apache.dubbo.samples.basic.api.DemoService" ref="demoService"/></beans> public class RsocketProvider {public static void main(String[] args) throws Exception {new EmbeddedZooKeeper(2181, false).start();ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/rsocket-provider.xml"});context.start();System.in.read(); // press any key to exit}}然后配置并啟動消費者消費者如下, 注意協議名填寫 rsocket:
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"xmlns="http://www.springframework.org/schema/beans"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd"><!-- consumer's application name, used for tracing dependency relationship (not a matching criterion),don't set it same as provider --><dubbo:application name="demo-consumer"/><!-- use registry center to discover service --><dubbo:registry address="zookeeper://127.0.0.1:2181"/><!-- generate proxy for the remote service, then demoService can be used in the same way as thelocal regular interface --><dubbo:reference id="demoService" check="true" interface="org.apache.dubbo.samples.basic.api.DemoService"/></beans> public class RsocketConsumer {public static void main(String[] args) {ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/rsocket-consumer.xml"});context.start();DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxywhile (true) {try {Mono<String> monoResult = demoService.requestMonoWithMonoArg(Mono.just("A"), Mono.just("B"));monoResult.doOnNext(new Consumer<String>() {@Overridepublic void accept(String s) {System.out.println(s);}}).block();Flux<String> fluxResult = demoService.requestFluxWithFluxArg(Flux.just("A","B","C"), Flux.just("1","2","3"));fluxResult.doOnNext(new Consumer<String>() {@Overridepublic void accept(String s) {System.out.println(s);}}).blockLast();} catch (Throwable throwable) {throwable.printStackTrace();}}}}可以看到配置上除了協議名使用 rsocket 以外其他并沒有特殊之處。
實現原理
以前用戶并不能在參數或者返回值里使用 Mono/Flux 這種流對象(reative-stream 里的流的概念)。因為流對象自帶異步屬性,當業務把流對象作為參數或者返回值傳遞給框架之后,框架并不能將流對象正確的進行序列化。
dubbo 基于 RSocket 實現了 reative 支持。RSocket 將 reative 語義的復雜邏輯封裝起來了,給上層提供了簡潔的抽象如下:
/*** Fire and Forget interaction model of {@code RSocket}.** @param payload Request payload.* @return {@code Publisher} that completes when the passed {@code payload} is successfully* handled, otherwise errors.*/Mono<Void> fireAndForget(Payload payload);/*** Request-Response interaction model of {@code RSocket}.** @param payload Request payload.* @return {@code Publisher} containing at most a single {@code Payload} representing the* response.*/Mono<Payload> requestResponse(Payload payload);/*** Request-Stream interaction model of {@code RSocket}.** @param payload Request payload.* @return {@code Publisher} containing the stream of {@code Payload}s representing the response.*/Flux<Payload> requestStream(Payload payload);/*** Request-Channel interaction model of {@code RSocket}.** @param payloads Stream of request payloads.* @return Stream of response payloads.*/Flux<Payload> requestChannel(Publisher<Payload> payloads);我們只需要在此基礎上添加我們的 rpc 邏輯即可。
- 從客戶端視角看,框架建立連接之后,只需要將請求信息編碼到 Payload 里,然后通過 requestStream 方法即可向服務端發起請求。
- 從服務端視角看,rsocket 收到請求之后,會調用我們實現的 requestStream 方法,我們從 Payload 里解碼得到請求信息之后,調用業務方法,然后拿到 Flux 類型的返回值即可。
- 需要注意的是業務返回值一般是 Flux,而 RSocket 要求的是 Flux,所以我們需要通過 map operator 攔截業務數據,將 BizDO 編碼為 Payload 才可以遞交給我 RSocket。而 RSocket 會負責數據的傳輸和 reative 語義的實現。
經過上面的分析,我們知道了 Dubbo 如何基于 RSocket 實現了響應式編程的支持。有了響應式編程支持,業務可以更加方便的實現異步邏輯。
小結
當前 Dubbo 3.0 將提供具備當代特性(如響應性編程)的相關支持,同時汲取阿里內部 HSF 的設計長處來實現兩者的融合,當前預覽版的很多地方還在探討中,希望大家能夠積極反饋,我們都會虛心學習并參考。
原文鏈接
本文為云棲社區原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的提升不止一点点,Dubbo 3.0 预览版详细解读的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 物联网落地三大困境破解
- 下一篇: “练好内功坚持被集成”,阿里云发布Saa