在Completablefuture和Observable之间转换
Java 8中的CompletableFuture<T>是對T類型的值將來將可用的承諾的高級抽象。 Observable<T>非常相似,但是它承諾將來會出現任意數量的項,從0到無窮大。 異步結果的這兩種表示與僅使用一項即可使用Observable而不是CompletableFuture情況非常相似,反之亦然。 另一方面, CompletableFuture更專業,并且由于它現在是JDK的一部分,因此應該很快就會流行起來。 讓我們用簡短的文章來慶祝RxJava 1.0發行版,該文章展示了如何在兩者之間進行轉換而又不失去它們的異步和事件驅動性質。
從
CompletableFuture表示將來的一個值,因此將其變為Observable非常簡單。 當Future以某個值完成時, Observable也將立即發出該值并關閉流:
class FuturesTest extends Specification {public static final String MSG = "Don't panic"def 'should convert completed Future to completed Observable'() {given:CompletableFuture<String> future = CompletableFuture.completedFuture("Abc")when:Observable<String> observable = Futures.toObservable(future)then:observable.toBlocking().toIterable().toList() == ["Abc"]}def 'should convert failed Future into Observable with failure'() {given:CompletableFuture<String> future = failedFuture(new IllegalStateException(MSG))when:Observable<String> observable = Futures.toObservable(future)then:observable.onErrorReturn({ th -> th.message } as Func1).toBlocking().toIterable().toList() == [MSG]} CompletableFuture failedFuture(Exception error) {CompletableFuture future = new CompletableFuture()future.completeExceptionally(error)return future}}尚未執行的 Futures.toObservable()第一個測試會將Future轉換為Observable ,并確保正確傳播值。 第二個測試創建了失敗的Future ,將失敗替換為異常的消息,并確保傳播了異常。 實現要短得多:
public static <T> Observable<T> toObservable(CompletableFuture<T> future) {return Observable.create(subscriber ->future.whenComplete((result, error) -> {if (error != null) {subscriber.onError(error);} else {subscriber.onNext(result);subscriber.onCompleted();}})); }注意: Observable.fromFuture()存在,但是我們想充分利用ComplatableFuture的異步運算符。
從
實際上,有兩種將Observable轉換為Future -創建CompletableFuture<List<T>>或CompletableFuture<T> (如果我們假設Observable僅包含一項)。 讓我們從前一種情況開始,用以下測試用例進行描述:
def 'should convert Observable with many items to Future of list'() {given:Observable<Integer> observable = Observable>just(1, 2, 3)when:CompletableFuture<List<Integer>> future = Futures>fromObservable(observable)then:future>get() == [1, 2, 3] }def 'should return failed Future when after few items exception was emitted'() {given:Observable<Integer> observable = Observable>just(1, 2, 3)>concatWith(Observable>error(new IllegalStateException(MSG)))when:Futures>fromObservable(observable)then:def e = thrown(Exception)e>message == MSG }顯然,直到源Observable信號流結束, Future才完成。 因此, Observable.never()將永遠不會完成包裝Future ,而是使用空列表來完成它。 該實現更短,更甜蜜:
public static <T> CompletableFuture<List<T>> fromObservable(Observable<T> observable) {final CompletableFuture<List<T>> future = new CompletableFuture<>();observable.doOnError(future::completeExceptionally).toList().forEach(future::complete);return future; }關鍵是Observable.toList() ,它可以方便地從Observable<T>和Observable<List<T>> 。 當源Observable<T>完成時,后者將發出List<T>類型的一項。
從
當我們知道CompletableFuture<T>將恰好返回一項時,就會發生上一次轉換的特殊情況。 在這種情況下,我們可以將其直接轉換為CompletableFuture<T> ,而不是僅包含一項的CompletableFuture<List<T>> 。 首先測試:
def 'should convert Observable with single item to Future'() {given:Observable<Integer> observable = Observable.just(1)when:CompletableFuture<Integer> future = Futures.fromSingleObservable(observable)then:future.get() == 1 }def 'should create failed Future when Observable fails'() {given:Observable<String> observable = Observable.<String> error(new IllegalStateException(MSG))when:Futures.fromSingleObservable(observable)then:def e = thrown(Exception)e.message == MSG }def 'should fail when single Observable produces too many items'() {given:Observable<Integer> observable = Observable.just(1, 2)when:Futures.fromSingleObservable(observable)then:def e = thrown(Exception)e.message.contains("too many elements") }同樣,實現非常簡單并且幾乎相同:
public static <T> CompletableFuture<T> fromSingleObservable(Observable<T> observable) {final CompletableFuture<T> future = new CompletableFuture<>();observable.doOnError(future::completeExceptionally).single().forEach(future::complete);return future; }上面的Helper方法還不夠完善,但是,如果您需要在JDK 8和RxJava風格的異步計算之間進行轉換,那么這篇文章應該足以幫助您入門。
翻譯自: https://www.javacodegeeks.com/2014/12/converting-between-completablefuture-and-observable.html
總結
以上是生活随笔為你收集整理的在Completablefuture和Observable之间转换的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 什么是防御过当(什么是防御cc ddos
- 下一篇: linux系统安装在u盘(linux系统