Spark编程指南笔记
Spark編程指南筆記
標(biāo)簽:?spark?編程?筆記?| 發(fā)表時(shí)間:2015-02-02 16:00 | 作者: 分享到: 出處:http://blog.javachen.com/rss.xml本文是參考Spark官方編程指南(Spark 版本為1.2)整理出來的學(xué)習(xí)筆記,主要是用于加深對(duì) Spark 的理解,并記錄一些知識(shí)點(diǎn)。
1. 一些概念
每一個(gè) Spark 的應(yīng)用,都是由一個(gè)驅(qū)動(dòng)程序構(gòu)成,它運(yùn)行用戶的 main 函數(shù),在一個(gè)集群上執(zhí)行各種各樣的并行操作。
Spark 提出的最主要抽象概念是?彈性分布式數(shù)據(jù)集,它是一個(gè)有容錯(cuò)機(jī)制(劃分到集群的各個(gè)節(jié)點(diǎn)上)并可以被并行操作的元素集合。目前有兩種類型的RDD:
- 并行集合:接收一個(gè)已經(jīng)存在的 Scala 集合,然后進(jìn)行各種并行計(jì)算。
- 外部數(shù)據(jù)集:外部存儲(chǔ)系統(tǒng),例如一個(gè)共享的文件系統(tǒng),HDFS、HBase以及任何支持 Hadoop InputFormat 的數(shù)據(jù)源。
這兩種類型的 RDD 都可以通過相同的方式進(jìn)行操作。用戶可以讓 Spark 保留一個(gè) RDD 在內(nèi)存中,使其能在并行操作中被有效的重復(fù)使用,并且,RDD 能自動(dòng)從節(jié)點(diǎn)故障中恢復(fù)。
Spark 的第二個(gè)抽象概念是?共享變量,可以在并行操作中使用。在默認(rèn)情況下,Spark 通過不同節(jié)點(diǎn)上的一系列任務(wù)來運(yùn)行一個(gè)函數(shù),它將每一個(gè)函數(shù)中用到的變量的拷貝傳遞到每一個(gè)任務(wù)中。有時(shí)候,一個(gè)變量需要在任務(wù)之間,或任務(wù)與驅(qū)動(dòng)程序之間被共享。
Spark 支持兩種類型的共享變量:?廣播變量,可以在內(nèi)存的所有的結(jié)點(diǎn)上緩存變量;?累加器:只能用于做加法的變量,例如計(jì)數(shù)或求和。
2. 編寫程序
初始化 Spark
Spark 程序需要做的第一件事情,就是創(chuàng)建一個(gè) SparkContext 對(duì)象,它將告訴 Spark 如何訪問一個(gè)集群。這個(gè)通常是通過下面的構(gòu)造器來實(shí)現(xiàn)的:
new SparkContext(master, appName, [sparkHome], [jars])參數(shù)說明:
- master:用于指定所連接的 Spark 或者 Mesos 集群的 URL。
- appName?:應(yīng)用的名稱,將會(huì)在集群的 Web 監(jiān)控 UI 中顯示。
- sparkHome:可選,你的集群機(jī)器上 Spark 的安裝路徑(所有機(jī)器上路徑必須一致)。
- jars:可選,在本地機(jī)器上的 JAR 文件列表,其中包括你應(yīng)用的代碼以及任何的依賴,Spark 將會(huì)把他們部署到所有的集群結(jié)點(diǎn)上。
在 python 中初始化,示例代碼如下:
//conf = SparkContext("local", "Hello Spark") conf = SparkConf().setAppName("Hello Spark").setMaster("local") sc = SparkContext(conf=conf)說明:如果部署到集群,在分布式模式下運(yùn)行,最后兩個(gè)參數(shù)是必須的。
第一個(gè)參數(shù)可以是以下任一種形式:
| local | 默認(rèn)值,使用一個(gè) Worker 線程本地化運(yùn)行(完全不并行) |
| local[K] | 使用 K 個(gè) Worker 線程本地化運(yùn)行(理想情況下,K 應(yīng)該根據(jù)運(yùn)行機(jī)器的 CPU 核數(shù)設(shè)定) |
| spark://HOST:PORT | 連接到指定的 Spark 單機(jī)版集群 master 進(jìn)程所在的主機(jī)和端口,端口默認(rèn)是7077 |
| mesos://HOST:PORT | 連接到指定的 Mesos 集群。host 參數(shù)是Moses master的hostname。端口默認(rèn)是5050 |
如果你在一個(gè)集群上運(yùn)行 spark-shell,則 master 參數(shù)默認(rèn)為?local,在啟動(dòng)之前你可以通過修改配置文件指定?ADD_JAR?環(huán)境變量將 JAR 文件們加載在集群上,這個(gè)變量需要包括一個(gè)用逗號(hào)分隔的 JAR 文件列表。
運(yùn)行代碼
運(yùn)行代碼有幾種方式,一是通過 spark-shell 來運(yùn)行 scala 代碼,一是編寫 java 代碼并打成包以 spark on yarn 方式運(yùn)行,還有一種是通過 pyspark 來運(yùn)行 python 代碼。
更多內(nèi)容,參考?Spark安裝和使用。
3. 彈性分布式數(shù)據(jù)集
3.1 并行集合
并行集合是通過調(diào)用 SparkContext 的?parallelize?方法,在一個(gè)已經(jīng)存在的 Scala 集合上創(chuàng)建一個(gè) Seq 對(duì)象。
parallelize 方法還可以接受一個(gè)參數(shù)?slices,表示數(shù)據(jù)集切分的份數(shù)。Spark 將會(huì)在集群上為每一份數(shù)據(jù)起一個(gè)任務(wù)。典型地,你可以在集群的每個(gè) CPU 上分布 2-4個(gè) slices。一般來說,Spark 會(huì)嘗試根據(jù)集群的狀況,來自動(dòng)設(shè)定 slices 的數(shù)目,當(dāng)然,你也可以手動(dòng)設(shè)置。
Scala 示例程序:
scala> val data = Array(1, 2, 3, 4, 5) data: Array[Int] = Array(1, 2, 3, 4, 5)scala> var distData = sc.parallelize(data) distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:14scala> distData.reduce((a, b) => a + b) res4: Int = 15Java 示例程序:
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); JavaRDD<Integer> distData = sc.parallelize(data); Integer sum=distData.reduce((a, b) -> a + b);Python 示例程序:
data = [1, 2, 3, 4, 5] distData = sc.parallelize(data) distData.reduce(lambda a, b: a + b)3.2 外部數(shù)據(jù)源
Spark可以從存儲(chǔ)在 HDFS,或者 Hadoop 支持的其它文件系統(tǒng)(包括本地文件,Amazon S3, Hypertable, HBase 等等)上的文件創(chuàng)建分布式數(shù)據(jù)集。Spark 支持?TextFile、?SequenceFiles?以及其他任何?Hadoop InputFormat?格式的輸入。
TextFile 的 RDD 可以通過下面方式創(chuàng)建,該方法接受一個(gè)文件的 URI 地址,該地址可以是本地路徑,或者?hdfs://、?s3n://?等 URL 地址。
// scala 語(yǔ)法 val distFile = sc.textFile("data.txt")// java 語(yǔ)法 JavaRDD<String> distFile = sc.textFile("data.txt");// python 語(yǔ)法 distFile = sc.textFile("data.txt")一些說明:
- 引用的路徑必須是絕對(duì)路徑,并且必須在每一個(gè) worker 節(jié)點(diǎn)上保持一致。
- 輸入的地址可以是一個(gè)目錄,也可以是正則匹配表達(dá)式,也可以是壓縮的文件。
- textFile 方法也可以通過輸入一個(gè)可選的第二參數(shù),來控制文件的分片數(shù)目。默認(rèn)情況下,Spark 為每一塊文件創(chuàng)建一個(gè)分片(HDFS 默認(rèn)的塊大小為64MB),但是你也可以通過傳入一個(gè)更大的值,來指定一個(gè)更高的片值,但不能指定一個(gè)比塊數(shù)更小的片值。
除了 TextFile,Spark 還支持其他格式的輸入:
- SparkContext.wholeTextFiles?方法可以讀取一個(gè)包含多個(gè)小文件的目錄,并以 filename,content 鍵值對(duì)的方式返回結(jié)果。
- 對(duì)于 SequenceFiles,可以使用 SparkContext 的?sequenceFile[K, V]` 方法創(chuàng)建。像 IntWritable 和 Text 一樣,它們必須是 Hadoop 的 Writable 接口的子類。另外,對(duì)于幾種通用 Writable 類型,Spark 允許你指定原生類型來替代。例如:sequencFile[Int, String] 將會(huì)自動(dòng)讀取 IntWritable 和 Texts。
- 對(duì)于其他類型的 Hadoop 輸入格式,你可以使用?SparkContext.hadoopRDD?方法,它可以接收任意類型的 JobConf 和輸入格式類,鍵類型和值類型。按照像 Hadoop 作業(yè)一樣的方法設(shè)置輸入源就可以了。
- RDD.saveAsObjectFile?和?SparkContext.objectFile?提供了以 Java 序列化的簡(jiǎn)單方式來保存 RDD。雖然這種方式?jīng)]有 Avro 高效,但也是一種簡(jiǎn)單的方式來保存任意的 RDD。
3.3 RDD 操作
RDD支持兩種操作:
- 轉(zhuǎn)換:從現(xiàn)有的數(shù)據(jù)集創(chuàng)建一個(gè)新的數(shù)據(jù)集;
- 動(dòng)作:在數(shù)據(jù)集上運(yùn)行計(jì)算后,返回一個(gè)值給驅(qū)動(dòng)程序。
例如,map 是一種轉(zhuǎn)換,它將數(shù)據(jù)集每一個(gè)元素都傳遞給函數(shù),并返回一個(gè)新的分布數(shù)據(jù)集表示結(jié)果,而 reduce 是一種動(dòng)作,通過一些函數(shù)將所有的元素疊加起來,并將最終結(jié)果返回給運(yùn)行程序。
Spark 中的?所有轉(zhuǎn)換都是惰性的,也就是說,他們并不會(huì)直接計(jì)算結(jié)果。相反的,它們只是記住應(yīng)用到基礎(chǔ)數(shù)據(jù)集上的這些轉(zhuǎn)換動(dòng)作。只有當(dāng)發(fā)生一個(gè)要求返回結(jié)果給運(yùn)行程序的動(dòng)作時(shí),這些轉(zhuǎn)換才會(huì)真正運(yùn)行。
默認(rèn)情況下,每一個(gè)轉(zhuǎn)換過的 RDD 都會(huì)在你運(yùn)行一個(gè)動(dòng)作時(shí)被重新計(jì)算。不過,你也可以使用?persist?或者?cache?方法,持久化一個(gè) RDD 在內(nèi)存中。在這種情況下,Spark 將會(huì)在集群中,保存相關(guān)元素,下次你查詢這個(gè) RDD 時(shí),它將能更快速訪問。除了持久化到內(nèi)存,Spark 也支持在磁盤上持久化數(shù)據(jù)集,或在節(jié)點(diǎn)之間復(fù)制數(shù)據(jù)集。
Scala 示例:
val lines = sc.textFile("data.txt") val lineLengths = lines.map(s => s.length) val totalLength = lineLengths.reduce((a, b) => a + b)Java 示例:
JavaRDD<String> lines = sc.textFile("data.txt"); JavaRDD<Integer> lineLengths = lines.map(s -> s.length()); int totalLength = lineLengths.reduce((a, b) -> a + b);Python 示例:
lines = sc.textFile("data.txt") lineLengths = lines.map(lambda s: len(s)) totalLength = lineLengths.reduce(lambda a, b: a + b)代碼說明:
- 第一行定義了一個(gè)基礎(chǔ) RDD,但并沒有開始載入內(nèi)存,僅僅將 lines 指向了這個(gè)file
- 第二行也僅僅定義了 linelengths 是作為 map 的結(jié)果,但也沒有開始運(yùn)行 map 這個(gè)過程
- 直到第三句話才開始運(yùn)行,各個(gè) worker 節(jié)點(diǎn)開始運(yùn)行自己的 map、reduce 過程
你也可以調(diào)用?lineLengths.persist()?來持久化 RDD。
除了使用 lambda 表達(dá)式,也可以通過函數(shù)來運(yùn)行轉(zhuǎn)換或者動(dòng)作,使用函數(shù)需要注意局部變量的作用域問題。
例如下面的 Python 代碼中的 field 變量:
class MyClass(object):def __init__(self):self.field = "Hello"def doStuff(self, rdd):field = self.fieldreturn rdd.map(lambda s: field + x)如果使用 Java 語(yǔ)言,則需要用到匿名內(nèi)部類:
class GetLength implements Function<String, Integer> {public Integer call(String s) { return s.length(); } } class Sum implements Function2<Integer, Integer, Integer> {public Integer call(Integer a, Integer b) { return a + b; } }JavaRDD<String> lines = sc.textFile("data.txt"); JavaRDD<Integer> lineLengths = lines.map(new GetLength()); int totalLength = lineLengths.reduce(new Sum());Spark 也支持鍵值對(duì)的操作,這在分組和聚合操作時(shí)候用得到。定義一個(gè)鍵值對(duì)對(duì)象時(shí),需要自定義該對(duì)象的 equals() 和 hashCode() 方法。
在 Scala 中有一個(gè)?Tuple2?對(duì)象表示鍵值對(duì),這是一個(gè)內(nèi)置的對(duì)象,通過?(a,b)?就可以創(chuàng)建一個(gè) Tuple2 對(duì)象。在你的程序中,通過導(dǎo)入?org.apache.spark.SparkContext._?就可以對(duì) Tuple2 進(jìn)行操作。對(duì)鍵值對(duì)的操作方法,可以查看?PairRDDFunctions
下面是一個(gè)用 scala 統(tǒng)計(jì)單詞出現(xiàn)次數(shù)的例子:
val lines = sc.textFile("data.txt") val pairs = lines.map(s => (s, 1)) val counts = pairs.reduceByKey((a, b) => a + b)接下來,你還可以執(zhí)行?counts.sortByKey()、?counts.collect()?等操作。
如果用 Java 統(tǒng)計(jì),則代碼如下:
JavaRDD<String> lines = sc.textFile("data.txt"); JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1)); JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);用 Python 統(tǒng)計(jì),代碼如下:
lines = sc.textFile("data.txt") pairs = lines.map(lambda s: (s, 1)) counts = pairs.reduceByKey(lambda a, b: a + b)測(cè)試
現(xiàn)在來結(jié)合上面的例子實(shí)現(xiàn)一個(gè)完整的例子。下面,我們來?分析 Nginx 日志中狀態(tài)碼出現(xiàn)次數(shù),并且將結(jié)果按照狀態(tài)碼從小到大排序。
先將測(cè)試數(shù)據(jù)上傳到 hdfs:
$ hadoop fs -put access.log然后,編寫一個(gè) python 文件,保存為 SimpleApp.py:
from pyspark import SparkContextlogFile = "access.log"sc = SparkContext("local", "Simple App")logData = sc.textFile(logFile).cache()counts = logData.map(lambda line: line.split()[8]).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey(lambda x: x) # This is just a demo on how to bring all the sorted data back to a single node. # In reality, we wouldn't want to collect all the data to the driver node. output = counts.collect() for (word, count) in output: print "%s: %i" % (word, count) counts.saveAsTextFile("spark_results")sc.stop()接下來,運(yùn)行下面代碼:
$ spark-submit --master local[4] SimpleApp.py運(yùn)行成功之后,你會(huì)在終端看到以下輸出:
200: 6827 206: 120 301: 7 304: 10 403: 38 404: 125 416: 1并且,在hdfs 上 /user/spark/spark_results/part-00000 內(nèi)容如下:
(u'200', 6827) (u'206', 120) (u'301', 7) (u'304', 10) (u'403', 38) (u'404', 125) (u'416', 1)其實(shí),這個(gè)例子和官方提供的例子很相像,具體請(qǐng)看?wordcount.py。
常見的轉(zhuǎn)換
| map(func) | 返回一個(gè)新分布式數(shù)據(jù)集,由每一個(gè)輸入元素經(jīng)過func函數(shù)轉(zhuǎn)換后組成 |
| filter(func) | 返回一個(gè)新數(shù)據(jù)集,由經(jīng)過func函數(shù)計(jì)算后返回值為 true 的輸入元素組成 |
| flatMap(func) | 類似于 map,但是每一個(gè)輸入元素可以被映射為0或多個(gè)輸出元素,因此 func 應(yīng)該返回一個(gè)序列 |
| mapPartitions(func) | 類似于 map,但獨(dú)立地在 RDD 的每一個(gè)分塊上運(yùn)行,因此在類型為 T 的 RDD 上運(yùn)行時(shí),func 的函數(shù)類型必須是?Iterator[T] ? Iterator[U] |
| mapPartitionsWithSplit(func) | 類似于 mapPartitions, 但 func 帶有一個(gè)整數(shù)參數(shù)表示分塊的索引值。因此在類型為 T的RDD上運(yùn)行時(shí),func 的函數(shù)類型必須是?(Int, Iterator[T]) ? Iterator[U] |
| sample(withReplacement,fraction, seed) | 根據(jù) fraction 指定的比例,對(duì)數(shù)據(jù)進(jìn)行采樣,可以選擇是否用隨機(jī)數(shù)進(jìn)行替換,seed 用于指定隨機(jī)數(shù)生成器種子 |
| union(otherDataset) | 返回一個(gè)新的數(shù)據(jù)集,新數(shù)據(jù)集是由源數(shù)據(jù)集和參數(shù)數(shù)據(jù)集聯(lián)合而成 |
| distinct([numTasks])) | 返回一個(gè)包含源數(shù)據(jù)集中所有不重復(fù)元素的新數(shù)據(jù)集 |
| groupByKey([numTasks]) | 在一個(gè)鍵值對(duì)的數(shù)據(jù)集上調(diào)用,返回一個(gè)?(K,Seq[V])對(duì)的數(shù)據(jù)集 。注意:默認(rèn)情況下,只有8個(gè)并行任務(wù)來做操作,但是你可以傳入一個(gè)可選的 numTasks 參數(shù)來改變它 |
| reduceByKey(func, [numTasks]) | 在一個(gè)鍵值對(duì)的數(shù)據(jù)集上調(diào)用時(shí),返回一個(gè)鍵值對(duì)的數(shù)據(jù)集,使用指定的 reduce 函數(shù),將相同 key 的值聚合到一起。類似 groupByKey,reduce 任務(wù)個(gè)數(shù)是可以通過第二個(gè)可選參數(shù)來配置的 |
| sortByKey([ascending], [numTasks]) | 在一個(gè)鍵值對(duì)的數(shù)據(jù)集上調(diào)用,K 必須實(shí)現(xiàn)?Ordered?接口,返回一個(gè)按照 Key 進(jìn)行排序的鍵值對(duì)數(shù)據(jù)集。升序或降序由 ascending 布爾參數(shù)決定 |
| join(otherDataset, [numTasks]) | 在類型為(K,V)和(K,W) 類型的數(shù)據(jù)集上調(diào)用時(shí),返回一個(gè)相同key對(duì)應(yīng)的所有元素對(duì)在一起的?(K, (V, W))?數(shù)據(jù)集 |
| cogroup(otherDataset, [numTasks]) | 在類型為(K,V)和(K,W) 的數(shù)據(jù)集上調(diào)用,返回一個(gè)?(K, Seq[V], Seq[W])?元組的數(shù)據(jù)集。這個(gè)操作也可以稱之為 groupwith |
| cartesian(otherDataset) | 笛卡爾積,在類型為 T 和 U 類型的數(shù)據(jù)集上調(diào)用時(shí),返回一個(gè) (T, U) 對(duì)數(shù)據(jù)集(兩兩的元素對(duì)) |
| pipe(command, [envVars]) | 對(duì) RDD 進(jìn)行管道操作 |
| coalesce(numPartitions) | 減少 RDD 的分區(qū)數(shù)到指定值。在過濾大量數(shù)據(jù)之后,可以執(zhí)行此操作 |
| repartition(numPartitions) | 重新給 RDD 分區(qū) |
| repartitionAndSortWithinPartitions(partitioner) | 重新給 RDD 分區(qū),并且每個(gè)分區(qū)內(nèi)以記錄的 key 排序 |
常用的動(dòng)作
常用的動(dòng)作列表
| reduce(func) | 通過函數(shù) func 聚集數(shù)據(jù)集中的所有元素。這個(gè)功能必須可交換且可關(guān)聯(lián)的,從而可以正確的被并行執(zhí)行。 |
| collect() | 在驅(qū)動(dòng)程序中,以數(shù)組的形式,返回?cái)?shù)據(jù)集的所有元素。這通常會(huì)在使用 filter 或者其它操作并返回一個(gè)足夠小的數(shù)據(jù)子集后再使用會(huì)比較有用。 |
| count() | 返回?cái)?shù)據(jù)集的元素的個(gè)數(shù)。 |
| first() | 返回?cái)?shù)據(jù)集的第一個(gè)元素,類似于?take(1) |
| take(n) | 返回一個(gè)由數(shù)據(jù)集的前 n 個(gè)元素組成的數(shù)組。注意,這個(gè)操作目前并非并行執(zhí)行,而是由驅(qū)動(dòng)程序計(jì)算所有的元素 |
| takeSample(withReplacement,num, seed) | 返回一個(gè)數(shù)組,在數(shù)據(jù)集中隨機(jī)采樣 num 個(gè)元素組成,可以選擇是否用隨機(jī)數(shù)替換不足的部分,seed 用于指定的隨機(jī)數(shù)生成器種子 |
| takeOrdered(n, [ordering]) | 返回自然順序或者自定義順序的前 n 個(gè)元素 |
| saveAsTextFile(path) | 將數(shù)據(jù)集的元素,以 textfile 的形式,保存到本地文件系統(tǒng),HDFS或者任何其它 hadoop 支持的文件系統(tǒng)。對(duì)于每個(gè)元素,Spark 將會(huì)調(diào)用?toString?方法,將它轉(zhuǎn)換為文件中的文本行 |
| saveAsSequenceFile(path)?(Java and Scala) | 將數(shù)據(jù)集的元素,以 Hadoop sequencefile 的格式保存到指定的目錄下 |
| saveAsObjectFile(path)?(Java and Scala) | 將數(shù)據(jù)集的元素,以 Java 序列化的方式保存到指定的目錄下 |
| countByKey() | 對(duì)(K,V)類型的 RDD 有效,返回一個(gè) (K,Int) 對(duì)的 Map,表示每一個(gè)key對(duì)應(yīng)的元素個(gè)數(shù) |
| foreach(func) | 在數(shù)據(jù)集的每一個(gè)元素上,運(yùn)行函數(shù) func 進(jìn)行更新。這通常用于邊緣效果,例如更新一個(gè)累加器,或者和外部存儲(chǔ)系統(tǒng)進(jìn)行交互,例如HBase |
3.4 RDD持久化
Spark 最重要的一個(gè)功能,就是在不同操作間,持久化(或緩存)一個(gè)數(shù)據(jù)集在內(nèi)存中,這將使得后續(xù)的動(dòng)作變得更加迅速。緩存是用 Spark 構(gòu)建迭代算法的關(guān)鍵。 使用以下兩種方法可以標(biāo)記要緩存的 RDD:
lineLengths.persist() lineLengths.cache()取消緩存則用:
lineLengths.unpersist()每一個(gè)RDD都可以用不同的保存級(jí)別進(jìn)行保存,通過將一個(gè)?org.apache.spark.storage.StorageLevel?對(duì)象傳遞給?persist(self, storageLevel)?可以控制 RDD 持久化到磁盤、內(nèi)存或者是跨節(jié)點(diǎn)復(fù)制等等。?cache()?方法是使用默認(rèn)存儲(chǔ)級(jí)別的快捷方法,也就是?StorageLevel.MEMORY_ONLY。 完整的可選存儲(chǔ)級(jí)別如下:
| MEMORY_ONLY | 默認(rèn)的級(jí)別, 將 RDD 作為反序列化的的對(duì)象存儲(chǔ)在 JVM 中。如果不能被內(nèi)存裝下,一些分區(qū)將不會(huì)被緩存,并且在需要的時(shí)候被重新計(jì)算 |
| MEMORY_AND_DISK | 將 RDD 作為反序列化的的對(duì)象存儲(chǔ)在 JVM 中。如果不能被與內(nèi)存裝下,超出的分區(qū)將被保存在硬盤上,并且在需要時(shí)被讀取 |
| MEMORY_ONLY_SER | 將 RDD 作為序列化的的對(duì)象進(jìn)行存儲(chǔ)(每一分區(qū)占用一個(gè)字節(jié)數(shù)組)。通常來說,這比將對(duì)象反序列化的空間利用率更高,尤其當(dāng)使用fast serializer,但在讀取時(shí)會(huì)比較占用CPU |
| MEMORY_AND_DISK_SER | 與?MEMORY_ONLY_SER?相似,但是把超出內(nèi)存的分區(qū)將存儲(chǔ)在硬盤上而不是在每次需要的時(shí)候重新計(jì)算 |
| DISK_ONLY | 只將 RDD 分區(qū)存儲(chǔ)在硬盤上 |
| MEMORY_ONLY_2、?MEMORY_AND_DISK_2等 | 與上述的存儲(chǔ)級(jí)別一樣,但是將每一個(gè)分區(qū)都復(fù)制到兩個(gè)集群結(jié)點(diǎn)上 |
| OFF_HEAP | 開發(fā)中 |
Spark 的不同存儲(chǔ)級(jí)別,旨在滿足內(nèi)存使用和 CPU 效率權(quán)衡上的不同需求。我們建議通過以下的步驟來進(jìn)行選擇:
- 如果你的 RDD 可以很好的與默認(rèn)的存儲(chǔ)級(jí)別契合,就不需要做任何修改了。這已經(jīng)是 CPU 使用效率最高的選項(xiàng),它使得 RDD的操作盡可能的快。
- 如果不行,試著使用?MEMORY_ONLY_SER?并且選擇一個(gè)快速序列化的庫(kù)使得對(duì)象在有比較高的空間使用率的情況下,依然可以較快被訪問。
- 盡可能不要存儲(chǔ)到硬盤上,除非計(jì)算數(shù)據(jù)集的函數(shù),計(jì)算量特別大,或者它們過濾了大量的數(shù)據(jù)。否則,重新計(jì)算一個(gè)分區(qū)的速度,和與從硬盤中讀取基本差不多快。
- 如果你想有快速故障恢復(fù)能力,使用復(fù)制存儲(chǔ)級(jí)別。例如:用 Spark 來響應(yīng)web應(yīng)用的請(qǐng)求。所有的存儲(chǔ)級(jí)別都有通過重新計(jì)算丟失數(shù)據(jù)恢復(fù)錯(cuò)誤的容錯(cuò)機(jī)制,但是復(fù)制存儲(chǔ)級(jí)別可以讓你在 RDD 上持續(xù)的運(yùn)行任務(wù),而不需要等待丟失的分區(qū)被重新計(jì)算。
- 如果你想要定義你自己的存儲(chǔ)級(jí)別,比如復(fù)制因子為3而不是2,可以使用?StorageLevel?單例對(duì)象的?apply()方法。
4. 共享變量
5. 參考文章
- http://spark.apache.org/docs/latest/programming-guide.html
- http://rdc.taobao.org/?p=2024
- http://blog.csdn.net/u011391905/article/details/37929731
- http://segmentfault.com/blog/whuwb/1190000000723037
總結(jié)
以上是生活随笔為你收集整理的Spark编程指南笔记的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 深度学习:推动NLP领域发展的新引擎
- 下一篇: FFmpeg的添加logo,去logo