异步重试_异步重试模式
異步重試
 當(dāng)您有一段經(jīng)常失敗且必須重試的代碼時,此Java 7/8庫提供了豐富且簡潔的API以及針對此問題的快速且可擴展的解決方案: 
現(xiàn)在,您可以運行任意代碼塊,并且?guī)鞂槟卦囋摯a塊,以防它拋出SocketException :
final CompletableFuture<Socket> future = executor.getWithRetry(() ->new Socket("localhost", 8080) );future.thenAccept(socket ->System.out.println("Connected! " + socket) );請仔細看! getWithRetry()不會阻止。 它立即返回CompletableFuture并異步調(diào)用給定的函數(shù)。 您可以一次收聽該Future甚至是多個Future ,并同時進行其他工作。 因此,這段代碼的作用是:嘗試連接到localhost:8080 ,如果由于SocketException失敗,它將在500毫秒后重試(帶有一些隨機抖動),每次重試后的延遲加倍,但不超過10秒。
等效但更簡潔的語法:
executor.getWithRetry(() -> new Socket("localhost", 8080)).thenAccept(socket -> System.out.println("Connected! " + socket));這是您可能期望的示例輸出:
TRACE | Retry 0 failed after 3ms, scheduled next retry in 508ms (Sun Jul 21 21:01:12 CEST 2013) java.net.ConnectException: Connection refusedat java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:1.8.0-ea]//...TRACE | Retry 1 failed after 0ms, scheduled next retry in 934ms (Sun Jul 21 21:01:13 CEST 2013) java.net.ConnectException: Connection refusedat java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:1.8.0-ea]//...TRACE | Retry 2 failed after 0ms, scheduled next retry in 1919ms (Sun Jul 21 21:01:15 CEST 2013) java.net.ConnectException: Connection refusedat java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:1.8.0-ea]//...TRACE | Successful after 2 retries, took 0ms and returned: Socket[addr=localhost/127.0.0.1,port=8080,localport=46332]Connected! Socket[addr=localhost/127.0.0.1,port=8080,localport=46332]想象一下,您連接到兩個不同的系統(tǒng),一個系統(tǒng)速度很慢 ,第二個系統(tǒng)不可靠并且經(jīng)常失敗:
CompletableFuture<String> stringFuture = executor.getWithRetry(ctx -> unreliable()); CompletableFuture<Integer> intFuture = executor.getWithRetry(ctx -> slow());stringFuture.thenAcceptBoth(intFuture, (String s, Integer i) -> {//both done after some retries });當(dāng)緩慢而又不可靠的系統(tǒng)最終沒有任何失敗地進行答復(fù)時,異步執(zhí)行thenAcceptBoth()回調(diào)。 類似地(使用CompletableFuture.acceptEither() ),您可以同時異步調(diào)用兩個或多個不可靠的服務(wù)器,并在重試幾次后第一個成功時會收到通知。
我對此不夠強調(diào)–重試是異步執(zhí)行的,并且有效地使用了線程池,而不是盲目地睡眠。
基本原理
通常我們被迫重試給定的代碼段,因為它失敗了,我們必須再次嘗試,通常會稍有延遲以節(jié)省CPU。 這項要求非常普遍,并且很少有現(xiàn)成的通用實現(xiàn),其中RetryTemplate是通過RetryTemplate類在Spring Batch中提供重試支持 。 但是幾乎沒有其他類似的方法( [1] , [2] )。 所有這些嘗試(我敢打賭,你們中的許多人自己都實現(xiàn)了類似的工具!)遇到了同樣的問題-它們正在阻塞,從而浪費了大量資源,并且擴展性不好。
 這本身并不壞,因為它使編程模型更加簡單-庫負責(zé)重試,而您只需要等待比平常更長的返回值即可。 但是,這不僅會造成泄漏的抽象(由于重試和延遲,通常非常快的方法突然變得很慢),而且浪費了寶貴的線程,因為此類工具將在重試之間花費大部分時間。 因此 
 創(chuàng)建了Async-Retry實用程序,該實用程序針對Java 8 (現(xiàn)有Java 7 backport )并解決了上述問題。 
主要的抽象是RetryExecutor ,它提供了簡單的API:
public interface RetryExecutor {CompletableFuture<Void> doWithRetry(RetryRunnable action);<V> CompletableFuture<V> getWithRetry(Callable<V> task);<V> CompletableFuture<V> getWithRetry(RetryCallable<V> task);<V> CompletableFuture<V> getFutureWithRetry(RetryCallable<CompletableFuture<V>> task); }不必擔(dān)心RetryRunnable和RetryCallable –為方便起見,它們允許使用已檢查的異常,并且在大多數(shù)情況下,無論如何我們都會使用lambda表達式。
請注意,它返回CompletableFuture 。 我們不再假裝調(diào)用錯誤方法很快。 如果庫遇到異常,它將使用預(yù)先配置的退避延遲重試我們的代碼塊。 調(diào)用時間將從幾毫秒飛漲到幾秒鐘。 CompletableFuture清楚地表明了這一點。 而且,它不是愚蠢的java.util.concurrent.Future我們都知道– Java 8中的CompletableFuture非常強大 ,最重要的是–默認情況下是非阻塞的。
如果您畢竟需要阻止結(jié)果,只需在Future對象上調(diào)用.get() 。
基本API
該API非常簡單。 您提供了一個代碼塊,該庫將多次運行它,直到它正常返回為止,而不是引發(fā)異常。 它也可能在重試之間設(shè)置可配置的延遲:
RetryExecutor executor = //...executor.getWithRetry(() -> new Socket("localhost", 8080));一旦成功連接到localhost:8080將解析返回的CompletableFuture<Socket> 。 (可選)我們可以使用RetryContext來獲取額外的上下文,例如當(dāng)前正在執(zhí)行的重試:
executor.getWithRetry(ctx -> new Socket("localhost", 8080 + ctx.getRetryCount())).thenAccept(System.out::println);此代碼比看起來更聰明。 在第一次執(zhí)行期間ctx.getRetryCount()返回0 ,因此我們嘗試連接到localhost:8080 。 如果失敗,則下一次重試將嘗試localhost:8081 ( 8080 + 1 ),依此類推。 而且,如果您意識到所有這些都是異步發(fā)生的,那么您可以掃描多臺計算機的端口,并收到有關(guān)每個主機上第一個響應(yīng)端口的通知:
Arrays.asList("host-one", "host-two", "host-three").stream().forEach(host ->executor.getWithRetry(ctx -> new Socket(host, 8080 + ctx.getRetryCount())).thenAccept(System.out::println));對于每個主機, RetryExecutor將嘗試連接到端口8080,并嘗試使用更高的端口。
getFutureWithRetry()需要特別注意。 我想重試已經(jīng)返回CompletableFuture<V> :例如異步HTTP調(diào)用的結(jié)果:
private CompletableFuture<String> asyncHttp(URL url) { /*...*/}//...final CompletableFuture<CompletableFuture<String>> response = executor.getWithRetry(ctx -> asyncHttp(new URL("http://example.com"))); 將asyncHttp()傳遞給getWithRetry()將產(chǎn)生CompletableFuture<CompletableFuture<V>> 。 與它一起工作不僅很尷尬,而且還很麻煩。 該庫將僅調(diào)用asyncHttp()并僅在失敗時重試,但在返回時不重試 
 CompletableFuture<String>失敗。 解決方案很簡單: 
在這種情況下, RetryExecutor將理解從asyncHttp()返回的內(nèi)容實際上只是一個Future并且將(異步)等待結(jié)果或失敗。 該庫功能更強大,因此讓我們深入了解:
配置選項
通常,您可以配置兩個重要因素: RetryPolicy ,用于控制是否應(yīng)進行下一次重試;以及Backoff (可以有選擇地增加后續(xù)重試之間的延遲)。
默認情況下, RetryExecutor在每個Throwable上無限重復(fù)用戶任務(wù),并在RetryExecutor重試之間增加1秒的延遲。
創(chuàng)建
RetryExecutor默認實現(xiàn)是AsyncRetryExecutor ,您可以直接創(chuàng)建:
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();RetryExecutor executor = new AsyncRetryExecutor(scheduler);//...scheduler.shutdownNow();唯一需要的依賴項是JDK的標(biāo)準(zhǔn)ScheduledExecutorService 。 在許多情況下,一個線程就足夠了,但是如果您要同時處理數(shù)百個或更多任務(wù)的重試,請考慮增加池大小。
注意, AsyncRetryExecutor不會關(guān)閉ScheduledExecutorService 。 這是一個有意識的設(shè)計決策,將在后面進行解釋。
AsyncRetryExecutor幾乎沒有其他構(gòu)造函數(shù),但是在大多數(shù)情況下,更改類的行為最方便的方法是with*()方法鏈接調(diào)用。 您將看到大量以此方式編寫的示例。 稍后,我們將僅使用executor引用而不定義它。 假設(shè)它是RetryExecutor類型。
重試政策
例外情況
默認情況下,從用戶任務(wù)拋出的每個Throwable (特殊AbortRetryException除外)都會導(dǎo)致重試。 顯然,這是可配置的。 例如,在JPA中,您可能想重試由于OptimisticLockException而失敗的事務(wù)-但其他所有異常都應(yīng)立即失敗:
executor.retryOn(OptimisticLockException.class).withNoDelay().getWithRetry(ctx -> dao.optimistic());其中dao.optimistic()可能會引發(fā)OptimisticLockException 。 在這種情況下,您可能不希望重試之間有任何延遲,以后再說。 如果您不喜歡在每個Throwable上重試的默認設(shè)置,只需使用retryOn()來限制它:
executor.retryOn(Exception.class)當(dāng)然,也可能需要相反的做法–中止重試并在拋出某些異常的情況下立即失敗而不是重試。 就這么簡單:
executor.abortOn(NullPointerException.class).abortOn(IllegalArgumentException.class).getWithRetry(ctx -> dao.optimistic());顯然,您不想重試NullPointerException或IllegalArgumentException因為它們指示編程錯誤,而不是瞬時失敗。 最后,您可以結(jié)合使用重試和中止策略。 如果發(fā)生任何retryOn()異常(或子類),則用戶代碼將重試,除非它應(yīng)該abortOn()指定的異常。 例如,我們想重試每個IOException或SQLException但是如果遇到FileNotFoundException或java.sql.DataTruncation (順序無關(guān)),則中止:
executor.retryOn(IOException.class).abortIf(FileNotFoundException.class).retryOn(SQLException.class).abortIf(DataTruncation.class).getWithRetry(ctx -> dao.load(42));如果這還不夠,您可以提供將在每次失敗時調(diào)用的自定義謂詞:
executor.abortIf(throwable ->throwable instanceof SQLException &&throwable.getMessage().contains("ORA-00911"));最大重試次數(shù)
中斷重試“循環(huán)”的另一種方法(請記住此過程是異步的,沒有阻塞循環(huán) )是通過指定最大重試次數(shù):
executor.withMaxRetries(5)在極少數(shù)情況下,您可能希望禁用重試,而幾乎不利用異步執(zhí)行。 在這種情況下,請嘗試:
executor.dontRetry()重試之間的延遲(退避)
有時需要在失敗后立即重試(請參閱OptimisticLockException示例),但是在大多數(shù)情況下,這是一個壞主意。 如果您無法連接到外部系統(tǒng),請稍等片刻,然后再嘗試下一次嘗試。 您可以節(jié)省CPU,帶寬和其他服務(wù)器的資源。 但是有很多選擇要考慮:
- 我們應(yīng)該以固定的間隔重試還是增加每次失敗后的延遲 ?
 - 輪候時間是否應(yīng)該有上限和下限?
 - 我們是否應(yīng)該添加隨機“抖動”來延遲時間以及時分散許多任務(wù)的重試?
 
該庫回答了所有這些問題。
重試間隔固定
默認情況下,每次重試之前都有1秒的等待時間。 因此,如果初始嘗試失敗,則將在1秒后執(zhí)行第一次重試。 當(dāng)然,我們可以將默認值更改為200毫秒:
executor.withFixedBackoff(200)如果我們已經(jīng)在此處,則默認情況下,執(zhí)行用戶任務(wù)后將應(yīng)用退避。 如果用戶任務(wù)本身消耗一些時間,則重試的頻率將降低。 例如,重試延遲為RetryExecutor毫秒,而用戶任務(wù)失敗所需的平均時間約為50毫秒, RetryExecutor將每秒重試約4次(50毫秒+ RetryExecutor毫秒)。 但是,如果要將重試頻率保持在更可預(yù)測的水平,則可以使用fixedRate標(biāo)志:
executor.withFixedBackoff(200).withFixedRate()這類似于ScheduledExecutorService “固定速率”與“固定延遲”方法。 順便說一句,不要期望RetryExecutor會非常精確,這是最好的,但是在很大程度上取決于前面提到的ScheduledExecutorService準(zhǔn)確性。
重試之間的間隔呈指數(shù)增長
它可能是一個活躍的研究主題,但是總的來說,您可能希望隨著時間的推移擴展重試延遲,假設(shè)如果用戶任務(wù)多次失敗,我們應(yīng)該減少嘗試次數(shù)。 例如,假設(shè)我們從100ms的延遲開始,直到進行第??一次重試為止,但是如果該嘗試也失敗了,我們應(yīng)該再等待兩次(200ms)。 再過400毫秒,800毫秒……您就會明白:
executor.withExponentialBackoff(100, 2)這是一個指數(shù)函數(shù),可以快速增長。 因此,將最大退避時間設(shè)置在某個合理的水平(例如10秒)非常有用:
executor.withExponentialBackoff(100, 2).withMaxDelay(10_000) //10 seconds隨機抖動
在嚴(yán)重停機期間經(jīng)常觀察到的一種現(xiàn)象是系統(tǒng)趨于同步。 想象一個繁忙的系統(tǒng)突然停止響應(yīng)。 成百上千的請求失敗并被重試。 這取決于您的退避,但默認情況下,所有這些請求都會在一秒鐘后重試,而一秒鐘會產(chǎn)生大量流量。 最后,此類故障會傳播到其他系統(tǒng),這些系統(tǒng)又會進行同步。
為避免此問題,隨著時間的推移擴展重試,使負載平坦化是很有用的。 一個簡單的解決方案是添加隨機抖動來延遲時間,以便并非所有請求都計劃在完全相同的時間重試。 您可以在均勻抖動(隨機值從-100ms到100ms)之間進行選擇:
executor.withUniformJitter(100) //ms…和比例抖動,將延遲時間乘以隨機因子,默認情況下為0.9到1.1(10%):
executor.withProportionalJitter(0.1) //10%您還可以對延遲時間設(shè)置嚴(yán)格的下限,以避免安排較短的重試時間:
executor.withMinDelay(50) //ms實施細節(jié)
該庫是在考慮Java 8的情況下構(gòu)建的,以利用lambda和新的CompletableFuture抽象(但存在具有Guava依賴項的Java 7 port )。 它在下面使用ScheduledExecutorService來運行任務(wù)并計劃將來的重試-這樣可以最大程度地利用線程。
但是真正有趣的是,整個庫是完全不變的,根本沒有單個可變字段。 起初這可能是違反直覺的,例如以以下簡單代碼示例為例:
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();AsyncRetryExecutor first = new AsyncRetryExecutor(scheduler).retryOn(Exception.class).withExponentialBackoff(500, 2);AsyncRetryExecutor second = first.abortOn(FileNotFoundException.class);AsyncRetryExecutor third = second.withMaxRetries(10);似乎所有with*()方法或retryOn() / abortOn()方法retryOn()現(xiàn)有執(zhí)行程序變異。 但是事實并非如此,每次配置更改都會創(chuàng)建一個新實例 ,而舊實例則保持不變。 因此,例如,當(dāng)first執(zhí)行者將重試FileNotFoundException , second和third執(zhí)行者則不會。 但是,它們都共享同一scheduler 。 這就是AsyncRetryExecutor不關(guān)閉ScheduledExecutorService (甚至沒有任何close()方法)的原因。 由于我們不知道有多少個AsyncRetryExecutor副本指向同一調(diào)度程序,因此我們甚至不嘗試管理其生命周期。 但是,這通常不是問題(請參見下面的Spring集成 )。
您可能想知道,為什么這么笨拙的設(shè)計決定? 有以下三個原因:
- 在編寫并發(fā)代碼時,不變性可以大大降低多線程錯誤的風(fēng)險。 例如, RetryContext保留重試次數(shù)。 但是,我們無需更改變量,而只需創(chuàng)建具有遞增但final計數(shù)器的新實例(副本)。 沒有比賽條件或能見度。
 - 如果給您現(xiàn)有的RetryExecutor幾乎完全是您想要的,但是您需要進行一些細微調(diào)整,則只需調(diào)用executor.with...()并獲取一個新副本。 您不必擔(dān)心使用同一執(zhí)行程序的其他地方(請參閱: Spring集成以獲取更多示例)
 - 如今,功能編程和不可變數(shù)據(jù)結(jié)構(gòu)非常流行 。
 
注意: AsyncRetryExecutor 未標(biāo)記為final ,您可以通過將其子類化并添加可變狀態(tài)來打破不變性。 請不要這樣做,子類只允許更改行為。
依存關(guān)系
該庫需要Java 8和SLF4J進行記錄。 Java 7端口還取決于Guava 。
Spring整合
如果您即將在Spring中使用RetryExecutor ,請隨時使用,但配置API可能對您不起作用。 Spring通過大量的設(shè)置來促進(或用于促進)可變服務(wù)的約定。 在XML中,您定義bean并在其上調(diào)用setter(通過<property name="..."/> )。 該約定假定存在變異設(shè)置器。 但是我發(fā)現(xiàn)這種方法在某些情況下容易出錯并且違反直覺。
假設(shè)我們?nèi)侄x了org.springframework.transaction.support.TransactionTemplate bean并將其注入到多個位置。 大。 現(xiàn)在有一個請求,它的超時要求略有不同:
@Autowired private TransactionTemplate template;后來在同一個班級:
final int oldTimeout = template.getTimeout(); template.setTimeout(10_000); //do the work template.setTimeout(oldTimeout);此代碼在許多級別上都是錯誤的! 首先,如果發(fā)生故障,我們將永遠不會恢復(fù)oldTimeout 。 好了, finally救了。 但還要注意我們?nèi)绾胃娜止蚕淼腡ransactionTemplate實例。 誰知道不知道更改配置的其他幾個bean和線程將要使用它?
即使您確實想全局更改事務(wù)超時,也足夠公平,但是這樣做仍然是錯誤的方法。 private timeout字段不是volatile ,因此對其進行的更改對于其他線程可能可見,也可能不可見。 真是一團糟! 同樣的問題出現(xiàn)在許多其他類(如JmsTemplate 。
你知道我要去哪里嗎? 只需創(chuàng)建一個不變的服務(wù)類,并在需要時通過創(chuàng)建副本來安全地對其進行調(diào)整。 現(xiàn)在,使用此類服務(wù)??同樣簡單:
@Configuration class Beans {@Beanpublic RetryExecutor retryExecutor() {return new AsyncRetryExecutor(scheduler()).retryOn(SocketException.class).withExponentialBackoff(500, 2);}@Bean(destroyMethod = "shutdownNow")public ScheduledExecutorService scheduler() {return Executors.newSingleThreadScheduledExecutor();}}嘿! 進入21世紀(jì),我們在Spring不再需要XML。 Bootstrap也很簡單:
final ApplicationContext context = new AnnotationConfigApplicationContext(Beans.class); final RetryExecutor executor = context.getBean(RetryExecutor.class); //... context.close();如您所見,將現(xiàn)代的,不可變的服務(wù)與Spring集成非常簡單。 順便說一句,如果您在設(shè)計自己的服務(wù)時沒有準(zhǔn)備好進行如此大的更改,請至少考慮構(gòu)造函數(shù)注入 。
到期
 該庫包含大量的單元測試。 但是,尚未在任何生產(chǎn)代碼中使用它,并且該API可能會更改。 當(dāng)然,我們鼓勵您提交錯誤,功能請求和提取請求 。 它是在考慮到Java 8的情況下開發(fā)的,但是Java 7 backport存在一些更冗長的API和強制性Guava依賴關(guān)系( ListenableFuture而不是 
 Java 8的CompletableFuture )。 
 GitHub上的完整源代碼。 
翻譯自: https://www.javacodegeeks.com/2013/08/asynchronous-retry-pattern.html
異步重試
總結(jié)
以上是生活随笔為你收集整理的异步重试_异步重试模式的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: ddos攻击器软件和肉机(ddos攻击器
 - 下一篇: linux 语言环境变量(linux 语