Spark编程指南(Python版)
Spark編程指南
譯者說在前面:最近在學習Spark相關的知識,在網上沒有找到比較詳細的中文教程,只找到了官網的教程。出于自己學習同時也造福其他初學者的目的,把這篇指南翻譯成了中文,筆者水平有限,文章中難免有許多謬誤,請高手不吝賜教。
本文翻譯自Spark Programming Guide,由于筆者比較喜歡Python,在日常中使用也比較多,所以只翻譯了Python部分,不過Java和Scala大同小異。
概述
從高層次上來看,每一個Spark應用都包含一個驅動程序,用于執行用戶的main函數以及在集群上運行各種并行操作。Spark提供的主要抽象是彈性分布式數據集(RDD),這是一個包含諸多元素、被劃分到不同節點上進行并行處理的數據集合。RDD通過打開HDFS(或其他hadoop支持的文件系統)上的一個文件、在驅動程序中打開一個已有的Scala集合或由其他RDD轉換操作得到。用戶可以要求Spark將RDD持久化到內存中,這樣就可以有效地在并行操作中復用。另外,在節點發生錯誤時RDD可以自動恢復。
Spark提供的另一個抽象是可以在并行操作中使用的共享變量。在默認情況下,當Spark將一個函數轉化成許多任務在不同的節點上運行的時候,對于所有在函數中使用的變量,每一個任務都會得到一個副本。有時,某一個變量需要在任務之間或任務與驅動程序之間共享。Spark支持兩種共享變量:廣播變量,用來將一個值緩存到所有節點的內存中;累加器,只能用于累加,比如計數器和求和。
這篇指南將展示這些特性在Spark支持的語言中是如何使用的(本文只翻譯了Python部分)。如果你打開了Spark的交互命令行——bin/spark-shell的Scala命令行或bin/pyspark的Python命令行都可以——那么這篇文章你學習起來將是很容易的。
連接Spark
Spark1.3.0只支持Python2.6或更高的版本(但不支持Python3)。它使用了標準的CPython解釋器,所以諸如NumPy一類的C庫也是可以使用的。
通過Spark目錄下的bin/spark-submit腳本你可以在Python中運行Spark應用。這個腳本會載入Spark的Java/Scala庫然后讓你將應用提交到集群中。你可以執行bin/pyspark來打開Python的交互命令行。
如果你希望訪問HDFS上的數據,你需要為你使用的HDFS版本建立一個PySpark連接。常見的HDFS版本標簽都已經列在了這個第三方發行版頁面。
最后,你需要將一些Spark的類import到你的程序中。加入如下這行:
| 1 | frompysparkimportSparkContext, SparkConf |
初始化Spark
在一個Spark程序中要做的第一件事就是創建一個SparkContext對象來告訴Spark如何連接一個集群。為了創建SparkContext,你首先需要創建一個SparkConf對象,這個對象會包含你的應用的一些相關信息。
| 1 2 | conf = SparkConf().setAppName(appName).setMaster(master) sc = SparkContext(conf=conf) |
appName參數是在集群UI上顯示的你的應用的名稱。master是一個Spark、Mesos或YARN集群的URL,如果你在本地運行那么這個參數應該是特殊的”local”字符串。在實際使用中,當你在集群中運行你的程序,你一般不會把master參數寫死在代碼中,而是通過用spark-submit運行程序來獲得這個參數。但是,在本地測試以及單元測試時,你仍需要自行傳入”local”來運行Spark程序。
使用命令行
在PySpark命令行中,一個特殊的集成在解釋器里的SparkContext變量已經建立好了,變量名叫做sc。創建你自己的SparkContext不會起作用。你可以通過使用—master命令行參數來設置這個上下文連接的master主機,你也可以通過—py-files參數傳遞一個用逗號隔開的列表來將Python的.zip、.egg或.py文件添加到運行時路徑中。你還可以通過—package參數傳遞一個用逗號隔開的maven列表來給這個命令行會話添加依賴(比如Spark的包)。任何額外的包含依賴包的倉庫(比如SonaType)都可以通過傳給—repositorys參數來添加進去。Spark包的所有Python依賴(列在這個包的requirements.txt文件中)在必要時都必須通過pip手動安裝。
比如,使用四核來運行bin/pyspark應當輸入這個命令:
| 1 | $ ./bin/pyspark –master local[4] |
又比如,把code.py文件添加到搜索路徑中(為了能夠import在程序中),應當使用這條命令:
| 1 | $ ./bin/pyspark –master local[4] –py-files code.py |
想要了解命令行選項的完整信息請執行pyspark --help命令。在這些場景下,pyspark會觸發一個更通用的spark-submit腳本
在IPython這個加強的Python解釋器中運行PySpark也是可行的。PySpark可以在1.0.0或更高版本的IPython上運行。為了使用IPython,必須在運行bin/pyspark時將PYSPARK_DRIVER_PYTHON變量設置為ipython,就像這樣:
| 1 | $ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark |
你還可以通過設置PYSPARK_DRIVER_PYTHON_OPTS來自省定制ipython。比如,在運行IPython Notebook
時開啟PyLab圖形支持應該使用這條命令:
| 1 | $ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS=“notebook –pylab inline”./bin/pyspark |
彈性分布式數據集(RDD)
Spark是以RDD概念為中心運行的。RDD是一個容錯的、可以被并行操作的元素集合。創建一個RDD有兩個方法:在你的驅動程序中并行化一個已經存在的集合;從外部存儲系統中引用一個數據集,這個存儲系統可以是一個共享文件系統,比如HDFS、HBase或任意提供了Hadoop輸入格式的數據來源。
并行化集合
并行化集合是通過在驅動程序中一個現有的迭代器或集合上調用SparkContext的parallelize方法建立的。為了創建一個能夠并行操作的分布數據集,集合中的元素都會被拷貝。比如,以下語句創建了一個包含1到5的并行化集合:
| 1 2 | data = [1,2,3,4,5] distData = sc.parallelize(data) |
分布數據集(distData)被建立起來之后,就可以進行并行操作了。比如,我們可以調用disData.reduce(lambda a, b: a+b)來對元素進行疊加。在后文中我們會描述分布數據集上支持的操作。
并行集合的一個重要參數是將數據集劃分成分片的數量。對每一個分片,Spark會在集群中運行一個對應的任務。典型情況下,集群中的每一個CPU將對應運行2-4個分片。一般情況下,Spark會根據當前集群的情況自行設定分片數量。但是,你也可以通過將第二個參數傳遞給parallelize方法(比如sc.parallelize(data, 10))來手動確定分片數量。注意:有些代碼中會使用切片(slice,分片的同義詞)這個術語來保持向下兼容性。
外部數據集
PySpark可以通過Hadoop支持的外部數據源(包括本地文件系統、HDFS、 Cassandra、HBase、亞馬遜S3等等)建立分布數據集。Spark支持文本文件、序列文件以及其他任何Hadoop輸入格式文件。
通過文本文件創建RDD要使用SparkContext的textFile方法。這個方法會使用一個文件的URI(或本地文件路徑,hdfs://、s3n://這樣的URI等等)然后讀入這個文件建立一個文本行的集合。以下是一個例子:
| 1 | >>>?distFile = sc.textFile(“data.txt”) |
建立完成后distFile上就可以調用數據集操作了。比如,我們可以調用map和reduce操作來疊加所有文本行的長度,代碼如下:
| 1 | distFile.map(lambdas: len(s)).reduce(lambdaa, b: a + b) |
在Spark中讀入文件時有幾點要注意:
- 如果使用了本地文件路徑時,要保證在worker節點上這個文件也能夠通過這個路徑訪問。這點可以通過將這個文件拷貝到所有worker上或者使用網絡掛載的共享文件系統來解決。
- 包括textFile在內的所有基于文件的Spark讀入方法,都支持將文件夾、壓縮文件、包含通配符的路徑作為參數。比如,以下代碼都是合法的:
| 1 2 3 | textFile(“/my/directory”) textFile(“/my/directory/*.txt”) textFile(“/my/directory/*.gz”) |
- textFile方法也可以傳入第二個可選參數來控制文件的分片數量。默認情況下,Spark會為文件的每一個塊(在HDFS中塊的大小默認是64MB)創建一個分片。但是你也可以通過傳入一個更大的值來要求Spark建立更多的分片。注意,分片的數量絕不能小于文件塊的數量。
除了文本文件之外,Spark的Python API還支持多種其他數據格式:
- SparkContext.wholeTextFiles能夠讀入包含多個小文本文件的目錄,然后為每一個文件返回一個(文件名,內容)對。這是與textFile方法為每一個文本行返回一條記錄相對應的。
- RDD.saveAsPickleFile和SparkContext.pickleFile支持將RDD以串行化的Python對象格式存儲起來。串行化的過程中會以默認10個一批的數量批量處理。
- 序列文件和其他Hadoop輸入輸出格式。
注意
這個特性目前仍處于試驗階段,被標記為Experimental,目前只適用于高級用戶。這個特性在未來可能會被基于Spark SQL的讀寫支持所取代,因為Spark SQL是更好的方式。
可寫類型支持
PySpark序列文件支持利用Java作為中介載入一個鍵值對RDD,將可寫類型轉化成Java的基本類型,然后使用Pyrolite將java結果對象串行化。當將一個鍵值對RDD儲存到一個序列文件中時PySpark將會運行上述過程的相反過程。首先將Python對象反串行化成Java對象,然后轉化成可寫類型。以下可寫類型會自動轉換:
| 可寫類型 | Python類型 |
| ———————- | ————- |
| Text | unicode str|
| IntWritable | int |
| FloatWritable | float |
| DoubleWritable | float |
| BooleanWritable | bool |
| BytesWritable | bytearray |
| NullWritable | None |
| MapWritable | dict |
數組是不能自動轉換的。用戶需要在讀寫時指定ArrayWritable的子類型.在讀入的時候,默認的轉換器會把自定義的ArrayWritable子類型轉化成Java的Object[],之后串行化成Python的元組。為了獲得Python的array.array類型來使用主要類型的數組,用戶需要自行指定轉換器。
保存和讀取序列文件
和文本文件類似,序列文件可以通過指定路徑來保存與讀取。鍵值類型都可以自行指定,但是對于標準可寫類型可以不指定。
| 1 2 3 4 | >>>?rdd = sc.parallelize(range(1,4)).map(lambdax: (x,“a”* x )) >>>?rdd.saveAsSequenceFile(“path/to/file”) >>>?sorted(sc.sequenceFile(“path/to/file”).collect()) [(1,u’a’), (2,u’aa’), (3,u’aaa’)] |
保存和讀取其他Hadoop輸入輸出格式
PySpark同樣支持寫入和讀出其他Hadoop輸入輸出格式,包括’新’和’舊’兩種Hadoop MapReduce API。如果有必要,一個Hadoop配置可以以Python字典的形式傳入。以下是一個例子,使用了Elasticsearch ESInputFormat:
| 1 2 3 4 5 6 7 8 9 | $ SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar ./bin/pyspark >>>?conf = {“es.resource”:“index/type”}# assume Elasticsearch is running on localhost defaults >>>?rdd = sc.newAPIHadoopRDD(“org.elasticsearch.hadoop.mr.EsInputFormat”,\ “org.apache.hadoop.io.NullWritable”,“org.elasticsearch.hadoop.mr.LinkedMapWritable”, conf=conf) >>>?rdd.first()# the result is a MapWritable that is converted to a Python dict (u’Elasticsearch ID’, {u’field1′:True, u’field2′:u’Some Text’, u’field3′:12345}) |
注意,如果這個讀入格式僅僅依賴于一個Hadoop配置和/或輸入路徑,而且鍵值類型都可以根據前面的表格直接轉換,那么剛才提到的這種方法非常合適。
如果你有一些自定義的序列化二進制數據(比如從Cassandra/HBase中讀取數據),那么你需要首先在Scala/Java端將這些數據轉化成可以被Pyrolite的串行化器處理的數據類型。一個轉換器特質已經提供好了。簡單地拓展這個特質同時在convert方法中實現你自己的轉換代碼即可。記住,要確保這個類以及訪問你的輸入格式所需的依賴都被打到了Spark作業包中,并且確保這個包已經包含到了PySpark的classpath中。
這里有一些通過自定義轉換器來使用Cassandra/HBase輸入輸出格式的Python樣例和轉換器樣例。
RDD操作
RDD支持兩類操作:轉化操作,用于從已有的數據集轉化產生新的數據集;啟動操作,用于在計算結束后向驅動程序返回結果。舉個例子,map是一個轉化操作,可以將數據集中每一個元素傳給一個函數,同時將計算結果作為一個新的RDD返回。另一方面,reduce操作是一個啟動操作,能夠使用某些函數來聚集計算RDD中所有的元素,并且向驅動程序返回最終結果(同時還有一個并行的reduceByKey操作可以返回一個分布數據集)。
在Spark所有的轉化操作都是惰性求值的,就是說它們并不會立刻真的計算出結果。相反,它們僅僅是記錄下了轉換操作的操作對象(比如:一個文件)。只有當一個啟動操作被執行,要向驅動程序返回結果時,轉化操作才會真的開始計算。這樣的設計使得Spark運行更加高效——比如,我們會發覺由map操作產生的數據集將會在reduce操作中用到,之后僅僅是返回了reduce的最終的結果而不是map產生的龐大數據集。
在默認情況下,每一個由轉化操作得到的RDD都會在每次執行啟動操作時重新計算生成。但是,你也可以通過調用persist(或cache)方法來將RDD持久化到內存中,這樣Spark就可以在下次使用這個數據集時快速獲得。Spark同樣提供了對將RDD持久化到硬盤上或在多個節點間復制的支持。
基本操作
為了演示RDD的基本操作,請看以下的簡單程序:
| 1 2 3 | lines = sc.textFile(“data.txt”) lineLengths = lines.map(lambdas: len(s)) totalLength = lineLengths.reduce(lambdaa, b: a + b) |
第一行定義了一個由外部文件產生的基本RDD。這個數據集不是從內存中載入的也不是由其他操作產生的;lines僅僅是一個指向文件的指針。第二行將lineLengths定義為map操作的結果。再強調一次,由于惰性求值的緣故,lineLengths并不會被立即計算得到。最后,我們運行了reduce操作,這是一個啟動操作。從這個操作開始,Spark將計算過程劃分成許多任務并在多機上運行,每臺機器運行自己部分的map操作和reduce操作,最終將自己部分的運算結果返回給驅動程序。
如果我們希望以后重復使用lineLengths,只需在reduce前加入下面這行代碼:
| 1 | lineLengths.persist() |
這條代碼將使得lineLengths在第一次計算生成之后保存在內存中。
向Spark傳遞函數
Spark的API嚴重依賴于向驅動程序傳遞函數作為參數。有三種推薦的方法來傳遞函數作為參數。
- Lambda表達式,簡單的函數可以直接寫成一個lambda表達式(lambda表達式不支持多語句函數和無返回值的語句)。
- 對于代碼很長的函數,在Spark的函數調用中在本地用def定義。
- 模塊中的頂級函數。
比如,傳遞一個無法轉化為lambda表達式長函數,可以像以下代碼這樣:
| 1 2 3 4 5 6 7 8 | “MyScript.py”“” if__name__ ==“__main__”: def?myFunc(s): words = s.split(” “) returnlen(words) sc = SparkContext(…) sc.textFile(“file.txt”).map(myFunc) |
值得指出的是,也可以傳遞類實例中方法的引用(與單例對象相反),這種傳遞方法會將整個對象傳遞過去。比如,考慮以下代碼:
| 1 2 3 4 5 | class?MyClass(object): def?func(self, s): returns def?doStuff(self, rdd): returnrdd.map(self.func) |
在這里,如果我們創建了一個新的MyClass對象,然后對它調用doStuff方法,map會用到這個對象中func方法的引用,所以整個對象都需要傳遞到集群中。
還有另一種相似的寫法,訪問外層對象的數據域會傳遞整個對象的引用:
| 1 2 3 4 5 | class?MyClass(object): def?__init__(self): self.field =“Hello” def?doStuff(self, rdd): returnrdd.map(lambdas: self.field + x) |
此類問題最簡單的避免方法就是,使用一個本地變量緩存一份這個數據域的拷貝,直接訪問這個數據域:
| 1 2 3 | def?doStuff(self, rdd): field = self.field returnrdd.map(lambdas: field + x) |
使用鍵值對
雖然大部分Spark的RDD操作都支持所有種類的對象,但是有少部分特殊的操作只能作用于鍵值對類型的RDD。這類操作中最常見的就是分布的shuffle操作,比如將元素通過鍵來分組或聚集計算。
在Python中,這類操作一般都會使用Python內建的元組類型,比如(1, 2)。它們會先簡單地創建類似這樣的元組,然后調用你想要的操作。
比如,一下代碼對鍵值對調用了reduceByKey操作,來統計每一文本行在文本文件中出現的次數:
| 1 2 3 | lines = sc.textFile(“data.txt”) pairs = lines.map(lambdas: (s,1)) counts = pairs.reduceByKey(lambdaa, b: a + b) |
我們還可以使用counts.sortByKey(),比如,當我們想將這些鍵值對按照字母表順序排序,然后調用counts.collect()方法來將結果以對象列表的形式返回。
轉化操作
下面的表格列出了Spark支持的常用轉化操作。欲知細節,請查閱RDD API文檔(Scala,?Java,?Python)和鍵值對RDD函數文檔(Scala,?Java)。
(譯者注:這部分翻譯比較簡略,僅供簡單參考,具體細節請看文檔)
轉化操作 | 作用
————| ——
map(func) | 返回一個新的分布數據集,由原數據集元素經func處理后的結果組成
filter(func) | 返回一個新的數據集,由傳給func返回True的原數據集元素組成
flatMap(func) | 與map類似,但是每個傳入元素可能有0或多個返回值,func可以返回一個序列而不是一個值
mapParitions(func) | 類似map,但是RDD的每個分片都會分開獨立運行,所以func的參數和返回值必須都是迭代器
mapParitionsWithIndex(func) | 類似mapParitions,但是func有兩個參數,第一個是分片的序號,第二個是迭代器。返回值還是迭代器
sample(withReplacement, fraction, seed) | 使用提供的隨機數種子取樣,然后替換或不替換
union(otherDataset) | 返回新的數據集,包括原數據集和參數數據集的所有元素
intersection(otherDataset) | 返回新數據集,是兩個集的交集
distinct([numTasks]) | 返回新的集,包括原集中的不重復元素
groupByKey([numTasks]) | 當用于鍵值對RDD時返回(鍵,值迭代器)對的數據集
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 用于鍵值對RDD時返回(K,U)對集,對每一個Key的value進行聚集計算
sortByKey([ascending], [numTasks])用于鍵值對RDD時會返回RDD按鍵的順序排序,升降序由第一個參數決定
join(otherDataset, [numTasks]) | 用于鍵值對(K, V)和(K, W)RDD時返回(K, (V, W))對RDD
cogroup(otherDataset, [numTasks]) | 用于兩個鍵值對RDD時返回
(K, (V迭代器, W迭代器))RDD
cartesian(otherDataset) | 用于T和U類型RDD時返回(T, U)對類型鍵值對RDD
pipe(command, [envVars]) | 通過shell命令管道處理每個RDD分片
coalesce(numPartitions) | 把RDD的分片數量降低到參數大小
repartition(numPartitions) | 重新打亂RDD中元素順序并重新分片,數量由參數決定
repartitionAndSortWithinPartitions(partitioner) | 按照參數給定的分片器重新分片,同時每個分片內部按照鍵排序
啟動操作
下面的表格列出了Spark支持的部分常用啟動操作。欲知細節,請查閱RDD API文檔(Scala,?Java,?Python)和鍵值對RDD函數文檔(Scala,?Java)。
(譯者注:這部分翻譯比較簡略,僅供簡單參考,具體細節請看文檔)
啟動操作 | 作用
————| ——
reduce(func) | 使用func進行聚集計算,func的參數是兩個,返回值一個,兩次func運行應當是完全解耦的,這樣才能正確地并行運算
collect() | 向驅動程序返回數據集的元素組成的數組
count() | 返回數據集元素的數量
first() | 返回數據集的第一個元素
take(n) | 返回前n個元素組成的數組
takeSample(withReplacement, num, [seed]) | 返回一個由原數據集中任意num個元素的suzuki,并且替換之
takeOrder(n, [ordering]) | 返回排序后的前n個元素
saveAsTextFile(path) | 將數據集的元素寫成文本文件
saveAsSequenceFile(path) | 將數據集的元素寫成序列文件,這個API只能用于Java和Scala程序
saveAsObjectFile(path) | 將數據集的元素使用Java的序列化特性寫到文件中,這個API只能用于Java和Scala程序
countByCount() | 只能用于鍵值對RDD,返回一個(K, int) hashmap,返回每個key的出現次數
foreach(func) | 對數據集的每個元素執行func, 通常用于完成一些帶有副作用的函數,比如更新累加器(見下文)或與外部存儲交互等
RDD持久化
Spark的一個重要功能就是在將數據集持久化(或緩存)到內存中以便在多個操作中重復使用。當我們持久化一個RDD是,每一個節點將這個RDD的每一個分片計算并保存到內存中以便在下次對這個數據集(或者這個數據集衍生的數據集)的計算中可以復用。這使得接下來的計算過程速度能夠加快(經常能加快超過十倍的速度)。緩存是加快迭代算法和快速交互過程速度的關鍵工具。
你可以通過調用persist或cache方法來標記一個想要持久化的RDD。在第一次被計算產生之后,它就會始終停留在節點的內存中。Spark的緩存是具有容錯性的——如果RDD的任意一個分片丟失了,Spark就會依照這個RDD產生的轉化過程自動重算一遍。
另外,每一個持久化的RDD都有一個可變的存儲級別,這個級別使得用戶可以改變RDD持久化的儲存位置。比如,你可以將數據集持久化到硬盤上,也可以將它以序列化的Java對象形式(節省空間)持久化到內存中,還可以將這個數據集在節點之間復制,或者使用Tachyon將它儲存到堆外。這些存儲級別都是通過向persist()傳遞一個StorageLevel對象(Scala,?Java,?Python)來設置的。存儲級別的所有種類請見下表:
注意:在Python中,儲存的對象永遠是通過Pickle庫序列化過的,所以設不設置序列化級別不會產生影響。
Spark還會在shuffle操作(比如reduceByKey)中自動儲存中間數據,即使用戶沒有調用persist。這是為了防止在shuffle過程中某個節點出錯而導致的全盤重算。不過如果用戶打算復用某些結果RDD,我們仍然建議用戶對結果RDD手動調用persist,而不是依賴自動持久化機制。
應該選擇哪個存儲級別?
Spark的存儲級別是為了提供內存使用與CPU效率之間的不同取舍平衡程度。我們建議用戶通過考慮以下流程來選擇合適的存儲級別:
- 如果你的RDD很適合默認的級別(MEMORY_ONLY),那么久使用默認級別吧。這是CPU最高效運行的選擇,能夠讓RDD上的操作以最快速度運行。
- 否則,試試MEMORY_ONLY_SER選項并且選擇一個快的序列化庫來使對象的空間利用率更高,同時盡量保證訪問速度足夠快。
- 不要往硬盤上持久化,除非重算數據集的過程代價確實很昂貴,或者這個過程過濾了巨量的數據。否則,重新計算分片有可能跟讀硬盤速度一樣快。
- 如果你希望快速的錯誤恢復(比如用Spark來處理web應用的請求),使用復制級別。所有的存儲級別都提供了重算丟失數據的完整容錯機制,但是復制一份副本能省去等待重算的時間。
- 在大內存或多應用的環境中,處于實驗中的OFF_HEAP模式有諸多優點:
- 這個模式允許多個執行者共享Tachyon中的同一個內存池
- 這個模式顯著降低了垃圾回收的花銷。
- 在某一個執行者個體崩潰之后緩存的數據不會丟失。
刪除數據
Spark會自動監視每個節點的緩存使用同時使用LRU算法丟棄舊數據分片。如果你想手動刪除某個RDD而不是等待它被自動刪除,調用RDD.unpersist()方法。
共享變量
通常情況下,當一個函數傳遞給一個在遠程集群節點上運行的Spark操作(比如map和reduce)時,Spark會對涉及到的變量的所有副本執行這個函數。這些變量會被復制到每個機器上,而且這個過程不會被反饋給驅動程序。通常情況下,在任務之間讀寫共享變量是很低效的。但是,Spark仍然提供了有限的兩種共享變量類型用于常見的使用場景:廣播變量和累加器。
廣播變量
廣播變量允許程序員在每臺機器上保持一個只讀變量的緩存而不是將一個變量的拷貝傳遞給各個任務。它們可以被使用,比如,給每一個節點傳遞一份大輸入數據集的拷貝是很低效的。Spark試圖使用高效的廣播算法來分布廣播變量,以此來降低通信花銷。
可以通過SparkContext.broadcast(v)來從變量v創建一個廣播變量。這個廣播變量是v的一個包裝,同時它的值可以功過調用value方法來獲得。以下的代碼展示了這一點:
| 1 2 3 4 5 | >>>?broadcastVar = sc.broadcast([1,2,3]) <pyspark.broadcast.Broadcast object at0x102789f10> >>>?broadcastVar.value [1,2,3] |
在廣播變量被創建之后,在所有函數中都應當使用它來代替原來的變量v,這樣就可以保證v在節點之間只被傳遞一次。另外,v變量在被廣播之后不應該再被修改了,這樣可以確保每一個節點上儲存的廣播變量的一致性(如果這個變量后來又被傳輸給一個新的節點)。
累加器
累加器是在一個相關過程中只能被”累加”的變量,對這個變量的操作可以有效地被并行化。它們可以被用于實現計數器(就像在MapReduce過程中)或求和運算。Spark原生支持對數字類型的累加器,程序員也可以為其他新的類型添加支持。累加器被以一個名字創建之后,會在Spark的UI中顯示出來。這有助于了解計算的累進過程(注意:目前Python中不支持這個特性)。
可以通過SparkContext.accumulator(v)來從變量v創建一個累加器。在集群中運行的任務隨后可以使用add方法或+=操作符(在Scala和Python中)來向這個累加器中累加值。但是,他們不能讀取累加器中的值。只有驅動程序可以讀取累加器中的值,通過累加器的value方法。
以下的代碼展示了向一個累加器中累加數組元素的過程:
| 1 2 3 4 5 6 7 8 9 | >>>?accum = sc.accumulator(0) Accumulator<id=0, value=0> >>>?sc.parallelize([1,2,3,4]).foreach(lambdax: accum.add(x)) … 10/09/2918:41:08INFO SparkContext: Tasks finishedin0.317106s scala> accum.value 10 |
這段代碼利用了累加器對int類型的內建支持,程序員可以通過繼承AccumulatorParam類來創建自己想要的類型支持。AccumulatorParam的接口提供了兩個方法:zero'用于為你的數據類型提供零值;'addInPlace'用于計算兩個值得和。比如,假設我們有一個Vector`類表示數學中的向量,我們可以這樣寫:
| 1 2 3 4 5 6 7 8 9 10 | class?VectorAccumulatorParam(AccumulatorParam): def?zero(self, initialValue): returnVector.zeros(initialValue.size) def?addInPlace(self, v1, v2): v1 += v2 returnv1 # Then, create an Accumulator of this type: vecAccum = sc.accumulator(Vector(…), VectorAccumulatorParam()) |
累加器的更新操作只會被運行一次,Spark提供了保證,每個任務中對累加器的更新操作都只會被運行一次。比如,重啟一個任務不會再次更新累加器。在轉化過程中,用戶應該留意每個任務的更新操作在任務或作業重新運算時是否被執行了超過一次。
累加器不會該別Spark的惰性求值模型。如果累加器在對RDD的操作中被更新了,它們的值只會在啟動操作中作為RDD計算過程中的一部分被更新。所以,在一個懶惰的轉化操作中調用累加器的更新,并沒法保證會被及時運行。下面的代碼段展示了這一點:
| 1 2 3 | accum = sc.accumulator(0) data.map(lambdax => acc.add(x); f(x)) # Here, acc is still 0 because no actions have cause the `map` to be computed. |
在集群上部署
這個應用提交指南描述了一個應用被提交到集群上的過程。簡而言之,只要你把你的應用打成了JAR包(Java/Scala應用)或.py文件的集合或.zip壓縮包(Python應用),bin/spark-submit腳本會將應用提交到任意支持的集群管理器上。
單元測試
Spark對單元測試是友好的,可以與任何流行的單元測試框架相容。你只需要在測試中創建一個SparkContext,并如前文所述將master的URL設為local,執行你的程序,最后調用SparkContext.stop()來終止運行。請確保你在finally塊或測試框架的tearDown方法中終止了上下文,因為Spark不支持兩個上下文在一個程序中同時運行。
從1.0之前版本的Spark遷移
Spark1.0凍結了1.X系列Spark的核心API?,F在版本中沒有標注”experimental”或是”developer API”的API在未來的版本中仍會被支持。對Python用戶來說唯一的變化就是組管理操作,比如groupByKey,?cogroup,?join, 它們的返回值都從(鍵,值列表)對變成了(鍵, 值迭代器)對。
你還可以閱讀Spark Streaming,?MLlib和GraphX的遷移指南。
還有什么要做的
你可以在Spark的網站上看到更多的Spark樣例程序。另外,在examples目錄下還有許多樣例代碼(Scala,?Java,?Python)。你可以通過將類名稱傳給Spark的bin/run-example 腳本來運行Java和Scala語言樣例,舉例說明:
| 1 | ./bin/run-example SparkPi |
對于Python例子,使用spark-submit腳本代替:
| 1 | ./bin/spark-submit examples/src/main/python/pi.py |
為了給你優化代碼提供幫助,配置指南和調優指南提供了關于最佳實踐的一些信息。確保你的數據儲存在以高效的格式儲存在內存中,這很重要。為了給你部署應用提供幫助,集群模式概覽描述了許多內容,包括分布式操作和支持的集群管理器。
最后,完整的API文檔在這里。Scala版本?Java版本?Python版本
?
from:http://cholerae.com/2015/04/11/-%E7%BF%BB%E8%AF%91-Spark%E7%BC%96%E7%A8%8B%E6%8C%87%E5%8D%97-Python%E7%89%88/
總結
以上是生活随笔為你收集整理的Spark编程指南(Python版)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: VS2008显示代码行号
- 下一篇: 30 天学习 30 种新技术系列