使用RxNetty访问Meetup的流API
本文將涉及多個主題:響應式編程,HTTP,解析JSON以及與社交API集成。 完全在一個用例中:我們將通過非夸張的RxNetty庫實時加載和處理新的metup.com事件,結合Netty框架的強大功能和RxJava庫的靈活性。 Meetup提供了公開可用的流API ,可實時推送世界各地注冊的每一個Meetup。 只需瀏覽至stream.meetup.com/2/open_events并觀察JSON塊如何緩慢地出現(xiàn)在屏幕上。 每當有人創(chuàng)建新事件時,自包含的JSON就會從服務器推送到您的瀏覽器。 這意味著這樣的請求永無止境,相反,只要需要,我們就會不斷接收部分數(shù)據(jù)。 我們已經(jīng)在將Twitter4J變成RxJava的Observable中研究了類似的情況。 每個新的Meetup事件都會發(fā)布一個獨立的JSON文檔,與此類似(省略很多細節(jié)):
{ "id" : "219088449","name" : "Silver Wings Brunch","time" : 1421609400000,"mtime" : 1417814004321,"duration" : 900000,"rsvp_limit" : 0,"status" : "upcoming","event_url" : "http://www.meetup.com/Laguna-Niguel-Social-Networking-Meetup/events/219088449/","group" : { "name" : "Former Flight Attendants South Orange and North San Diego Co","state" : "CA"...},"venue" : { "address_1" : "26860 Ortega Highway","city" : "San Juan Capistrano","country" : "US"...},"venue_visibility" : "public","visibility" : "public","yes_rsvp_count" : 1... }每當我們長時間輪詢的HTTP連接(帶有Transfer-Encoding: chunked響應標頭)推送此類JSON時,我們都希望對其進行解析并以某種方式進一步傳遞。 我們討厭回調(diào),因此RxJava似乎是一個合理的選擇(認為: Observable<Event> )。
步驟1:使用RxNetty接收原始數(shù)據(jù)
我們不能使用普通的HTTP客戶端,因為它們專注于請求-響應語義。 這里沒有任何響應,我們只是永遠保持打開的連接,并在數(shù)據(jù)到達時使用它們。 RxJava具有開箱即用的RxApacheHttp庫,但它假定為text/event-stream內(nèi)容類型 。 相反,我們將使用底層的通用RxNetty庫。 它是Netty(duh!)的包裝,并且能夠?qū)崿F(xiàn)任意的 TCP / IP(包括HTTP)以及UDP客戶端和服務器。 如果您不了解Netty,則它是基于數(shù)據(jù)包的,而不是面向流的,因此我們可以預期每次Meetup推送都會有一個Netty事件。 該API當然不是簡單明了的,但是一旦您使用它,它就會變得有意義:
HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder("stream.meetup.com", 443).pipelineConfigurator(new HttpClientPipelineConfigurator<>()).withSslEngineFactory(DefaultFactories.trustAll()).build();final Observable<HttpClientResponse> responses = httpClient.submit(HttpClientRequest.createGet("/2/open_events")); final Observable byteBufs = responses.flatMap(AbstractHttpContentHolder::getContent); final Observable chunks = byteBufs.map(content -> content.toString(StandardCharsets.UTF_8));首先,我們創(chuàng)建HttpClient并設置SSL(請注意,關于服務器證書的trustAll()可能不是最佳的生產(chǎn)設置)。 稍后,我們submit() GET請求并接收Observable<HttpClientResponse<ByteBuf>>作為回報。 ByteBuf是Netty對通過網(wǎng)絡發(fā)送或接收的一堆字節(jié)的抽象。 該觀察結果將立即告訴我們從Meetup收到的每條數(shù)據(jù)。 從響應中提取ByteBuf ,我們將其轉(zhuǎn)換為包含上述JSON的String 。 到目前為止,一切正常。
步驟2:將數(shù)據(jù)包與JSON文檔對齊
Netty非常強大,因為它不會掩蓋泄漏抽象所固有的復雜性。 每次通過TCP / IP線路接收到某些內(nèi)容時,都會通知我們。 您可能會相信,當服務器發(fā)送100字節(jié)時,客戶端的Netty會通知我們有關這100字節(jié)的信息。 但是,TCP / IP堆棧可以自由地拆分和合并您通過有線發(fā)送的數(shù)據(jù),特別是因為它假定是流,因此如何將其拆分為數(shù)據(jù)包無關緊要。 Netty的文檔中對此警告做了很大的解釋。 對我們意味著什么? 當Meetup發(fā)送單個事件時,我們可能僅收到一個可觀察到的chunks String 。 但是同樣可以將其劃分為任意數(shù)量的數(shù)據(jù)包,因此chunks將發(fā)出多個String 。 更糟糕的是,如果Meetup接連發(fā)送兩個事件,則它們可能適合一個數(shù)據(jù)包。 在這種情況下, chunks將發(fā)出一個帶有兩個獨立JSON文檔的String 。 事實上,我們不能假設JSON字符串和收到的網(wǎng)絡數(shù)據(jù)包之間有任何對齊。 我們所知道的是,代表事件的各個JSON文檔由換行符分隔。 令人驚訝的是, RxJavaString官方附加組件RxJavaString提供了一種精確的方法:
Observable jsonChunks = StringObservable.split(chunks, "\n");實際上,甚至還有更簡單的StringObservable.byLine(chunks) ,但它使用的是平臺相關的行尾。 最好在官方文檔中解釋split()作用:
現(xiàn)在我們可以安全地解析jsonChunks發(fā)出的每個String了:
步驟3:解析JSON
有趣的是,這一步驟并不是那么簡單。 我承認,我排序的享受WSDL時間,因為我很容易,可預見生成如下web服務的合同Java模型。 JSON,特別是在JSON模式的邊緣市場滲透方面,基本上是集成的“狂野西部”。 通常,您會得到非正式的文檔或請求和響應的樣本。 沒有類型信息或格式,無論字段是否為必填項,等等。此外,由于我不情愿使用地圖映射 (在那里,Clojure程序員),為了使用基于JSON的REST服務,我必須自己編寫映射POJO。 好吧,有解決方法。 首先,我舉了一個由Meetup流API生成的JSON的代表性示例,并將其放在src/main/json/meetup/event.json 。 然后,我使用jsonschema2pojo-maven-plugin ( 也存在Gradle和Ant版本)。 插件的名稱令人困惑,它還可以與JSON示例(不僅是架構)一起使用以生成Java模型:
<plugin><groupId>org.jsonschema2pojo</groupId><artifactId>jsonschema2pojo-maven-plugin</artifactId><version>0.4.7</version><configuration><sourceDirectory>${basedir}/src/main/json/meetup</sourceDirectory><targetPackage>com.nurkiewicz.meetup.generated</targetPackage><includeHashcodeAndEquals>true</includeHashcodeAndEquals><includeToString>true</includeToString><initializeCollections>true</initializeCollections><sourceType>JSON</sourceType><useCommonsLang3>true</useCommonsLang3><useJodaDates>true</useJodaDates><useLongIntegers>true</useLongIntegers><outputDirectory>target/generated-sources</outputDirectory></configuration><executions><execution><id>generate-sources</id><phase>generate-sources</phase><goals><goal>generate</goal></goals></execution></executions> </plugin>此時,Maven將創(chuàng)建與Jackson兼容的Event.java , Venue.java , Group.java等:
private Event parseEventJson(String jsonStr) {try {return objectMapper.readValue(jsonStr, Event.class);} catch (IOException e) {throw new UncheckedIOException(e);} }很好,它很好:
final Observableevents = jsonChunks.map(this::parseEventJson);步驟5:獲利!!!
有了Observable<Event>我們可以實現(xiàn)一些非常有趣的用例。 是否要查找剛剛創(chuàng)建的波蘭所有聚會的名稱? 當然!
events.filter(event -> event.getVenue() != null).filter(event -> event.getVenue().getCountry().equals("pl")).map(Event::getName).forEach(System.out::println);尋找統(tǒng)計信息每分鐘創(chuàng)建多少個事件? 沒問題!
events.buffer(1, TimeUnit.MINUTES).map(List::size).forEach(count -> log.info("Count: {}", count));或者,也許您想繼續(xù)搜索將來最遠的聚會,而不是尋找比已發(fā)現(xiàn)的聚會更近的聚會?
events.filter(event -> event.getTime() != null).scan(this::laterEventFrom).distinct().map(Event::getTime).map(Instant::ofEpochMilli).forEach(System.out::println);//...private Event laterEventFrom(Event first, Event second) {return first.getTime() > second.getTime() ?first :second; }此代碼過濾掉未知時間的事件,發(fā)出當前事件或前一個事件( scan() ),具體取決于后面的事件,過濾出重復事件并顯示時間。 這個運行了幾分鐘的小程序已經(jīng)發(fā)現(xiàn)一個計劃于2015年11月創(chuàng)建的聚會,而在撰寫本文時,聚會是2014年12月。 可能性是無止境的。
希望我能對如何輕松地將各種技術融合在一起有一個很好的了解:反應式編程以編寫超快速的網(wǎng)絡代碼,無需樣板代碼的類型安全JSON解析,以及RxJava來快速處理事件流。 請享用!
翻譯自: https://www.javacodegeeks.com/2014/12/accessing-meetups-streaming-api-with-rxnetty.html
總結
以上是生活随笔為你收集整理的使用RxNetty访问Meetup的流API的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 使用Spring Integration
- 下一篇: 启动进程linux命令(启动进程 lin