Spark Streaming
spark streaming介紹
Spark streaming是Spark核心API的一個擴展,它對實時流式數據的處理具有可擴展性、高吞吐量、可容錯性等特點。我們可以從kafka、flume、witter、 ZeroMQ、Kinesis等源獲取數據,也可以通過由 高階函數map、reduce、join、window等組成的復雜算法計算出數據。最后,處理后的數據可以推送到文件系統、數據庫、實時儀表盤中.
為什么使用spark streaming
很多大數據應用程序需要實時處理數據流。思考:
我們知道spark和storm都能處理實時數據,可是spark是如何處理實時數據的,spark包含比較多組件:包括
- spark core
- Spark SQL
- Spark Streaming
- GraphX
- MLlib
spark core中包含RDD、DataFrame和DataSet等,因此spark sql是為了兼容hive而產生的sql語句,GraphX提供的分布式圖計算框架,MLlib提供的機器學習框架。因此spark所謂的實時處理數據則是通過spark streaming來實現的。
什么是StreamingContext
為了初始化Spark Streaming程序,一個StreamingContext對象必需被創建,它是Spark Streaming所有流操作的主要入口。一個StreamingContext 對象可以用SparkConf對象創建。StreamingContext這里可能不理解,其實跟SparkContext也差不多的。(可參考讓你真正理解什么是SparkContext, SQLContext 和HiveContext)。同理也有hadoop Context,它們都是全文對象,并且會獲取配置文件信息。那么配置文件有哪些?比如hadoop的core-site.xml,hdfs-site.xml等,spark如spark-defaults.conf等。這時候我們可能對StreamingContext有了一定的認識。下面一個例子
為了初始化Spark Streaming程序,一個StreamingContext對象必需被創建,它是Spark Streaming所有流操作的主要入口。
一個StreamingContext 對象可以用SparkConf對象創建。
一個進程內運行Spark Streaming。需要注意的是,它在內部創建了一個SparkContext對象,你可以通過 ssc.sparkContext訪問這個SparkContext對象。
批時間片需要根據你的程序的潛在需求以及集群的可用資源來設定,你可以在性能調優那一節獲取詳細的信息.可以利用已經存在的 SparkContext 對象創建 StreamingContext 對象。 當一個上下文(context)定義之后,你必須按照以下幾步進行操作
- 定義輸入源;
- 準備好流計算指令;
- 利用 streamingContext.start() 方法接收和處理數據;
- 處理過程將一直持續,直到 streamingContext.stop() 方法被調用。
幾點需要注意的地方:
- 一旦一個context已經啟動,就不能有新的流算子建立或者是添加到context中。
- 一旦一個context已經停止,它就不能再重新啟動
- 在JVM中,同一時間只能有一個StreamingContext處于活躍狀態
- 在StreamingContext上調用 stop() 方法,也會關閉SparkContext對象。如果只想僅關閉StreamingContext對象,設
- 置 stop() 的可選參數為false
- 一個SparkContext對象可以重復利用去創建多個StreamingContext對象,前提條件是前面的StreamingContext在后面
- StreamingContext創建之前關閉(不關閉SparkContext)。
一個簡單的基于Streaming的workCount代碼如下: package com.debugo.example import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.SparkConf ?? object WordCountStreaming { ??def main(args: Array[String]): Unit ={ ????val sparkConf = new SparkConf().setAppName("HDFSWordCount").setMaster("spark://172.19.1.232:7077") ?? ????//create the streaming context ????val? ssc = new StreamingContext(sparkConf, Seconds(30)) ?? ????//process file when new file be found. ????val lines = ssc.textFileStream("file:///home/spark/data") ????val words = lines.flatMap(_.split(" ")) ????val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)//這里不是rdd,而是dstream ????wordCounts.print() ????ssc.start() ????ssc.awaitTermination() ??} }
這段代碼實現了當指定的路徑有新文件生成時,就會對這些文件執行wordcount,并把結果print。具體流程如下:
代碼詮釋:
使用Spark Streaming就需要創建StreamingContext對象(類似SparkContext)。創建StreamingContext對象所需的參數與SparkContext基本一致,包括設定Master節點(setMaster),設定應用名稱(setAppName)。第二個參數Seconds(30),指定了Spark Streaming處理數據的時間間隔為30秒。需要根據具體應用需要和集群處理能力進行設置。
轉載于:https://www.cnblogs.com/chengzhihua/p/9512634.html
總結
以上是生活随笔為你收集整理的Spark Streaming的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [JSOI2010] 满汉全席
- 下一篇: 如何修改以前登录过的共享文件夹的用户名和