Spark Streaming: reading data from Kafka for real-time word counting, and counting URL occurrences
1. Create a Maven project
For the steps to create it, see: http://blog.csdn.net/tototuzuoquan/article/details/74571374
2. Start Kafka
A: Installing the Kafka cluster: http://blog.csdn.net/tototuzuoquan/article/details/73430874
B: Creating topics, etc.: http://blog.csdn.net/tototuzuoquan/article/details/73430874
3. Write the POM file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.toto.spark</groupId>
    <artifactId>bigdata</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.10.6</scala.version>
        <spark.version>1.6.2</spark.version>
        <hadoop.version>2.6.4</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-make:transitive</arg>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn.toto.spark.FlumeStreamingWordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
4. Write the code

package cn.toto.spark

import cn.toto.spark.streams.LoggerLevels
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by toto on 2017/7/13.
  * Reads data from Kafka and counts the words.
  */
object KafkaWordCount {
  /**
    * String      : the word
    * Seq[Int]    : the word's counts in the current batch
    * Option[Int] : the previously accumulated (historical) result
    */
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    //iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
    iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) }
  }

  def main(args: Array[String]): Unit = {
    LoggerLevels.setStreamingLogLevels()
    //The args are passed in from IDEA; fill in "Program arguments" as follows:
    //  hadoop11:2181,hadoop12:2181,hadoop13:2181 g1 wordcount 1
    //The four parameters are received in an array:
    //  zkQuorum   : the ZooKeeper quorum
    //  group      : the consumer group
    //  topics     : the Kafka topic(s)
    //  numThreads : the number of threads
    //Note that the topic (here, wordcount) must be created first.
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("E:\\wordcount\\outcheckpoint")
    //e.g. "alog-2016-04-16,alog-2016-04-17,alog-2016-04-18"
    //  -> Array((alog-2016-04-16, 2), (alog-2016-04-17, 2), (alog-2016-04-18, 2))
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    //Store the received data in memory and on disk, serialized
    val data: ReceiverInputDStream[(String, String)] =
      KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
    //Data read from Kafka is also (key, value) pairs; _._2 is the value
    val words = data.map(_._2).flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc,
      new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
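The code imports cn.toto.spark.streams.LoggerLevels, a small logging helper that is not shown in this post. A minimal sketch of such a helper (an assumption modeled on Spark's StreamingExamples utility, not necessarily the author's original) could look like this:

package cn.toto.spark.streams

import org.apache.log4j.{Level, Logger}

//Hypothetical stand-in for the LoggerLevels helper used above: it raises the
//log level so the word counts printed each batch are not drowned out by
//Spark's INFO logging.
object LoggerLevels {
  def setStreamingLogLevels(): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("kafka").setLevel(Level.WARN)
    Logger.getRootLogger.setLevel(Level.WARN)
  }
}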
 
5. Configure the run parameters in IDEA
In the Run Configuration, set Program arguments to:

hadoop11:2181,hadoop12:2181,hadoop13:2181 g1 wordcount 1

Parameter notes: the four values are, in order, the ZooKeeper quorum (zkQuorum), the consumer group (group), the Kafka topic(s) to read (topics), and the number of receiver threads (numThreads).
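Before moving on, it may help to see how the updateFunc defined in KafkaWordCount merges the per-batch counts with the accumulated total. A tiny stand-alone sketch (the sample values are made up purely for illustration):

object UpdateFuncDemo extends App {
  //Same shape as in KafkaWordCount: for each word we get
  //(word, counts observed in the current batch, previously accumulated total).
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    iter.flatMap { case (word, batchCounts, state) =>
      Some(batchCounts.sum + state.getOrElse(0)).map(total => (word, total))
    }
  }

  //"spark" appeared twice in this batch and 3 times before -> 5;
  //"kafka" is new in this batch -> 2.
  val input: Iterator[(String, Seq[Int], Option[Int])] = Iterator(
    ("spark", Seq(1, 1), Some(3)),
    ("kafka", Seq(1, 1), None)
  )

  updateFunc(input).foreach(println)  //prints (spark,5) and (kafka,2)
}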
6. Start Kafka, create the topic, and send data to it
Start Kafka:
[root@hadoop1 kafka]# pwd
/home/tuzq/software/kafka/servers/kafka
[root@hadoop1 kafka]# bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &

Create the topic:
[root@hadoop1 kafka]# bin/kafka-topics.sh --create --zookeeper hadoop11:2181 --replication-factor 1 --partitions 1 --topic wordcount
Created topic "wordcount".

List the topics:
bin/kafka-topics.sh --list --zookeeper hadoop11:2181

Start a console producer to send messages (my Kafka brokers run on hadoop1, hadoop2, and hadoop3):
[root@hadoop1 kafka]# bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic wordcount
No safe wading in an unknown water
Anger begins with folly,and ends in repentance
No safe wading in an unknown water
Anger begins with folly,and ends in repentance
Anger begins with folly,and ends in repentance

Run the program with spark-submit:
# Start the Spark Streaming application
bin/spark-submit --class cn.toto.spark.KafkaWordCount /root/streaming-1.0.jar hadoop11:2181 group1 wordcount 1

7. Check the results
The word counts are printed to the driver console at the end of every 5-second batch.
8. Another example: counting how many times each URL appears
package cn.toto.spark

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by toto on 2017/7/14.
  */
object UrlCount {
  val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
    iterator.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(n => (x, n)) }
  }

  def main(args: Array[String]) {
    //Receive the command-line arguments
    val Array(zkQuorum, groupId, topics, numThreads, hdfs) = args
    //Create the SparkConf and set the application name
    val conf = new SparkConf().setAppName("UrlCount")
    //Create the StreamingContext
    val ssc = new StreamingContext(conf, Seconds(2))
    //Set the checkpoint directory
    ssc.checkpoint(hdfs)
    //Build the topic info
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    //Pull data from Kafka and create the DStream
    val lines = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap,
      StorageLevel.MEMORY_AND_DISK).map(_._2)
    //Split each line and take the URL the user clicked (the 7th space-separated field)
    val urls = lines.map(x => (x.split(" ")(6), 1))
    //Count the clicks per URL
    val result = urls.updateStateByKey(updateFunc,
      new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    //Print the result to the console
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
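UrlCount assumes each Kafka message is a space-separated access-log line whose 7th field (index 6) is the clicked URL. The post does not show the actual log format, so the line below is only a made-up example of a layout that would work. Note also that, unlike KafkaWordCount, UrlCount takes a fifth argument: the HDFS path used for checkpointing.

object UrlFieldDemo extends App {
  //Hypothetical access-log line: the 7th space-separated field is the request URL.
  val line = "95.26.13.1 - user1 [14/Jul/2017:10:05:03] GET 200 /goods/item?id=42"
  val url = line.split(" ")(6)
  println(url)  //prints /goods/item?id=42
}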
Summary
The above covers reading data from Kafka with Spark Streaming, performing real-time word counting, and counting how many times each URL appears. Hopefully it helps you solve the problems you run into.
 
                            
