spark性能优化 -- spark工作原理
從本篇文章開始,將開啟 spark 學(xué)習(xí)和總結(jié)之旅,專門針對(duì)如何提高 spark 性能進(jìn)行總結(jié),力圖總結(jié)出一些干貨。
無論你是從事算法工程師,還是數(shù)據(jù)分析又或是其他與數(shù)據(jù)相關(guān)工作,利用 spark 進(jìn)行海量數(shù)據(jù)處理和建模都是非常重要和必須掌握的一門技術(shù),我感覺編寫 spark 代碼是比較簡(jiǎn)單的,特別是利用 Spark SQL 下的 DataFrame 接口進(jìn)行數(shù)據(jù)處理,只要有 python 基礎(chǔ)都是非常容易入門的,但是在性能調(diào)優(yōu)上,許多人都是一知半解,寫的 spark 程序經(jīng)常陷入 OOM 或卡死狀態(tài)。這時(shí)深入了解 spark 原理就顯得非常有必要了。
本系列總結(jié)主要針對(duì) Hadoop YARN 模式。
RDD(Resilient Distributed Datasets)
RDD 是 spark 中最基本的數(shù)據(jù)抽象,存儲(chǔ)在 exector 或 node 中,它代表一個(gè) “惰性,”“靜態(tài)”,“不可變”,“分布式“的數(shù)據(jù)集合,RDD 基本介紹在網(wǎng)上上太多了,這里就不做詳細(xì)介紹了,主要講下以下內(nèi)容:
transform(轉(zhuǎn)換)與 action(執(zhí)行)的區(qū)別
轉(zhuǎn)換操作:返回的是一個(gè)新的 RDD,常見的如:map、filter、flatMap、groupByKey 等等 執(zhí)行操作:返回的是一個(gè)結(jié)果,一個(gè)數(shù)值或者是寫入操作等,如 reduce、collect、count、first 等等
惰性計(jì)算
spark 中計(jì)算 RDD 是惰性的,也即 RDD 真正被計(jì)算(執(zhí)行操作,例如寫入存儲(chǔ)操作、collect 操作等)時(shí),其轉(zhuǎn)換操作才會(huì)真正被執(zhí)行。spark 為什么采用惰性計(jì)算:
在 MapReduce 中,大量的開發(fā)人員浪費(fèi)在最小化 MapReduce 通過次數(shù)上。通過將操作合并在一起來實(shí)現(xiàn)。在 Spark 中,我們不創(chuàng)建單個(gè)執(zhí)行圖,而是將許多簡(jiǎn)單的操作結(jié)合在一起。因此,它造成了 Hadoop MapReduce 與 Apache Spark 之間的差異。
惰性設(shè)計(jì)的好處:
① 提高可管理性 可以查看整個(gè) DAG(將對(duì)數(shù)據(jù)執(zhí)行的所有轉(zhuǎn)換的圖形),并且可以使用該信息來優(yōu)化計(jì)算。
② 降低時(shí)間復(fù)雜度和加快計(jì)算速度 只運(yùn)算真正要計(jì)算的轉(zhuǎn)換操作,并且可以根據(jù) DAG 圖,合并不需要與 drive 通信的操作(連續(xù)的依賴轉(zhuǎn)換),例如在一個(gè) RDD 上同時(shí)調(diào)用 map 和 filter 轉(zhuǎn)換操作,spark 可以將 map 和 filter 指令發(fā)送到每個(gè) executor 上,spark 程序在真正執(zhí)行 map 和 filter 時(shí),只需訪問一次 record,而不是發(fā)送兩組指令并兩次訪問分區(qū)。理論上相對(duì)于非惰性,將時(shí)間復(fù)雜度降低了一半。例如:
val list1 = list.map(i -> i * 3) // Transformation1 val list2 = list1.map(i -> i + 3) // Transformation1 val list3 = list1.map(i -> i / 3) // Transformation1 list3.collect() // ACTION假設(shè)原始列表(list) 很大,其中包含數(shù)百萬個(gè)元素。如果沒有懶惰的評(píng)估,我們將完成三遍如此龐大的計(jì)算。如果我們假設(shè)一次這樣的列表迭代需要 10 秒,那么整個(gè)評(píng)估就需要 30 秒。并且每個(gè) RDD 都會(huì)緩存下來,浪費(fèi)內(nèi)存。使用惰性評(píng)估,Spark 可以將這三個(gè)轉(zhuǎn)換像這樣合并到一個(gè)轉(zhuǎn)換中,如下:
val list3 = list.map(i -> i + 1)它將只執(zhí)行一次該操作。只需一次迭代即可完成,這意味著只需要 10 秒的時(shí)間。
容錯(cuò)性
RDD 本身包含其復(fù)制所需的所有依賴信息,一旦該 RDD 中某個(gè)分區(qū)丟失了,該 RDD 有足夠需要重新計(jì)算的信息,可以去并行的,很快的重新計(jì)算丟失的分區(qū)。
運(yùn)行在內(nèi)存
在 spark application 的生命周期中,RDD 始終常駐內(nèi)存(在所在的節(jié)點(diǎn)內(nèi)存),這也是其比 MapReduce 更快的重要原因。
spark 中提供了三種內(nèi)存管理機(jī)制:
① in-memory as deserialized data 這種常駐內(nèi)存方式速度快(因?yàn)槿サ袅诵蛄谢瘯r(shí)間),但是內(nèi)存利用效率低。
② in-memory as serialized data 該方法內(nèi)存利用效率高,但是速度慢
③ 直接存在 disk 上 對(duì)于那些較大容量的 RDD,沒辦法直接存在內(nèi)存中,需要寫入到 DISK 上。該方法僅適用于大容量 RDD。要持久化一個(gè) RDD,只要調(diào)用其 cache()或者 persist()方法即可。在該 RDD 第一次被計(jì)算出來時(shí),就會(huì)直接緩存在每個(gè)節(jié)點(diǎn)中。而且 Spark 的持久化機(jī)制還是自動(dòng)容錯(cuò)的,如果持久化的 RDD 的任何 partition 丟失了,那么 Spark 會(huì)自動(dòng)通過其源 RDD,使用 transformation 操作重新計(jì)算該 partition。
cache()和 persist()的區(qū)別在于,cache()是 persist()的一種簡(jiǎn)化方式,cache()的底層就是調(diào)用的 persist()的無參版本,同時(shí)就是調(diào)用 persist(MEMORY_ONLY),將數(shù)據(jù)持久化到內(nèi)存中。如果需要從內(nèi)存中清楚緩存,那么可以使用 unpersist()方法。
我們來仔細(xì)分析下持久化和非持久化的區(qū)別:
非持久化:持久化:
顯然對(duì)于要復(fù)用多次的 RDD,要將其進(jìn)行持久化操作,此時(shí) Spark 就會(huì)根據(jù)你的持久化策略,將 RDD 中的數(shù)據(jù)保存到內(nèi)存或者磁盤中。以后每次對(duì)這個(gè) RDD 進(jìn)行算子操作時(shí),都會(huì)直接從內(nèi)存或磁盤中提取持久化的 RDD 數(shù)據(jù),然后執(zhí)行算子,而不會(huì)從源頭處重新計(jì)算一遍這個(gè) RDD,再執(zhí)行算子操作。 所以在寫 spark 代碼時(shí):盡可能復(fù)用同一個(gè) RDD。
這里常有個(gè)誤區(qū):
val rdd1 = ... // 讀取hdfs數(shù)據(jù),加載成RDD rdd1.cache // 持久化操作val rdd2 = rdd1.map(...) val rdd3 = rdd1.filter(...)rdd1.unpersist // 釋放緩存rdd2.take(10).foreach(println) rdd3.take(10).foreach(println)如果按上述代碼進(jìn)行持久化,則效果就如同沒有持久化一樣。原因就在于 spark 的 lazy 計(jì)算。
代碼應(yīng)該如下:
val rdd1 = ... // 讀取hdfs數(shù)據(jù),加載成RDD rdd1.cacheval rdd2 = rdd1.map(...) val rdd3 = rdd1.filter(...)rdd2.take(10).foreach(println) rdd3.take(10).foreach(println)rdd1.unpersistrdd2 執(zhí)行 take 時(shí),會(huì)先緩存 rdd1,接下來直接 rdd3 執(zhí)行 take 時(shí),直接利用緩存的 rdd1,最后,釋放掉 rdd1。所以在何處釋放 RDD 也是非常需要細(xì)心的。 請(qǐng)?jiān)?action 之后 unpersisit!!!
Spark Job Scheduling
窄依賴 與 寬依賴
shuffle 過程,簡(jiǎn)單來說,就是將分布在集群中多個(gè)節(jié)點(diǎn)上的同一個(gè) key,拉取到同一個(gè)節(jié)點(diǎn)上,進(jìn)行聚合或 join 等操作。比如 reduceByKey、join 等算子,都會(huì)觸發(fā) shuffle 操作。shuffle 操作需要將數(shù)據(jù)進(jìn)行重新聚合和劃分,然后分配到集群的各個(gè)節(jié)點(diǎn)上進(jìn)行下一個(gè) stage 操作,這里會(huì)涉及集群不同節(jié)點(diǎn)間的大量數(shù)據(jù)交換。由于不同節(jié)點(diǎn)間的數(shù)據(jù)通過網(wǎng)絡(luò)進(jìn)行傳輸時(shí)需要先將數(shù)據(jù)寫入磁盤,因此集群中每個(gè)節(jié)點(diǎn)均有大量的文件讀寫操作,從而導(dǎo)致 shuffle 操作十分耗時(shí)(相對(duì)于 map 操作)。
窄依賴:父 RDD 與 子 RDD 的分區(qū)是一對(duì)一(map 操作)或多對(duì)一(coalesce)的,不會(huì)有 shuffle 過程;并且子 RDD 的分區(qū)結(jié)果與其 key 和 value 值無關(guān),每個(gè)分區(qū)與其他分區(qū)亦無關(guān)。
上面左圖可對(duì)應(yīng) map 操作分區(qū),右圖對(duì)應(yīng) coalesce 操作。
寬依賴:父 RDD 與子 RDD 的分區(qū)是一對(duì)多的關(guān)系,并且是按一定方式進(jìn)行重分區(qū),會(huì)有 shuffle 過程產(chǎn)生,比較耗時(shí),可能會(huì)引發(fā) spark 性能問題。常見的寬依賴操作如:groupByKey、reduceByKey、sort、sortByKey 等等。注意:coalesce 操作如果是將 10 個(gè)分區(qū)換成 100 個(gè)分區(qū),由少分區(qū)轉(zhuǎn)成大分區(qū)將會(huì)發(fā)生 shuffle 過程。coalesce 操作場(chǎng)景主要是 rdd 經(jīng)過多層過濾后的小文件合并。rdd 的 reparation 方法與 coalesce 相反,主要是為了 處理數(shù)據(jù)傾斜,增加 partiton 的數(shù)量使得每個(gè) task 處理的數(shù)據(jù)量減少,肯定會(huì)有 shuffle 過程產(chǎn)生(repartition 其實(shí)調(diào)用的就是 coalesce,只不過 shuffle = true (coalesce 中 shuffle: Boolean = false))。
Spark Application
一個(gè) spark 應(yīng)用主要由一系列的 spark Job 組成,而這些 spark Job 由 sparkContext 定義而來。當(dāng) SparkContent 啟動(dòng)時(shí),一個(gè) driver 和一系列的 executor 會(huì)在集群的工作節(jié)點(diǎn)上啟動(dòng)。每個(gè) executor 都有個(gè) JVM 虛擬環(huán)境,一個(gè) executor 不能跨越多個(gè)節(jié)點(diǎn)。
上圖表示在一個(gè)分布式系統(tǒng)上啟動(dòng)一個(gè) spark application 的物理硬件層面流程。
啟動(dòng)一個(gè) SparkContext
驅(qū)動(dòng)程序(driver program)會(huì)定義一個(gè)集群管理(cluster manager)
cluster manager 會(huì)在工作節(jié)點(diǎn)上啟動(dòng)一些 executor,運(yùn)行提交的代碼(注意:一個(gè)節(jié)點(diǎn) node 上會(huì)有多個(gè) executor,但是一個(gè) executor 不能跨越多個(gè) node)
需要注意以下兩點(diǎn):
一個(gè)節(jié)點(diǎn) node 上會(huì)有多個(gè) executor,但是一個(gè) executor 不能跨越多個(gè) node
每個(gè) executor 會(huì)有多個(gè)分區(qū),但是一個(gè)分區(qū)不能跨越多個(gè) executor
DAG(Directed Acyclic Graph)詳解
spark Application tree
簡(jiǎn)而言之:一個(gè) spark Application 由多個(gè) Job 組成,Job 由提交代碼中的 Action 操作定義,而一個(gè) Action 操作由多個(gè) Stage 組成,Stage 的分割由寬依賴進(jìn)行分割的,而每個(gè) Stage 又由多個(gè) Task 組成。一個(gè) Task 對(duì)應(yīng)一個(gè)分區(qū),一個(gè) task 會(huì)被分配到一個(gè) executor 上執(zhí)行。
每個(gè) Job 都對(duì)應(yīng)一個(gè) DAG 圖,每個(gè) DAG 有一系列的 Stage 組成。
Job:每個(gè) Job 對(duì)應(yīng)一個(gè) Action 操作,在 spark execution Graph 中,其邊是基于代碼中的 transform 操作的依賴關(guān)系定義的。
Stages:每個(gè) Action 中可能包含一個(gè)或多個(gè) transform 操作,其中寬依賴又將 Job 劃分成多個(gè) Stage。因?yàn)?Stages 的邊緣需要和 driver 進(jìn)行通信,故通常一個(gè) Job 里,必須順序的執(zhí)行 Stages 而非并行。并且會(huì)將多個(gè)窄依賴步驟合并成一個(gè)步驟,因?yàn)槠渲袥]有的轉(zhuǎn)換操作沒有 shuffle 過程,可以通過只訪問一次數(shù)據(jù),連續(xù)執(zhí)行多個(gè) transform 操作,這也是上面提到的惰性計(jì)算的優(yōu)點(diǎn)。
其代碼中對(duì)應(yīng)的 Stage 如下:
Task:task 是 spark 中最小最基本的執(zhí)行單元,每個(gè) task 代表一個(gè)局部的計(jì)算任務(wù)。在 executor 中可以有多個(gè) core,而每個(gè) core 可以對(duì)應(yīng)一個(gè) task,每個(gè) task 針對(duì)一個(gè)分區(qū)。 每次針對(duì)不同的一塊分區(qū),執(zhí)行相同的代碼。
注意:
spark 中同時(shí)并行的 task 數(shù)量不能超過所有 executor core 數(shù)量。 其中 所有 executor cores 數(shù)量= 每個(gè) executor 中 core 數(shù)量 * executor 數(shù)量。
task 的并行化是有 executor 數(shù)量 × core 數(shù)量決定的。task 過多,并行化過小,就會(huì)浪費(fèi)時(shí)間;反之就會(huì)浪費(fèi)資源。所以設(shè)置參數(shù)是一個(gè)需要權(quán)衡的過程,原則就是在已有的資源情況下,充分利用內(nèi)存和并行化。
總結(jié)
對(duì)于 DAG 的深刻理解非常重要,如果理解不深刻則可能定位問題的效率不高。比如常見的數(shù)據(jù)傾斜。當(dāng)理解了這些,如果出現(xiàn)了數(shù)據(jù)傾斜,可以分析 job,stage 和 task,找到部分 task 輸入的嚴(yán)重不平衡,最終定位是數(shù)據(jù)問題或計(jì)算邏輯問題。
參考
High Performance Spark
https://www.quora.com/What-is-the-reason-behind-keeping-lazy-evaluation-in-Apache-Spark
https://data-flair.training/blogs/apache-spark-lazy-evaluation/
http://bourneli.github.io/scala/spark/2016/06/17/spark-unpersist-after-action.html
備注:公眾號(hào)菜單包含了整理了一本AI小抄,非常適合在通勤路上用學(xué)習(xí)。
往期精彩回顧2019年公眾號(hào)文章精選適合初學(xué)者入門人工智能的路線及資料下載機(jī)器學(xué)習(xí)在線手冊(cè)深度學(xué)習(xí)在線手冊(cè)AI基礎(chǔ)下載(第一部分)備注:加入本站微信群或者qq群,請(qǐng)回復(fù)“加群”加入知識(shí)星球(4500+用戶,ID:92416895),請(qǐng)回復(fù)“知識(shí)星球”喜歡文章,點(diǎn)個(gè)在看
與50位技術(shù)專家面對(duì)面20年技術(shù)見證,附贈(zèng)技術(shù)全景圖總結(jié)
以上是生活随笔為你收集整理的spark性能优化 -- spark工作原理的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 50题matplotlib从入门到精通
- 下一篇: 案例 | 用pdpipe搭建pandas