基于案例贯通 Spark Streaming 流计算框架的运行源码
?
本期內(nèi)容 :
- Spark Streaming+Spark SQL案例展示
- 基于案例貫穿Spark Streaming的運行源碼
?
一、?案例代碼闡述 :
在線動態(tài)計算電商中不同類別中最熱門的商品排名,例如:手機類別中最熱門的三種手機、電視類別中最熱門的三種電視等。
?
1、案例運行代碼 :
import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.streaming.{Seconds, StreamingContext} object OnlineTheTop3ItemForEachCategory2DB {def main(args: Array[String]){ /**** 第1步:創(chuàng)建Spark的配置對象SparkConf,設(shè)置Spark程序的運行時的配置信息, */val conf = new SparkConf() //創(chuàng)建SparkConf對象conf.setAppName("OnlineTheTop3ItemForEachCategory2DB") //設(shè)置應(yīng)用程序的名稱,在程序運行的監(jiān)控界面可以看到名稱 //conf.setMaster("spark://Master:7077") //此時,程序在Spark集群conf.setMaster("local[6]")//設(shè)置batchDuration時間間隔來控制Job生成的頻率并且創(chuàng)建Spark Streaming執(zhí)行的入口val ssc = new StreamingContext(conf, Seconds(5))ssc.checkpoint("/root/Documents/SparkApps/checkpoint")val userClickLogsDStream = ssc.socketTextStream("Master", 9999)val formattedUserClickLogsDStream = userClickLogsDStream.map(clickLog =>(clickLog.split(" ")(2) + "_" + clickLog.split(" ")(1), 1)) //val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow((v1:Int, v2: Int) => v1 + v2, //(v1:Int, v2: Int) => v1 - v2, Seconds(60), Seconds(20)) val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow(_+_,_-_, Seconds(60), Seconds(20))categoryUserClickLogsDStream.foreachRDD { rdd => {if (rdd.isEmpty()) {println("No data inputted!!!")} else {val categoryItemRow = rdd.map(reducedItem => {val category = reducedItem._1.split("_")(0)val item = reducedItem._1.split("_")(1)val click_count = reducedItem._2Row(category, item, click_count)})val structType = StructType(Array(StructField("category", StringType, true),StructField("item", StringType, true),StructField("click_count", IntegerType, true)))val hiveContext = new HiveContext(rdd.context)val categoryItemDF = hiveContext.createDataFrame(categoryItemRow, structType)categoryItemDF.registerTempTable("categoryItemTable")val reseltDataFram = hiveContext.sql("SELECT category,item,click_count FROM (SELECT category,item,click_count,row_number()" +" OVER (PARTITION BY category ORDER BY click_count DESC) rank FROM categoryItemTable) subquery " +" WHERE rank <= 3")reseltDataFram.show()val resultRowRDD = reseltDataFram.rddresultRowRDD.foreachPartition { partitionOfRecords => {if (partitionOfRecords.isEmpty){println("This RDD is not null but partition is null")} else {// ConnectionPool is a static, lazily initialized pool of connectionsval connection = ConnectionPool.getConnection()partitionOfRecords.foreach(record => {val sql = "insert into categorytop3(category,item,client_count) values('" + record.getAs("category") + "','" +record.getAs("item") + "'," + record.getAs("click_count") + ")"val stmt = connection.createStatement();stmt.executeUpdate(sql);})ConnectionPool.returnConnection(connection) // return to the pool for future reuse }}}}}ssc.start()ssc.awaitTermination()}}
}
? 2、案例流程框架圖 :
二、?基于案例的源碼解析?:
1、?構(gòu)建Spark的配置對象SparkConf,設(shè)置Spark程序的運行時的配置信息:
2、構(gòu)建StreamingContext時傳遞SparkConf參數(shù)在內(nèi)部創(chuàng)建SparkContext?:
3、創(chuàng)建了 StreamingContext :?同時說明Spark Streaming 是Spark Core上的一個應(yīng)用程序
4、 checkpoint 持久化
5、構(gòu)建SocketTextStream 獲取輸入源
01、 創(chuàng)建Socket 獲取輸入流
02、 SocketInputDstream繼承ReceiverInputDStream,通過構(gòu)建Receiver來接收數(shù)據(jù)
03、 創(chuàng)建SocketReceiver
04、?通過Receiver 在網(wǎng)絡(luò)獲取相關(guān)數(shù)據(jù)
05、數(shù)據(jù)輸出
06、生成job作業(yè)
07、 根據(jù)時間間隔產(chǎn)生RDD ,存儲數(shù)據(jù)
6、 Streaming Start :
7、?流程總結(jié)?:
01、 在StreamingContext調(diào)用start方法的內(nèi)部其實是會啟動JobScheduler的Start方法,進行消息循環(huán)。
02、 在JobScheduler的start內(nèi)部會構(gòu)造JobGenerator和ReceiverTacker,并且調(diào)用JobGenerator和ReceiverTacker的start方法:
-
-
- JobGenerator啟動后會不斷的根據(jù)batchDuration生成一個個的Job?;
- ReceiverTracker啟動后首先在Spark Cluster中啟動Receiver?(其實是在Executor中先啟動ReceiverSupervisor);
-
03、 在Receiver收到數(shù)據(jù)后會通過ReceiverSupervisor存儲到Executor并且把數(shù)據(jù)的Metadata信息發(fā)送給Driver中的ReceiverTracker?。
04、 在ReceiverTracker內(nèi)部會通過ReceivedBlockTracker來管理接受到的元數(shù)據(jù)信息?。
05、 每個BatchInterval會產(chǎn)生一個具體的Job,其實這里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD的DAG而已?。
06、 要想運行Job需要提交給JobScheduler,在JobScheduler中通過線程池的方式找到一個單獨的線程來提交Job到集群運行(在線程中基于RDD的Action觸發(fā)真正的作業(yè)的運行)。
轉(zhuǎn)載于:https://www.cnblogs.com/yinpin2011/p/5469694.html
總結(jié)
以上是生活随笔為你收集整理的基于案例贯通 Spark Streaming 流计算框架的运行源码的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 生成器模式 - 让配置代码更优雅
- 下一篇: Grunt教程——安装Grunt