Spark详解(十二):Spark Streaming原理和实现
1 簡介
SparkStreaming是Spark核心API的一個擴展,具有高吞吐量和容錯能力的實時流數據處理系統,可以對多種數據源(如Kdfka、Flume、Twitter、Zero和TCP 套接字)進行類似Map、Reduce和Join等復雜操作,并將結果保存到外部文件系統、數據庫或應用到實時儀表盤。
Spark Streaming在內部處理的機制原理是:先接受實時流的數據,并根據一定的時間間隔拆分成一批批的數據,這些批數據在Spark內核對應一個RDD實例,因此,流數據的DStream可以看成一組RDDs,然后通過調用Spark核心的作業處理這些批數據,最終得到處理后的一批批結果數據。
Spark Streaming的具體工作原理如下:
2. 術語定義
2.1 離散流DStream
Spark Streaming 提供了一種高級的抽象,叫做 DStream,英文全稱為 Discretized Stream,中文翻譯為“離散流”,它代表了一個持續不斷的數據流。DStream可以通過輸入數據源來創建,比如Kafka、Flume和Kinesis;也可以通過對其他DStream應用高階函數來創建,比如map、reduce、join、window。
DStream的內部,其實一系列持續不斷產生的RDD。RDD是Spark Core的核心抽象,即,不可變的,分布式的數據集。DStream中的每個RDD都包含了一個時間段內的數據。
對DStream應用的算子,比如map,其實在底層會被翻譯為對DStream中每個RDD的操作。比如對一個DStream執行一個map操作,會產生一個新的DStream。但是,在底層,其實其原理為,對輸入DStream中每個時間段的RDD,都應用一遍map操作,然后生成的新的RDD,即作為新的DStream中的那個時間段的一個RDD。底層的RDD的transformation操作,其實,還是由Spark Core的計算引擎來實現的。Spark Streaming對Spark Core進行了一層封裝,隱藏了細節,然后對開發人員提供了方便易用的高層次的API。
2.2 Dstream Graph
Spark Streaming中作業的生成與Spark Core類似,對DStream的各種操作讓他們之間建立起依賴關系,當遇到DStream使用輸出操作時,對這些依賴關系以及它們之間的操作會被記錄到名為DStreamGraph的對象中表示一個作業。這些作業注冊到DstreamGraph中并不會立即被運行,而是等到Spark Streaming啟動后,達到批處理時間時,才根據DStreamGraph生成作業處理該批處理時間內接受的數據。
2.3 批處理間隔
在Spark Streaming中,數據采集是逐條進行的,而數據處理時按照批次進行的,因此在Spark Streaming中會先設置批處理間隔(batch duration)。
2.4 窗口間隔(Window Duration)和滑動間隔(Slide Duration)
對于窗口操作而言,其窗口內部會有N個批處理數據,批處理數據的個數是由窗口間隔決定的,其為窗口持續的時間,在窗口操作中只有窗口間隔滿足了才會觸發批處理數據的處理。除了窗口的長度,另一個重要參數就是滑動間隔(Slide Duration),它指的是經過多長時間窗口滑動一次,形成新的窗口,滑動窗口默認為情況下和批處理間隔相同,而窗口間隔一般設置地比他們兩個都大。
3. Spark Streaming 特點
3.1 流式處理
Spark Streaming是將流式計算分解成一系列短小的批處理作業。
3.2 高容錯
對于流式計算來說,容錯性至關重要,首先我們要明確一下SparkRDD的容錯機制。每一個RDD都是一個不可變的分布式可重新計算的數據集,其記錄著確定性的操作“血統”(lineage),所以只要輸入數據是可以容錯的,那么任意一個RDD的分區(partition)出錯或者不可用,都是可以利用輸入數據通過轉換操作而重新計算的。
3.3 低延遲
對于目前版本的Spark Streaming而言,其最小的Batch Size的選擇在0.5~2s之間。
3.4 吞吐量高
4. Spark Streaming 編程模型
4.1 DStream的輸入源
在Spark Streaming中所有的操作都是基于流的,而輸入源是這一系列操作的起點。輸入 DStreams 和 DStreams 接收的流都代表輸入數據流的來源,在Spark Streaming 提供兩種內置數據流來源:
- 基礎來源 在 StreamingContext API 中直接可用的來源。例如:文件系統、Socket(套接字)連接和 Akka actors;
- 高級來源 如 Kafka、Flume、Kinesis、Twitter 等,可以通過額外的實用工具類創建。
4.2 DStream 的操作
與RDD類似,DStream也提供了自己的一系列操作方法,這些操作可以分成三類:普通的轉換操作、窗口轉換操作和輸出操作。
4.2.1 普通的轉換操作
| map(func) | 源 DStream的每個元素通過函數func返回一個新的DStream。 |
| flatMap(func) | 類似與map操作,不同的是每個輸入元素可以被映射出0或者更多的輸出元素。 |
| filter(func) | 在源DSTREAM上選擇Func函數返回僅為true的元素,最終返回一個新的DSTREAM 。 |
| flatMap(func) | 類似與map操作,不同的是每個輸入元素可以被映射出0或者更多的輸出元素。 |
| repartition(numPartitions) | 通過輸入的參數numPartitions的值來改變DStream的分區大小。 |
| union(otherStream) | 返回一個包含源DStream與其他 DStream的元素合并后的新DSTREAM。 |
| count() | 對源DStream內部的所含有的RDD的元素數量進行計數,返回一個內部的RDD只包含一個元素的DStreaam。 |
| reduce(func) | 使用函數func(有兩個參數并返回一個結果)將源DStream 中每個RDD的元素進行聚 合操作,返回一個內部所包含的RDD只有一個元素的新DStream。 |
| countByValue() | 計算DStream中每個RDD內的元素出現的頻次并返回新的DStream[(K,Long)],其中K是RDD中元素的類型,Long是元素出現的頻次。 |
| reduceByKey(func, [numTasks]) | 當一個類型為(K,V)鍵值對的DStream被調用的時候,返回類型為類型為(K,V)鍵值對的新 DStream,其中每個鍵的值V都是使用聚合函數func匯總。注意:默認情況下,使用 Spark的默認并行度提交任務(本地模式下并行度為2,集群模式下位8),可以通過配置numTasks設置不同的并行任務數。 |
| join(otherStream, [numTasks]) | 當被調用類型分別為(K,V)和(K,W)鍵值對的2個DStream 時,返回類型為(K,(V,W))鍵值對的一個新 DSTREAM。 |
| cogroup(otherStream, [numTasks]) | 當被調用的兩個DStream分別含有(K, V) 和(K, W)鍵值對時,返回一個(K, Seq[V], Seq[W])類型的新的DStream。 |
| map(func) | 源 DStream的每個元素通過函數func返回一個新的DStream。 |
| transform(func) | 通過對源DStream的每RDD應用RDD-to-RDD函數返回一個新的DStream,這可以用來在DStream做任意RDD操作。 |
| updateStateByKey(func) | 返回一個新狀態的DStream,其中每個鍵的狀態是根據鍵的前一個狀態和鍵的新值應用給定函數func后的更新。這個方法可以被用來維持每個鍵的任何狀態數據。 |
在上面列出的這些操作中,transform()方法和updateStateByKey()方法值得我們深入的探討一下:
- transform(func)操作
該transform操作(轉換操作)連同其其類似的 transformWith操作允許DStream 上應用任意RDD-to-RDD函數。它可以被應用于未在 DStream API 中暴露任何的RDD操作。例如,在每批次的數據流與另一數據集的連接功能不直接暴露在DStream API 中,但可以輕松地使用transform操作來做到這一點,這使得DStream的功能非常強大。例如,你可以通過連接預先計算的垃圾郵件信息的輸入數據流(可能也有Spark生成的),然后基于此做實時數據清理的篩選,如下面官方提供的偽代碼所示。事實上,也可以在transform方法中使用機器學習和圖形計算的算法。
- updateStateByKey操作
該 updateStateByKey 操作可以讓你保持任意狀態,同時不斷有新的信息進行更新。要使用此功能,必須進行兩個步驟 :
(1) 定義狀態 - 狀態可以是任意的數據類型。
(2) 定義狀態更新函數 - 用一個函數指定如何使用先前的狀態和從輸入流中獲取的新值 更新狀態。
讓我們用一個例子來說明,假設你要進行文本數據流中單詞計數。在這里,正在運行的計數是狀態而且它是一個整數。我們定義了更新功能如下:
此函數應用于含有鍵值對的DStream中(如前面的示例中,在DStream中含有(word,1)鍵值對)。它會針對里面的每個元素(如wordCount中的word)調用一下更新函數,newValues是最新的值,runningCount是之前的值。
4.2.2 窗口轉換操作
Spark Streaming 還提供了窗口的計算,它允許你通過滑動窗口對數據進行轉換,窗口轉換操作如下:
| window(windowLength, slideInterval) | 返回一個基于源DStream的窗口批次計算后得到新的DStream。 |
| countByWindow(windowLength,slideInterval) | 返回基于滑動窗口的DStream中的元素的數量。 |
| reduceByWindow(func, windowLength,slideInterval) | 基于滑動窗口對源DStream中的元素進行聚合操作,得到一個新的DStream。 |
| reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks]) | 基于滑動窗口對(K,V)鍵值對類型的DStream中的值按K使用聚合函數func進行聚合操作,得到一個新的DStream。 |
| reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks]) | 一個更高效的reduceByKkeyAndWindow()的實現版本,先對滑動窗口中新的時間間隔內數據增量聚合并移去最早的與新增數據量的時間間隔內的數據統計量。例如,計算t+4秒這個時刻過去5秒窗口的WordCount,那么我們可以將t+3時刻過去5秒的統計量加上[t+3,t+4]的統計量,在減去[t-2,t-1]的統計量,這種方法可以復用中間三秒的統計量,提高統計的效率。 |
| countByValueAndWindow(windowLength,slideInterval, [numTasks]) | 基于滑動窗口計算源DStream中每個RDD內每個元素出現的頻次并返回DStream[(K,Long)],其中K是RDD中元素的類型,Long是元素頻次。與countByValue一樣,reduce任務的數量可以通過一個可選參數進行配置。 |
4.2.3 輸出操作
Spark Streaming允許DStream的數據被輸出到外部系統,如數據庫或文件系統。由于輸出操作實際上使transformation操作后的數據可以通過外部系統被使用,同時輸出操作觸發所有DStream的transformation操作的實際執行(類似于RDD操作)。以下表列出了目前主要的輸出操作:
| print() | 在Driver中打印出DStream中數據的前10個元素。 |
| saveAsTextFiles(prefix, [suffix]) | 將DStream中的內容以文本的形式保存為文本文件,其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
| saveAsObjectFiles(prefix, [suffix]) | 將DStream中的內容按對象序列化并且以SequenceFile的格式保存。其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
| saveAsHadoopFiles(prefix, [suffix]) | 將DStream中的內容以文本的形式保存為Hadoop文件,其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
| foreachRDD(func) | 最基本的輸出操作,將func函數應用于DStream中的RDD上,這個操作會輸出數據到外部系統,比如保存RDD到文件或者網絡數據庫等。需要注意的是func函數是在運行該streaming應用的Driver進程里執行的。 |
5. 容錯、持久化和性能調優
5.1 容錯
DStream基于RDD組成,RDD的容錯性依舊有效,我們首先回憶一下SparkRDD的基本特性。
- RDD是一個不可變的、確定性的可重復計算的分布式數據集。RDD的某些partition丟失了,可以通過血統(lineage)信息重新計算恢復;
- 如果RDD任何分區因worker節點故障而丟失,那么這個分區可以從原來依賴的容錯數據集中恢復;
- 由于Spark中所有的數據的轉換操作都是基于RDD的,即使集群出現故障,只要輸入數據集存在,所有的中間結果都是可以被計算的。
Spark Streaming是可以從HDFS和S3這樣的文件系統讀取數據的,這種情況下所有的數據都可以被重新計算,不用擔心數據的丟失。但是在大多數情況下,Spark Streaming是基于網絡來接受數據的,此時為了實現相同的容錯處理,在接受網絡的數據時會在集群的多個Worker節點間進行數據的復制,通過RDD設置默認存儲級別為Memroy_AND_DISK_2(默認的復制數是2),這導致產生在出現故障時被處理的兩種類型的數據:
1)Data received and replicated :一旦一個Worker節點失效,系統會從另一份還存在的數據中重新計算。
2)Data received but buffered for replication :一旦數據丟失,可以通過RDD之間的依賴關系,從HDFS這樣的外部文件系統讀取數據。
此外,有兩種故障,我們應該關心:
(1)Worker節點失效:通過上面的講解我們知道,這時系統會根據出現故障的數據的類型,選擇是從另一個有復制過數據的工作節點上重新計算,還是直接從從外部文件系統讀取數據。
(2)Driver(驅動節點)失效 :如果運行 Spark Streaming應用時驅動節點出現故障,那么很明顯的StreamingContext已經丟失,同時在內存中的數據全部丟失。對于這種情況,Spark Streaming應用程序在計算上有一個內在的結構——在每段micro-batch數據周期性地執行同樣的Spark計算。這種結構允許把應用的狀態(批次數據的元數據信息,亦稱checkpoint)周期性地保存到可靠的存儲空間中,并在driver重新啟動時恢復該狀態。具體做法是在ssc.checkpoint(<checkpoint directory>)函數中進行設置,Spark Streaming就會定期把DStream的元信息寫入到HDFS中,一旦驅動節點失效,丟失的StreamingContext會通過已經保存的檢查點信息進行恢復。
5.2 預寫日志 WriteAheadLogs
從Spark Streaming 1.2 版本開始引入了預寫日志的功能(WriteAheadLogs)。實時流處理系統必須要能在24/7時間內工作,因此它需要具備從各種系統故障中恢復過來的能力。最開始,SparkStreaming就支持從driver和worker故障恢復的能力。然而有些數據源的輸入可能在故障恢復以后丟失數據。在Spark1.2版本中,Spark已經在SparkStreaming中對預寫日志(也被稱為journaling)作了初步支持,改進了恢復機制,并使更多數據源的零數據丟失有了可靠。
對于文件這樣的源數據,driver恢復機制足以做到零數據丟失,因為所有的數據都保存在了像HDFS或S3這樣的容錯文件系統中了。但對于像Kafka和Flume等其它數據源,有些接收到的數據還只緩存在內存中,尚未被處理,它們就有可能會丟失。這是由于Spark應用的分布操作方式引起的。當driver進程失敗時,所有在standalone/yarn/mesos集群運行的executor,連同它們在內存中的所有數據,也同時被終止。對于Spark Streaming來說,從諸如Kafka和Flume的數據源接收到的所有數據,在它們處理完成之前,一直都緩存在executor的內存中。縱然driver重新啟動,這些緩存的數據也不能被恢復。為了避免這種數據損失,在Spark1.2發布版本中引進了預寫日志(WriteAheadLogs)功能。
在一個Spark Streaming應用開始時(也就是Driver開始)。相關的Streaming Context(所有流功能的基礎功能)使用SparkContext 啟動接收器成為長駐運行任務。這些接收器接受并保存數據到Spark內存中一共處理。用戶傳輸數據的生命周期如下圖所示:
(1)接受數據:接收器將數據分成一系列小塊,存儲到Executor內存或者磁盤中,如果啟動了預寫日志,數據同時還寫入到容錯文件系統的預寫日志文件中。
(2)通知StreamContext:接受塊的元數據(Meatdata)被發送到Driver的StreamingContext。1.這個元數據包括:定位其在executor內存或者磁盤中數據位置的塊信息。2.塊數據在日志文件中的偏移信息。如果啟動了預寫日志,數據同時還寫入到容錯文件系統的預寫日志文件中。
(3)處理數據:每批數據的間隔,流上下文使用塊信息產生彈性分布式數據集RDD和他們的作業Job,StreamingContext通過運行任務處理Executor內存或者磁盤中的數據塊執行作業。
(4)周期性的設置檢查點:為了恢復的需要,流計算(即StreamingContext)提供來的DStream)周期性的設置檢查點,并保存到同一個容錯文件系統的另外一組文件中。
當一個失敗的Driver端重啟的時候,會進行如下處理:
(1)恢復計算:使用檢查點信息重啟Driver,重新構造上下文重啟接收器。
(2)恢復元數據:為了保證能夠繼續下去所必備的全部元數據塊都被恢復。
(3)未完成作業的重新生成:由于失敗而沒有處理完成的批處理,將使用會的元數據再次產生RDD和對應的作業。
(4)讀取保存在日志中的塊數據:在這些作業執行時,塊數據之間從預寫日志中讀出,這將恢復在日志中可靠地保存所有必要的數據。
(5)重發尚未確認的數據:失敗時沒有保存到日志中的緩存數據將由數據源再次發送。
6. 持久化
與RDD類似,Spark Streaming也可以讓開發人員手動控制,將數據流中的數據持久化到內存中。對DStream調用persist()方法,就可以讓Spark Streaming自動將該數據流中的所有產生的RDD,都持久化到內存中。如果要對一個DStream多次執行操作,那么,對DStream持久化是非常有用的。因為多次操作,可以共享使用內存中的一份緩存數據。
對于基于窗口的操作,比如reduceByWindow、reduceByKeyAndWindow,以及基于狀態的操作,比如updateStateByKey,默認就隱式開啟了持久化機制。即Spark Streaming默認就會將上述操作產生的Dstream中的數據,緩存到內存中,不需要開發人員手動調用persist()方法。
對于通過網絡接收數據的輸入流,比如socket、Kafka、Flume等,默認的持久化級別,是將數據復制一份,以便于容錯。相當于是MEMORY_ONLY_SER_2。
與RDD不同的是,默認的持久化級別,統一都是要序列化的。
7. 性能調優
7.1 優化運行時間
-
增加并行度 確保使用整個集群的資源,而不是把任務集中在幾個特定的節點上。對于包含shuffle的操作,增加其并行度以確保更為充分地使用集群資源;
-
減少數據序列化,反序列化的負擔 Spark Streaming默認將接受到的數據序列化后存儲,以減少內存的使用。但是序列化和反序列話需要更多的CPU時間,因此更加高效的序列化方式(Kryo)和自定義的系列化接口可以更高效地使用CPU;
-
設置合理的batch duration(批處理時間間) 在Spark Streaming中,Job之間有可能存在依賴關系,后面的Job必須確保前面的作業執行結束后才能提交。若前面的Job執行的時間超出了批處理時間間隔,那么后面的Job就無法按時提交,這樣就會進一步拖延接下來的Job,造成后續Job的阻塞。因此設置一個合理的批處理間隔以確保作業能夠在這個批處理間隔內結束時必須的;
-
減少因任務提交和分發所帶來的負擔 通常情況下,Akka框架能夠高效地確保任務及時分發,但是當批處理間隔非常小(500ms)時,提交和分發任務的延遲就變得不可接受了。使用Standalone和Coarse-grained Mesos模式通常會比使用Fine-grained Mesos模式有更小的延遲。
7.2 優化內存使用
-
控制batch size(批處理間隔內的數據量) Spark Streaming會把批處理間隔內接收到的所有數據存放在Spark內部的可用內存區域中,因此必須確保當前節點Spark的可用內存中少能容納這個批處理時間間隔內的所有數據,否則必須增加新的資源以提高集群的處理能力;
-
及時清理不再使用的數據 前面講到Spark Streaming會將接受的數據全部存儲到內部可用內存區域中,因此對于處理過的不再需要的數據應及時清理,以確保Spark Streaming有富余的可用內存空間。通過設置合理的spark.cleaner.ttl時長來及時清理超時的無用數據,這個參數需要小心設置以免后續操作中所需要的數據被超時錯誤處理;
-
觀察及適當調整GC策略 GC會影響Job的正常運行,可能延長Job的執行時間,引起一系列不可預料的問題。觀察GC的運行情況,采用不同的GC策略以進一步減小內存回收對Job運行的影響。
總結
以上是生活随笔為你收集整理的Spark详解(十二):Spark Streaming原理和实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark详解(十一):Spark运行架
- 下一篇: Spark详解(十三):Spark St