java品酒会,我学 rxjava 2(3)- 热发射
這篇文章離上一篇文章有些時(shí)日了,概因最難心情大大的不好,非常不爽。
為啥我會(huì)專門寫一下熱發(fā)射呢,因?yàn)?RxBus 就是使用 RxJava 的熱發(fā)射(Subject)實(shí)現(xiàn)的,但是呢我的出發(fā)點(diǎn)不同,我是因?yàn)檠芯苛?AAC 的 LivaData 之后才有感而發(fā)的,同樣是響應(yīng)式編程,我們?nèi)绾问褂?RxJava 實(shí)現(xiàn)構(gòu)建數(shù)據(jù)和 UI 之間的通道,做到動(dòng)態(tài)更新數(shù)據(jù)呢
先來回顧一下 LivaData 的使用,這是介紹 AAC 組件:Android Architecture Components 開發(fā)架構(gòu) 中 demo 的數(shù)據(jù)流程圖
數(shù)據(jù)流程圖
UI 層拿到 modle層拋出的 livedata(Observable) ,在其上注冊(cè)更新 UI 的方法,然后 modle層在更新數(shù)據(jù)的時(shí)候直接調(diào)用 livedata.setValue 方法就可以更新 UI 層的數(shù)據(jù)了
這其實(shí)就是 RxJava 中熱發(fā)射的典型使用,所以我們真的有必要去學(xué)習(xí)一下了
先來概念
有熱必然有冷,那何為熱,何為冷,這點(diǎn)很重要的:
冷發(fā)射 寫
Observable 在收到 Observer 注冊(cè)時(shí)就發(fā)送數(shù)據(jù),此為冷,我們平時(shí)使用的 RxJava 方式都是冷發(fā)射
熱發(fā)射
Observable 在收到 Observer 注冊(cè)時(shí)只是建立了相互關(guān)系,可以稱之為管道。之后 Observable 可以在需要的任何時(shí)候發(fā)射任意數(shù)據(jù)。
細(xì)細(xì)品鑒
冷發(fā)射經(jīng)典應(yīng)用如下
Observable.just(1, 2, 3)
.subscribe(integer -> {
......
});
冷發(fā)射的數(shù)據(jù)固定,要不是固定的數(shù)據(jù),要不就是一個(gè)固定的從遠(yuǎn)程 IP 獲取的數(shù)據(jù),是相對(duì)寫死的
冷發(fā)射在注冊(cè)時(shí)即開始發(fā)射數(shù)據(jù),我們不能決定發(fā)射數(shù)據(jù)的時(shí)機(jī)和地點(diǎn)
熱發(fā)射經(jīng)典應(yīng)用如下
// 創(chuàng)建熱發(fā)射 Observable
PublishSubject subject = PublishSubject.create();
Disposable disposable = subject.subscribe(s -> show(s));
// 動(dòng)態(tài)發(fā)送數(shù)據(jù)
subject.onNext("響應(yīng)式編程");
// 中斷管道,接觸注冊(cè)關(guān)系
disposable.dispose();
熱發(fā)射我們可以決定發(fā)射時(shí)機(jī),地點(diǎn),數(shù)據(jù),靈活可以實(shí)現(xiàn)和 livadata 相同的效果
可以解除單個(gè)注冊(cè)
熱發(fā)射核心 Subject
Subject 這個(gè)東西既可以當(dāng) Observable 用,上面我們已經(jīng)看到了,也可以當(dāng) Observer 用,接受別的 Observable 的數(shù)據(jù)
我們先來看看 Subject 的幾個(gè)實(shí)現(xiàn)類,然后才好往下繼續(xù)
PublishSubject
該Subject不會(huì)改變事件的發(fā)送順序。
如果在已經(jīng)發(fā)送了一部分事件之后注冊(cè)的observer,
是不會(huì)收到之前發(fā)送的事件。
private void doPublishSubject() {
//將事件發(fā)送到observer,如果先前已經(jīng)漏掉的事件,不會(huì)重新發(fā)送到后注冊(cè)的observer上
PublishSubject publish = PublishSubject.create();
publish.subscribe(new PublishObserver("first"));
publish.onNext("1");
publish.onNext("2");
publish.subscribe(new PublishObserver("seconde"));
publish.onNext("3");
publish.onCompleted();
}
BehaviorSubject
該類有創(chuàng)建時(shí)需要一個(gè)默認(rèn)參數(shù),該默認(rèn)參數(shù)會(huì)在subject未發(fā)送過其他的事件時(shí),向注冊(cè)的observer發(fā)送。
注意看代碼注釋
private void doBehaviorSubject() {
//將事件發(fā)送到observer,如果先前已經(jīng)漏掉的事件,除了最近的一個(gè)事件以外,
//其他相關(guān)事件不會(huì)重新發(fā)送到后注冊(cè)的observer上。所以需要帶默認(rèn)值,
//第一次被observer注冊(cè)時(shí),observable中沒有內(nèi)容的時(shí)候,就會(huì)將默認(rèn)值發(fā)給observer
BehaviorSubject behavior = BehaviorSubject.create("創(chuàng)建beahavior時(shí)候帶的消息");
behavior.subscribe(new SubjectObserver("first"));
behavior.onNext("1");
behavior.onNext("2");
behavior.subscribe(new SubjectObserver("seconde"));
behavior.onNext("3");
behavior.onCompleted();
}
ReplaySubject
將事件發(fā)送到observer,無論什么時(shí)候注冊(cè)observer,
無論何時(shí)通過該observable發(fā)射的所有事件,均會(huì)發(fā)送給新的observer。
private void doReplaySubject() {
//將事件發(fā)送到observer,無論什么時(shí)候注冊(cè)observer,
//無論何時(shí)通過該observable發(fā)射的所有事件,均會(huì)發(fā)送給新的observer。
ReplaySubject replay = ReplaySubject.create();
replay.subscribe(new SubjectObserver("first"));
replay.onNext("1");
replay.onNext("2");
replay.subscribe(new SubjectObserver("seconde"));
replay.onNext("3");
replay.onCompleted();
}
AsyncSubject
只有當(dāng)subject調(diào)用onComplete方法時(shí),才會(huì)將subject中的最后一個(gè)事件傳遞給observer。
如果不調(diào)用onComplete方法,則不會(huì)給observer發(fā)送任何事件。
private void doAsyncSubject() {
//只會(huì)有當(dāng)subject調(diào)用onComplete方法時(shí),才會(huì)將subject中的最后一個(gè)事件傳遞給observer。
//如果不調(diào)用onComplete方法,則不會(huì)向observer中發(fā)送任何事件
AsyncSubject async = AsyncSubject.create();
async.subscribe(new SubjectObserver("first"));
async.onNext("1");
async.onNext("2");
async.onNext("3");
async.onCompleted();
async.subscribe(new SubjectObserver("seconde"));
async.onCompleted();
}
Subject 作為Observer ,注冊(cè)到一個(gè)冷發(fā)射的 Observable 上面
注意此時(shí)我們只能使用 ReplaySubject
因?yàn)槔浒l(fā)射注冊(cè)既發(fā)射數(shù)據(jù),所有這個(gè) Subject 在注冊(cè)到冷發(fā)射的 Observable 上時(shí)就會(huì)接受這個(gè)冷發(fā)射的 Observable 的數(shù)據(jù),然后繼續(xù)向下傳遞
ReplaySubject subject = ReplaySubject.create();
subject.subscribe(s -> show(s));
Observable observable = Observable.just("AA");
observable.subscribe(subject);
Subject 作為Observer ,注冊(cè)到一個(gè)熱發(fā)射的 Observable 上面
這是既沒有類型限制了
我們添加一個(gè)變換進(jìn)來,要不然沒啥意義,這個(gè)其實(shí)和 livadata 天劍 transform 一個(gè)思路
PublishSubject subject1 = PublishSubject.create();
PublishSubject subject2 = PublishSubject.create();
subject1.map(new Function() {
@Override
public String apply(String s) throws Exception {
return s + "_經(jīng)過2的修改了";
}
}).subscribe(subject2);
subject2.subscribe(s -> show(s));
subject1.onNext("響應(yīng)式編程");
ConnnectableObservbale
我們可以把一個(gè)冷發(fā)射轉(zhuǎn)為熱發(fā)射,使用 publish
ConnectableObservable source = Observable
.just("Alpha","Beta","Delta","Gamma","Epsilon")
.publish();
source.subscribe(s -> System.out.println("observer1 RECEIVED: " + s));
source.map(String::length)
.subscribe(i -> System.out.println("observer2 RECEIVED: " + i));
//發(fā)射!
source.connect();
不過我覺得這個(gè) ConnnectableObservbale 沒啥發(fā)用,寫死數(shù)據(jù)的熱發(fā)射沒太大應(yīng)用價(jià)值。
代碼特征
PublishSubject 我想大家肯定有一些疑問這里我測(cè)試過直接上答案
val subject = PublishSubject.create()
val observable = subject.map {
return@map "BB"
}
observable.subscribe {
Log.d("AA", "收到數(shù)據(jù)1:$it")
}
observable.subscribe {
Log.d("AA", "收到數(shù)據(jù)2:$it")
}
subject.onNext("AA")
PublishSubject 變換之后雖然我們拿到的是 Observable,但是 PublishSubject 特性不會(huì)丟失,為啥?因?yàn)閿?shù)據(jù)源頭不會(huì)像 Observable 一樣由注冊(cè)的就觸發(fā)數(shù)據(jù),這個(gè) Observable 的源頭還是 PublishSubject ,Observable 求到的只是銜接作用
總結(jié)
以上是生活随笔為你收集整理的java品酒会,我学 rxjava 2(3)- 热发射的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java图形界面猜字游戏,java程序,
- 下一篇: posixkill php,在linux