031 广播变量与累加器
生活随笔
收集整理的這篇文章主要介紹了
031 广播变量与累加器
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
1.廣播變量機制
將傳遞給task的值,變成傳遞給executor。
為什么可以共用,因為task是executor下的線程。
只讀的變量,在task中不允許修改
?
2.累加器介紹
在只寫的變量,在task中只允許被修改,不允許讀的操作。
但是在driver中就只能讀操作。
?
?3.程序
需求一:對應于MR中的累加器,累積計算次數
需求二:將累加器做成共享變量來使用。避免了shuffle過程,提高了效率。
1 package com.ibeifeng.senior.accumulator 2 3 import org.apache.spark.{AccumulableParam, SparkConf, SparkContext} 4 5 import scala.collection.mutable 6 import scala.util.Random 7 8 /** 9 * Spark累加器 10 * Created by ibf on 02/15. 11 */ 12 object AccumulatorDemo { 13 def main(args: Array[String]): Unit = { 14 val conf = new SparkConf() 15 //.setMaster("local[*]") // local模式下默認不進行失敗重啟機制 16 .setMaster("local[*,4]") // 開啟local模式的失敗重啟機制,重啟次數4-1=3次 17 .setAppName("accumulator") 18 val sc = SparkContext.getOrCreate(conf) 19 20 // =============================== 21 val rdd = sc.parallelize(Array( 22 "hadoop,spark,hbase", 23 "spark,hbase,hadoop", 24 "", 25 "spark,hive,hue", 26 "spark,hadoop", 27 "spark,,hadoop,hive", 28 "spark,hbase,hive", 29 "hadoop,hbase,hive", 30 "hive,hbase,spark,hadoop", 31 "hive,hbase,hadoop,hue" 32 ), 5) 33 34 // 需求一:實現WordCount程序,同時統計輸入的記錄數量以及最終輸出結果的數量 35 val inputRecords = sc.accumulator(0, "Input Record Size") 36 val outputRecords = sc.accumulator(0, "Output Record Size") 37 rdd.flatMap(line => { 38 // 累計數量 39 inputRecords += 1 40 val nline = if (line == null) "" else line 41 // 進行數據分割、過濾、數據轉換 42 nline.split(",") 43 .map(word => (word.trim, 1)) // 數據轉換 44 .filter(_._1.nonEmpty) // word非空,進行數據過濾 45 }) 46 .reduceByKey(_ + _) 47 .foreachPartition(iter => { 48 iter.foreach(record => { 49 // 累計數據 50 outputRecords += 1 51 println(record) 52 }) 53 }) 54 55 println(s"Input Size:${inputRecords.value}") 56 println(s"Ouput Size:${outputRecords.value}") 5758 // 需求二:假設wordcount的最終結果可以在driver/executor節點的內存中保存下,要求不通過reduceByKey相關API實現wordcount程序 59 /** 60 * 1. 每個分區進行wordcount的統計,將結果保存到累加器中 61 * 2. 當分區全部執行完后,各個分區的累加器數據進行聚合操作 62 */ 63 val mapAccumulable = sc.accumulable(mutable.Map[String, Int]())(MapAccumulableParam)//MapAccumulableParam是強制轉換 64 try 65 rdd.foreachPartition(iter => { 66 val index = Random.nextInt(2) // index的取值范圍[0,1] 67 iter.foreach(line => { 68 val r = 1 / index 69 print(r) 70 val nline = if (line == null) "" else line 71 // 進行數據分割、過濾、數據轉換 72 nline.split(",") 73 .filter(_.trim.nonEmpty) // 過濾空單詞 74 .map(word => { 75 mapAccumulable += word // 統計word出現的次數 76 }) 77 }) 78 }) 79 catch { 80 case e: Exception => println(s"異常:${e.getMessage}") 81 } 82 println("result================") 83 mapAccumulable.value.foreach(println) 84 85 Thread.sleep(100000) 86 } 87 } 88 89 90 object MapAccumulableParam extends AccumulableParam[mutable.Map[String, Int], String] { 91 /** 92 * 添加一個string的元素到累加器中 93 * 94 * @param r 95 * @param t 96 * @return 97 */ 98 override def addAccumulator(r: mutable.Map[String, Int], t: String): mutable.Map[String, Int] = { 99 r += t -> (1 + r.getOrElse(t, 0)) 100 } 101 102 /** 103 * 合并兩個數據 104 * 105 * @param r1 106 * @param r2 107 * @return 108 */ 109 override def addInPlace(r1: mutable.Map[String, Int], r2: mutable.Map[String, Int]): mutable.Map[String, Int] = { 110 r2.foldLeft(r1)((a, b) => { 111 a += b._1 -> (a.getOrElse(b._1, 0) + b._2) 112 }) 113 } 114 115 /** 116 * 返回初始值 117 * 118 * @param initialValue 119 * @return 120 */ 121 override def zero(initialValue: mutable.Map[String, Int]): mutable.Map[String, Int] = initialValue 122 }
?
轉載于:https://www.cnblogs.com/juncaoit/p/6542166.html
總結
以上是生活随笔為你收集整理的031 广播变量与累加器的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: hihocoder1479 三等分
- 下一篇: LIstview滑动时不加载图片,停止时