reactor官方文档译文(1)Reactor简介
原文地址:http://projectreactor.io/docs/reference/
Reactor簡介
Reactor是一個基礎(chǔ)庫,用在構(gòu)建實時數(shù)據(jù)流應(yīng)用、要求有容錯和低延遲至毫秒、納秒、皮秒的服務(wù)。
— PrefaceTL;DR
什么是Reactor?
讓我們大致了解一下Reactor。在你使用喜歡的搜索敲入一些關(guān)鍵詞如Reactive、spring Reactive、Asynchronous java或者僅僅是"What the heck is Reactor?".簡而言之,Reactor是一個輕量級的JVM基礎(chǔ)庫,它可以幫助我們構(gòu)建的服務(wù)和應(yīng)用高效而異步的傳遞消息。
? ?
高效的含義是什么呢?傳遞一個消息從A到B時GC產(chǎn)生的內(nèi)存很小或者完全沒有。當(dāng)消費者處理消息的速度低于生產(chǎn)者產(chǎn)生消息的速度時產(chǎn)生了溢出時,必須盡快處理。盡可能的提供無鎖的異步流。據(jù)以往的經(jīng)驗來看,我們知道異步編程是困難的,特別是當(dāng)一個平臺提供了很多選項如JVM。
Reactor瞄準(zhǔn)絕大部分場景中真正的無阻塞,并且提供了一組比原生Jdk的java.util.concurrent庫更高效的API。Reactor也提供了一個可選性(不建議使用):
阻塞等待:如Future.get()。
? ? ?Unsafe數(shù)據(jù)獲取:如ReentrantLock.lock()。
異常拋出:如try ..catch ...finally
同步阻塞:如 syschronized
Wrapper配置(GC壓力):例如 new Wrapper<T>(event)
讓我們先使用一個純正的Executor方法:
private ExecutorService threadPool = Executors.newFixedThreadPool(8);final List<T> batches = new ArrayList<T>();Callable<T> t = new Callable<T>() { //1public T run() {synchronized(batches) { //2T result = callDatabase(msg); //3batches.add(result);return result;}} };Future<T> f = threadPool.submit(t); //4 T result = f.get() //51.分配回調(diào)方法---可能會導(dǎo)致gc壓力。
2.Synchronization將強制對每個線程停止檢查。
3. 存在消費者的消費能力低于生產(chǎn)者生產(chǎn)能力的隱患。
4. 使用線程池將task傳遞到目標(biāo)線程--肯定通過FutureTask給gc造成壓力。
5. 阻塞直至callDatabase()響應(yīng)。
從上述的簡單示例中,容易看出擴展性會受到嚴(yán)重的影響。
不斷分配的對象將導(dǎo)致gc停止工作,特別是耗時比較多的大任務(wù)時。當(dāng)一個gc停止工作時將會從降低全局的性能。
隊列默認(rèn)情況下長度是不受限制的。任務(wù)會堆積到數(shù)據(jù)庫中。
? ??后臺日志不是一個內(nèi)存泄露的地方,但是副作用就比較煩人了:在gc暫停工作時需要掃描更多對象;損失數(shù)據(jù)重要bit的風(fēng)險;等等。
經(jīng)典鏈接Queue分配節(jié)點時產(chǎn)生的內(nèi)存壓力。
使用阻塞方式應(yīng)答請求時發(fā)生惡性循環(huán)。
阻塞方式應(yīng)答導(dǎo)致生產(chǎn)者效率慢下來。實際上,因為需要提交更多任務(wù)時等待響應(yīng),流程變成了基本的同步方式。
同數(shù)據(jù)存儲的通信異常將以不友好的形式傳遞到生產(chǎn)者,通過線程邊界來分離工作,這使容錯的協(xié)商變的比較容易。
完全的、真正的非阻塞比較難以實現(xiàn)---特別是有比較時髦名稱的分布式系統(tǒng)中如微服務(wù)架構(gòu)。然而,Reactor卻沒有妥協(xié),它試圖利用可用的最佳模式來使開發(fā)者不必覺得像是在寫一個數(shù)學(xué)論文而僅僅是一個微服務(wù)(nanservice)。
沒有什么比較光更快的了(除了流言蜚語和病毒貓視頻),在某些方面,延遲是每個真實世界的系統(tǒng)必須關(guān)注的。為此:
反應(yīng)器提供了一個框架,可以幫助你減輕惡心的延遲引起的副作用,在應(yīng)用程序中使用最小的開銷:使用一些靈活的結(jié)構(gòu),通過在啟動時預(yù)先分配在運行時的分配數(shù)據(jù)結(jié)構(gòu)來避免分配問題。限制主消息傳送結(jié)構(gòu),因而不會導(dǎo)致任務(wù)無限的累積。
利用流行的模式例如Reactive和事件驅(qū)動架構(gòu)來提供一個包含應(yīng)答的非阻塞的、端對端流。
實現(xiàn)了最新的Reactive流標(biāo)準(zhǔn),通過不發(fā)送多于當(dāng)前容量的請求來使受限的結(jié)構(gòu)更有效率。
使用這些概念到進(jìn)程間通信,提供了理解控制流的非阻塞IO驅(qū)動。
對開發(fā)者暴露功能API,幫助開發(fā)者使用一個無副作用的方式組織代碼,也幫助你確定在什么場景下你是線程安全和具有容錯性的。
項目簡介:
該項目始于2012年,孕育時間較長。2013年出現(xiàn)Reactor1.x版本。該版本成功部署到不同的組織,不僅有開源組織如MeltDown、還有商業(yè)機構(gòu)如Pivotal RTI。2014年我們實現(xiàn)了新的"Reactive流標(biāo)準(zhǔn)",并在2015年的4月開始了版本2.0的大規(guī)模重構(gòu)目標(biāo)。Reactive流標(biāo)準(zhǔn)拉近了分發(fā)機制的鴻溝:控制多少線程傳遞多少數(shù)據(jù)。
同時我們也決定重新調(diào)整我們的一些事件驅(qū)動和任務(wù)協(xié)調(diào)API的來應(yīng)對日益流行、記錄的reactive擴展。
Reactor由Pivotal贊助支持,有兩個核心提交者。因為Pivotal同時也是spring框架的東家,我們的很多同事也是不同spring項目的核心貢獻(xiàn)者,所以我們也提供從Reactor到spring的集成同時也支持spring框架的一些重要功能如spring消息模塊的STOMP代理。也就是說,我們不會強迫僅僅想使用Reactor的人去適應(yīng)spring。我們保留了一個大容量Reactive的內(nèi)嵌工具。事實上,Reactor的目標(biāo)之一是在你解決異步和功能性問題時保持公正的態(tài)度。
Reactor遵循Apache 2.0 licensed?,可以通過?GitHub獲取。
使用要求:
Reactor需要jdk7及以上版本。
但完整的功能組合表達(dá)式需要java8的lambdas支持。
作為后備,支持spring clojure和groovy的擴展。
Reactor需要jvm支持Unsafe方式獲取(如:android不支持)時才能表現(xiàn)最全的功能。
當(dāng)Unsafe獲取不支持是所有基于RingBuffer的特定將不能工作。
Reactor打包成傳統(tǒng)的jar形式存在于maven中央庫中,可以使用你喜歡的工具來拉取這個依賴包。
架構(gòu)總覽:
Reactor基本代碼劃分為幾個子模塊,這樣你可以單獨使用某一模塊而拋棄不需要的模塊。
下述是一些使用Reactor模塊和其它混合的Reactive技術(shù)示例,完成異步目標(biāo):
-
Spring XD + Reactor-Net (Core/Stream) : 使用Reactor 作為Sink/Source IO 驅(qū)動.
-
Grails | Spring + Reactor-Stream (Core) : 使用Stream和 Promise作為后臺處理程序。
-
Spring Data + Reactor-Bus (Core) : 生產(chǎn)數(shù)據(jù)庫事件(Save/Delete/…?).
-
Spring集成Java DSL + Reactor Stream (Core) : Microbatch MessageChannel from Spring Integration.
-
RxJavaReactiveStreams + RxJava + Reactor-Core : Combine rich composition with efficient asynchronous IO Processor
-
RxJavaReactiveStreams + RxJava + Reactor-Net (Core/Stream) : Compose input data with RxJava and gate with Async IO drivers.
?Reactive stream
Reactive stream是一個新的標(biāo)準(zhǔn),不同的廠商和技術(shù)組織包括Netflix,oracle,Pivotal,TypeSafe支持這個標(biāo)準(zhǔn)。該標(biāo)準(zhǔn)有望被java9或者以后的版本的標(biāo)準(zhǔn)收錄進(jìn)去。
該標(biāo)準(zhǔn)的目標(biāo)是提供一種同步或者異步的具有控制流機制的數(shù)據(jù)序列。這個標(biāo)準(zhǔn)是輕量級的,第一個目標(biāo)是JVM。它提供了4個java接口,一個Tck和一系列的示例。根據(jù)需要,4個java接口的實現(xiàn)非常直接,這個項目的內(nèi)涵是Tck對操作的校驗。
?
圖三 Reactive stream協(xié)約
?
Reactive Streams接口 org.reactivestreams.Pubslisher: 數(shù)據(jù)源(從0到N個數(shù)據(jù),n是任意的).它提供兩個可選的中斷事件:error和completion。 org.reactivestreams.Subscriber: 數(shù)據(jù)序列的消費者(從0到N個數(shù)據(jù),n是任意的).它在初始化時接收一個Subscription,subscription獲取Subscriber將要處理多少數(shù)據(jù)。 與其它數(shù)據(jù)時序信號交互的其它回調(diào)有: next (新消息)和可選的completion/error.org.reactivestreams.Subscription:在初始化時傳給Subscriber的一個小的追蹤器。.它控制著我們準(zhǔn)備消費掉多少數(shù)據(jù)和什么時候停止消費(取消).
org.reactivestreams.Processor: 既是Subscriber和Publisher的組件標(biāo)識!
圖四:Reactive Stream發(fā)布協(xié)約
通過傳遞Subscriber,一個請求數(shù)據(jù)從subscriber到publisher有兩種方式: 無限制的: 在訂閱時, 僅調(diào)用Subscription的request(Long.MAX_VALUE)方法.有限制的: 在訂閱時, 保留subscription的引用,并且當(dāng)subscriber準(zhǔn)備處理數(shù)據(jù)時調(diào)用request(long)方法。通常, 在訂閱時Subscribers將請求一組初始數(shù)據(jù)或者甚至1個數(shù)據(jù)。 然后,onNext認(rèn)為執(zhí)行成功(例如后面的Commit, Flush等等?), 請求更多的數(shù)據(jù)。建議使用線性組的請求。為避免請求重疊,例如每次下次請求時請求10個或者更多的數(shù)據(jù)。表1 目前為止,Reactor直接使用的Reactive stream接口及實現(xiàn)
| Processor | reactor-core, reactor-stream | reactor.core.processor.*, reactor.rx.* | 在core模塊,提供了RingBuffer處理器,在stream模塊,提供了一整組操作和Broadcaster。 |
| Publisher | reactor-core, reactor-bus, reactor-stream, reactor-net | reactor.core.processor.*, reactor.rx.stream.*, reactor.rx.action.*, reactor.io.net.* | 在core模塊,處理器繼承了Publisher.在bus模塊,發(fā)布一個不限制的路由事件,在stream模塊,stream擴展直接繼承Publisher. 在net模塊,Chanel繼承了Publisher來消費請求數(shù)據(jù),同時也提供了具有flush和close的回調(diào)的providers. |
| Subscriber | reactor-core, reactor-bus, reactor-stream, reactor-net | reactor.core.processor.*, reactor.bus.EventBus.*, reactor.rx.action.*, reactor.io.net.impl.* | 在core模塊,處理器繼承了Subscriber. 在bus模塊,提供了無限制的Publisher/Subscriber能力.在stream模塊,Subscribers計算特定的回調(diào)行為.在Net模塊,subscriber的IO層實現(xiàn)處理寫、關(guān)閉和flush. |
| Subscription | reactor-stream, reactor-net | reactor.rx.subscription.*, reactor.io.net.impl.* | 在stream模塊, 提供了一個優(yōu)化過的PushSubscriptions和 buffering-ready ReactiveSubscription. 在Net模塊, 使用自定義Subscription實現(xiàn)背壓的方式實現(xiàn)異步IO讀。 |
從reactor 2啟動時我們就一直遵循這個標(biāo)準(zhǔn),并且隨著標(biāo)準(zhǔn)的改變而改變直到1.0.0正式版準(zhǔn)備發(fā)布。現(xiàn)在可以通過maven中央庫及其流行的鏡像可以找到該標(biāo)準(zhǔn),你將發(fā)現(xiàn)它作為過渡,依賴于reactor-core模塊。
Reactive擴展
Reactive擴展或者通常稱作Rx,是一種定義完備的功能api,這些api擴展了觀察者模式到一個史詩的程度。
Rx模式支持實現(xiàn)了使用少數(shù)設(shè)計的關(guān)鍵字來處理Reactive 數(shù)據(jù)序列:
使用回調(diào)鏈來抽象實時及延遲:當(dāng)可以獲得到數(shù)據(jù)時調(diào)用。
抽象了一直使用的線程模式:同步或者異步僅僅是我們處理的Observable/Stream。
控制錯誤傳遞及停止:錯誤和完成信號及數(shù)據(jù)的有效負(fù)載信號傳遞到鏈中。
在多個預(yù)先定義的api中解決了多個擴展-聚合及其它組合問題。
Reactive擴展的標(biāo)準(zhǔn)Jvm實現(xiàn)是RxJava。它提供了一個功能豐富的Api。
Reactor 2 提供了一個特定模塊實現(xiàn)了Reactive擴展的一部分功能。建議需要使用Reactive stream全部功能的用戶使用RxJava。最后,當(dāng)組合完整的RxJava系統(tǒng)時,用戶可以從Reactor提供的強大的異步和IO的中獲益。
表2:Rx和Reactor stream的不同點:
| Observable | reactor.rx.Stream | Reactive Stream Publisher的實現(xiàn) |
| Operator | reactor.rx.action.Action | Reactive Stream Processor的實現(xiàn) |
| Observable with 1 data at most | reactor.rx.Promise | 返回唯一結(jié)果的類型, ?Reactive Stream Processor實現(xiàn)并提供了可選的異步分發(fā)功能。? |
| Factory API (just, from, merge…?.) | reactor.rx.Streams | 和core模塊的 data-focused 子類一樣, 返回 Stream |
| Functional API (map, filter, take…?.) | reactor.rx.Stream | 和core模塊的data-focused 子類一樣, 返回Stream |
| Schedulers | reactor.core.Dispatcher, org.reactivestreams.Processor | Reactor Stream計算無限制的共享Dispatcher或者有限的Processor的操作。 |
| Observable.observeOn() | Stream.dispatchOn() | 只是dispatcher參數(shù)的一個適配命名。 |
轉(zhuǎn)載于:https://www.cnblogs.com/davidwang456/p/4589439.html
總結(jié)
以上是生活随笔為你收集整理的reactor官方文档译文(1)Reactor简介的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Jackson学习二之集合类对象与JSO
- 下一篇: reactor官方文档译文(2)Reac