I. Hands-on Practice
1. Real-time WordCount with Spark Streaming
Architecture diagram:
Explanation: messages are sent from nc at hadoop1:9999; the consumer side receives them and performs the word-count computation.
2. Install and start the producer
First, install the nc tool with YUM on a Linux machine (ip: 192.168.10.101):
yum install -y nc
Start a server listening on port 9999:
nc -lk 9999
3. Write the Spark Streaming program
Write the pom file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>cn.toto.spark</groupId>
  <artifactId>bigdata</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.10.6</scala.version>
    <spark.version>1.6.2</spark.version>
    <hadoop.version>2.6.4</hadoop.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-compiler</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-reflect</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.38</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <arg>-make:transitive</arg>
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.18.1</version>
        <configuration>
          <useFile>false</useFile>
          <disableXmlReport>true</disableXmlReport>
          <includes>
            <include>**/*Test.*</include>
            <include>**/*Suite.*</include>
          </includes>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>cn.toto.spark.JdbcRDDDemo</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
package cn.toto.spark.streams

import org.apache.log4j.{Level, Logger}
import org.apache.spark.Logging

object LoggerLevels extends Logging {

  // Lower the log level to WARN so the streaming output is readable in the console.
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}
package cn.toto.spark

import cn.toto.spark.streams.LoggerLevels
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by toto on 2017/7/13.
  */
object NetworkWordCount {

  def main(args: Array[String]) {
    LoggerLevels.setStreamingLogLevels()
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    // 5-second batch interval
    val ssc = new StreamingContext(conf, Seconds(5))
    // Receive text lines from the nc server on hadoop1:9999
    val lines = ssc.socketTextStream("hadoop1", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
4. Start the Spark Streaming program: since it uses local mode "local[2]", the program can be run directly on the local machine.
Note: the parallelism must be specified. Setting setMaster("local[2]") for a local run starts two threads: one for the receiver and one for the computation. When running on a cluster, the cluster must have more than one available core.
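When the same job is later submitted to a cluster rather than run locally, the master is usually not hard-coded. A minimal sketch for illustration only (not part of the original program; the master URL would be supplied via spark-submit instead):

// Hypothetical cluster variant: the master is left unset here and passed on the
// command line instead; more than one core must still be available so the
// receiver thread does not starve the batch computation.
val conf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(5))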
5. Type words at the Linux command line (in the nc session).
6. View the results in the IDEA console.
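If, for example, "hello world hello" is typed into the nc session during one 5-second batch, the console output for that batch should look roughly like the following (the timestamp will of course differ):

-------------------------------------------
Time: 1499918850000 ms
-------------------------------------------
(hello,2)
(world,1)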
II. Using DStream
package cn.toto.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by toto on 2017/7/13.
  */
object StreamingWordCount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // The socket receiver returns a ReceiverInputDStream of raw text lines
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop1", 9999)
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
Run result:
In the examples above, every batch is computed independently: each result stands on its own, and the second batch does not accumulate on top of the previous one. The words typed on the Linux side are counted correctly each time, but the counts are not accumulated across batches. To accumulate them, updateStateByKey(func) must be used to update the state, as the following example shows.
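Conceptually, the state update for a single key is simply "sum of the counts from the current batch plus the previously accumulated total". A tiny illustrative sketch of that arithmetic (the values and names are made up purely for illustration):

// Illustration only: suppose the key "hello" had an accumulated state of Some(3)
// and the current 5-second batch contributed Seq(1, 1).
val previousState: Option[Int] = Some(3)
val countsInBatch: Seq[Int] = Seq(1, 1)
val newState = countsInBatch.sum + previousState.getOrElse(0)  // 5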
package cn.toto.spark

import cn.toto.spark.streams.LoggerLevels
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkUpdateStateWordCount {

  // For each partition, updateFunc receives an iterator of
  // (word, counts from the current batch, previously accumulated count)
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    //iter.flatMap(it => Some(it._2.sum + it._3.getOrElse(0)).map(x => (it._1, x)))
    iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(m => (x, m)) }
  }

  def main(args: Array[String]) {
    LoggerLevels.setStreamingLogLevels()
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkUpdateStateWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Set a checkpoint directory (written to shared storage); required by updateStateByKey
    ssc.checkpoint("E://workspace//netresult")
    val lines = ssc.socketTextStream("hadoop1", 9999)
    // reduceByKey: the results are not accumulated across batches
    //val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    // updateStateByKey: the results accumulate, but a custom update function must be passed in: updateFunc
    val results = lines.flatMap(_.split(" ")).map((_, 1))
      .updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
Type some content into nc:
The run result is as follows:
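For reference, updateStateByKey also has a simpler per-key overload that takes a function of (values in the current batch, previous state). A minimal sketch under the same setup (an alternative for illustration, not the original code above; it assumes lines comes from ssc.socketTextStream and that ssc.checkpoint(...) has already been set):

// Per-key alternative to the partition-level updateFunc used above.
val perKeyUpdate: (Seq[Int], Option[Int]) => Option[Int] =
  (newValues, state) => Some(newValues.sum + state.getOrElse(0))
val accumulated = lines.flatMap(_.split(" ")).map((_, 1)).updateStateByKey[Int](perKeyUpdate)
accumulated.print()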
Summary
reduceByKey only produces per-batch counts; to accumulate word counts across batches, use updateStateByKey together with a checkpoint directory, as shown in NetworkUpdateStateWordCount above.