map原理 java_RxJava的基本原理以及Map,flatMap的原理
前言:
RxJava想必很多人都用過了,其實也是一早就接觸過這個框架了,但是最近看了下一些關于是否需要使用RxJava的文章,對于RxJava的優點缺點有了更深的理解,然后看了論壇上有朋友提到很難理解Map,flatMap的區別,所以突然興致來了,想寫一點點東西,從源碼出發簡單的聊一下這兩個方法的區別以及使用,通過通俗易懂的方式使大家能輕松簡單的理解他們,好的讓我們開始吧:
首先我看來看看RxJava的一些基本概念:
RxJava 簡介:
RxJava是 ReactiveX 在 Java 上的開源的實現。RxJava可以輕松處理不同運行環境下的后臺線程或UI線程任務的框架。RxJava 的異步實現,是通過一種擴展的觀察者模式來實現的。
Observable(可觀察者,即被觀察者) 和 Subscriber(訂閱者)是兩個主要的類。在 RxJava 上,一個 Observable 是一個發出數據流或者事件的類,Subscriber 是一個對這些發出的 items (數據流或者事件)進行處理(采取行動)的類。
Observable 和Observer 通過 subscribe() 方法實現訂閱關系。一個 Observable 的標準流發出一個或多個item,然后成功完成或者出錯。一個 Observable 可以有多個 Subscribers,并且通過 Observable 發出的每一個 item,該 item 將會被發送到 Subscriber.onNext() 方法來進行處理。一旦 Observable 不再發出 items,它將會調用 Subscriber.onCompleted() 方法,或如果有一個出錯的話Observable 會調用 Subscriber.onError() 方法。
RxJava里面兩個關鍵的概念:Observable 與 Observer 即被觀察者,與觀察者,他們在RxJava里面對應的類分別為:class Observable 與 interface Observer ,Observer是 interface 里面定義了我們最經常見的三個方法,
public interface Observer {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
}
但是我們在代碼中一般不直接使用他,而是在代碼里面經常看見實現了他的抽象類:
public abstract class Subscriber implements Observer, Subscription {
// represents requested not set yet
private static final Long NOT_SET = Long.MIN_VALUE;
private final SubscriptionList subscriptions;
private final Subscriber> subscriber;
/* protected by `this` */
private Producer producer;
/* protected by `this` */
private long requested = NOT_SET; // default to not set
protected Subscriber() {
this(null, false);
}
//省略以下方法
}
因為他是 abstract 也不能直接new出來,我們在使用中經常會new 他的匿名類來進行使用(注意:new匿名的abstract類并不是直接new 了abstract,底層而是會生成一個類,這個類繼承了abstract,這個有機會展開給大家說)
Subscriber subscriber = new Subscriber() {
@Override
public void onNext(String s) {
Log.d("Rxjava", "Item: " + s);
}
@Override
public void onCompleted() {
Log.d("Rxjava", "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d("Rxjava", "Error!");
}
};
其中還有一個重要的概念就是 Observable.OnSubscribe 這個表示的是被監聽者與監聽者綁定以后會觸發 OnSubscribe 里面的 call 方法,我們一般在 call 方法里面進行對于監聽者 subscribe 的三個方法 onNext ,onCompleted,onError 的觸發調用,使得整個調用過程像”鏈式“一樣進行。
好了,我們現在再來看看RxJava的最最基本使用(😆大俠可以直接繞道走了),通過最最基本的使用我們來說一說基本的調用流程:
public void testCreate(){
Observable observable = Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Subscriber super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
Subscriber subscriber = new Subscriber() {
@Override
public void onNext(String s) {
Log.d("Rxjava", "Item: " + s);
}
@Override
public void onCompleted() {
Log.d("Rxjava", "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d("Rxjava", "Error!");
}
};
observable.subscribe(subscriber);
}
首先創建一個被觀察者 observable ,再創建一個觀察者 subscriber 通過
observable.subscribe(subscriber);
方法來進行關聯和調用,一旦關聯以后就是調用到 Observable 的 call 方法,會把 subscribe 傳遞到 call 方法的參數里面(大家一定要理解這個,不要只記住調用步驟就完了,因為這個是你理解RxJava的基礎)
為什么是這個步驟了,讓我從代碼中找到答案,其實很簡單:
static Subscription subscribe(Subscriber super T> subscriber, Observable observable) {
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
//省略以下代碼
}
}
而 hook.onSubscribeStart 只是返回 observable.onSubscribe 然后調用它的 call 方法,而這個又是什么呢?
protected Observable(OnSubscribe f) {
this.onSubscribe = f;
}
只不過是Observable.create 出來的 OnSubscribe ,
public interface OnSubscribe extends Action1> {
// cover for generics insanity
}
public interface Action1 extends Action {
void call(T t);
}
OnSubscribe 也就是我們create時候的匿名類:
Observable observable = Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Subscriber super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
所以現在執行就很明顯了, 現在 observable.subscribe(subscriber); 把 subscribe 傳入到你實現的 OnSubscribe 的 call 方法里面,然后調用 call 方法
好了有了這個基礎,我們再來看看Map方法是怎么實現數據轉換的,首先我們看看Map的基本使用:
Observable observable = Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Subscriber super String> subscriber) {
subscriber.onNext("333");
Log.d("Rxjava", "call observable");
}
});
observable.map(new Func1() {
@Override
public Integer call(String number) { // 參數類型 String
return Integer.parseInt(number);
}
}).subscribe(new Action1() {
@Override
public void call(Integer number) {
Log.d("Rxjava", "number:" + number.getClass());
}
});
為了大家好理解,我還是分開來實現,首先 Observable.create 了一個被監聽者,然后 Map 轉換,然后再 subscribe 訂閱觸發事件,
我們先看看 observable.map 里面做了什么:
public final Observable map(Func1 super T, ? extends R> func) {
return lift(new OperatorMap(func));
}
通過 Map 里面的轉換步驟 Func1 new了一個 OperatorMap ,再看看這個是什么:
public final class OperatorMap implements Operator {
final Func1 super T, ? extends R> transformer;
public OperatorMap(Func1 super T, ? extends R> transformer) {
this.transformer = transformer;
}
@Override
public Subscriber super T> call(final Subscriber super R> o) {
MapSubscriber parent = new MapSubscriber(o, transformer);
o.add(parent);
return parent;
}
static final class MapSubscriber extends Subscriber {
final Subscriber super R> actual;
final Func1 super T, ? extends R> mapper;
boolean done;
public MapSubscriber(Subscriber super R> actual, Func1 super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
//省略以下
}
這個 OperatorMap 有一個 MapSubscriber 這個是作為一個轉接的 Subscriber ,因為你的數據流是通過被監聽者的 Observable.OnSubscribe call 觸發然后經過 Map 里面的 Fun1 進行轉換以后再到最終的 subscriber ,這個 MapSubscriber 就起到這么一個作用,再看看 lift 函數做點什么
public final Observable lift(final Operator extends R, ? super T> operator) {
return new Observable(new OnSubscribeLift(onSubscribe, operator));
}
又 new 了一個 Observable ,里面的參數是 new OnSubscribeLift(onSubscribe, operator) ,我們再看看 OnSubscribeLift 是什么:
public final class OnSubscribeLift implements OnSubscribe {
static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
final OnSubscribe parent;
final Operator extends R, ? super T> operator;
public OnSubscribeLift(OnSubscribe parent, Operator extends R, ? super T> operator) {
this.parent = parent;
this.operator = operator;
}
@Override
public void call(Subscriber super R> o) {
try {
Subscriber super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
}
他實現了 OnSubscribe 了所以new 出來是沒有問題的,因為被監聽者的構造方法只是要 OnSubscribe 這個參數
protected Observable(OnSubscribe f) {
this.onSubscribe = f;
}
OnSubscribeLift 利用原來的 OnSubscribe 方法(原數據流最開始的執行方法)與new出來 ** OperatorMap** 為構造函數重新生成了一個 OnSubscribe,
整個 Map 函數的的過程就是new了一個新的被監聽者 Observable ,然后里面的監聽方法 onSubscribe 就是新生成的 OnSubscribeLift
然后執行下面語句會進行調用 Observable.OnSubscribe 的call 方法
observable.subscribe(subscriber);
也就是調用了 OnSubscribeLift 里面的 call 方法,里面會調用到 OperatorMap 里面的call方法
Subscriber super T> st = hook.onLift(operator).call(o);
operator 里面的call方法 o 變量為監聽者 Subscriber ,transformer變量就是 new operator 是傳入的 Func1 也就是Map里面的轉換步驟 ,通過這兩個變量生成了 this.actual 與 this.mapper
public Subscriber super T> call(final Subscriber super R> o) {
MapSubscriber parent = new MapSubscriber(o, transformer);
o.add(parent);
return parent;
}
這里面用這兩個參數new出了一個 MapSubscriber 然后返回給 st 變量,然后調用 parent.call(st); 這個 parent 就是我們new OnSubscribeLift 的指定的原監聽者的 OnSubscribe ,也就是要調用原被監聽者的 OnSubscribe 的 call 函數來執行第一個”數據流“(這個也是正常現象Map只是轉換,數據流還是應該從頭進行然后經過Map最后到被監聽者),只不過是傳入的參數為 MapSubscriber,這個對象我們上面大概提到了作為一個橋接的Subscriber使用
public void call(Subscriber super String> subscriber) {
subscriber.onNext("333");
}
這個會”鏈接“調用到 MapSubscriber 的 onNext 方法并把原數據 ”333“ 傳入,
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
前面說了 this.mapper 為轉換函數 Fun1 ,mapper.call(t); 即調用Map的中轉函數進行轉換
public Integer call(String number) { // 參數類型 String
return Integer.parseInt(number);
}
然后再調用 actual 的 onNext 把轉換的參數傳進去,actual 上面也說了就是原 subscriber ,調用它的call 方法
new Action1() {
@Override
public void call(Integer number) {
Log.d("Rxjava", "number:" + number.getClass());
}
})
這里有點疑問不是調用 onNext 方法么,怎么調用到了 call 方法呢
接著看:
public final Subscription subscribe(final Action1 super T> onNext) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
Action1 onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
Action0 onCompleted = Actions.empty();
return subscribe(new ActionSubscriber(onNext, onError, onCompleted));
}
subscribe 方法里面會把 Action1 super T> onNext 封裝成一個 ActionSubscriber 再進行 subscribe 調用,
public final class ActionSubscriber extends Subscriber {
final Action1 super T> onNext;
final Action1 onError;
final Action0 onCompleted;
public ActionSubscriber(Action1 super T> onNext, Action1 onError, Action0 onCompleted) {
this.onNext = onNext;
this.onError = onError;
this.onCompleted = onCompleted;
}
@Override
public void onNext(T t) {
onNext.call(t);
}
}
ActionSubscriber 的 onNext 方法執行了 Action1 的 call 方法,這樣一來就全都對應上了
最后總結一下:Map 方法其實就是構造了一個新的 Observable 與 一個新的 Observable.OnSubscribe (OnSubscribeLift),OnSubscribeLift 執行的時候會生成一個 MapSubscriber ,然后會調用原 Observable.OnSubscribe 方法并且把 MapSubscriber 傳入作為參數,并且調用它的 onNext 方法,因為 MapSubscriber 的生成是通過原 Subscriber 與 Func1 實現的,所以在 MapSubscriber 里面會拿到原 Observable.OnSubscribe 傳入的數據流然后調用 Func1 進行轉換然后再把轉換結果作為參數繼續調用原 Subscriber 的 onNext 方法,這樣一來數據流到監聽者那端就是被處理過的數據流了。(注:OnSubscribeLift 實現了 OnSubscribe,也就是可以作為OnSubscribe)
提問:為什么會生成新的 Observable 與 Observable.OnSubscribe (OnSubscribeLift) 與 Subscriber(MapSubscriber) ?
因為被監聽者到監聽者的數據流方式要被打斷,中間要加入一環,所以生成一個新的 Observable 與 Observable.OnSubscribe(OnSubscribeLift) ,這個里面還要保存原 Observable.OnSubscribe 的方法作為數據流的源頭,然后還要生成一個新的 Subscriber (MapSubscriber),OnSubscribeLift 作為原數據流(也就是程序最開始執行的call方法)與 MapSubscriber 的橋接,而 MapSubscriber 里面執行了Map轉換函數然后再繼續執行與訂閱者的調用。
好了Map函數說完了,應該通俗易懂比較好理解了吧,下面再來看看 flatMap函數:
可以提前給大家介紹下flatMap函數與Map函數的區別,Map函數起的作用是轉換數據流的數據,然后往下再傳遞,flatMap其實也有類似的效果也是可以處理數據流的數據(為什么這么說,咋們往下看),只不過是Map函數里面call方法直接返回轉換的數據類型,而flatMap返回的是一個Observable對象
下面我們看看flatMap的基本使用,老規矩上一個最基本的例子:
public void testFlatMap1(){
Observable observable = Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Subscriber super String> subscriber) {
Log.d("Rxjava", "onNext123");
subscriber.onNext("123");
}
});
observable.flatMap(new Func1>() {
@Override
public Observable call(String s) {
return createIpObservable(s);
}
})
.subscribe(new Action1() {
@Override
public void call(String s) {
Log.d("Rxjava", "Data: " + s);
}
});
}
private Observable createIpObservable(final String url)
{
return Observable.create(new Observable.OnSubscribe()
{
@Override
public void call(Subscriber super String> subscriber)
{
try {
String ip = url + "!!!";
Log.d("Rxjava", "Emit Data -> " + url);
subscriber.onNext(ip);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
打印為:
2020-05-04 00:35:25.819 4340-4340/com.example.myrxjava2 D/Rxjava: onNext123
2020-05-04 00:39:12.642 4340-4340/com.example.myrxjava2 D/Rxjava: Emit Data -> 123
2020-05-04 00:39:55.588 4340-4340/com.example.myrxjava2 D/Rxjava: Data: 123!!!
我們再來看看flatMap函數里面做了什么:
public final Observable flatMap(Func1 super T, ? extends Observable extends R>> func) {
if (getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable)this).scalarFlatMap(func);
}
return merge(map(func)); //里面有一個map函數,難怪上面說和map很像
}
再看看merge里面做了什么:
public static Observable merge(Observable extends Observable extends T>> source) {
if (source.getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable)source).scalarFlatMap((Func1)UtilityFunctions.identity());
}
return source.lift(OperatorMerge.instance(false));
}
merge里面又調用了lift函數,為什么說又調用了,大家可以回頭看看map函數也調用了lift函數,只不過兩次的參數不同而已,map里面調用lift函數的參數是OperatorMap,而merge里面調用lift函數的參數是OperatorMerge
那么大家可以想一想調用流程:map函數調用lift生成的 Observable 里面通過new一個OnSubscribeLift來作為原數據流(也就是程序最開始執行的call方法)與 MapSubscriber 的橋接,而merge里面是再利用lift函數再次生成了一個新的 Observable 里面的new的OnSubscribeLift 則應該為map函數里面的OnSubscribeLift 與 OperatorMerge的橋接,(注:OnSubscribeLift 實現了 OnSubscribe,也就是可以作為OnSubscribe)
所以程序執行的大致流程應該是:先執行 OperatorMerge 里面的 call 方法,然后執行 MapSubscriber 里面的call 方法,然后執行到原數據流(也就是程序最開始執行的call方法)的call方法,然后到flapMap(也可以說是map里面,因為flapMap里面調用了map)里面的 Func1 的call 方法,最后根據上一步返回值為 Observable 再執行其的 call 方法,最后把數據流傳到訂閱者Subscriber的call方法里面
😎流程是不是這樣的呢,讓我去驗證下:
subscribe執行以后首先執行的是OperatorMergel里面的 OnSubscribeLift 里面的call方法
public void call(Subscriber super R> o) {
try {
Subscriber super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
hook.onLift(operator) 返回的就是生成的 OperatorMerge,再執行里面的call方法:
public Subscriber> call(final Subscriber super T> child) {
MergeSubscriber subscriber = new MergeSubscriber(child, delayErrors, maxConcurrent);
MergeProducer producer = new MergeProducer(subscriber);
subscriber.producer = producer;
child.add(subscriber);
child.setProducer(producer);
return subscriber;
}
OperatorMerge 里面的call方法做了幾個事情其中最重要的就是 new MergeSubscriber 生成了一個 MergeSubscriber
public MergeSubscriber(Subscriber super T> child, boolean delayErrors, int maxConcurrent) {
this.child = child;
this.delayErrors = delayErrors;
this.maxConcurrent = maxConcurrent;
this.nl = NotificationLite.instance();
this.innerGuard = new Object();
this.innerSubscribers = EMPTY;
if (maxConcurrent == Integer.MAX_VALUE) {
scalarEmissionLimit = Integer.MAX_VALUE;
request(Long.MAX_VALUE);
} else {
scalarEmissionLimit = Math.max(1, maxConcurrent >> 1);
request(maxConcurrent);
}
}
里面很重要的一步就是 this.child = child; 而這個child就是訂閱者Subscriber,然后執行 parent.call(st); 這個 parent 就是 map函數里面返回的 OnSubscribeLift,首先執行里面的 Subscriber super T> st = hook.onLift(operator).call(o); 這個operator則應該是 MapSubscriber 執行他的call方法:
public Subscriber super T> call(final Subscriber super R> o) {
MapSubscriber parent = new MapSubscriber(o, transformer);
o.add(parent);
return parent;
}
返回一個 MapSubscriber ,再執行map 的OnSubscribeLift里面的parent.call(st);
這個里面的parent就是原數據流(也就是最開始要執行的call)函數:
Observable observable = Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Subscriber super String> subscriber) {
Log.d("Rxjava", "onNext");
subscriber.onNext("333");
}
});
這里面的 Subscriber 則應該為MapSubscriber,然后調用其 onNext 方法:
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
先用 result = mapper.call(t); ,mapper則為Fun1 ,也就是調用Fun1
里面的call方法:
new Func1>() {
@Override
public Observable call(String s) {
return createIpObservable(s);
}
}
這個方法返回的是一個新的 Observable 對象:
最后調用MapSubscriber 里面的 actual.onNext(result); 這個actual就是創建MapSubscriber傳進來的 MergeSubscriber ,所以執行他的 onNext 方法:
public void onNext(Observable extends T> t) {
if (t == null) {
return;
}
if (t == Observable.empty()) {
emitEmpty();
} else
if (t instanceof ScalarSynchronousObservable) {
tryEmit(((ScalarSynchronousObservable extends T>)t).get());
} else {
InnerSubscriber inner = new InnerSubscriber(this, uniqueId++);
addInner(inner);
t.unsafeSubscribe(inner); //關鍵一句話
emit();
}
}
里面 t.unsafeSubscribe(inner);
public final Subscription unsafeSubscribe(Subscriber super T> subscriber) {
try {
// new Subscriber so onStart it
subscriber.onStart();
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
return Subscriptions.unsubscribed();
}
}
這個里面的調用 hook.onSubscribeStart(this, onSubscribe).call(subscriber); 而這個里面的 onSubscribe 就是上面通過 createIpObservable 生成的 Observable ,執行他的call 方法 :
private Observable createIpObservable(final String url)
{
return Observable.create(new Observable.OnSubscribe()
{
@Override
public void call(Subscriber super String> subscriber)
{
try {
String ip = url + "!!!";
Log.d("Rxjava", "Emit Data -> " + url);
subscriber.onNext(ip);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
里面處理完數據流以后會調用 subscriber.onNext(ip); 這個里面的 subscriber 就是上面new出來的InnerSubscriber,再調用其onNext 方法:
public void onNext(T t) {
parent.tryEmit(this, t);
}
parent 就是 OperatorMerge,接著調用tryEmit:
void tryEmit(InnerSubscriber subscriber, T value) {
boolean success = false;
long r = producer.get();
if (r != 0L) {
synchronized (this) {
// if nobody is emitting and child has available requests
r = producer.get();
if (!emitting && r != 0L) {
emitting = true;
success = true;
}
}
}
if (success) {
emitScalar(subscriber, value, r);
} else {
queueScalar(subscriber, value);
}
}
里面 emitScalar :
protected void emitScalar(InnerSubscriber subscriber, T value, long r) {
boolean skipFinal = false;
try {
try {
child.onNext(value);
} catch (Throwable t) {
//省略無關緊要的
}
}
而這個child 就是上面我們提到過的專門保存的訂閱者也就是監聽者subscriber,最后調用其onNext方法:(我們上文講解了onNext 與 call 方法的聯系,忘記了可以拉到上面進行查看),完成所有調用。
new Action1() {
@Override
public void call(String s) {
Log.d("Rxjava", "Data: " + s);
}
}
上面我是為了講解flatMap的基礎調用方式做的例子,一般情況下我們不會這么去使用flatMap,給大家一個例子參考:
public void testFlatMap(){
Observable observable = Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Subscriber super String> subscriber) {
Log.d("Rxjava", "onNext");
subscriber.onNext("1,2,3");
}
});
observable.flatMap(new Func1>() {
@Override
public Observable call(String s) {
String[] names = s.split(",");
return Observable.from(names);
}
}).subscribe(new Action1() {
@Override
public void call(String s) {
Log.d("Rxjava", "Consume Data
}
});
}
打印:
2020-05-04 16:30:27.061 6627-6627/? D/Rxjava: onNext
2020-05-04 16:30:27.086 6627-6627/? D/Rxjava: Consume Data
2020-05-04 16:30:27.086 6627-6627/? D/Rxjava: Consume Data
2020-05-04 16:30:27.086 6627-6627/? D/Rxjava: Consume Data
我們最后來總結下:flatMap的調用也用到了map函數,只不過是Map函數里面call方法直接返回轉換的數據類型,而flatMap返回的是一個Observable對象,其本質還是作為數據流的其中一環用于轉換數據而出現的。
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的map原理 java_RxJava的基本原理以及Map,flatMap的原理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java彩色的世界_JAVA真彩色转25
- 下一篇: 什么是溶肌症