图解RxJava2(一)
前言
從這篇文章開始,系統地學習RxJava2設計思想和源碼實現。說起大熱門RxJava,網上有很多例如響應式編程、觀察者模式等介紹,也有一些優秀的文章以上、下游等概念引初學者入門,在初步學習之后,可能感覺有所收獲,但是總覺得不夠解渴,要真正知曉其原理,還得結合源碼加深理解。
例子
通過生活中的幾個角色來學習RxJava2:飯店、廚師、服務員、顧客。
模擬一個情景:飯店有一個很火的套餐,顧客來店默認就要這個套餐(不存在服務員咨詢顧客要什么的過程),所以情況應該是這樣的
上面的漫畫寫成RxJava2就是很多入門文章中看到的:事件發起者(上游)
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {System.out.println("服務員從廚師那取得 扁食");e.onNext("扁食");System.out.println("服務員從廚師那取得 拌面");e.onNext("拌面");System.out.println("服務員從廚師那取得 蒸餃");e.onNext("蒸餃");System.out.println("廚師告知服務員菜上好了");e.onComplete();}}); 復制代碼事件接收者(下游)
Observer<String> observer = new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {System.out.println("來個沙縣套餐!!!");}@Overridepublic void onNext(String s) {System.out.println("服務員端給顧客 " + s);}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {System.out.println("服務員告訴顧客菜上好了");}}; 復制代碼建立聯系
observable.subscribe(observer); 復制代碼打印如下:
來個沙縣套餐!!! 服務員從廚師那取得 拌面 服務員端給顧客 拌面 服務員從廚師那取得 扁食 服務員端給顧客 扁食 服務員從廚師那取得 蒸餃 服務員端給顧客 蒸餃 廚師告知服務員菜上好了 服務員告訴顧客菜上好了 復制代碼下面把一些類代入角色結合源碼分析,演員表
源碼分析
最初看源碼的時候容易因為各個類名字起得很相似而看暈,因此先把涉及到的類之間的關系畫出來
Observable 是個抽象類,其子類是 ObservableCreate ,如果把 Observable 比成飯店,那 ObservableCreate 就是沙縣小吃,看下 Observable 的 create 方法
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {ObjectHelper.requireNonNull(source, "source is null");return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } 復制代碼Observable 的 create 方法只是將接收的 ObservableOnSubscribe 作為參數傳遞給子類 ObservableCreate 真正實例化
public final class ObservableCreate<T> extends Observable<T> {final ObservableOnSubscribe<T> source;public ObservableCreate(ObservableOnSubscribe<T> source) {this.source = source;} ... } 復制代碼上面這些代碼就是漫畫的第一格:飯店要開張(Observable.create),開張的前提是要有一個會做菜的廚師(new ObservableOnSubscribe),接著飯店起名叫沙縣小吃(new ObservableCreate),并把這個廚師和沙縣小吃建立聯系(this.source = source)。廚師有了,但是他并沒有立即開始做菜(ObservableOnSubscribe.subscribe()),這個也很好理解,現實生活中廚師也是這樣,他做不做菜取決于飯店,畢竟是飯店給他開工資;而飯店是否讓廚師做菜很大一個原因取決于有沒有顧客上門,看下顧客:
顧客沒有什么套路,上菜就吃(onNext),菜上完或菜出問題會有相應的提醒(onComplete/onError),對應上面漫畫2。接著看飯店接客 observable.subscribe(observer) 的源碼
public final void subscribe(Observer<? super T> observer) {//省略部分代碼subscribeActual(observer);//省略部分代碼 }protected abstract void subscribeActual(Observer<? super T> observer); 復制代碼Observable(飯店) 的 subscribe 方法又會調用 subscribeActual 方法,該方法是個抽象方法,具體實現在子類,看看子類 ObservableCreate(沙縣小吃)
protected void subscribeActual(Observer<? super T> observer) {//步驟①CreateEmitter<T> parent = new CreateEmitter<T>(observer);//步驟②observer.onSubscribe(parent);try {//步驟③source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);} } 復制代碼先看下涉及到的類以及所屬關系
步驟① Emitter 翻譯為發射器,這里名字起得也很形象 CreateEmitter(創建發射器) ,即對應服務員,CreateEmitter 創建的時候接收 Observer,就像一個服務員接待一個顧客一樣(對應漫畫3服務員說話)
步驟② 執行 onSubscribe 方法并接收 CreateEmitter ,所以看到 log 中最先打印該方法的內容,就像顧客認準之后自己的菜是由這個服務員上的(對應漫畫3顧客說話)
步驟③ 調用 ObservableOnSubscribe.subscribe ,并接收 CreateEmitter ,就像廚師和該服務員建立聯系,之后廚師做的菜都由該服務員端出去。上什么菜取決于 ObservableOnSubscribe.subscribe 中的實現。
分析到這里發現 CreateEmitter(服務員) 起到樞紐作用,看下代碼中 e.onNext/e.onComplete 的實現
public void onNext(T t) {if (t == null) {onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));return;}if (!isDisposed()) {observer.onNext(t);}}public void onComplete() {if (!isDisposed()) {try {observer.onComplete();} finally {dispose();}}} 復制代碼onNext 中首先判空,服務員端個空盤子出來要被顧客錘成麻瓜;接著發送之前需要執行 isDisposed() 判斷,可以理解成顧客是否還需要菜,默認情況下是需要的(!isDisposed() 為 true ),當執行完 onComplete() 方法后會執行 dispose() ,表明顧客不再需要菜了,后續的菜服務員不會再端上來給顧客了。
Observer<String> observer = new Observer<String>() {Disposable disposable;@Overridepublic void onSubscribe(Disposable d) {this.disposable = d;System.out.println("來個沙縣套餐!!!");}@Overridepublic void onNext(String s) {if (s.equals("拌面")) {disposable.dispose();}System.out.println("服務員端給顧客 " + s);}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {System.out.println("服務員告訴顧客菜上好了");} }; 復制代碼打印如下:
來個沙縣套餐!!! 服務員從廚師那取得 拌面 服務員端給顧客 拌面 服務員從廚師那取得 扁食 服務員從廚師那取得 蒸餃 廚師告知服務員菜上好了 復制代碼從上面可以看到一旦執行完 Disposable.dispose() 方法,顧客和服務員就沒有后續交流了,就像 Disposable 翻譯的那樣「一次性」,理解成顧客對服務員說「后續的菜都別上了,你也不要再出現在我面前」;但是服務員和廚師的交流還是保持著,默認情況下廚師并不知道顧客不需要菜了,因此他還是繼續做菜,然后交給服務員端出去。當然我們也可以在廚師做下一道菜的之前,判斷下顧客還要不要:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {if (!e.isDisposed()) {System.out.println("服務員從廚師那取得 拌面");e.onNext("拌面");}if (!e.isDisposed()) {System.out.println("服務員從廚師那取得 扁食");e.onNext("扁食");}if (!e.isDisposed()) {System.out.println("服務員從廚師那取得 蒸餃");e.onNext("蒸餃");}if (!e.isDisposed()) {System.out.println("廚師告知服務員菜上好了");e.onComplete();}} }); 復制代碼打印如下:
來個沙縣套餐!!! 服務員從廚師那取得 拌面 服務員端給顧客 拌面 復制代碼再看下另一種情況
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {System.out.println("服務員從廚師那取得 拌面");e.onNext("拌面");System.out.println("服務員從廚師那取得 扁食");e.onNext("扁食");System.out.println("服務員從廚師那取得 蒸餃");e.onNext("蒸餃");System.out.println("廚師告知服務員菜上好了");e.onComplete();} }); observable.subscribe(); 復制代碼打印如下
服務員從廚師那取得 拌面 服務員從廚師那取得 扁食 服務員從廚師那取得 蒸餃 廚師告知服務員菜上好了 復制代碼上面分析了要有顧客廚師才會做菜,這都沒顧客怎么也做菜呢?看下源碼
public final Disposable subscribe() {return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer()); }public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, Consumer<? super Disposable> onSubscribe) {//省略判空//默認的顧客LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);subscribe(ls);return ls; }public final void subscribe(Observer<? super T> observer) {subscribeActual(observer); }protected abstract void subscribeActual(Observer<? super T> observer); 復制代碼原來系統會默認創建一個 LambdaObserver(默認顧客) ,服務員從廚師那端的菜會傳給這個顧客。所以可以看出廚師做不做菜只取決于飯店(Observable.subscribe),后面的流程和上面分析的一致。另外上面的代碼還出現了Consumer、Action類,這些類里也有對事件的處理,可以理解成顧客選擇接收服務員的哪些信息,在 functions 包下還有其他實現
subscribe() 有下面幾個重載方法:
//顧客只關心上什么菜 public final Disposable subscribe() {}public final Disposable subscribe(Consumer<? super T> onNext) {}//顧客關心上什么菜以及菜是不是出問題 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} //顧客關心上什么菜、菜是不是有問題、菜是不是上完了 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}//顧客關心上什么菜、菜是不是有問題、菜是不是上完了、 //onSubscribe()中可以獲取Disposable引用,之后選擇告訴服務員是否繼續上菜 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}//由顧客自己決定關心哪些事件,和上一條效果一樣 public final void subscribe(Observer<? super T> observer) {} 復制代碼如果顧客只關心上什么菜,我們可以這么寫:
Consumer<String> consumer = new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println("服務員端給顧客 " + s);} }; observable.subscribe(consumer); 復制代碼打印如下:
服務員從廚師那取得 拌面 服務員端給顧客 拌面 服務員從廚師那取得 扁食 服務員端給顧客 扁食 服務員從廚師那取得 蒸餃 服務員端給顧客 蒸餃 廚師告知服務員菜上好了 復制代碼總結
以上是生活随笔為你收集整理的图解RxJava2(一)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python的类型 变量 数值和字符串
- 下一篇: 前端SEO