使用RxJava和SseEmitter进行服务器发送的事件
Spring Framework 4.2 GA即將發布,讓我們看一下它提供的一些新功能。 引起我注意的一個事件是一個簡單的新類SseEmitter ,它是對Spring MVC控制器中易于使用的發送事件的抽象。 SSE是一項技術,使您可以在一個HTTP連接內沿一個方向將數據從服務器流式傳輸到瀏覽器。 聽起來像是websocket可以做什么的子集。 但是,由于它是一個簡單得多的協議,因此可以在不需要全雙工的情況下使用,例如,實時推動股價變化或顯示長時間運行的進程。 這將是我們的例子。
假設我們有一個具有以下API的虛擬硬幣礦工:
public interface CoinMiner {BigDecimal mine() {//...} }每次調用mine()我們都必須等待幾秒鐘,才能獲得大約1個硬幣的回報(平均)。 如果要挖掘多個硬幣,我們必須多次調用此方法:
@RestController public class MiningController {//...@RequestMapping("/mine/{count}")void mine(@PathVariable int count) {IntStream.range(0, count).forEach(x -> coinMiner.mine());}}這項工作有效,我們可以請求/mine/10和mine()方法將執行10次。 到目前為止,一切都很好。 但是挖掘是一項占用大量CPU的任務,將計算分散到多個內核將是有益的。 此外,即使使用并行化,我們的API端點也相當慢,我們必須耐心等待直到所有工作完成而沒有任何進度通知。 讓我們首先修復并行性–但是,由于并行流無法控制底層線程池,因此我們來使用顯式的ExecutorService :
@Component class CoinMiner {CompletableFuture<BigDecimal> mineAsync(ExecutorService executorService) {return CompletableFuture.supplyAsync(this::mine, executorService);}//...}客戶端代碼必須顯式提供ExecutorService (只是設計選擇):
@RequestMapping("/mine/{count}") void mine(@PathVariable int count) {final List<CompletableFuture<BigDecimal>> futures = IntStream.range(0, count).mapToObj(x -> coinMiner.mineAsync(executorService)).collect(toList());futures.forEach(CompletableFuture::join); }首先多次調用mineAsync ,然后(作為第二階段)等待所有mineAsync完成并join這非常重要。 很容易寫:
IntStream.range(0, count).mapToObj(x -> coinMiner.mineAsync(executorService)).forEach(CompletableFuture::join);但是,由于Java 8中流的惰性,該任務將按順序執行! 如果您還不習慣流的懶惰,請始終從下至上閱讀它們:我們要求join一些將來的內容,以便流上升并只調用一次mineAsync() (惰性!),并將其傳遞給join() 。 當join()完成時,它再次上升并要求另一個Future 。 通過使用collect()我們強制所有mineAsync()執行,開始所有異步計算。 稍后我們等待每一個。
介紹
現在該變得更具反應性了(我說過了)。 控制器可以返回SseEmitter的實例。 從處理程序方法return后,容器線程將被釋放并可以處理更多即將到來的請求。 但是連接沒有關閉,客戶端一直在等待! 我們應該做的是保留對SseEmitter實例的引用,并在以后從另一個線程調用其send()和complete方法。 例如,我們可以啟動一個長時間運行的進程,并保持send()從任意線程進行進度。 完成該過程后,我們complete() SseEmitter ,最后關閉HTTP連接(至少從邏輯SseEmitter ,請記住Keep-alive )。 在下面的示例中,我們有一堆CompletableFuture ,當每個CompletableFuture完成時,我們只需將1發送給客戶端( notifyProgress() )。 當所有期貨都完成后,我們完成流( thenRun(sseEmitter::complete) ),關閉連接:
@RequestMapping("/mine/{count}") SseEmitter mine(@PathVariable int count) {final SseEmitter sseEmitter = new SseEmitter();final List<CompletableFuture<BigDecimal>> futures = mineAsync(count);futures.forEach(future ->future.thenRun(() -> notifyProgress(sseEmitter)));final CompletableFuture[] futuresArr = futures.toArray(new CompletableFuture[futures.size()]);CompletableFuture.allOf(futuresArr).thenRun(sseEmitter::complete);return sseEmitter; }private void notifyProgress(SseEmitter sseEmitter) {try {sseEmitter.send(1);} catch (IOException e) {throw new RuntimeException(e);} }private List<CompletableFuture<BigDecimal>> mineAsync(@PathVariable int count) {return IntStream.range(0, count).mapToObj(x -> coinMiner.mineAsync(executorService)).collect(toList()); }調用此方法將產生以下響應(注意Content-Type ):
< HTTP/1.1 200 OK < Content-Type: text/event-stream;charset=UTF-8 < Transfer-Encoding: chunked < data:1data:1data:1data:1* Connection #0 to host localhost left intact稍后我們將學習如何在客戶端解釋這種響應。 現在暫時讓我們整理一下設計。
與引進RxJava
上面的代碼有效,但是看起來很凌亂。 實際上,我們有一系列事件,每個事件都代表計算的進度。 計算最終完成,因此流也應發出信號結束。 聽起來就像是Observable ! 我們從重構CoinMiner開始,以返回Observable<BigDecimal :
Observable<BigDecimal> mineMany(int count, ExecutorService executorService) {final ReplaySubject<BigDecimal> subject = ReplaySubject.create();final List<CompletableFuture<BigDecimal>> futures = IntStream.range(0, count).mapToObj(x -> mineAsync(executorService)).collect(toList());futures.forEach(future ->future.thenRun(() -> subject.onNext(BigDecimal.ONE)));final CompletableFuture[] futuresArr = futures.toArray(new CompletableFuture[futures.size()]);CompletableFuture.allOf(futuresArr).thenRun(subject::onCompleted);return subject; }每當mineMany()返回的事件出現在Observable ,我們就mineMany()那么多硬幣。 當所有期貨都完成后,我們也完成了交易。 在實現方面,這看起來還沒有改善,但是從控制器的角度來看,它有多干凈:
@RequestMapping("/mine/{count}") SseEmitter mine(@PathVariable int count) {final SseEmitter sseEmitter = new SseEmitter();coinMiner.mineMany(count, executorService).subscribe(value -> notifyProgress(sseEmitter),sseEmitter::completeWithError,sseEmitter::complete);return sseEmitter; }調用coinMiner.mineMany()我們只需訂閱事件。 事實證明Observable和SseEmitter方法匹配1:1。 這里發生的事情很不言自明:啟動異步計算,每當后臺計算發出任何進度信號時,將其轉發給客戶端。 好的,讓我們回到實現。 看起來很亂,因為我們將CompletableFuture和Observable混合使用。 我已經描述了如何僅使用一個元素將CompletableFuture轉換為Observable 。 這是一個概述,包括rx.Single從RxJava 1.0.13開始發現的rx.Single抽象(此處未使用):
public class Futures {public static <T> Observable<T> toObservable(CompletableFuture<T> future) {return Observable.create(subscriber ->future.whenComplete((result, error) -> {if (error != null) {subscriber.onError(error);} else {subscriber.onNext(result);subscriber.onCompleted();}}));}public static <T> Single<T> toSingle(CompletableFuture<T> future) {return Single.create(subscriber ->future.whenComplete((result, error) -> {if (error != null) {subscriber.onError(error);} else {subscriber.onSuccess(result);}}));}}將這些實用程序運算符放在某個地方,我們可以改善實現并避免混合使用兩個API:
Observable<BigDecimal> mineMany(int count, ExecutorService executorService) {final List<Observable<BigDecimal>> observables = IntStream.range(0, count).mapToObj(x -> mineAsync(executorService)).collect(toList());return Observable.merge(observables); }Observable<BigDecimal> mineAsync(ExecutorService executorService) {final CompletableFuture<BigDecimal> future = CompletableFuture.supplyAsync(this::mine, executorService);return Futures.toObservable(future); }RxJava有一個內置的運算符,用于將多個Observable合并為一個,我們的每個基礎Observable發出一個事件,這無關緊要。
深入研究RxJava運算符
讓我們使用RxJava的功能來稍微改善流式傳輸。
scan()
當前,每次我們開采一枚硬幣時,我們都會send(1)客戶端send(1)事件。 這意味著每個客戶都必須跟蹤其已經收到的硬幣數量,以便計算總的計算數量。 如果服務器始終發送總金額而不是增量,那就太好了。 但是,我們不想更改實現。 事實證明,使用Observable.scan()運算符非常簡單:
@RequestMapping("/mine/{count}") SseEmitter mine(@PathVariable int count) {final SseEmitter sseEmitter = new SseEmitter();coinMiner.mineMany(count, executorService).scan(BigDecimal::add).subscribe(value -> notifyProgress(sseEmitter, value),sseEmitter::completeWithError,sseEmitter::complete);return sseEmitter; }private void notifyProgress(SseEmitter sseEmitter, BigDecimal value) {try {sseEmitter.send(value);} catch (IOException e) {e.printStackTrace();} }scan()運算符接收上一個事件和當前事件,并將它們組合在一起。 通過應用BigDecimal::add我們只需將所有數字相加即可。 例如1、1 +1,(1 +1)+1等。 scan()類似于flatMap() ,但保留中間值。
用sample()采樣
可能是因為我們的后端服務產生了太多的進度更新,我們無法使用。 我們不想給客戶端增加不相關的更新并飽和帶寬。 每秒最多發送兩次更新聽起來很合理。 幸運的是,RxJava也有一個內置的運算符:
Observable<BigDecimal> obs = coinMiner.mineMany(count, executorService); obs.scan(BigDecimal::add).sample(500, TimeUnit.MILLISECONDS).subscribe(//...);sample()將定期查看底層流,并僅發出最新的項,并丟棄中間的項。 幸運的是,我們使用scan()即時聚合了項目,因此我們不會丟失任何更新。
window() –恒定的發射間隔
不過有一個陷阱。 如果在選定的500毫秒內沒有新內容出現, sample()將不會兩次發出相同的項目。 很好,但是請記住我們正在通過TCP / IP連接推送這些更新。 最好是定期向客戶端發送更新,即使在此期間什么也沒發生–只是為了保持連接的正常運行,就像ping 。 可能有多種方法可以滿足此要求,例如,涉及timeout()運算符。 我選擇使用window()運算符每500毫秒對所有事件進行分組:
Observable<BigDecimal> obs = coinMiner.mineMany(count, executorService); obs.window(500, TimeUnit.MILLISECONDS).flatMap(window -> window.reduce(BigDecimal.ZERO, BigDecimal::add)).scan(BigDecimal::add).subscribe(//...);這是一個棘手的問題。 首先,我們將所有進度更新分組在500毫秒窗口中。 然后,我們使用reduce來計算在此時間段內開采的硬幣的總數(類似于scan() )。 如果在此期間未開采任何硬幣,我們只需返回ZERO 。 最后,我們使用scan()匯總每個窗口的小計。 我們不再需要sample()因為window()確保每500毫秒發出一個事件。
客戶端
JavaScript中有很多SSE用法的示例,因此為您提供一種快速的解決方案,稱為我們的控制器:
var source = new EventSource("/mine/10"); source.onmessage = function (event) {console.info(event); };我相信SseEmitter是Spring MVC的一項重大改進,它將使我們能夠編寫更健壯和更快的Web應用程序,需要即時的單向更新。
翻譯自: https://www.javacodegeeks.com/2015/08/server-sent-events-with-rxjava-and-sseemitter.html
總結
以上是生活随笔為你收集整理的使用RxJava和SseEmitter进行服务器发送的事件的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: market1501 data_mana
- 下一篇: 图像滤镜原理