干货 | 携程基于Quasar协程的NIO实践
作者簡介
Ryan,攜程Java開發(fā)工程師,對高并發(fā)、網絡編程等領域有濃厚興趣。
IO密集型系統(tǒng)在高并發(fā)場景下,會有大量線程處于阻塞狀態(tài),性能低下,JAVA上成熟的非阻塞IO(NIO)技術可解決該問題。目前Java項目對接NIO的方式主要依靠回調,代碼復雜度高,降低了代碼可讀性與可維護性。近年來Golang、Kotlin等語言的協(xié)程(Coroutine)能達到高性能與可讀性的兼顧。
本文利用開源的Quasar框架提供的協(xié)程對系統(tǒng)進行NIO改造,解決以下兩個問題:
1)提升單機任務的吞吐量,保證業(yè)務請求突增時系統(tǒng)的可伸縮性。
2)使用更輕量的協(xié)程同步等待IO,替代處理NIO常用的異步回調。
一、Java異步編程與非阻塞IO
本文改造的系統(tǒng)處理來自前臺的任務,通過HTTP請求對端服務,還通過RPC調用內部服務。當業(yè)務高峰時,系統(tǒng)會遇到瞬時并發(fā)任務量數(shù)十倍激增的情況,系統(tǒng)的線程數(shù)量急劇增加造成性能下降。為此,不得不擴容以保證業(yè)務高峰時期的性能。
??
? ? ? ? ? ? ? ? ? ? ? ? ?
基于epoll的NIO框架Netty在一些框架級別的應用中已經得到了廣泛使用,但在快速迭代的業(yè)務系統(tǒng)中的應用依然有一定的局限性。NIO 消除了線程的同步阻塞,意味著只能異步處理IO的結果,這與業(yè)務開發(fā)者順序化的思維模式有一定差異。當業(yè)務邏輯復雜以及出現(xiàn)多次遠程調用的情況下,多級回調難以實現(xiàn)和維護。
1.1?Java中的異步工具
Java項目大多使用JDK8,除線程外可以獲得的異步的編程支持包括CompletableFuture,以及開源的RxJava、Vert.x等反應式編程框架等。這些工具使用了基于響應式編程的鏈式調用逐級傳遞事件,未從根本解決回調問題。
如下為將一段簡單的邏輯判斷使用CompletableFuture進行異步改造后的對比。原始版本使用getA方法獲得第一步的請求結果,根據(jù)其相應選擇使用getB1還是getB2獲取第二步的響應作為結果。
HttpResponse?a?=?getA();HttpResponse b ; if(a.getBody().equals("1")){b=getB1(); } else{b=getB2(); }String ans=b.getBody();首先將三個獲取響應的方法改為異步。此處假設getB1與getB2內部已經具有復雜邏輯,且不屬于同一領域,不適合合并為一個方法。
private CompletableFuture<HttpResponse> getA(); private CompletableFuture<HttpResponse> getB1(); private CompletableFuture<HttpResponse> getB2();然后使用CompletableFuture的鏈式調用,將兩個步驟組合起來:
String ans = getA().thenCompose(a -> {if (a.getBody().equals("1")) {return getB1();} else {return getB2();}}).get().getBody();使用CompletableFuture的鏈式回調后,代碼變得不友好。RxJava等框架同樣具有這個問題。這類反應式的編程工具更適合于數(shù)據(jù)流的傳遞。對于if/else、switch/case,乃至while/for、break/continue這類過程控制語句,實現(xiàn)與維護的難度都很大。業(yè)務系統(tǒng)需要類似于線程的同步等待,同時具有低資源消耗的編碼工具,配合 NIO使用。當時使用NIO時,由于可以不占用線程,可以使用一種資源消耗更小的協(xié)程來等待。
1.2?協(xié)程
協(xié)程是一種進程自身來調度任務的調度模式。協(xié)程與線程不同之處在于,線程由內核調度,而協(xié)程的調度是進程自身完成的。協(xié)程只是一種抽象,最終的執(zhí)行者是線程,每個線程只能同時執(zhí)行一個協(xié)程,但大量的協(xié)程可以只擁有少量幾個線程執(zhí)行者,協(xié)程的調度器負責決定當前線程在執(zhí)行那個協(xié)程,其余協(xié)程處于休眠并被調度器保存在內存中。
和線程類似,協(xié)程掛起時需要記錄棧信息,以及方法執(zhí)行的位置,這些信息會被協(xié)程調度器保存。協(xié)程從掛起到重新被執(zhí)行不需要執(zhí)行重量級的內核調用,而是直接將狀態(tài)信息還原到執(zhí)行線程的棧,高并發(fā)場景下,協(xié)程極大地避免了切換線程的開銷。下圖展示了協(xié)程調度器內部任務的流轉。
協(xié)程中調用的方法是可以掛起的。不同于線程的阻塞會使線程休眠,協(xié)程在等待異步任務的結果時,會通知調度器將自己放入掛起隊列,釋放占用的線程以處理其他的協(xié)程。異步任務完畢后,通過回調將異步結果告知協(xié)程,并通知調度器將協(xié)程重新加入就緒隊列執(zhí)行。
1.3?Quasar任務調度原理
Quasar(https://github.com/puniverse/quasar)是一個開源的Java協(xié)程框架,通過利用Java instrument技術對字節(jié)碼進行修改,使方法掛起前后可以保存和恢復JVM棧幀,方法內部已執(zhí)行到的字節(jié)碼位置也通過增加狀態(tài)機的方式記錄,在下次恢復執(zhí)行可直接跳轉至最新位置。以如下方法為例,該方法分為兩步,第一步為initial初始化,第二部為通過NIO獲取網絡響應。
public String instrumentDemo(){initial();String ans = getFromNIO();return ans; }Quasar會在initial前增加一個flag字段,表明當前方法執(zhí)行的位置。第一次執(zhí)行方法時,檢查到flag為0,修改flag為1并繼續(xù)往下執(zhí)行initial方法。執(zhí)行getFromNIO方法前插入字節(jié)碼指令將棧幀中的數(shù)據(jù)全部保存在一個Quasar自定義的棧結構中,在執(zhí)行getFromNIO后,掛起協(xié)程,讓出線程資源。直至NIO異步完成后,協(xié)程調度器將第二次執(zhí)行該方法,檢測到flag為1,將會調用jump指令跳轉到returnans語句前,并將保存的棧結構還原到當前棧中,最后調用人return ans語句,方法執(zhí)行完畢。
二、系統(tǒng)異步IO改造
在項目中添加Quasar依賴后,可以使用Fiber類新建協(xié)程。建立的方法與線程類似。
new Fiber(()->{//方法體 }).start();2.1?整合Netty與Quasar
系統(tǒng)使用的Http框架是基于Netty的async-http-client(https://github.com/AsyncHttpClient/async-http-client),該框架提供了異步回調和CompletableFuture兩種對響應的異步處理方式。
CompletableFuture自JDK8推出,與之前的Future類最大的不同在于,提供了異步任務跨線程的通知和控制機制。即,任務的等待者可以在CompletableFuture注冊任務完成或異常時的回調,而執(zhí)行者也可以通過它通知等待者。Quaasr框架對它也做了支持,提供了API用于在協(xié)程中等待CompletableFuture的結果。調用后,協(xié)程將掛起,直至future狀態(tài)為已完成。
AsyncCompletionStage.get(future)通過CompletableFuture作為通知中介,我們可以將AsyncHttpClient與Quasar做整合,掛起協(xié)程等待IO結果。
//創(chuàng)建HttpClient AsyncHttpClient httpClient = Dsl.asyncHttpClient(); //創(chuàng)建請求 Request request = createRequest(); //將網絡請求交給HttpClient執(zhí)行 CompletableFuture<Response> future = httpClient.executeRequest(request) .toCompletableFuture(); //通過Quasar掛起協(xié)程 Response response = AsyncCompletionStage.get(future); //獲取網絡結果后,通過future傳遞response并喚醒協(xié)程重新執(zhí)行 deal(response);過程可由下圖表示。
?
Quasar框架AsyncCompletionStage.get內部完成的工作相當于,在HttpClient返回的future上注冊回調,回調的內容是“IO操作完成后通知調度器喚醒協(xié)程”,這樣將NIO異步回調全部操作封裝在協(xié)程調度器中,用戶代碼看起來是同步等待的形式,避免了自行實現(xiàn)回調處理帶來的繁瑣,解決了前文所述的回調地獄。
2.2?聲明掛起方法
Quasar需要織入字節(jié)碼接管掛起方法的調度,在項目主pom下添加quasar-maven-plugin插件,該插件將在編譯后的class文件中修改字節(jié)碼。
<plugin><groupId>com.vlkan</groupId><artifactId>quasar-maven-plugin</artifactId><version>0.7.9</version><executions><execution><goals><goal>instrument</goal></goals></execution></executions> </plugin>Quasar通過識別方法是否拋出了該框架定義的SuspendExecution異常決定是否修改字節(jié)碼。Quasar框架在AsyncCompletionStage.get方法上聲明了SuspendExceution異常,該異常是捕獲異常,但僅作為識別掛起方法的聲明,在運行時不會實際拋出。使用者必須逐層拋出該異常直至新建協(xié)程的一層。當方法內部存在try/catch語句時,也必須拋出該異常。
public void startFiber() throws ExecutionException, InterruptedException {Fiber<Void> fiber = new Fiber<Void>(() -> {//不用繼續(xù)拋出異常Response response = waitNextLayer1();deal(response);}).start(); }private Response waitNextLayer1() throws SuspendExecution {return waitNextLayer2(); }private Response waitNextLayer2() throws SuspendExecution {CompletableFuture<Response> future = httpClient.executeRequest(request) .toCompletableFuture();try {// Quasar框架工具類拋出SuspendExecutionreturn AsyncCompletionStage.get(future);} catch (Exception e) {return null;} }2.3?異步RPC調用
目前主流的RPC框架都基于NIO實現(xiàn),支持異步回調,有的RPC框架已經直接提供了返回CompletableFuture或ListenableFuture(Guava工具類提供)的異步接口,通過使用ComplatableFuture,可以按前文類似的方法將Quasar與RPC框架結合起來。當RPC框架沒有該返回類型時,一般會提供如下類似的帶泛型的異步回調接口:
interface Callback<TResponse> {void callback(TResponse TResponse, Exception e); }這種情況,可以使用者自己創(chuàng)建ComplatableFuture,在回調中設置其狀態(tài),并調用AsyncCompletionStage.get等待這個future。
CompletableFuture<Response> future=new CompletableFuture<>(); //調用hello接口的異步API new RpcClient().helloAsync(request, new Callback<Response>() {public void callback(Response response, Exception e) {if (e == null) future.complete(response);else future.completeExceptionally(e);} }); //在此處調用Quasar的API,掛起直至RPC調用完成 Response response = AsyncCompletionStage.get(future);上述代碼依然具有異步回調不直觀的缺點,通過JDK8的函數(shù)式接口可以實現(xiàn)一個通用的調用模板,將異步回調變?yōu)橥降却男问健?/p> @FunctionalInterface private interface RpcAsyncCall<TRequest, TResponse> {void request(TRequest request, Callback<TResponse> callback); } public <TRequest, TResponse> TResponse waitRpc(RpcAsyncCall<TRequest, TResponse> call, TRequest request) throws SuspendExecution {CompletableFuture<TResponse> future = new CompletableFuture<>();call.request(request, (response, e) -> {if (e == null) future.complete(response);else future.completeExceptionally(e);});try {//使用Quasar等待Future結果return AsyncCompletionStage.get(future);} catch (Exception e) {return null;} }
最后的調用可簡化一行代碼,該方法適用于所有該Rpc框架提供的異步接口。
Response response= waitRpc(new RpcClient()::helloAsync, request);2.4?阻塞操作的處理
Quasar協(xié)程使用的時候有一定的限制,由于調度器線程池大小固定,在協(xié)程中不能阻塞線程,執(zhí)行線程將被占用。對于某些暫時只能依靠阻塞IO的調用,如數(shù)據(jù)庫,消息隊列等,無法使用協(xié)程等待其結果,當這些阻塞操作量不大的情況下,可使用另一個可伸縮的線程池等待結果,避免對協(xié)程調度器的影響。
public void waitBlocking() throws SuspendExecution {//從DB獲取結果String ans = waitBlocking(this::selectFromDB); }private ExecutorService threadPool = Executors.newCachedThreadPool();private <T> T waitBlocking(Supplier<T> supplier) throws SuspendExecution {CompletableFuture<T> future = new CompletableFuture<>();threadPool.submit(() -> {T ans = supplier.get();future.complete(ans);});try {return AsyncCompletionStage.get(future);} catch (Exception e) {return null;} }2.5?并發(fā)工具的使用
協(xié)程對并發(fā)鎖的使用有比較大的限制,需要使用者理解線程鎖與協(xié)程的調度機制。在synchronized同步塊的內部,不能包含掛起協(xié)程的語句。當持有鎖的協(xié)程掛起后會讓出線程資源,由于鎖的可重入性,另一個運行在同一個線程上的協(xié)程再加鎖時同樣會成功。另一方面,協(xié)程掛起后恢復執(zhí)行時,也可能會在另一個線程上運行。出現(xiàn)兩個線程操作共享資源的異常。同時未持有鎖的線程釋放時,會出現(xiàn)IllegalMonitorStateException異常。
但如果同步塊的內部沒有掛起協(xié)程的語句,則線程鎖的機制仍然有效。線程的在執(zhí)行過程中可能切換,而協(xié)程的調度在每個執(zhí)行線程上是串行的,協(xié)程持有的鎖在不包含掛起操作時,會在占用線程執(zhí)行完畢直到退出同步塊為止,不會發(fā)生鎖失效的情況。
JDK并發(fā)包中的工具可分為兩類,一類是Lock、Semaphore、CountDownLatch等具有線程可重入性的工具,不能在未釋放資源前使用掛起協(xié)程的操作,而另一類則是原子變量、并發(fā)容器等不會讓出線程的工具,仍可正常使用,但要注意高并發(fā)的情況下鎖的性能。此外,在使用并發(fā)工具的阻塞方法,如await時,可能導致協(xié)程的執(zhí)行線程中發(fā)生阻塞。
三、總結
系統(tǒng)運行在4核心的主機上,線程池構成如下。
業(yè)務邏輯運行在Quasar的協(xié)程調度線程池中,線程池大小為CPU核數(shù)。HTTP請求與RPC調用均通過內部的NIO線程池管理。此外定義了一個core size為8的可伸縮的線程池用于少量消息隊列、DB等阻塞IO的操作。其余的線程是系統(tǒng)中引入的其他組件所新建的線程,正常情況下不會成為系統(tǒng)性能的瓶頸。
改造后,在業(yè)務高峰流量激增數(shù)十倍的情況下線程數(shù)量依然穩(wěn)定,而CPU利用率也從平均5%以下提升至10%-60%,在瞬時與高峰流量下能保持穩(wěn)定。集群CPU核數(shù)在保留一定的業(yè)務冗余以應對業(yè)務高峰的情況下,縮減至1/5。
3.1?限制與風險
Quasar協(xié)程不是Java的語言標準,沒有JVM層面的支持,使用時必須手動拋出異常聲明每一個掛起方法,對代碼有一定的侵入性。使用不當時,可能出現(xiàn)異常。
代碼的try/catch時可能同時捕獲SuspendExecution異常,從而忘記標記方法,此方法字節(jié)碼不會被修改,結合Quasar的原理不難看出,當沒有織入字節(jié)碼時,掛起方法恢復執(zhí)行,無法還原方法棧幀和執(zhí)行狀態(tài),將會出現(xiàn)語句被重復執(zhí)行、空指針等錯誤。運行時空指針、死循環(huán)的癥狀,排查的重點是是否漏加SuspendExecution標記。
在新線程而不是新協(xié)程中使用掛起方法時,會出現(xiàn)同樣的問題。Thread的構造方法中傳入的是Runnable接口對象,其run方法沒有聲明SuspendExecution異常,run內部的語句不會被織入字節(jié)碼,造成上述異常。
3.2?總結與展望
協(xié)程使得NIO能夠更好地應用在Java中,比回調方法更易讀易維護。對系統(tǒng)的改造集中在底層通信封裝和對方法的標記上,業(yè)務邏輯無需修改。雖然具有一定的代碼侵入性和理解成本,但這種學習成本能逐漸被代碼的可維護性優(yōu)勢抵消。
異步編程最佳的實現(xiàn)方式是:“Codes Like Sync,Works Like Async”,即以同步的方式編碼,達到異步的效果與性能,兼顧可維護性與可伸縮性。OpenJDK 在2018年創(chuàng)建了Loom 項目(https://wiki.openjdk.java.net/display/loom),目標是在JVM上實現(xiàn)輕量級的線程,并解除JVM線程與內核線程的映射。相信會給Java生態(tài)帶來巨大的改變。
總結
以上是生活随笔為你收集整理的干货 | 携程基于Quasar协程的NIO实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring 自定义注解玩法大全,从入门
- 下一篇: 传说中的CAFEBABE到底在哪儿?