固定速率与固定延迟– RxJava常见问题解答
如果您使用的是純Java,從版本5開始,我們有一個方便的調度程序類,該類允許以固定速率或固定延遲運行任務:
import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService;ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);基本上,它支持兩種類型的操作:
scheduler.scheduleAtFixedRate(() -> doStuff(), 2, 1, SECONDS); scheduler.scheduleWithFixedDelay(() -> doStuff(), 2, 1, SECONDS); scheduleAtFixedRate()將確保每秒精確調用doStuff()且初始延遲為2秒。 當然,垃圾回收,上下文切換等仍然會影響精度。 scheduleWithFixedDelay()看起來很相似,但是它考慮了doStuff()處理時間。 例如,如果doStuff()運行doStuff()毫秒,則固定速率將僅等待800毫秒,直到下一次重試。 另一方面, scheduleWithFixedDelay()總是在重試之間等待相同的時間(在本例中為1秒)。 在不同情況下,兩種行為當然都是可取的。 僅記住,當doStuff()的速度慢于1秒時, scheduleAtFixedRate()不會保留所需的頻率。 即使我們的ScheduledExecutorService有10個線程, doStuff()也絕不會被同時調用并且不會與之前的執行重疊。 因此,在這種情況下,速率實際上將小于配置的速率。
<h1”> RxJava中的計劃
使用interval()運算符,使用RxJava模擬scheduleAtFixedRate()非常簡單。 有幾點警告:
Flowable.interval(2, 1, SECONDS).subscribe(i -> doStuff());如果doStuff()的速度慢于1秒,則會發生不良情況。 首先,我們使用Schedulers.computation()線程池,它是從interval()運算符繼承的默認池。 這是一個壞主意,該線程池僅應用于CPU密集型任務,并在整個RxJava之間共享。 一個更好的主意是使用您自己的調度程序(或至少使用io() ):
Flowable.interval(2, 1, SECONDS).observeOn(Schedulers.io()).subscribe(i -> doStuff());observeOn()從開關computation()由用于調度interval()到io()調度器。 由于subscribe()方法永遠不會被設計并發調用, doStuff()永遠不會并發調用,就像scheduleAtFixedRate() 但是, interval()運算符非常努力地保持恒定的頻率。 這意味著如果過一會兒doStuff()的速度慢于1秒,我們應該期望MissingBackpressureException …RxJava基本上告訴我們訂戶速度太慢,但是interval() (根據設計)不會變慢。 如果您允許(甚至期望) doStuff()并發執行重疊,則修復起來非常簡單。 首先,您必須使用非阻塞式Completable包裝阻塞式doStuff() 。 從技術上講, Flowable Single或Maybe也可以工作,但是由于doStuff()為void ,所以Completable聽起來不錯:
import io.reactivex.Completable; import io.reactivex.schedulers.Schedulers;Completable doStuffAsync() {return Completable.fromRunnable(this::doStuff).subscribeOn(Schedulers.io()).doOnError(e -> log.error("Stuff failed", e)).onErrorComplete(); }捕獲并吞下異常很重要,否則單個錯誤將導致整個interval()中斷。 doOnError()允許記錄日志,但它通過下游傳遞異常。 另一方面, doOnComplete()僅吞下異常。 現在,我們可以在每個間隔事件中簡單地運行此操作
Flowable.interval(2, 1, SECONDS).flatMapCompletable(i -> doStuffAsync()).subscribe();如果您不subscribe()循環將永遠不會開始-但這是RxJava101。請注意,如果doStuffAsync()花費一秒鐘以上的時間來完成,我們將得到重疊的并發執行。 這沒有什么錯,您只需要意識到這一點。 但是,如果您真正需要的是固定延遲怎么辦?
修復了RxJava中的延遲
在某些情況下,您需要固定的延遲時間:任務不應重疊,并且我們應在兩次執行之間保持一定的喘息時間。 不管周期性任務有多慢,都應始終保持恒定的時間暫停。 interval()運算符不適合實現此要求。 但是,事實證明,RxJava中的解決方案非常簡單。 想一想:您需要睡一會兒,運行一些任務,然后在完成此任務后重復。 讓我再說一遍:
- 睡一會兒(有一些timer() )
- 運行一些任務,等待它complete()
- repeat()
而已!
Flowable.timer(1, SECONDS).flatMapCompletable(i -> doStuffAsync()).repeat().subscribe();一秒鐘后, timer()運算符發出一個事件( Long類型的0 )。 我們使用此事件來觸發doStuffAsync() 。 當我們的東西做,全碼流完成-但我們想重復! 好吧, repeat()運算符就是這樣做的:當它從上游收到完成通知時,它會重新訂閱。 重新訂閱基本上意味著:再等待1秒鐘, doStuffAsync() –依此類推。
翻譯自: https://www.javacodegeeks.com/2017/09/fixed-rate-vs-fixed-delay-rxjava-faq.html
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的固定速率与固定延迟– RxJava常见问题解答的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 皇室战争4399电脑版(皇室战争4399
- 下一篇: 三星5830报价(三星s5660手机价格