第93课:SparkStreaming updateStateByKey 基本操作综合案例实战和内幕源码解密
? ?Spark Streaming的DStream為我們提供了一個(gè)updateStateByKey方法,它的主要功能是可以隨著時(shí)間的流逝在Spark Streaming中為每一個(gè)key維護(hù)一份state狀態(tài),通過(guò)更新函數(shù)對(duì)該key的狀態(tài)不斷更新。對(duì)每一個(gè)新的batch而言,Spark Streaming會(huì)在使用updateStateByKey的時(shí)候?yàn)橐呀?jīng)存在的key進(jìn)行state的狀態(tài)更新(對(duì)每個(gè)新出現(xiàn)的key,會(huì)同樣執(zhí)行state的更新函數(shù)操作),但是如果通過(guò)更新函數(shù)對(duì)state更新后返回none的話(huà),此時(shí)刻key對(duì)應(yīng)的state狀態(tài)被刪除掉,需要特別說(shuō)明的是state可以是任意類(lèi)型的數(shù)據(jù)結(jié)構(gòu),這就為我們的計(jì)算帶來(lái)無(wú)限的想象空間;
? 重點(diǎn)來(lái)了!!!如果要不斷的更新每個(gè)key的state,就一定會(huì)涉及到狀態(tài)的保存和容錯(cuò),這個(gè)時(shí)候就需要開(kāi)啟checkpoint機(jī)制和功能,需要說(shuō)明的是checkpoint可以保存一切可以存儲(chǔ)在文件系統(tǒng)上的內(nèi)容,例如:程序未處理的數(shù)據(jù)及已經(jīng)擁有的狀態(tài)。
? 補(bǔ)充說(shuō)明:關(guān)于流式處理對(duì)歷史狀態(tài)進(jìn)行保存和更新具有重大實(shí)用意義,例如進(jìn)行廣告(投放廣告和運(yùn)營(yíng)廣告效果評(píng)估的價(jià)值意義,熱點(diǎn)隨時(shí)追蹤、熱力圖)
? 簡(jiǎn)單的來(lái)說(shuō),如果我們需要進(jìn)行wordcount,每個(gè)batchInterval都會(huì)計(jì)算出新的一批數(shù)據(jù),這批數(shù)據(jù)如何更新到以前計(jì)算的結(jié)果上?updateStateByKey就能實(shí)現(xiàn)此功能。
函數(shù)定義如下:
def?updateStateByKey[S:?ClassTag](updateFunc:?(Seq[V],?Option[S])?=>?Option[S]):?DStream[(K,?S)]?=?ssc.withScope?{updateStateByKey(updateFunc,?defaultPartitioner()) }updateStateByKey 需要傳入一個(gè)函數(shù),該函數(shù)有兩個(gè)參數(shù)Seq[V]表示最新一次reduce的值的序列,Option[s]表示的是key對(duì)應(yīng)的以前的值。返回的時(shí)一個(gè)key的最新值。
下面我們用實(shí)例演示:
package?com.dt.spark.streamingimport?org.apache.spark.SparkConf import?org.apache.spark.streaming.{Seconds,?StreamingContext}/***?Created?by?Administrator?on?2016/5/3.*/ object?UpdateStateByKeyDemo?{def?main(args:?Array[String])?{val?conf?=?new?SparkConf().setAppName("UpdateStateByKeyDemo")val?ssc?=?new?StreamingContext(conf,Seconds(20))//要使用updateStateByKey方法,必須設(shè)置Checkpoint。ssc.checkpoint("/checkpoint/")val?socketLines?=?ssc.socketTextStream("spark-master",9999)socketLines.flatMap(_.split(",")).map(word=>(word,1)).updateStateByKey((currValues:Seq[Int],preValue:Option[Int])?=>{val?currValue?=?currValues.sumSome(currValue?+?preValue.getOrElse(0))}).print()ssc.start()ssc.awaitTermination()ssc.stop()} }打包上傳至spark集群。
打開(kāi)nc,發(fā)送測(cè)試數(shù)據(jù)
root@spark-master:~#?nc?-lk?9999 hadoop,spark,scala,hive hadoop,Hbase,spark運(yùn)行spark 程序
root@spark-master:~#?/usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit?--class?com.dt.spark.streaming.UpdateStateByKeyDemo??--master?spark://spark-master:7077?./spark.jar查看運(yùn)行結(jié)果:
------------------------------------------- Time:?1462282180000?ms ------------------------------------------- (scala,1) (hive,1) (spark,2) (hadoop,2) (Hbase,1)我們?cè)趎c中再輸入一些數(shù)據(jù)
root@spark-master:~#?nc?-lk?9999 hadoop,spark,scala,hive hadoop,Hbase,spark hadoop,spark,scala,hive hadoop,Hbase,spark再次查看結(jié)果:
------------------------------------------- Time:?1462282200000?ms ------------------------------------------- (scala,2) (hive,2) (spark,4) (hadoop,4) (Hbase,2)可見(jiàn),它將我們兩次統(tǒng)計(jì)結(jié)果合并了。
備注:
1、DT大數(shù)據(jù)夢(mèng)工廠(chǎng)微信公眾號(hào)DT_Spark?
2、IMF晚8點(diǎn)大數(shù)據(jù)實(shí)戰(zhàn)YY直播頻道號(hào):68917580
3、新浪微博:?http://www.weibo.com/ilovepains
轉(zhuǎn)載于:https://blog.51cto.com/lqding/1769852
總結(jié)
以上是生活随笔為你收集整理的第93课:SparkStreaming updateStateByKey 基本操作综合案例实战和内幕源码解密的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 一步一步学lucene——(第四步:搜索
- 下一篇: [android] 切换按钮-自定义控件