深入RxJava2 源码解析(二)
本文作者JasonChen,原文地址: http://chblog.me/2018/12/19/rxjava2 源碼解析(一)/
前一篇文章我們講述到RxJava2 的內部設計模式與原理機制,包括觀察者模式和裝飾者模式,其本質上都是RxJava2的事件驅動,那么本篇文章將會講到RxJava2 的另外一個重要功能:異步。
RxJava2 深入解析
依舊是從源碼實現開始,帶著疑惑去讀,前一篇文章我們講到subcribeOn方法內部的實現涉及線程池:Scheduler.Worker w = scheduler.createWorker() 這邊涉及兩個重要組件:
scheduler調度器源碼解析
public final class Schedulers {@NonNullstatic final Scheduler SINGLE;@NonNullstatic final Scheduler COMPUTATION;@NonNullstatic final Scheduler IO;@NonNullstatic final Scheduler TRAMPOLINE;@NonNullstatic final Scheduler NEW_THREAD;一共有如下的五種調度器,分別對應不同的場景,當然企業可以針對自身的場景設置自己的調度器。
- SINGLE,針對單一任務設置的單個定時線程池
- COMPUTATION,針對計算任務設置的定時線程池的資源池(數組)
- IO,針對IO任務設置的單個可復用的定時線程池
- TRAMPOLINE,trampoline翻譯是蹦床(佩服作者的腦洞)。這個調度器的源碼注釋是:任務在當前線程工作(不是線程池)但是不會立即執行,任務會被放入隊列并在當前的任務完成之后執行。簡單點說其實就是入隊然后慢慢線性執行(這里巧妙的方法其實和前面我們所講的回壓實現機制基本是一致的,值得借鑒)
- NEW_THREAD,單個的周期線程池和single基本一致唯一不同的是single對thread進行了一個簡單的NonBlocking封裝,這個封裝從源碼來看基本沒有作用,只是一個marker interface標志接口
computation調度器源碼分析
computation調度器針對大量計算場景,在后端并發場景會更多的用到,那么其是如何實現的呢?接下來帶著疑惑進行源碼分析。
public final class ComputationScheduler extends Scheduler implements SchedulerMultiWorkerSupport {// 資源池final AtomicReference<FixedSchedulerPool> pool;// 這是computationScheduler類中實現的createWork()方法public Worker createWorker() {// 創建EventLoop工作者,入參是一個PoolWorkerreturn new EventLoopWorker(pool.get().getEventLoop());}static final class FixedSchedulerPool implements SchedulerMultiWorkerSupport {final int cores;// 資源池工作者,每個工作者其實都是一個定時線程池final PoolWorker[] eventLoops;long n;// 對應前面的函數調用public PoolWorker getEventLoop() {int c = cores;if (c == 0) {return SHUTDOWN_WORKER;}// simple round robin, improvements to come// 這里其實就是從工作者數組中輪詢選出一個工作者這里其實擁有提升和優化的空間,這里筆者可能會向開源社區提交一個pr以此進行比較好的調度器調度return eventLoops[(int)(n++ % c)];}// 此處是一個簡單的封裝 static final class PoolWorker extends NewThreadWorker {PoolWorker(ThreadFactory threadFactory) {super(threadFactory);}}public class NewThreadWorker extends Scheduler.Worker implements Disposable {private final ScheduledExecutorService executor;volatile boolean disposed;public NewThreadWorker(ThreadFactory threadFactory) {// 進行定時線程池的初始化executor = SchedulerPoolFactory.create(threadFactory);}public static ScheduledExecutorService create(ThreadFactory factory) {final ScheduledExecutorService exec =// 初始化一個定時線程池Executors.newScheduledThreadPool(1, factory);tryPutIntoPool(PURGE_ENABLED, exec);return exec;}上述代碼清晰的展示了computation調度器的實現細節,這里需要說明的是定時線程池的core設置為1,線程池的個數最多為cpu數量,這里涉及到ScheduledThreadPoolExecutor定時線程池的原理,簡單的說起內部是一個可自動增長的數組(隊列)類似于ArrayList,也就是說隊列永遠不會滿,線程池中的線程數不會增加。
接下來結合訂閱線程和發布線程分析其之間如何進行溝通的本質。
發布線程在上一篇的文章已經提到,內部是一個worker,那么訂閱線程也是么,很顯然必須是的,接下來我們來看下源代碼:
// 還是從subscribeActul開始(原因見上一篇文章) public void subscribeActual(Subscriber<? super T> s) {Worker worker = scheduler.createWorker();if (s instanceof ConditionalSubscriber) {source.subscribe(new ObserveOnConditionalSubscriber<T>((ConditionalSubscriber<? super T>) s, worker, delayError, prefetch));} else {// source.subscribe(new ObserveOnSubscriber<T>(s, worker, delayError, prefetch));} }其內部封裝了一個ObserveOnsubcriber,這是個對下流訂閱者的封裝,主要什么作用呢,為什么要這個呢?其實這個涉及訂閱線程內部的機制,接著看源代碼了解其內部機制。
// 基類abstract static class BaseObserveOnSubscriber<T> extends BasicIntQueueSubscription<T>implements FlowableSubscriber<T>, Runnable {private static final long serialVersionUID = -8241002408341274697L;final Worker worker;final boolean delayError;final int prefetch;//...@Overridepublic final void onNext(T t) {if (done) {return;}if (sourceMode == ASYNC) {trySchedule();return;}if (!queue.offer(t)) {upstream.cancel();error = new MissingBackpressureException("Queue is full?!");done = true;}// 開啟訂閱者線程池模式的調度,具體實現在子類中實現trySchedule();}@Overridepublic final void onError(Throwable t) {if (done) {RxJavaPlugins.onError(t);return;}error = t;done = true;trySchedule();}@Overridepublic final void onComplete() {if (!done) {done = true;trySchedule();}}// 這里并沒有向上傳遞request請求,而是把自己當做數據發射者進行request計數@Overridepublic final void request(long n) {if (SubscriptionHelper.validate(n)) {BackpressureHelper.add(requested, n);// 開啟調度trySchedule();}}// 調度代碼final void trySchedule() {// 上一篇文章講過這個的用法if (getAndIncrement() != 0) {return;}// 啟用一個work來進行任務的執行 this對象說明實現了runable接口worker.schedule(this);}// 調度實現的代碼@Overridepublic final void run() {if (outputFused) {runBackfused();} else if (sourceMode == SYNC) {runSync();} else {// 一般會調用runAsync方法runAsync();}}abstract void runBackfused();abstract void runSync();abstract void runAsync();//...}當上游的裝飾者(上一篇提到的裝飾者模式)調用onNext方法時,這時并沒有類似的去調用下游的onNext方法,那這個時候其實就是訂閱者線程模式的核心原理:采用queue隊列進行數據的store,這里嘗試將數據放進隊列。
ObserveOnSubscriber的具體實現類部分實現如下。
static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T>implements FlowableSubscriber<T> {private static final long serialVersionUID = -4547113800637756442L;final Subscriber<? super T> downstream;ObserveOnSubscriber(Subscriber<? super T> actual,Worker worker,boolean delayError,int prefetch) {super(worker, delayError, prefetch);this.downstream = actual;}//這是上游回調這個subscriber時調用的方法,詳情見上一篇文章@Overridepublic void onSubscribe(Subscription s) {if (SubscriptionHelper.validate(this.upstream, s)) {this.upstream = s;if (s instanceof QueueSubscription) {@SuppressWarnings("unchecked")QueueSubscription<T> f = (QueueSubscription<T>) s;int m = f.requestFusion(ANY | BOUNDARY);if (m == SYNC) {sourceMode = SYNC;queue = f;done = true;downstream.onSubscribe(this);return;} elseif (m == ASYNC) {sourceMode = ASYNC;queue = f;downstream.onSubscribe(this);s.request(prefetch);return;}}// 設置緩存隊列// 這里涉及一個特別之處就是預獲取(提前獲取數據)queue = new SpscArrayQueue<T>(prefetch);// 觸發下游subscriber 如果有request則會觸發下游對上游數據的requestdownstream.onSubscribe(this);// 請求上游數據 上面的代碼和這行代碼就是起到承上啟下的一個作用,也就是預獲取,放在隊列中s.request(prefetch);}}//...下面看一下抽象方法runAsync()的實現。
@Overridevoid runAsync() {int missed = 1;final Subscriber<? super T> a = downstream;final SimpleQueue<T> q = queue;long e = produced;for (;;) {long r = requested.get();while (e != r) {boolean d = done;T v;try {// 獲取數據v = q.poll();} catch (Throwable ex) {Exceptions.throwIfFatal(ex);cancelled = true;upstream.cancel();q.clear();a.onError(ex);worker.dispose();return;}boolean empty = v == null;if (checkTerminated(d, empty, a)) {return;}if (empty) {break;}a.onNext(v);e++;// limit = prefetch - (prefetch >> 2)// prefetch = BUFFER_SIZE(上一篇文章提到的默認128)if (e == limit) {if (r != Long.MAX_VALUE) {r = requested.addAndGet(-e);}upstream.request(e);e = 0L;}}if (e == r && checkTerminated(done, q.isEmpty(), a)) {return;}// 下面的代碼機制在上一篇講過主要涉及異步編程技巧int w = get();if (missed == w) {produced = e;missed = addAndGet(-missed);if (missed == 0) {break;}} else {missed = w;}}}//...}前面說過,訂閱者把自己當成一個發射者,那數/據從哪里來呢,而且還要持續有數據,那么后面的代碼說明了數據來源,當數據達到limit,開始新的數據的prefetch,每次preftch的數量是limit。
為何要將訂閱者這樣區別設置呢,其實原因很簡單,訂閱者和發布者需要不同的線程機制異步地執行,比如訂閱者需要computation的線程機制來進行大量的耗時數據計算,但又要保持一致的裝修者模式,所以源碼的做法是訂閱者這邊打破回調的調用流,采用數據隊列進行兩個線程池之間的數據傳送。
本文總結
筆者喜歡總結,總結意味著我們反思和學習前面的知識點,應用點以及自身的不足。
訂閱最新文章,歡迎關注我的公眾號
總結
以上是生活随笔為你收集整理的深入RxJava2 源码解析(二)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 阿里短信模板API
- 下一篇: 网上经常看到的冒泡排序的动图如何制作