rxjava 循环发送事件_使用RxJava和SseEmitter进行服务器发送的事件
rxjava 循環(huán)發(fā)送事件
Spring Framework 4.2 GA即將發(fā)布,讓我們看一下它提供的一些新功能。 引起我注意的一個(gè)事件是一個(gè)簡(jiǎn)單的新類(lèi)SseEmitter ,它是對(duì)Spring MVC控制器中容易使用的發(fā)送事件的抽象。 SSE是一項(xiàng)技術(shù),可讓您在一個(gè)HTTP連接內(nèi)沿一個(gè)方向?qū)?shù)據(jù)從服務(wù)器流式傳輸?shù)綖g覽器。 聽(tīng)起來(lái)像是websocket可以做什么的子集。 但是,由于它是一個(gè)簡(jiǎn)單得多的協(xié)議,因此可以在不需要全雙工的情況下使用,例如實(shí)時(shí)推動(dòng)股價(jià)變化或顯示長(zhǎng)時(shí)間運(yùn)行的進(jìn)程。 這將是我們的例子。
假設(shè)我們有一個(gè)具有以下API的虛擬硬幣礦工:
public interface CoinMiner {BigDecimal mine() {//...} }每次調(diào)用mine()我們都必須等待幾秒鐘,才能獲得大約1個(gè)硬幣的回報(bào)(平均)。 如果要挖掘多個(gè)硬幣,則必須多次調(diào)用此方法:
@RestController public class MiningController {//...@RequestMapping("/mine/{count}")void mine(@PathVariable int count) {IntStream.range(0, count).forEach(x -> coinMiner.mine());}}這項(xiàng)工作,我們可以請(qǐng)求/mine/10和mine()方法將執(zhí)行10次。 到目前為止,一切都很好。 但是挖掘是一項(xiàng)占用大量CPU的任務(wù),將計(jì)算分散到多個(gè)內(nèi)核將是有益的。 此外,即使使用并行化,我們的API端點(diǎn)也相當(dāng)慢,我們必須耐心等待直到所有工作完成而沒(méi)有任何進(jìn)度通知。 讓我們首先修復(fù)并行性–但是,由于并行流無(wú)法控制底層線程池,因此我們來(lái)使用顯式的ExecutorService :
@Component class CoinMiner {CompletableFuture<BigDecimal> mineAsync(ExecutorService executorService) {return CompletableFuture.supplyAsync(this::mine, executorService);}//...}客戶(hù)端代碼必須顯式提供ExecutorService (只是設(shè)計(jì)選擇):
@RequestMapping("/mine/{count}") void mine(@PathVariable int count) {final List<CompletableFuture<BigDecimal>> futures = IntStream.range(0, count).mapToObj(x -> coinMiner.mineAsync(executorService)).collect(toList());futures.forEach(CompletableFuture::join); }首先多次調(diào)用mineAsync ,然后(作為第二階段)等待所有期貨完成并join ,這mineAsync重要。 很容易寫(xiě):
IntStream.range(0, count).mapToObj(x -> coinMiner.mineAsync(executorService)).forEach(CompletableFuture::join);但是,由于Java 8中流的惰性,該任務(wù)將按順序執(zhí)行! 如果您還不習(xí)慣流的懶惰,請(qǐng)始終從下至上閱讀它們:我們要求join一些將來(lái)的內(nèi)容,以便流上升并只調(diào)用一次mineAsync() (惰性!),并將其傳遞給join() 。 當(dāng)join()完成時(shí),它再次上升并要求另一個(gè)Future 。 通過(guò)使用collect()我們強(qiáng)制所有mineAsync()執(zhí)行,開(kāi)始所有異步計(jì)算。 稍后,我們等待每一個(gè)。
介紹
現(xiàn)在該變得更具React性了(我說(shuō)過(guò)了)。 控制器可以返回SseEmitter的實(shí)例。 從處理程序方法return后,容器線程將被釋放并可以處理更多即將到來(lái)的請(qǐng)求。 但是連接沒(méi)有關(guān)閉,客戶(hù)端一直在等待! 我們應(yīng)該做的是保留對(duì)SseEmitter實(shí)例的引用,并在以后從另一個(gè)線程調(diào)用其send()和complete方法。 例如,我們可以啟動(dòng)一個(gè)長(zhǎng)時(shí)間運(yùn)行的進(jìn)程,并保持send()從任意線程進(jìn)行進(jìn)度。 完成該過(guò)程后,我們complete() SseEmitter ,最后關(guān)閉HTTP連接(至少?gòu)倪壿婼seEmitter ,請(qǐng)記住Keep-alive )。 在下面的示例中,我們有一堆CompletableFuture ,當(dāng)每個(gè)CompletableFuture完成時(shí),我們只需將1發(fā)送給客戶(hù)端( notifyProgress() )。 當(dāng)所有期貨都完成后,我們完成流( thenRun(sseEmitter::complete) ),關(guān)閉連接:
@RequestMapping("/mine/{count}") SseEmitter mine(@PathVariable int count) {final SseEmitter sseEmitter = new SseEmitter();final List<CompletableFuture<BigDecimal>> futures = mineAsync(count);futures.forEach(future ->future.thenRun(() -> notifyProgress(sseEmitter)));final CompletableFuture[] futuresArr = futures.toArray(new CompletableFuture[futures.size()]);CompletableFuture.allOf(futuresArr).thenRun(sseEmitter::complete);return sseEmitter; }private void notifyProgress(SseEmitter sseEmitter) {try {sseEmitter.send(1);} catch (IOException e) {throw new RuntimeException(e);} }private List<CompletableFuture<BigDecimal>> mineAsync(@PathVariable int count) {return IntStream.range(0, count).mapToObj(x -> coinMiner.mineAsync(executorService)).collect(toList()); }調(diào)用此方法將產(chǎn)生以下響應(yīng)(注意Content-Type ):
< HTTP/1.1 200 OK < Content-Type: text/event-stream;charset=UTF-8 < Transfer-Encoding: chunked < data:1data:1data:1data:1* Connection #0 to host localhost left intact稍后我們將學(xué)習(xí)如何在客戶(hù)端解釋這種響應(yīng)。 現(xiàn)在暫時(shí)讓我們整理一下設(shè)計(jì)。
與引進(jìn)RxJava
上面的代碼有效,但是看起來(lái)很混亂。 我們實(shí)際上有一系列事件,每個(gè)事件都代表計(jì)算的進(jìn)度。 計(jì)算最終完成,因此流也應(yīng)發(fā)出信號(hào)結(jié)束。 聽(tīng)起來(lái)就像是Observable ! 我們從重構(gòu)CoinMiner開(kāi)始,以返回Observable<BigDecimal :
Observable<BigDecimal> mineMany(int count, ExecutorService executorService) {final ReplaySubject<BigDecimal> subject = ReplaySubject.create();final List<CompletableFuture<BigDecimal>> futures = IntStream.range(0, count).mapToObj(x -> mineAsync(executorService)).collect(toList());futures.forEach(future ->future.thenRun(() -> subject.onNext(BigDecimal.ONE)));final CompletableFuture[] futuresArr = futures.toArray(new CompletableFuture[futures.size()]);CompletableFuture.allOf(futuresArr).thenRun(subject::onCompleted);return subject; }每當(dāng)mineMany()返回的事件出現(xiàn)在Observable ,我們就mineMany()那么多硬幣。 當(dāng)所有期貨都完成后,我們也完成了交易。 在實(shí)現(xiàn)方面,這看起來(lái)還沒(méi)有改善,但是從控制器的角度來(lái)看,它有多干凈:
@RequestMapping("/mine/{count}") SseEmitter mine(@PathVariable int count) {final SseEmitter sseEmitter = new SseEmitter();coinMiner.mineMany(count, executorService).subscribe(value -> notifyProgress(sseEmitter),sseEmitter::completeWithError,sseEmitter::complete);return sseEmitter; }調(diào)用coinMiner.mineMany()我們只需訂閱事件。 事實(shí)證明Observable和SseEmitter方法匹配1:1。 這里發(fā)生的事情是不言自明的:啟動(dòng)異步計(jì)算,每當(dāng)后臺(tái)計(jì)算發(fā)出任何進(jìn)度信號(hào)時(shí),將其轉(zhuǎn)發(fā)給客戶(hù)端。 好的,讓我們回到實(shí)現(xiàn)上。 由于我們將CompletableFuture和Observable混合使用,因此看起來(lái)很混亂。 我已經(jīng)描述了如何僅使用一個(gè)元素將CompletableFuture轉(zhuǎn)換為Observable 。 這是一個(gè)概述,包括rx.Single從RxJava 1.0.13開(kāi)始發(fā)現(xiàn)的rx.Single抽象(此處未使用):
public class Futures {public static <T> Observable<T> toObservable(CompletableFuture<T> future) {return Observable.create(subscriber ->future.whenComplete((result, error) -> {if (error != null) {subscriber.onError(error);} else {subscriber.onNext(result);subscriber.onCompleted();}}));}public static <T> Single<T> toSingle(CompletableFuture<T> future) {return Single.create(subscriber ->future.whenComplete((result, error) -> {if (error != null) {subscriber.onError(error);} else {subscriber.onSuccess(result);}}));}}將這些實(shí)用程序運(yùn)算符放在某個(gè)地方,我們可以改善實(shí)現(xiàn)并避免混合使用兩個(gè)API:
Observable<BigDecimal> mineMany(int count, ExecutorService executorService) {final List<Observable<BigDecimal>> observables = IntStream.range(0, count).mapToObj(x -> mineAsync(executorService)).collect(toList());return Observable.merge(observables); }Observable<BigDecimal> mineAsync(ExecutorService executorService) {final CompletableFuture<BigDecimal> future = CompletableFuture.supplyAsync(this::mine, executorService);return Futures.toObservable(future); }RxJava有一個(gè)內(nèi)置的運(yùn)算符,用于將多個(gè)Observable合并為一個(gè),我們的每個(gè)基礎(chǔ)Observable發(fā)出一個(gè)事件。
深入研究RxJava運(yùn)算符
讓我們使用RxJava的功能來(lái)稍微改善流式傳輸。
scan()
當(dāng)前,每次我們開(kāi)采一枚硬幣時(shí),我們都會(huì)send(1)客戶(hù)端send(1)事件。 這意味著每個(gè)客戶(hù)都必須跟蹤其已經(jīng)收到的硬幣數(shù)量,以便計(jì)算總的計(jì)算數(shù)量。 如果服務(wù)器總是發(fā)送總金額而不是增量,那就太好了。 但是,我們不想更改實(shí)現(xiàn)。 事實(shí)證明,使用Observable.scan()運(yùn)算符非常簡(jiǎn)單:
@RequestMapping("/mine/{count}") SseEmitter mine(@PathVariable int count) {final SseEmitter sseEmitter = new SseEmitter();coinMiner.mineMany(count, executorService).scan(BigDecimal::add).subscribe(value -> notifyProgress(sseEmitter, value),sseEmitter::completeWithError,sseEmitter::complete);return sseEmitter; }private void notifyProgress(SseEmitter sseEmitter, BigDecimal value) {try {sseEmitter.send(value);} catch (IOException e) {e.printStackTrace();} }scan()運(yùn)算符接收上一個(gè)事件和當(dāng)前事件,并將它們組合在一起。 通過(guò)應(yīng)用BigDecimal::add我們只需將所有數(shù)字相加即可。 例如1、1 + 1,(1 + 1)+ 1,依此類(lèi)推。 scan()類(lèi)似于flatMap() ,但保留中間值。
用sample()采樣
可能是因?yàn)槲覀兊暮蠖朔?wù)產(chǎn)生了太多的進(jìn)度更新,我們無(wú)法使用。 我們不想給客戶(hù)端增加不相關(guān)的更新并飽和帶寬。 每秒最多發(fā)送兩次更新聽(tīng)起來(lái)很合理。 幸運(yùn)的是,RxJava也有一個(gè)內(nèi)置的運(yùn)算符:
Observable<BigDecimal> obs = coinMiner.mineMany(count, executorService); obs.scan(BigDecimal::add).sample(500, TimeUnit.MILLISECONDS).subscribe(//...);sample()將定期查看底層流,并僅發(fā)出最新的項(xiàng),并丟棄中間項(xiàng)。 幸運(yùn)的是,我們使用scan()即時(shí)聚合了項(xiàng)目,因此我們不會(huì)丟失任何更新。
window() –恒定的發(fā)射間隔
不過(guò)有一個(gè)陷阱。 如果在選定的500毫秒內(nèi)沒(méi)有新內(nèi)容出現(xiàn), sample()將不會(huì)兩次發(fā)出相同的項(xiàng)目。 很好,但是請(qǐng)記住我們正在通過(guò)TCP / IP連接推送這些更新。 最好定期將更新發(fā)送給客戶(hù)端,即使在此期間什么也沒(méi)發(fā)生–只是為了保持連接的正常運(yùn)行,就像ping 。 可能有多種方法可以實(shí)現(xiàn)此要求,例如,涉及timeout()運(yùn)算符。 我選擇使用window()運(yùn)算符每500毫秒對(duì)所有事件進(jìn)行分組:
Observable<BigDecimal> obs = coinMiner.mineMany(count, executorService); obs.window(500, TimeUnit.MILLISECONDS).flatMap(window -> window.reduce(BigDecimal.ZERO, BigDecimal::add)).scan(BigDecimal::add).subscribe(//...);這是一個(gè)棘手的問(wèn)題。 首先,我們將所有進(jìn)度更新分組在500毫秒的窗口中。 然后,我們使用reduce來(lái)計(jì)算在此時(shí)間段內(nèi)開(kāi)采的硬幣的總數(shù)(類(lèi)似于scan() )。 如果在此期間未開(kāi)采任何硬幣,我們只需返回ZERO 。 最后,我們使用scan()匯總每個(gè)窗口的小計(jì)。 我們不再需要sample()因?yàn)閣indow()確保每500毫秒發(fā)出一個(gè)事件。
客戶(hù)端
JavaScript中有很多SSE用法示例,因此為您提供一種調(diào)用我們的控制器的快速解決方案:
var source = new EventSource("/mine/10"); source.onmessage = function (event) {console.info(event); };我相信SseEmitter是Spring MVC的一項(xiàng)重大改進(jìn),它將使我們能夠編寫(xiě)更健壯和更快的Web應(yīng)用程序,需要即時(shí)的單向更新。
翻譯自: https://www.javacodegeeks.com/2015/08/server-sent-events-with-rxjava-and-sseemitter.html
rxjava 循環(huán)發(fā)送事件
總結(jié)
以上是生活随笔為你收集整理的rxjava 循环发送事件_使用RxJava和SseEmitter进行服务器发送的事件的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 340开头的身份证是哪里的 340开头的
- 下一篇: 鸡是哺乳动物吗 来这里看专业回答了