检测和测试停滞的流– RxJava常见问题解答
假設您有一個流以不可預測的頻率發布事件。 有時您可以預期每秒會有數十條消息,但是偶爾幾秒鐘都看不到任何事件。 如果您的流是通過Web套接字,SSE或任何其他網絡協議傳輸的,則可能會出現問題。 靜默時間過長(停頓)可以解釋為網絡問題。 因此,我們經常不時發送人工事件( ping ),以確保:
- 客戶還活著
- 讓客戶知道我們還活著
舉一個更具體的例子,假設我們有一個Flowable<String>流,它會產生一些事件。 如果沒有事件超過一秒鐘,則應發送占位符"PING"消息。 當靜默時間更長時,應該每秒發出一個"PING"消息。 我們如何在RxJava中實現這樣的要求? 最明顯但不正確的解決方案是將原始流與ping合并:
Flowable<String> events = //... Flowable<String> pings = Flowable.interval(1, SECONDS).map(x -> "PING");Flowable<String> eventsWithPings = events.mergeWith(pings);mergeWith()運算符至關重要:它接受真正的events ,并將它們與恒定的ping流合并。 當然,當不存在真實事件時,將顯示"PING"消息。 不幸的是,它們與原始流完全無關。 這意味著即使有很多正常事件,我們也會繼續發送ping命令。 而且,當靜默開始時,我們不會在一秒鐘后精確發送"PING" 。 如果您對這種機制感到滿意,則可以在此處停止閱讀。
一種更復雜的方法需要發現持續超過1秒的靜音。 我們可以使用timeout()運算符。 不幸的是,它會產生TimeoutException并從上游退訂-行為過于激進。 我們只想收到某種通知。 事實證明,可以使用debounce()運算符。 通常,此操作員會推遲新事件的發出,以防萬一有新事件出現,從而覆蓋了舊事件。 所以,如果我說:
Flowable<String> events = //... Flowable<String> delayed = events.debounce(1, SECONDS);這意味著delayed流僅在1秒內未跟隨其他事件時才會發出事件。 如果events流保持足夠快的速度產生事件,那么技術上delayed可能永遠不會發出任何東西。 我們將使用delayed流通過以下方式發現沉默:
Flowable<String> events = //... Flowable<String> delayed = events.debounce(1, SECONDS); Flowable<String> pings = delayed.map(ev -> "PING"); Flowable<String> eventsWithPings = Flowable.merge(events, pings);請記住, mergeWith()和它的static merge()對應物之間沒有區別。 所以我們到了某個地方。 如果流繁忙,則delayed流將永遠不會收到任何事件,因此不會發送"PING"消息。 但是,當原始流不發送任何事件超過1秒時, delayed接收到最后一次看到的事件,將其忽略并轉換為"PING" 。 聰明,但壞了。 此實現僅在發現停頓后才發送一個"PING" ,而不是每秒發送一次定期ping。 很容易修復! 除了將最后一次看到的事件轉換為單個"PING"我們還可以將其轉換為周期性ping序列:
Flowable<String> events = //... Flowable<String> delayed = events.debounce(1, SECONDS); Flowable<String> pings = delayed.flatMap(x -> Flowable.interval(0, 1, SECONDS).map(e -> "PING")); Flowable<String> eventsWithPings = Flowable.merge(events, pings);您能看到缺陷在哪里嗎? 每當原始流中出現一點沉默時,我們就會每秒發出一次ping 。 但是,一旦出現真正的事件,我們應該停止這樣做。 我們沒有。 上游的每個停頓都會導致新的無限ping流出現在最終的合并流中。 我們必須以某種方式告訴pings流,因為原始流發出了真正的事件,所以它應該停止發出ping 。 猜猜是什么,有takeUntil()運算符可以做到這一點!
Flowable<String> events = //... Flowable<String> delayed = events.debounce(1, SECONDS); Flowable<String> pings = delayed.flatMap(x -> Flowable.interval(0, 1, SECONDS).map(e -> "PING").takeUntil(events)); Flowable<String> eventsWithPings = Flowable.merge(events, pings);花一點時間完全掌握上面的代碼片段。 每當原始流上超過1秒沒有任何反應時, delayed流就會發出一個事件。 pings流發射的序列"PING"每秒從發射每個事件的事件delayed 。 但是,一旦事件出現在events流上,便會終止pings流。 您甚至可以將所有這些定義為單個表達式:
Flowable<String> events = //... Flowable<String> eventsWithPings = events.mergeWith(events.debounce(1, SECONDS).flatMap(x1 -> Flowable.interval(0, 1, SECONDS).map(e -> "PING").takeUntil(events)));可測性
好的,我們已經編寫了所有這些內容,但是我們應該如何測試事件驅動代碼的這個三層嵌套的Blob? 我們如何確保ping在正確的時間出現并在靜音結束后停止? 如何模擬各種與時間相關的場景? RxJava具有許多殺手級功能,但是測試時間流逝可能是最大的功能。 首先,讓我們的ping代碼更具可測試性和通用性:
<T> Flowable<T> withPings(Flowable<T> events, Scheduler clock, T ping) {return events.mergeWith(events.debounce(1, SECONDS, clock).flatMap(x1 -> Flowable.interval(0, 1, SECONDS, clock).map(e -> ping).takeUntil(events)));}此實用程序方法采用任意的T流并添加ping ,以防該流在較長時間內不產生任何事件。 我們在測試中像這樣使用它:
PublishProcessor<String> events = PublishProcessor.create(); TestScheduler clock = new TestScheduler(); Flowable<String> eventsWithPings = withPings(events, clock, "PING");哦,男孩, PublishProcessor , TestScheduler ? PublishProcessor是一個有趣的類,它是一個亞型Flowable (所以我們可以使用它作為一個普通的流)。 另一方面,我們可以使用其onNext()方法強制發出事件:
events.onNext("A");如果有人收聽events流,他將立即收到"A"事件。 這clock是怎么回事? RxJava中以任何方式處理時間的每個運算符(例如debounce debounce() , interval() , timeout() , window() )都可以采用可選的Scheduler參數。 它充當時間的外部來源。 特殊的TestScheduler是我們完全控制的人為時間來源。 也就是說,只要我們不顯式調用advanceTimeBy()時間就保持靜止:
clock.advanceTimeBy(999, MILLISECONDS);999毫秒不是巧合。 Ping在1秒鐘后開始精確顯示,因此在999毫秒后將不可見。 現在是時候揭示完整的測試用例了:
@Test public void shouldAddPings() throws Exception {PublishProcessor<String> events = PublishProcessor.create();final TestScheduler clock = new TestScheduler();final Flowable<String> eventsWithPings = withPings(events, clock, "PING");final TestSubscriber<String> test = eventsWithPings.test();events.onNext("A");test.assertValues("A");clock.advanceTimeBy(999, MILLISECONDS);events.onNext("B");test.assertValues("A", "B");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B");clock.advanceTimeBy(1, MILLISECONDS);test.assertValues("A", "B", "PING");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B", "PING");events.onNext("C");test.assertValues("A", "B", "PING", "C");clock.advanceTimeBy(1000, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING");clock.advanceTimeBy(1, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING");events.onNext("D");test.assertValues("A", "B", "PING", "C", "PING", "PING", "D");clock.advanceTimeBy(999, MILLISECONDS);events.onNext("E");test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E");clock.advanceTimeBy(1, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E", "PING");clock.advanceTimeBy(3_000, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E", "PING", "PING", "PING", "PING"); }看起來像一堵墻,但這實際上是我們邏輯的完整測試方案。 它可以確保ping在1000毫秒后精確顯示,在寂靜時間很長的情況下會重復執行,而在出現真正的事件時會重復執行。 但最重要的部分是:該測試是100%可預測的并且非常快。 沒有Awaitility ,忙等待,輪詢,間歇性測試失敗和緩慢。 我們完全控制的人工時鐘可確保所有這些組合流均按預期工作。
翻譯自: https://www.javacodegeeks.com/2017/09/detecting-testing-stalled-streams-rxjava-faq.html
總結
以上是生活随笔為你收集整理的检测和测试停滞的流– RxJava常见问题解答的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: IntelliJ中的远程调试Wildfl
- 下一篇: 京东商城网下载电脑版(京东网上商城电脑版