一起来造一个RxJava,揭秘RxJava的实现原理
RxJava是一個神奇的框架,用法很簡單,但內部實現有點復雜,代碼邏輯有點繞。我讀源碼時,確實有點似懂非懂的感覺。網上關于RxJava源碼分析的文章,源碼貼了一大堆,代碼邏輯繞來繞去的,讓人看得云里霧里的。既然用拆輪子的方式來分析源碼比較難啃,不如換種方式,以造輪子的方式,將源碼中與性能、兼容性、擴展性有關的代碼剔除,留下核心代碼帶大家揭秘RxJava的實現原理。
什么是響應式編程
首先,我們需要明確,RxJava是Reactive Programming在Java中的一種實現。什么是響應式編程??
用一個字來概括就是流(Stream)。Stream 就是一個按時間排序的 Events 序列,它可以放射三種不同的 Events:(某種類型的)Value、Error 或者一個” Completed” Signal。通過分別為 Value、Error、”Completed”定義事件處理函數,我們將會異步地捕獲這些 Events。基于觀察者模式,事件流將從上往下,從訂閱源傳遞到觀察者。
至于使用Rx框架的優點,它可以避免回調嵌套,更優雅地切換線程實現異步處理數據。配合一些操作符,可以讓處理事件流的代碼更加簡潔,邏輯更加清晰。
搭建大體的框架
要造一座房子,首先要把大體的框架搭好。在RxJava里面,有兩個必不可少的角色:Subscriber(觀察者) 和 Observable(訂閱源)。
Subscriber(觀察者)
Subsribler在RxJava里面是一個抽象類,它實現了Observer接口。
?
public interface Observer<T> {void onCompleted();void onError(Throwable t);void onNext(T var1); }?
為了盡可能的簡單,將Subscriber簡化如下:
?
public abstract class Subscriber<T> implements Observer<T> {public void onStart() {} }
Observable(訂閱源)
?
Observable(訂閱源)在RxJava里面是一個大而雜的類,擁有很多工廠方法和各式各樣的操作符。每個Observable里面有一個OnSubscribe對象,只有一個方法(void call(Subscriber<? super T> subscriber);),用來產生數據流,這是典型的命令模式。
public class Observable<T> {final OnSubscribe<T> onSubscribe;private Observable(OnSubscribe<T> onSubscribe) {this.onSubscribe = onSubscribe;}public static <T> Observable<T> create(OnSubscribe<T> onSubscribe) {return new Observable<T>(onSubscribe);}public void subscribe(Subscriber<? super T> subscriber) {subscriber.onStart();onSubscribe.call(subscriber);}public interface OnSubscribe<T> {void call(Subscriber<? super T> subscriber);} }?
?
?
實踐
到此,一個小型的RxJava的雛形就出來了。不信?我們來實踐一下吧
Observable.create(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {for (int i = 0; i < 10; i++) {subscriber.onNext(i);}}}).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable t) {}@Overridepublic void onNext(Integer var1) {System.out.println(var1);}});?
?
?
添加操作符
其實,強大的RxJava的核心原理并沒有想象中那么復雜和神秘,運用的就是典型的觀察者模式。有了基本雛形之后,我們繼續為這個框架添磚加瓦吧。RxJava之所以強大好用,與其擁有豐富靈活的操作符是分不開的。那么我們就試著為這個框架添加一個最常用的操作符:map。
那么RxJava是如何實現操作符的呢?其實,每調用一次操作符的方法,就相當于在上層數據源和下層觀察者之間橋接了一個新的Observable。橋接的Observable內部會實例化有新的OnSuscribe和Subscriber。OnSuscribe負責接受目標Subscriber傳來的訂閱請求,并調用源Observable.OnSubscribe的subscribe方法。源Observable.OnSubscribe將Event往下發送給橋接Observable.Subscriber,最終橋接Observable.Subscriber將Event做相應處理后轉發給目標Subscriber。流程如下圖所示:
接著,我們用代碼實現這一過程。在Observable類里面添加如下代碼:
public <R> Observable<R> map(Transformer<? super T, ? extends R> transformer) {return create(new OnSubscribe<R>() { // 生成一個橋接的Observable和 OnSubscribe@Overridepublic void call(Subscriber<? super R> subscriber) {Observable.this.subscribe(new Subscriber<T>() { // 訂閱上層的Observable@Overridepublic void onCompleted() {subscriber.onCompleted();}@Overridepublic void onError(Throwable t) {subscriber.onError(t);}@Overridepublic void onNext(T var1) {// 將上層的onSubscribe發送過來的Event,通過轉換和處理,轉發給目標的subscribersubscriber.onNext(transformer.call(var1));}});}});}public interface Transformer<T, R> {R call(T from);}?
?
?
map操作符的作用是將T類型的Event轉化成R類型,轉化策略抽象成Transformer<T, R>(RxJava中用的是Func1<T, R>,但為了便于理解,起了一個有意義的名字)這一個函數接口,由外部傳入。
上面代碼中使用到一些泛型的通配符,有些地方使用了super,有些地方使用了extends,其實這是有講究的,傳給Transformer#call方法的參數是T類型的,那么call方法的參數類型可以聲明成是T的父類,Transformer#call方法的返回值要求是R類型的,那么它的返回值類型應該聲明成R的子類。如果大家不能理解,也可以不用在意這些細節。
那么我們一起來測試一下吧。
Observable.create(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {for (int i = 0; i < 10; i++) {subscriber.onNext(i);}}}).map(new Observable.Transformer<Integer, String>() {@Overridepublic String call(Integer from) {return "maping " + from;}}).subscribe(new Subscriber<String>() {@Overridepublic void onNext(String var1) {System.out.println(var1);}@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable t) {}});?
?
?
但是,我們看到map()方法內內部類有點多,代碼缺少拓展性和可讀性,我們應該進行適當地重構,將主要的邏輯抽離成獨立的模塊,并保證模塊間盡量解耦,否則Observable只會越來越臃腫。
public <R> Observable<R> map(Transformer<? super T, ? extends R> transformer) {return create(new MapOnSubscribe<T, R>(this, transformer));} public class MapOnSubscribe<T, R> implements Observable.OnSubscribe<R> {final Observable<T> source;final Observable.Transformer<? super T, ? extends R> transformer;public MapOnSubscribe(Observable<T> source, Observable.Transformer<? super T, ? extends R> transformer) {this.source = source;this.transformer = transformer;}@Overridepublic void call(Subscriber<? super R> subscriber) {source.subscribe(new MapSubscriber<R, T>(subscriber, transformer));} } public class MapSubscriber<T, R> extends Subscriber<R> {final Subscriber<? super T> actual;final Observable.Transformer<? super R, ? extends T> transformer;public MapSubscriber(Subscriber<? super T> actual, Observable.Transformer<? super R, ? extends T> transformer) {this.actual = actual;this.transformer = transformer;}@Overridepublic void onCompleted() {actual.onCompleted();}@Overridepublic void onError(Throwable t) {actual.onError(t);}@Overridepublic void onNext(R var1) {actual.onNext(transformer.call(var1));} }?
?
?
添加線程切換功能
RxJava中最激動人心的功能是異步處理,能夠自如地切換線程。利用?subscribeOn()?結合?observeOn()?來實現線程控制,讓事件的產生和消費發生在不同的線程。?observeOn()?可以多次調用,實現了線程的多次切換,最終目標Subscriber的執行線程與最后一次observeOn()的調用有關。但subscribeOn()?多次調用只有第一個subscribeOn()?起作用。為什么呢?因為?observeOn()?作用的是Subscriber,而subscribeOn()?作用的是OnSubscribe。
這里借用扔物線的圖:
簡單地調用一個方法就可以完成線程的切換,很神奇對吧。RxJava是如何實現的呢?除了橋接Observable以外,RxJava還用到一個很關鍵的類—Scheduler(調度器)。文檔中給Scheduler的定義是:A Scheduler is an object that schedules units of work.,也就是進行任務的調度的一個東西。Scheduler里面有一個重要的抽象方法:
public abstract Worker createWorker();?
?
?
Worker是Scheduler的內部類,它是具體任務的執行者。當要提交任務給Worker執行需要調用Worker的schedule(Action0 aciton)方法。
public abstract Subscription schedule(Action0 action);?
?
?
要獲得一個Scheduler并不需要我們去new,一般是調用Schedulers的工廠方法。
public final class Schedulers {private final Scheduler computationScheduler;private final Scheduler ioScheduler;private final Scheduler newThreadScheduler;public static Scheduler io() {return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);}public static Scheduler computation() {return RxJavaHooks.onComputationScheduler(getInstance().computationScheduler);}... }?
?
?
具體的Scheduler的實現類就不帶大家一起看了,但我們需要知道,能做到線程切換的關鍵Worker的schedule方法,因為它會把傳過來的任務放入線程池,或新線程中執行,這取決于具體Scheduler的實現。
自定義Scheduler
那么,下面我們先來自定義一個簡單的Scheduler和Worker。
public class Scheduler {final Executor executor;public Scheduler(Executor executor) {this.executor = executor;}public Worker createWorker() {return new Worker(executor);}public static class Worker {final Executor executor;public Worker(Executor executor) {this.executor = executor;}// 這里接受的是Runnable而不是Action0,其實這沒什么關系,主要是懶得自定義函數式接口了。public void schedule(Runnable runnable) {executor.execute(runnable);}} }?
?
?
為了達到高仿效果,我們也提供相應的工廠方法。
public class Schedulers {private static final Scheduler ioScheduler = new Scheduler(Executors.newSingleThreadExecutor());public static Scheduler io() {return ioScheduler;} }?
?
?
實現subscribeOn
subscribeOn是作用于上層OnSubscribe的,可以讓OnSubscribe的call方法在新線程中執行。
因此,在Observable類里面,添加如下代碼:
public Observable<T> subscribeOn(Scheduler scheduler) {return Observable.create(new OnSubscribe<T>() {@Overridepublic void call(Subscriber<? super T> subscriber) {subscriber.onStart();// 將事件的生產切換到新的線程。scheduler.createWorker().schedule(new Runnable() {@Overridepublic void run() {Observable.this.onSubscribe.call(subscriber);}});}});}?
?
?
測試一下:
Observable.create(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {System.out.println("OnSubscribe@ "+Thread.currentThread().getName()); //new Threadsubscriber.onNext(1);}}).subscribeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {...@Overridepublic void onNext(Integer var1) {System.out.println("Subscriber@ "+Thread.currentThread().getName()); // new ThreadSystem.out.println(var1);}});?
?
?
實現observeOn
subscribeOn是作用于下層Subscriber的,需要讓下層Subscriber的事件處理方法放到新線程中執行。
為此,在Observable類里面,添加如下代碼:
public Observable<T> observeOn(Scheduler scheduler) {return Observable.create(new OnSubscribe<T>() {@Overridepublic void call(Subscriber<? super T> subscriber) {subscriber.onStart();Scheduler.Worker worker = scheduler.createWorker();Observable.this.onSubscribe.call(new Subscriber<T>() {@Overridepublic void onCompleted() {worker.schedule(new Runnable() {@Overridepublic void run() {subscriber.onCompleted();}});}@Overridepublic void onError(Throwable t) {worker.schedule(new Runnable() {@Overridepublic void run() {subscriber.onError(t);}});}@Overridepublic void onNext(T var1) {worker.schedule(new Runnable() {@Overridepublic void run() {subscriber.onNext(var1);}});}});}});}?
?
?
測試一下:
Observable.create(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {System.out.println("OnSubscribe@ " + Thread.currentThread().getName()); // mainsubscriber.onNext(1);}}).observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {...@Overridepublic void onNext(Integer var1) {System.out.println("Subscriber@ " + Thread.currentThread().getName()); // new ThreadSystem.out.println(var1);}});?
?
?
在Android中切換線程
經過以上實踐,我們終于知道了RxJava線程切換的核心原理了。下面我們順便來看看Android里面是如何進行線程切換的。
首先找到AndroidSchedulers,發現一個Scheduler的具體實現類:LooperScheduler。
private AndroidSchedulers() {...mainThreadScheduler = new LooperScheduler(Looper.getMainLooper());...}/** A {@link Scheduler} which executes actions on the Android UI thread. */public static Scheduler mainThread() {return getInstance().mainThreadScheduler;}?
?
?
LooperScheduler的代碼很清晰,內部持有一個Handler,用于線程的切換。在Worker的schedule(Action0 action,...)方法中,將action通過Handler切換到所綁定的線程中執行。
class LooperScheduler extends Scheduler {private final Handler handler;LooperScheduler(Looper looper) {handler = new Handler(looper);}LooperScheduler(Handler handler) {this.handler = handler;}@Overridepublic Worker createWorker() {return new HandlerWorker(handler);}static class HandlerWorker extends Worker {private final Handler handler;...@Overridepublic Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {...action = hook.onSchedule(action);ScheduledAction scheduledAction = new ScheduledAction(action, handler);Message message = Message.obtain(handler, scheduledAction);message.obj = this; // Used as token for unsubscription operation.handler.sendMessageDelayed(message, unit.toMillis(delayTime));...return scheduledAction;}@Overridepublic Subscription schedule(final Action0 action) {return schedule(action, 0, TimeUnit.MILLISECONDS);}}static final class ScheduledAction implements Runnable, Subscription {private final Action0 action;private final Handler handler;private volatile boolean unsubscribed;...@Override public void run() {try {action.call();} ...}...} }?
?
?
結語
就這樣,以上用代碼演示了RxJava一些核心功能是如何實現的,希望能給大家帶來不一樣的啟發。但這只是一個小小的Demo,離真正能運用于工程的Rx框架還差太遠。這也讓我們明白到,一個健壯的框架,需要考慮太多東西,比如代碼的可拓展性和可讀性,性能優化,可測試性,兼容性,極端情況等等。但有時要想深入理解一個復雜框架的實現原理,就需要剝離這些細節代碼,多關注主干的調用邏輯,化繁為簡。
Demo代碼可到Github獲取:https://github.com/TellH/RxJavaDemo/tree/master/src/my_rxjava
參考&拓展
- https://mp.weixin.qq.com/s?__biz=MzI1MTA1MzM2Nw==&mid=2649796857&idx=1&sn=ed8325aeddac7fd2bd81a0717c010e98&scene=1&srcid=0817o3Xzkx4ILR6FKaR1M9LX#rd
- https://gank.io/post/560e15be2dca930e00da1083
- https://zhuanlan.zhihu.com/p/22338235
總結
以上是生活随笔為你收集整理的一起来造一个RxJava,揭秘RxJava的实现原理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 微服务部署:蓝绿部署、滚动部署、灰度发布
- 下一篇: 大家好,我是区块链本人。今天,我要给你们