spark之4:基础指南(源自官方文档)
spark之4:基礎指南(源自官方文檔)
@(SPARK)[spark, 大數據]
- spark之4基礎指南源自官方文檔
- 一簡介
- 二接入Spark
- 三初始化Spark
- 一使用Shell
- 四彈性分布式數據集RDDs
- 一并行集合
- 二外部數據集
- 三RDD操作
- 1基礎操作
- 2向Spark傳遞函數
- 3理解閉包
- 例子
- 本地模式VS集群模式
- 打印RDD中的元素
- 4鍵值對的使用
- 5轉換
- 6動作
- 7洗牌操作
- 背景
- 性能影響
- 四RDD持久化
- 1如何選擇存儲級別
- 2移除數據
- 五共享變量
- 二累加器
- 六把代碼部署到集群上
- 七從JavaScala中啟動Spark作業
- 八單元測試
參考:
英文:https://spark.apache.org/docs/latest/programming-guide.html
中文:http://www.cnblogs.com/lujinhong2/p/4651025.html 1.2.1版本的
以下大部分內容來自于中文版本,便其是針對spark1.2.1的,如有疑問,請直接參考英文版本。
一、簡介
總的來說,每一個Spark的應用,都是由一個驅動程序(driver program)構成,它運行用戶的main函數,在一個集群上執行各種各樣的并行操作。Spark提出的最主要抽象概念是彈性分布式數據集 (resilientdistributed dataset,RDD),它是一個元素集合,劃分到集群的各個節點上,可以被并行操作。RDDs的創建可以從HDFS(或者任意其他支持Hadoop文件系統)上的一個文件開始,或者通過轉換驅動程序(driver program)中已存在的Scala集合而來。用戶也可以讓Spark保留一個RDD在內存中,使其能在并行操作中被有效的重復使用。最后,RDD能自動從節點故障中恢復。
Spark的第二個抽象概念是共享變量(shared variables),可以在并行操作中使用。在默認情況下,Spark通過不同節點上的一系列任務來運行一個函數,它將每一個函數中用到的變量的拷貝傳遞到每一個任務中。有時候,一個變量需要在任務之間,或任務與驅動程序之間被共享。Spark支持兩種類型的共享變量:廣播變量,可以在內存的所有的結點上緩存變量;累加器:只能用于做加法的變量,例如計數或求和。
本指南將用每一種Spark支持的語言來展示這些特性。這都是很容易來跟著做的如果你啟動了Spark的交互式Shell或者Scala的bin/spark-shell或者Python的bin/pyspark。
二、接入Spark
Spark1.5需要和Scala2.10一起使用。如果你要用Scala來編寫應用,你需要用一個相應版本的Scala(例如2.10.X)。
要寫一個Spark應用程序,你需要在添加Spark的Maven依賴,Spark可以通過Maven中心庫來獲得:
groupId = org.apache.spark artifactId = spark-core_2.10 version = 1.5.1除此之外,如果你想訪問一個HDFS集群,你需要根據你的HDFS版本,添加一個hadoop-client的依賴。一些通用的HDFS版本標簽在第三方發行版頁面列出。
groupId = org.apache.hadoop artifactId = hadoop-client version = <your-hdfs-version>最后,你需要將一些Spark的類和隱式轉換導入到你的程序中。通過如下語句:
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConfJava
Spark需要運行在Java6及更高版本上。如果你正在使用Java8,Spark支持使用Lambda表達式簡潔地編寫函數,或者你可以使用在org.apache.spark.api.java.function包中的類。
要使用Java編寫Spark應用程序,你需要添加一個Spark的依賴。Spark可以通過Maven中心庫獲得:
groupId = org.apache.spark artifactId = spark-core_2.10 version = 1.5.1此外,如果你想訪問一個HDFS集群,你需要根據你的HDFS版本,添加一個hadoop-client的依賴。一些通用的HDFS版本標簽在第三方發行版頁面列出。
groupId = org.apache.hadoop artifactId = hadoop-client version = <your-hdfs-version>最后,你需要將Spark的類導入到你的程序中。添加如下行:
importorg.apache.spark.api.java.JavaSparkContext import org.apache.spark.api.java.JavaRDD import org.apache.spark.SparkConf三、初始化Spark
Spark程序需要做的第一件事情,就是創建一個SparkContext對象,它將告訴Spark如何訪問一個集群。要創建一個SparkContext你首先需要建立一個SparkConf對象,這個對象包含你的程序的信息。
每個JVM只能有一個活動的SparkContext。在創建一個新的SparkContext之前你必須stop()活動的SparkContext。
val conf = newSparkConf().setAppName(appName).setMaster(master) new SparkContext(conf)appName是你的應用的名稱,將會在集群的Web監控UI中顯示。master參數,是一個用于指定所連接的Spark,Mesos or Mesos 集群URL的字符串,也可以是一個如下面所描述的用于在local模式運行的特殊字符串“local”。在實踐中,當運行在一個集群上時,你不會想把master硬編碼到程序中,而是啟動spark-submit來接收它。然而,對于本地測試和單元測試,你可以通過“local”模式運行Spark。
Spark程序需要做的第一件事情,就是創建一個JavaSparkContext對象,它將告訴Spark如何訪問一個集群。要創建一個SparkContext你首先需要建立一個SparkConf對象,這個對象包含你的程序的信息。
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master); JavaSparkContext sc = newJavaSparkContext(conf);appName是你的應用的名稱,將會在集群的Web監控UI中顯示。master參數,是一個用于指定所連接的Spark,Mesos or Mesos 集群URL的字符串,也可以是一個如下面所描述的用于在local模式運行的特殊字符串“local”。在實踐中,當運行在一個集群上時,你不會想把master硬編碼到程序中,而是啟動spark-submit來接收它。然而,對于本地測試和單元測試,你可以通過“local”模式運行Spark。
(一)使用Shell
在Spark shell中,一個特殊的解釋器感知的SparkContext已經為你創建好了,變量名叫做sc。創建自己的SparkContext將不會生效。你可以使用-master參數設置context連接到那個master,并且你可以使用-jars參數把用逗號分隔的一個jar包列表添加到classpath中。例如,如果在四核CPU上運行spark-shell,使用:
$ ./bin/spark-shell --master local[4]或者,同時在classpath中加入code.jar,使用:
$ ./bin/spark-shell --master local[4] --jarscode.jar想要獲得完整的選項列表,運行spark-shell –help。在背后,spark-shell調用更一般的spark-submit腳本。
四、彈性分布式數據集(RDDs)
Spark圍繞的概念是彈性分布式數據集(RDD),是一個有容錯機制并可以被并行操作的元素集合。目前有兩種創建RDDs的方法:并行化一個在你的驅動程序中已經存在的集合,或者引用在外部存儲系統上的數據集,例如共享文件系統,HDFS,HBase,或者任何以Hadoop輸入格式提供的數據源。
(一)并行集合
并行集合是通過調用SparkContext的parallelize方法,在一個已經存在的集合上創建的(一個Scala Seq對象)。集合的對象將會被拷貝,創建出一個可以被并行操作的分布式數據集。例如,下面展示了怎樣創建一個含有數字1到5的并行集合:
val data = Array(1, 2, 3, 4, 5) val distData = sc.parallelize(data)一旦創建了分布式數據集(distData),就可以對其執行并行操作。例如,我們可以調用distData.reduce((a,b)=>a+b)來累加數組的元素。后續我們會進一步地描述對分布式數據集的操作。
并行集合的一個重要參數是分區數(the number of partitions),表示數據集切分的份數。Spark將在集群上為每個分區數據起一個任務。典型情況下,你希望集群的每個CPU分布2-4個分區(partitions)。通常,Spark會嘗試基于集群狀況自動設置分區數。然而,你也可以進行手動設置,通過將分區數作為第二個參數傳遞給parallelize方法來實現。(例如:sc.parallelize(data,10))。注意:代碼中的一些地方使用屬于“分片(分區的近義詞)”來保持向后兼容。
Java
并行集合是通過對存在于驅動程序中的集合調用JavaSparkContext的parallelize方法來構建的。構建時會拷貝集合中的元素,創建一個可以被并行操作的分布式數據集。例如,這里演示了如何創建一個包含數字1到5的并行集合:
List data = Arrays.asList(1, 2,3, 4, 5);
JavaRDD distData =sc.parallelize(data);
一旦創建了分布式數據集(distData),就可以對其執行并行操作。例如,我們可以調用distData.reduce((a,b)=>a+b)來累加數組的元素。后續我們會進一步地描述對分布式數據集的操作。
注意:在本指南中,我們會經常使用簡潔地Java8的lambda語法來指明Java函數,而在Java的舊版本中,你可以實現org.apache.spark.api.java.function包中的接口。下面我們將在把函數傳遞到Spark中描述更多的細節。
并行集合的一個重要參數是分區數(the number of partitions),表示數據集切分的份數。Spark將在集群上為每個分區數據起一個任務。典型情況下,你希望集群的每個CPU分布2-4個分區(partitions)。通常,Spark會嘗試基于集群狀況自動設置分區數。然而,你也可以進行手動設置,通過將分區數作為第二個參數傳遞給parallelize方法來實現。(例如:sc.parallelize(data,10))。注意:代碼中的一些地方使用屬于“分片(分區的近義詞)”來保持向后兼容。
(二)外部數據集
Spark可以從Hadoop支持的任何存儲源中構建出分布式數據集,包括你的本地文件系統,HDFS,Cassandre,HBase,Amazon S3等。Spark支持text files,Sequence files,以及其他任何一種Hadoop InputFormat。
Text file RDDs的創建可以使用SparkContext的textFile方法。該方法接受一個文件的URI地址(或者是機器上的一個本地路徑,或者是一個hdfs://,s3n://等URI)作為參數,并讀取文件的每一行數據,放入集合中,下面是一個調用例子:
scala> val distFile =sc.textFile("data.txt") distFile: RDD[String] = MappedRDD@1d4cee08一旦創建完成,就可以在distFile上執行數據集操作。例如,要相對所有行的長度進行求和,我們可以通過如下的map和reduce操作來完成:
distFile.map(s => s.length).reduce((a, b)=> a + b)Spark讀文件時的一些注意事項:
如果文件使用本地文件系統上的路徑,那么該文件必須在工作節點的相同路徑下也可以訪問。可以將文件拷貝到所有的worker節點上,或者使用network-mounted共享文件系統。
Spark的所有基于文件的輸入方法,包括textFile,支持在目錄上運行,壓縮文件和通配符。例如,你可以使用textFile(”/my/directory”),textFile(“/my/directory/.txt”),和textFile(“/my/directory/.gz”)。
textFile方法也帶有可選的第二個參數,用于控制文件的分區數。默認情況下,Spark會為文件的每一個block創建一個分區,但是你也可以通過傳入更大的值,來設置更高的分區數。注意,你設置的分區數不能比文件的塊數小。
除了text文件,Spark的Scala API也支持其他幾種數據格式:
SparkContext.wholeTextFiles可以讓你讀取包含多個小text文件的目錄,并且每個文件對應返回一個(filename,content)對。而對應的textFile方法,文件的每一行對應返回一條記錄(record)。
對于Sequence文件,使用SparkContext的sequenceFile[K,V]方法,其中K和V分別對應文件中key和values的類型。這些類型必須是Hadoop的Writable接口的子類,如IntWritable和Text。另外,Spark允許你使用一些常見的Writables的原生類型;例如,sequenceFile[Int,String]會自動的轉換為類型IntWritables和Texts。
對于其他的Hadoop InputFormats,你可以使用SparkContext.hadoopRDD方法,它可以接受一個任意類型的JobConf和輸入格式類,key類和value類。像Hadoop Job設置輸入源那樣去設置這些參數即可。對基于“新”的MapReduce API(org.apache.hadoop.mapreduce)的InputFormats,你也可以使用SparkContex.newHadoopRDD。
RDD.saveAsObjectFile和SparkContext.objectFile支持由序列化的Java對象組成的簡單格式來保存RDD。雖然這不是一種像Avro那樣有效的序列化格式,但是她提供了一種可以存儲任何RDD的簡單方式。
Java
Spark可以從Hadoop支持的任何存儲源中構建出分布式數據集,包括你的本地文件系統,HDFS,Cassandre,HBase,Amazon S3等。Spark支持text files,Sequence files,以及其他任何一種Hadoop InputFormat。
Text file RDDs的創建可以使用SparkContext的textFile方法。該方法接受一個文件的URI地址(或者是機器上的一個本地路徑,或者是一個hdfs://,s3n://等URI)作為參數,并讀取文件的每一行數據,放入集合中,下面是一個調用例子:
JavaRDD<String> distFile =sc.textFile("data.txt");一旦創建完成,就可以在distFile上執行數據集操作。例如,要相對所有行的長度進行求和,我們可以通過如下的map和reduce操作來完成:
distFile.map(s -> s.length()).reduce((a, b)-> a + b)(三)RDD操作
RDDs支持兩種操作:轉換(transformations),可以從已有的數據集創建一個新的數據集;而動作(actions),在數據集上運行計算后,會向驅動程序返回一個值。例如,map就是一種轉換,它將數據集每一個元素都傳遞給函數,并返回一個新的分布數據集來表示結果。另一方面,reduce是一種動作,通過一些函數將所有的元素聚合起來,并將最終結果返回給驅動程序(不過還有一個并行的reduceByKey,能返回一個分布式數據集)。
Spark中的所有轉換都是惰性的,也就是說,它們并不會馬上計算結果。相反的,它們只是記住應用到基礎數據集(例如一個文件)上的這些轉換動作。只有當發生一個要求返回結果給驅動程序的動作時,這些轉換才會真正運行。這種設計讓Spark更加有效率地運行。例如,我們對map操作創建的數據集進行reduce操作時,只會向驅動返回reduce操作的結果,而不是返回更大的map操作創建的數據集。
默認情況下,每一個轉換過的RDD都會在你對它執行一個動作時被重新計算。不過,你也可以使用持久化或者緩存方法,把一個RDD持久化到內存中。在這種情況下,Spark會在集群中保存相關元素,以便你下次查詢這個RDD時,能更快速地訪問。對于把RDDs持久化到磁盤上,或在集群中復制到多個節點也是支持的。
1、基礎操作
為了描述RDD的基礎操作,可以考慮下面的簡單程序:
val lines = sc.textFile("data.txt") val lineLengths = lines.map(s => s.length) val totalLength = lineLengths.reduce((a, b)=> a + b)第一行通過一個外部文件定義了一個基本的RDD。這個數據集未被加載到內存,也未在上面執行操作:lines僅僅指向這個文件。第二行定義了lineLengths作為map轉換結果。此外,由于惰性,不會立即計算lineLengths。最后,我們運行reduce,這是一個動作。這時候,Spark才會將這個計算拆分成不同的task,并運行在獨立的機器上,并且每臺機器運行它自己的map部分和本地的reducatin,僅僅返回它的結果給驅動程序。
如果我們希望以后可以復用lineLengths,可以添加:
lineLengths.persist()在reduce之前,這將導致lineLengths在第一次被計算之后,被保存在內存中。
Java
為了描述RDD的基礎操作,可以考慮下面的簡單程序:
JavaRDD<String> lines =sc.textFile("data.txt"); JavaRDD<Integer> lineLengths =lines.map(s -> s.length()); int totalLength = lineLengths.reduce((a, b)-> a + b);第一行通過一個外部文件定義了一個基本的RDD。這個數據集未被加載到內存,也未在上面執行操作:lines僅僅指向這個文件。第二行定義了lineLengths作為map轉換結果。此外,由于惰性,不會立即計算lineLengths。最后,我們運行reduce,這是一個動作。這時候,Spark才會將這個計算拆分成不同的task,并運行在獨立的機器上,并且每臺機器運行它自己的map部分和本地的reducatin,僅僅返回它的結果給驅動程序。
如果我們希望以后可以復用lineLengths,可以添加:
lineLengths.persist();在reduce之前,這將導致lineLengths在第一次被計算之后,被保存在內存中。
2、向Spark傳遞函數
Scala
Spark的API,在很大程度上依賴于把驅動程序中的函數傳遞到集群上運行。這有兩種推薦的實現方式:
●使用匿名函數的語法,這可以用來替換簡短的代碼。
●使用全局單例對象的靜態方法。比如,你可以定義函數對象objectMyFunctions,然后傳遞該對象的方法MyFunction.func1,如下所示:
object MyFunctions { def func1(s: String): String = { ... } } myRdd.map(MyFunctions.func1)注意:由于可能傳遞的是一個類實例方法的引用(而不是一個單例對象),在傳遞方法的時候,應該同時傳遞包含該方法的對象。比如,考慮:
class MyClass { def func1(s: String): String = { ... } defdoStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) } }這里,如果我們創建了一個類實例new MyClass,并且調用了實例的doStuff方法,該方法中的map處調用了這個MyClass實例的func1方法,所以需要將整個對象傳遞到集群中。類似于寫成:
rdd.map(x=>this.func1(x))。類似地,訪問外部對象的字段時將引用整個對象:
class MyClass { valfield = "Hello" defdoStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) } }等同于寫成rdd.map(x=>this.field+x),引用了整個this。為了避免這種問題,最簡單的方式是把field拷貝到本地變量,而不是去外部訪問它:
def doStuff(rdd: RDD[String]): RDD[String] = { valfield_ = this.field rdd.map(x => field_ + x) }3、理解閉包
關于Spark的一個更困難的問題是理解當在一個集群上執行代碼的時候,變量和方法的范圍以及生命周期。修改范圍之外變量的RDD操作經常是造成混亂的源頭。在下面的例子中我們看一下使用foreach()來增加一個計數器的代碼,不過同樣的問題也可能有其他的操作引起。
例子
考慮下面的單純的RDD元素求和,根據是否運行在一個虛擬機上,它們的行為完全不同。一個平常的例子是在local模式(–master=local[n])下運行Spark對比將Spark程序部署到一個集群上(例如通過spark-submit提交到YARN)。
var counter = 0 var rdd = sc.parallelize(data) // Wrong: Don't do this!! rdd.foreach(x => counter += x) println("Counter value: " + counter)本地模式VS集群模式
主要的挑戰是,上述代碼的行為是未定義的。在使用單個JVM的本地模式中,上面的代碼會在RDD中計算值的總和并把它存儲到計數器中。這是因為RDD和計數器變量在驅動節點的同一個內存空間中。
然而,在集群模式下,發生的事情更為復雜,上面的代碼可能不會按照目的工作。要執行作業,Spark將RDD操作分成任務——每個任務由一個執行器操作。在執行前,Spark計算閉包。閉包是指執行器要在RDD上進行計算時必須對執行節點可見的那些變量和方法(在這里是foreach())。這個閉包被序列化并發送到每一個執行器。在local模式下,只有一個執行器因此所有東西都分享同一個閉包。然而在其他的模式中,就不是這個情況了,運行在不同工作節點上的執行器有它們自己的閉包的一份拷貝。
這里發生的事情是閉包中的變量被發送到每個執行器都是被拷貝的,因此,當計數器在foreach函數中引用時,它不再是驅動節點上的那個計數器了。在驅動節點的內存中仍然有一個計數器,但它對執行器來說不再是可見的了!執行器只能看到序列化閉包中的拷貝。因此,計數器最終的值仍然是0,因為所有在計數器上的操作都是引用的序列化閉包中的值。
在這種情況下要確保一個良好定義的行為,應該使用累加器。Spark中的累加器是一個專門用來在執行被分散到一個集群中的各個工作節點上的情況下安全更新變量的機制。本指南中的累加器部分會做詳細討論。
一般來說,閉包-構造像循環或者本地定義的方法,不應該用來改變一些全局狀態。Spark沒有定義或者是保證改變在閉包之外引用的對象的行為。一些這樣做的代碼可能會在local模式下起作用,但那僅僅是個偶然,這樣的代碼在分布式模式下是不會按照期望工作的。如果需要一些全局的參數,可以使用累加器。
打印RDD中的元素
另一個常見的用法是使用rdd.foreach(println)方法或者rdd.map(println)方法試圖打印出RDD中的元素。在一臺單一的機器上,這樣會產生期望的輸出并打印出RDD中的元素。然而,在集群模式中,被執行器調用輸出到stdout的輸出現在被寫到了執行器的stdout,并不是在驅動上的這一個,因此驅動上的stdout不會顯示這些信息!要在驅動上打印所有的元素,可以使用collect()方法首先把RDD取回到驅動節點如:rdd.collect().foreach(println)。然而,這可能導致驅動內存溢出,因為collect()將整個RDD拿到了單臺機器上;如果你只需要打印很少幾個RDD的元素,一個更安全的方法是使用take()方法:rdd.take(100).foreach(println)。
4、鍵值對的使用
雖然,在包含任意類型的對象的RDDs中,可以使用大部分的Spark操作,但也有一些特殊的操作只能在鍵值對的RDDs上使用。最常見的一個就是分布式的洗牌(shuffle)操作,諸如基于key值對元素進行分組或聚合的操作。
在Scala中,包含二元組(Tuple2)對象(可以通過簡單地(a,b)代碼,來構建內置于語言中的元組的RDDs支持這些操作),只要你在程序中導入了org.apache.spark.SparkContext._,就能進行隱式轉換。PairRDDFunction類支持鍵值對的操作,如果你導入了隱式轉換,該類型就能自動地對元組RDD的元素進行轉換。
比如,下列代碼在鍵值對上使用了reduceByKey操作,來計算在一個文件中每行文本出現的總次數:
val lines = sc.textFile("data.txt") val pairs = lines.map(s => (s, 1)) val counts = pairs.reduceByKey((a, b) => a +b)我們也可以使用counts.sortByKey(),比如,將鍵值對以字典序進行排序。最后使用counts.collect()轉換成對象的數組形式,返回給驅動程序。
注意:在鍵值對操作中,如果使用了自定義對象作為建,你必須確保該對象實現了自定義的equals()和對應的hashCode()方法。更多詳情請查看Object.hashCode()文檔大綱中列出的規定。
5、轉換
下表中列出了 Spark支持的一些常見的轉換 (Transformations)。詳情請參考 RDDAPI文檔 (Scala, Java, Python)和 pair RDD函數文檔 (Scala, Java)。
見 https://spark.apache.org/docs/latest/programming-guide.html#transformations
以后再補充中文,詳細請見常用api demo
6、動作
下表中列出了 Spark支持的一些常見的動作 (actions)。詳情請參考 RDD API文檔
見 https://spark.apache.org/docs/latest/programming-guide.html#transformations
以后再補充中文,詳細請見常用api demo
7、洗牌操作
Spark觸發一個事件后進行的一些操作成為洗牌。洗牌是Spark重新分配數據的機制,這樣它就可以跨分區分組。這通常涉及在執行器和機器之間復制數據,這就使得洗牌是一個復雜和高代價的操作。
背景
為了理解在洗牌的時候發生了什么,我們可以考慮reduceByKey操作的例子。reduceByKey操作產生了一個新的RDD,在這個RDD中,所有的單個的值被組合成了一個元組,key和執行一個reduce函數后的結果中與這個key有關的所有值。面臨的挑戰是一個key的所有的值并不都是在同一個分區上的,甚至不是一臺機器上的,但是他們必須是可連接的以計算結果。
在Spark中,數據一般是不會跨分區分布的,除非是在一個特殊的地方為了某種特定的目的。在計算過程中,單個任務將在單個分區上操作——因此,為了組織所有數據執行單個reduceByKey中的reduce任務,Spark需要執行一個all-to-all操作。它必須讀取所有分區,找到所有key的值,并跨分區把這些值放到一起來計算每個key的最終結果——這就叫做洗牌。
盡管在每個分區中新洗牌的元素集合是確定性的,分區本身的順序也同樣如此,這些元素的順序就不一定是了。如果期望在洗牌后獲得可預測的有序的數據,可以使用:
mapPartitions 來排序每個分區,例如使用.sorted
repartitionAndSortWithinPartitions 在重新分區的同時有效地將分區排序
sortBy來創建一個全局排序的RDD
可以引起洗牌的操作有重分區例如repartition和coalesce,‘ByKey操作(除了計數)像groupByKey和reduceByKey,還有join操作例如cogroup和join。
性能影響
Shuffle是一個代價高昂的操作,因為它調用磁盤I/O,數據序列化和網絡I/O。要組織shuffle的數據,Spark生成一個任務集合——map任務來組織數據,并使用一組reduce任務集合來聚合它。它的命名來自與MapReduce,但并不直接和Spark的map和reduce操作相關。
在內部,單個的map任務的結果被保存在內存中,直到他們在內存中存不下為止。然后,他們基于目標分區進行排序,并寫入到一個單個的文件中。在reduce這邊,任務讀取相關的已經排序的塊。
某些shuffle操作會消耗大量的堆內存,因為他們用在內存中的數據結構在轉換操作之前和之后都要對數據進行組織。特別的,reduceByKey和aggregateByKey在map側創建這些結構,‘ByKey操作在reduce側生成這些結構。當數據在內存中存不下時,Spark會將他們存儲到磁盤,造成額外的磁盤開銷和增加垃圾收集。
Shuffle也會在磁盤上產生大量的中間文件。在Spark1.3中,這些文件直到Spark停止運行時才會從Spark的臨時存儲中清理掉,這意味著長時間運行Spark作業會消耗可觀的磁盤空間。這些做了之后如果lineage重新計算了,那shuffle不需要重新計算了。在配置Spark上下文時,臨時存儲目錄由spark.local.dir配置參數指定。
Shuffle的行為可以通過調整各種配置參數來調整。請看Spark配置指南中的Shuffle Behavior部分。
(四)RDD持久化
Spark最重要的一個功能,就是在不同操作間,將一個數據集持久化(persisting) (或緩存(caching))到內存中。當你持久化(persist)一個 RDD,每一個節點都會把它計算的所有分區(partitions)存儲在內存中,并在對數據集 (或者衍生出的數據集)執行其他動作(actioins)時重用。這將使得后續動作(actions)的執行變得更加迅速(通常快 10 倍)。緩存(Caching)是用 Spark 構建迭代算法和快速地交互使用的關鍵。
你可以使用 persist()或 cache()方法來持久化一個 RDD。在首次被一個動作(action)觸發計算后,它將會被保存到節點的內存中。 Spark 的緩存是帶有容錯機制的,如果 RDD丟失任何一個分區的話,會自動地用原先構建它的轉換(transformations)操作來重新進行計算。
此外,每一個被持久化的 RDD都可以用不同的存儲級別(storage level)進行存儲,比如,允許你持久化數據集到硬盤,以序列化的 Java對象(節省空間)存儲到內存,跨節點復制,或者以off-heap的方式存儲在 Tachyon。這些級別的選擇,是通過將一個 StorageLevel對象 (Scala Java, Python)傳遞到 persist()方法中進行設置的。 cache()方法是使用默認存儲級別的快捷方法,也就是 StorageLevel.MEMORY_ONLY (將反序列化 (deserialized)的對象存入內存)。完整的可選存儲級別如下:
見 https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
以后再補充中文
1、如何選擇存儲級別?
Spark 的存儲級別旨在滿足內存使用和CPU效率權衡上的不同需求。我們建議通過以下方法進行選擇:
●如果你的 RDDs可以很好的與默認的存儲級別(MEMORY_ONLY)契合,就不需要做任何修改了。這已經是 CPU使用效率最高的選項,它使得RDDs的操作盡可能的快。
●如果不行,試著使用 MEMORY_ONLY_SER,并且選擇一個快速序列化庫使對象在有比較高的空間使用率(space-efficient)的情況下,依然可以較快被訪問。
●盡可能不要存儲到硬盤上,除非計算數據集的函數的計算量特別大,或者它們過濾了大量的數據。否則,重新計算一個分區的速度,可能和從硬盤中讀取差不多快。
●如果你想有快速的故障恢復能力,使用復制存儲級別(例如:用 Spark來響應 web應用的請求)。所有的存儲級別都有通過重新計算丟失的數據來恢復錯誤的容錯機制,但是復制的存儲級別可以讓你在 RDD 上持續地運行任務,而不需要等待丟失的分區被重新計算。
●在大量的內存或多個應用程序的環境下,試驗性的 OFF_HEAP模式具有以下幾個優點:
o 允許多個 executors共享 Tachyon中相同的內存池。
o 極大地降低了垃圾收集器(garbage collection)的開銷。
o 即使個別的 executors崩潰了,緩存的數據也不會丟失。
2、移除數據
Spark 會自動監控各個節點上的緩存使用情況,并使用最近最少使用算法(least-recently-used (LRU))刪除老的數據分區。如果你想手動移除一個 RDD,而不是等它自動從緩存中清除,可以使用 RDD.unpersist()方法。
五、共享變量
一般來說,當一個函數被傳遞給一個在遠程集群節點上運行的 Spark操作(例如 map或 reduce) 時,它操作的是這個函數用到的所有變量的獨立拷貝。這些變量會被拷貝到每一臺機器,而且在遠程機器上對變量的所有更新都不會被傳播回驅動程序。通常看來,讀-寫任務間的共享變量顯然不夠高效。然而,Spark還是為兩種常見的使用模式,提供了兩種有限的共享變量:廣播變量(broadcast variables)和累加器(accumulators)。
(一)廣播變量
廣播變量允許程序員保留一個只讀的變量,緩存在每一臺機器上,而不是每個任務保存一份拷貝。它們可以這樣被使用,例如,以一種高效的方式給每個節點一個大的輸入數據集。Spark會嘗試使用一種高效的廣播算法來傳播廣播變量,從而減少通信的代價。
Spark動作的執行是通過一個階段的集合,通過分布式的Shuffle操作分離。Spark自動廣播在每個階段里任務需要的共同數據。以這種方式廣播的數據以序列化的形式緩存并在運行每個任務之前進行反序列化。這意味著顯式地創建廣播變量只在當多個階段之間需要相同的數據或者是當用反序列化的形式緩存數據特別重要的時候。
廣播變量是通過調用 SparkContext.broadcast(v)方法從變量 v創建的。廣播變量是一個 v的封裝器,它的值可以通過調用 value方法獲得。如下代碼展示了這個:
Scala
scala>val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar:org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) scala>broadcastVar.value res0:Array[Int] = Array(1, 2, 3)在廣播變量被創建后,它應該在集群運行的任何函數中,代替 v值被調用,從而 v值不需要被再次傳遞到這些節點上。另外,對象 v不能在廣播后修改,這樣可以保證所有節點具有相同的廣播變量的值(比如,后續如果變量被傳遞到一個新的節點)。
(二)累加器
累加器是一種只能通過具有結合性的操作(associative operation)進行“加(added)”的變量,因此可以高效地支持并行。它們可以用來實現計數器(如 MapReduce 中)和求和器。 Spark原生就支持數值類型的累加器,開發者可以自己添加新的支持類型。如果創建了一個命名的累加器(accumulators),這些累加器將會顯示在 Spark UI 界面上。這對于了解當前運行階段(stages)的進展情況是非常有用的(注意:這在 Python中尚未支持)。
一個累加器可以通過調用 SparkContext.accumulator(v)方法從一個初始值 v中創建。運行在集群上的任務,可以通過使用 add方法或 +=操作符(在 Scala和 Python)來給它加值。然而,它們不能讀取這個值。只有驅動程序可以使用 value方法來讀取累加器的值。
以下代碼展示了如何利用一個累加器,將一個數組里面的所有元素相加:
scala> val accum = sc.accumulator(0,"My Accumulator") accum: spark.Accumulator[Int] = 0 scala> sc.parallelize(Array(1, 2, 3,4)).foreach(x => accum += x) ... 10/09/29 18:41:08 INFO SparkContext: Tasksfinished in 0.317106 s scala> accum.value res2: Int = 10雖然代碼可以使用內置支持的 Int類型的累加器,但程序員也可以通過子類化(subclassing) AccumulatorParam來創建自己的類型。AccumulatorParam接口有兩個方法: zero,為你的數據類型提供了一個“零值(zero value)”,以及 addInPlace提供了兩個值相加的方法。比如,假設我們有一個表示數學上向量的 Vector類,我們可以這么寫:
object VectorAccumulatorParam extendsAccumulatorParam[Vector] { defzero(initialValue: Vector): Vector = { Vector.zeros(initialValue.size) } defaddInPlace(v1: Vector, v2: Vector): Vector = { v1 +=v2 } } // Then, create an Accumulator of this type: val vecAccum = sc.accumulator(newVector(...))(VectorAccumulatorParam)在 Scala中, Spark也支持更通用的 Accumulable接口去累加數據,其結果類型和累加的元素不同(比如,構建一個包含所有元素的列表),并且SparkContext.accumulableCollection方法可以累加普通的 Scala集合(collection)類型。
因為累加器的更新只在action中執行,Spark確保每個任務對累加器的更新都只會被應用一次,例如,重啟任務將不會更新這個值。在轉換中,用戶應該清楚如果任務或者作業階段是重復運行的,每個任務的更新可能會應用不止一次。
累加器不會改變Spark的懶惰評價模型。如果它們在一個RDD的操作中正在被更新,他們的值只會被更新一次,RDD作為動作的一部分被計算。因此,累加器更新當在執行一個懶惰轉換,例如map()時,并不保證被執行。下面的代碼段演示了這個屬性:
val accum = sc.accumulator(0) data.map { x => accum += x; f(x) } // Here, accum is still 0 because no actionshave caused the `map` to be computed.六、把代碼部署到集群上
應用程序提交指南(application submission guide)描述了如何將應用程序提交到一個集群,簡單地說,一旦你將你的應用程序打包成一個JAR(對于 Java/Scala)或者一組的 .py或 .zip文件 (對于 Python), bin/spark-submit 腳本可以讓你將它提交到支持的任何集群管理器中。
七、從Java/Scala中啟動Spark作業
Org.apache.spark.launcher包中提供了相關類來啟動Spark作業作為子線程的簡單的Java API。
八、單元測試
Spark 對單元測試非常友好,可以使用任何流行的單元測試框架。在你的測試中簡單地創建一個 SparkContext,并將 master URL設置成local,運行你的各種操作,然后調用 SparkContext.stop()結束測試。確保在 finally塊或測試框架的 tearDown方法中調用 context的 stop方法,因為 Spark不支持在一個程序中同時運行兩個contexts。
九、下一步
你可以在 Spark的網站上看到 spark程序的樣例。另外,Spark在 examples目錄 (Scala, Java, Python,R)中也包含了一些樣例。你可以通過將類名傳遞給 spark的 bin/run-example腳本來運行 Java和 Scala的樣例,例如:
/bin/run-example SparkPi為了幫助優化你的程序,在配置(configuration)和調優(tuning)的指南上提供了最佳實踐信息。它們在確保將你的數據用一個有效的格式存儲在內存上,是非常重要的。對于部署的幫助信息,可以查看集群模式概述(cluster mode overview),描述了分布式操作以及支持集群管理器所涉及的組件。
最后,完整的 API文檔可以查看 Scala, Java,Python和R。
總結
以上是生活随笔為你收集整理的spark之4:基础指南(源自官方文档)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: git基础指南
- 下一篇: 构建scala+IDEA+sbt开发环境