内存有限的情况下 Spark 如何处理 T 级别的数据?
生活随笔
收集整理的這篇文章主要介紹了
内存有限的情况下 Spark 如何处理 T 级别的数据?
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
簡單起見,下述答案僅就無shuffle的單stage Spark作業(yè)做了概要解釋。對于多stage任務(wù)而言,在內(nèi)存的使用上還有很多其他重要問題沒有覆蓋。部分內(nèi)容請參考評論中?
@邵賽賽
?給出的補(bǔ)充。Spark確實(shí)擅長內(nèi)存計(jì)算,內(nèi)存容量不足時(shí)也可以回退,但題主給出的條件(8GB內(nèi)存跑1TB數(shù)據(jù))也確實(shí)是過于苛刻了……
首先需要解開的一個(gè)誤區(qū)是,對于Spark這類內(nèi)存計(jì)算系統(tǒng),并不是說要處理多大規(guī)模的數(shù)據(jù)就需要多大規(guī)模的內(nèi)存。Spark相對Hadoop MR有大幅性能提升的一個(gè)前提就是大量大數(shù)據(jù)作業(yè)同一時(shí)刻需要加載進(jìn)內(nèi)存的數(shù)據(jù)只是整體數(shù)據(jù)的一個(gè)子集,且大部分情況下可以完全放入內(nèi)存,正如Shark(Spark上的Hive兼容的data warehouse)論文1.1節(jié)所述:
在Spark內(nèi)部,單個(gè)executor進(jìn)程內(nèi)RDD的分片數(shù)據(jù)是用Iterator流式訪問的,Iterator的hasNext方法和next方法是由RDD lineage上各個(gè)transformation攜帶的閉包函數(shù)復(fù)合而成的。該復(fù)合Iterator每訪問一個(gè)元素,就對該元素應(yīng)用相應(yīng)的復(fù)合函數(shù),得到的結(jié)果再流式地落地(對于shuffle stage是落地到本地文件系統(tǒng)留待后續(xù)stage訪問,對于result stage是落地到HDFS或送回driver端等等,視選用的action而定)。如果用戶沒有要求Spark cache該RDD的結(jié)果,那么這個(gè)過程占用的內(nèi)存是很小的,一個(gè)元素處理完畢后就落地或扔掉了(概念上如此,實(shí)現(xiàn)上有buffer),并不會長久地占用內(nèi)存。只有在用戶要求Spark cache該RDD,且storage level要求在內(nèi)存中cache時(shí),Iterator計(jì)算出的結(jié)果才會被保留,通過cache manager放入內(nèi)存池。
簡單起見,暫不考慮帶shuffle的多stage情況和流水線優(yōu)化。這里拿最經(jīng)典的log處理的例子來具體說明一下(取出所有以ERROR開頭的日志行,按空格分隔并取第2列):
val lines = spark.textFile("hdfs://<input>") val errors = lines.filter(_.startsWith("ERROR")) val messages = errors.map(_.split(" ")(1)) messages.saveAsTextFile("hdfs://<output>") 按傳統(tǒng)單機(jī)immutable FP的觀點(diǎn)來看,上述代碼運(yùn)行起來好像是:
把HDFS上的日志文件全部拉入內(nèi)存形成一個(gè)巨大的字符串?dāng)?shù)組, Filter一遍再生成一個(gè)略小的新的字符串?dāng)?shù)組, 再map一遍又生成另一個(gè)字符串?dāng)?shù)組。
真這么玩兒的話Spark早就不用混了……
如前所述,Spark在運(yùn)行時(shí)動態(tài)構(gòu)造了一個(gè)復(fù)合Iterator。就上述示例來說,構(gòu)造出來的Iterator的邏輯概念上大致長這樣:
new Iterator[String] {private var head: String = _private var headDefined: Boolean = falsedef hasNext: Boolean = headDefined || {do {try head = readOneLineFromHDFS(...) // (1) read from HDFScatch {case _: EOFException => return false}} while (!head.startsWith("ERROR")) // (2) filter closuretrue}def next: String = if (hasNext) {headDefined = falsehead.split(" ")(1) // (3) map closure} else {throw new NoSuchElementException("...")} } 上面這段代碼是我按照Spark中FilteredRDD、MappedRDD的定義和Scala Iterator的filter、map方法的框架寫的偽碼,并且省略了從cache或checkpoint中讀取現(xiàn)成結(jié)果的邏輯。1、2、3三處便是RDD lineage DAG中相應(yīng)邏輯嵌入復(fù)合出的Iterator的大致方式。每種RDD變換嵌入復(fù)合Iterator的具體方式是由不同的RDD以及Scala Iterator的相關(guān)方法定義的。可以看到,用這個(gè)Iterator訪問整個(gè)數(shù)據(jù)集,空間復(fù)雜度是O(1)。可見,Spark RDD的immutable語義并不會造成大數(shù)據(jù)內(nèi)存計(jì)算任務(wù)的龐大內(nèi)存開銷。
然后來看加cache的情況。我們假設(shè)errors這個(gè)RDD比較有用,除了拿出空格分隔的第二列以外,可能在同一個(gè)application中我們還會再頻繁用它干別的事情,于是選擇將它c(diǎn)ache住:
val lines = spark.textFile("hdfs://<input>") val errors = lines.filter(_.startsWith("ERROR")).cache() // <-- !!! val messages = errors.map(_.split(" ")(1)) messages.saveAsTextFile("hdfs://<output>") 加了cache之后有什么變化呢?實(shí)際上相當(dāng)于在上述復(fù)合Iterator偽碼的(2)處,將filter出來的文本行逐一追加到了內(nèi)存中的一個(gè)ArrayBuffer[String]里存起來形成一個(gè)block,然后通過cache manager扔進(jìn)受block manager管理的內(nèi)存池。注意這里僅僅cache了filter出來的結(jié)果,HDFS讀出的原始數(shù)據(jù)沒有被cache,對errors做map操作后得到的messages RDD也沒有被cache。這樣一來,后續(xù)任務(wù)復(fù)用errors這個(gè)RDD時(shí),直接從內(nèi)存中取就好,就不用重新計(jì)算了。
首先需要解開的一個(gè)誤區(qū)是,對于Spark這類內(nèi)存計(jì)算系統(tǒng),并不是說要處理多大規(guī)模的數(shù)據(jù)就需要多大規(guī)模的內(nèi)存。Spark相對Hadoop MR有大幅性能提升的一個(gè)前提就是大量大數(shù)據(jù)作業(yè)同一時(shí)刻需要加載進(jìn)內(nèi)存的數(shù)據(jù)只是整體數(shù)據(jù)的一個(gè)子集,且大部分情況下可以完全放入內(nèi)存,正如Shark(Spark上的Hive兼容的data warehouse)論文1.1節(jié)所述:
In fact, one study [1] analyzed the access patterns in the Hive warehouses at Facebook and discovered that for the vast majority (96%) of jobs, the entire inputs could fit into a fraction of the cluster’s total memory.
[1] G. Ananthanarayanan, A. Ghodsi, S. Shenker, and I. Stoica. Disk-locality in datacenter computing considered irrelevant. In HotOS ’11, 2011.
至于數(shù)據(jù)子集仍然無法放入集群物理內(nèi)存的情況,Spark仍然可以妥善處理,下文還會詳述。在Spark內(nèi)部,單個(gè)executor進(jìn)程內(nèi)RDD的分片數(shù)據(jù)是用Iterator流式訪問的,Iterator的hasNext方法和next方法是由RDD lineage上各個(gè)transformation攜帶的閉包函數(shù)復(fù)合而成的。該復(fù)合Iterator每訪問一個(gè)元素,就對該元素應(yīng)用相應(yīng)的復(fù)合函數(shù),得到的結(jié)果再流式地落地(對于shuffle stage是落地到本地文件系統(tǒng)留待后續(xù)stage訪問,對于result stage是落地到HDFS或送回driver端等等,視選用的action而定)。如果用戶沒有要求Spark cache該RDD的結(jié)果,那么這個(gè)過程占用的內(nèi)存是很小的,一個(gè)元素處理完畢后就落地或扔掉了(概念上如此,實(shí)現(xiàn)上有buffer),并不會長久地占用內(nèi)存。只有在用戶要求Spark cache該RDD,且storage level要求在內(nèi)存中cache時(shí),Iterator計(jì)算出的結(jié)果才會被保留,通過cache manager放入內(nèi)存池。
簡單起見,暫不考慮帶shuffle的多stage情況和流水線優(yōu)化。這里拿最經(jīng)典的log處理的例子來具體說明一下(取出所有以ERROR開頭的日志行,按空格分隔并取第2列):
val lines = spark.textFile("hdfs://<input>") val errors = lines.filter(_.startsWith("ERROR")) val messages = errors.map(_.split(" ")(1)) messages.saveAsTextFile("hdfs://<output>") 按傳統(tǒng)單機(jī)immutable FP的觀點(diǎn)來看,上述代碼運(yùn)行起來好像是:
如前所述,Spark在運(yùn)行時(shí)動態(tài)構(gòu)造了一個(gè)復(fù)合Iterator。就上述示例來說,構(gòu)造出來的Iterator的邏輯概念上大致長這樣:
new Iterator[String] {private var head: String = _private var headDefined: Boolean = falsedef hasNext: Boolean = headDefined || {do {try head = readOneLineFromHDFS(...) // (1) read from HDFScatch {case _: EOFException => return false}} while (!head.startsWith("ERROR")) // (2) filter closuretrue}def next: String = if (hasNext) {headDefined = falsehead.split(" ")(1) // (3) map closure} else {throw new NoSuchElementException("...")} } 上面這段代碼是我按照Spark中FilteredRDD、MappedRDD的定義和Scala Iterator的filter、map方法的框架寫的偽碼,并且省略了從cache或checkpoint中讀取現(xiàn)成結(jié)果的邏輯。1、2、3三處便是RDD lineage DAG中相應(yīng)邏輯嵌入復(fù)合出的Iterator的大致方式。每種RDD變換嵌入復(fù)合Iterator的具體方式是由不同的RDD以及Scala Iterator的相關(guān)方法定義的。可以看到,用這個(gè)Iterator訪問整個(gè)數(shù)據(jù)集,空間復(fù)雜度是O(1)。可見,Spark RDD的immutable語義并不會造成大數(shù)據(jù)內(nèi)存計(jì)算任務(wù)的龐大內(nèi)存開銷。
然后來看加cache的情況。我們假設(shè)errors這個(gè)RDD比較有用,除了拿出空格分隔的第二列以外,可能在同一個(gè)application中我們還會再頻繁用它干別的事情,于是選擇將它c(diǎn)ache住:
val lines = spark.textFile("hdfs://<input>") val errors = lines.filter(_.startsWith("ERROR")).cache() // <-- !!! val messages = errors.map(_.split(" ")(1)) messages.saveAsTextFile("hdfs://<output>") 加了cache之后有什么變化呢?實(shí)際上相當(dāng)于在上述復(fù)合Iterator偽碼的(2)處,將filter出來的文本行逐一追加到了內(nèi)存中的一個(gè)ArrayBuffer[String]里存起來形成一個(gè)block,然后通過cache manager扔進(jìn)受block manager管理的內(nèi)存池。注意這里僅僅cache了filter出來的結(jié)果,HDFS讀出的原始數(shù)據(jù)沒有被cache,對errors做map操作后得到的messages RDD也沒有被cache。這樣一來,后續(xù)任務(wù)復(fù)用errors這個(gè)RDD時(shí),直接從內(nèi)存中取就好,就不用重新計(jì)算了。
總結(jié)
以上是生活随笔為你收集整理的内存有限的情况下 Spark 如何处理 T 级别的数据?的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 程序员必备的代码审查(Code Revi
- 下一篇: MapReduce的自制Writable