spark如何防止内存溢出_Spark 理论基石 —— RDD
概述
RDD,學名可伸縮的分布式數(shù)據(jù)集(Resilient Distributed Dataset)。是一種對數(shù)據(jù)集形態(tài)的抽象,基于此抽象,使用者可以在集群中執(zhí)行一系列計算,而不用將中間結(jié)果落盤。而這正是之前 MR 抽象的一個重要痛點,每一個步驟都需要落盤,使得不必要的開銷很高。
對于分布式系統(tǒng),容錯支持是必不可少的。為了支持容錯,RDD 只支持粗粒度的變換。即,輸入數(shù)據(jù)集是 immutable (或者說只讀)的,每次運算會產(chǎn)生新的輸出。不支持對一個數(shù)據(jù)集中細粒度的更新操作。這種約束,大大簡化了容錯支持,并且能滿足很大一類的計算需求。
初次接觸 RDD 的概念的時候,不大能夠理解為什么要以數(shù)據(jù)集為中心做抽象。后來隨著不斷深入的了解,對數(shù)據(jù)集的一致性抽象正是計算流水線(pipeline)得以存在和優(yōu)化的精髓所在。在定義了數(shù)據(jù)集的基本屬性(不可變,分區(qū),依賴關(guān)系,存放位置等)后,就可以在此基礎上施加各種高階算子,以構(gòu)建 DAG 執(zhí)行引擎,并做適當優(yōu)化。從這個角度來說,RDD 實在是一種精妙設計。
例行總結(jié)一下 RDD 論文的主要設計點有:
作者:青藤木鳥 Muniao's blog 轉(zhuǎn)載請注明出處
小引
Dryad 和 MapReduce 是業(yè)已流行的大數(shù)據(jù)分析工具。它們給用戶提供了一些高階算子來使用,而不用去關(guān)心底層的分布式和容錯細節(jié)。但它們都缺少對分布式內(nèi)存的抽象,不同計算過程之間只能夠通過外存來耦合:前驅(qū)任務將計算結(jié)果寫到外存上去,后繼任務再將其作為輸入加載到內(nèi)存,然后才能接著執(zhí)行后繼計算任務。這樣的設計有兩個很大的劣勢:復用性差、延遲較高。這對于像 Page-Rand,K-Means,LR 等要求迭代式計算的機器學習算法(需要數(shù)據(jù)復用)極其不友好;對于一些隨機的交互式查詢(要求延遲低)也是個災難。因為他們將大部分的時間都耗費在數(shù)據(jù)備份、硬盤 IO 和數(shù)據(jù)序列化之上。
在 RDD 之前,為了解決數(shù)據(jù)復用的問題,業(yè)界已有諸多嘗試。包括將中間結(jié)果放在內(nèi)存中的迭代式圖計算系統(tǒng)——Pregel,以及將多個 MR 串在一塊,緩存循環(huán)不變量的 HaLoop。但這些系統(tǒng)只支持受限的計算模型(比如MR),而且只進行隱式[1]的數(shù)據(jù)復用。如何進行更通用的數(shù)據(jù)復用,以支持更復雜的查詢計算,仍是一個難題。
RDD 正是為解決這個問題而設計,高效地復用數(shù)據(jù)的一個數(shù)據(jù)結(jié)構(gòu)抽象。RDD 支持數(shù)據(jù)容錯、數(shù)據(jù)并行;在此之上,能夠讓用戶利用多機內(nèi)存、控制數(shù)據(jù)分區(qū)、構(gòu)建一系列運算過程。從而解決很多應用中連續(xù)計算過程對于數(shù)據(jù)復用的需求。
其中比較難的一個設計是如何針對內(nèi)存數(shù)據(jù)進行高效的容錯。現(xiàn)有的一些基于集群內(nèi)存的系統(tǒng),比如分布式KV、共享內(nèi)存、Piccolo 都提供一種可以細粒度的修改的可變數(shù)據(jù)集抽象。為了支持這種抽象之上的容錯,就需要進行數(shù)據(jù)多機冗余或者操作日志備份。這些操作都會導致多機間大量的數(shù)據(jù)傳輸,由于網(wǎng)絡帶寬遠慢于 RAM,使得分布式利用內(nèi)存這件事失去其優(yōu)勢。
與之相對,RDD 只提供粗粒度的、基于整個數(shù)據(jù)集的計算接口,即數(shù)據(jù)集中的所有條目都施加同一種操作。這樣一來,為了容錯,我們只需要備份每個操作而非數(shù)據(jù)本身(因為是整體更新的);在某個分區(qū)數(shù)據(jù)出現(xiàn)問題進行錯誤恢復時,只需要從原始數(shù)據(jù)集出發(fā),按順序再算一遍即可。
初看起來,這種計算抽象很受限,但它其實能滿足現(xiàn)有的一大類的集群計算需求,包括 MR、 DryadLINQ、 SQL、Pregel 和 HaLoop。并且能滿足一些其他計算需求,比如說交互式計算。RDD 的實現(xiàn)系統(tǒng) Spark,提供類似 DryadLINQ 的高階算子,應該是第一個提供交互式的集群運算接口。
RDD
本節(jié)首先給出 RDD 的詳細定義,然后介紹下 Spark 的中針對 RDD 的操作接口,繼而對比了 RDD 與提供細粒度更新接口的共享內(nèi)存抽象優(yōu)劣。最后就 RDD 的局限性討論一下。
RDD 抽象
RDD 是一個基于分區(qū)的、只讀的數(shù)據(jù)記錄集抽象。RDD 只可以通過對持久存儲或其他 RDD 進行確定性運算得來,這種運算被稱為變換。常用的變換算子包括:map,filter 和 join。
RDD 沒有選擇不斷的做檢查點以進行容錯,而是會記下 RDD 從最初的外存的數(shù)據(jù)集變化而來的變化路徑,也就是其譜系(lineage)。理論上所有的 RDD 都可以在出錯后從外存中依據(jù)譜系圖進行重建。一般來說,重建的粒度是分區(qū)(Partition)而非整個數(shù)據(jù)集,一來代價更小,二來不同分區(qū)可能在不同機器上。
用戶可以對 RDD 的兩個方面進行控制:持久化和分區(qū)控制。對于前者,如果某些 RDD 需要復用,那么用戶可以指示系統(tǒng)按照某種策略將其進行持久化。后者來說,用戶可以定制分區(qū)路由函數(shù),將數(shù)據(jù)集合中的記錄按照某個鍵值路由到不同分區(qū)。比如進行 Join 操作的時候,可以講待 Join 數(shù)據(jù)集按照相同的策略進行分區(qū),以并行 Join。
Spark 編程接口
Spark 通過暴露與編程語言集成的算子來提供操作 RDD 的接口。 其中 RDD 表現(xiàn)為編程語言中的類,而 RDD 的算子為作用于這些類上的函數(shù)。之前的系統(tǒng)如 DryadLINQ 和 FlumeJava 也使用了類似的形式。
用戶使用 RDD 時,首先將數(shù)據(jù)從持久化存儲中通過變換(Transformations,如 map 或者 filter)將其載入內(nèi)存,然后可以對 RDD 施加任何系統(tǒng)支持的一系列變換,最后利用動作(Action)算子,將 RDD 重新持久化到外存中或者將控制權(quán)交還用戶。和 DryadLINQ 一樣,這個加載-變換-落盤的過程是聲明式(Declarative,或者說是惰式[2])的,Spark 在拿到整個拓撲后會利用執(zhí)行引擎進行執(zhí)行優(yōu)化(比如將并行化、流水線化,之后會進一步討論)。
此外很重要的一個接口是 persist,可以由用戶來告訴系統(tǒng)哪些 RDD 需要持久化,如何持久化(本機硬盤還是跨機器存儲),如果有多個 RDD 需要持久化,那么優(yōu)先級如何確定。Spark 默認將 RDD 保存在內(nèi)存中,如果內(nèi)存不夠用了會根據(jù)用戶配置將數(shù)據(jù)溢出(spill)到硬盤上。
舉個栗子
假設我們相對存在于 HDFS 上的日志文件,找出錯誤條目,針對出現(xiàn) hdfs 關(guān)鍵字的具體條目進行分析。利用 Spark 接口,使用 Scala 語言實現(xiàn),代碼如下:
lines = spark.textFile("hdfs://...") errors = lines.filter(_.startsWith("ERROR")) errors.persist() ? // Return the time fields of errors mentioning // HDFS as an array (assuming time is field // number 3 in a tab-separated format): errors.filter(_.contains("HDFS")).map(_.split(’t’)(3)).collect()第一行基于某個 hdfs 上的文件定義一個 rdd(每一行作為集合中的一個條目)。第二行通過 filter 變換生成新的 rdd,第三行請求 spark 將其結(jié)果進行暫存。最后一行是鏈式操作,以一個 collect 的動作結(jié)尾,求出包含 HDFS 關(guān)鍵字的所有行數(shù)的各個字段。
其計算的譜系圖(lineage)如下:
有兩點需要注意:
由于第三行將結(jié)果在內(nèi)存中進行了緩存,因此還可以基于此做其他動作。比如,計算包含 'MySQL' 關(guān)鍵字的錯誤條數(shù):
// Count errors mentioning MySQL: errors.filter(_.contains("MySQL")).count()RDD 模型的優(yōu)點
為了理解 RDD 帶來的好處,可以看下面一個表,將 RDD 與 DSM (Distributed Shared Memory)做了詳細對比。DSM 在這里是一個很寬泛的抽象,不僅包括一般的內(nèi)存共享系統(tǒng),還包括其他支持細粒度的狀態(tài)更新的框架,比如說 Piccolo、分布式數(shù)據(jù)庫等。
首先, DSM 和 RDD 最主要的區(qū)別在于,DSM 支持對數(shù)據(jù)集細粒度的更新。即,可以對任意內(nèi)存位置進行更新。而 RDD 舍棄了這一點,只允許批量的寫入數(shù)據(jù),從而提高了容錯效率:
其次,RDD 的不可變的特點允許系統(tǒng)叫較容易的對某些計算進行遷移。比如說 MR 中的某些 Stragger 任務就可以很方便的遷移到其他計算節(jié)點上去,因為其輸入數(shù)據(jù)一定不會被改變,因此不用考慮一致性的問題。
最后還有兩個好處值得一提:
RDD 不適用的場景
如前所述,RDD 適用于針對全數(shù)據(jù)集統(tǒng)一處理的粗粒度變換的抽象。相對的,就不適用于要求對數(shù)據(jù)進行細粒度的、異步更新的數(shù)據(jù)集。比如說 web 應用,再比如說爬蟲等等。對于這些引用類型,傳統(tǒng)的快照+操作日志的容錯方式可能更適合一些。如數(shù)據(jù)庫 RAMCloud , Percolator 和 Piccolo。 RDD 的目標在于批量分析型應用,而將這些異步應用的需求留給那些專有系統(tǒng)。
Spark 編程接口
Spark 利用 Scala 語言作為 RDD 抽象的接口,因為 Scala 兼顧了精確(其函數(shù)式語義適合交互式場景)與高效(使用靜態(tài)類型)。當然,對于 RDD 本身來說,不限定于任何特定的語言表達。下面從執(zhí)行流程與代碼分發(fā)兩個方面來詳細說明下 Spark 是如何執(zhí)行用戶代碼的。
開發(fā)者利用 Spark 提供的庫編寫驅(qū)動程序 (driver programe)以使用 Spark。驅(qū)動程序會定義一到多個 RDD,并對其進行各種變換。Spark 提供的庫會連接 Spark 集群,生成計算拓撲,并將拓撲分散到多個 workers 上去進行執(zhí)行,同時記下變換的譜系(lineage)。這些 workers 是分散在 Spark 集群內(nèi)各個機器上的常駐進程,它們在內(nèi)存里保存計算過程中生成的 RDD 的各個分區(qū)。
像前面舉的例子一樣,開發(fā)者需要將函數(shù)作為參數(shù)傳給 map 等 Spark 算子。Spark 會將這些函數(shù)(或者說閉包)序列化為 Java 對象,然后分發(fā)給執(zhí)行節(jié)點進行加載。閉包所涉及的變量會被當做上述生成對象的字段值。RDD 本身會被包裝成靜態(tài)類型的參數(shù)進行傳遞。由于 Scala 支持類型推斷,大部分例子都省掉了 RDD 數(shù)據(jù)類型。
盡管 Spark 暴露的 Scala 的 RDD 接口在概念上看起來很簡單,但實在實現(xiàn)上有一些很臟的角落,比如說 Scala 的閉包得使用反射, 比如說盡量避免修改 Scala 的解釋器。
Spark 中的 RDD 操作
下表列出了 Spark 中支持的 RDD 操作。如前面所說,變換(transformations)是生成新 RDD 的惰性算子,而動作(actions)是觸發(fā)調(diào)度的算子,它會返回一個結(jié)果或者將數(shù)據(jù)寫到外存中。
需要注意的是:
RDD 的表示
提供 RDD 抽象的一個難點在于,如何高效的跟蹤譜系并能提供豐富的變換支持。最后我們選用了基于圖的調(diào)度模型,將調(diào)度和算子進行了解耦。從而能夠在不改變調(diào)度模塊邏輯的前提下,很方便的增加算子支持。具體來說,RDD 抽象的核心組成主要有以下五個部分:
在 RDD 的接口設計中最有趣的一個點是如何對 RDD 間的依賴關(guān)系進行規(guī)約。最后發(fā)現(xiàn)可以將所有依賴歸納為兩種類型:
如此歸納的原因主要有兩點。
調(diào)度優(yōu)化。對于窄依賴,可以對分區(qū)間進行并行流水化調(diào)度,先計完成某個窄依賴算子(比如說 map)的分區(qū)不用等待其他分區(qū)而直接進行下一個窄依賴算子(比如 filter )的運算。與之相對,寬依賴的要求父 RDD 的所有分區(qū)就緒,并進行跨節(jié)點的傳送后,才能進行計算。類似于 MapReduce 中的 shuffle。
數(shù)據(jù)恢復。在某個分區(qū)出現(xiàn)錯誤或者丟失時,窄依賴的恢復更為高效。因為涉及到的父分區(qū)相對較少,并且可以并行恢復。而對于寬依賴,由于依賴復雜(如上圖,子 RDD 的每個分區(qū)都會依賴父 RDD 的所有分區(qū)),一個分區(qū)的丟失可能就會引起全盤的重新計算。
這樣將調(diào)度和算子解耦的設計大大簡化了變換的實現(xiàn),大部分變換都可以用20余行代碼來實現(xiàn)。由于不需要了解調(diào)度細節(jié),任何人都可以很快的上手實現(xiàn)一個新的變換。試舉幾例:
HDFS 文件:partitions 函數(shù)返回 HDFS 文件的所有 block,每個 block 被當做一個 partition。 preferredLocations 返回每個 block 所在的位置,Iterator 會對每個 block 進行讀取。
map:在任意 RDD 上調(diào)用 map 會返回一個 MappedRDD 對象,該對象的 partitions 函數(shù)和 preferredLocations 與父 RDD 保持一致。對于 iterator,只需要將傳給 map 算子的函數(shù)以此作用到其父 RDD 的各個分區(qū)即可。
union: 在兩個 RDD 上調(diào)用 union 會返回一個新的 RDD,該 RDD 的每個分區(qū)由對應的兩個父 RDD 通過窄依賴計算而來。
sample:抽樣函數(shù)和 map 大體一致。但該函數(shù)會給每個分區(qū)保存一個隨機數(shù)種子來決定父 RDD 的每個記錄是否保留。
join:在兩個 RDD 上調(diào)用 join 操作可能會導致兩個窄依賴(比如其分區(qū)都是按待 join 的key 哈希的),兩個寬依賴,或者混合依賴。每種情況下,子 RDD 都會有一個 partitioner 函數(shù),或繼承自父分區(qū),或是默認的hash 分區(qū)函數(shù)。
實現(xiàn)
Spark 最初版本(論文里提到的),只有 1.4w 行 Scala 代碼,由 mesos 管理資源分配,可以和 Hadoop 生態(tài)共用資源,并從 Hadoop/Hbase 中加載數(shù)據(jù)。對于 Spark 的實現(xiàn),有幾個值得一說的點: Job 調(diào)度,交互式解釋器,內(nèi)存管理和檢查點機制(checkpointing)。
Job 調(diào)度
Spark 調(diào)度設計依賴于上一節(jié)提到的 RDD 的抽象。它的調(diào)度策略和 Dryad 有點像,但又不盡相同。在用戶在某個 RDD 上調(diào)用 Action 類型(count,save 等等)的算子時,調(diào)度器就會根據(jù)用戶代碼中調(diào)用算子的順序生成計算拓撲。我們把每一個變換前后的 RDD 當做點,算子產(chǎn)生的 RDD 間的依賴/父子關(guān)系當做邊,如此構(gòu)成一個有向無環(huán)圖(DAG)。為了減小傳輸,調(diào)度器會將幾個連續(xù)的計算進行歸并,稱為階段(Stage),進行階段歸并的依據(jù)為是否需要 shuffle,也即是否為寬依賴。這樣,會形成一個新的由階段組成的更精簡的 DAG。
之后,調(diào)度器會從目標 RDD 出發(fā),沿著 DAG 圖中的邊往前遍歷,對每個不在內(nèi)存中的分區(qū)進行計算。如果需要計算的分區(qū)已經(jīng)在內(nèi)存中了,則直接利用結(jié)果即可,如上圖所示。
然后,調(diào)度器會將任務調(diào)度到離其依賴 RDD 的 Partition 近的地方去:
對于寬依賴,Spark 和 MR 一樣,會將其中間結(jié)果輸出持久化起來,以簡化容錯。如果某個 Stage 的父 RDD 不可用,調(diào)度器就會新提交一些并行運行的任務,來生成這些缺失的分區(qū)。不過現(xiàn)在 Spark 還不能對調(diào)度器本身故障進行恢復,雖然看起來對 RDD 的譜系圖進行冗余備份或許是一個簡單可行的方案。
最后,現(xiàn)在仍是由用戶 Driver 程序調(diào)用 Action 算子來觸發(fā)調(diào)度任務。但我們正在探索維持一些周期性的檢查性任務,對 RDD 中某些缺失的分區(qū)進行補足。
解釋器集成
像 Python 和 Ruby 一樣,Scala 提供交互式的 shell 環(huán)境。由于 Spark 將數(shù)據(jù)保存在內(nèi)存中,我們希望可以借助 Scala 的這個交互式環(huán)境讓用戶對大數(shù)據(jù)集進行交互式實時的查詢。
Scala 的解釋器對用戶代碼進行解釋執(zhí)行的通常做法是,將用戶鍵入的每一行 Scala 命令編譯成一個 Java Class 字節(jié)碼,然后將其加載到 JVM 中。該類包含一個初始化過的單例實例,實例中包含用戶定義的變量和函數(shù)。比如,用戶輸入:
var x = 5 println(x)Scala 解釋器會針對第一行生成一個叫做 Line1 的類,其中有一個 x 的字段,并且將第二行編譯為:println(Line1.getInstance().x)
為了讓 Scala 解釋器能在分布式環(huán)境運行,我們在 Spark 中對其進行了以下修改:
下圖反映了我們修改后的 Scala 解釋器生成 Java 對象的過程:
我們發(fā)現(xiàn)解釋器在對大型數(shù)據(jù)集進行交互式查詢時很有幫助,我們計劃對更高級的查詢語言進行支持,如 SQL。
內(nèi)存管理
Spark 提供了三種存儲 RDD 的方式:
由于 Spark 跑在 JVM 上,因此第一種存儲方式訪問最快,第二種允許用戶犧牲一點性能以換取更高效的內(nèi)存利用。當數(shù)據(jù)尺度太大以至于內(nèi)存裝不下的時候,第三種方式很有用。
為了有效的利用有限的內(nèi)存,我們在 RDD 分區(qū)級別上進行 LRU 式的驅(qū)逐策略。即,當我們新計算出一個 RDD 的分區(qū)時,如果發(fā)現(xiàn)內(nèi)存不夠用,就會從內(nèi)存中驅(qū)逐出去一個最久沒有使用過的 RDD 的分區(qū)。但是,如果這個最久沒有使用過的分區(qū)和新計算出的分區(qū)屬于同一個 RDD,我們會接著尋找,直到找到一個和當前分區(qū)不屬于一個 RDD 并且最久沒用過的分區(qū)。因為 Spark 的大部分計算會施加于整個 RDD 上,這樣做可以防止這些分區(qū)被反復的計算-驅(qū)逐。這個策略在論文成文時用的很好,不過,我們?nèi)匀惶峁┙o了用戶進行深度控制的接口——指定存儲優(yōu)先級。
現(xiàn)在每個 Spark 實例擁有自己的分立的內(nèi)存空間,我們計劃將來提供跨 Spark 實例的統(tǒng)一的內(nèi)存管理。
檢查點機制
盡管所有失敗的 RDD 都可以通過譜系(lineage)來重新計算得出,但是對于某些譜系特別長的 RDD 來說,這將是一個很耗時間的操作,因此提供 RDD 級別的外存檢查點(checkpointing)可能會很有用。
對于具有很長譜系圖,并且譜系圖中存在很多寬依賴的 RDD,在外存上做檢查點會很有幫助,因為某一兩個的分區(qū)可能會引起全盤的重算,對于這種又臭又長的計算拓撲來說,依據(jù)譜系圖重算無疑非常浪費時間。而對于只有窄依賴的、不那么長譜系圖來說,在外存做檢查點可能有些得不償失,因為他們可以很簡單的并行計算出來。
Spark 現(xiàn)階段提供檢查點的 API (給 persist 函數(shù)傳 REPLICATE 標志),然后由用戶來決定是否對其持久化。但我們在思考,是否可以進行一些自動的檢查點計算。由于調(diào)度器知道每個數(shù)據(jù)集的內(nèi)存占用以及計算使用時間,我們或許可以選擇性的對某些關(guān)鍵 RDD進行持久化以最小化宕機恢復時間。
最后,由于 RDD 的只讀特性,我們在做檢查點時不用像通用共享內(nèi)存模型那樣過分考慮一致性的問題,因此可以用后臺線程默默地干這些事情而不用影響主要工作流,也不用使用復雜的分布式的快照算法來解決一致性問題。
注解
[1] 隱式與顯示顯式:在這里可以理解為,顯式是把數(shù)據(jù)集這個概念完整的構(gòu)造出來,定義他的內(nèi)涵和邊界,并基于其上做一些外延拓展。而隱式只是事實上復用了數(shù)據(jù),但并沒有定義被復用的數(shù)據(jù)格式。
[2] 聲明式(Declarative)語言與命令式(Imperative)語言:前者例子有 SQL,HTML;后者例子最常見的有 Shell,其他的常見編程語言 C,Java,Python 也屬于此列。前者的好處在于將"干什么"和"怎么干"這兩件事解耦,這樣一來就可以開發(fā)不同的執(zhí)行引擎,針對不同場景來優(yōu)化"怎么干"這件事。而后者會告訴機器以特定的順序執(zhí)行特定的操作,與直覺一致,是一般編程語言的路子。
掃一掃關(guān)注我的公眾號:分布式點滴
總結(jié)
以上是生活随笔為你收集整理的spark如何防止内存溢出_Spark 理论基石 —— RDD的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: redis setnx 分布式锁_Spr
- 下一篇: string 中的offset_Kafk