迟来总比没有好:SSE或服务器发送的事件现在已在JAX-RS中
服務(wù)器發(fā)送的事件 (或簡(jiǎn)稱為SSE )是非常有用的協(xié)議,它允許服務(wù)器通過(guò)HTTP將數(shù)據(jù)推送到客戶端。 這是我們的網(wǎng)絡(luò)瀏覽器支持的年齡,但令人驚訝的是, JAX-RS規(guī)范在很長(zhǎng)一段時(shí)間內(nèi)都忽略了這一點(diǎn)。 盡管Jersey提供了適用于SSE媒體類型的擴(kuò)展名,但該API尚未正式化,因此不能移植到其他JAX-RS實(shí)現(xiàn)中。
幸運(yùn)的是, JAX-RS 2.1 (也稱為JSR-370)通過(guò)將SSE支持(客戶端和服務(wù)器端)作為正式規(guī)范的一部分,已經(jīng)改變了這一點(diǎn)。 在今天的帖子中,我們將研究如何使用最近發(fā)布的出色的Apache CXF框架3.2.0版將SSE支持集成到現(xiàn)有的Java REST(ful) Web服務(wù)中。 實(shí)際上,除了自舉之外,實(shí)際上沒(méi)有CXF特定的東西,所有示例都應(yīng)在實(shí)現(xiàn)JAX-RS 2.1規(guī)范的任何其他框架中工作。
事不宜遲,讓我們開(kāi)始吧。 由于這些天大量的Java項(xiàng)目都建立在出色的Spring Framework之上,因此我們的示例應(yīng)用程序?qū)⑹褂肧pring Boot和Apache CXF Spring Boot Integration使我們Swift起步。 老好伙伴Apache Maven也可以通過(guò)管理項(xiàng)目依賴項(xiàng)來(lái)幫助我們。
org.springframework.bootspring-boot-starter1.5.8.RELEASEorg.apache.cxfcxf-rt-frontend-jaxrs3.2.0org.apache.cxfcxf-spring-boot-starter-jaxrs3.2.0org.apache.cxfcxf-rt-rs-client3.2.0org.apache.cxfcxf-rt-rs-sse3.2.0在后臺(tái), Apache CXF正在使用Atmosphere框架來(lái)實(shí)現(xiàn)SSE傳輸,因此這是我們必須包括的另一個(gè)依賴項(xiàng)。
org.atmosphereatmosphere-runtime2.4.14有關(guān)依賴于Atmosphere框架的詳細(xì)信息導(dǎo)致需要提供其他配置設(shè)置,即transportId ,以確保在運(yùn)行時(shí)拾取支持SSE的傳輸。 相關(guān)詳細(xì)信息可以添加到application.yml文件中:
cxf:servlet:init:transportId: http://cxf.apache.org/transports/http/sse太好了,所以基礎(chǔ)就在那里,繼續(xù)前進(jìn)。 我們將要構(gòu)建的REST(ful) Web服務(wù)將在SSE流中公開(kāi)虛構(gòu)的CPU平均負(fù)載(為簡(jiǎn)單起見(jiàn),隨機(jī)生成)。 Stats類將構(gòu)成我們的數(shù)據(jù)模型。
public class Stats {private long timestamp;private int load;public Stats() {}public Stats(long timestamp, int load) {this.timestamp = timestamp;this.load = load;}// Getters and setters are omitted... }說(shuō)到流, Reactive Streams規(guī)范已進(jìn)入Java 9 ,希望我們能看到Java社區(qū)加速采用反應(yīng)式編程模型。 此外,在具有Reactive Streams支持的情況下,開(kāi)發(fā)支持SSE的 REST(ful) Web服務(wù)將變得更加容易和直接。 為此,讓我們將RxJava 2集成到示例應(yīng)用程序中。
io.reactivex.rxjava2rxjava2.1.6這是開(kāi)始使用我們的StatsRestService類(典型的JAX-RS資源實(shí)現(xiàn))的好時(shí)機(jī)。 JAX-RS 2.1中的關(guān)鍵SSE功能以Sse上下文對(duì)象為中心,可以像這樣注入。
@Service @Path("/api/stats") public class StatsRestService {@Context public void setSse(Sse sse) {// Access Sse context here}在Sse上下文之外,我們可以訪問(wèn)兩個(gè)非常有用的抽象:例如SseBroadcaster和OutboundSseEvent.Builder :
private SseBroadcaster broadcaster; private Builder builder;@Context public void setSse(Sse sse) {this.broadcaster = sse.newBroadcaster();this.builder = sse.newEventBuilder(); }您可能已經(jīng)猜到了, OutboundSseEvent.Builder構(gòu)造了OutboundSseEvent類的實(shí)例,這些實(shí)例可以通過(guò)電線發(fā)送,而SseBroadcaster則向所有連接的客戶端廣播相同的SSE流。 話雖如此,我們可以生成OutboundSseEvent的流并將其分發(fā)給感興趣的每個(gè)人:
private static void subscribe(final SseBroadcaster broadcaster, final Builder builder) {Flowable.interval(1, TimeUnit.SECONDS).zipWith(eventsStream(builder), (id, bldr) -> createSseEvent(bldr, id)).subscribeOn(Schedulers.single()).subscribe(broadcaster::broadcast); }private static Flowable<OutboundSseEvent.Builder> eventsStream(final Builder builder) {return Flowable.generate(emitter -> emitter.onNext(builder.name("stats"))); }如果您不熟悉RxJava 2 ,請(qǐng)不用擔(dān)心,這就是這里發(fā)生的情況。 eventsStream方法為stats類型的SSE事件返回有效無(wú)限的OutboundSseEvent.Builder實(shí)例流。 訂閱方法稍微復(fù)雜一些。 我們首先創(chuàng)建一個(gè)流,該流每秒發(fā)出序號(hào),例如fe 0,1,2,3,4,5,6,… ,依此類推。 稍后,我們將此流與eventsStream方法返回的流合并,實(shí)質(zhì)上將兩個(gè)流合并為一個(gè)流,該流每秒發(fā)出一個(gè)元組(number, OutboundSseEvent.Builder ) 。 公平地說(shuō),該元組對(duì)我們不是很有用,因此我們將其轉(zhuǎn)換為OutboundSseEvent類的實(shí)例,將數(shù)字視為SSE事件標(biāo)識(shí)符:
private static final Random RANDOM = new Random();private static OutboundSseEvent createSseEvent(OutboundSseEvent.Builder builder, long id) {return builder.id(Long.toString(id)).data(Stats.class, new Stats(new Date().getTime(), RANDOM.nextInt(100))).mediaType(MediaType.APPLICATION_JSON_TYPE).build(); }OutboundSseEvent可以使用常規(guī)MessageBodyWriter解析策略在data屬性中攜帶將相對(duì)于指定的mediaType進(jìn)行序列化的任何有效負(fù)載。 一旦獲得OutboundSseEvent實(shí)例,便使用SseBroadcaster :: broadcast方法將其發(fā)送出去。 請(qǐng)注意,我們使用subscribeOn運(yùn)算符將控制流移交給了另一個(gè)線程,這通常是您一直在做的事情。
很好,希望現(xiàn)在清除了流部分,但是我們?nèi)绾尾拍苷嬲嗛哠seBroadcaster發(fā)出的SSE事件? 這比您想像的要容易:
@GET @Path("broadcast") @Produces(MediaType.SERVER_SENT_EVENTS) public void broadcast(@Context SseEventSink sink) {broadcaster.register(sink); }我們都準(zhǔn)備好了。 這里最重要的是正在生成的內(nèi)容類型,應(yīng)將其設(shè)置為MediaType.SERVER_SENT_EVENTS 。 在這種情況下, SseEventSink的上下文實(shí)例變得可用,并且可以向SseBroadcaster實(shí)例注冊(cè)。
要查看我們的JAX-RS資源,我們需要使用例如JAXRSServerFactoryBean引導(dǎo)服務(wù)器實(shí)例,并在此過(guò)程中配置所有必需的提供程序。 請(qǐng)注意,我們也在顯式指定要使用的傳輸,在這種情況下為SseHttpTransportFactory.TRANSPORT_ID 。
@Configuration @EnableWebMvc public class AppConfig extends WebMvcConfigurerAdapter {@Beanpublic Server rsServer(Bus bus, StatsRestService service) {JAXRSServerFactoryBean endpoint = new JAXRSServerFactoryBean();endpoint.setBus(bus);endpoint.setAddress("/");endpoint.setServiceBean(service);endpoint.setTransportId(SseHttpTransportFactory.TRANSPORT_ID);endpoint.setProvider(new JacksonJsonProvider());return endpoint.create();}@Overridepublic void addResourceHandlers(ResourceHandlerRegistry registry) {registry.addResourceHandler("/static/**").addResourceLocations("classpath:/web-ui/"); } }要結(jié)束循環(huán),我們只需要為Spring Boot應(yīng)用程序提供運(yùn)行器即可:
@SpringBootApplication public class SseServerStarter { public static void main(String[] args) {SpringApplication.run(SseServerStarter.class, args);} }現(xiàn)在,如果我們運(yùn)行該應(yīng)用程序并使用多個(gè)Web瀏覽器或同一瀏覽器中的不同選項(xiàng)卡導(dǎo)航到http:// localhost:8080 / static / broadcast.html ,我們將觀察到所有事件內(nèi)部繪制的相同事件流:
不錯(cuò),廣播當(dāng)然是一個(gè)有效的用例,但是在每次端點(diǎn)調(diào)用時(shí)返回一個(gè)獨(dú)立的SSE流又如何呢? 簡(jiǎn)單,只需使用SseEventSink方法(例如send和close )即可直接操作SSE流。
@GET @Path("sse") @Produces(MediaType.SERVER_SENT_EVENTS) public void stats(@Context SseEventSink sink) {Flowable.interval(1, TimeUnit.SECONDS).zipWith(eventsStream(builder), (id, bldr) -> createSseEvent(bldr, id)).subscribeOn(Schedulers.single()).subscribe(sink::send, ex -> {}, sink::close); }這次,如果我們運(yùn)行該應(yīng)用程序并使用多個(gè)Web瀏覽器或同一瀏覽器中的不同選項(xiàng)卡導(dǎo)航到http:// localhost:8080 / static / index.html ,我們將觀察到完全不同的圖表:
出色的服務(wù)器端API確實(shí)非常簡(jiǎn)潔且易于使用。 但是在客戶端方面,我們可以使用Java應(yīng)用程序中的SSE流嗎? 答案是肯定的。 JAX-RS 2.1還概述了客戶端API,其核心是SseEventSource 。
final WebTarget target = ClientBuilder.newClient().register(JacksonJsonProvider.class).target("http://localhost:8080/services/api/stats/sse");try (final SseEventSource eventSource =SseEventSource.target(target).reconnectingEvery(5, TimeUnit.SECONDS).build()) {eventSource.register(event -> {final Stats stats = event.readData(Stats.class, MediaType.APPLICATION_JSON_TYPE);System.out.println("name: " + event.getName());System.out.println("id: " + event.getId());System.out.println("comment: " + event.getComment());System.out.println("data: " + stats.getLoad() + ", " + stats.getTimestamp());System.out.println("---------------");});eventSource.open();// Just consume SSE events for 10 secondsThread.sleep(10000); }如果運(yùn)行此代碼段(假設(shè)服務(wù)器也已啟動(dòng)并且正在運(yùn)行),我們將在控制臺(tái)中看到類似的內(nèi)容(您可能還記得,數(shù)據(jù)是隨機(jī)生成的)。
name: stats id: 0 comment: null data: 82, 1509376080027 --------------- name: stats id: 1 comment: null data: 68, 1509376081033 --------------- name: stats id: 2 comment: null data: 12, 1509376082028 --------------- name: stats id: 3 comment: null data: 5, 1509376083028 ---------------...如我們所見(jiàn),服務(wù)器端的OutboundSseEvent變?yōu)榭蛻舳说腎nboundSseEvent 。 客戶端可以使用通常的MessageBodyReader解析策略,通過(guò)指定預(yù)期的媒體類型來(lái)消耗數(shù)據(jù)屬性中可以反序列化的任何有效負(fù)載。
單篇文章中壓縮了很多內(nèi)容。 而且,關(guān)于SSE和JAX-RS 2.1的更多信息我們?cè)谶@里沒(méi)有涉及,例如使用HttpHeaders.LAST_EVENT_ID_HEADER或配置重新連接延遲。 如果有興趣學(xué)習(xí)的話,這些可能是即將發(fā)布的帖子的重要話題。
總而言之, JAX-RS對(duì)SSE的支持是我們?cè)S多人期待已久的事情。 最后,它在那里,請(qǐng)嘗試一下!
完整的項(xiàng)目資源可在Github上找到 。
翻譯自: https://www.javacodegeeks.com/2017/10/better-late-never-sse-server-sent-events-now-jax-rs.html
總結(jié)
以上是生活随笔為你收集整理的迟来总比没有好:SSE或服务器发送的事件现在已在JAX-RS中的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 电脑绝地求生赛季通行证(绝地求生最新赛季
- 下一篇: 电脑能用的wifi钥匙(电脑上好用的wi