Android RxJava 3.x 使用总结
轉載請標明出處:http://blog.csdn.net/zhaoyanjun6/article/details/106720158
本文出自【趙彥軍的博客】
文章目錄
- 依賴接入
- Flowable
- Single
- Maybe
- BackpressureStrategy
- 線程切換
- concat
- 例子1
依賴接入
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0' implementation "io.reactivex.rxjava3:rxjava:3.0.4"Flowable
//java 方式 Flowable.just(1).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Throwable {}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Throwable {}});//或者用 Lambda 簡寫 Flowable.just(1).subscribe( it -> {}, throwable -> {});range 一組序列數據
Flowable.range(0, 4).subscribe(it -> {//結果 0 1 2 3}, throwable -> {});Single
Single只發射單個數據或錯誤事件,即使發射多個數據,后面發射的數據也不會處理。
只有 onSuccess 和 onError事件,沒有 onNext 、onComplete事件。
SingleEmitter
public interface SingleEmitter<@NonNull T> {void onSuccess(@NonNull T t);void onError(@NonNull Throwable t);void setDisposable(@Nullable Disposable d);void setCancellable(@Nullable Cancellable c);boolean isDisposed();boolean tryOnError(@NonNull Throwable t);}示例1
Single.create(new SingleOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull SingleEmitter<Integer> emitter) throws Throwable {emitter.onSuccess(1);}}).subscribe(integer -> {}, throwable -> {});示例2
Single.just(1).subscribe(integer -> {}, throwable -> {});Maybe
Maybe 是 RxJava2.x 之后才有的新類型,可以看成是Single和Completable的結合。
Maybe 也只能發射單個事件或錯誤事件,即使發射多個數據,后面發射的數據也不會處理。
只有 onSuccess 、 onError 、onComplete事件,沒有 onNext 事件。
實例1
Maybe.create(new MaybeOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull MaybeEmitter<Integer> emitter) throws Throwable {emitter.onSuccess(1);emitter.onComplete();}}).subscribe(integer -> {}, throwable -> {});實例2
Maybe.just(1).subscribe(integer -> {}, throwable -> {});BackpressureStrategy
背壓策略
public enum BackpressureStrategy {/*** The {@code onNext} events are written without any buffering or dropping.* Downstream has to deal with any overflow.* <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.*/MISSING,/*** Signals a {@link io.reactivex.rxjava3.exceptions.MissingBackpressureException MissingBackpressureException}* in case the downstream can't keep up.*/ERROR,/*** Buffers <em>all</em> {@code onNext} values until the downstream consumes it.*/BUFFER,/*** Drops the most recent {@code onNext} value if the downstream can't keep up.*/DROP,/*** Keeps only the latest {@code onNext} value, overwriting any previous value if the* downstream can't keep up.*/LATEST }- MISSING 策略則表示通過 Create 方法創建的 Flowable 沒有指定背壓策略,不會對通過 OnNext 發射的數據做緩存或丟棄處理,需要下游通過背壓操作符
- BUFFER 策略則在還有數據未下發完成時就算上游調用onComplete或onError也會等待數據下發完成
- LATEST 策略則當產生背壓時僅會緩存最新的數據
- DROP 策略為背壓時丟棄背壓數據
- ERROR 策略是背壓時拋出異常調用onError
線程切換
RxUtil
package com.example.streamimport io.reactivex.rxjava3.android.schedulers.AndroidSchedulers import io.reactivex.rxjava3.core.FlowableTransformer import io.reactivex.rxjava3.core.MaybeTransformer import io.reactivex.rxjava3.core.ObservableTransformer import io.reactivex.rxjava3.core.SingleTransformer import io.reactivex.rxjava3.schedulers.Schedulers/*** @author yanjun.zhao* @time 2020/6/12 8:39 PM* @desc*/object RxUtil {/*** 線程切換*/fun <T> maybeToMain(): MaybeTransformer<T, T> {return MaybeTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}/*** 線程切換*/fun <T> singleToMain(): SingleTransformer<T, T> {return SingleTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}/*** 線程切換*/fun <T> flowableToMain(): FlowableTransformer<T, T> {return FlowableTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}fun <T> observableToMain(): ObservableTransformer<T, T> {return ObservableTransformer { upstream ->upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())}}}具體實現
package com.example.streamimport android.os.Bundle import androidx.appcompat.app.AppCompatActivity import io.reactivex.rxjava3.core.Flowable import io.reactivex.rxjava3.core.Maybe import io.reactivex.rxjava3.core.Observable import io.reactivex.rxjava3.core.Singleclass MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)Single.just(1).map {//運行在子線程it}.compose(RxUtil.singleToMain()) //線程轉換.subscribe({//運行在主線程},{it.printStackTrace()})Maybe.just(1).map {//運行在子線程it}.compose(RxUtil.maybeToMain()) //線程轉換.subscribe({//運行在主線程},{it.printStackTrace()})Flowable.just(1).map {//運行在子線程it}.compose(RxUtil.flowableToMain()) //線程轉換.subscribe({//運行在主線程},{it.printStackTrace()})Observable.just(1).map {//運行在子線程it}.compose(RxUtil.observableToMain()) //線程轉換.subscribe({ it ->//運行在主線程},{it.printStackTrace()})} }concat
Concat操作符連接多個Observable的輸出,就好像它們是一個Observable,第一個Observable發射的所有數據在第二個Observable發射的任何數據前面,以此類推。
直到前面一個Observable終止,Concat才會訂閱額外的一個Observable。注意:因此,如果你嘗試連接一個"熱"Observable(這種Observable在創建后立即開始發射數據,即使沒有訂閱者),Concat將不會看到也不會發射它之前發射的任何數據。
例子1
private var ob1 = Observable.create<String> {Log.d("concat-數據源1", " ${Thread.currentThread().name} ")it.onNext("a1")it.onComplete()}private var ob2 = Observable.create<String> {Log.d("concat-數據源2", " ${Thread.currentThread().name} ")it.onNext("a2")it.onComplete()}private var ob3 = Observable.create<String> {Log.d("concat-數據源3", " ${Thread.currentThread().name} ")it.onNext("a3")it.onComplete()}Observable.concat<String>(ob1, ob2, ob3).subscribeOn(Schedulers.io()).subscribe{Log.d("concat-結果", " ${Thread.currentThread().name} " + it)}結果是:
concat-數據源1: RxCachedThreadScheduler-1 concat-結果: RxCachedThreadScheduler-1 concat-數據源2: RxCachedThreadScheduler-1 concat-結果: RxCachedThreadScheduler-1 concat-數據源3: RxCachedThreadScheduler-1 concat-結果: RxCachedThreadScheduler-1結果分析:
- concat 輸出結果是有序的
- concat 會使三個數據源都會執行
那么如果我要實現哪個數據源有數據,我就用哪個數據,一旦獲取到想要的數據,后續數據源不再執行。其實很簡單,用 firstElement() ,這個需求有點像圖片加載流程 先從內存取,內存沒有從本地文件取,本都文件沒有就請求服務器。一旦哪個環節獲取到了數據,立刻停止后面的流程
Observable.concat<String>(ob1, ob2, ob3).firstElement().subscribeOn(Schedulers.io()).subscribe {Log.d("concat-結果", " ${Thread.currentThread().name} ")}}運行結果為:
concat-數據源1: RxCachedThreadScheduler-1 concat-結果: RxCachedThreadScheduler-1總結
以上是生活随笔為你收集整理的Android RxJava 3.x 使用总结的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Kotlin实战指南十五:协程泄漏
- 下一篇: Android Bitmap 研究与思考