【 RDD持久化】
文章目錄
- RDD持久化
- 操作:
- 總結:
- cache vs persist 區別:
- 存儲級別選擇:
- 移除rdd持久化 :
- 2.血緣關系
- 依賴關系分類:
- 4.shuffle 算子:
- 案例分析:
spark_05_RDD持久化
RDD持久化
rdda => rddb =>rddc action rdda => rddb =>rdde actionrddb 持久化操作 =》 調優的
操作:
1.persist() or cache() methods 2.觸發action之后 會對rdd數據進行持久化的總結:
1.cache() 不是action算子 是lazy 是懶加載的rdda => action job
rdda => cache => action job => rdd持久化 生效
rdda => action job rdda的數據從 rdd持久化的地方加載數據
rdda => rddb => rddc
rdda => rddb => rdde
rdda => rddb => rddf
rddb.cache 之后 rddb之后數據就不用從頭開機計算 提升計算效率
補充:
對rdd做持久化 就是對rdd里面的分區做持久化
好處:
1.much faster 【計算效率】
2. reuse 復用
cache vs persist 區別:
1.cache底層就是調用 persist算子 2.spark-core 持久化 默認存儲級別:StorageLevel.MEMORY_ONLYStorageLevel:
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val DISK_ONLY_3 = new StorageLevel(true, false, false, false, 3)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
存儲級別選擇:
1.MEMORY_ONLY 首選 2.MEMORY_ONLY_SER 次選 1.Java serialization: By default,2.Kryo: 1.注冊 class移除rdd持久化 :
1.lru 2.手動 : RDD.unpersist(true) 立即執行的 eager2.血緣關系
lineage:rdda => rddb => rddc一個rdd是如何從父rdd計算得來的
textFile(path) => map => fiter => … => collect
每一個轉換都會形成一個rdd
好處:
容錯(性能 + 容錯)
容錯:
1.假如說RDDB 分區 6 8 元素 在計算的時候 掛了
一個鏈路 200個轉換 算到 第199個轉換 數據壞了 ,如果 從頭計算 也是挺麻煩的一件事情 :
1.spark-core 提供cache
而且 持久化的數據集 也支持 容錯
3.依賴關系:
rdda => rddb
不同的依賴 會導致 生成rdd分區數發生變化的
依賴關系分類:
1.寬依賴:1.一個父rdd的parition會被子rdd的parition使用多次 2.會產生shuffle 會有新的stage產生 2.窄依賴:1.一個父rdd的parition至多被子rdd的partition使用一次2.不會產生shuffle,都是在一個stage里面完成的shuffle:數據重新洗牌和算子:轉換算子
1.窄依賴算子
2.寬依賴算子
補充:
通過寬窄依賴可以知道 spark-core里面 轉換算子 哪些算子 可能引起shuffle
寬依賴:
xxxbykey shuffle
其他: join reduce
窄依賴:
map filter xxx
spark: stage是如何劃分? ****
spark-core 產生 寬依賴 就會劃分stage
算子:引起shuffle 就會劃分stage
一個shuffle算子 會劃分2個stage
兩個shuffle算子 會產生幾個stage???
3
4.shuffle 算子:
“生產上能使用窄依賴算子 就不使用寬依賴算子”:
1.不準確
1.生產上大部分 需求 必須使用寬依賴的
?
引起shuffle的算子:
? 1.xxxbykey =》
? 2. repartition and coalesce【不準確】
? 3. join:
? map join 不會引發shuffle
? reduce join /common join =》 引起shuffle
生產上調整 計算的并行度?
coalesce: 一般用于 減少rdd的分區數 =》 窄依賴 =》 不會引起shuffle
repartition:增加rdd的分區數
coalesce(num,shuffle=true)
思考:
可不可以使用coalesce 增加rdd分區數 ? 可以的
repartition 減少rdd的分區數? 不能
思考:
coalesce 增加rdd分區數 ? 走不走shuffle? 必然走shuffle
注意:
生產上用于調整 計算的并行度
思考:
rdd =》 hdfs 200個小文件
rdd.coalesce(10) 10個文件
application :
driver:
executor:
yarn container
rdd: => executor
code => executor 判斷code 操作對象是不是rdd里面的元素
=> driver
案例分析:
網站訪問量排名:
domain uid flow
www.baidu.com,uid01,1
www.baidu.com,uid01,10
www.baidu.com,uid02,3
www.baidu.com,uid02,5
www.github.com,uid01,11
www.github.com,uid01,10
www.github.com,uid02,30
www.github.com,uid02,50
www.bibili.com,uid01,110
www.bibili.com,uid01,10
www.bibili.com,uid02,2
www.bibili.com,uid02,3
需求:
每個域名每個用戶的訪問量的top3
sql=> 分組+聚合+ 開窗
code =》 分組+聚合 + topn
((www.bibili.com,uid02),5)
((www.github.com,uid02),80)
((www.baidu.com,uid02),8)
((www.github.com,uid01),21)
((www.baidu.com,uid01),11)
((www.bibili.com,uid01),120)
top2:
uid02 www.github.com 80
uid02 www.bibili.com 5
存在安全隱患:
x.toList.
sparkcore 進行數據分析:
rdd進行操作 不要使用scala里面的結合進行存儲
思想:
分而治之 類似 mr 分組
sparkcore: 沒講的
1.廣播變量 =》 sss
2.累加器 =》 項目
需求:
spark-core =》 wc
input:hdfs
todo:wc
output:hdfs
idea:
spark code =》 jar =》 服務器上
部署spark作業:
1.jar
2.spark-submit 提交作業
// val in = “hdfs://bigdata32:9000/input/”
// val out = “hdfs://bigdata32:9000/output/”
提交spark作業:
spark-submit
–class com.dl2262.sparkcore.day02.WCApp
–master local[2]
–name wordcount
/home/hadoop/project/spark/spark-2262-1.0.jar
hdfs://bigdata32:9000/input/1 hdfs://bigdata32:9000/output/
作業:
1.yarn 提交?
2.–conf 任意Spark配置屬性
–properties-file 要從中加載額外文件的文件路徑
spark-submit \ --class com.dl2262.sparkcore.day02.WCApp \ --master yarn \ --deploy-mode client \ --name wordcount \ --driver-memory 1G /home/hadoop/project/spark/spark-2262-1.0.jar \ hdfs://bigdata32:9000/input/1 hdfs://bigdata32:9000/output/spark-submit
–class com.dl2262.sparkcore.day02.WCApp
–master local[2]
–name wordcount
–conf spark.input.path=hdfs://bigdata32:9000/input/1
–conf spark.output.path=hdfs://bigdata32:9000/output1/
/home/hadoop/project/spark/spark-2262-1.0.jar
總結
- 上一篇: 22年电赛冬令营授课
- 下一篇: 沉默的螺旋理论(转载)