【Java 8 in Action】Stream
文章目錄
- Stream
- Stream的介紹
- Stream的創(chuàng)建
- Stream的中間操作
- Stream 的終止操作
- 并行流與串行流
Stream
Stream的介紹
Lambda表達(dá)式是Stream的基礎(chǔ),如果你還不懂它,可以參考這篇文章
在Lambda表達(dá)式章節(jié)也說(shuō)了Stream 是 Java 8 的一大亮點(diǎn)。它與 java.io 包里的 InputStream 和 OutputStream 是完全不同的概念。我們看一下官方文檔
官方文檔表明:
簡(jiǎn)單的說(shuō)流 (Stream) 是數(shù)據(jù)渠道,用于操作數(shù)據(jù)源(集合、數(shù)組等)所生成的元素序列。“集合講的是數(shù)據(jù),流講的是計(jì)算!
值得注意的地方:
- Stream 自己不會(huì)存儲(chǔ)元素。
- Stream 不會(huì)改變?cè)磳?duì)象。相反,他們會(huì)返回一個(gè)持有結(jié)果的新Stream。
- Stream 操作是延遲執(zhí)行的。這意味著他們會(huì)等到需要結(jié)果的時(shí)候才執(zhí)行。
Stream的操作有三個(gè)步驟:
一個(gè)數(shù)據(jù)源(如:集合、數(shù)組),獲取一個(gè)流
一個(gè)中間操作鏈,對(duì)數(shù)據(jù)源的數(shù)據(jù)進(jìn)行處理
一個(gè)終止操作,執(zhí)行中間操作鏈,并產(chǎn)生結(jié)果
類(lèi)似于這種:
Stream的創(chuàng)建
可以通過(guò)多種方式創(chuàng)建流:
1、通過(guò)集合的stream()方法或者parallelStream(),比如Arrays.asList(1,2,3).stream()。
2、通過(guò)Arrays.stream(Object[])方法, 比如Arrays.stream(new int[]{1,2,3})。
3、使用流的靜態(tài)方法,比如Stream.of(Object[]), IntStream.range(int, int) 或者 Stream.iterate(Object, UnaryOperator),如Stream.iterate(0, n -> n * 2),或者generate(Supplier<T> s)如Stream.generate(Math::random)。
4、BufferedReader.lines()從文件中獲得行的流。
5、Files類(lèi)的操作路徑的方法,如list、find、walk等。
6、隨機(jī)數(shù)流Random.ints()。
7、其它一些類(lèi)提供了創(chuàng)建流的方法,如BitSet.stream(), Pattern.splitAsStream(java.lang.CharSequence), 和 JarFile.stream()。
8、更底層的使用StreamSupport,它提供了將Spliterator轉(zhuǎn)換成流的方法。
我們這里僅展示常見(jiàn)的
Java8 中的 Collection 接口被擴(kuò)展,提供了兩個(gè)獲取流的方法 :
default Stream<E> stream() : 返回一個(gè)順序流default Stream<E> parallelStream() : 返回一個(gè)并行流下面我們來(lái)看看如何創(chuàng)建流
由數(shù)組創(chuàng)建流
Java8 中的 Arrays 的靜態(tài)方法 stream() 可以獲取數(shù)組流:
由值創(chuàng)建流
可以使用靜態(tài)方法 Stream.of(), 通過(guò)顯示值創(chuàng)建一個(gè)流。它可以接收任意數(shù)量的參數(shù)。
由函數(shù)創(chuàng)建流:創(chuàng)建無(wú)限流
可以使用靜態(tài)方法 Stream.iterate() 和Stream.generate(), 創(chuàng)建無(wú)限流。
演示:
//1. Collection 提供了兩個(gè)方法 stream() 與 parallelStream()List<String> list = new ArrayList<>();Stream<String> stream = list.stream(); //獲取一個(gè)順序流Stream<String> parallelStream = list.parallelStream(); //獲取一個(gè)并行流//2. 通過(guò) Arrays 中的 stream() 獲取一個(gè)數(shù)組流Integer[] nums = new Integer[10];Stream<Integer> stream1 = Arrays.stream(nums);//3. 通過(guò) Stream 類(lèi)中靜態(tài)方法 of()Stream<Integer> stream2 = Stream.of(1,2,3,4,5,6);//4. 創(chuàng)建無(wú)限流//迭代Stream<Integer> stream3 = Stream.iterate(0, (x) -> x + 2).limit(10);stream3.forEach(System.out::println);//生成Stream<Double> stream4 = Stream.generate(Math::random).limit(2);stream4.forEach(System.out::println);Stream的中間操作
多個(gè)中間操作可以連接起來(lái)形成一個(gè) 流水線,除非流水線上觸發(fā)終止操作,否則 中間操作不會(huì)執(zhí)行任何的 處理!而在 終止操作時(shí)一次性全部處理,稱(chēng)為“惰性求值”
篩選與切片
| filter(Predicate p) | 接收 Lambda , 從流中排除某些元素。 |
| distinct() | 篩選,通過(guò)流所生成元素的 hashCode() 和 equals() 去除重復(fù)元素 |
| limit(long maxSize) | 截?cái)嗔?#xff0c;使其元素不超過(guò)給定數(shù)量。 |
| skip(long n) | 跳過(guò)元素,返回一個(gè)扔掉了前 n 個(gè)元素的流。若流中元素不足 n 個(gè),則返回一個(gè)空流。與 limit(n) 互補(bǔ) |
映射
| map(Function f) | 接收一個(gè)函數(shù)作為參數(shù),該函數(shù)會(huì)被應(yīng)用到每個(gè)元素上,并將其映射成一個(gè)新的元素。 |
| mapToDouble(ToDoubleFunction f) | 接收一個(gè)函數(shù)作為參數(shù),該函數(shù)會(huì)被應(yīng)用到每個(gè)元素上,產(chǎn)生一個(gè)新的 DoubleStream。 |
| mapToInt(ToIntFunction f) | 接收一個(gè)函數(shù)作為參數(shù),該函數(shù)會(huì)被應(yīng)用到每個(gè)元素上,產(chǎn)生一個(gè)新的 IntStream。 |
| mapToLong(ToLongFunction f) | 接收一個(gè)函數(shù)作為參數(shù),該函數(shù)會(huì)被應(yīng)用到每個(gè)元素上,產(chǎn)生一個(gè)新的 LongStream。 |
| flatMap(Function f) | 接收一個(gè)函數(shù)作為參數(shù),將流中的每個(gè)值都換成另一個(gè)流,然后把所有流連接成一個(gè)流 |
排序
| sorted() | 產(chǎn)生一個(gè)新流,其中按自然順序排序 |
| sorted(Comparator comp) | 產(chǎn)生一個(gè)新流,其中按比較器順序排序 |
演示:
新建一個(gè)實(shí)體類(lèi)Employee :
核心代碼
import org.junit.Test;import java.io.PrintStream; import java.util.*; import java.util.function.*; import java.util.stream.Stream;/** 一、Stream API 的操作步驟:** 1. 創(chuàng)建 Stream** 2. 中間操作** 3. 終止操作(終端操作)*/ public class MyTest {//1. 創(chuàng)建 Stream@Testpublic void test1(){//1. Collection 提供了兩個(gè)方法 stream() 與 parallelStream()List<String> list = new ArrayList<>();Stream<String> stream = list.stream(); //獲取一個(gè)順序流Stream<String> parallelStream = list.parallelStream(); //獲取一個(gè)并行流//2. 通過(guò) Arrays 中的 stream() 獲取一個(gè)數(shù)組流Integer[] nums = new Integer[10];Stream<Integer> stream1 = Arrays.stream(nums);//3. 通過(guò) Stream 類(lèi)中靜態(tài)方法 of()Stream<Integer> stream2 = Stream.of(1,2,3,4,5,6);//4. 創(chuàng)建無(wú)限流//迭代Stream<Integer> stream3 = Stream.iterate(0, (x) -> x + 2).limit(10);stream3.forEach(System.out::println);//生成Stream<Double> stream4 = Stream.generate(Math::random).limit(2);stream4.forEach(System.out::println);}//2. 中間操作List<Employee> emps = Arrays.asList(new Employee(102, "李四", 59, 6666.66),new Employee(101, "張三", 18, 9999.99),new Employee(103, "王五", 28, 3333.33),new Employee(104, "趙六", 8, 7777.77),new Employee(104, "趙六", 8, 7777.77),new Employee(104, "趙六", 8, 7777.77),new Employee(105, "田七", 38, 5555.55));/*篩選與切片filter——接收 Lambda , 從流中排除某些元素。limit——截?cái)嗔?#xff0c;使其元素不超過(guò)給定數(shù)量。skip(n) —— 跳過(guò)元素,返回一個(gè)扔掉了前 n 個(gè)元素的流。若流中元素不足 n 個(gè),則返回一個(gè)空流。與 limit(n) 互補(bǔ)distinct——篩選,通過(guò)流所生成元素的 hashCode() 和 equals() 去除重復(fù)元素 要記得使用前重寫(xiě)元素的hashCode() 和 equals() 方法*///內(nèi)部迭代:迭代操作 Stream API 內(nèi)部完成@Testpublic void test2(){//所有的中間操作不會(huì)做任何的處理Stream<Employee> stream = emps.stream().filter((e) -> {System.out.println("測(cè)試中間操作");return e.getAge() <= 35;});//只有當(dāng)做終止操作時(shí),所有的中間操作會(huì)一次性的全部執(zhí)行,稱(chēng)為“惰性求值”stream.forEach(System.out::println);}//外部迭代@Testpublic void test3(){Iterator<Employee> it = emps.iterator();while(it.hasNext()){System.out.println(it.next());}}@Testpublic void test4(){emps.stream().filter((e) -> {System.out.println("短路!"); // && ||return e.getSalary() >= 5000;}).limit(3).forEach(System.out::println);}@Testpublic void test5(){emps.parallelStream().filter((e) -> e.getSalary() >= 5000).skip(2).forEach(System.out::println);}@Testpublic void test6(){emps.stream().distinct().forEach(System.out::println);} }
代碼太長(zhǎng)了,分成了幾段
Stream 的終止操作
終端操作會(huì)從流的流水線生成結(jié)果。其結(jié)果可以是任何不是流的值,例如:List、Integer,甚至是 void 。
查找與匹配
| allMatch(Predicate p) | 檢查是否匹配所有元素 |
| anyMatch( (Predicate p) ) | 檢查是否至少匹配一個(gè)元素 |
| noneMatch(Predicate p) | 檢查是否沒(méi)有匹配所有元素 |
| findFirst() | 返回第一個(gè)元素 |
| findAny() | 返回當(dāng)前流中的任意元素 |
| count() | 返回流中元素總數(shù) |
| max(Comparator c ) | 返回流中最大值 |
| min(Comparator c ) | 返回流中最小值 |
| forEach(Consumer c ) | 內(nèi)部迭代( (用 使用 Collection 接口需要用戶(hù)去做迭代,稱(chēng)為 外部迭代 。相反, Stream API 使用內(nèi)部迭代 —— 它幫你把迭代做了) ) |
演示一下:
更改Employee類(lèi),添加枚舉類(lèi)型
public enum Status{SLEEP,WORK,TRAVEL}代碼:
import org.junit.Test;import java.util.Arrays; import java.util.List; import java.util.Optional;/** 一、Stream API 的操作步驟:** 1. 創(chuàng)建 Stream** 2. 中間操作** 3. 終止操作(終端操作)*/ public class MyTest {//3. 終止操作(終端操作)List<Employee> emps = Arrays.asList(new Employee(102, "李四", 59, 6666.66, Employee.Status.SLEEP),new Employee(101, "張三", 18, 9999.99, Employee.Status.WORK),new Employee(103, "王五", 28, 3333.33, Employee.Status.TRAVEL),new Employee(104, "趙六", 8, 7777.77,Employee.Status.TRAVEL),new Employee(104, "趙六", 8, 7777.77,Employee.Status.SLEEP),new Employee(104, "趙六", 8, 7777.77, Employee.Status.SLEEP),new Employee(105, "田七", 38, 5555.55, Employee.Status.WORK));/*查找與匹配a11Match 檢查是否匹配所有元素anyMatch 檢查是否至少匹配一個(gè)元素noneMatch 檢查是否沒(méi)有匹配所有元素findfirst 返回第一個(gè)元素findAny 返回當(dāng)前流中的任意元素count 返回流中元素的總個(gè)數(shù)max—返回流中最大值min—返回流中最小值*/@Testpublic void test1() {boolean b1 = emps.stream().allMatch((e) -> e.getStatus().equals(Employee.Status.WORK));System.out.println("allMatch: "+b1);boolean b2 = emps.stream().anyMatch((e) -> e.getStatus().equals(Employee.Status.WORK));System.out.println("anyMatch: "+b2);boolean b3 = emps.stream().noneMatch((e) -> e.getStatus().equals(Employee.Status.WORK));System.out.println("noneMatch: "+b3);//值可能為空 結(jié)果封裝到Optional容器中Optional<Employee> op = emps.stream().sorted((e1, e2) -> Double.compare(e1.getSalary() ,e2.getSalary())).findFirst();System.out.println(op.get());System.out.println("**************************************");Optional<Employee> op2 = emps.stream().filter((e1) -> e1.getStatus().equals(Employee.Status.SLEEP)).findAny();System.out.println(op2.get());System.out.println("**************************************");Long count = emps.stream().count();System.out.println(count);System.out.println("**************************************");Optional<Employee> op3 = emps.stream().max((e1, e2) -> Double.compare(e1.getSalary(),e2.getSalary()));System.out.println(op3.get());emps.stream().map(Employee :: getSalary).min(Double::compare);} }歸約
| reduce(T iden, BinaryOperator b) | 可以將流中元素反復(fù)結(jié)合起來(lái),得到一個(gè)值。返回 T |
| reduce(BinaryOperator b) | 可以將流中元素反復(fù)結(jié)合起來(lái),得到一個(gè)值。返回 Optional<T> |
map 和 reduce 的連接通常稱(chēng)為 map-reduce 模式,因 Google 用它來(lái)進(jìn)行網(wǎng)絡(luò)搜索而出名。
收集
| collect(Collector c) | 將流轉(zhuǎn)換為其他形式。接收一個(gè) Collector接口實(shí)現(xiàn),用于給Stream中元素做匯總的方法 |
Collector 接口中方法的實(shí)現(xiàn)決定了如何對(duì)流執(zhí)行收集操作(如收集到 List、Set、Map)。但是 Collectors 實(shí)用類(lèi)提供了很多靜態(tài)
方法,可以方便地創(chuàng)建常見(jiàn)收集器實(shí)例,具體方法與實(shí)例如下:
| toList | List、 | 把流中元素收集到List |
| toSet | Set<T> | 把流中元素收集到Set |
| toCollection | Collection<T> | 把流中元素收集到創(chuàng)建的集合 |
| counting | Long | 計(jì)算流中元素的個(gè)數(shù) |
| summingInt | Integer | 對(duì)流中元素的整數(shù)屬性求和 |
| averagingInt | Double | 計(jì)算流中元素Integer屬性的平均值 |
| summarizingInt | IntSummaryStatistics | 收集流中Integer屬性的統(tǒng)計(jì)值。如:平均值 |
| joining | String | 連接流中每個(gè)字符串 |
| maxBy | Optional<T> | 根據(jù)比較器選擇最大值 |
| minBy | Optional<T> | 根據(jù)比較器選擇最小值 |
| reducing | 歸約產(chǎn)生的類(lèi)型 | 從一個(gè)作為累加器的初始值開(kāi)始,利用BinaryOperator與流中元素逐個(gè)結(jié)合,從而歸約成單個(gè)值 |
| collectingAndThen | 轉(zhuǎn)換函數(shù)返回的類(lèi)型 | 包裹另一個(gè)收集器,對(duì)其結(jié)果轉(zhuǎn)換函數(shù) |
| groupingBy | Map<K, List<T>> | 根據(jù)某屬性值對(duì)流分組,屬性為K,結(jié)果為V |
| partitioningBy | Map<Boolean, List<T>> | 根據(jù)true或false進(jìn)行分區(qū) |
演示
package com.study;import org.junit.Test; import java.util.*; import java.util.stream.Collectors;/** 一、Stream API 的操作步驟:** 1. 創(chuàng)建 Stream** 2. 中間操作** 3. 終止操作(終端操作)*/ public class MyTest {//3. 終止操作(終端操作)List<Employee> emps = Arrays.asList(new Employee(102, "李四", 59, 6666.66, Employee.Status.SLEEP),new Employee(101, "張三", 18, 9999.99, Employee.Status.WORK),new Employee(103, "王五", 28, 3333.33, Employee.Status.TRAVEL),new Employee(104, "趙六", 8, 7777.77,Employee.Status.TRAVEL),new Employee(104, "趙六", 8, 7777.77,Employee.Status.SLEEP),new Employee(104, "趙六", 8, 7777.77, Employee.Status.SLEEP),new Employee(105, "田七", 38, 5555.55, Employee.Status.WORK));/*歸約reduce( T identity, Binaryoperator)/ reduce( Binaryoperator)- 可以將流中元泰反復(fù)結(jié)合起來(lái),得到一個(gè)值。identity為起始值Binaryoperator為二元運(yùn)算 繼承BinFunction接口 屬于函數(shù)式接口*/@Testpublic void test1() {List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9);Integer sum = list.stream().reduce(0, (x, y) -> x+y );System.out.println(sum);System.out.println("***********************************************");Optional<Double> op = emps.stream().map(Employee::getSalary).reduce(Double::sum);System.out.println(op.get());System.out.println("***********************************************");}/*收集collect—將流轉(zhuǎn)換為其他形式。接收一個(gè)Co1lector接口的實(shí)現(xiàn),用于給 Stream中元素做匯總的方法*/@Testpublic void test2() {List<String> list = emps.stream().map(Employee::getName).collect(Collectors.toList());list.forEach(System.out::println);System.out.println("***********************************************");Set<String> set = emps.stream().map(Employee::getName).collect(Collectors.toSet());set.forEach(System.out::println);System.out.println("***********************************************");// 放在特殊的集合中HashSet<String> hs = emps.stream().map(Employee::getName).collect(Collectors.toCollection(HashSet::new));hs.forEach(System.out::println);System.out.println("***********************************************");// 總數(shù)Long count = emps.stream().collect(Collectors.counting());System.out.println(count);System.out.println("***********************************************");// 平均值Double avg = emps.stream().collect(Collectors.averagingDouble(Employee::getSalary));System.out.println(avg);System.out.println("***********************************************");// 總和Double sum = emps.stream().collect(Collectors.summingDouble(Employee::getSalary));System.out.println(sum);System.out.println("***********************************************");// 最大值Optional<Employee> max = emps.stream().collect(Collectors.maxBy((e1, e2) -> Double.compare(e1.getSalary(), e2.getSalary())));System.out.println(max.get());System.out.println("***********************************************");// 最小值Optional<Employee> min = emps.stream().collect(Collectors.minBy((e1, e2) -> Double.compare(e1.getSalary(), e2.getSalary())));System.out.println(min.get());}@Testpublic void test3() {// 分組Map<Employee.Status,List<Employee>> map = emps.stream().collect(Collectors.groupingBy(Employee::getStatus));System.out.println(map);System.out.println("***********************************************");// 多級(jí)分組Map<Employee.Status, Map<String,List<Employee>>> maps = emps.stream()//groupingBy的條件可以一直加.collect(Collectors.groupingBy(Employee::getStatus, Collectors.groupingBy((e) -> {if(e.getAge() <= 40) {return "青年";} else {return "其他";}})));System.out.println(maps);}@Testpublic void test4() {// 分區(qū)Map<Boolean, List<Employee>> map = emps.stream().collect(Collectors.partitioningBy((e) -> e.getSalary() > 80));System.out.println(map);}@Testpublic void test5(){DoubleSummaryStatistics dss = emps.stream().collect(Collectors.summarizingDouble(Employee::getSalary));System.out.println(dss.getSum());System.out.println(dss.getAverage());System.out.println(dss.getMax());}@Testpublic void test6() {// 所有的名字都連成了字符串String str = emps.stream().map(Employee::getName).collect(Collectors.joining());// joining() 里面填寫(xiě)連接符 比如逗號(hào)分割 joining(",")// 如果想要添加兩側(cè)的 可以這樣joining(",","【","】")System.out.println(str);}}test1的結(jié)果:
test2的結(jié)果
test3的結(jié)果:
test4的結(jié)果
test5的結(jié)果
test6的結(jié)果
組合
concat用來(lái)連接類(lèi)型一樣的兩個(gè)流。
如果細(xì)分的話(huà),還有下面的這種分類(lèi)
轉(zhuǎn)換
toArray方法將一個(gè)流轉(zhuǎn)換成數(shù)組,而如果想轉(zhuǎn)換成其它集合類(lèi)型,需要調(diào)用collect方法,利用Collectors.toXXX方法進(jìn)行轉(zhuǎn)換:
public static <T,C extends Collection<T>> Collector<T,?,C> toCollection(Supplier<C> collectionFactory) public static …… toConcurrentMap(……) public static <T> Collector<T,?,List<T>> toList() public static …… toMap(……) public static <T> Collector<T,?,Set<T>> toSet()并行流與串行流
并行流 就是把一個(gè)內(nèi)容分成多個(gè)數(shù)據(jù)塊,并用不同的線程分別處理每個(gè)數(shù)據(jù)塊的流。
Java 8 中將并行進(jìn)行了優(yōu)化,我們可以很容易的對(duì)數(shù)據(jù)進(jìn)行并行操作。Stream API 可以聲明性地通過(guò) parallel() 與sequential() 在并行流與順序流之間進(jìn)行切換。
Fork/Join 框架
Fork/Join 框架:就是在必要的情況下,將一個(gè)大任務(wù),進(jìn)行拆分(fork)成若干個(gè)小任務(wù)(拆到不可再拆時(shí)),再將一個(gè)個(gè)的小任務(wù)運(yùn)算的結(jié)果進(jìn)行 join 匯總.
Fork/Join 框架與傳統(tǒng)線程池的區(qū)別:
采用 “工作竊取”模式(work-stealing):當(dāng)執(zhí)行新的任務(wù)時(shí)它可以將其拆分分成更小的任務(wù)執(zhí)行,并將小任務(wù)加到線程隊(duì)列中,然后再?gòu)囊粋€(gè)隨機(jī)線程的隊(duì)列中偷一個(gè)并把它放在自己的隊(duì)列中。
相對(duì)于一般的線程池實(shí)現(xiàn),fork/join框架的優(yōu)勢(shì)體現(xiàn)在對(duì)其中包含的任務(wù)的處理方式上.在一般的線程池中,如果一個(gè)線程正在執(zhí)行的任務(wù)由于某些原因無(wú)法繼續(xù)運(yùn)行,那么該線程會(huì)處于等待狀態(tài).而在fork/join框架實(shí)現(xiàn)中,如果某個(gè)子問(wèn)題由于等待另外一個(gè)子問(wèn)題的完成而無(wú)法繼續(xù)運(yùn)行.那么處理該子問(wèn)題的線程會(huì)主動(dòng)尋找其他尚未運(yùn)行的子問(wèn)題來(lái)執(zhí)行.這種方式減少了線程的等待時(shí)間,提高了性能
演示:
類(lèi)ForkJoinCalculate
Test類(lèi)
import org.junit.Test; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.stream.LongStream;public class MyTest {// Fork Join 框架@Testpublic void test1() {ForkJoinPool pool = new ForkJoinPool();ForkJoinTask<Long> task = new ForkJoinCalculate(0,1000000L);Long sum = pool.invoke(task);System.out.println(sum);}// Java并行流@Testpublic void test2() {LongStream.rangeClosed(0,1000000L).parallel() //并行流 // .sequential() 順序流.reduce(0,Long::sum);}}說(shuō)點(diǎn)看法吧,這篇文章僅僅起到入門(mén)到使用的作用,Stream其實(shí)蘊(yùn)涵著比較深的技術(shù),深入理解還需要大量的實(shí)踐、探究與學(xué)習(xí)。
另外本文是基于《Java 8實(shí)戰(zhàn)》這本書(shū)的思考與學(xué)習(xí)的總結(jié)筆記,含少量?jī)?nèi)容的摘錄。
總結(jié)
以上是生活随笔為你收集整理的【Java 8 in Action】Stream的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 程序员幽默:当代程序员的主要矛盾是什么?
- 下一篇: socket的阻塞模式和非阻塞模式(se