Big Data in Practice, Lesson 16 (Part 1) - Spark-Core04
I. Review of the Previous Lesson
II. Shuffle Explained
- 2.1 Shuffle Overview
- 2.2 Shuffle Background
- 2.3 Shuffle Performance Impact
III. Shuffle Operations in spark-shell
- 3.1 Grouping in IDEA
- 3.2 coalesce and repartition in Production
- 3.3 reduceByKey and groupByKey Analysis
- 3.4 The Shuffle Process of reduceByKey and groupByKey, Illustrated
- 3.5 Digging into the Source: the Combiner in reduceByKey and groupByKey
IV. Extension: the aggregateByKey Operator
- 4.1 collectAsMap
I. Review of the Previous Lesson
Big Data in Practice, Lesson 15 (Part 1) - Spark-Core03:
https://blog.csdn.net/zhikanjiani/article/details/91045640#id_4.2
YARN and HADOOP_CONF_DIR
For YARN mode, do you need to edit slaves under $SPARK_HOME/conf and change localhost to hadoop002?
No: when running on YARN this machine only needs to act as a client, which is why we say Spark on YARN needs nothing more than a client.
Q: Does Spark on YARN require starting any of the following?
$SPARK_HOME/sbin/start-all.sh
start-master.sh, start-slaves.sh, and the slaves file
A common mistake is to rush off and start all the Spark standalone daemons before running Spark on YARN.
In fact a gateway plus spark-submit is all you need; no Spark processes have to be started at all.
II. Shuffle Explained
2.1 Shuffle Overview
- Recap: an action triggers a job; whenever a job hits a shuffle it is split into another stage, and each stage is a set of tasks.
See the official docs: http://spark.apache.org/docs/latest/rdd-programming-guide.html#shuffle-operations
Requirement:
Given a pile of call records, count how many calls were dialed out this month.
Open the call screen on a phone: contact, call time, call duration, call record.
Statistical analysis in Spark boils down to word count: emit (day + outgoing, 1), with "day + outgoing" as the key, and run reduceByKey() on it.
Records sharing the same "day + outgoing" key are shuffled to the same reducer — could you accumulate the counts without that? No, you could not.
This is the essence of a shuffle: data with a particular characteristic is gathered onto one node to be computed, in this case a +1 accumulation.
Note: avoid operations that cause a shuffle whenever you can.
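To make the requirement concrete, here is a minimal sketch that can be pasted into spark-shell (the field layout and sample data are my own illustration, not from the lesson):

// Each record: (phoneNumber, day, direction, durationSeconds); "OUT" marks an outgoing call
val records = sc.parallelize(Seq(
  ("13800000001", "2019-06-01", "OUT", 120),
  ("13800000001", "2019-06-01", "OUT", 30),
  ("13800000002", "2019-06-02", "IN",  60)
))

// (day, 1) for every outgoing call, then reduceByKey: identical keys are shuffled to the same reducer and summed
val outgoingPerDay = records
  .filter(_._3 == "OUT")
  .map { case (_, day, _, _) => (day, 1) }
  .reduceByKey(_ + _)

outgoingPerDay.collect().foreach(println)   // e.g. (2019-06-01,2)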
- Shuffle operations
Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark's mechanism for re-distributing data so that it's grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.
In other words: the shuffle redistributes data across partitions, which typically means copying data between executors and machines, with the attendant disk I/O and network I/O — that is why the shuffle is a complex and costly operation.
2.2 Shuffle Background
- We use reduceByKey to understand what happens during a shuffle.
- reduceByKey produces a new RDD in which all of the values for a single key are combined into a result for that key.
- (Keys with the same characteristic are sent to the same reducer to be processed.)
- The challenge is that not all values for a single key necessarily sit in the same partition, or even on the same machine, yet they must be co-located to compute the result.
- Which operations can cause a shuffle? Per the Spark docs: repartition operations such as repartition and coalesce, ByKey operations (except counting) such as groupByKey and reduceByKey, and join operations such as cogroup and join.
2.3 Shuffle Performance Impact
- Spark organizes the work into stages: a shuffle boundary creates a new stage, and each stage produces a batch of tasks.
- Internally, the results of individual map tasks are kept in memory until they cannot fit, and the reduce side then reads the relevant sorted blocks that the map side wrote out.
三、Shuffle在Spark-shell操作
1、啟動Spark-shell:
scala> val info = sc.textFile("hdfs://hadoop002:9000/wordcount/input/ruozeinput.txt") info: org.apache.spark.rdd.RDD[String] = hdfs://hadoop002:9000/wordcount/input/ruozeinput.txt MapPartitionsRDD[1] at textFile at <console>:24scala> info.partitions.length res0: Int = 2scala> val info1 = info.coalesce(1) info1: org.apache.spark.rdd.RDD[String] = CoalescedRDD[2] at coalesce at <console>:25scala> info1.partitions.length res1: Int = 1scala> val info2 = info.coalesce(4) info2: org.apache.spark.rdd.RDD[String] = CoalescedRDD[3] at coalesce at <console>:25scala> info2.partitions.length res2: Int = 2scala> val info3 = info.coalesce(4,true) info3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at coalesce at <console>:25scala> info3.partitions.length res3: Int = 4scala> info3.collect res4: Array[String] = Array(hello world, hello, hello world john)解釋coalesce方法、
The relevant signatures in RDD.scala (Spark 2.x, trimmed):

def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
            (implicit ord: Ordering[T] = null): RDD[T]

- You pass in a target number of partitions; the shuffle flag (true/false) is optional and defaults to false.

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}

- repartition just calls coalesce with shuffle = true, so it is guaranteed to shuffle.
scala> val info4 = info.repartition(5)
info4: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at repartition at <console>:25

scala> info4.collect
res6: Array[String] = Array(hello world john, hello world, hello)

scala> info.partitions.length
res7: Int = 2
When you just need to shrink the partition count (here the 2 partitions of info) and redistribute the data, prefer coalesce so that you avoid a shuffle.
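A quick way to verify this in the shell (my own check, reusing the `info` RDD above) is to compare the lineages — only the repartition variant contains a ShuffledRDD:

scala> info.coalesce(1).toDebugString     // CoalescedRDD directly on top of the file RDD: no shuffle
scala> info.repartition(1).toDebugString  // contains a ShuffledRDD, because repartition = coalesce(shuffle = true)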
3.1 Grouping in IDEA:

package spark01

import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer

object RepartitionApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    sparkConf.setAppName("LogApp").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    val students = sc.parallelize(List("黃帆","梅宇豪","秦朗","楊朝珅","王乾","沈兆乘","沈其文","陳思文"), 3)
    students.mapPartitionsWithIndex((index, partition) => {
      val stus = new ListBuffer[String]
      while (partition.hasNext) {        // iterate over this partition
        stus += ("~~~~" + partition.next() + ", group: " + (index + 1))
      }
      stus.iterator
    }).foreach(println)                  // print the result

    sc.stop()
  }
}

mapPartitionsWithIndex() tags every element with the index of the partition (group) it belongs to; the parallelism passed to parallelize makes it explicitly 3 groups.
Requirement 1:
The department downsizes from three groups to two. Change:
- students.mapPartitionsWithIndex((index, partition) ==>
to:
students.coalesce(2).mapPartitionsWithIndex((index, partition)

Requirement 2:
Before the downsizing there were three groups; regroup the same people into five groups:
students.repartition(5).mapPartitionsWithIndex((index, partition)

To visualize the original partitioning and the repartition operation side by side, run the following code:
package Sparkcore04

import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer

object RepartitionApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    sparkConf.setAppName("LogApp").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    val students = sc.parallelize(List("梅宇豪","黃帆","楊超神","薛思雨","朱昱璇","周一虹","王曉嵐","沈兆乘","陳思文"), 3)

    // Original grouping: 3 partitions
    students.mapPartitionsWithIndex((index, partition) => {
      val stus = new ListBuffer[String]
      while (partition.hasNext) {
        stus += ("~~~~" + partition.next() + ", group: " + (index + 1))
      }
      stus.iterator
    }).foreach(println)

    println("---------------------------dividing line---------------------------")

    // After repartition(4): the data is reshuffled into 4 partitions
    students.repartition(4).mapPartitionsWithIndex((index, partition) => {
      val stus = new ListBuffer[String]
      while (partition.hasNext) {
        stus += ("~~~" + partition.next() + ", new group " + (index + 1))
      }
      stus.iterator
    }).foreach(println)

    sc.stop()
  }
}

3.2 coalesce and repartition in Production:
Suppose an RDD has 300 partitions and each partition holds only one record, "id=100".
Now run a filter (id > 99): every record passes, so the result still has 300 partitions with one record each — 300 tasks for almost no data.
Change the starting condition: if the filter instead discards most of the records, many partitions end up empty or nearly empty; that is where you coalesce down to a smaller partition count so downstream tasks are not wasted on them.
- Typical repartition use case: spread the data back out to raise parallelism.
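A minimal spark-shell sketch of both patterns (the numbers are made up; the lesson's 300-partition RDD is simulated with parallelize):

// 300 partitions, one record per partition
val records = sc.parallelize(1 to 300, 300)

// A selective filter leaves most partitions empty, but the partition count is unchanged
val filtered = records.filter(_ > 290)
println(filtered.partitions.length)   // 300

// Shrink without a shuffle
val compacted = filtered.coalesce(10)
println(compacted.partitions.length)  // 10

// Spread data out again to raise parallelism (this one does shuffle)
val widened = compacted.repartition(100)
println(widened.partitions.length)    // 100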
3.3 reduceByKey and groupByKey Analysis
1. Hand-write a word count:
Start spark-shell --master local[2] from SecureCRT.
Run:
sc.textFile("file:///home/hadoop/data/ruozeinput.txt").flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_).collect
Look at the DAG: the first operator is textFile, the second flatMap, the third map; when reduceByKey is reached the job is split — one stage before it, one stage after it.
Two stages: for reduceByKey the (_,1) records are first written out (shuffle write) and then read back in (shuffle read).
The pairs reduceByKey works on have type (String, Int): the word and the number of times it occurs.
2. The data types returned by reduceByKey and groupByKey:

scala> sc.textFile("file:///home/hadoop/data/ruozeinput.txt").flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_)
res4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:25

scala> sc.textFile("file:///home/hadoop/data/ruozeinput.txt").flatMap(_.split("\t")).map((_,1)).groupByKey()
res5: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[14] at groupByKey at <console>:25
Word count finished with reduceByKey:
res10: Array[(String, Int)] = Array((hello,3), (world,2), (john,1))
Word count finished with groupByKey:
res11: Array[(String, Int)] = Array((hello,3), (world,2), (john,1))
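The notes only show the final arrays; to actually finish the word count with groupByKey you still have to sum the grouped values yourself, along these lines (a sketch, not the exact command from the lesson):

sc.textFile("file:///home/hadoop/data/ruozeinput.txt")
  .flatMap(_.split("\t"))
  .map((_, 1))
  .groupByKey()
  .map { case (word, counts) => (word, counts.sum) }
  .collect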
Summary:
Compare the two screenshots from the UI: reduceByKey reads 53 B of input and shuffles 161 B, whereas groupByKey reads the same 53 B but shuffles 172 B.
With groupByKey, none of the data is combined before the shuffle.
reduceByKey performs a partial aggregation first — a combiner runs locally on the map side — and only the combined results go through the shuffle, so less data is moved.
3.4 The Shuffle Process of reduceByKey and groupByKey, Illustrated
Suppose there are three map tasks: the first holds (a,1)(b,1); the second holds (a,1)(b,1)(a,1)(b,1); the third holds (a,1)(b,1)(a,1)(b,1)(a,1)(b,1).
groupByKey's shuffle: all 12 records are shipped across the network as-is and only grouped on the reduce side.
reduceByKey's shuffle: each map task combines locally first — (a,1)(b,1), (a,2)(b,2), (a,3)(b,3) — so only 6 records are shuffled.
That is why reduceByKey moves less data: the map-side aggregation shrinks the shuffle before it happens.
IV. Extension: the aggregateByKey Operator
Some problems cannot be solved with reduceByKey alone — for instance when the aggregated result has a different type from the values — which is what motivates this operator:
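A small sketch of aggregateByKey (my own illustration, not from the lesson): computing the per-key average, where the running state (sum, count) has a different type from the values — something plain reduceByKey cannot express directly.

val scores = sc.parallelize(Seq(("a", 10), ("a", 20), ("b", 5)))

// zeroValue is the initial (sum, count); seqOp folds one value into the per-partition state;
// combOp merges the states coming from different partitions
val sumCount = scores.aggregateByKey((0, 0))(
  (acc, v)     => (acc._1 + v, acc._2 + 1),
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

val avg = sumCount.mapValues { case (sum, cnt) => sum.toDouble / cnt }
avg.collect().foreach(println)   // (a,15.0), (b,5.0)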
There are no secrets in front of the source code:
The groupByKey source:
The groupByKey method defined in PairRDDFunctions.scala:
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
Note that groupByKey passes mapSideCombine = false explicitly, so no map-side combine happens.
The reduceByKey source — it ultimately calls combineByKeyWithClassTag:
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
Note that here mapSideCombine defaults to true, and reduceByKey leaves it at that default.
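For reference, this is how reduceByKey reaches that method in PairRDDFunctions.scala (Spark 2.x — check your version): the same function serves as both mergeValue and mergeCombiners, and mapSideCombine keeps its default of true.

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}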
4.1 collectAsMap
Per the comment in the source: all of the data is loaded into the driver's memory, so on a large result the driver can be overwhelmed and crash.
/**
 * Return the key-value pairs in this RDD to the master as a Map.
 *
 * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only
 * one value per key is preserved in the map returned)
 *
 * @note this method should only be used if the resulting data is expected to be small, as
 * all the data is loaded into the driver's memory.
 */
def collectAsMap(): Map[K, V] = self.withScope {
  val data = self.collect()
  val map = new mutable.HashMap[K, V]
  map.sizeHint(data.length)
  data.foreach { pair => map.put(pair._1, pair._2) }
  map
}
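A tiny usage sketch (my own example) of the multimap caveat from the comment above — with duplicate keys, only one value per key survives:

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
val m = pairs.collectAsMap()   // the whole result is pulled into the driver
println(m)                     // only one of ("a",1)/("a",2) is kept, e.g. Map(b -> 3, a -> 2)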
In RDD.scala — remember: whenever you see runJob in the source, that method triggers an action.

/**
 * Return the number of elements in the RDD.
 */
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

/**
 * Return an array that contains all of the elements in this RDD.
 *
 * @note This method should only be used if the resulting array is expected to be small, as
 * all the data is loaded into the driver's memory.
 */
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

Array.concat(results: _*) ==> the `: _*` at this call site is not where the varargs is declared; it just expands the array into varargs.
Click into concat to go one layer deeper into the source:

def concat[T: ClassTag](xss: Array[T]*): Array[T]   // this is where the varargs parameter is declared

This came up in the Scala04 lesson:
println(sum(1.to(10) :_* ))
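For completeness, a self-contained version of that snippet (the sum definition is assumed; the lesson only shows the call):

def sum(xs: Int*): Int = xs.foldLeft(0)(_ + _)   // xs is a varargs parameter
println(sum(1.to(10): _*))                       // : _* expands the Range into varargs; prints 55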