第二章 Spark RDD以及编程接口
第二章 Spark RDD以及編程接口
目錄
注:學習《Spark大數據處理技術》筆記
1. Spark程序"Hello World"
1. 概述
2. 代碼實現
3. 行解
第一行
1. Spark程序運行的集群地址,如"spark://localhost:7077"
2. Spark程序的標識
3. 指明Spark程序安裝的路徑
4. Spark程序的jar包路徑
第二行
第三行
第四行
第五行
4. 重要概念
file和filterRDD變量都是RDD
RDD的初始創建都是由SparkContext來負責的,將內存中的集合或者外部文件系統作為輸入源
將一個RDD通過一定的操作變換成另一個RDD,比如file這個RDD通過一個filter操作變換成filterRDD,所以filter就是一個轉換操作
對RDD進行持久化,可以讓RDD保存在磁盤或者內存中,以便后續重復使用。比如cache接口默認將filterRDD緩存在內存中
由于Spark是惰性計算(lazy computing)的,所以對于任何RDD進行行動操作,都會觸發Spark作業的運行,從而產生最終的結果。例如對filterRDD進行count操作就是一個行動操作。行動操作分為兩類:
5. RDD與操作之間的關系
2. Spark RDD
1. 是什么?
2. 如何生成?
3. 特點?
1. 容錯處理
2. 持久化和分區
4. 一個分區的、高效容錯的而且能夠持久化的分布式數據集需要包含五個接口
1. RDD分區(partitions)
2. RDD優先位置(preferredLocations)
3. RDD依賴關系(dependencies)
窄依賴
每一個父RDD的分區最多被子RDD的一個分區所使用
寬依賴
多個子RDD的分區依賴于同一個父RDD的分區
區別
4. RDD分區計算(compute)
5. RDD分區函數(partitioner)
3. 創建操作
1. 集合創建操作
2. 存儲創建操作
指定數據輸入的類型,如TextInputFormat
指定[K,V]健值中K的類型
指定[K,V]健值中V的類型
指定由外部存儲生成的RDD的partition數量的最小值,如果沒有指定,系統會使用默認值defaultMinSplits
4. 轉換操作
mapU:ClassTag: RDD[U]
map 函數將RDD中類型為 T 的元素,一對一地映射為類型為 U 的元素。
distinct(): RDD[T]
distinct 函數返回RDD中所有不一樣的元素
flatMapU:ClassTag:RDD
flatMap 函數將RDD中的每一個元素進行一對多轉換
repartition(numPartitions:Int):RDD[T]
repartition 只是 coalesce 接口中shuffle 為 ture 的簡易實現
coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T]
假設RDD有N個分區,需要重新劃分成M個分區
randomSplit(weights: Array[Double],seed: Long=System.nanoTime): Array][RDD[T]]
randomSplit函數是根據weights權重將一個RDD切分成多個RDD
glom: RDD[Array[T]]
而glom函數是將RDD中每一個分區中類型T的元素轉換成數組Array[T],這樣每一個分區就只有一個數組元素
union(other: RDD[T]): RDD[T]
這些是針對RDD的集合操作,union操作將兩個RDD的數據進行合并,返回兩個RDD的并集(包含兩個RDD中相同的元素,不會去重)
intersection(other: RDD[T], partitioner: Partitoner)
intersection操作返回兩個RDD集合的交集,且交集中不會包含相同的元素
subtract(other: RDD[T]): RDD[T]
如果subtract所針對的兩個集合A和B,即操作是 val result=A.subtract(B),那么result中將會包含在A中出現且B中不出現的元素。intersection和subtract一般情況都會有shuffle的過程
subtract(other:RDD[T], p:Partitioner): RDD[T]
mapPartitions[U: ClassTag](f: Iterator[T]=>Iterator[U], parservesPartitioning: Boolean = false): RDD[U]
mapPartitions與map轉換操作類似,只不過映射函數的輸入參數由RDD中的每一個元素變成了RDD中每一個分區的迭代器,那么已經有了map為什么還要mapPartitions函數呢?如果在映射的過程中需要頻繁創建額外的對象,map就顯得不高效了。例如,將RDD中的所有數據通過JDBC連接寫入數據庫中,如果使用map函數可能需要為每一個元素都創建一個connection,這樣開銷是很大的,如果利用mapPartitions接口,可以針對每一個分區創建一個connection。
mapPartitonsWithIndex[U:ClassTag](f: (Int,Iterator[T])=>Iterator[U], preservesPartitioning: Boolean=false): RDD[U]
mapPartitionsWithIndex和mapPartitions功能類似,只是輸入參數多了一個分區的ID。
zipU:ClassTag:RDD[T, U]
zip函數的功能是將兩個RDD組合成Key/Value(健/值)形式的RDD,這里默認兩個RDD的partition數量以及元素數量都相同,否則相同系統將會拋出異常。
zipPartitions[B: ClassTag, V:ClassTag](rdd2:RDD[B], preservesPartitioning:Boolean)(f:(Iterator[T],Iterator[B])=>Iterator[V]:RDD[V])
zipPartitions是將多個RDD安裝partition組合成為新的RDD,zipPartitions需要相互組合的RDD具有相同的分區數,但是對于每個分區中的元素數量是沒有要求的
未完待續…
5. 控制操作
6. 行動操作
1. 概述
2. 集合標量行動操作
first:返回RDD中第一個元素
count:返回RDD中元素的個數
reduce(f:(T, T)=>T):對RDD中的元素進行二元計算,返回計算結果
collect()/toArray():以集合形式返回RDD的元素
take(num: Int):將RDD作為集合,返回集合中[0, num-1]下標的元素
top(num: Int):按照默認的或者是指定的排序規則,返回前num個元素
takeOrdered(num: Int):以與top相反的排序規則,返回前num個元素
aggregate[U](zeroValue: U)(seqOp: (U, T)=>U, combOp(U, U)=>U)
aggregate行動操作中主要需要提供兩個函數,一個是seqOp函數,其將RDD(RDD中的每個元素的類型是T)中的每一個分區的數據聚合成類型為U的值。另一個函數combOp將各個分區聚合起來的值合并在一起得到最終類型U的返回值。這里的RDD元素的類型T和返回值的類型U可以為同一個類型。
fold(zeroValue: T)(op: (T, T)=>T)
fold是aggregate的便利接口,其中,op操作既是seqOp操作也是combOp操作,且最終的返回類型也是T,即與RDD中每一個元素的類型是一樣的
lookup(key: K): Seq[V]
lookup是針對(K, V)類型RDD的行動操作,對于給定的健值,返回與此健值相對應的所有值
3. 存儲行動操作
1. 概述
這里一共列出七個將RDD存儲到外部介質的舊版API,前六個API都是saveAsHadoopDataset這個API的簡易實現版本,僅僅支持將RDD存儲到HDFS中,而saveAsHadoopDataset的參數類型是jobConf,所以其不僅能將RDD存儲到HDFS上,也可以將RDD存儲到其它數據庫中,如Hbase,MangoDB等
將RDD保存到HDFS中通常情況需要關注或者設置五個參數,即文件保存的路徑、key值的class類型,Value值的class類型,RDD的輸出格式(outputFormat,如TextOutputFormat),以及最后一個相關的參數codec(如DefaultCodec、Gzip等)
2. 在Spark中針對新版Hadoop API提供了三個行動操作函數
總結
以上是生活随笔為你收集整理的第二章 Spark RDD以及编程接口的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: LeetCode——树:递归
- 下一篇: 终于有代表提及物业问题,到底要不要取消物