javascript
Reactor 3快速上手——响应式Spring的道法术器
在1.3.2節簡單介紹了不同類型的調度器Scheduler,以及如何使用publishOn和subscribeOn切換不同的線程執行環境。
下邊使用一個簡單的例子再回憶一下:
@Testpublic void testScheduling() {Flux.range(0, 10) // .log() // 1.publishOn(Schedulers.newParallel("myParallel")) // .log() // 2.subscribeOn(Schedulers.newElastic("myElastic")).log() // 3.blockLast();}通過以上三個log()的輸出,可以發現,對于如下圖所示的操作鏈:
- publishOn會影響鏈中其后的操作符,比如第一個publishOn調整調度器為elastic,則filter的處理操作是在彈性線程池中執行的;同理,flatMap是執行在固定大小的parallel線程池中的;
- subscribeOn無論出現在什么位置,都只影響源頭的執行環境,也就是range方法是執行在單線程中的,直至被第一個publishOn切換調度器之前,所以range后的map也在單線程中執行。
這一節我們了解一下它的實現機制。
2.4.1 調度器
調度器相當于Reactor中的ExecutorService,不同的調度器定義不同的線程執行環境。Schedulers工具類提供的靜態方法可搭建不同的線程執行環境。
Schedulers類已經預先創建了幾種常用的不同線程池模型的調度器:使用single()、elastic()和parallel()方法創建的調度器可以分別使用內置的單線程、彈性線程池和固定大小線程池。如果想創建新的調度器,可以使用newSingle()、newElastic()和newParallel()方法。這些方法都是返回一個Scheduler的具體實現。
看一下Scheduler都有哪些行為:
public interface Scheduler extends Disposable {// 調度執行Runnable任務task。Disposable schedule(Runnable task);// 延遲一段指定的時間后執行。Disposable schedule(Runnable task, long delay, TimeUnit unit);// 周期性地執行任務。Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);// 創建一個工作線程。Worker createWorker();// 啟動調度器void start();// 以下兩個方法可以暫時忽略void dispose();long now(TimeUnit unit)// 一個Worker代表調度器可調度的一個工作線程,在一個Worker內,遵循FIFO(先進先出)的任務執行策略interface Worker extends Disposable {// 調度執行Runnable任務task。Disposable schedule(Runnable task);// 延遲一段指定的時間后執行。Disposable schedule(Runnable task, long delay, TimeUnit unit);// 周期性地執行任務。Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);} }如圖所示,Scheduler是領導,Worker是員工,每個Scheduler手中有若干Worker。接到任務后,Scheduler負責分派,Worker負責干活。
在Scheduler中,每個Worker都是一個ScheduledExecutorService,或一個包裝了ScheduledExecutorService的對象。所以,Scheduler擁有的并不是線程池,而是一個自行維護的ScheduledExecutorService池。
所謂“自行維護”,主要有三點:
2.4.2 切換執行環境
再回到publishOn和subscribeOn方法。
在Reactor中,對于數據流的處理,實際上是一系列方法調用和基于事件的回調,包括subscribe、onSubscribe、request,以及onNext、onError、onComplete。拿出2.1節的圖幫助理解:
當調用.subscribe()方法時,會形成從上游向下游的數據流,數據流中的元素通過onNext* (onError|onComplete)攜帶“順流而下”。同時,Reactor使用者看不到的是,還有一條從下游向上游的“訂閱鏈”,request就是沿著這個鏈向上反饋需求的。
publishOn方法能夠將onNext、onError、onComplete調度到給定的Scheduler的Worker上執行。所以如上圖場景中,再.map和.filter中間增加一個publisheOn(Schedulers.elastic())的話,.filter操作的onNext的過濾處理將會執行在ElasticScheduler的某個Worker上。
subscribeOn方法能夠將subscribe(會調用onSubscribe)、request調度到給定的Scheduler的Worker上執行。所以在任何位置增加一個subscribeOn(Schedulers.elastic())的話,都會借助自下而上的訂閱鏈,通過subscribe()方法,將線程執行環境傳遞到“源頭”,從而Flux.just會執行在ElasticScheduler上。繼而影響到其后的操作符,直至遇到publishOn改變了執行環境。
此外,有些操作符本身會需要調度器來進行多線程的處理,當你不明確指定調度器的時候,那些操作符會自行使用內置的單例調度器來執行。例如,Flux.delayElements(Duration)?使用的是?Schedulers.parallel()調度器對象:
@Testpublic void testDelayElements() {Flux.range(0, 10).delayElements(Duration.ofMillis(10)).log().blockLast();}從輸出可以看到onNext運行在不同的線程上:
[ INFO] (main) onSubscribe(FluxConcatMap.ConcatMapImmediate) [ INFO] (main) request(unbounded) [ INFO] (parallel-1) onNext(0) [ INFO] (parallel-2) onNext(1) [ INFO] (parallel-3) onNext(2) [ INFO] (parallel-4) onNext(3) ...2.4.3 為數據流配置Context
在Reactor中,基于Scheduler的線程調度確實非常簡單好用,但是還有個問題需要解決。
我們以往在編寫多線程的代碼時,如果涉及到只在線程內部使用的值,可能會使用ThreadLocal進行包裝。
但是在響應式編程中,由于線程環境經常發生變化,這一用法就失去作用了,并且甚至帶來bug。比如,使用 Logback 的 MDC 來存儲日志關聯的 ID 就屬于這種情況。
自從版本 3.1.0,Reactor 引入了一個類似于 ThreadLocal 的高級功能:Context。它作用于一個 Flux 或一個 Mono 上,而不是應用于一個線程(Thread)。也就是其生命周期伴隨整個數據流,而不是線程。
相對來說,用戶使用Context并不多,對此感興趣或有此需求的話,請看我翻譯的相關文檔,可以對Reactor內部實現尤其是Subscription有更深的理解。
2.4.4 并行執行
如今多核架構已然普及,能夠方便的進行并行處理是很重要的。
對于一些能夠在一個線程中順序處理的任務,即使調度到ParallelScheduler上,通常也只由一個Worker來執行,比如:
@Testpublic void testParallelFlux() throws InterruptedException {Flux.range(1, 10).publishOn(Schedulers.parallel()).log().subscribe();TimeUnit.MILLISECONDS.sleep(10);}輸出如下:
[ INFO] (main) | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber) [ INFO] (main) | request(unbounded) [ INFO] (parallel-1) | onNext(1) [ INFO] (parallel-1) | onNext(2) [ INFO] (parallel-1) | onNext(3) [ INFO] (parallel-1) | onNext(4) [ INFO] (parallel-1) | onNext(5) [ INFO] (parallel-1) | onNext(6) [ INFO] (parallel-1) | onNext(7) [ INFO] (parallel-1) | onNext(8) [ INFO] (parallel-1) | onNext(9) [ INFO] (parallel-1) | onNext(10) [ INFO] (parallel-1) | onComplete()有時候,我們確實需要一些任務能夠“均勻”分布在不同的工作線程上執行,這時候就需要用到ParallelFlux。
你可以對任何Flux使用parallel()操作符來得到一個ParallelFlux。不過這個操作符本身并不會進行并行處理,而只是將負載劃分到多個執行“軌道”上(默認情況下,軌道個數與CPU核數相等)。
為了配置ParallelFlux如何并行地執行每一個軌道,需要使用runOn(Scheduler),這里,Schedulers.parallel() 是比較推薦的專門用于并行處理的調度器。
@Testpublic void testParallelFlux() throws InterruptedException {Flux.range(1, 10).parallel(2).runOn(Schedulers.parallel()) // .publishOn(Schedulers.parallel()).log().subscribe();TimeUnit.MILLISECONDS.sleep(10);}輸出如下:
[ INFO] (main) onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber) [ INFO] (main) request(unbounded) [ INFO] (main) onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber) [ INFO] (main) request(unbounded) [ INFO] (parallel-1) onNext(1) [ INFO] (parallel-2) onNext(2) [ INFO] (parallel-1) onNext(3) [ INFO] (parallel-2) onNext(4) [ INFO] (parallel-1) onNext(5) [ INFO] (parallel-2) onNext(6) [ INFO] (parallel-1) onNext(7) [ INFO] (parallel-2) onNext(8) [ INFO] (parallel-1) onNext(9) [ INFO] (parallel-2) onNext(10) [ INFO] (parallel-1) onComplete() [ INFO] (parallel-2) onComplete()可以看到,各個元素的onNext “均勻”分布執行在兩個線程上,最后每個線程上有獨立的onComplete事件,這與publishOn調度到ParallelScheduler上的情況是不同的。
?
https://blog.51cto.com/liukang/2090163
總結
以上是生活随笔為你收集整理的Reactor 3快速上手——响应式Spring的道法术器的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 日本的中体诗(I)
- 下一篇: 常用易忘CSS小技巧