Spark Streaming 实战案例(一)
本節主要內容
本節部分內容來自官方文檔:http://spark.apache.org/docs/latest/streaming-programming-guide.html#mllib-operations
1. Spark流式計算簡介
Hadoop的MapReduce及Spark SQL等只能進行離線計算,無法滿足實時性要求較高的業務需求,例如實時推薦、實時網站性能分析等,流式計算可以解決這些問題。目前有三種比較常用的流式計算框架,它們分別是Storm,Spark Streaming和Samza,各個框架的比較及使用情況,可以參見:http://www.csdn.net/article/2015-03-09/2824135。本節對Spark Streaming進行重點介紹,Spark Streaming作為Spark的五大核心組件之一,其原生地支持多種數據源的接入,而且可以與Spark MLLib、Graphx結合起來使用,輕松完成分布式環境下在線機器學習算法的設計。Spark支持的輸入數據源及輸出文件如下圖所示:
在后面的案例實戰當中,會涉及到這部分內容。中間的”Spark Streaming“會對輸入的數據源進行處理,然后將結果輸出,其內部工作原理如下圖所示:
Spark Streaming接受實時傳入的數據流,然后將數據按批次(batch)進行劃分,然后再將這部分數據交由Spark引擎進行處理,處理完成后將結果輸出到外部文件。
先看下面一段基于Spark Streaming的word count代碼,它可以很好地幫助初步理解流式計算
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext}object StreamingWordCount {def main(args: Array[String]) {if (args.length < 1) {System.err.println("Usage: StreamingWordCount <directory>")System.exit(1)}//創建SparkConf對象val sparkConf = new SparkConf().setAppName("HdfsWordCount").setMaster("local[2]")// Create the context//創建StreamingContext對象,與集群進行交互val ssc = new StreamingContext(sparkConf, Seconds(20))// Create the FileInputDStream on the directory and use the// stream to count words in new files created//如果目錄中有新創建的文件,則讀取val lines = ssc.textFileStream(args(0))//分割為單詞val words = lines.flatMap(_.split(" "))//統計單詞出現次數val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)//打印結果wordCounts.print()//啟動Spark Streamingssc.start()//一直運行,除非人為干預再停止ssc.awaitTermination()} }運行上面的程序后,再通過命令行界面,將文件拷貝到相應的文件目錄,具體如下:
程序在運行時,根據文件創建時間對文件進行處理,在上一次運行時間后創建的文件都會被處理,輸出結果如下:
2. Spark Streaming相關核心類
1. DStream(discretized stream)
Spark Streaming提供了對數據流的抽象,它就是DStream,它可以通過前述的 Kafka, Flume等數據源創建,DStream本質上是由一系列的RDD構成。各個RDD中的數據為對應時間間隔( interval)中流入的數據,如下圖所示:
對DStream的所有操作最終都要轉換為對RDD的操作,例如前面的StreamingWordCount程序,flatMap操作將作用于DStream中的所有RDD,如下圖所示:
2.StreamingContext
在Spark Streaming當中,StreamingContext是整個程序的入口,其創建方式有多種,最常用的是通過SparkConf來創建:
創建StreamingContext對象時會根據SparkConf創建SparkContext
/*** Create a StreamingContext by providing the configuration necessary for a new SparkContext.* @param conf a org.apache.spark.SparkConf object specifying Spark parameters* @param batchDuration the time interval at which streaming data will be divided into batches*/def this(conf: SparkConf, batchDuration: Duration) = {this(StreamingContext.createNewSparkContext(conf), null, batchDuration)}也就是說StreamingContext是對SparkContext的封裝,StreamingContext還有其它幾個構造方法,感興趣的可以了解,后期在源碼解析時會對它進行詳細的講解,創建StreamingContext時會指定batchDuration,它用于設定批處理時間間隔,需要根據應用程序和集群資源情況去設定。當創建完成StreamingContext之后,再按下列步驟進行:
關于StreamingContext有幾個值得注意的地方:
1.StreamingContext啟動后,增加新的操作將不起作用。也就是說在StreamingContext啟動之前,要定義好所有的計算邏輯
2.StreamingContext停止后,不能重新啟動。也就是說要重新計算的話,需要重新運行整個程序。
3.在單個JVM中,一段時間內不能出現兩個active狀態的StreamingContext
4.調用StreamingContext的stop方法時,SparkContext也將被stop掉,如果希望StreamingContext關閉時,保留SparkContext,則需要在stop方法中傳入參數stopSparkContext=false
/**
* Stop the execution of the streams immediately (does not wait for all received data
* to be processed). By default, if stopSparkContext is not specified, the underlying
* SparkContext will also be stopped. This implicit behavior can be configured using the
* SparkConf configuration spark.streaming.stopSparkContextByDefault.
*
* @param stopSparkContext If true, stops the associated SparkContext. The underlying SparkContext
* will be stopped regardless of whether this StreamingContext has been
* started.
*/
def stop(
stopSparkContext: Boolean = conf.getBoolean(“spark.streaming.stopSparkContextByDefault”, true)
): Unit = synchronized {
stop(stopSparkContext, false)
}
5.SparkContext對象可以被多個StreamingContexts重復使用,但需要前一個StreamingContexts停止后再創建下一個StreamingContext對象。
3. InputDStreams及Receivers
InputDStream指的是從數據流的源頭接受的輸入數據流,在前面的StreamingWordCount程序當中,val lines = ssc.textFileStream(args(0)) 就是一種InputDStream。除文件流外,每個input DStream都關聯一個Receiver對象,該Receiver對象接收數據源傳來的數據并將其保存在內存中以便后期Spark處理。
Spark Streaimg提供兩種原生支持的流數據源:
Basic sources(基礎流數據源)。直接通過StreamingContext API創建,例如文件系統(本地文件系統及分布式文件系統)、Socket連接及Akka的Actor。
文件流(File Streams)的創建方式:
a. streamingContext.fileStreamKeyClass, ValueClass, InputFormatClass
b. streamingContext.textFileStream(dataDirectory)
實時上textFileStream方法最終調用的也是fileStream方法
def textFileStream(directory: String): DStream[String] = withNamedScope(“text file stream”) {
fileStreamLongWritable, Text, TextInputFormat.map(_._2.toString)
}
基于Akka Actor流數據的創建方式:
streamingContext.actorStream(actorProps, actor-name)
基于Socket流數據的創建方式:
ssc.socketTextStream(hostname: String,port: Int,storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2)
基于RDD隊列的流數據創建方式:
streamingContext.queueStream(queueOfRDDs)
Advanced sources(高級流數據源)。如Kafka, Flume, Kinesis, Twitter等,需要借助外部工具類,在運行時需要外部依賴(下一節內容中介紹)
Spark Streaming還支持用戶
3. Custom Sources(自定義流數據源),它需要用戶定義receiver,該部分內容也放在下一節介紹
最后有兩個需要注意的地方:
3. 入門案例
為方便后期查看運行結果,修改日志級別為Level.WARN
import org.apache.spark.Loggingimport org.apache.log4j.{Level, Logger}/** Utility functions for Spark Streaming examples. */ object StreamingExamples extends Logging {/** Set reasonable logging levels for streaming if the user has not configured log4j. */def setStreamingLogLevels() {val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElementsif (!log4jInitialized) {// We first log something to initialize Spark's default logging, then we override the// logging level.logInfo("Setting log level to [WARN] for streaming example." +" To override add a custom log4j.properties to the classpath.")Logger.getRootLogger.setLevel(Level.WARN)}} }基于Socket流數據
配置運行時參數
使用
//啟動netcat server root@sparkmaster:~/streaming# nc -lk 9999運行NetworkWordCount 程序,然后在netcat server運行的控制臺輸入任意字符串
root@sparkmaster:~/streaming# nc -lk 9999 Hello WORLD HELLO WORLD WORLD TEWST NIMA基于RDD隊列的流數據
總結
以上是生活随笔為你收集整理的Spark Streaming 实战案例(一)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ArcGis中空间连接join
- 下一篇: 万用socket神器Linux Netc