spark入门_入门必读 | Spark 论文导读
Resilient Distributed Datasets: A fault-tolerant abstraction for in-Memory cluster computing, 是講述 Spark RDD 的基礎論文,通讀論文能給我們帶來全景的 Spark 知識面
摘要:RDD,全稱Resilient Distributed Dataset,可伸縮性數據集。使用它編程,可以有效利用大規模集群的內存,并且兼顧容錯。RDD的流行,完美解決了兩類應用難題:迭代算法(Iterative Algorithm)和交互性數據挖掘工具。在這兩類應用中,RDD緩存中間結果集的辦法,使得程序運行性能提高了一個量級。在容錯方面,RDD使用了粗放型的共享內存轉換方法,而不是對其(共享內存)做精控更新。RDD完全可以勝任迭代算法(此前這類任務都由Pregel這樣的編程模型完成),并且對新數據分析算法、應用都提供更好的支持。通過大量的用戶應用和壓力測試,最終Spark實現了RDD.
1 簡介:
像MapReduce和Dryad這樣的集群計算框架,已經廣泛應用于大規模數據分析。這類計算框架,最大的兩大優點,旨在幫助程序員專注業務編程,而非花精力分發計算任務和實現程序容錯。
當今的計算框架雖然對利用集群中的計算資源做了各類抽象,但還沒有實現對集群內存的抽象封裝。這樣對那些需要重復利用中間結果集的應用就很不友好,比如機器學習和圖算法,PageRank,K-means 聚類以及邏輯回歸等等。另一類計算,比如交互式數據分析,因涉及大量的即席數據查詢,為確保下一次數據集可以被重用,需要借助存儲物化結果集,這引發大量寫入實體磁盤的操作,導致執行時間拉長。
意識到這個問題的存在,專家們做了大量嘗試,比如 Pregel,把大量中間數據緩存起來,專為圖計算封裝了框架;HaLoop 則提供了實現迭代算法的MapReduce接口。但這些僅僅對個案有幫助,回到通用的計算上來,毫無優勢。比如最常見的數據分析,裝載多樣化多源頭數據,展開即席查詢。
RDD彌補了“專家型計算框架”的缺陷,支持通用型分布式并行計算。使用集群中所有節點內存來裝載同一應用所需的數據,兼容并包圖形數據,二維數據,非結構化數據。且提供容錯機制,控制并行數據結構和持久化中間數據集。最神奇的地方是,RDD根據分區有效控制數據分布,利用高度抽象豐富的API去操作數據。
設計RDD遇到最大的難點是容錯?,F存的對集群內存的抽象,包括分布式共享內存,鍵值對,數據庫和Piccolo,提供的接口都是針對穩定狀態的精控更新,比如二維表中的單元格。利用這類接口,保證容錯的方法只能制作跨節點數據副本,或者異地日志備份。顯而易見,這些操作對于大數據量的支持不夠友好,既浪費網絡流量還增加了存儲開銷。
與這些老式的設計相比,RDD的優勢在于存儲計算方法而不是數據。數據經過一系列計算得到最終的結果,如果要保存這些數據的中間狀態來完成容錯,那還不如保存如何得到這些數據的計算方法來的開銷少。就如前面所說,保存這些中間數據集好處是可以提高性能。與容錯機制并不矛盾。舉例:讀取數據源后,原始RDD就初始化成功,經過map,filter,reduce得到一系列新的RDD,一旦RDD失效,只要重新按照RDD的生成路徑執行,數據還能復原。RDD天然還有分區屬性,即他的數據是分區存儲于集群中某些節點上,同一時間點不會所有分區都失效,那么重新計算某一個或幾個失效分區,需花費的時間肯定比重新計算所有分區來的少。
在RDD發明之前,很多特殊的計算需求只能靠不斷引入新的計算框架才能解決,比如 MapReduce, DryadLINQ,SQL, Pregel和HaLoop. 而RDD發明之后,對于這類在多個數據集上重復一組運算的操作,變得簡單和通用了。乍看上去,RDD似乎有很多缺陷,但在解決實際問題上,RDD卻是把合適的利劍。
以RDD為編程核心的Spark,廣泛用于 UC Berkeley 和眾多公司。它以Scala為首要編程語言,提供方便的集成編程接口,有點類似DryadLINQ.除此之外,Spark提供的Scala解釋器,可以很容易讓編程人員完成大規模數據集的集成處理。大概Spark是第一個使用通用編程語言來達到在集群中完成交互式數據挖掘的工具。
通過壓力測試,Spark處理迭代計算的速度是Hadoop的20倍,完成一份數據報表的分析,總耗時比之前的技術快40倍。甚至在5-7秒的延遲內,可以處理1TB的數據掃描。從更底層的角度出發,實際上在Spark中還繼承了Pregel和HaLoop編程模式,并采用了代碼優化,使得編程庫只有200行代碼那么輕便。
2 可伸縮性分布式數據集(RDD-Resilient Distributed Datasets)
在本小節,主要探討以下方面內容:
1)RDD 的編程接口
2)RDD 與精控共享內存的對比
3)RDD 的缺陷
2.1 RDD 抽象
首先,RDD 最明顯的兩個特征:1)只讀;2)分區。
只讀的屬性注定了RDD的產生方式只有新建,要么從其他數據源讀取而來,要么從已有的RDD修剪出來。
看到這里我有兩個問題:1)其他數據源讀取,如何分區并行讀取?比如讀一張數據庫二維表,如何并行地去讀? 2)RDD從另一個RDD派生出來,會造成大量數據重復,占用大量內容,如何優化?
RDD 的這類生產方式,叫做轉換操作(Transformation).這類操作并沒有直接作用在RDD本身的數據結構上,而是重新生成新的RDD.那么為什么不是直接在原RDD上做轉換呢,這是需要思考的問題。
常見的轉換操作有map(),filter(),join()等,后續詳解。
但,RDD并不總需要物化數據。它記錄了足夠多的繼承、轉換步驟等信息,即血統,以便在必要的時候,實現自我修復,從頭再生一個RDD。RDD的分區屬性,又幫助再生RDD的過程執行得非常高效,僅再生丟失的RDD分區即可。如果RDD丟失了血統信息,它將不能被任何程序調用。那么RDD的血統數據結構又是如何的呢?
用戶可控的兩個RDD屬性是分區和持久化。持久化提高RDD數據的可復用性,可以存儲在內存,可以存在硬盤(當然是在逼不得已的情況下)。分區是特別優雅的屬性,它方便程序員靈活的部署數據分布,使得最終需要JOIN的兩個數據集,按照同一個鍵值做哈希分區(Hash-Partition),這樣在Join時加快了處理速度。
2.2 Spark編程接口
與 DryadLINQ,FlumeJava 一樣,Spark 操控RDD同樣使用語言集成化編程接口(language-integrated API), 即把RDD當做對象,使用對象的方法來操作RDD.
在編寫 Spark 應用程序時,程序員首先做的事情,便是通過轉換函數,將源數據抽取過來,生成一組RDD;在RDD上執行動作函數(Action),使得結算結果返回驅動程序(Spark程序發起點),或是單值,或是數據集,或裝載到其他存儲設備或文件。整個過程中,最有技巧的地方是,RDD的動作函數(Action)才是真正的程序起始點,第一個動作函數開始執行時,整個數據流和任務流才開始。這是RDD典型的惰性計算。
在復雜的Spark程序中,轉換函數在動作函數之前可能會有很多,每一步的轉換函數都能重生一個RDD,當這些RDD需要在長鏈條的轉換函數中重復利用時,把特定的RDD固化下來,是提高性能的不二法門。Spark做得完美的地方在于,他允許我們將中間結果集(RDD)用persist方法暫存于內存中。比如對于從Hive來的數據,我們既需要做總計,還需要分維度做分計,那么計算整理出來的原始數據,就最好存入內存。除非內存不夠大,則選擇存入硬盤,或復制到更遠的遠程服務器。甚至還可以控制RDD存盤的優先級別。
實例:使用控制臺挖掘日志
當運營需要對上T的網絡日志做錯誤分析時,如果使用Hadoop平臺HDFS格式存儲,要分析日志,首先要編寫MapReduce程序,在程序中篩選錯誤日志,之后聚合匯總;也可以使用Hive來查詢,前提是搭建Hive環境,并設計好表結構。
如果采用Spark查詢,會是下面的編程腳本,非常簡易:
image
圖解:圖中的方框代表RDD,箭頭則表示一個轉換函數。
lines=spark.textFile("hdfs://...")errors=lines.filter(_.startsWith("ERROR"))errors.persist()這三行代碼就能解決查詢所有錯誤日志的信息。具體展開說明下:
lines=spark.textFile("hdfs://...")lines 是RDD,作用是從 hdfs 讀取日志;
errors=lines.filter(_.startsWith("ERROR"))errors是另一個新RDD,用來存儲含有ERROR的錯誤日志;
errors.persist()是將errors的數據固化在內存中,以供之后程序反復使用。但此時,spark并未開始執行。
若要執行這個Spark程序,需要執行一個動作函數,比如:
errors.count()這是在計算總共有多少次錯誤發生,此時Spark程序就執行了。這就是典型的“惰計算”,Spark獨有的特性。
再舉個具體的例子:比如MySQL數據庫的錯誤日志,歸檔之后放在了HDFS上面,那么用Spark計算總數就簡單了:
errors.filter(_.contains("MySQL")).count()除了count()這個總計動作函數外,還有很多動作函數也可以使得Spark程序立即運行起來:
errors.filter(_.contains("HDFS")).map(_.split('')(3)).collect()這是取了包含HDFS錯誤信息的第三個字段的值,并返回前臺。
當Spark的第一個動作函數執行時,lines,errors就相繼建立, lines因為沒有將其他非錯信息剔除,所以數據量巨大,全部裝載到內存里就容易溢出。但errors就不一樣了,因數據量小,適合暫留在內存中,為后續的復用提供準備。
最后,RDD是如何做到容錯的呢?在開始的簡易計算譜系圖中,每一步轉換操作都會被記錄下來。一旦errors RDD其中的一個分區丟失,重新按照這份譜系圖執行一遍,相當丟失分區的數據就回來了。
2.3 RDD模型的優點
image
(圖1)
分布式共享內存的概念
Distributed Shared Memory, 分布式共享內存
https://en.wikipedia.org/wiki/Distributed_shared_memory
分布式共享內存,最大的優點在于寫一次,多機同步。集群中的所有計算機節點,在同一內存位置存儲了同一份數據。
弊端也很明顯,一旦數據損壞,所有數據都要重新還原或重做;同步導致的延遲會很高,因為系統要保障數據的完整性。這在分布式數據庫中常見。
RDD 與 DSM 的區別在于,前者是粗放式寫入,通過轉換函數生成,而后者在內存任意位置均可寫入。 RDD不能很好地支持大批量寫入,卻可以很好的支持分區容錯。前面也說道,譜系圖是RDD容錯的利器,丟失分區可重生。
RDD的第二大優勢在于,備份節點可以迅速的被喚起,去代替那些緩慢節點執行任務。即在緩慢節點執行任務的同時,備份節點同時也執行相同的任務,哪個節點快就用那個節點的結果。而DSM則會被備份節點干擾,引起大家同時緩慢,因為共享內存之間會同步狀態,互相干擾。
RDD的另外兩大優點,基于數據存儲分發任務和溢出緩存至硬盤。在大量寫入的操作中,比如生成RDD,會選擇離數據最近的節點開始任務(如下圖所示);而在只讀操作中,大量數據沒發存入內存時,會自動存到硬盤上而不是報錯停止執行。
image
(圖2)
上圖所示的,便是驅動器程序(Driver)將計算任務分發到數據分區所在節點,執行轉換操作。多節點并行執行一個巨大數據量的操作得以完成。
不適合使用RDD的場景
如前所述,RDD的最大優點是,并行處理只讀數據。RDD之間有完整的血統關系,稱之為譜系圖。其中之一丟失后,可以憑借譜系圖恢復數據。但對于大量寫入的程序,比如爬蟲就不適合了。保障爬蟲數據的完整性,需要做及時的checkpoint,實現多重副本的建立。這種異步機制,只能靠傳統的日志型系統完成,比如數據庫, RAMCloud, Percolator, Piccolo.
3 Spark編程接口
Spark提供了Scala,一種類DryadLINQ的Java vm函數編程語言,用來封裝 RDD 的編程接口(Api). Scala有兩個好處,一是方便交互式操作;二是靜態類型的效率極高。
Scala 是靜態類型的語言,即在編譯時就已經完成了數據類型的檢查,比起動態類型,是要提高不少效率
如圖2所示,Spark是由Driver程序啟動,分發任務到各節點上運行,這些節點稱為worker程序,生成的RDD數據分區會在worker程序里面保存起來,直到程序結束。Driver還負責每個RDD分區的血統記錄,即每個RDD分區的父分區或者數據源是什么,以便丟失后恢復。
在Spark的編程接口里,有個很重要的特性是傳遞函數閉包(function closure).函數閉包被當做變量可以傳遞到轉換函數或動作函數中去,而閉包中的變量,常量都可以被共享訪問。因此當轉換函數與動作函數有閉包函數傳入時,事實上每個RDD分區都會接收到相同的一個閉包函數。
比如:
var?x?=?5;rdd.map(_?+?x)就把 x 傳到了每個RDD分區的map函數中。
Scala是門靜態語言,RDD的元素類型需要首先定義好,但支持隱式轉換,比如RDD[Int]理論上需要存儲整型(int)元素,但事實上Int可以省卻,因為一旦存儲可以隱式轉換成int的字符串,也沒問題。
RDD及其操作非常簡單,但理解RDD的重點卻在于閉包函數。閉包函數在傳遞過程中,需要序列化,反射。這些都需要嚴肅處理。
3.1 RDD的操作
image
上圖給出的是Spark支持的轉換函數與動作函數,方括號[]中的T代表元素類型。轉換函數用來生成RDD,而動作函數用來計算值或保存計算值到外部存儲。最大的特性是惰性執行,即只有第一個動作函數的執行,才會引起數據流真正的流動。
詳細解釋下這些函數。比如:
-Join: 必須兩個RDD都是鍵值對RDD;
-map:一對一匹配,輸入與輸出同數量,一條輸入產生一條輸出;
-flatMap:一對多匹配,輸入與輸出可不同數量,一條輸入產生多條輸出;
-groupByKey,reduceByKey,sort:自動產生一個哈希(hash)或范圍(range)分區
3.2 應用一,邏輯回歸
很多機器學習的算法都采用了迭代處理,使得最終算法更加優化。那么在迭代過程中,顯然能把之前的結果保留下來,重復使用,使得迭代時間更快。
比如,邏輯回歸,最常見的分類算法,用來計算最恰當的超平面分割線(比如區分垃圾郵件)。算法使用了梯度下降,從隨機數開始,每一次迭代更優化一次求值。
val?points?=?spark.textFile(...).map(parsePoint).persist()var?w?=?//random?initial?vectorfor(i?????p.x?*?(1/(1+exp(-p.y*(w?dot?p.x)))-1)*p.y????}.rduce((a,b)?=>?a+b)????w?-=gradient}把 points 固化在內存中,可以使得計算時間縮短 20倍左右。
3.2 應用二,PageRank
PageRank是知名的網頁排名(網頁影響力)算法。一個網頁被指向的次數越多,在搜索引擎中的排名越高。除了計算網頁影響力之外,還可以用來計算社交網絡中的影響力。
在計算過程中,每一次迭代更新,增加的是被指向網頁的權重。每一個帶有出鏈的網頁,都將帶給其出鏈網頁r/n的貢獻值,這些貢獻值的總計,就是出鏈網頁的排名。
a/N?+?(1-a)∑CiPageRank算法詳細解答,可看這里 https://www.cnblogs.com/jpcflyer/p/11180263.html
用Spark來計算PageRank,可以這么寫:
//?從源文件抽取RDD[URL,outlinks]val?links?=?spark.textFile(...).map(...).persist()var?ranks?=?//?RDD[URL,?rank]for(i?????????????links.map(dest?=>(dest,rank/links.size))????}????ranks?=?contribs.reduceByKey((x,y)?=>?x+y).mapValues(sum?=>?a/N?+?(1-a)*sum)}下圖是對這段代碼的譜系圖,每一次的迭代都會重新計算并生成ranks RDD.
image
從圖中很明顯的可以看出,ranks RDD的數量隨著link的增加而長度變得越來越長,當 ranks RDD 有一次失效(丟失或者故障)時,重新計算會耗時很多。因此,需要將這些中間步驟的ranks RDD保存或者另存副本,執行這個操作,可以使用 persist函數的 RELIABLE 開關。
計算中有一處Join,如果links, ranks的分區都在同一個節點上,那么計算并不需要通信節點,假如不巧的是同一URL,links,ranks的分區卻在不同的分區上,那通信成本就高了。所以控制links,ranks的分區就很講究,盡量(使用相同分區方式,比如hash分區)使得參加join的兩個分區都分配在同一個節點上。
控制分區的分配,也可以通過自定義分區類Partitioner,來完成:
links?=?spark.textFile(...).map(...).partitionBy(myPartFunc).persist()如果源文件在分布式系統比如hdfs上的分區,與 Spark 的分區不一致,在使用轉換函數前,一定會經過混洗(shuffle),這是最大的耗時。
4 RDDs的表達手法
在長串的轉換函數鏈條中,抽象地表現RDD的譜系,是非常困難的。從完美的角度來講,一個實現了RDD的系統,必須能提供一系列豐富的轉換函數,而且還要讓用戶自由的重組這些函數。Spark提供了圖化的RDD表現形式,達到了這些目的。
總之,RDD的表現方式,在Spark中是常用接口,涵蓋了5個方面的信息:
分區集合:
每個分區是最小的原子單位;
父RDD依賴:
每個子分區都依賴父分區;
轉換函數:
每個父分區只有通過轉換函數,才能生成子分區;
分區形式和分區數據地址:
分區形式(partitioning schema),即分區標準。比如按照銷售區域(華東,華北,華西,華南,華中)分區);分區數據地址(partition data placement),按照標準分好的區,數據應該保存到哪些節點上。比如以HDFS文件為數據源,并要以HDFS文件數據塊為分區,那么Spark創建RDD的時候,會從當前含有這些數據塊的節點上,直接創建RDD分區。倘若要在RDD上應用轉換函數,直接操作數據所在節點的本地內存即可,無需通過網絡傳輸,非常高效。
image
partitions():
查詢分區集合包含的所有分區;
preferredLocations(p):
根據數據歸屬地,查詢能迅速找到數據分區的所在節點地址;
dependencies():
查詢RDD的譜系圖;
iterator(p,parentIters):
基于給定的父RDD,查找對應子分區所有對象;
partitioner():
確定分區方法是hash還是range分區
設計RDD接口的有趣之處,在于如何去表達依賴關系。最終,獲得認可的有效方法是定義為兩類,一是窄依賴(narrow dependencies),二是寬依賴(wide dependencies) 。窄依賴是指父RDD頂多能產生一個子RDD,比如map;寬依賴指父RDD能產生多個子RDD,比如Join.
之所以這么區分寬窄依賴關系,有兩個原因:
1)窄依賴關系,使得父子分區可以在同一個節點上完成轉換,比如map,filter;而寬依賴關系,則需要所有上層分區都同時存在,且大概率是要從不同的數據分區,抽取數據到一個分區或多個分區進行計算,這個過程稱之為 shuffle, shuffle是 Spark 最具有破壞性能的操作。
2)故障恢復:窄依賴的數據分區如果故障了,只要從上層的RDD分區重新生成,而且就在本地即可高效完成,就算是多個分區損壞,也可以并行完成恢復;但寬依賴關系就需要多個RDD分區聯合執行恢復,不亞于重新執行Spark程序。
image
最有意思的地方是Join操作。父RDD分區的方法決定了子RDD生成的方式,比如父RDD按照hash來分區,Join的時候,就不需要shuffle了。
5 Spark系統實現
Spark是以Scala寫就的,總共有14000行代碼(初始化版本,現在不止)。Spark程序運行在 Mesos 集群管理器上,但也可與 Hadoop, MAPI等做互連,利用Hadoop提供的輸入接口插件,讀取HDFS,HBase的數據。每個Spark程序作為一個單獨應用運行在Mesos上,程序間的交互由Mesos處理。一個完整的Spark程序由Driver和Worker組成,Driver是主程,用來協調和收集各個Worker的工作。
接下來,主要闡述系統調度器,交互式程序解釋器,內存管理和checkpointing技術。
5.1 任務調度
image
總體來說,任務調度器(scheduler)按照 driver, workder 中的程序,在集群中分配任務。上圖是經典的有向無環圖(DAG),每一步都是在生成一個新的RDD,只有第一個作用在RDD上的動作函數開始時,正式的數據流才開啟。圖中矩形框代表一個RDD,有背景色(不管藍黑)的矩形代表一個分區,黑色代表該分區是持久化駐留在內存中的。
持久化駐留,只在當前程序中生效,一旦程序執行完畢,還是銷毀,其他程序不能訪問。
任務調度器最有特點的功能在于它對數據歸屬非常敏感。如果程序需要的RDD分區數據在某臺節點的內存里,任務就優先分發到那臺節點上;如果集群中所有內存都沒有需要的分區數據,任務調取器則會根據RDD提供的優選地址,將任務分配到那些節點上。
窄依賴的RDD譜系比較簡單,每次分區失效都可以高效重生,但寬依賴的RDD在恢復時就比較復雜,需要所有父RDD都存在,若父RDD也失效了,則需要更上層的RDD,依次類推,直到源RDD全部重生,才能恢復當前RDD,程序才能進行下去。所以寬依賴RDD通常會在產生時,將其所有父RDD都物化下來,以使得恢復時更快。
如果任務執行失敗,原因有很多,內存不夠,機器故障等等,任務調度器會安排另一臺節點來繼續執行失敗的任務,只要父RDD都還存在。若父RDD失效了,也沒關系,根據圖譜自動再生成這些父RDD即可。但若任務調度器失敗,則整個程序就是失敗,并不會重新自動跑起來。
目前Spark的程序設計,都是在針對RDD的動作做響應式啟動執行,當然另一種嘗試也是有意義的,那就是針對動作中涉及的RDD,一步步往前推,少了什么RDD,根據圖譜去生成。這種想法暫時還只是處于試驗階段。
5.2 集成的解釋器(Interpreter Integration)
Spark計算框架允許用戶在Scala提供的解釋器窗口(與Python,Ruby類似的解釋器窗口),交互式的利用大數據集群提供的算力,查詢和操控大規模數據庫集。交互式操作,即一次運算表達式,可以操作數千臺計算機的計算資源,并且得益于集群內存計算模式,而非MapReduce借助硬盤的低效模式,以低延遲的方式得到該步計算的結果。
看以下簡單代碼,一窺Scala編程的不同:
var?x?=?5?;println(x);每一行Scala代碼,會被解釋為單行類,執行時,實際上運行的便是這單行類的賦值或者函數調用。
因此上面這兩行代碼,可以解釋為:
println(Line1.getInstance().x)Line1 就是將單行代碼抽象為一個類并實例化后的結果對象。
實際上,我覺得更確切的說,應該是 Line2.getInstance().println(Line1.getInstance().x).但原論文并沒有這么解釋
最神秘的事情,并不是scala獨特的解釋器特性,而是Spark如何分發scala程序。就拿上面兩行代碼來說,Spark把這兩行代碼,分發到了1000臺計算機上,并行地跑了一次批處理,得到最終結果,且中間有任何機器故障,都沒有影響到程序的執行和結果的正確。
因此,探索Spark如何完成這整個執行過程就變得非常有意義。事實上,Spark解釋器就暗藏了答案:
1)類運送(class shipping): 為了讓每個工作節點(workder node)都能得到可執行代碼字節(bytecode),scala提供的解釋器,就負責為這些節點提供類運送,且是通過http傳送的方式.
為什么 http 傳送方式在這里會被指定為傳送協議,值得思考!
2)改變代碼的產生方式(modified code generation):讓所有的工作節點(worker node)都得到相同的程序代碼,最大的問題是同時傳送閉包引用的上下文,包括閉包中引用的變量。如果變量是在閉包之前定義的,工作節點上的Java就無法定位閉包之前的變量。所以改變代碼的產生方式就解決了這一點,也就是為什么每一行 Scala代碼要被解釋為當行類,這行里定義的變量或方法,在閉包中引用時,會被追溯到變量或方法定義的單行類,從而這些單行類會被遺棄運送到工作節點上。
在實際的業務應用場景里,在交互式解釋器中查詢大規模數據集,比如從HDFS上分析日志文件,非常實用。后期加入的 Spark SQL 更是將 Spark 的分布式計算能力擴大化到極致,普惠了每個數據分析師。
image
上面的示意圖,很好地解釋了單行類的同步運送,對于工作節點的意義。當閉包中引用了上行的變量,則需要將上行封裝成一個類實例,同時運送到其他節點。
5.3 內存管理
Spark 為 RDD 提供了三種存儲格式:
訪問速度從快到慢,即第一種方式最快,無需任何轉換就可以被自由訪問。最后一種最慢,因每次使用,需從硬盤抽取數據,有不必要的IO開銷
當內存吃緊,新建的RDD分區沒有足夠內存存儲時,Spark會采用回收分區方式,以給新分區提供空間。除非新的分區和要回收的老分區在同一個RDD。回收機制采用的是常規LRU(Least Recently Used)算法,即最近最少使用的算法。這套回收機制很有用,至少目前來說是。但分權機制也很有用,比如設定RDD的權限等級,控制RDD分區被回收的可能性。
5.4 支持 checkpointing
checkpointing的技術本質是為長鏈操作尤其是依賴寬關系的計算做結果緩存。
長鏈操作:經由一系列轉換操作得來的RDD,在故障之后,恢復需要經歷同樣多步驟,會導致時間過多的消耗,這就是長鏈操作。
實現checkpointing的api是persist的replicate開關,即:
rdd.persist(REPLICATE)通過將數據暫存至穩定的存儲設備,以防備RDD失效后的重算。
checkpointing的決策是留給用戶的,但也可以做成自動化。在保障數據一致性角度看,自動在RDD創建成功后保留一份副本,不會引起數據不一致的尷尬,看起來是件一勞永逸的事情。為什么不這么做呢?我想這其中涉及的一個判斷是,是否有足夠的必要去消耗原本應該留給其他Spark程序的資源,來保障僅有百萬分之一的可能會丟掉的分區。
6 性能評估
Spark 在性能方面的出眾,對標物是Hadop,以下是基于 Amazon EC2做出的4相對比數據:
1)在圖運算和迭代機器學習方面,優先Hadoop 20倍速度。性能的提高得益于無需硬盤I/O,且在內存中的Java對象計算,沒有序列化和反序列化的開銷
2)性能與擴展性都很好。單測一張分析報表,就比Hadoop提高了40倍性能
3)當有節點故障時,Spark能自動恢復已丟失的分區
4)查詢1TB的數據,延遲僅在5-7秒
image
7 一些討論
學習一門技術,就要徹底了解其歷史,知其應用。從這些應用著手,由點到面的知悉這門技術的優勢。而不至于學得茫然而不知所措。
7.1 囊括眾多集群編程模式
當年Spark發明的時候,市面上有很多獨立的軟件解決方案,來完成大規模數據應用。這些獨立的解決方案僅僅是某類應用中的佼佼者,換個場景,效果就沒那么突出了。Spark的出現,統一了這些獨立的軟件解決方案,使得用戶只需Spark一個框架,即可完成原本需要4-5個獨立解決方案才能解決的問題。
因此,首先就要討論Spark出現之前,市面上有哪些應用:
1) MapReduce
2) DryadLINQ
3) SQL
4) Pregel
5) Iterative MapReduce
6) Batched Stream Processing
這些應用就不再過多闡述了,Spark 將他們集成起來,提供方便的api供使用,原本這些技術的細節就不用深究了。
***7.2 RDD調試 ***
在分區故障時,如何快速恢復是個痛點。依賴RDD的譜系圖,可以保障分區故障后的數據一致性。記錄RDD的譜系圖,對于程序的健壯性變得非常重要。與先前的分布式系統調試器,最大的優勢在于,不需要記錄每個事件在不同節點上的執行順序。
8 其他相關工作進展
集群編程模式:在Spark出現之前,大規模利用集群計算資源處理數據應用已經有成熟的方案了,比如MapReduce,Dryad和Ciel. 這些方案靠的是移動硬盤數據來實現分布式進程之間的數據共享。Spark出現之后,數據共享有了新的突破,雖然穩定的存儲依舊可以使用,但更多利用了高效的存儲,實現了無盤(不需要借助硬盤)計算,之前借盤運算的開銷,比如序列化,反序列化和刻錄副本都可以去掉。
第二種高級編程語言的集群編程模式,就像 DryadLINQ 和 FlumeJava, 提供了語言集成的編程接口(API),用戶需要調用集群處理大規模數據時,只要使用這些高級語言提供的編程接口,比如map, join 即可。這些系統唯一的缺點在于,他們無法把數據高效方便地共享到下一個查詢中去,只能在同一個查詢中,比如map接著一個map中,共享數據流。Spark 實現的 RDD,借用了同樣的編程語言集成接口,僅僅是完成一次分布式數據的抽象,就完美的實現了在多個查詢中共享數據流。
第三種集群編程模式,采用的是特殊高級接口定制,采用這種定制支持特定的應用,比如圖運算和迭代計算。Pregel 系統支持迭代圖計算,而 Twister 和 HaLoop 則是迭代的MapReduce計算運行時刻庫。他們都不支持通用計算,比如建立數據集,裝載到內存中,使用任何方式去查詢這份數據集。而Spark使用的是分布式數據抽象,基于抽象做出靈活的操作標準,因此類似及時分析這樣的操作,完全受到Spark的支持。
最后,有些分布式系統,比如Piccolo, 分布式共享內存(DSM)系統和鍵值對系統都采取的是共享可變狀態集。用戶既可以讀也可以寫入這些共享內存。由于系統狀態可變,可被更新,只有依靠checkpoint技術才能保障數據完整性,一致性,因此開銷會比Spark多很多。
緩存系統:Nectar 系統可以在任意的 DryadLINQ應用程序之間共享中間數據集,實現的方法是將數據集輸出到穩定的存儲設備上,而不是內存。并且Nectar也不允許用戶傾倒指定的分區,連分區方法也不受用戶控制。Ciel和FlumeJava提供結果緩存,但不支持用戶自定義緩存內容。
譜系圖: 在科學計算和數據庫領域,譜系圖或源數據管理一直是重點研究對象。一旦數據丟失,從從源頭開始重新計算是最慢的一項恢復操作,如果自動修復能從丟失的上一級開始追溯,那是最快的。很多系統能保障斷點恢復,但所用的措施卻是耗時耗資源最多的構建副本方法。而譜系圖在單個MapReduce任務之后,被丟失的無影無蹤。
關系型數據庫: 在數據庫中,視圖就像是RDD,物化視圖就像是持久化的RDD,但數據庫在更新這些對象時,都需要做日志登記的操作,有些類似構建副本的方法,開銷巨大。
總結
以上是生活随笔為你收集整理的spark入门_入门必读 | Spark 论文导读的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 微信视频号用户总使用时长接近朋友圈 80
- 下一篇: 数据库主键的自动增长之总结