学习笔记Spark(四)—— Spark编程基础(创建RDD、RDD算子、文件读取与存储)
文章目錄
- 一、創建RDD
- 1.1、啟動Spark shell
- 1.2、創建RDD
- 1.2.1、從集合中創建RDD
- 1.2.2、從外部存儲中創建RDD
- 任務1:
- 二、RDD算子
- 2.1、map與flatMap算子應用
- 2.1.1、map
- 2.1.2、flatMap
- 2.1.3、mapPartitions
- 2.2、sortBy與filter算子應用
- 2.2.1、sortBy
- 2.2.2、filter
- 任務2:
- 2.3、交集與并集計算的算子應用
- 2.3.1、distinct
- 2.3.2、union
- 2.3.3、intersection
- 2.3.4、subtract
- 2.3.5、cartesian
- 任務3:
- 2.4、 鍵值對RDD常用算子
- 2.4.1、 創建鍵值對RDD
- 2.4.2、mapValues
- 2.4.3、groupByKey
- 2.4.4、reduceByKey
- 2.4.5、join
- 2.5、常用Action類型算子
- 2.5.1、lookup
- 2.5.2、collect
- 2.5.3、take
- 2.5.4、count
- 任務4:
- 三、文件讀取與存儲
- 3.1、saveAsTextFile
- 3.2、repartition
- 3.3、saveAsSequenceFile
- 3.4、sequenceFile
- 綜合練習1
一、創建RDD
1.1、啟動Spark shell
進入Spark命令行交互界面:spark-shell
退出交互界面::q
查看客戶端:(http://master:8080)
設置日志級別
1.2、創建RDD
在Spark中創建RDD的創建方式大概可以分為三種:
- 從集合中創建RDD
- 從外部存儲創建RDD
- 從其他RDD創建
1.2.1、從集合中創建RDD
parallelize():通過parallelize函數把一般數據結構加載為RDD parallelize[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism): RDD[T]Parallelize Rdd默認分區數:sc.defaultParallelism,可通過spark.default.parallelism設置sc.defaultParallelism的值,沒有配置spark.default.parallelism時的默認值等于cpu的核數
例1
例2
1.2.2、從外部存儲中創建RDD
通過textFile直接加載數據文件為RDD
textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]讀取HDFS時默認分區數:rdd的分區數 = max(hdfs文件的block數目, sc.defaultMinPartitions),sc.defaultMinPartitions=min(sc.defaultParallelism,2)
從本地文件讀取:本地file的分片規則,應該按照hdfs的block大小劃分,但實測的結果是固定按照32M來分片
讀取HDFS上文件:
讀取本地文件:
(注意:讀取本地文件時,要確保每個集群上都要有文件,否則會報錯)
任務1:
1、HDFS上有三份文件,分別為student.txt(學生信息表),result_bigdata.txt(大數據基礎成績表),result_math.txt(數學成績表)
加載student.txt為名稱為student的RDD數據,result_bigdata.txt為名稱為bigdata的RDD數據,result_math.txt為名稱為math的RDD數據
數據:
程序:
二、RDD算子
2.1、map與flatMap算子應用
2.1.1、map
map(func)- Transformation類型算子
- map: 將原來RDD的每個數據項通過map中的用戶自定義函數f轉換成一個新的RDD,map操作不會改變RDD的分區數目
示例:使用map函數對RDD中每個元素進行倍數操作
2.1.2、flatMap
flatMap(func)- Transformation類型算子
- flatMap:對集合中的每個元素進行map操作再扁平化
示例:使用flatMap分割單詞
2.1.3、mapPartitions
mapPartitions(func)- Transformation類型算子
- 和map功能類似,但是輸入的元素是整個分區,即傳入函數的操作對象是每個分區的Iterator集合,該操作不會導致Partitions數量的變化
示例:取出每個分區中大于3的值
2.2、sortBy與filter算子應用
2.2.1、sortBy
sortBy(f:(T) => K, ascending, numPartitions)- Transformation類型算子
- 是可以對標準RDD進行排序
- sortBy()可接受三個參數:
- f:(T) => K:左邊是要被排序對象中的每一個元素,右邊返回的值是元素中要進行排序的值。
- ascending:決定排序后RDD中的元素是升序還是降序,默認是true,也就是升序,false為降序排序。
- numPartitions:該參數決定排序后的RDD的分區個數,默認排序后的分區個數和排序之前的個數相等。
示例:按照每個元素的第二個值進行降序排序
2.2.2、filter
filter(func)- Transformation類型算子
- 保留通過函數func,返回值為true的元素,組成新的RDD
- 過濾掉data RDD中元素小于或等于2的元素
示例:
任務2:
根據任務1得到的RDD bigdata及math,取出成績排名前5的學生成績信息
2.3、交集與并集計算的算子應用
2.3.1、distinct
distinct([numPartitions]))- Transformation類型算子
- 針對RDD中重復的元素,只保留一個元素
示例:
2.3.2、union
union(otherDataset)- 合并RDD,需要保證兩個RDD元素類型一致
示例:合并rdd1和rdd2
2.3.3、intersection
intersection(otherDataset)- 找出兩個RDD的共同元素,也就是找出兩個RDD的交集
示例:找出c_rdd1和c_rdd2中相同的元素
2.3.4、subtract
subtract (otherDataset)- 獲取兩個RDD之間的差集
示例:找出rdd1與rdd2之間的差集
2.3.5、cartesian
cartesian(otherDataset)- 笛卡爾積就是將兩個集合的元素兩兩組合成一組
示例:
任務3:
1、找出考試成績得過100分的學生ID,最終的結果需要集合到一個RDD中。
2、找出兩門成績都得100分的學生ID,結果匯總為一個RDD。
2.4、 鍵值對RDD常用算子
雖然大部分Spark的RDD操作都支持所有種類的單值RDD,但是有少部分特殊的操作只能作用于鍵值對類型的RDD。
顧名思義,鍵值對RDD由一組組的鍵值對組成,這些RDD被稱為PairRDD。PairRDD提供了并行操作各個鍵或跨節點重新進行數據分組的操作接口。例如,PairRDD提供了reduceByKey()方法,可以分別規約每個鍵對應的數據,還有join()方法,可以把兩個RDD中鍵相同的元素組合在一起,合并為一個RDD。
2.4.1、 創建鍵值對RDD
將一個普通的RDD轉化為一個PairRDD時可以使用map函數來進行操作,傳遞的函數需要返回鍵值對。
做為鍵值對類型的RDD,包含了鍵跟值兩個部分。Spark提供了兩個方法分別獲取鍵值對RDD的鍵跟值。keys返回一個僅包含鍵的RDD,values返回一個僅包含值的RDD。
2.4.2、mapValues
mapValues(func)- 類似map,針對鍵值對(Key,Value)類型的數據中的Value進行map操作,而不對Key進行處理
示例:
2.4.3、groupByKey
groupByKey([numPartitions])- 按鍵分組,在(K,V)對組成的RDD上調用時,返回(K,Iterable)對組成的新的RDD。
示例:將rdd按鍵進行分組
2.4.4、reduceByKey
- 將鍵值對RDD按鍵分組后進行聚合
- 當在(K,V)類型的鍵值對組成的RDD上調用時,返回一個(K,V)類型鍵值對組成的新RDD
- 其中新RDD每個鍵的值使用給定的reduce函數func進行聚合,該函數必須是(V,V)=>V類型
示例:統計每個鍵出現的次數
2.4.5、join
- 把鍵值對數據相同鍵的值整合起來
- 其他連接有:leftOuterJoin, rightOuterJoin, and fullOuterJoin
join: 把鍵值對數據相同鍵的值整合起來
2.5、常用Action類型算子
2.5.1、lookup
lookup(key: K)- Action類型算子
- 作用于(K,V)類型的RDD上,返回指定K的所有V值
示例:
2.5.2、collect
collect()- 返回RDD中所有的元素
- collectAsMap(): Map[K, V]
示例:
2.5.3、take
take(num)- 返回RDD前面num條記錄
示例:
2.5.4、count
count()- 計算RDD中所有元素個數
任務4:
1、輸出每位學生的總成績,要求將兩個成績表中學生ID相同的成績相加。
2、輸出每位學生的平均成績,要求將兩個成績表中學生ID相同的成績相加并計算出平均分。
3、合并每個學生的總成績和平均成績。
三、文件讀取與存儲
3.1、saveAsTextFile
saveAsTextFile(path: String)- 把RDD保存到HDFS中
3.2、repartition
repartition(numPartitions: Int)- 可以增加或減少此RDD中的并行級別。在內部,它使用shuffle重新分發數據。
- 如果要減少此RDD中的分區數,請考慮使用coalesce,這樣可以避免執行shuffle。
- coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
3.3、saveAsSequenceFile
saveAsSequenceFile(path)- 保存成序列化文件
- 將數據集的元素作為Hadoop SequenceFile編寫,只支持鍵值對RDD
3.4、sequenceFile
sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], minPartitions: Int)- 讀取序列化文件
綜合練習1
綜合練習:基于3個基站的日志數據,要求計算某個手機號碼在一天之內出現時間最多的兩個地點。
模擬了一些簡單的日志數據,共4個字段:手機號碼,時間戳,基站id,連接類型(1表示建立連接,0表示斷開連接):
基站A:
基站B:
基站C:
程序:
總結
以上是生活随笔為你收集整理的学习笔记Spark(四)—— Spark编程基础(创建RDD、RDD算子、文件读取与存储)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 学习笔记Spark(三)—— Spark
- 下一篇: 学习笔记Spark(六)—— Spark