rxjava 背压_背压加载文件– RxJava常见问题解答
rxjava 背壓
事實(shí)證明,將文件作為流進(jìn)行處理非常有效且方便。 許多人似乎忘記了,自Java 8(3年以上!)以來(lái),我們可以很容易地將任何文件變成一行代碼:
String filePath = "foobar.txt"; try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {reader.lines().filter(line -> !line.startsWith("#")).map(String::toLowerCase).flatMap(line -> Stream.of(line.split(" "))).forEach(System.out::println); }reader.lines()返回Stream<String> ,您可以對(duì)其進(jìn)行進(jìn)一步轉(zhuǎn)換。 在此示例中,我們丟棄以"#"開頭的行,并通過(guò)將其拆分為單詞來(lái)爆炸每行。 這樣,我們就可以實(shí)現(xiàn)單詞流而不是行流。 使用文本文件幾乎與使用普通Java集合一樣簡(jiǎn)單。 在RxJava中, 我們已經(jīng)學(xué)習(xí)了generate()運(yùn)算符。 它也可以在這里用于從文件創(chuàng)建健壯的行流:
Flowable<String> file = Flowable.generate(() -> new BufferedReader(new FileReader(filePath)),(reader, emitter) -> {final String line = reader.readLine();if (line != null) {emitter.onNext(line);} else {emitter.onComplete();}},reader -> reader.close() );在上述示例中, generate()運(yùn)算符稍微復(fù)雜一些。 第一個(gè)參數(shù)是狀態(tài)工廠。 每次有人訂閱此流時(shí),都會(huì)調(diào)用工廠并創(chuàng)建有狀態(tài)的BufferedReader 。 然后,當(dāng)下游運(yùn)營(yíng)商或訂戶希望接收某些數(shù)據(jù)時(shí),將調(diào)用第二個(gè)lambda(帶有兩個(gè)參數(shù))。 此lambda表達(dá)式嘗試從文件中精確提取一行,然后將其發(fā)送到下游( onNext() )或在遇到文件結(jié)尾時(shí)完成。 這很簡(jiǎn)單。 generate()的第三個(gè)可選參數(shù)是一個(gè)lambda表達(dá)式,可以對(duì)state進(jìn)行一些清理。 在我們的情況下這非常方便,因?yàn)槲覀儾粌H必須在到達(dá)文件末尾時(shí)關(guān)閉文件,而且還必須在使用者過(guò)早取消訂閱時(shí)關(guān)閉文件。
認(rèn)識(shí)Flowable.using()運(yùn)算符
這似乎需要做很多工作,尤其是當(dāng)我們已經(jīng)有了來(lái)自JDK 8的一行代碼時(shí)。事實(shí)證明,有一個(gè)類似的工廠運(yùn)算符using()很方便。 的翻譯的所有最簡(jiǎn)單的方法首先Stream從Java到Flowable是通過(guò)轉(zhuǎn)換Stream成Iterator (checked異常處理忽略):
Flowable.fromIterable(new Iterable<String>() {@Overridepublic Iterator<String> iterator() {final BufferedReader reader = new BufferedReader(new FileReader(filePath));final Stream<String> lines = reader.lines();return lines.iterator();} });可以簡(jiǎn)化為:
Flowable.<String>fromIterable(() -> {final BufferedReader reader = new BufferedReader(new FileReader(filePath));final Stream<String> lines = reader.lines();return lines.iterator(); });但是我們忘記了關(guān)閉BufferedReader從而關(guān)閉FileReader從而關(guān)閉了文件句柄。 因此,我們引入了資源泄漏。 在這種情況下, using()運(yùn)算符的作用就像一個(gè)超級(jí)按鈕。 在某種程度上,它類似于try-with-resources語(yǔ)句。 您可以基于某些外部資源創(chuàng)建流。 當(dāng)有人訂閱或取消訂閱時(shí),將為您管理此資源的生命周期(創(chuàng)建和處置):
Flowable.using(() -> new BufferedReader(new FileReader(filePath)),reader -> Flowable.fromIterable(() -> reader.lines().iterator()),reader -> reader.close() );它與上一個(gè)generate()示例非常相似,但是中間最重要的lambda表達(dá)式卻大不相同。 我們獲得一個(gè)資源( reader )作為參數(shù),并假設(shè)返回一個(gè)Flowable (而不是單個(gè)元素)。 該lambda僅被調(diào)用一次,而不是在下游每次請(qǐng)求新項(xiàng)時(shí)都被調(diào)用。 using()運(yùn)算符給我們的是管理BufferedReaders的生命周期。 當(dāng)我們有一個(gè)狀態(tài)(可以一次生成整個(gè)Flowable ,而不是一次generate()一個(gè)using()時(shí), using()很有用。
流XML文件
…或JSON。 假設(shè)您有一個(gè)非常大的XML文件,其中包含以下條目,成千上萬(wàn)個(gè)條目:
<trkpt lat="52.23453" lon="21.01685"><ele>116</ele> </trkpt> <trkpt lat="52.23405" lon="21.01711"><ele>116</ele> </trkpt> <trkpt lat="52.23397" lon="21.0166"><ele>116</ele> </trkpt>這是標(biāo)準(zhǔn)GPS交換格式的片段,可以描述任意長(zhǎng)度的地理路線。 每個(gè)<trkpt>是具有緯度,經(jīng)度和海拔的單個(gè)點(diǎn)。 我們希望有一個(gè)跟蹤點(diǎn)流(為簡(jiǎn)單起見忽略高程),以便可以部分使用文件,而不是一次加載所有文件。 我們有三個(gè)選擇:
- DOM / JAXB –必須將所有內(nèi)容加載到內(nèi)存中并映射到Java對(duì)象。 不適用于無(wú)限長(zhǎng)的文件(甚至非常大的文件)
 - SAX –基于推送的庫(kù),一旦發(fā)現(xiàn)XML標(biāo)簽打開或關(guān)閉,就會(huì)調(diào)用回調(diào)。 似乎好一點(diǎn),但可能無(wú)法支持背壓–由庫(kù)決定何時(shí)調(diào)用回調(diào),并且無(wú)法減慢其速度
 - StAX –與SAX相似,但是我們必須積極地從XML文件中提取數(shù)據(jù)。 這對(duì)于支持背壓至關(guān)重要-我們決定何時(shí)讀取下一個(gè)數(shù)據(jù)塊
 
讓我們嘗試使用StAX和RxJava實(shí)現(xiàn)可能非常大的XML文件的解析和流傳輸。 首先,我們必須首先學(xué)習(xí)如何使用StAX 。 該解析器稱為XMLStreamReader ,它是按照以下咒語(yǔ)和詛咒序列創(chuàng)建的:
XMLStreamReader staxReader(String name) throws XMLStreamException {final InputStream inputStream = new BufferedInputStream(new FileInputStream(name));return XMLInputFactory.newInstance().createXMLStreamReader(inputStream); }只需閉上眼睛,并確保您始終有一個(gè)地方可以復(fù)制粘貼上面的代碼片段。 情況變得更糟。 為了讀取第一個(gè)<trkpt>標(biāo)記及其屬性,我們必須編寫一些復(fù)雜的代碼:
import lombok.Value;@Value class Trackpoint {private final BigDecimal lat;private final BigDecimal lon; }Trackpoint nextTrackpoint(XMLStreamReader r) {while (r.hasNext()) {int event = r.next();switch (event) {case XMLStreamConstants.START_ELEMENT:if (r.getLocalName().equals("trkpt")) {return parseTrackpoint(r);}break;case XMLStreamConstants.END_ELEMENT:if (r.getLocalName().equals("gpx")) {return null;}break;}}return null; }Trackpoint parseTrackpoint(XMLStreamReader r) {return new Trackpoint(new BigDecimal(r.getAttributeValue("", "lat")),new BigDecimal(r.getAttributeValue("", "lon"))); }API是低級(jí)報(bào)價(jià),并且?guī)缀跏枪哦?一切都發(fā)生在一個(gè)巨大的循環(huán)中,該循環(huán)讀取... int類型的東西 。 此int可以是START_ELEMENT , END_ELEMENT或我們不感興趣的其他一些東西。請(qǐng)記住,我們正在讀取XML文件,但不是逐行或逐字符,而是通過(guò)邏輯XML標(biāo)記(標(biāo)記)。 因此,如果發(fā)現(xiàn)<trkpt>元素的打開,我們將對(duì)其進(jìn)行解析,否則我們將繼續(xù)。 第二個(gè)重要條件是當(dāng)我們發(fā)現(xiàn)關(guān)閉</gpx> ,這應(yīng)該是GPX文件中的最后一件事。 在這種情況下,我們返回null ,表示XML文件結(jié)束。
感覺復(fù)雜嗎? 實(shí)際上,這是讀取具有恒定內(nèi)存使用量的大型XML(與文件大小無(wú)關(guān))的最簡(jiǎn)單方法。 所有這些與RxJava有何關(guān)系? 在這一點(diǎn)上,我們可以很容易地構(gòu)建Flowable<Trackpoint> 。 是的, Flowable ,沒有Observable (見: Obsevable與Observable )。 這樣的流將完全支持背壓,這意味著它將以適當(dāng)?shù)乃俣茸x取文件:
Flowable<Trackpoint> trackpoints = generate(() -> staxReader("track.gpx"),this::pushNextTrackpoint,XMLStreamReader::close);void pushNextTrackpoint(XMLStreamReader reader, Emitter<Trackpoint> emitter) {final Trackpoint trkpt = nextTrackpoint(reader);if (trkpt != null) {emitter.onNext(trkpt);} else {emitter.onComplete();} }哇,如此簡(jiǎn)單,如此反壓! [1]我們首先創(chuàng)建一個(gè)XMLStreamReader ,并確保在文件結(jié)束或有人取消訂閱時(shí)將其關(guān)閉。 請(qǐng)記住,每個(gè)訂閱者將一次又一次打開并開始解析相同的文件。 中間的lambda表達(dá)式僅使用狀態(tài)變量( XMLStreamReader )并發(fā)出另一個(gè)跟蹤點(diǎn)。 所有這些似乎都很晦澀,事實(shí)是! 但是,現(xiàn)在我們有了一個(gè)使用很少的資源就可以從一個(gè)可能很大的文件中獲取的反向壓力感知流。 我們可以同時(shí)處理跟蹤點(diǎn),也可以將它們與其他數(shù)據(jù)源組合在一起。 在下一篇文章中,我們將學(xué)習(xí)如何以非常相似的方式加載JSON。
翻譯自: https://www.javacodegeeks.com/2017/09/loading-files-backpressure-rxjava-faq.html
rxjava 背壓
總結(jié)
以上是生活随笔為你收集整理的rxjava 背压_背压加载文件– RxJava常见问题解答的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
                            
                        - 上一篇: 联想b460报价(联想B4650)
 - 下一篇: 淘宝网tcl电视底座