1.2 Creating RDDs
1) From an existing Scala collection:

val rdd1 = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8))

2) From a dataset in an external storage system: the local file system or any data source supported by Hadoop, such as HDFS, Cassandra, or HBase:

val rdd2 = sc.textFile("hdfs://mycluster/wordcount/input/2.txt")
1.3 RDD Programming API
All transformations on an RDD are lazy: they do not compute their results right away. Instead, they only remember the transformations applied to a base dataset (for example, a file). The transformations are actually executed only when an action requires a result to be returned to the driver. This design lets Spark run more efficiently.
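A minimal spark-shell sketch of this laziness, assuming a SparkContext named sc and the 2.txt input file used above (the variable names are illustrative):

// Nothing is read or computed yet: textFile and map only record the lineage.
val lines   = sc.textFile("hdfs://mycluster/wordcount/input/2.txt")
val lengths = lines.map(_.length)

// Only this action triggers a job and returns a result to the driver.
val total = lengths.reduce(_ + _)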
1.3.1 Common Transformations
Transformation: Meaning

map(func): Returns a new RDD formed by passing each element of the source through the function func.
filter(func): Returns a new RDD containing only the elements for which func returns true.
flatMap(func): Similar to map, but each input element can be mapped to 0 or more output elements (so func should return a sequence rather than a single element).
mapPartitions(func): Similar to map, but runs independently on each partition of the RDD, so when running on an RDD of type T, func must be of type Iterator[T] => Iterator[U].
mapPartitionsWithIndex(func): Similar to mapPartitions, but func also takes an integer parameter giving the partition index, so when running on an RDD of type T, func must be of type (Int, Iterator[T]) => Iterator[U].
sample(withReplacement, fraction, seed): Samples the data at the ratio given by fraction, with or without replacement; seed is the random number generator seed.
union(otherDataset): Returns a new RDD containing the union of the source RDD and the argument RDD.
intersection(otherDataset): Returns a new RDD containing the intersection of the source RDD and the argument RDD.
distinct([numTasks]): Returns a new RDD containing the distinct elements of the source RDD.
groupByKey([numTasks]): Called on an RDD of (K, V) pairs, returns an RDD of (K, Iterable[V]) pairs.
reduceByKey(func, [numTasks]): Called on an RDD of (K, V) pairs, returns an RDD of (K, V) pairs where the values of each key are aggregated with the given reduce function. As with groupByKey, the number of reduce tasks can be set via an optional second argument.
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]): Called on an RDD of (K, V) pairs, aggregates the values of each key using the given combine functions and a neutral zero value.
sortByKey([ascending], [numTasks]): Called on an RDD of (K, V) pairs where K implements the Ordered interface, returns an RDD of (K, V) pairs sorted by key.
sortBy(func, [ascending], [numTasks]): Like sortByKey, but more flexible.
join(otherDataset, [numTasks]): Called on RDDs of types (K, V) and (K, W), returns an RDD of (K, (V, W)) pairs containing all pairs of elements for each key.
cogroup(otherDataset, [numTasks]): Called on RDDs of types (K, V) and (K, W), returns an RDD of (K, (Iterable[V], Iterable[W])) tuples.
cartesian(otherDataset): Cartesian product.
pipe(command, [envVars]): Pipes each partition of the RDD through a shell command.
coalesce(numPartitions): Reduces the number of partitions of the RDD to numPartitions.
repartition(numPartitions): Reshuffles the data in the RDD to produce exactly numPartitions partitions.
repartitionAndSortWithinPartitions(partitioner): Repartitions the RDD according to the given partitioner and, within each resulting partition, sorts records by their keys.
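A few entries from this table that the exercises below do not cover, sketched in the spark-shell (an illustrative snippet, assuming sc is available; sample output is nondeterministic):

val nums = sc.parallelize(1 to 10, 2)

nums.sample(false, 0.3).collect                         // roughly 30% of the elements, without replacement
sc.parallelize(Seq(1, 1, 2, 3, 3)).distinct.collect     // Array(1, 2, 3), in some order
nums.cartesian(sc.parallelize(Seq("a", "b"))).take(3)   // pairs such as (1,a), (1,b), (2,a)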
1.3.2 Actions
Action: Meaning

reduce(func): Aggregates all elements of the RDD using func, which must be commutative and associative so it can be computed in parallel.
collect(): Returns all elements of the dataset to the driver program as an array.
count(): Returns the number of elements in the RDD.
first(): Returns the first element of the RDD (similar to take(1)).
take(n): Returns an array with the first n elements of the dataset.
takeSample(withReplacement, num, [seed]): Returns an array of num elements sampled randomly from the dataset, with or without replacement; seed is the random number generator seed.
takeOrdered(n, [ordering]): Returns the first n elements of the RDD using either their natural order or a custom ordering.
saveAsTextFile(path): Writes the elements of the dataset as a text file to HDFS or another supported file system; Spark calls toString on each element to turn it into a line of text.
saveAsSequenceFile(path): Writes the elements of the dataset as a Hadoop SequenceFile to the given directory on HDFS or another Hadoop-supported file system.
saveAsObjectFile(path): Writes the elements of the dataset as serialized objects, which can be loaded back with SparkContext.objectFile().
countByKey(): For RDDs of type (K, V), returns a (K, Int) map giving the number of elements for each key.
foreach(func): Runs the function func on each element of the dataset, typically for side effects such as updating an accumulator or writing to external storage.
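A quick spark-shell sketch of a few of these actions (assuming sc is available; the values in the comments are what these calls return for this input):

val rdd = sc.parallelize(List(3, 1, 4, 1, 5), 2)

rdd.count()                 // 5
rdd.first()                 // 3
rdd.take(2)                 // Array(3, 1)
rdd.takeOrdered(3)          // Array(1, 1, 3)
rdd.reduce(_ + _)           // 14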
1.4 Exercises with the Spark RDD API
Start the Spark shell:

[root@hadoop1 spark-2.1.1-bin-hadoop2.7]#
Exercise 1:

val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)
val rdd3 = rdd2.filter(_ >= 10)
rdd3.collect
Exercise 2:

val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
// Split each element of rdd1, then flatten the results
val rdd2 = rdd1.flatMap(_.split(' '))
rdd2.collect
Exercise 3:

val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
val rdd3 = rdd1.union(rdd2)
val rdd4 = rdd1.intersection(rdd2)
rdd3.distinct.collect
rdd4.collect
Exercise 4:

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
val rdd3 = rdd1.join(rdd2)
rdd3.collect
val rdd4 = rdd1 union rdd2
rdd4.collect
val rdd5 = rdd4.groupByKey
rdd5.collect
Exercise 5:

val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
val rdd3 = rdd1.cogroup(rdd2)
rdd3.collect
Exercise 6:

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
// reduce is an action, so rdd2 is an Int (15), not an RDD; there is nothing to collect
val rdd2 = rdd1.reduce(_ + _)
Exercise 7:

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
val rdd3 = rdd1.union(rdd2)
// Aggregate by key
val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.collect
// Sort by value in descending order
val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
rdd5.collect
Exercise 8: mapPartitions

def mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

This function is similar to map, except that the mapping function receives an iterator over each partition of the RDD instead of each individual element. If the mapping needs to create expensive extra objects, mapPartitions is much more efficient than map. For example, when writing all elements of an RDD to a database over JDBC, using map could mean creating a connection for every element, which is very costly; with mapPartitions only one connection per partition is needed (a sketch of this pattern follows the shell session below). The preservesPartitioning parameter indicates whether the partitioner of the parent RDD is preserved.

// rdd1 has two partitions
scala> var rdd1 = sc.makeRDD(1 to 5, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[63] at makeRDD at <console>:24

scala> rdd1.collect
res27: Array[Int] = Array(1, 2, 3, 4, 5)

// rdd3 sums the values inside each partition of rdd1 (implemented with mapPartitions)
scala> var rdd3 = rdd1.mapPartitions { x => {
     |   var result = List[Int]()
     |   var i = 0
     |   while (x.hasNext) {
     |     i += x.next()
     |   }
     |   result.::(i).iterator
     | }}
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[64] at mapPartitions at <console>:26

// Look at the per-partition sums in rdd3
scala> rdd3.collect
res28: Array[Int] = Array(3, 12)

// Check the number of partitions of rdd3
scala> rdd3.partitions.size
res29: Int = 2
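The description above mentions writing to a database over JDBC as the motivating case. A minimal sketch of that pattern using the action counterpart foreachPartition; the JDBC URL, credentials, and the word_count table are illustrative assumptions, not part of the original exercises:

import java.sql.DriverManager

val pairs = sc.parallelize(List(("hello", 3), ("world", 2)), 2)

// One connection per partition instead of one per element.
pairs.foreachPartition { iter =>
  val conn = DriverManager.getConnection("jdbc:mysql://hadoop1:3306/test", "root", "root") // hypothetical URL/credentials
  val stmt = conn.prepareStatement("INSERT INTO word_count (word, cnt) VALUES (?, ?)")     // hypothetical table
  iter.foreach { case (word, cnt) =>
    stmt.setString(1, word)
    stmt.setInt(2, cnt)
    stmt.executeUpdate()
  }
  stmt.close()
  conn.close()
}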
Exercise 9: mapPartitionsWithIndex

def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

This works like mapPartitions, except that the function takes two parameters, the first of which is the partition index.

For example:

scala> var rdd1 = sc.makeRDD(1 to 25, 4)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[66] at makeRDD at <console>:24

scala> var rdd2 = rdd1.mapPartitionsWithIndex { (x, iter) => {
     |   var result = List[String]()
     |   var i = 0
     |   while (iter.hasNext) {
     |     i += iter.next()
     |   }
     |   result.::(x + "|" + i).iterator
     | }}
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[67] at mapPartitionsWithIndex at <console>:26

// Collect the results: each entry is "partitionIndex|sumOfThatPartition"
scala> rdd2.collect
res30: Array[String] = Array(0|21, 1|57, 2|93, 3|154)

Another example:

scala> val func = (index: Int, iter: Iterator[(Int)]) => {
     |   iter.toList.map(x => "[partID:" + index + ",val:" + x + "]").iterator
     | }

scala> val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 2)

scala> rdd1.mapPartitionsWithIndex(func).collect
res0: Array[String] = Array([partID:0,val:1], [partID:0,val:2], [partID:0,val:3], [partID:0,val:4], [partID:1,val:5], [partID:1,val:6], [partID:1,val:7], [partID:1,val:8], [partID:1,val:9])
Exercise: aggregate

The aggregate function first aggregates the elements within each partition, and then uses a combine function to merge each partition's result together with the initial value (zeroValue). The type it finally returns does not need to match the element type of the RDD.

Function signature:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

aggregate: aggregate within each partition first, then combine the per-partition results.

scala> def func1(index: Int, iter: Iterator[(Int)]): Iterator[String] = {
     |   iter.toList.map(x => "[partID:" + index + ",val:" + x + "]").iterator
     | }
func1: (index: Int, iter: Iterator[Int])Iterator[String]

// Create a parallelized RDD with two partitions
scala> val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[77] at parallelize at <console>:24

// The output below shows how the contents of rdd1 are distributed across the two partitions, partID:0 and partID:1
scala> rdd1.mapPartitionsWithIndex(func1).collect
res56: Array[String] = Array([partID:0,val:1], [partID:0,val:2], [partID:0,val:3], [partID:0,val:4], [partID:1,val:5], [partID:1,val:6], [partID:1,val:7], [partID:1,val:8], [partID:1,val:9])
// Execution steps of the call below:
// 1. Partition 0: max(0,1)=1, max(1,2)=2, max(2,3)=3, max(3,4)=4  ==> the maximum of the first partition is 4
// 2. Partition 1: max(0,5)=5, max(5,6)=6, max(6,7)=7, max(7,8)=8, max(8,9)=9  ==> the maximum of the second partition is 9
// 3. The combine logic is _+_, i.e. the zero value plus the two partition maxima: (0) + 4 + 9 = 13
scala> rdd1.aggregate(0)(math.max(_,_), _+_)
res57: Int = 13

// Execution steps of the call below:
// 1. Partition 0: max(3,1)=3, max(3,2)=3, max(3,3)=3, max(3,4)=4  ==> the maximum of the first partition is 4
// 2. Partition 1: max(3,5)=5, max(5,6)=6, max(6,7)=7, max(7,8)=8, max(8,9)=9  ==> the maximum of the second partition is 9
// 3. The combine logic is _+_: (3) + 4 + 9 = 16
scala> rdd1.aggregate(3)(math.max(_,_), _+_)
res62: Int = 16

// Execution steps of the call below:
// 1. Partition 0: max(5,1)=5, max(5,2)=5, max(5,3)=5, max(5,4)=5  ==> the maximum of the first partition is 5
// 2. Partition 1: max(5,5)=5, max(5,6)=6, max(6,7)=7, max(7,8)=8, max(8,9)=9  ==> the maximum of the second partition is 9
// 3. The combine logic is _+_: (5) + 5 + 9 = 19
scala> rdd1.aggregate(5)(math.max(_,_), _+_)
res58: Int = 19

// Another example. Execution steps of the call below:
// 1. Partition 0: max(8,1)=8, max(8,2)=8, max(8,3)=8, max(8,4)=8  ==> the maximum of the first partition is 8
// 2. Partition 1: max(8,5)=8, max(8,6)=8, max(8,7)=8, max(8,8)=8, max(8,9)=9  ==> the maximum of the second partition is 9
// 3. The combine logic is _+_: (8) + 8 + 9 = 25
scala> rdd1.aggregate(8)(math.max(_,_), _+_)
res: Int = 25

// Another example. Execution steps of the call below:
// 1. Partition 0: max(10,1)=10, max(10,2)=10, max(10,3)=10, max(10,4)=10  ==> the maximum of the first partition is 10
// 2. Partition 1: max(10,5)=10, max(10,6)=10, max(10,7)=10, max(10,8)=10, max(10,9)=10  ==> the maximum of the second partition is 10
// 3. The combine logic is _+_: (10) + 10 + 10 = 30
scala> rdd1.aggregate(10)(math.max(_,_), _+_)
res: Int = 30
String aggregation:

scala> val rdd2 = sc.parallelize(List("a", "b", "c", "d", "e", "f"), 2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[79] at parallelize at <console>:24

scala> def fun2(index: Int, iter: Iterator[(String)]): Iterator[String] = {
     |   iter.toList.map(x => "[partID:" + index + ",val:" + x + "]").iterator
     | }
fun2: (index: Int, iter: Iterator[String])Iterator[String]

// The output below shows that "a", "b", "c" are in partID:0 and "d", "e", "f" are in partID:1
scala> rdd2.mapPartitionsWithIndex(fun2).collect
res63: Array[String] = Array([partID:0,val:a], [partID:0,val:b], [partID:0,val:c], [partID:1,val:d], [partID:1,val:e], [partID:1,val:f])

// Execution order of the call below:
// 1. Partition 0: "" + "a" = "a", "a" + "b" = "ab", "ab" + "c" = "abc"  ==> the first partition produces "abc"
// 2. Partition 1: "" + "d" = "d", "d" + "e" = "de", "de" + "f" = "def"  ==> the second partition produces "def"
// 3. The partitions run in parallel, so either one may finish first; the result is
//    "" + "abc" + "def" = "abcdef" or "" + "def" + "abc" = "defabc"
scala> rdd2.aggregate("")(_+_, _+_)
res64: String = abcdef

scala> rdd2.aggregate("")(_+_, _+_)
res65: String = defabc

// Execution order of the call below:
// 1. Partition 0: "=" + "a" = "=a", "=a" + "b" = "=ab", "=ab" + "c" = "=abc"  ==> the first partition produces "=abc"
// 2. Partition 1: "=" + "d" = "=d", "=d" + "e" = "=de", "=de" + "f" = "=def"  ==> the second partition produces "=def"
// 3. Either partition may finish first, so the result is "=" + "=abc" + "=def" = "==abc=def" or "==def=abc"
// The two results below (==def=abc and ==abc=def) match this reasoning.
scala> rdd2.aggregate("=")(_ + _, _ + _)
res68: String = ==def=abc

scala> rdd2.aggregate("=")(_ + _, _ + _)
res69: String = ==abc=def

val rdd3 = sc.parallelize(List("12", "23", "345", "4567"), 2)

// The output below shows there are two partitions and which values each partition holds
scala> rdd3.mapPartitionsWithIndex(fun2).collect
res70: Array[String] = Array([partID:0,val:12], [partID:0,val:23], [partID:1,val:345], [partID:1,val:4567])
// Execution steps of the call below (note: "".length is 0 and "12".length is 2):
// 1. Partition 0: max("".length, "12".length) = 2, giving "2"; then max("2".length, "23".length) = 2, giving "2"  ==> the first partition produces "2"
// 2. Partition 1: max("".length, "345".length) = 3, giving "3"; then max("3".length, "4567".length) = 4, giving "4"  ==> the second partition produces "4"
// 3. Finally x + y is applied to the partition results; because the computation is parallel, the result may be "24" or "42"
scala> rdd3.aggregate("")((x, y) => math.max(x.length, y.length).toString, (x, y) => x + y)
res75: String = 24

scala> rdd3.aggregate("")((x, y) => math.max(x.length, y.length).toString, (x, y) => x + y)
res76: String = 42

// Now take the minimum instead:
scala> val rdd4 = sc.parallelize(List("12", "23", "345", ""), 2)
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[84] at parallelize at <console>:24

scala> rdd4.mapPartitionsWithIndex(fun2).collect
res79: Array[String] = Array([partID:0,val:12], [partID:0,val:23], [partID:1,val:345], [partID:1,val:])
// Execution steps:
// 1. Partition 0: min("".length, "12".length) = 0, giving "0"; then min("0".length, "23".length) = 1, giving "1"  ==> the first partition produces "1"
// 2. Partition 1: min("".length, "345".length) = 0, giving "0"; then min("0".length, "".length) = 0, giving "0"  ==> the second partition produces "0"
// 3. Finally x + y is applied; because the computation is parallel, the result may be "10" or "01"
scala> rdd4.aggregate("")((x, y) => math.min(x.length, y.length).toString, (x, y) => x + y)
res85: String = 10

scala> rdd4.aggregate("")((x, y) => math.min(x.length, y.length).toString, (x, y) => x + y)
res86: String = 01

val rdd5 = sc.parallelize(List("12", "23", "", "345"), 2)

// Execution steps:
// 1. Partition 0: min("".length, "12".length) = 0, giving "0"; then min("0".length, "23".length) = 1, giving "1"  ==> the first partition produces "1"
// 2. Partition 1: min("".length, "".length) = 0, giving "0"; then min("0".length, "345".length) = 1, giving "1"  ==> the second partition produces "1"
// 3. Finally x + y is applied, so the result is "11" regardless of which partition finishes first
rdd5.aggregate("")((x, y) => math.min(x.length, y.length).toString, (x, y) => x + y)

Another example:

scala> def seqOP(a: Int, b: Int): Int = {
     |   println("seqOp: " + a + "\t" + b)
     |   math.min(a, b)
     | }
seqOP: (a: Int, b: Int)Int

scala> def combOp(a: Int, b: Int): Int = {
     |   println("combOp: " + a + "\t" + b)
     |   a + b
     | }
combOp: (a: Int, b: Int)Int

scala> val z = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
// Note that z above holds Ints, so the function used to iterate over the collection below must also work on Ints.
scala> def fun2(index: Int, iter: Iterator[(Int)]): Iterator[String] = {
     |   iter.toList.map(x => "[partID:" + index + ",val:" + x + "]").iterator
     | }
fun2: (index: Int, iter: Iterator[Int])Iterator[String]

// Show which partition each value lives in
scala> z.mapPartitionsWithIndex(fun2).collect
res94: Array[String] = Array([partID:0,val:1], [partID:0,val:2], [partID:0,val:3], [partID:1,val:4], [partID:1,val:5], [partID:1,val:6])
// Each partition first runs seqOP on its own; once both are done, combOp is applied. So the execution is:
// 1. Partition 0: min(3,1)=1, min(1,2)=1, min(1,3)=1  ==> the first partition produces 1
// 2. Partition 1: min(3,4)=3, min(3,5)=3, min(3,6)=3  ==> the second partition produces 3
// 3. Then combOp runs: combOp(3, 1) = 3 + 1 = 4, then combOp(4, 3) = 4 + 3 = 7, so the final result is 7
scala> z.aggregate(3)(seqOP, combOp)
combOp: 3 1
combOp: 4 3
res95: Int = 7

// Verify once more:
// 1. Partition 0: min(2,1)=1, min(1,2)=1, min(1,3)=1  ==> the first partition produces 1
// 2. Partition 1: min(2,4)=2, min(2,5)=2, min(2,6)=2  ==> the second partition produces 2
// 3. Then combOp runs: combOp(2, 1) = 2 + 1 = 3, then combOp(3, 2) = 3 + 2 = 5, so the final result is 5
scala> z.aggregate(2)(seqOP, combOp)
[Stage 105:> (0 + 0) / 2]combOp: 2 1
combOp: 3 2
res96: Int = 5

// The same idea with strings:
scala> def seqOp(a: String, b: String): String = {
     |   println("seqOp: " + a + "\t" + b)
     |   math.min(a.length, b.length).toString
     | }
seqOp: (a: String, b: String)String

scala> def combOp(a: String, b: String): String = {
     |   println("combOp: " + a + "\t" + b)
     |   a + b
     | }
combOp: (a: String, b: String)String

scala> val z = sc.parallelize(List("12", "23", "345", "4567"), 2)

scala> z.aggregate("")(seqOp, combOp)
seqOp: 345
seqOp: 12
seqOp: 0 4567
seqOp: 0 23
combOp: 1
combOp: 1 1
res25: String = 11
Exercise 10: aggregateByKey

val pairRDD = sc.parallelize(List(("cat", 2), ("cat", 5), ("mouse", 4), ("cat", 12), ("dog", 12), ("mouse", 2)), 2)

def func2(index: Int, iter: Iterator[(String, Int)]): Iterator[String] = {
  iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}

scala> pairRDD.mapPartitionsWithIndex(func2).collect
res99: Array[String] = Array([partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)], [partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)])

pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
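A short walkthrough of what these two calls return, computed from the partition layout shown by func2 above (a sketch, not output captured from the original session):

// aggregateByKey(0)(math.max(_, _), _ + _):
//   partition 0: cat -> max(0, 2, 5) = 5,  mouse -> max(0, 4) = 4
//   partition 1: cat -> max(0, 12) = 12,   dog -> max(0, 12) = 12,  mouse -> max(0, 2) = 2
//   combine per key: cat = 5 + 12 = 17, mouse = 4 + 2 = 6, dog = 12
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
// Array((dog,12), (cat,17), (mouse,6))   (order may vary)

// With zeroValue = 100 every per-partition maximum becomes 100:
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
// Array((dog,100), (cat,200), (mouse,200))   (order may vary)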
Exercise 11: checkpoint (see also: http://blog.csdn.net/tototuzuoquan/article/details/74838936)

Sets a checkpoint for the current RDD. Checkpointing writes the RDD as binary files into the checkpoint directory, which is set with SparkContext.setCheckpointDir(). During checkpointing, all of the RDD's dependencies on its parent RDDs are removed. Calling checkpoint on an RDD does not run immediately; an action must be executed to trigger it.

Function signature: def checkpoint()

Example:
scala> val data = sc.parallelize(1 to 100000,15)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[94] at parallelize at <console>:24
scala> sc.setCheckpointDir("/iteblog")
17/07/07 19:17:22 WARN spark.SparkContext: Spark is not running in local mode, therefore the checkpoint directory must not be on the local filesystem. Directory '/iteblog' appears to be on the local filesystem.
scala> data.checkpoint
scala> data.count
res105: Long = 100000

[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls /iteblog
Found 1 items
drwxr-xr-x - root supergroup 0 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c
Found 1 items
drwxr-xr-x - root supergroup 0 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94
Found 15 items
-rw-r--r--   3 root supergroup      71219 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00000
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00001
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00002
-rw-r--r--   3 root supergroup      71219 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00003
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00004
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00005
-rw-r--r--   3 root supergroup      71219 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00006
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00007
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00008
-rw-r--r--   3 root supergroup      71219 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00009
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00010
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00011
-rw-r--r--   3 root supergroup      71219 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00012
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00013
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00014
[root@hadoop2 hadoop-2.8.0]#
After count finishes, several binary files (the number depends on how many partitions the RDD has) are produced under the /iteblog directory.
scala> sc.setCheckpointDir("hdfs://mycluster/wordcount/ck")

scala> val rdd = sc.textFile("hdfs://mycluster/wordcount/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[100] at reduceByKey at <console>:24

scala> rdd.checkpoint

scala> rdd.isCheckpointed
res108: Boolean = false

scala> rdd.count
res109: Long = 289

scala> rdd.isCheckpointed
res110: Boolean = true

scala> rdd.getCheckpointFile
res111: Option[String] = Some(hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100)

[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100
Found 10 items
-rw-r--r--   3 root supergroup        147 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/_partitioner
-rw-r--r--   3 root supergroup        867 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00000
-rw-r--r--   3 root supergroup        721 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00001
-rw-r--r--   3 root supergroup       1091 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00002
-rw-r--r--   3 root supergroup       1030 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00003
-rw-r--r--   3 root supergroup        944 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00004
-rw-r--r--   3 root supergroup        810 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00005
-rw-r--r--   3 root supergroup        964 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00006
-rw-r--r--   3 root supergroup       1011 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00007
-rw-r--r--   3 root supergroup        974 2017-07-07 19:28 hdfs://mycluster/wordcount/ck/8de9de76-3343-4166-bd3f-4ed0da31209e/rdd-100/part-00008
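Because the checkpoint files are only written when a later action runs, Spark recomputes the RDD from its lineage at that point. A common pattern, sketched below under that assumption (not part of the original session), is to cache the RDD before checkpointing so it is not computed twice:

val data = sc.parallelize(1 to 100000, 15)
data.cache()        // keep the computed partitions in memory
data.checkpoint()   // mark for checkpointing; the files are written when the next action runs
data.count()        // one job fills the cache and writes the checkpoint, instead of recomputing the lineage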
Exercise 12: coalesce, repartition

coalesce merges the partitions of an RDD into a new set of partitions.

Function signature:
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]

Returns a new RDD whose number of partitions equals numPartitions. If shuffle is set to true, a shuffle is performed.
scala> var data = sc.parallelize(List(1, 2, 3, 4))
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[104] at parallelize at <console>:24

scala> data.partitions.length
res115: Int = 6

scala> val result = data.coalesce(2, false)
result: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[105] at coalesce at <console>:26

scala> result.partitions.length
res116: Int = 2

scala> result.toDebugString
res117: String =
(2) CoalescedRDD[105] at coalesce at <console>:26 []
 |  ParallelCollectionRDD[104] at parallelize at <console>:24 []

scala> val result1 = data.coalesce(2, true)
result1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[109] at coalesce at <console>:26

scala> result1.toDebugString
res118: String =
(2) MapPartitionsRDD[109] at coalesce at <console>:26 []
 |  CoalescedRDD[108] at coalesce at <console>:26 []
 |  ShuffledRDD[107] at coalesce at <console>:26 []
 +-(6) MapPartitionsRDD[106] at coalesce at <console>:26 []
    |  ParallelCollectionRDD[104] at parallelize at <console>:24 []

scala>
As shown above, no shuffle occurs when shuffle is false, while a shuffle does occur when it is true. RDD.partitions.length returns the number of partitions of an RDD. Another example:
scala> val rdd1 = sc.parallelize(1 to 10, 10)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[102] at parallelize at <console>:24

scala> rdd1.collect
res112: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> rdd1.partitions.length
res113: Int = 10

scala> val rdd2 = rdd1.coalesce(2, false)
rdd2: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[103] at coalesce at <console>:26

scala> rdd1.partitions.length
res114: Int = 10

scala>
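The exercise title also mentions repartition, which the session above does not show. repartition(n) is equivalent to coalesce(n, shuffle = true); a small sketch (assuming the same rdd1 as above):

val rdd1 = sc.parallelize(1 to 10, 10)

// repartition always shuffles, so it can both decrease and increase the partition count
rdd1.repartition(2).partitions.length    // 2
rdd1.repartition(20).partitions.length   // 20

// coalesce without shuffle can only reduce the number of partitions
rdd1.coalesce(20).partitions.length      // still 10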
Exercise 13: collectAsMap

Works like collect, but is used on pair RDDs and returns a result of type Map. Function signature: def collectAsMap(): Map[K, V]
scala> val rdd = sc.parallelize(List(("a", 1), ("b", 2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[111] at parallelize at <console>:24

scala> rdd.collectAsMap
res119: scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)

As the result shows, if the same key appears multiple times in the RDD, later values overwrite earlier ones, so the final map contains exactly one value per key.
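A quick sketch of that duplicate-key behavior (illustrative input, not from the original session):

val dup = sc.parallelize(List(("a", 1), ("a", 2), ("b", 3)))
dup.collectAsMap   // Map(a -> 2, b -> 3): the later value for "a" wins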
Exercise 14: combineByKey

Combines the values of each key using user-supplied aggregation functions, turning an RDD[(K, V)] into an RDD[(K, C)].

Function signatures:
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]

The first two overloads are implemented in terms of the third, using a HashPartitioner and a null Serializer; the third lets you specify the partitioner and, if needed, a Serializer. combineByKey is important: familiar functions such as aggregateByKey, foldByKey, and reduceByKey are all implemented on top of it. By default it combines on the map side.
scala> val data = sc.parallelize(List((1, "www"), (1, "iteblog"), (1, "com"), (2, "bbs"), (2, "iteblog"), (2, "com"), (3, "good")))
data: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[15] at parallelize at <console>:12

scala> val result = data.combineByKey(List(_), (x: List[String], y: String) => y :: x, (x: List[String], y: List[String]) => x ::: y)
result: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[19] at combineByKey at <console>:14

scala> result.collect
res20: Array[(Int, List[String])] = Array((1,List(www, iteblog, com)), (2,List(bbs, iteblog, com)), (3,List(good)))

scala> val data = sc.parallelize(List(("iteblog", 1), ("bbs", 1), ("iteblog", 3)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[24] at parallelize at <console>:12

scala> val result = data.combineByKey(x => x, (x: Int, y: Int) => x + y, (x: Int, y: Int) => x + y)
result: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[25] at combineByKey at <console>:14

scala> result.collect
res27: Array[(String, Int)] = Array((iteblog,4), (bbs,1))

Another example:
val rdd1 = sc.textFile("hdfs://mycluster/wordcount/input").flatMap(_.split(" ")).map((_, 1))
val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd2.collect

val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd3.collect

val rdd4 = sc.parallelize(List("dog", "cat", "gnu", "salmon", "rabbit", "turkey", "wolf", "bear", "bee"), 3)
val rdd5 = sc.parallelize(List(1, 1, 2, 2, 2, 1, 2, 2, 2), 3)
val rdd6 = rdd5.zip(rdd4)
val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n)
Exercise 15: countByKey

scala> val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd1.countByKey
res0: scala.collection.Map[String,Long] = Map(a -> 1, b -> 2, c -> 2)

scala> rdd1.countByValue
res1: scala.collection.Map[(String, Int),Long] = Map((b,2) -> 2, (a,1) -> 1, (c,2) -> 1, (c,1) -> 1)

scala>
Exercise 16: filterByRange

scala> val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> val rdd2 = rdd1.filterByRange("b", "d")
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[7] at filterByRange at <console>:26

scala> rdd2.collect
res2: Array[(String, Int)] = Array((c,3), (d,4), (c,2))
Exercise 17: flatMapValues

// The session below uses a pair RDD named a; its definition was not shown, but judging from the output it was created like this:
scala> val a = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))

scala> a.flatMapValues(_.split(" "))
res5: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[10] at flatMapValues at <console>:27

scala> a.flatMapValues(_.split(" ")).collect
res6: Array[(String, String)] = Array((a,1), (a,2), (b,3), (b,4))
Exercise 18: foldByKey

scala> val rdd1 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[12] at parallelize at <console>:24

scala> val rdd2 = rdd1.map(x => (x.length, x))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[13] at map at <console>:26

scala> rdd2.collect
res7: Array[(Int, String)] = Array((3,dog), (4,wolf), (3,cat), (4,bear))

scala> val rdd3 = rdd2.foldByKey("")(_+_)
rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[14] at foldByKey at <console>:28

scala> rdd3.collect
res8: Array[(Int, String)] = Array((4,bearwolf), (3,dogcat))

scala> rdd3.collect
res9: Array[(Int, String)] = Array((4,wolfbear), (3,catdog))

scala> val rdd = sc.textFile("hdfs://mycluster/wordcount/input/2.txt").flatMap(_.split(" ")).map((_, 1))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[18] at map at <console>:24

scala> rdd.foldByKey(0)(_+_)
res10: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[19] at foldByKey at <console>:27

scala> rdd.foldByKey(0)(_+_).collect
res11: Array[(String, Int)] = Array((role,1), (Play,1), (fraud,1), (level,1), (business,2), (improve,1), (platforms,1), (order,1), (big,1), (with,1), (scientist,,1), (active,1), (valuable,1), (data,5), (information,1), (Cooperate,1), (Collecting,1), (framework,1), (E-commerce/payment,1), (acquired,1), (root,1), (accurate,1), (solutions,1), (analysis;Maintenance,1), (problems,1), (them,1), (Analyze,1), (models,1), (analysis,3), (realize,1), (actual,1), (weight,1), (compare,1), (risk,1), (anti-fraud,1), (key,1), (related,1), (base,1), (Support,1), (against,1), (automatic,1), (to,2), (platform,2), (company's,1), (in,2), (needs,,1), (provide,2), (implement,1), (affecting,1), (strategy,1), (of,1), (reports,1), (management,1), (detection,,1), (for,1), (work,,1), (cause,1), (an,1), (verify,1), ...

scala>

foreachPartition:

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
rdd1.foreachPartition(x => println(x.reduce(_ + _)))
Exercise 19: keyBy

scala> val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[21] at parallelize at <console>:24

scala> val rdd2 = rdd1.keyBy(_.length)
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[22] at keyBy at <console>:26

scala> rdd2.collect
res12: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))

scala>
Exercise 20: keys, values

scala> val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at parallelize at <console>:24

scala> val rdd2 = rdd1.map(x => (x.length, x))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[24] at map at <console>:26

scala> rdd2.keys.collect
res13: Array[Int] = Array(3, 5, 4, 3, 7, 5)

scala> rdd2.values.collect
res14: Array[String] = Array(dog, tiger, lion, cat, panther, eagle)