冷热rx-java可观察
我自己對“熱的和冷的可觀察的”的理解是不穩定的,但這是我到目前為止所了解的!
冷觀測
考慮一個返回rx-java Observable的API:
import obs.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; import rx.schedulers.Schedulers;public class Service1 {private static final Logger logger = LoggerFactory.getLogger(Service1.class);public Observable<String> operation() {return Observable.<String>create(s -> {logger.info("Start: Executing slow task in Service 1");Util.delay(1000);s.onNext("data 1");logger.info("End: Executing slow task in Service 1");s.onCompleted();}).subscribeOn(Schedulers.computation());} }現在,首先要注意的是,典型的Observable在訂閱之前不會做任何事情:
所以基本上,如果我要這樣做:
Observable<String> op1 = service1.operation();除非通過以下方式在Observable上進行訂閱,否則不會打印或返回任何內容:
Observable<String> op1 = service1.operation();CountDownLatch latch = new CountDownLatch(1);op1.subscribe(s -> logger.info("From Subscriber 1: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());latch.await();因此,現在,如果此Observable上有多個訂閱,將會發生什么:
Observable<String> op1 = service1.operation();CountDownLatch latch = new CountDownLatch(3);op1.subscribe(s -> logger.info("From Subscriber 1: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());op1.subscribe(s -> logger.info("From Subscriber 2: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());op1.subscribe(s -> logger.info("From Subscriber 3: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());latch.await();有了冷的可觀察到的代碼,代碼將再次被調用并再次發出項目,我在機器上得到了這個代碼:
06:04:07.206 [RxComputationThreadPool-2] INFO o.b.Service1 - Start: Executing slow task in Service 1 06:04:07.208 [RxComputationThreadPool-3] INFO o.b.Service1 - Start: Executing slow task in Service 1 06:04:08.211 [RxComputationThreadPool-2] INFO o.b.BasicObservablesTest - From Subscriber 2: data 1 06:04:08.211 [RxComputationThreadPool-1] INFO o.b.BasicObservablesTest - From Subscriber 1: data 1 06:04:08.211 [RxComputationThreadPool-3] INFO o.b.BasicObservablesTest - From Subscriber 3: data 1 06:04:08.213 [RxComputationThreadPool-2] INFO o.b.Service1 - End: Executing slow task in Service 1 06:04:08.214 [RxComputationThreadPool-1] INFO o.b.Service1 - End: Executing slow task in Service 1 06:04:08.214 [RxComputationThreadPool-3] INFO o.b.Service1 - End: Executing slow task in Service 1熱可觀察–使用ConnectableObservable
另一方面,Hot Observable確實不需要訂閱即可開始發射項目。 一種實現Hot Observable的方法是使用ConnectableObservable ,它是一個Observable,它在調用connect方法之前不會發出項目,但是一旦開始發出項目,它的任何訂閱者只能從訂閱點獲取項目。 因此,再次回顧前面的示例,但使用ConnectableObservable代替:
Observable<String> op1 = service1.operation();ConnectableObservable<String> connectableObservable = op1.publish();CountDownLatch latch = new CountDownLatch(3);connectableObservable.subscribe(s -> logger.info("From Subscriber 1: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());connectableObservable.subscribe(s -> logger.info("From Subscriber 2: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());connectableObservable.subscribe(s -> logger.info("From Subscriber 3: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());connectableObservable.connect();latch.await();并打印以下內容:
06:07:23.852 [RxComputationThreadPool-3] INFO o.b.Service1 - Start: Executing slow task in Service 1 06:07:24.860 [RxComputationThreadPool-3] INFO o.b.ConnectableObservablesTest - From Subscriber 1: data 1 06:07:24.862 [RxComputationThreadPool-3] INFO o.b.ConnectableObservablesTest - From Subscriber 2: data 1 06:07:24.862 [RxComputationThreadPool-3] INFO o.b.ConnectableObservablesTest - From Subscriber 3: data 1 06:07:24.862 [RxComputationThreadPool-3] INFO o.b.Service1 - End: Executing slow task in Service 1熱點可觀察–使用主題
將冷的Observable轉換為熱的另一種方法是使用Subject 。 主題既表現為可觀察者,又表現為觀察者,有不同類型的主題具有不同的行為。 在這里,我使用一個名為PublishSubject的Subject,它具有Pub / Sub行為–這些項目被發送給所有在其上監聽的訂閱者。 因此,隨著PublishSubject的引入,代碼如下所示:
Observable<String> op1 = service1.operation();PublishSubject<String> publishSubject = PublishSubject.create();op1.subscribe(publishSubject);CountDownLatch latch = new CountDownLatch(3);publishSubject.subscribe(s -> logger.info("From Subscriber 1: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());publishSubject.subscribe(s -> logger.info("From Subscriber 2: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());publishSubject.subscribe(s -> logger.info("From Subscriber 3: {}", s),e -> logger.error(e.getMessage(), e),() -> latch.countDown());latch.await();了解如何將PublishSubject作為Observable的訂閱者引入,而其他訂閱者如何訂閱PublishSubject。 輸出將類似于ConnectableObservable的輸出。
從本質上來說,這就是我對“熱可觀察”的理解程度。 因此,總而言之,Cold和Hot Observable之間的區別在于訂戶何時獲得發射的項目以及何時發射項目–使用Cold Observable,它們在訂閱并通常獲得所有發射的項目時發射,一個Hot Observable,項目將在沒有訂閱服務器的情況下發出,而訂閱者通常會在訂閱點之后獲得項目。
參考
翻譯自: https://www.javacodegeeks.com/2015/03/hot-and-cold-rx-java-observable.html
總結
以上是生活随笔為你收集整理的冷热rx-java可观察的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JMS 2.0中JMSContext的类
- 下一篇: 修改无线路由器密码要注意什么