异步http 超时_具有CompletableFuture的异步超时
異步http 超時(shí)
有一天,我重寫了執(zhí)行不佳的多線程代碼,該代碼在Future.get()某個(gè)時(shí)刻被阻塞:
這實(shí)際上是一個(gè)用Java編寫的Akka應(yīng)用程序,具有1000個(gè)線程的線程池(原文如此!)–所有這些線程都在此get()調(diào)用中被阻止。 否則系統(tǒng)無法跟上并發(fā)請求的數(shù)量。 重構(gòu)后,我們擺脫了所有這些線程,只引入了一個(gè),大大減少了內(nèi)存占用。 讓我們簡化一下并顯示Java 8中的示例。第一步是引入CompletableFuture而不是普通的Future (請參閱提示9 )。 很簡單,如果:
- 您可以控制如何將任務(wù)提交給ExecutorService :只需使用CompletableFuture.supplyAsync(..., executorService)而不是executorService.submit(...)
- 您處理基于回調(diào)的API:使用Promise
否則(如果您已經(jīng)阻塞了API或Future<T> ),將有一些線程被阻塞。 這就是為什么現(xiàn)在有這么多異步API誕生的原因。 假設(shè)我們以某種方式重寫了代碼以接收CompletableFuture :
public void serve() throws InterruptedException, ExecutionException, TimeoutException {final CompletableFuture<Response> responseFuture = asyncCode();final Response response = responseFuture.get(1, SECONDS);send(response); }顯然,這并不能解決任何問題,我們必須利用新的React式編程風(fēng)格:
public void serve() {final CompletableFuture<Response> responseFuture = asyncCode();responseFuture.thenAccept(this::send); }這在功能上是等效的,但現(xiàn)在serve()應(yīng)該立即運(yùn)行(沒有阻塞或等待)。 只要記住, this::send將在完成responseFuture的同一線程中執(zhí)行。 如果您不想在某個(gè)地方重載某些任意線程池或send()昂貴,請考慮為此使用單獨(dú)的線程池: thenAcceptAsync(this::send, sendPool) 。 很好,但是我們失去了兩個(gè)重要的屬性:錯(cuò)誤傳播和超時(shí)。 因?yàn)槲覀兏牧薃PI,所以錯(cuò)誤傳播很難。 當(dāng)serve()方法退出時(shí),異步操作可能尚未完成。 如果您關(guān)心異常,請考慮返回responseFuture或其他替代機(jī)制。 至少,請記錄異常,因?yàn)榉駝t它將被吞噬:
final CompletableFuture<Response> responseFuture = asyncCode(); responseFuture.exceptionally(throwable -> {log.error("Unrecoverable error", throwable);return null; });請注意上面的代碼: exceptionally()嘗試從故障中恢復(fù) ,并返回替代結(jié)果。 它在這里有效,但是如果您將thenAccept() exceptionally()與thenAccept() ,即使發(fā)生故障也將調(diào)用send() ,但使用null參數(shù)(或者我們從exceptionally()返回的值exceptionally() :
responseFuture.exceptionally(throwable -> {log.error("Unrecoverable error", throwable);return null;}).thenAccept(this::send); //probably not what you think丟失1秒超時(shí)的問題非常微妙。 我們的原始代碼等待(阻塞)最多1秒鐘,直到Future完成。 否則拋出TimeoutException 。 我們失去了此功能,甚至超時(shí)的更糟糕的單元測試也不方便并且經(jīng)常被跳過。 為了在不犧牲事件驅(qū)動精神的前提下實(shí)現(xiàn)超時(shí),我們需要一個(gè)額外的構(gòu)建塊:在給定時(shí)間之后始終失敗的未來:
public static <T> CompletableFuture<T> failAfter(Duration duration) {final CompletableFuture<T> promise = new CompletableFuture<>();scheduler.schedule(() -> {final TimeoutException ex = new TimeoutException("Timeout after " + duration);return promise.completeExceptionally(ex);}, duration.toMillis(), MILLISECONDS);return promise; }private static final ScheduledExecutorService scheduler =Executors.newScheduledThreadPool(1,new ThreadFactoryBuilder().setDaemon(true).setNameFormat("failAfter-%d").build());這很簡單:我們創(chuàng)建一個(gè)諾言 (沒有基礎(chǔ)任務(wù)或線程池的未來),并在給定java.time.Duration之后使用TimeoutException完成它。 如果您get()某個(gè)地方get()這樣的未來,則阻塞至少這么長時(shí)間后,將拋出TimeoutException 。 實(shí)際上,它將是ExecutionException包裝TimeoutException ,沒有辦法解決。 請注意,我僅使用一個(gè)線程使用固定scheduler線程池。 這不僅是出于教育目的:“在這種情況下,“ 1個(gè)線程對于任何人都應(yīng)該足夠 ”” [1] 。 failAfter()本身是沒有用的,但將其與我們的responseFuture結(jié)合起來,我們就有了解決方案!
final CompletableFuture<Response> responseFuture = asyncCode(); final CompletableFuture<Response> oneSecondTimeout = failAfter(Duration.ofSeconds(1)); responseFuture.acceptEither(oneSecondTimeout, this::send).exceptionally(throwable -> {log.error("Problem", throwable);return null;});這里發(fā)生了很多事情。 在通過我們的后臺任務(wù)接收到responseFuture ,我們還創(chuàng)建了一個(gè)“合成的” oneSecondTimeout未來,它將永遠(yuǎn)不會成功完成,但總是在1秒后失敗。 現(xiàn)在,我們通過調(diào)用acceptEither合并兩者。 該運(yùn)算符將針對第一個(gè)完成的將來( responseFuture或oneSecondTimeout執(zhí)行代碼塊,而只是忽略較慢的代碼的結(jié)果。 如果asyncCode()在1秒鐘內(nèi)完成,則會調(diào)用this::send并且oneSecondTimeout異常將被忽略。 然而! 如果asyncCode()確實(shí)很慢,則oneSecondTimeout啟動。 但是由于失敗并帶有異常,因此將調(diào)用exceptionally錯(cuò)誤處理程序,而不是this::send 。 您可以認(rèn)為send()或exceptionally將被調(diào)用,而不是兩者都被調(diào)用。 當(dāng)然,如果我們有兩個(gè)正常完成的“普通”期貨,則將以第一個(gè)的響應(yīng)調(diào)用send() ,并丟棄后者。
這不是最干凈的解決方案。 一個(gè)干凈的人會包裝原始的未來,并確保它在給定的時(shí)間內(nèi)完成。 此類運(yùn)算符可在com.twitter.util.Future (Scala;稱為com.twitter.util.Future ( within() )中使用,但是在scala.concurrent.Future丟失(可能是受前者啟發(fā))。 讓我們留下Scala并為CompletableFuture實(shí)現(xiàn)類似的運(yùn)算符。 它以一個(gè)Future作為輸入,并返回一個(gè)在基礎(chǔ)底層完成時(shí)完成的Future。 但是,如果完成基礎(chǔ)的未來花費(fèi)的時(shí)間太長,則會引發(fā)異常:
public static <T> CompletableFuture<T> within(CompletableFuture<T> future, Duration duration) {final CompletableFuture<T> timeout = failAfter(duration);return future.applyToEither(timeout, Function.identity()); }這導(dǎo)致了最終,清潔和靈活的解決方案:
final CompletableFuture<Response> responseFuture = within(asyncCode(), Duration.ofSeconds(1)); responseFuture.thenAccept(this::send).exceptionally(throwable -> {log.error("Unrecoverable error", throwable);return null;});希望您喜歡這篇文章,因?yàn)槟梢钥吹絁ava的React式編程已不再是未來的事情(無雙關(guān))。
翻譯自: https://www.javacodegeeks.com/2014/12/asynchronous-timeouts-with-completablefuture.html
異步http 超時(shí)
總結(jié)
以上是生活随笔為你收集整理的异步http 超时_具有CompletableFuture的异步超时的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 手机内存总不够用手机内存总不够用怎么解决
- 下一篇: 手机是如何设置蓝牙功能手机是如何设置蓝牙