使用 Kafka 和 Spark Streaming 构建实时数据处理系统
使用 Kafka 和 Spark Streaming 構建實時數據處理系統?
來源:https://www.ibm.com/developerworks,這篇文章轉載自微信里文章,正好解決了我項目中的技術問題,非常感謝。
引言
在很多領域,如股市走向分析, 氣象數據測控,網站用戶行為分析等,由于數據產生快,實時性強,數據量大,所以很難統一采集并入庫存儲后再做處理,這便導致傳統的數據處理架構不能滿足需要。流計算的出現,就是為了更好地解決這類數據在處理過程中遇到的問題。與傳統架構不同,流計算模型在數據流動的過程中實時地進行捕捉和處理,并根據業務需求對數據進行計算分析,最終把結果保存或者分發給需要的組件。本文將從實時數據產生和流向的各個環節出發,通過一個具有實際意義的案例,向讀者介紹如何使用 Apache Kafka 和 Spark Streaming 模塊構建一個實時的數據處理系統,當然本文只是拋磚引玉,因為構建一個良好健壯的實時數據處理系統并不是一篇文章可以說清楚的。在閱讀本文前,假設您已經對 Apache Kafka 分布式消息系統有了基本的了解,并且可以使用 Spark Streaming API 進行簡單的編程。接下來,就讓我們一起看看如何構建一個簡易的實時數據處理系統吧。
關于 Kafka
Kafka 是一個分布式的,高吞吐量,易于擴展地基于主題發布/訂閱的消息系統,最早是由 Linkedin 開發,并于 2011 年開源并貢獻給 Apache 軟件基金會。一般來說,Kafka 有以下幾個典型的應用場景:
作為消息隊列。由于 Kafka 擁有高吞吐量,并且內置消息主題分區,備份,容錯等特性,使得它更適合使用在大規模,高強度的消息數據處理的系統中。 流計算系統的數據源。流數據產生系統作為 Kafka 消息數據的生產者將數據流分發給 Kafka 消息主題,流數據計算系統 (Storm,Spark Streaming 等) 實時消費并計算數據。這也是本文將要介紹的應用場景。 系統用戶行為數據源。這種場景下,系統將用戶的行為數據,如訪問頁面,停留時間,搜索日志,感興趣的話題等數據實時或者周期性的發布到 Kafka 消息主題,作為對接系統數據的來源。 日志聚集。Kafka 可以作為一個日志收集系統的替代解決方案,我們可以將系統日志數據按類別匯集到不同的 Kafka 消息主題中。 事件源。在基于事件驅動的系統中,我們可以將事件設計成合理的格式,作為 Kafka 消息數據存儲起來,以便相應系統模塊做實時或者定期處理。由于 Kafka 支持大數據量存儲,并且有備份和容錯機制,所以可以讓事件驅動型系統更加健壯和高效。當然 Kafka 還可以支持其他的應用場景,在這里我們就不一一羅列了。關于 Kafka 更詳細的介紹,請讀者參考Kafka 官網。需要指出的是,本文使用的 Kafka 版本是基于 Scala 2.10 版本構建的 0.8.2.1 版本。
關于 Spark Steaming
Spark Streaming 模塊是對于 Spark Core 的一個擴展,目的是為了以高吞吐量,并且容錯的方式處理持續性的數據流。目前 Spark Streaming 支持的外部數據源有 Flume、 Kafka、Twitter、ZeroMQ、TCP Socket 等。
Discretized Stream 也叫 DStream) 是 Spark Streaming 對于持續數據流的一種基本抽象,在內部實現上,DStream 會被表示成一系列連續的 RDD(彈性分布式數據集),每一個 RDD 都代表一定時間間隔內到達的數據。所以在對 DStream 進行操作時,會被 Spark Stream 引擎轉化成對底層 RDD 的操作。對 Dstream 的操作類型有:
Transformations: 類似于對 RDD 的操作,Spark Streaming 提供了一系列的轉換操作去支持對 DStream 的修改。如 map,union,filter,transform 等 Window Operations: 窗口操作支持通過設置窗口長度和滑動間隔的方式操作數據。常用的操作有 reduceByWindow,reduceByKeyAndWindow,window 等 Output Operations: 輸出操作允許將 DStream 數據推送到其他外部系統或存儲平臺, 如 HDFS, Database 等,類似于 RDD 的 Action 操作,Output 操作也會實際上觸發對 DStream 的轉換操作。常用的操作有 print,saveAsTextFiles,saveAsHadoopFiles, foreachRDD 等。關于 DStream Operations 的更多信息,請參考 Spark 官網的 Spark Streaming Programing Guide。
Kafka 集群搭建步驟
本文中,我們將準備三臺機器搭建 Kafka 集群,IP 地址分別是 192.168.1.1,192.168.1.2,192.168.1.3,并且三臺機器網絡互通。
下載地址:?https://kafka.apache.org/downloads.html
下載完成后,上傳到目標機器中的一個,如 192.168.1.1 , 使用以下命令解壓:?
清單 1. Kafka 安裝包解壓命令
tar –xvf kafka_2.10-0.8.2.1
安裝完成。
在所有三臺服務器上執行下面操作。
切換到當前用戶工作目錄,如/home/fams , 創建 zookeeper 保存數據的目錄, 然后在這個目錄下新建服務器編號文件。?
清單 2. 創建數據目錄和服務器編號文件命令
mkdir zk_data?
cat N > myid
注意需要保證 N 在三臺服務器上取不同值,如分別取 1,2,3。
Kafka 安裝包中內置 zookeeper 服務。進入 Kafka 安裝目錄, 如/home/fams/kafka_2.10-0.8.2.1, 編輯 config/zookeeper.properties 文件,增加以下配置:?
清單 3. zookeeper 配置項
tickTime=2000?
dataDir=/home/fams/zk_data/?
clientPort=2181?
initLimit=5?
syncLimit=2?
server.1=192.168.1.1:2888:3888?
server.2=192.168.1.2:2888:3888?
server.3=192.168.1.3:2888:3888
這些配置項的解釋如下:
tickTime:zookeeper 服務器之間的心跳時間間隔,以毫秒為單位。 dataDir:zookeeper 的數據保存目錄,我們也把 zookeeper 服務器的 ID 文件保存到這個目錄下,下文會介紹。 clientPort:zookeeper 服務器會監聽這個端口,然后等待客戶端連接。 initLimit:zookeeper 集群中 follower 服務器和 leader 服務器之間建立初始連接時所能容忍的心跳次數的極限值。 syncLimit:zookeeper 集群中 follower 服務器和 leader 服務器之間請求和應答過程中所能容忍的心跳次數的極限值。 server.N:N 代表的是 zookeeper 集群服務器的編號。對于配置值,以 192.168.1.1:2888:3888 為例,192.168.1.1 表示該服務器的 IP 地址,2888 端口表示該服務器與 leader 服務器的數據交換端口,3888 表示選舉新的 leader 服務器時候用到的通信端口。5.編輯 Kafka 配置文件
a. 編輯 config/server.properties 文件
添加或修改以下配置。?
清單 4. Kafka Broker 配置項
broker.id=0?
port=9092?
host.name=192.168.1.1?
zookeeper.contact=192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181?
log.dirs=/home/fams/kafka-logs
這些配置項解釋如下:
broker.id:Kafka broker 的唯一標識,集群中不能重復。 port: Broker 的監聽端口,用于監聽 Producer 或者 Consumer 的連接。 host.name:當前 Broker 服務器的 IP 地址或者機器名。 zookeeper.contact:Broker 作為 zookeeper 的 client,可以連接的 zookeeper 的地址信息。 log.dirs:日志保存目錄。b. 編輯 config/producer.properties 文件
添加或者修改以下配置:?
清單 5. Kafka Producer 配置項
broker.list=192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092?
producer.type=async
這些配置項解釋如下:
broker.list:集群中 Broker 地址列表。 producer.type: Producer 類型,async 異步生產者,sync 同步生產者。c. 編輯 config/consumer.properties 文件?
清單 6. Kafka Consumer 配置項
zookeeper.contact=192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
配置項解釋如下:
zookeeper.contact: Consumer 可以連接的 zookeeper 服務器地址列表。6.上傳修改好的安裝包到其他機器
至此,我們已經在 192.168.1.1 機器上修改好了所有需要的配置文件,那么接下來請用以下命令打包該 Kafka 安裝包,并上傳至 192.168.1.2 和 192.168.1.3 兩臺機器上。?
清單 7. 打包并上傳 Kafka 安裝包的命令
tar –cvf kafka_2.10-0.8.2.1.tar ./kafka_2.10-0.8.2.1?
scp ./kafka_2.10-0.8.2.1.tar fams@192.168.1.2:/home/fams?
scp ./kafka_2.10-0.8.2.1.tar fams@192.168.1.3:/home/fams
上傳完成后,我們需要到 192.168.1.2 和 192.168.1.3 兩臺機器上解壓剛才上傳的 tar 包,命令如清單一。之后需要分別在兩臺機器上修改 config/server.properties 文件中的 broker.id 和 host.name. broker.id,可以分別復制 1 和 2,host.name 需要改成當前機器的 IP。
分別在三臺機器上運行下面命令啟動 zookeeper 和 Kafka 服務。?
清單 8. 啟動 zookeeper 服務
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
清單 9. 啟動 kafka 服務
nohup bin/kafka-server-start.sh config/server.properties &
我們的驗證步驟有兩個。
第一步,分別在三臺機器上使用下面命令查看是否有 Kafka 和 zookeeper 相關服務進程。?
清單 10. 查看 Kafka 和 zookeeper 服務進程
ps –ef | grep kafka
第二步,創建消息主題,并通過 console producer 和 console consumer 驗證消息可以被正常的生產和消費。?
清單 11. 創建消息主題
bin/kafka-topics.sh –create \?
–replication-factor 3 \?
–partition 3 \?
–topic user-behavior-topic \?
–zookeeper 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
運行下面命令打開打開 console producer。?
清單 12. 啟動 Console Producer
bin/kafka-console-producer.sh –broker-list 192.168.1.1:9092 –topic user-behavior-topic
在另一臺機器打開 console consumer。?
清單 13. 啟動 Console Consumer
./kafka-console-consumer.sh –zookeeper 192.168.1.2:2181 –topic user-behavior-topic –from-beginning
然后如果在 producer console 輸入一條消息,能從 consumer console 看到這條消息就代表安裝是成功的。
案例介紹與編程實現
該案例中,我們假設某論壇需要根據用戶對站內網頁的點擊量,停留時間,以及是否點贊,來近實時的計算網頁熱度,進而動態的更新網站的今日熱點模塊,把最熱話題的鏈接顯示其中。
對于某一個訪問論壇的用戶,我們需要對他的行為數據做一個抽象,以便于解釋網頁話題熱度的計算過程。
首先,我們通過一個向量來定義用戶對于某個網頁的行為即點擊的網頁,停留時間,以及是否點贊,可以表示如下:
(page001.html, 1, 0.5, 1)
向量的第一項表示網頁的 ID,第二項表示從進入網站到離開對該網頁的點擊次數,第三項表示停留時間,以分鐘為單位,第四項是代表是否點贊,1 為贊,-1 表示踩,0 表示中立。
其次,我們再按照各個行為對計算網頁話題熱度的貢獻,給其設定一個權重,在本文中,我們假設點擊次數權重是 0.8,因為用戶可能是由于沒有其他更好的話題,所以再次瀏覽這個話題。停留時間權重是 0.8,因為用戶可能同時打開多個 tab 頁,但他真正關注的只是其中一個話題。是否點贊權重是 1,因為這一般表示用戶對該網頁的話題很有興趣。
最后,我們定義用下列公式計算某條行為數據對于該網頁熱度的貢獻值。
f(x,y,z)=0.8x+0.8y+z
那么對于上面的行為數據 (page001.html, 1, 0.5, 1),利用公式可得:
H(page001)=f(x,y,z)= 0.8x+0.8y+z=0.8*1+0.8*0.5+1*1=2.2
讀者可以留意到,在這個過程中,我們忽略了用戶本身,也就是說我們不關注用戶是誰,而只關注它對于網頁熱度所做的貢獻。
在本案例中我們將使用一段程序來模擬用戶行為,該程序每隔 5 秒鐘會隨機的向 user-behavior-topic 主題推送 0 到 50 條行為數據消息,顯然,這個程序扮演消息生產者的角色,在實際應用中,這個功能一般會由一個系統來提供。為了簡化消息處理,我們定義消息的格式如下:
網頁 ID|點擊次數|停留時間 (分鐘)|是否點贊
并假設該網站只有 100 個網頁。以下是該類的 Scala 實現源碼。?
清單 14. UserBehaviorMsgProducer 類源碼
- 編寫 Spark Streaming 程序消費消息
在弄清楚了要解決的問題之后,就可以開始編碼實現了。對于本案例中的問題,在實現上的基本步驟如下:
構建 Spark 的 StreamingContext 實例,并且開啟 checkpoint 功能。因為我們需要使用 updateStateByKey 原語去累計的更新網頁話題的熱度值。 利用 Spark 提供的 KafkaUtils.createStream 方法消費消息主題,這個方法會返回 ReceiverInputDStream 對象實例。 對于每一條消息,利用上文的公式計算網頁話題的熱度值。 定義一個匿名函數去把網頁熱度上一次的計算結果值和新計算的值相加,得到最新的熱度值。 調用 updateStateByKey 原語并傳入上面定義的匿名函數更新網頁熱度值。 最后得到最新結果后,需要對結果進行排序,最后打印熱度值最高的 10 個網頁。源代碼如下。?
清單 15. WebPagePopularityValueCalculator 類源碼
部署和測試
讀者可以參考以下步驟部署并測試本案例提供的示例程序。
第一步,啟動行為消息生產者程序, 可以直接在 Scala IDE 中啟動,不過需要添加啟動參數,第一個是 Kafka Broker 地址,第二個是目標消息主題的名稱。?
圖 1. UserBehaviorMsgProducer 類啟動參數
啟動后,可以看到控制臺有行為消息數據生成。?
圖 2. 生成的行為消息數據預覽
第二步,啟動作為行為消息消費者的 Spark Streaming 程序,需要在 Spark 集群環境中啟動,命令如下:?
清單 16. WebPagePopularityValueCalculator 類啟動命令
bin/spark-submit \?
–jars?SPARKHOME/lib/spark?streaming?kafka2.10?1.3.1.jar,?SPARK_HOME/lib/spark-streaming-kafka-assembly_2.10-1.3.1.jar, \?
SPARKHOME/lib/kafka2.10?0.8.2.1.jar,?SPARK_HOME/lib/kafka-clients-0.8.2.1.jar \?
–class com.ibm.spark.exercise.streaming.WebPagePopularityValueCalculator?
–master spark://:7077 \?
–num-executors 4 \?
–driver-memory 4g \?
–executor-memory 2g \?
–executor-cores 2 \?
/home/fams/sparkexercise.jar \?
192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181 2
由于程序中我們要用到或者間接調用 Kafka 的 API,并且需要調用 Spark Streaming 集成 Kafka 的 API(KafkaUtils.createStream), 所以需要提前將啟動命令中的 jar 包上傳到 Spark 集群的每個機器上 (本例中我們將它們上傳到 Spark 安裝目錄的 lib 目錄下,即$SPARK_HOME/lib),并在啟動命令中引用它們。
啟動后,我們可以看到命令行 console 下面有消息打印出來,即計算的熱度值最高的 10 個網頁。?
圖 3. 網頁話題熱度當前排序預覽
我們也可以到 Spark Web Console 上去查看當前 Spark 程序的運行狀態, 默認地址為:?http://spark_master_ip:8080。?
圖 4. 查看 Spark Streaming 程序的運行狀態
注意事項
利用 Spark Streaming 構建一個高效健壯的流數據計算系統,我們還需要注意以下方面。
1. 需要合理的設置數據處理的間隔,即需要保證每一批數據的處理時間必須小于處理間隔,保證在處理下一批數據的時候,前一批已經處理完畢。顯然這需要由您的 Spark 集群的計算能力還有 input 數據的量決定。2. 需要盡可能的提升讀取 input 數據的能力。在 Spark Streaming 與外部系統如 Kafka,Flume 等集成時,為了避免接收數據環節成為系統的瓶頸,我們可以啟動多個 ReceiverInputDStream 對象實例。3. 雖然本文案例中,我們只是把 (近) 實時計算結果打印出來,但是實際上很多時候這些結果會被保存到數據庫,HDFS, 或者發送回 Kafka, 以供其他系統利用這些數據做進一步的業務處理。4. 由于流計算對實時性要求很高,所以任何由于 JVM Full GC 引起的系統暫停都是不可接受的。除了在程序中合理使用內存,并且定期清理不需要的緩存數據外,CMS(Concurrent Mark and Sweep) GC 也是被 Spark 官方推薦的 GC 方式,它能有效的把由于 GC 引起的暫停維持在一個在很低的水平。我們可以在使用 spark-submit 命令時通過增加 --driver-java-options 選項來添加 CMS GC 相關的參數。 5. 在 Spark 官方提供關于集成 Kafka 和 Spark Streaming 的指導文檔中,提到了兩種方式,第一種是 Receiver Based Approach,即通過在 Receiver 里實現 Kafka consumer 的功能來接收消息數據;第二種是 Direct Approach, 即不通過 Receiver,而是周期性的主動查詢 Kafka 消息分區中的最新 offset 值,進而去定義在每個 batch 中需要處理的消息的 offset 范圍。本文采用的是第一種方式,因為目前第二種方式還處于試驗階段。 6. 如果采用 Receiver Based Approach 集成 Kafka 和 Spark Streaming,就需要考慮到由于 Driver 或者 Worker 節點宕機而造成的數據丟失的情況,在默認配置下,是有可能造成數據丟失的,除非我們開啟 Write Ahead Log(WAL) 功能。在這種情況下,從 Kafka 接收到的消息數據會同步的被寫入到 WAL 并保存到可靠的分布式文件系統上,如 HDFS。可以通過在 Spark 配置文件中 (conf/spark-defaults.conf) 把 spark.streaming.receiver.writeAheadLog.enable 配置項設置成 true 開啟這個功能。當然在開啟 WAL 的情況下,會造成單個 Receiver 吞吐量下降,這時候,我們可能需要并行的運行多個 Receiver 來改善這種情況。 7. 由于 updateStateByKey 操作需要開啟 checkpoint 功能,但是頻繁的 checkpoint 會造成程序處理時間增長,也會造成吞吐量下降。默認情況下,checkpoint 時間間隔會取 steaming 程序數據處理間隔或者 10 秒兩者中較大的那個。官方推薦的間隔是 streaming 程序數據處理間隔的 5-10 倍。可以通過 dsteam.checkpoint(checkpointInterval) 來設置,參數需要用樣本類 Duration 包裝下,單位是毫秒。結束語
本文包含了集成 Spark Streaming 和 Kafka 分布式消息系統的基本知識,但是需要指出的是,在實際問題中,我們可能面臨更多的問題,如性能優化,內存不足,以及其他未曾遇到的問題。希望通過本文的閱讀,讀者能對使用 Spark Streaming 和 Kafka 構建實時數據處理系統有一個基本的認識,為讀者進行更深入的研究提供一個參考依據。
轉載于:https://www.cnblogs.com/camilla/p/8431602.html
總結
以上是生活随笔為你收集整理的使用 Kafka 和 Spark Streaming 构建实时数据处理系统的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JS表格分页(封装版)
- 下一篇: 列表标签