SparkStreaming -Kafka数据源
生活随笔
收集整理的這篇文章主要介紹了
SparkStreaming -Kafka数据源
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
SparkStreaming處理kafka作為數據源
所以我們要創建的是kafka的Dstream,那么就要使用到KafkaUtils下的createStream,先來看一下ctrl點進去查看,然后來寫參數
package date_10_16_SparkStreamingimport org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka.KafkaUtils object kafkaSource {def main(args: Array[String]): Unit = {//使用SparkStreaming完成wordcount//配置對象val conf = new SparkConf().setMaster("local[*]").setAppName("wordcount")//實時數據分析的環境對象//StreamingContext需要兩個參數,一個conf,一個是采集周期val streamingContext = new StreamingContext(conf,Seconds(5))//從kafka采集數據val kafkaStream = KafkaUtils.createStream(streamingContext,"chun1:2181","chun",Map("chun"->3))//將采集的數據進行分解(扁平化)val wordToSumDstream = kafkaStream.flatMap(_._2.split(" ")).map((_,1)).reduceByKey(_+_)wordToSumDstream.print()//這里不能停止采集功能,也就是streamingContext不能結束//可以簡單理解為啟動采集器streamingContext.start()//Driver等待采集器,采集器不挺Driver不停止streamingContext.awaitTermination()} }開啟kafka,輸入數據
kafka-console-producer.sh --broker-list chun1:2181 --topic chun a a a a a a a a a a a a a a aidea里查看結果
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的SparkStreaming -Kafka数据源的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 哈弗酷狗内饰官图发布:坦克300同款档杆
- 下一篇: 暴雨高温无缝衔接!河南又变“蒸炉”了:大