Java Streams,第 1 部分: java.util.stream 库简介
? ?Java SE 8 中主要的新語言特性是拉姆達表達式。可以將拉姆達表達式想作一種匿名方法;像方法一樣,拉姆達
表達式具有帶類型的參數、主體和返回類型。但真正的亮點不是拉姆達表達式本身,而是它們所實現的功能。拉姆達表
達式使得將行為表達為數據變得很容易,從而使開發具有更強表達能力、更強大的庫成為可能。
? ?Java SE 8 中引入的一個這樣的庫是?java.util.stream?包 (Streams),它有助于為各種數據來源上的可能的并
行批量操作建立簡明的、聲明性的表達式。較早的 Java 版本中也編寫過像 Streams 這樣的庫,但沒有緊湊的行為即數
據語言特性,而且它們的使用很麻煩,以至于沒有人愿意使用它們。您可以將 Streams 視為 Java 中第一個充分利用了
拉姆達表達式的強大功能的庫,但它沒有什么特別奇妙的地方(盡管它被緊密集成到核心 JDK庫中)。
? ?Streams 不是該語言的一部分 — 它是一個精心設計的庫,充分利用了一些較新的語言特性。
使用流的查詢
?
? ? ? ?本文是一個深入探索?java.util.stream?庫的系列的第一部分。本期介紹該庫,并概述它的優勢和設計原理。
在后續幾期中,您將學習如何使用流來聚合和匯總數據,了解該庫的內部原理和性能優化。
? ? ? ?流的最常見用法之一是表示對集合中的數據的查詢。清單 1 給出了一個簡單的流管道示例。該管道獲取一個在
買家和賣家之間模擬購買的交易集合,并計算生活在紐約的賣家的交易總價值。
清單 1. 一個簡單的流管道
?
int totalSalesFromNY= txns.stream().filter(t -> t.getSeller().getAddr().getState().equals("NY")).mapToInt(t -> t.getAmount()).sum();
?
?
? ?filter()?操作僅選擇與來自紐約的賣家進行的交易。mapToInt()?操作選擇所關注交易的交易金額。最終的?sum()?操作將對這些金額求和。
? ? ? 作為來自清單 1 中的相同領域的更復雜查詢,考慮 “打印與年齡超過 65 歲的買家進行交易的賣家姓名,并按姓名
排序。”以舊式的(命令)方式編寫此查詢可能會得到類似清單 2 的結果。這個例子非常容易理解,即使比較挑剔的人
也會發現這個查詢的命令版本(for?循環)非常簡單,而且需要更少的代碼行即可表達。為了體現流方法的好處,示
例問題沒有必要變得過于復雜。流利用了這種最強大的計算原理:組合。通過使用簡單的構建塊(過濾、映射、排序、
聚合)來組合復雜的操作,在問題變得比相同數據源上更加臨時的計算更復雜時,流查詢更可能保留寫入和讀取的簡單性。
清單 2. 對一個集合的臨時查詢
?
Set<Seller> sellers = new HashSet<>(); for (Txn t : txns) {if (t.getBuyer().getAge() >= 65)sellers.add(t.getSeller()); } List<Seller> sorted = new ArrayList<>(sellers); Collections.sort(sorted, new Comparator<Seller>() {public int compare(Seller a, Seller b) {return a.getName().compareTo(b.getName());} }); for (Seller s : sorted)System.out.println(s.getName());? ? ? ?盡管此查詢比第一個查詢稍微復雜一點,但很明顯采用命令方法的結果代碼的組織結構和可讀性已開始下降。讀者首先看到的
不是計算的起點和終點;而是一個一次性中間結果的聲明。要閱讀此代碼,您需要在頭腦中緩存大量上下文,然后才能明白代碼的
實際用途。清單 3?展示了可以如何使用 Streams 重寫此查詢。
清單 3. 使用 Streams 表達的清單 2 中的查詢
?
txns.stream().filter(t -> t.getBuyer().getAge() >= 65).map(Txn::getSeller).distinct().sorted(comparing(Seller::getName)).map(Seller::getName).forEach(System.out::println);? ? ? 清單 3 中的代碼更容易閱讀,因為用戶既沒有被 “垃圾” 變量(比如?sellers?和?sorted)分心,也不需要在閱讀代碼的同時跟
蹤記錄大量上下文;而且代碼看起來幾乎就像問題陳述一樣。可讀性更強的代碼也更不容易出錯,因為維護者更容易一眼就看出代
碼在做什么。
? ? ? ?Streams 登錄所采用的設計方法實現了實際的關注點分離。客戶端負責指定計算的是 “什么”,而庫負責控制 “如何做”。這種分離
傾向于與專家經驗的分發平行進行;客戶端編寫者通常能夠更好地了解問題領域,而庫編寫者通常擁有所執行的算法屬性的更多專業
技能。編寫允許這種關注點分離的庫的主要推動力是,能夠像傳遞數據一樣輕松地傳遞行為,從而使調用方可在 API 中描述復雜計算
的結構,然后離開,讓庫來選擇執行戰略。
?
流管道剖析
? ? ? ?所有流計算都有一種共同的結構:它們具有一個流來源、0 或多個中間操作,以及一個終止操作。流的元素可以是對象引用?
(Stream<String>),也可以是原始整數 (IntStream)、長整型 (LongStream) 或雙精度?(DoubleStream)。因為 Java 程序使用的大
部分數據都已存儲在集合中,所以許多流計算使用集合作為它們的來源。JDK 中的?Collection?實現都已增強,可充當高效的流來源。
但是,還存在其他可能的流來源,比如數組、生成器函數或內置的工廠(比如數字范圍),而且(如本系列中的?第 3 期?所示)可以編
寫自定義的流適配器,以便可以將任意數據源充當流來源。表 1 給出了 JDK 中的一些流生成方法。
表 1. JDK 中的流來源
?
| Collection.stream() | 使用一個集合的元素創建一個流。 |
| Stream.of(T...) | 使用傳遞給工廠方法的參數創建一個流。 |
| Stream.of(T[]) | 使用一個數組的元素創建一個流。 |
| Stream.empty() | 創建一個空流。 |
| Stream.iterate(T first, BinaryOperator<T> f) | 創建一個包含序列?first, f(first), f(f(first)), ...?的無限流 |
| Stream.iterate(T first, Predicate<T> test, BinaryOperator<T> f) | (僅限 Java 9)類似于?Stream.iterate(T first, BinaryOperator<T> f),但流在測試預期返回?false?的第一個元素上終止。 |
| Stream.generate(Supplier<T> f) | 使用一個生成器函數創建一個無限流。 |
| IntStream.range(lower, upper) | 創建一個由下限到上限(不含)之間的元素組成的?IntStream。 |
| IntStream.rangeClosed(lower, upper) | 創建一個由下限到上限(含)之間的元素組成的?IntStream。 |
| BufferedReader.lines() | 創建一個有來自?BufferedReader?的行組成的流。 |
| BitSet.stream() | 創建一個由?BitSet?中的設置位的索引組成的?IntStream。 |
| Stream.chars() | 創建一個與?String?中的字符對應的?IntStream。 |
中間操作負責將一個流轉換為另一個流,中間操作包括?filter()(選擇與條件匹配的元素)、map()(根據函
數來轉換元素)、distinct()(刪除重復)、limit()(在特定大小處截斷流)和?sorted()。一些操作
(比如?mapToInt())獲取一種類型的流并返回一種不同類型的流;清單 1?中的示例的開頭處有一個
?Stream<Transaction>,它隨后被轉換為?IntStream。表 2 給出了一些中間流操作。
表 2. 中間流操作
| filter(Predicate<T>) | 與預期匹配的流的元素 |
| map(Function<T, U>) | 將提供的函數應用于流的元素的結果 |
| flatMap(Function<T, Stream<U>> | 將提供的流處理函數應用于流元素后獲得的流元素 |
| distinct() | 已刪除了重復的流元素 |
| sorted() | 按自然順序排序的流元素 |
| Sorted(Comparator<T>) | 按提供的比較符排序的流元素 |
| limit(long) | 截斷至所提供長度的流元素 |
| skip(long) | 丟棄了前 N 個元素的流元素 |
| takeWhile(Predicate<T>) | (僅限 Java 9)在第一個提供的預期不是?true?的元素處階段的流元素 |
| dropWhile(Predicate<T>) | (僅限 Java 9)丟棄了所提供的預期為?true?的初始元素分段的流元素 |
? ? ? ?中間操作始終是惰性的:調用中間操作只會設置流管道的下一個階段,不會啟動任何操作。重建操作可進一步劃分為無狀態?
和有狀態?操作。無狀態操作(比如?filter()?或?map())可獨立處理每個元素,而有狀態操作(比如?sorted()?或?distinct())
可合并以前看到的影響其他元素處理的元素狀態。
? ? ? ?數據集的處理在執行終止操作時開始,比如縮減(sum()?或?max())、應用 (forEach()) 或搜索?(findFirst()) 操作。終止
操作會生成一個結果或副作用。執行終止操作時,會終止流管道,如果您想再次遍歷同一個數據集,可以設置一個新的流管道。
表 3 給出了一些終止流操作。
表 3. 終止流操作
?
| forEach(Consumer<T> action) | 將提供的操作應用于流的每個元素。 |
| toArray() | 使用流的元素創建一個數組。 |
| reduce(...) | 將流的元素聚合為一個匯總值。 |
| collect(...) | 將流的元素聚合到一個匯總結果容器中。 |
| min(Comparator<T>) | 通過比較符返回流的最小元素。 |
| max(Comparator<T>) | 通過比較符返回流的最大元素。 |
| count() | 返回流的大小。 |
| {any,all,none}Match(Predicate<T>) | 返回流的任何/所有元素是否與提供的預期相匹配。 |
| findFirst() | 返回流的第一個元素(如果有)。 |
| findAny() | 返回流的任何元素(如果有)。 |
?
流與集合比較
? ? ? ?盡管流在表面上可能類似于集合(您可以認為二者都包含數據),但事實上,它們完全不同。集合是一種數據結構;它的主要關
注點是在內存中組織數據,而且集合會在一段時間內持久存在。集合通常可用作流管道的來源或目標,但流的關注點是計算,而不是
數據。數據來自其他任何地方(集合、數組、生成器函數或 I/O 通道),而且可通過一個計算步驟管道處理來生成結果或副作用,
在此刻,流已經完成了。流沒有為它們處理的元素提供存儲空間,而且流的生命周期更像一個時間點 — 調用終止操作。不同于集合,
流也可以是無限的;相應地,一些操作(limit()、findFirst())是短路,而且可在無限流上運行有限的計算。
? ? ? ?集合和流在執行操作的方式上也不同。集合上的操作是急切和突變性的;在?List?上調用?remove()?方法時,調用返回后,您知
道列表狀態會發生改變,以反映指定元素的刪除。對于流,只有終止操作是急切的;其他操作都是惰性的。流操作表示其輸入(也是
流)上的功能轉換,而不是數據集上的突變性操作(過濾一個流會生成一個新流,新流的元素是輸入流的子集,但沒有從來源刪除任
何元素)。將流管道表達為功能轉換序列可以實現多種有用的執行戰略,比如惰性、短路?或操作融合。短路使得管道能夠成功終止,
而不必檢查所有數據;類似 “找到第一筆超過 1000 美元的交易” 這樣的查詢不需要在找到匹配值后檢查其他任何交易。操作融合表示,
可在數據上的一輪中執行多個操作;在?清單 1?的示例中,3 個操作組合成了數據上的一輪操作,而不是首先選擇所有匹配的交易,然
后選擇所有對應的金額,最后對它們求和。
? ? ? ?類似?清單 1?和?清單 3?中的查詢的命令版本通常依靠物化集合來獲得中間計算的結果,比如過濾或映射的結果。這些結果不僅可能
讓代碼變得雜亂,還可能讓執行變得混亂。中間集合的物化僅作用于實現,而不作用于結果,而且它使用計算周期將中間結果組織為將
會被丟棄的數據結構。
? ? ? ?相反,流管道將它們的操作融合到數據上盡可能少的輪次中,通常為單輪。(有狀態中間操作,比如排序,可引入對多輪執行必不
可少的障礙點。)流管道的每個階段惰性地生成它的元素,僅在需要時計算元素,并直接將它們提供給下一階段。您不需要使用集合來
保存過濾或映射的中間結果,所以省去了填充(和垃圾收集)中間集合的工作。
? ? ? ?另外,遵循 “深度優先” 而不是 “寬度優先” 的執行戰略(跟蹤一個數據元素在整個管道中的深度),會讓被處理的操作在緩存中變
得更 “熱”,所以您可以將更多時間用于計算,花更少時間來等待數據。
? ? ? ?除了將流用于計算之外,您可能還希望考慮通過 API 方法使用流來返回聚合結果,而在以前,您可能返回一個數組或集合。返回流
的效率通常更高一些,因為您不需要將所有數據復制到一個新數組或集合中。返回流通常更加靈活;庫選擇返回的集合形式可能不是調
用方所需要的,而且很容易將流轉換為任何集合類型。(返回流不合適,而返回物化集合更合適的主要情形是,調用方需要查看某個時
間點的狀態的一致快照。)
?
并行性
? ? ? ?將計算構建為功能轉換的一個有益的結果是,您只需對代碼進行極少的更改,即可輕松地在順序和并行執行之間切
換。流計算的順序表達和相同計算的并行表達幾乎相同。清單 4 展示了如何并行地執行?清單 1?中的查詢。
清單 4. 清單 1 的并行版本
?
int totalSalesFromNY= txns.parallelStream().filter(t -> t.getSeller().getAddr().getState().equals("NY")).mapToInt(t -> t.getAmount()).sum();
?
? ? ? ?第一行將會請求一個并行流而不是順序流,這是與?清單 1?的唯一區別,因為 Streams 庫有效
地從執行計算的戰略中分解出了計算的描述和結構。以前,并行執行要求完全重寫代碼,這樣做不
僅代價高昂,而且往往容易出錯,因為得到的并行代碼與順序版本不太相似。
? ? ? ?所有流操作都可以順序或并行執行,但請記住,并行性并不是高性能的原因。并行執行可能比順序執行更快、一樣
快或更慢。最好首先從順序流開始,在您知道您能夠獲得提速(并從中受益)時才應用并行性。本系列后面的一期
文章會返回分析流管道的并行性能。
?
附加信息
? ? ? ?盡管 Streams 庫是為計算而精心設計的,但執行計算涉及到回調客戶端所提供的拉姆達表達式,這些拉姆達表達式的用途具有
一定的限制。違反這些限制可能導致流管道失敗或計算出不正確的結果。此外,對于具有副作用的拉姆達表達式,這些副作用的時
限(或存在)可能在某些情況下不合情理。
? ? ? ?大多數流操作都要求傳遞給它們的拉姆達表達是互不干擾?和無狀態?的。互不干擾意味著它們不會修改流來源;無狀態意味著它
們不會訪問(讀或寫)任何可能在流操作壽命內改變的狀態。對于縮減操作(例如計算?sum、min?或?max?等匯總數據),傳遞給這些
操作的拉姆達表達式必須是結合性?的(或遵守類似的要求)。
? ? ? ?從某種程度講,這些要求源于以下事實:如果管道并行執行,流庫可能從多個線程訪問數據源,或并發地調用這些拉姆達表達式。
需要這些限制才能確保計算保持正確。(這些限制也可能得到更加簡單、更容易理解的代碼,無論是否采用并行性。)您可能傾向于
讓自己相信,您可以忽略這些限制,因為您認為特定的管道從不會并行運行,但最好控制住這一傾向,否則您會在代碼中埋下定時炸
彈。花點精力來表達您的流管道,使得無論采用何種執行戰略,它們都是正確的。
? ? ? ?所有并發性風險的根源是共享可變狀態。共享可變狀態的一種可能來源是流來源。如果來源是像?ArrayList?這樣的傳統集合,
Streams?庫會假設它在流操作過程中保持不變。(明顯為了實現并發訪問而設計的集合,比如?ConcurrentHashMap,不符合這一假
設。)互不干擾要求不僅不包括在流操作期間被其他操作突變的來源,而且傳遞給流操作的拉姆達表達式本身也應避免突變來源。
除了不修改流來源之外,傳遞給流操作的拉姆達表達式也應是無狀態的。例如,清單 5 中的代碼(嘗試消除任何與前面的元素重復的
元素)就違背了這一規則。
清單 5. 使用有狀態拉姆達表達式的流管道(不要這么做!)
?
HashSet<Integer> twiceSeen = new HashSet<>(); int[] result= elements.stream().filter(e -> {twiceSeen.add(e * 2);return twiceSeen.contains(e);}).toArray();? ? ? ?如果并行執行,此管道會生成錯誤的結果,原因有兩個。首先,對?twiceSeen?集的訪問是從多個線程進行的,沒有進行任何協調,
因此不是線程安全的。第二,因為數據被分區了,所以無法確保在處理給定元素時已經處理了該元素前面的所有元素。
? ? ? ?最好的情況是,如果傳遞給流操作的拉姆達表達式完全沒有副作用,也就是說,它們不會突變任何基于堆的狀態或在執行過程中
執行任何?I/O。如果有副作用,它們應負責執行任何需要的協調,以確保這些副作用是線程安全的。
? ? ? ?此外,無法保證所有副作用都將執行。例如,在清單 6 中,該庫被釋放了,以完全避免執行傳遞給?map()?的拉姆達表達式。因為
來源具有已知大小,map()?操作被認為會保持該大小,而且映射不會影響計算的結果,所以庫可以通過完全不執行映射來優化計算!
(這種優化可以將計算從?O(n)?轉換到?O(1),還可以消除與調用映射函數相關的工作。)
清單 6. 具有可能不會被執行的副作用的流管道
?
int count = anArrayList.stream().map(e -> { System.out.println("Saw " + e); e }).count();? ? ? ?您會注意到受這種優化影響的唯一情況(除了計算速度快得多)是,傳遞給?map()?的拉姆達表達式具有副作用 —?在這種情況下,
如果這些副作用沒有發生,您可能感到非常奇怪。能夠實現這些優化的假設前提是,流操作屬于功能轉換。在大多數時候,該庫使我
們的代碼能夠運行得更快,而且不需要我們投入精力。能夠執行這樣的優化的代價是,我們必須接受對我們傳遞給流操作的拉姆達表
達式的操作的一些限制,以及我們對副作用的一定的依賴。
(總之,這是一次很劃算的交易。)
?
第 1 部分的小結
java.util.stream?庫提供了一種簡單而又靈活的方法來表達各種數據源上可能并行的函數式查詢,包括集合、數組、生成器函數、
范圍或自定義數據結構。一旦您開始使用這個庫,就會被它深深吸引!下一期?將介紹?Streams 庫的一個最強大的特性:聚合。
參考資料
?
學習
- java.util.stream?的?包文檔:查看該庫的工作原理概述。
- Java 中的函數編程。利用 Lambda 表達式的強大功能(Venkat Subramaniam,Pragmatic Bookshelf,2014 年):閱讀對 Java 8 語言和庫特性背后的函數編程概念的詳細介紹。
- 精通拉姆達表達式:多核世界中的 Java 編程(Maurice Naftalin,McGraw-Hill Education,2014):了解對 Java 8 語言和庫特性的詳細分析和基本原理。
- 我應返回集合還是流?:在這個 Stack Overflow 答案中,查找在 API 簽名中使用流的指導原則。
- developerWorks Java 技術專區:這里有數百篇關于 Java 編程各個方面的文章。
獲得產品和技術
- RxJava 庫:了解一個類似 Streams 的反應式庫,該庫為?java.util.stream?的功能提供了補充。
討論
- 加入?developerWorks 中文社區,查看開發人員推動的博客、論壇、組和維基,并與其他 developerWorks 用戶交流。
總結
以上是生活随笔為你收集整理的Java Streams,第 1 部分: java.util.stream 库简介的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java线上应用故障排查之一:高CPU占
- 下一篇: Java Streams,第 2 部分: