Android—RxJava库知识
RXJAVA:一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫。
優點:異步,邏輯簡潔易懂。
程序要求:將一個給出的目錄數組 File[] folders 中每個目錄下的 png 圖片都加載出來并顯示在 imageCollectorView 中。
例子:
new Thread() {@Overridepublic void run() {super.run();for (File folder : folders) {File[] files = folder.listFiles();for (File file : files) {if (file.getName().endsWith(".png")) {final Bitmap bitmap = getBitmapFromFile(file);getActivity().runOnUiThread(new Runnable() {@Overridepublic void run() {imageCollectorView.addImage(bitmap);}});}}}} }.start();RxJava實現:?
Observable.from(folders).flatMap(new Func1<File, Observable<File>>() {@Overridepublic Observable<File> call(File file) {return Observable.from(file.listFiles());}}).filter(new Func1<File, Boolean>() {@Overridepublic Boolean call(File file) {return file.getName().endsWith(".png");}}).map(new Func1<File, Bitmap>() {@Overridepublic Bitmap call(File file) {return getBitmapFromFile(file);}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Bitmap>() {@Overridepublic void call(Bitmap bitmap) {imageCollectorView.addImage(bitmap);}});RxJava 3個基本要素:Observable (可觀察者,即被觀察者)、 Observer (觀察者)、 subscribe (訂閱)。
Observable 和 Observer 通過 subscribe() 方法實現訂閱關系,從而 Observable 可以在需要的時候發出事件來通知 Observer。
實現基本步驟:
1、創建 Observer
Observer 即觀察者,它決定事件觸發的時候將有怎樣的行為。 RxJava 中的 Observer接口的實現方式:
Observer<String> observer = new Observer<String>() {@Overridepublic void onNext(String s) {Log.d(tag, "Item: " + s);}@Overridepublic void onCompleted() {Log.d(tag, "Completed!");}@Overridepublic void onError(Throwable e) {Log.d(tag, "Error!");} };- onCompleted(): 事件隊列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個隊列。RxJava 規定,當不會再有新的 onNext() 發出時,需要觸發 onCompleted() 方法作為標志。
- onError(): 事件隊列異常。在事件處理過程中出異常時,onError() 會被觸發,同時隊列自動終止,不允許再有事件發出。
- 在一個正確運行的事件序列中, onCompleted() 和 onError() 有且只有一個,并且是事件序列中的最后一個。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在隊列中調用了其中一個,就不應該再調用另一個。
除了 Observer 接口之外,RxJava 還內置了一個實現了 Observer 的抽象類:Subscriber。 Subscriber 對 Observer 接口進行了一些擴展,但他們的基本使用方式是完全一樣的。它們的區別對于使用者來說主要有兩點:
2、創建 Observable
Observable 即被觀察者,它決定什么時候觸發事件以及觸發怎樣的事件。 RxJava 使用 create() 方法來創建一個 Observable ,并為它定義事件觸發規則:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("Hello");subscriber.onNext("Hi");subscriber.onNext("Aloha");subscriber.onCompleted();} });RxJava 還提供了一些方法用來快捷創建事件隊列,例如:
- just(T...): 將傳入的參數依次發送出來。
- from(T[]) / from(Iterable<? extends T>) : 將傳入的數組或 Iterable 拆分成具體對象后,依次發送出來。
上面 just(T...) 的例子和 from(T[]) 的例子,都和之前的 create(OnSubscribe) 的例子是等價的。
3、Subscribe (訂閱)
創建了 Observable 和 Observer 之后,再用 subscribe() 方法將它們聯結起來,整條鏈子就可以工作了。
observable.subscribe(observer); // 或者: observable.subscribe(subscriber);觀察者模式本身的目的就是『后臺處理,前臺回調』的異步機制,因此異步對于 RxJava 是至關重要的。而要實現異步,則需要用到 RxJava 的另一個概念:Scheduler?。
除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 還支持不完整定義的回調,RxJava 會自動根據定義創建出 Subscriber 。形式如下:
Action1<String> onNextAction = new Action1<String>() {// onNext()@Overridepublic void call(String s) {Log.d(tag, s);} }; Action1<Throwable> onErrorAction = new Action1<Throwable>() {// onError()@Overridepublic void call(Throwable throwable) {// Error handling} }; Action0 onCompletedAction = new Action0() {// onCompleted()@Overridepublic void call() {Log.d(tag, "completed");} };// 自動創建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義onNext()、 onError() 和 onCompleted()observable.subscribe(onNextAction, onErrorAction, onCompletedAction);Action0 ,Action1 是 RxJava 的接口,它們只有一個方法 call,Action0 的這個call()方法是無參無返回值的;而Action1的方法有一個參數,為call(T param)。由于onCompleted() 方法也是無參無返回值的,對應Action0 可以被當成一個包裝對象,onNext(T obj) 和 onError(Throwable error) 也是單參數無返回值的,對應Action1。將這三個方法的內容打包起來將自己作為一個參數傳入 subscribe() 以實現不完整定義的回調。根據參數的數量X還有ActionX。
String[] names = ...; Observable.from(names).subscribe(new Action1<String>() {@Overridepublic void call(String name) {Log.d(tag, name);}});3. 線程控制 —— Scheduler (一)
RxJava 已經內置了幾個 Scheduler :
- Schedulers.immediate(): 直接在當前線程運行,相當于不指定線程。這是默認的 Scheduler。
- Schedulers.newThread(): 總是啟用新線程,并在新線程執行操作。
- Schedulers.io(): I/O 操作(讀寫文件、讀寫數據庫、網絡信息交互等)所使用的 Scheduler。行為模式和 newThread() 差不多,區別在于 io() 的內部實現是是用一個無數量上限的線程池,可以重用空閑的線程,因此多數情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免創建不必要的線程。
- Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
- Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操作將在 Android 主線程運行。
可以使用 subscribeOn() 和 observeOn() 兩個方法來對線程進行控制了。
- subscribeOn(): 指定 subscribe() 所發生的線程,即 Observable.OnSubscribe 被激活時所處的線程。或者叫做事件產生的線程。 只能有一個。
- ?observeOn(): 指定 Subscriber 所運行在的線程?;蛘呓凶鍪录M的線程??捎卸鄠€。
由于 subscribeOn(Schedulers.io()) 的指定,被創建的事件的內容 1、2、3、4 將會在 IO 線程發出;而由于 observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber 數字的打印將發生在主線程 。
這兩句經常會使用到,一般在IO線程獲取網絡數據,主線程將數據更新到界面?!汉笈_線程取數據,主線程顯示』的程序策略。
4、RxJava 的操作符
map():數據類型轉換
將String轉換為Bitmap對象
Observable.just("images/logo.png") // 輸入類型 String.map(new Func1<String, Bitmap>() {@Overridepublic Bitmap call(String filePath) { // 參數類型 Stringreturn getBitmapFromPath(filePath); // 返回類型 Bitmap}}).subscribe(new Action1<Bitmap>() {@Overridepublic void call(Bitmap bitmap) { // 參數類型 BitmapshowBitmap(bitmap);}});Func1 的類。和 Action1 非常相似,也是 RxJava 的一個接口,用于包裝含有一個參數的方法。 Func1 和 Action 的區別在于, Func1 包裝的是有返回值的方法。另外,和 ActionX 一樣, FuncX 也有多個,用于不同參數個數的方法。FuncX 和 ActionX 的區別在 FuncX 包裝的是有返回值的方法。
flatMap():假設有一個數據結構『學生』,現在需要打印出一組學生的名字,可以用map()將Student類型轉換為String。
但是如果要打印出學生的課程,則需要flatMap(),因為課程有多個。
Student[] students = ...; Subscriber<Course> subscriber = new Subscriber<Course>() {@Overridepublic void onNext(Course course) {Log.d(tag, course.getName());}... }; Observable.from(students).flatMap(new Func1<Student, Observable<Course>>() {@Overridepublic Observable<Course> call(Student student) {return Observable.from(student.getCourses());}}).subscribe(subscriber);我們也可以用map轉換并返回List<Course>,然后在onNext里遍歷,但是不夠實用。
concat():concat操作符連接多個Observable一起輸出,第一個Observable發射的所有數據在第二個Observable發射的任何數據前面,以此類推。
first():只發射第一個滿足某個條件的數據
三級緩存:
- 首先檢查內存是否有緩存
- 然后檢查文件緩存中是否有
- 最后才從網絡中取
任何一步一旦發現數據后面的操作都不執行
在rxjava中為我們提供了兩個解決這個問題的操作符,分別是:concat和first
concat將3個Observable連在一起,first則如果發射第一個滿足條件的Observable
final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {if (memoryCache != null) {subscriber.onNext(memoryCache);} else {subscriber.onCompleted();}}});Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {String cachePref = rxPreferences.getString("cache").get();if (!TextUtils.isEmpty(cachePref)) {subscriber.onNext(cachePref);} else {subscriber.onCompleted();}}});Observable<String> network = Observable.just("network");//依次檢查memory、disk、network Observable.concat(memory, disk, network).first().subscribeOn(Schedulers.newThread()).subscribe(s -> {memoryCache = "memory";System.out.println("--------------subscribe: " + s);});skip(int i):跳過Observable發射的前i項數據
repeat(int i):重復發送i次
interval(int i,TimeUnit.SECONDS) :每個i秒發送一次
take(int i):只取前幾個
takelat(int i):只取后幾個
?
轉載自:
RxJava 詳解
Rx系列之RxJava初識
Rx系列之RxJava操作符
Rx系列之Rxjava操作符進階-使用場景
總結
以上是生活随笔為你收集整理的Android—RxJava库知识的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JIRA 5.0.1 发布
- 下一篇: 专访刘伟:软件开发人员的内功修炼之道