SPARK STREAMING之1:编程指南(翻译v1.4.1)
SPARK STREAMING之1:編程指南(翻譯v1.4.1)
@(SPARK)[spark, 大數據]
- SPARK STREAMING之1編程指南翻譯v141
- 概述
- 快速入門例子
- 基本概念
- Linking
概述
Spark Streaming是Spark核心API的一個擴展,它使得spark可擴展、高吞吐、可容錯的對實時數據流進行處理。可以通過集成外部系統獲取數據來源,如 Kafka, Flume, Twitter, ZeroMQ, Kinesis, 或者 TCP sockets,還可以使用高度抽象的復雜算法,如map、reduce、join、window等。最后,處理完的數據可以直接發送到文件系統、數據庫或者一些儀表盤。事實上,你還可以spark的機器學習和圖處理算法直接運用于數據流中。
它的內部實現原理如下圖: Spark Streaming收到實時輸入數據,然后將它切分為多個批次,這些批次被spark引擎處理后同樣以批次的形式輸出最終的數據流。(注:也就是說Spark Streaming不是真正的實時系統,而是一個準實時系統,它會等待收集到一定量的數據后匯總成一個批次,然后再統一處理)
Spark Streaming有一個高度抽象概念叫做discretized stream or DStream(分離式流),它表示一個持續的數據流。DStream可以通過集成外部系統來獲取(如kafka,flume等),或者通過對其它DStream操作后生成新的Dstream。在spark內部,DStream以一系列的RDD表示。
本文介紹了如何編寫Spark Streaming的程序。你可以使用scala、java、python(1.2版本后)編寫程序,這些內容都將在本文中介紹。在本文中,你可以通過點擊標簽選擇查看不同語言所實現的代碼。
注意:Spark Streaming的python API在1.2版本后引入的,它包括了scala/java所有的DStream transformation操作,以及大部分的輸出操作。但是,只它支持基本的數據源(如文本文件,socket的文本數據等),集成kafka,flume的API將在以后的版本中實現。
快速入門例子
在詳細介紹如何編寫Spark Streaming程序前,我們先看一個簡單的例子,在這個例子中,我們對從TCP scoket中收到的文本數據進行單詞統計。步驟如下:
首先,我們導入Spark Streaming的相關類,以及StreamingContext(1.3版本后不再需要)。StreamingContext是所有streaming功能的進入點。在這個例子中,我們創建了一個local StreamingContext,它有2個執行線程,每1秒形成一個批次。
import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3// Create a local StreamingContext with two working thread and batch interval of 1 second. // The master requires 2 cores to prevent from a starvation scenario.val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1))使用這個context,我們創建一個從TCP socket獲取數據的Dstream,它指定了主機名(localhost)以及端口號(9999)。
// Create a DStream that will connect to hostname:port, like localhost:9999 val lines = ssc.socketTextStream("localhost", 9999)變量lines表示從數據服務器收到的數據流,DStream中的每一個數據記錄表示一行。然后,我們將這些行以空格切分成單詞。(注:事實上,lines代表的是每秒鐘收到的數據,而在map操作中以行作為單位進行輸入)
// Split each line into words val words = lines.flatMap(_.split(" "))flatMap是一個一對多的DStream操作,它從源DStream中讀取一個記錄,然后生成多個記錄并發送到新的DStream中。在這個例子中,每一行內容都將切分為多個單詞,這個單詞流以words這個DStream表示。接下來,我們統計這些單詞。
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _)// Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print()words這個DStream經過map操作后轉換為一個(word,1)對的新DStream,然后統計每個單詞的出現頻率。最后,wordCouns.print()會每秒鐘輸出一些統計結果。
記住到這里為止,Spark Streaming只是定義了它會進行什么處理,但這些處理還沒啟動。你可以通過以下代碼啟動真正的處理進程:
ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate完整代碼請見 Spark Streaming example NetworkWordCount.
如果你已經下載了spark,你可以按照下面的步驟運行。首先你需要運行netcat(一個在大多數類unix系統中都能找到的小工具)作為一個數據服務器:
$ nc -lk 9999然后,在另一個終端中,你可以這樣啟動上面的例子:
$ ./bin/run-example streaming.NetworkWordCount localhost 9999此時,你在nc終端中輸入的每一行文本都將被統計并輸出結果。如下:
# TERMINAL 1: # Running Netcat $ nc -lk 9999 hello world# TERMINAL 2: RUNNING NetworkWordCount$ ./bin/run-example streaming.NetworkWordCount localhost 9999 ... ------------------------------------------- Time: 1357008430000 ms ------------------------------------------- (hello,1) (world,1) ...基本概念
下面,我們開始詳細描述Spark Streaming的基本概念。
Linking
與spark 類似,spark streaming可以通過maven的中央倉庫獲取。開發Spark Streaming程序前,你需要將以下依賴關系放到你的maven或者sbt項目中。
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId><version>1.4.1</version></dependency>如果需要集成kafka, flume,kinesis等未在Spark Streaming 核心API中包含的數據源,你需要在依賴中添加類似 spark-streaming-xyz_2.10的依賴。一引起常用的如下:
Source Artifact Kafka spark-streaming-kafka_2.10 Flume spark-streaming-flume_2.10 Kinesis spark-streaming-kinesis-asl_2.10 [Amazon Software License] Twitter spark-streaming-twitter_2.10 ZeroMQ spark-streaming-zeromq_2.10 MQTT spark-streaming-mqtt_2.10請參考maven的中央倉庫以獲得最新最全的依賴關系。
總結
以上是生活随笔為你收集整理的SPARK STREAMING之1:编程指南(翻译v1.4.1)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 构建scala+IDEA+sbt开发环境
- 下一篇: spark之13:提交应用的方法(spa