SPARK RDD JAVA API 用法指南
?
1.RDD介紹:
? ? RDD,彈性分布式數(shù)據(jù)集,即分布式的元素集合。在spark中,對所有數(shù)據(jù)的操作不外乎是創(chuàng)建RDD、轉(zhuǎn)化已有的RDD以及調(diào)用RDD操作進(jìn)行求值。在這一切的背后,Spark會自動將RDD中的數(shù)據(jù)分發(fā)到集群中,并將操作并行化。
? ? Spark中的RDD就是一個不可變的分布式對象集合。每個RDD都被分為多個分區(qū),這些分區(qū)運(yùn)行在集群中的不同節(jié)點(diǎn)上。RDD可以包含Python,Java,Scala中任意類型的對象,甚至可以包含用戶自定義的對象。
? ? 用戶可以使用兩種方法創(chuàng)建RDD:讀取一個外部數(shù)據(jù)集,或在驅(qū)動器程序中分發(fā)驅(qū)動器程序中的對象集合,比如list或者set。
? ? RDD的轉(zhuǎn)化操作都是惰性求值的,這意味著我們對RDD調(diào)用轉(zhuǎn)化操作,操作不會立即執(zhí)行。相反,Spark會在內(nèi)部記錄下所要求執(zhí)行的操作的相關(guān)信息。我們不應(yīng)該把RDD看做存放著特定數(shù)據(jù)的數(shù)據(jù)集,而最好把每個RDD當(dāng)做我們通過轉(zhuǎn)化操作構(gòu)建出來的、記錄如何計(jì)算數(shù)據(jù)的指令列表。數(shù)據(jù)讀取到RDD中的操作也是惰性的,數(shù)據(jù)只會在必要時讀取。轉(zhuǎn)化操作和讀取操作都有可能多次執(zhí)行。
2.創(chuàng)建RDD數(shù)據(jù)集
? ? (1)讀取一個外部數(shù)據(jù)集
JavaRDD<String> lines=sc.textFile(inputFile);? ? (2)分發(fā)對象集合,這里以list為例
List<String> list=new ArrayList<String>(); list.add("a"); list.add("b"); list.add("c"); JavaRDD<String> temp=sc.parallelize(list); //上述方式等價于 JavaRDD<String> temp2=sc.parallelize(Arrays.asList("a","b","c"));3.RDD操作
(1)轉(zhuǎn)化操作
? ? 用java實(shí)現(xiàn)過濾器轉(zhuǎn)化操作:
List<String> list=new ArrayList<String>(); //建立列表,列表中包含以下自定義表項(xiàng) list.add("error:a"); list.add("error:b"); list.add("error:c"); list.add("warning:d"); list.add("hadppy ending!"); //將列表轉(zhuǎn)換為RDD對象 JavaRDD<String> lines = sc.parallelize(list); //將RDD對象lines中有error的表項(xiàng)過濾出來,放在RDD對象errorLines中 JavaRDD<String> errorLines = lines.filter(new Function<String, Boolean>() {public Boolean call(String v1) throws Exception {return v1.contains("error");}} ); //遍歷過濾出來的列表項(xiàng) List<String> errorList = errorLines.collect(); for (String line : errorList)System.out.println(line);? ? ? ?
輸出:
error:a
error:b
error:c
可見,列表list中包含詞語error的表項(xiàng)都被正確的過濾出來了。
(2)合并操作
將兩個RDD數(shù)據(jù)集合并為一個RDD數(shù)據(jù)集
接上述程序示例:
union輸出:
error:a
error:b
error:c
warning:d
可見,將原始列表項(xiàng)中的所有error項(xiàng)和warning項(xiàng)都過濾出來了。
(3)獲取RDD數(shù)據(jù)集中的部分或者全部元素
①獲取RDD數(shù)據(jù)集中的部分元素 ? .take(int num) ?返回值List<T> ??
獲取RDD數(shù)據(jù)集中的前num項(xiàng)。
/** * Take the first num elements of the RDD. This currently scans the partitions *one by one*, so * it will be slow if a lot of partitions are required. In that case, use collect() to get the * whole RDD instead. */ def take(num: Int): JList[T]程序示例:接上
JavaRDD<String> unionLines=errorLines.union(warningLines);for(String line :unionLines.take(2))System.out.println(line);輸出:
error:a
error:b
可見,輸出了RDD數(shù)據(jù)集unionLines的前2項(xiàng)
②獲取RDD數(shù)據(jù)集中的全部元素 .collect() 返回值 List<T>
程序示例:
List<String> unions=unionLines.collect(); for(String line :unions)System.out.println(line);遍歷輸出RDD數(shù)據(jù)集unions的每一項(xiàng)
4.向spark傳遞函數(shù)
| 函數(shù)名 | 實(shí)現(xiàn)的方法 | 用途 |
| Function<T,R> | R call(T) | 接收一個輸入值并返回一個輸出值,用于類似map()和filter()的操作中 |
| Function<T1,T2,R> | R call(T1,T2) | 接收兩個輸入值并返回一個輸出值,用于類似aggregate()和fold()等操作中 |
| FlatMapFunction<T,R> | Iterable <R> call(T) | 接收一個輸入值并返回任意個輸出,用于類似flatMap()這樣的操作中 |
?①Function<T,R>
JavaRDD<String> errorLines=lines.filter(new Function<String, Boolean>() {public Boolean call(String v1)throws Exception {return v1.contains("error");}} );過濾RDD數(shù)據(jù)集中包含error的表項(xiàng),新建RDD數(shù)據(jù)集errorLines
②FlatMapFunction<T,R>?
List<String> strLine=new ArrayList<String>(); strLine.add("how are you"); strLine.add("I am ok"); strLine.add("do you love me") JavaRDD<String> input=sc.parallelize(strLine); JavaRDD<String> words=input.flatMap(new FlatMapFunction<String, String>() {public Iterable<String> call(String s) throws Exception {return Arrays.asList(s.split(" "));}} );將文本行的單詞過濾出來,并將所有的單詞保存在RDD數(shù)據(jù)集words中。
?③?Function<T1,T2,R>
List<String> strLine=new ArrayList<String>(); strLine.add("how are you"); strLine.add("I am ok"); strLine.add("do you love me"); JavaRDD<String> input=sc.parallelize(strLine); JavaRDD<String> words=input.flatMap(new FlatMapFunction<String, String>() {public Iterable<String> call(String s) throws Exception {return Arrays.asList(s.split(" "));}} ); JavaPairRDD<String,Integer> counts=words.mapToPair(new PairFunction<String, String, Integer>() {public Tuple2<String, Integer> call(String s) throws Exception {return new Tuple2(s, 1);}} ); JavaPairRDD <String,Integer> results=counts.reduceByKey(new org.apache.spark.api.java.function.Function2<Integer, Integer, Integer>() {public Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;}} ) ;上述程序是spark中的wordcount實(shí)現(xiàn)方式,其中的reduceByKey操作的Function2函數(shù)定義了遇到相同的key時,value是如何reduce的->直接將兩者的value相加。
*注意:
可以將我們的函數(shù)類定義為使用匿名內(nèi)部類,就像上述程序?qū)崿F(xiàn)的那樣,也可以創(chuàng)建一個具名類,就像這樣:
class ContainError implements Function<String,Boolean>{public Boolean call(String v1) throws Exception {return v1.contains("error");} } JavaRDD<String> errorLines=lines.filter(new ContainError()); for(String line :errorLines.collect())System.out.println(line);具名類也可以有參數(shù),就像上述過濾出含有”error“的表項(xiàng),我們可以自定義到底含有哪個詞語,就像這樣,程序就更有普適性了。
?
5.針對每個元素的轉(zhuǎn)化操作:
? ? 轉(zhuǎn)化操作map()接收一個函數(shù),把這個函數(shù)用于RDD中的每個元素,將函數(shù)的返回結(jié)果作為結(jié)果RDD中對應(yīng)的元素。關(guān)鍵詞:轉(zhuǎn)化
? ? 轉(zhuǎn)化操作filter()接受一個函數(shù),并將RDD中滿足該函數(shù)的元素放入新的RDD中返回。關(guān)鍵詞:過濾
示例圖如下所示:
①map()
計(jì)算RDD中各值的平方
JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1,2,3,4)); JavaRDD<Integer> result=rdd.map(new Function<Integer, Integer>() {public Integer call(Integer v1) throwsException {return v1*v1;}} ); System.out.println( StringUtils.join(result.collect(),","));輸出:
1,4,9,16
filter()
②?去除RDD集合中值為1的元素:
JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1,2,3,4)); JavaRDD<Integer> results=rdd.filter( new Function<Integer, Boolean>() {public Boolean call(Integer v1) throws Exception {return v1!=1;}} ); System.out.println(StringUtils.join(results.collect(),","));結(jié)果:
2,3,4
③ 有時候,我們希望對每個輸入元素生成多個輸出元素。實(shí)現(xiàn)該功能的操作叫做flatMap()。和map()類似,我們提供給flatMap()的函數(shù)被分別應(yīng)用到了輸入的RDD的每個元素上。不過返回的不是一個元素,而是一個返回值序列的迭代器。輸出的RDD倒不是由迭代器組成的。我們得到的是一個包含各個迭代器可以訪問的所有元素的RDD。flatMap()的一個簡單用途是將輸入的字符串切分成單詞,如下所示:?
JavaRDD<String> rdd =sc.parallelize(Arrays.asList("hello world","hello you","world i love you")); JavaRDD<String> words=rdd.flatMap(new FlatMapFunction<String, String>() {public Iterable<String> call(String s) throws Exception {return Arrays.asList(s.split(" "));}} ); System.out.println(StringUtils.join(words.collect(),'\n'));輸出:
hello
world
hello
you
world
i
love
you
6.集合操作
?
RDD中的集合操作
| 函數(shù) | 用途 |
| RDD1.distinct() | 生成一個只包含不同元素的新RDD。需要數(shù)據(jù)混洗。 |
| RDD1.union(RDD2) | 返回一個包含兩個RDD中所有元素的RDD |
| RDD1.intersection(RDD2) | 只返回兩個RDD中都有的元素 |
| RDD1.substr(RDD2) | 返回一個只存在于第一個RDD而不存在于第二個RDD中的所有元素組成的RDD。需要數(shù)據(jù)混洗。 |
集合操作對笛卡爾集的處理:
?
| RDD1.cartesian(RDD2) | 返回兩個RDD數(shù)據(jù)集的笛卡爾集 |
程序示例:生成RDD集合{1,2} 和{1,2}的笛卡爾集
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2)); JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1,2)); JavaPairRDD<Integer ,Integer> rdd=rdd1.cartesian(rdd2); for(Tuple2<Integer,Integer> tuple:rdd.collect())System.out.println(tuple._1()+"->"+tuple._2());輸出:
1->1
1->2
2->1
2->2
7.行動操作
(1)reduce操作
? ? reduce()接收一個函數(shù)作為參數(shù),這個函數(shù)要操作兩個RDD的元素類型的數(shù)據(jù)并返回一個同樣類型的新元素。一個簡單的例子就是函數(shù)+,可以用它來對我們的RDD進(jìn)行累加。使用reduce(),可以很方便地計(jì)算出RDD中所有元素的總和,元素的個數(shù),以及其他類型的聚合操作。
? ? 以下是求RDD數(shù)據(jù)集所有元素和的程序示例:
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10)); Integer sum =rdd.reduce(new Function2<Integer, Integer, Integer>() {public Integercall(Integer v1, Integer v2) throws Exception {return v1+v2;}} ); System.out.println(sum.intValue());輸出:55
(2)fold()操作
? ? 接收一個與reduce()接收的函數(shù)簽名相同的函數(shù),再加上一個初始值來作為每個分區(qū)第一次調(diào)用時的結(jié)果。你所提供的初始值應(yīng)當(dāng)是你提供的操作的單位元素,也就是說,使用你的函數(shù)對這個初始值進(jìn)行多次計(jì)算不會改變結(jié)果(例如+對應(yīng)的0,*對應(yīng)的1,或者拼接操作對應(yīng)的空列表)。
? ? 程序?qū)嵗?#xff1a;
①計(jì)算RDD數(shù)據(jù)集中所有元素的和:
zeroValue=0;//求和時,初始值為0。
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10)); Integer sum =rdd.fold(0,new Function2<Integer, Integer, Integer>() {public Integer call(Integer v1, Integer v2) throws Exception {return v1+v2;}} ); System.out.println(sum);②計(jì)算RDD數(shù)據(jù)集中所有元素的積:
zeroValue=1;//求積時,初始值為1。
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10)); Integer result =rdd.fold(1,new Function2<Integer, Integer, Integer>() {public Integer call(Integer v1, Integer v2) throws Exception {return v1*v2;}} ); System.out.println(result);(3)aggregate()操作
? ? aggregate()函數(shù)返回值類型不必與所操作的RDD類型相同。
? ? 與fold()類似,使用aggregate()時,需要提供我們期待返回的類型的初始值。然后通過一個函數(shù)把RDD中的元素合并起來放入累加器。考慮到每個節(jié)點(diǎn)是在本地進(jìn)行累加的,最終,還需要提供第二個函數(shù)來將累加器兩兩合并。
以下是程序?qū)嵗?#xff1a;
public class AvgCount implements Serializable{ public int total;public int num;public AvgCount(int total,int num){this.total=total;this.num=num; } public double avg(){return total/(double)num; } static Function2<AvgCount,Integer,AvgCount> addAndCount= new Function2<AvgCount, Integer, AvgCount>() {public AvgCount call(AvgCount a, Integer x) throws Exception {a.total+=x;a.num+=1;return a;} }; static Function2<AvgCount,AvgCount,AvgCount> combine=new Function2<AvgCount, AvgCount, AvgCount>() {public AvgCount call(AvgCount a, AvgCount b) throws Exception {a.total+=b.total;a.num+=b.num;return a;}};public static void main(String args[]){SparkConf conf = new SparkConf().setMaster("local").setAppName("my app");JavaSparkContext sc = new JavaSparkContext(conf);AvgCount intial =new AvgCount(0,0);JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));AvgCount result=rdd.aggregate(intial,addAndCount,combine);System.out.println(result.avg());}}這個程序示例可以實(shí)現(xiàn)求出RDD對象集的平均數(shù)的功能。其中addAndCount將RDD對象集中的元素合并起來放入AvgCount對象之中,combine提供兩個AvgCount對象的合并的實(shí)現(xiàn)。我們初始化AvgCount(0,0),表示有0個對象,對象的和為0,最終返回的result對象中total中儲存了所有元素的和,num儲存了元素的個數(shù),這樣調(diào)用result對象的函數(shù)avg()就能夠返回最終所需的平均數(shù),即avg=tatal/(double)num。
8.持久化緩存
? ? 因?yàn)镾park RDD是惰性求值的,而有時我們希望能多次使用同一個RDD。如果簡單地對RDD調(diào)用行動操作,Spark每次都會重算RDD以及它的所有依賴。這在迭代算法中消耗格外大,因?yàn)榈惴ǔ3啻问褂猛唤M數(shù)據(jù)。
? ? 為了避免多次計(jì)算同一個RDD,可以讓Spark對數(shù)據(jù)進(jìn)行持久化。當(dāng)我們讓Spark持久化存儲一個RDD時,計(jì)算出RDD的節(jié)點(diǎn)會分別保存它們所求出的分區(qū)數(shù)據(jù)。
? ? 出于不同的目的,我們可以為RDD選擇不同的持久化級別。默認(rèn)情況下persist()會把數(shù)據(jù)以序列化的形式緩存在JVM的堆空間中
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??不同關(guān)鍵字對應(yīng)的存儲級別表
| 級別 | 使用的空間 | cpu時間 | 是否在內(nèi)存 | 是否在磁盤 | 備注 |
| MEMORY_ONLY | 高 | 低 | 是 | 否 | 直接儲存在內(nèi)存 |
| MEMORY_ONLY_SER | 低 | 高 | 是 | 否 | 序列化后儲存在內(nèi)存里 |
| MEMORY_AND_DISK | 低 | 中等 | 部分 | 部分 | 如果數(shù)據(jù)在內(nèi)存中放不下,溢寫在磁盤上 |
| MEMORY_AND_DISK_SER | 低 | 高 | 部分 | 部分 | 數(shù)據(jù)在內(nèi)存中放不下,溢寫在磁盤中。內(nèi)存中存放序列化的數(shù)據(jù)。 |
| DISK_ONLY | 低 | 高 | 否 | 是 | 直接儲存在硬盤里面 |
程序示例:將RDD數(shù)據(jù)集持久化在內(nèi)存中。
JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1,2,3,4,5)); rdd.persist(StorageLevel.MEMORY_ONLY()); System.out.println(rdd.count()); System.out.println(StringUtils.join(rdd.collect(),','));RDD還有unpersist()方法,調(diào)用該方法可以手動把持久化的RDD從緩存中移除。
9.不同的RDD類型
? ? Java中有兩個專門的類JavaDoubleRDD和JavaPairRDD,來處理特殊類型的RDD,這兩個類還針對這些類型提供了額外的函數(shù),折讓你可以更加了解所發(fā)生的一切,但是也顯得有些累贅。
? ? 要構(gòu)建這些特殊類型的RDD,需要使用特殊版本的類來替代一般使用的Function類。如果要從T類型的RDD創(chuàng)建出一個DoubleRDD,我們就應(yīng)當(dāng)在映射操作中使用DoubleFunction<T>來替代Function<T,Double>。
程序?qū)嵗?#xff1a;以下是一個求RDD每個對象的平方值的程序?qū)嵗?#xff0c;將普通的RDD對象轉(zhuǎn)化為DoubleRDD對象,最后調(diào)用DoubleRDD對象的max()方法,返回生成的平方值中的最大值。
JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1,2,3,4,5)); JavaDoubleRDD result=rdd.mapToDouble(new DoubleFunction<Integer>() {public double call(Integer integer) throws Exception {return (double) integer*integer;}} ); System.out.println(result.max());總結(jié)
以上是生活随笔為你收集整理的SPARK RDD JAVA API 用法指南的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spark中flatMap函数用法
- 下一篇: ALS算法讲解