spark 简单实战_SparkCore入门实战 (二)
一、鍵值對RDD數據分區器
鍵值對RDD數據分區器Spark目前支持Hash分區和Range分區,用戶也可以自定義分區,Hash分區為當前的默認分區,Spark中分區器直接決定了RDD中分區的個數、RDD中每條數據經過Shuffle過程屬于哪個分區和Reduce的個數
注意:(1)只有Key-Value類型的RDD才有分區器的,非Key-Value類型的RDD分區器的值是None
(2)每個RDD的分區ID范圍:0~numPartitions-1,決定這個值是屬于那個分區的。
1、獲取RDD分區
可以通過使用RDD的partitioner 屬性來獲取 RDD 的分區方式。它會返回一個 scala.Option 對象, 通過get方法獲取其中的值。相關源碼如下:
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
def nonNegativeMod(x: Int, mod: Int): Int = {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}
(1)創建一個pairRDD
scala> val pairs = sc.parallelize(List((1,1),(2,2),(3,3)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[14] at parallelize at :24
(2)查看RDD的分區器
scala> pairs.partitioner
res13: Option[org.apache.spark.Partitioner] = None
(3)導入HashPartitioner類
scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner
(4)使用HashPartitioner對RDD進行重新分區
scala> val partitioned = pairs.partitionBy(new HashPartitioner(2))
partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[15] at partitionBy at :27
(5)查看重新分區后RDD的分區器
scala> partitioned.partitioner
res14: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)
2、Hash分區HashPartitioner分區的原理:對于給定的key,計算其hashCode,并除以分區的個數取余,如果余數小于0,則用余數+分區的個數(否則加0),最后返回的值就是這個key所屬的分區ID。
使用Hash分區的實操
scala> val nopar = sc.parallelize(List((1,3),(1,2),(2,4),(2,3),(3,6),(3,8)),8)
nopar: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[16] at parallelize at :25
scala> nopar.mapPartitionsWithIndex((index,iter)=>{ Iterator(index.toString+" : "+iter.mkString("|")) }).collect
res16: Array[String] = Array("0 : ", 1 : (1,3), 2 : (1,2), 3 : (2,4), "4 : ", 5 : (2,3), 6 : (3,6), 7 : (3,8))
scala> val hashpar = nopar.partitionBy(new org.apache.spark.HashPartitioner(7))
hashpar: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[18] at partitionBy at :27
scala> hashpar.count
res17: Long = 6
scala> hashpar.partitioner
res18: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@7)
scala> hashpar.mapPartitions(iter => Iterator(iter.length)).collect()
res19: Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)
scala> nopar.partitioner
res20: Option[org.apache.spark.Partitioner] = None
3、 Ranger分區HashPartitioner分區弊端:可能導致每個分區中數據量的不均勻,極端情況下會導致某些分區擁有RDD的全部數據。
RangePartitioner作用:將一定范圍內的數映射到某一個分區內,盡量保證每個分區中數據量的均勻,而且分區與分區之間是有序的,一個分區中的元素肯定都是比另一個分區內的元素小或者大,但是分區內的元素是不能保證順序的。簡單的說就是將一定范圍內的數映射到某一個分區內。實現過程為:
第一步:先重整個RDD中抽取出樣本數據,將樣本數據排序,計算出每個分區的最大key值,形成一個Array[KEY]類型的數組變量rangeBounds;
第二步:判斷key在rangeBounds中所處的范圍,給出該key值在下一個RDD中的分區id下標;該分區器要求RDD中的KEY類型必須是可以排序的
4、自定義分區
要實現自定義的分區器,你需要繼承 org.apache.spark.Partitioner 類并實現下面三個方法。(1)numPartitions: Int:返回創建出來的分區數。
(2)getPartition(key: Any): Int:返回給定鍵的分區編號(0到numPartitions-1)。
(3)equals():Java 判斷相等性的標準方法。這個方法的實現非常重要,Spark 需要用這個方法來檢查你的分區器對象是否和其他分區器實例相同,這樣 Spark 才可以判斷兩個 RDD 的分區方式是否相同。
需求:將相同后綴的數據寫入相同的文件,通過將相同后綴的數據分區到相同的分區并保存輸出來實現。
(1)創建一個pairRDD
scala> val data = sc.parallelize(Array((1,1),(2,2),(3,3),(4,4),(5,5),(6,6)))
data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[20] at parallelize at :25
(2)定義一個自定義分區類
scala> :paste
// Entering paste mode (ctrl-D to finish)class CustomerPartitioner(numParts:Int) extends org.apache.spark.Partitioner{
//覆蓋分區數 override def numPartitions: Int = numParts
//覆蓋分區號獲取函數 override def getPartition(key: Any): Int = {
val ckey: String = key.toString
ckey.substring(ckey.length-1).toInt%numParts
}
}
// Exiting paste mode, now interpreting.
defined class CustomerPartitioner
(3)將RDD使用自定義的分區類進行重新分區
scala> val par = data.partitionBy(new CustomerPartitioner(2))
par: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[2] at partitionBy at :27
(4)查看重新分區后的數據分布
scala> par.mapPartitionsWithIndex((index,items)=>items.map((index,_))).collect
res3: Array[(Int, (Int, Int))] = Array((0,(2,2)), (0,(4,4)), (0,(6,6)), (1,(1,1)), (1,(3,3)), (1,(5,5)))
使用自定義的 Partitioner 是很容易的:只要把它傳給 partitionBy() 方法即可。Spark 中有許多依賴于數據混洗的方法,比如 join() 和 groupByKey(),它們也可以接收一個可選的 Partitioner 對象來控制輸出數據的分區方式。
二、Spark連接HBase數據讀取與保存
數據讀取與保存Spark的數據讀取及數據保存可以從兩個維度來作區分:文件格式以及文件系統。
文件格式分為:Text文件、Json文件、Csv文件、Sequence文件以及Object文件;
文件系統分為:本地文件系統、HDFS、HBASE以及數據庫
1、文件類數據讀取與保存
Text文件
(1)數據讀取:textFile(String)
scala> val hdfsFile = sc.textFile("hdfs://hadoop105:9000/fruit.txt")
hdfsFile: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/fruit.txt MapPartitionsRDD[21] at textFile at :24
(2)數據保存: saveAsTextFile(String)
scala> hdfsFile.saveAsTextFile("/fruitOut")
Json文件如果JSON文件中每一行就是一個JSON記錄,那么可以通過將JSON文件當做文本文件來讀取,然后利用相關的JSON庫對每一條數據進行JSON解析。
注意:使用RDD讀取JSON文件處理很復雜,同時SparkSQL集成了很好的處理JSON文件的方式,所以應用中多是采用SparkSQL處理JSON文件。
準備文件數據:
(1)在in文件夾下創建user.json文件數據,編輯內容:
{"name":"123","age": 20}
{"name":"456","age": 20}
{"name":"789","age": 20}
方式一
代碼實現:
package com.study.bigdatabase
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.mortbay.util.ajax.JSON
//檢查點object Spark06_RDD_Serializable {
def main(args: Array[String]): Unit = {
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//創建Spark上下文對象 val sc = new SparkContext(config)
//讀取文件 val json = sc.textFile("in/user.json")
//解析json數據 val result = json.map(JSON.parse)
result.foreach(println)
//釋放資源 sc.stop()
}
}
啟動程序,控制臺打印:
方式二:
命令方式:
(1)導入解析json所需的包
scala> import scala.util.parsing.json.JSON
(2)上傳json文件到HDFS
[root@hadoop105 spark]$ hadoop fs -put ./examples/src/main/resources/people.json /
(3)讀取文件
scala> val json = sc.textFile("/people.json")
json: org.apache.spark.rdd.RDD[String] = /people.json MapPartitionsRDD[8] at textFile at :24
(4)解析json數據
scala> val result = json.map(JSON.parseFull)
result: org.apache.spark.rdd.RDD[Option[Any]] = MapPartitionsRDD[10] at map at :27
(5)打印
scala> result.collect
res11: Array[Option[Any]] = Array(Some(Map(name -> Michael)), Some(Map(name -> Andy, age -> 30.0)), Some(Map(name -> Justin, age -> 19.0)))
2、 Sequence文件SequenceFile文件是Hadoop用來存儲二進制形式的key-value對而設計的一種平面文件(Flat File)。Spark 有專門用來讀取 SequenceFile 的接口。在 SparkContext 中,可以調用 sequenceFile keyClass, valueClass。
注意:SequenceFile文件只針對PairRDD
(1)創建一個RDD
scala> val rdd = sc.parallelize(Array((1,2),(3,4),(5,6)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[24] at parallelize at :26
(2)將RDD保存為Sequence文件
scala> rdd.saveAsSequenceFile("file:///opt/module/spark/seqFile")
(3)查看該文件
[root@hadoop105 seqFile]$ pwd
/opt/module/spark/seqFile
[root@hadoop105 seqFile]$ ll
總用量 8
-rw-r--r-- 1 atguigu atguigu 108 10月 9 10:29 part-00000
-rw-r--r-- 1 atguigu atguigu 124 10月 9 10:29 part-00001
-rw-r--r-- 1 atguigu atguigu 0 10月 9 10:29 _SUCCESS
[root@hadoop105 seqFile]$ cat part-00000
SEQ org.apache.hadoop.io.IntWritable org.apache.hadoop.io.IntWritable
(4)讀取Sequence文件
scala> val seq = sc.sequenceFile[Int,Int]("file:///opt/module/spark/seqFile")
seq: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[18] at sequenceFile at :24
(5)打印讀取后的Sequence文件
scala> seq.collect
res14: Array[(Int, Int)] = Array((1,2), (3,4), (5,6))
3、對象文件對象文件是將對象序列化后保存的文件,采用Java的序列化機制。可以通過objectFile [k,v] (path) 函數接收一個路徑,讀取對象文件,返回對應的 RDD,也可以通過調用saveAsObjectFile() 實現對對象文件的輸出。因為是序列化所以要指定類型。
(1)創建一個RDD
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[25] at parallelize at :26
(2)將RDD保存為Object文件
scala> rdd.saveAsObjectFile("file:///opt/module/spark/objectFile")
(3)查看該文件
[root@hadoop105 objectFile]$ pwd
/opt/module/spark/objectFile
[root@hadoop105 objectFile]$ ll
總用量 8
-rw-r--r-- 1 atguigu atguigu 142 10月 9 10:37 part-00000
-rw-r--r-- 1 atguigu atguigu 142 10月 9 10:37 part-00001
-rw-r--r-- 1 atguigu atguigu 0 10月 9 10:37 _SUCCESS
[root@hadoop105 objectFile]$ cat part-00000
SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableW@`l
(4)讀取Object文件
scala> val objFile = sc.objectFile[Int]("file:///opt/module/spark/objectFile")
objFile: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at objectFile at :24
(5)打印讀取后的Sequence文件
scala> objFile.collect
res19: Array[Int] = Array(1, 2, 3, 4)
4、文件系統類數據讀取與保存HDFS
Spark的整個生態系統與Hadoop是完全兼容的,所以對于Hadoop所支持的文件類型或者數據庫類型,Spark也同樣支持.另外,由于Hadoop的API有新舊兩個版本,所以Spark為了能夠兼容Hadoop所有的版本,也提供了兩套創建操作接口.對于外部存儲創建操作而言,hadoopRDD和newHadoopRDD是最為抽象的兩個函數接口,主要包含以下四個參數.(1)輸入格式(InputFormat):制定數據輸入的類型,如TextInputFormat等,新舊兩個版本所引用的版本分別是org.apache.hadoop.mapred.InputFormat和org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)
(2)鍵類型: 指定[K,V]鍵值對中K的類型
(3)值類型: 指定[K,V]鍵值對中V的類型
(4)分區值:指定由外部存儲生成的RDD的partition數量的最小值,如果沒有指定,系統會使用默認值defaultMinSplits注意:其他創建操作的API接口都是為了方便最終的Spark程序開發者而設置的,是這兩個接口的高效實現版本.例如,對于textFile而言,只有path這個指定文件路徑的參數,其他參數在系統內部指定了默認值。
a、在Hadoop中以壓縮形式存儲的數據,不需要指定解壓方式就能夠進行讀取,因為Hadoop本身有一個解壓器會根據壓縮文件的后綴推斷解壓算法進行解壓.
b、如果用Spark從Hadoop中讀取某種類型的數據不知道怎么讀取的時候,上網查找一個使用map-reduce的時候是怎么讀取這種這種數據的,然后再將對應的讀取方式改寫成上面的hadoopRDD和newAPIHadoopRDD兩個類就行了
5、MySQL數據庫連接
支持通過Java JDBC訪問關系型數據庫。需要通過JdbcRDD進行,示例如下:
數據庫準備:
(1)添加依賴
mysql
mysql-connector-java
5.1.27
代碼實現:
package com.study.bigdatabase
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}
//檢查點object Spark07_RDD_MySql {
def main(args: Array[String]): Unit = {
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//創建Spark上下文對象 val sc = new SparkContext(config)
val driver="com.mysql.jdbc.Driver"
var url="jdbc:mysql://hadoop105:3306/RDD"
val userName="root"
val password="123456"
//創建 JDBCRDD,方法數據庫 var sql ="select name,age from user where id >= ? and id <= ?"
val jdbcRDD = new JdbcRDD(
sc,
() => {
//獲取數據庫連接對象 Class.forName(driver)
java.sql.DriverManager.getConnection(url, userName, password)
},
sql,
1,
3,
2,
(rs)=>{
println(rs.getString(1)+","+rs.getInt(2))
}
)
jdbcRDD.collect()
//釋放資源 sc.stop()
}
}
啟動程序,控制臺打印信息:
若代碼中sql這樣編寫,會出錯:
var sql ="select name,age from user"
運行程序,查看信息:
保存數據
準備user表:空數據
代碼實現:
package com.study.bigdatabase
import java.sql.Connection
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}
//檢查點object Spark07_RDD_MySql {
def main(args: Array[String]): Unit = {
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//創建Spark上下文對象 val sc = new SparkContext(config)
val driver="com.mysql.jdbc.Driver"
var url="jdbc:mysql://hadoop105:3306/RDD"
val userName="root"
val password="123456"
//保存數據 val dataRDD:RDD[(String,Int)] = sc.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu", 40)))
dataRDD.foreach{
case ( name,age ) =>{
Class.forName(driver)
val connection:Connection = java.sql.DriverManager.getConnection(url, userName, password)
val sql="insert into user (name, age) values (?,?)"
val statement = connection.prepareStatement(sql)
statement.setString(1,name)
statement.setInt(2,age)
statement.executeUpdate()
statement.close()
connection.close()
}
}
//釋放資源 sc.stop()
}
}
啟動程序運行,控制臺打印信息:
6、HBase數據庫由于 org.apache.hadoop.hbase.mapreduce.TableInputFormat類的實現,Spark 可以通過Hadoop輸入格式訪問HBase。這個輸入格式會返回鍵值對數據,其中鍵的類型為org. apache.hadoop.hbase.io.ImmutableBytesWritable,而值的類型為org.apache.hadoop.hbase.client.Result。
(1)添加依賴
org.apache.hbase
hbase-server
1.3.1
org.apache.hbase
hbase-client
1.3.1
(2) 分析源碼圖與步驟
拷貝在項目工程resource文件下
在編寫代碼之前,我們在Linux環境的Hbase創建一張rddtable表:
hbase(main):001:0> create 'rddtable','info'
0 row(s) in 1.7850 seconds
=> Hbase::Table - rddtable
隨后,在這張表中,插入數據:
hbase(main):001:0> put 'rddtable','1001','info:name',"zhanfsan"
0 row(s) in 0.5160 seconds
Spark與HBase連接,代碼實現:
package com.study.bigdatabase
import org.apache.hadoop.hbase
import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//object Spark08_RDD_HBase {
def main(args: Array[String]): Unit = {
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//創建Spark上下文對象 val sc = new SparkContext(config)
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "rddtable")
val hbaseRDD:RDD[(ImmutableBytesWritable,Result)] = sc.newAPIHadoopRDD(
conf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result]
)
hbaseRDD.foreach{
case (rowkey,result) => {
//取數據 val cells:Array[Cell] = result.rawCells()
for (cell
println(Bytes.toString((CellUtil.cloneValue(cell))))
}
}
}
//釋放資源 sc.stop()
}
}
啟動程序,運行查看一下:
7、在HBase添加數據(插入數據)
代碼實現:
package com.study.bigdatabase
import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//object Spark08_RDD_HBase {
def main(args: Array[String]): Unit = {
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//創建Spark上下文對象 val sc = new SparkContext(config)
val conf = HBaseConfiguration.create()
val dataRDD:RDD[(String,String)] = sc.makeRDD(List(("1002", "zhangsan"), ("1003", "lisi"),
("1004", "wangwu")))
val putRDD:RDD[(ImmutableBytesWritable,Put)] = dataRDD.map{
case (rowkey,name) =>{
val put = new Put(Bytes.toBytes(rowkey))
put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(name))
(new ImmutableBytesWritable(Bytes.toBytes(rowkey)),put)
}
}
val jobConf = new JobConf(conf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE,"rddtable")
putRDD.saveAsHadoopDataset(jobConf)
//釋放資源 sc.stop()
}
}
啟動程序,運行查看一下:無報錯
在HBase查看,掃描rddtable表,有數據了:
hbase(main):003:0> scan 'rddtable'
ROW COLUMN+CELL
1001 column=info:name, timestamp=1581087691942, value=zhanfsan
1002 column=info:name, timestamp=1581112909827, value=zhangsan
1003 column=info:name, timestamp=1581112909815, value=lisi
1004 column=info:name, timestamp=1581112909792, value=wangwu
4 row(s) in 4.5400 seconds
三、Spark 三大數據結構RDD:分布式數據集
廣播變量:分布式只讀共享變量
累加器: 分布式只寫共享變量
1、累加器累加器用來對信息進行聚合,通常在向 Spark傳遞函數時,比如使用 map() 函數或者用 filter() 傳條件時,可以使用驅動器程序中定義的變量,但是集群中運行的每個任務都會得到這些變量的一份新的副本,更新這些副本的值也不會影響驅動器中的對應變量。如果我們想實現所有分片處理時更新共享變量的功能,那么累加器可以實現我們想要的效果。
代碼具體實現:
package com.study.bigdatabase
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}
//object Spark09_ShareData {
def main(args: Array[String]): Unit = {
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//創建Spark上下文對象 val sc = new SparkContext(config)
val dataRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
/*第一種普通寫法:val i: Int = dataRDD.reduce(_ + _)println(i)*/
//第二種方法:使用累加器來共享變量 var sum:Int =0
//(1)創建累加器對象 val accumulator: LongAccumulator = sc.longAccumulator
dataRDD.foreach{
case i =>{
//(2)執行累加器的累加功能 accumulator.add(i)
}
}
println("sum ="+accumulator.value)
//釋放資源 sc.stop()
}
}
啟動程序運行,控制臺打印信息:
2、自定義累加器自定義累加器類型的功能在1.X版本中就已經提供了,但是使用起來比較麻煩,在2.0版本后,累加器的易用性有了較大的改進,而且官方還提供了一個新的抽象類:AccumulatorV2來提供更加友好的自定義類型累加器的實現方式。實現自定義類型累加器需要繼承AccumulatorV2并至少覆寫下例中出現的方法,下面這個累加器可以用于在程序運行過程中收集一些文本類信息,最終以Set[String]的形式返回。
(1)分析源碼圖:
代碼實現:
package com.study.bigdatabase
import java.util
import org.apache.spark.rdd.RDD
import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
import org.apache.spark.{SparkConf, SparkContext}
//自定義累加器object Spark10_ShareData {
def main(args: Array[String]): Unit = {
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//創建Spark上下文對象 val sc = new SparkContext(config)
val dataRDD: RDD[String] = sc.makeRDD(List("hadoop", "hive", "hbase", "Scala","Spark"), 2)
// TODO 創建累加器 val wordAccumnlator = new WordAccumnlator
// TODO 注冊累加器 sc.register(wordAccumnlator)
dataRDD.foreach{
case word =>{
// TODO 執行累加器累加功能 wordAccumnlator.add(word)
}
}
// TODO 獲取累加器的值 println("sum ="+wordAccumnlator.value)
//釋放資源 sc.stop()
}
}
//聲明累加器//1、繼承AccumulatorV2//2、實現抽象方法//3、創建累加器class WordAccumnlator extends AccumulatorV2[String, util.ArrayList[String]]{
val list = new util.ArrayList[String]()
//當前累加器是否初始化狀態 override def isZero: Boolean = {
list.isEmpty
}
//復制累加器對象 override def copy(): AccumulatorV2[String, util.ArrayList[String]] = {
new WordAccumnlator()
}
//重置累加器對象 override def reset(): Unit = {
list.clear()
}
//向累加器中增加數據 override def add(v: String): Unit = {
if (v.contains("h")) {
list.add(v)
}
}
//合并累加器 override def merge(other: AccumulatorV2[String, util.ArrayList[String]]): Unit = {
list.addAll(other.value)
}
//獲取累加器的結果 override def value: util.ArrayList[String] = list
}
啟動程序,控制臺打印信息:
以”h“累加聚中在一起
3、廣播變量(調優策略)廣播變量用來高效分發較大的對象。向所有工作節點發送一個較大的只讀值,以供一個或多個Spark操作使用。比如,如果你的應用需要向所有節點發送一個較大的只讀查詢表,甚至是機器學習算法中的一個很大的特征向量,廣播變量用起來都很順手。 在多個并行操作中使用同一個變量,但是 Spark會為每個任務分別發送。
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
使用廣播變量的過程如下:(1) 通過對一個類型 T 的對象調用 SparkContext.broadcast 創建出一個 Broadcast[T] 對象。 任何可序列化的類型都可以這么實現。
(2) 通過 value 屬性訪問該對象的值(在 Java 中為 value() 方法)。
(3) 變量只會被發到各個節點一次,應作為只讀值處理(修改這個值不會影響到別的節點)。
4、擴展
RDD相關概念關系輸入可能以多個文件的形式存儲在HDFS上,每個File都包含了很多塊,稱為Block。當Spark讀取這些文件作為輸入時,會根據具體數據格式對應的InputFormat進行解析,一般是將若干個Block合并成一個輸入分片,稱為InputSplit,注意InputSplit不能跨越文件。隨后將為這些輸入分片生成具體的Task。InputSplit與Task是一一對應的關系。隨后這些具體的Task每個都會被分配到集群上的某個節點的某個Executor去執行。
(1)每個節點可以起一個或多個Executor。
(2)每個Executor由若干core組成,每個Executor的每個core一次只能執行一個Task。
(3)每個Task執行的結果就是生成了目標RDD的一個partiton。
注意: 這里的core是虛擬的core而不是機器的物理CPU核,可以理解為就是Executor的一個工作線程。而 Task被執行的并發度 = Executor數目 * 每個Executor核數。至于partition的數目:(1)對于數據讀入階段,例如sc.textFile,輸入文件被劃分為多少InputSplit就會需要多少初始Task。
(2)在Map階段partition數目保持不變。
(3)在Reduce階段,RDD的聚合會觸發shuffle操作,聚合后的RDD的partition數目跟具體操作有關,例如repartition操作會聚合成指定分區數,還有一些算子是可配置的。
RDD在計算的時候,每個分區都會起一個task,所以rdd的分區數目決定了總的的task數目。申請的計算節點(Executor)數目和每個計算節點核數,決定了你同一時刻可以并行執行的task。
比如的RDD有100個分區,那么計算的時候就會生成100個task,你的資源配置為10個計算節點,每個兩2個核,同一時刻可以并行的task數目為20,計算這個RDD就需要5個輪次。如果計算資源不變,你有101個task的話,就需要6個輪次,在最后一輪中,只有一個task在執行,其余核都在空轉。如果資源不變,你的RDD只有2個分區,那么同一時刻只有2個task運行,其余18個核空轉,造成資源浪費。這就是在spark調優中,增大RDD分區數目,增大任務并行度的做法。
總結
以上是生活随笔為你收集整理的spark 简单实战_SparkCore入门实战 (二)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: excel: 开放的XML文件格式
- 下一篇: Linux如何删除名字带不可见字符的文件