Java中expecial,RxJava 学习笔记 (一)
作者: 一字馬胡
轉(zhuǎn)載標(biāo)志 【2017-12-13】
更新日志
日期
更新內(nèi)容
備注
2017-12-13
RxJava學(xué)習(xí)筆記系列
系列筆記 (一)
2017-12-15
增加系列筆記(二)
2017-12-15 21:36
考慮到RxJava很大程度上用于android開發(fā)中,而我自身不是移動(dòng)開發(fā)者,所以暫時(shí)將RxJava學(xué)習(xí)筆記系列掛起,在未來(lái)需要使用RxJava的時(shí)候再繼續(xù)學(xué)習(xí),并且結(jié)合實(shí)際的應(yīng)用來(lái)學(xué)習(xí)會(huì)收獲更多
掛起
導(dǎo)入
其實(shí)在很早以前就接觸過(guò)RxJava,并且當(dāng)時(shí)學(xué)習(xí)RxJava還有一個(gè)產(chǎn)出:JSwitcher,這是一個(gè)基于RxJava的實(shí)驗(yàn)性框架,對(duì)于該框架的介紹可以參考下面的描述:
JSwitcher is a Convenient tool to switch schedule base on RxJava, and Jswitcher also implement a sample Version Observer/Observable, you can learn how RxJava works from the sample codes. it's easy to switch to Another schedule from current schedule. you just need to care about your bussiness, using 'switchTo' Operator to switch to the longing schedlue when you want to do the work on the suitable schedule. There are some especial schedules for you, like I/O Bound Schedule, Cpu Bound Schedule, And Single Schedule, etc, if you want to create an extra schedule by yourself, it's ok for JSwitcher, and it's very easy to do this .And the most important thing is the jswitch support 'chain operator', that means you can switch to a schedule, then fit on this schedule some works, then you can do switch operator continue from current position, or you can just fit another work on current schedule, and jswitcher has terminal operator like 'waitAndShutdown', after do the operator, you can not do 'chain operator' anymore. and the jswitcher will wait some time and shutdown all of schedule.
該框架將RxJava的核心部分抽離出來(lái)并做了一些簡(jiǎn)化處理,說(shuō)到這里,需要提及一下,將一個(gè)復(fù)雜框架中的某部分抽象出來(lái)看似很簡(jiǎn)單,但是實(shí)際操作起來(lái)還是有一些困難的,并且在實(shí)際操作的過(guò)程中為了不涉及過(guò)多外圍的內(nèi)容時(shí)常需要簡(jiǎn)化,就是將一些依賴外圍的核心部分中的某些內(nèi)容拋棄,但是最為主要的骨架不能丟掉,這樣操作下來(lái)會(huì)對(duì)整個(gè)框架有一定的了解。如果上面的描述激起了你的興趣,可以實(shí)際去閱讀JSwitcher框架代碼,也可以作為快速入門RxJava的學(xué)習(xí)材料,但是該框架存在一些不確定性以及一些待研究正確性的點(diǎn),所以不宜在實(shí)際項(xiàng)目中應(yīng)用。
JSwitcher的核心功能是實(shí)現(xiàn)線程池的切換,并且支持按任務(wù)性質(zhì)(I/O,Compute)來(lái)劃分線程池,切換到合適的線程池可以提交任務(wù),具體的使用可以參考下面的例子:
SwitcherFitter.switcherFitter()
.switchToIoSchedule() //switch to i/o bound schedule
.switchToSingleSchedule() //switch to single schedule
.fit(normalRunner, future1, true) //do the normal runner at current schedule
.switchToComputeSchedule() // switch to cpu bound schedule
.fit(normalRunner, future2, true) // do
.fit(timeoutRunner, future3, true) // do
.switchToSingleSchedule() //switch
.switchToSingleSchedule() //switch
.fit(timeoutRunner, future4, true) //do
.awaitFuturesCompletedOrTimeout(100,
completableFutures, timeoutFutures, 10) //wait for the future
.switchToComputeSchedule() //switch
.fit(() -> {
System.out.println("i am a tester->" + Thread.currentThread().getName());
}) // do the stupid work
.waitAndShutdown(1000); //wait and shutdown !
關(guān)于JSwitcher的設(shè)計(jì),可以參考下面的圖片:
本文作為學(xué)習(xí)RxJava的學(xué)習(xí)筆記的第一篇文章,會(huì)從RxJava的一些核心概念出發(fā),并且從實(shí)際的例子來(lái)梳理RxJava的實(shí)現(xiàn)原理,當(dāng)然,為了閱讀的流暢性,每一篇文章不會(huì)涉及太多的內(nèi)容。需要說(shuō)明的一點(diǎn)是,本文乃至本系列的所有文章都是基于RxJava2,RxJava目前有兩個(gè)版本,一個(gè)是RxJava1,一個(gè)是RxJava2,據(jù)說(shuō)兩個(gè)版本間的差別還是很大的,介于我的學(xué)習(xí)都是基于RxJava2的,并且沒(méi)有接觸過(guò)RxJava1,所以本系列文章不會(huì)涉及RxJava1與RxJava2的對(duì)比內(nèi)容,所有內(nèi)容都是基于RxJava2的。
Observer和Observable
學(xué)習(xí)RxJava之前,你需要了解什么是Reactive,我的理解是應(yīng)該要和傳統(tǒng)的代碼進(jìn)行對(duì)比學(xué)習(xí),我們一般寫代碼都是命令式的,我們希望做什么就做什么,比如我們想下載一張圖片,然后判斷圖片是否下載成功,如果成功了就展示出來(lái),如果沒(méi)有下載成功則使用兜底圖片進(jìn)行展示,如果沒(méi)有兜底圖片則不展示。下面是這個(gè)功能的偽代碼實(shí)現(xiàn):
Image img = EntryDownloadHelper.downloadImageByUrl(url, timeout)
if img is null
then
if FALLBACK_IMG != null
then img = FALLBACK_IMG
if img != null
then
ShowEntryHelper.showImage(img, height, weight)
看起來(lái)很熟悉并且很容易理解,那什么是Reactive的呢?如果使用RxJava來(lái)重寫上面的代碼,則代碼看起來(lái)像下面這樣:
String imgUrl = "xxx";
Image img = null;
Image FALLBACK_IMG = "xxx";
int timeout = 1000;
int height = 100;
int weight = 200;
Observable.create(new ObservableOnSubscribe() {
public void subscribe(ObservableEmitter e) throws Exception {
if (imgUrl == null || imgUrl.isEmpty()) {
e.onNext(FALLBACK_IMG);
} else {
img = EntryDownloadHelper.downloadImageByUrl(imgUrl, timeout);
if (img == null) {
e.onNext(FALLBACK_IMG);
} else {
e.onNext(img);
}
}
e. onComplete();
}
}).subscribe(new Observer() {
public void onSubscribe(Disposable disposable) {
}
public void onNext(Image s) {
if (s != null) {
ShowEntryHelper.showImage(img, height, weight);
}
}
public void onError(Throwable throwable) {
System.out.println("onError:" + throwable);
}
public void onComplete() {
}
});
這只是一個(gè)簡(jiǎn)單的小例子,并沒(méi)有什么使用價(jià)值,并且需要說(shuō)明的一點(diǎn)是,RxJava更適合用于移動(dòng)應(yīng)用的開發(fā),所以如果是做移動(dòng)開發(fā)的話,學(xué)習(xí)RxJava的價(jià)值會(huì)更大,但是在一些其他的開發(fā)過(guò)程中也會(huì)使用到RxJava。
在上面的例子中,出現(xiàn)了兩個(gè)比較關(guān)鍵的對(duì)象,ObServer和Observable,RxJava在實(shí)現(xiàn)Reactive的時(shí)候使用了觀察者設(shè)計(jì)模式,Observable是被觀察者,可以叫數(shù)據(jù)源,也可以叫做生產(chǎn)者,反正就是負(fù)責(zé)生產(chǎn)數(shù)據(jù),并且將數(shù)據(jù)推送出去的東西,而ObServer是觀察者對(duì)象,它會(huì)綁定到一個(gè)Observable上,并且觀察Observable的行為,當(dāng)ObServable觸發(fā)事件的時(shí)候,ObServer會(huì)接收到事件,并且對(duì)相應(yīng)的事件作出相應(yīng)。所以可以將ObServer叫做事件的接收者,也可以叫做事件的消費(fèi)者。有了觀察者和被觀察者,需要將兩個(gè)角色聯(lián)系起來(lái),也就是上面所說(shuō)到的將Observer綁定到Observable上,這個(gè)時(shí)候就需要使用Observable的subscribe方法,叫做訂閱,下面會(huì)詳細(xì)講解Observable是如何將事件傳遞給Observer的。
學(xué)習(xí)一個(gè)新技術(shù)最開始需要做的就是寫一個(gè)demo,并且運(yùn)行起來(lái),然后再繼續(xù)學(xué)習(xí)下去。下面首先寫一個(gè)RxJava的demo,下面的分析將會(huì)基于該demo:
Observable.create(new ObservableOnSubscribe() {
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext("test");
e.onComplete();
}
}).subscribe(new Observer() {
public void onSubscribe(Disposable disposable) {
System.out.println("onSubscribe");
}
public void onNext(String s) {
System.out.println("onNext:" + s);
}
public void onError(Throwable throwable) {
System.out.println("onError:" + throwable);
}
public void onComplete() {
System.out.println("onComplete:");
}
});
首先需要?jiǎng)?chuàng)建一個(gè)Observable,可以使用Observable的靜態(tài)方法create,當(dāng)然可以直接new一個(gè)Observable對(duì)象,并且實(shí)現(xiàn)Observable的方法來(lái)實(shí)現(xiàn),就像下面這樣:
Observable observable = new Observable() {
@Override
protected void subscribeActual(Observer super String> observer) {
observer.onNext("ok");
observer.onComplete();
}
};
現(xiàn)在,Observable已經(jīng)有了,下面就需要在該Observable上綁定一個(gè)Observer,就像上面的例子一樣,使用Observable的subscribe方法,需要說(shuō)明的一點(diǎn)是,可以在Observable做非常豐富的聚合操作,可以對(duì)Observable進(jìn)行一系列聚合操作(比如map,filter等操作)之后再綁定Observer,但是本文不會(huì)涉及這些操作的內(nèi)容,這些內(nèi)容將在下一篇該系列的文章中出現(xiàn)。
目前Observable有六個(gè)subscribe方法供Observer選擇:
public final Disposable subscribe()
public final Disposable subscribe(Consumer super T> onNext)
public final Disposable subscribe(Consumer super T> onNext, Consumer super Throwable> onError)
public final Disposable subscribe(Consumer super T> onNext, Consumer super Throwable> onError,Action onComplete)
public final Disposable subscribe(Consumer super T> onNext, Consumer super Throwable> onError, Action onComplete, Consumer super Disposable> onSubscribe)
public final void subscribe(Observer super T> observer)
可以選擇這六個(gè)中的任意一個(gè)來(lái)綁定Observer,本文以一個(gè)看起來(lái)較為簡(jiǎn)單的subscribe方法來(lái)分析,也就是上面例子中使用的版本:
public final void subscribe(Observer super T> observer)
下面展示了該方法的詳細(xì)實(shí)現(xiàn)細(xì)節(jié):
public final void subscribe(Observer super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
看起來(lái)代碼很多,但是核心代碼就一句:subscribeActual(observer),然后:
/**
* Operator implementations (both source and intermediate) should implement this method that
* performs the necessary business logic.
*
There is no need to call any of the plugin hooks on the current Observable instance or
* the Subscriber.
* @param observer the incoming Observer, never null
*/
protected abstract void subscribeActual(Observer super T> observer);
再看一下new一個(gè)Observable的代碼:
Observable observable = new Observable() {
@Override
protected void subscribeActual(Observer super String> observer) {
// XXX
}
};
也就是說(shuō),subscribe方法中會(huì)調(diào)用Observable的subscribeActual方法,并且將subscribe的參數(shù)(也就是綁定到該Observable的Observer)傳遞給subscribeActual,然后,我們?cè)趕ubscribeActual方法里面對(duì)subscribeActual的參數(shù)observer的操作實(shí)際上就是直接調(diào)用了Observer的方法,所以O(shè)bserver當(dāng)然會(huì)對(duì)響應(yīng)相應(yīng)的事件。
這個(gè)理解起來(lái)不太困難,下面看一下使用Observable的create靜態(tài)方法來(lái)創(chuàng)建Observable的時(shí)候是怎么講一個(gè)Observer綁定到一個(gè)create出來(lái)的Observable上的,回頭看下面的代碼:
Observable.create(new ObservableOnSubscribe() {
public void subscribe(ObservableEmitter e) throws Exception {
// XXX
}
}).subscribe(new Observer() {
// XXX
});
這個(gè)看起來(lái)好像不能像上面那種情況一樣理解,因?yàn)閏reate的參數(shù)是new一個(gè)ObservableOnSubscribe對(duì)象,現(xiàn)在先來(lái)看一下create方法的具體實(shí)現(xiàn)細(xì)節(jié):
public static Observable create(ObservableOnSubscribe source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate(source));
}
可以看到,create方法返回的是一個(gè)ObservableCreate對(duì)象,并且將我們的Observable對(duì)象傳遞給了ObservableCreate,這里使用了包裝模式,將Observable包裝成了ObservableCreate對(duì)象。在ObservableCreate類中找到了subscribeActual的實(shí)現(xiàn),而這個(gè)subscribeActual正是實(shí)現(xiàn)了Observable的subscribeActual。所以包裝需要包裝徹底啊。下面是ObservableCreate類的subscribeActual的具體實(shí)現(xiàn):
@Override
protected void subscribeActual(Observer super T> observer) {
CreateEmitter parent = new CreateEmitter(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
在subscribeActual內(nèi)部,又對(duì)Observer做了一次包裝,將Observer對(duì)象包裝成了CreateEmitter對(duì)象,為什么呢?因?yàn)樵赾reate方法的參數(shù)中我們new的Observable是一個(gè)ObservableOnSubscribe類型的對(duì)象,而ObservableOnSubscribe的subscribe的參數(shù)需要是CreateEmitter類型的,那我們new出來(lái)的ObservableOnSubscribe到哪去了呢?看下面的構(gòu)造函數(shù):
final ObservableOnSubscribe source;
public ObservableCreate(ObservableOnSubscribe source) {
this.source = source;
}
可以看到,我們new出來(lái)的ObservableOnSubscribe被保存在source字段中,在來(lái)看ObservableCreate類的subscribeActual方法,其中有關(guān)鍵的一句話:source.subscribe(parent),source是Observable,parent是Observer,只是Observer和Observable都是被包裝了一層的。如果想具體了解到底是怎么包裝的,可以參考CreateEmitter類,也可以借助這個(gè)機(jī)會(huì)學(xué)習(xí)一下包裝模式,還是比較有用的。
本文是對(duì)RxJava學(xué)習(xí)筆記系列的第一篇文章,內(nèi)容淺顯易懂,沒(méi)有涉及太多的內(nèi)容,主要分析了一下RxJava中的兩個(gè)重要的對(duì)象,Observable和Observer,并且梳理了一下一個(gè)Observer是如何綁定到一個(gè)Observable上的,當(dāng)然,這是學(xué)習(xí)RxJava的基礎(chǔ)內(nèi)容,如果對(duì)這一部分內(nèi)容都不清楚的話,還需要繼續(xù)學(xué)習(xí)一下,本文涉及到兩個(gè)設(shè)計(jì)模式,一個(gè)是觀察者模式,一個(gè)是包裝模式,結(jié)合具體的例子來(lái)看還是很好理解的。本文開頭還介紹了一下JSwitcher,對(duì)于學(xué)習(xí)RxJava還是比較有幫助的。下面簡(jiǎn)單做一下RxJava學(xué)習(xí)筆記系列的文章計(jì)劃:
《RxJava學(xué)習(xí)筆記 (一)》 : 了解RxJava中的Observable和Observer,并且明白如何實(shí)現(xiàn)訂閱
《RxJava學(xué)習(xí)筆記 (二)》 : RxJava中Observable豐富的聚合操作支持的學(xué)習(xí)筆記
《RxJava學(xué)習(xí)筆記 (三)》 : RxJava2中的線程切換學(xué)習(xí)筆記
《RxJava學(xué)習(xí)筆記 (四)》 : RxJava Flowable學(xué)習(xí)
暫時(shí)定這幾部分內(nèi)容,在總結(jié)過(guò)程中如果發(fā)現(xiàn)還有什么內(nèi)容需要補(bǔ)充的時(shí)候會(huì)進(jìn)行補(bǔ)充更新。
掃碼入群
總結(jié)
以上是生活随笔為你收集整理的Java中expecial,RxJava 学习笔记 (一)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 本地php后台密码恢复默认,找回word
- 下一篇: 试验设计与matlab数据分析 下载,试