获取系统URL访问的前三名(通过Scala方式实现/通过Spark方式实现),Spark将URL访问日志进行分类并通过自定义Partitioner的方式将文件写入到不同分区上
生活随笔
收集整理的這篇文章主要介紹了
获取系统URL访问的前三名(通过Scala方式实现/通过Spark方式实现),Spark将URL访问日志进行分类并通过自定义Partitioner的方式将文件写入到不同分区上
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
1、創建Maven項目
創建的過程參考:http://blog.csdn.net/tototuzuoquan/article/details/74571374
2、準備日志文件
url.log的內容類似:
3、編寫UrlCount1,代碼如下:
通過scala的方式獲取日志文件中每類次主機名出現的前3名
package cn.toto.sparkimport java.net.URLimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}/*** 獲取到每類host出現的次數的前三名,下面通過sacle的方式實現* Created by toto on 2017/7/8.*/ object UrlCount1 {def main(args: Array[String]): Unit = {//使用local就是啟動一個線程,local[2]表示啟動2個線程,Local[*]表示根據機器來自動分配val conf = new SparkConf().setAppName("UrlCount1").setMaster("local[2]")val sc = new SparkContext(conf)val lines:RDD[String] = sc.textFile(args(0))//splitval urlAndOne = lines.map(line =>{val fields = line.split("\t")val url = fields(1)//封裝成url,次數(url,1)})//聚合,計算某個url出現了多少次,所以要聚合一下,這里做了Cache,問題是當數據量很大的時候,可能出現內存溢出val summedUrl = urlAndOne.reduceByKey(_+_).cache()println(summedUrl)//返回的是[(host,url,次數)]這樣的元組//groupBy(_._1) 表示按照host進行分組val grouped = summedUrl.map(t => {val host = new URL(t._1).getHost//主機名,url,次數(host,t._1,t._2)}).groupBy(_._1)println(grouped)//_ :表示上面的集合//toList :表示它轉化為集合//sortBy :這里是scala的集合//_._3 :表示按照次數進行排序//.reverse.take(3) :表示取前3名val result = grouped.mapValues(_.toList.sortBy(_._3).reverse.take(3))println(result.collect().toBuffer)sc.stop()} }運行參數配置:
運行結果:
4、通過Spark的方式計算URL出現的前3名
代碼如下:
package cn.toto.sparkimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}/*** 使用SparkRDD的方式取出每個子Host的出現的次數的前3名,并循環打印出來。* Created by toto on 2017/7/8.*/ object UrlCount2 {/*** 使用了Spark的RDD緩存機制,這樣再進行排序時不會出現內存溢出* @param args*/def main(args: Array[String]): Unit = {//后續這些Url就從數據庫中獲取到val urls = Array("http://java.toto.cn","http://php.toto.cn","http://net.toto.cn")val conf = new SparkConf().setAppName("UrlCount1").setMaster("local[2]")val sc = new SparkContext(conf)val lines:RDD[String] = sc.textFile(args(0))//splitval urlAndOne = lines.map(line => {val fields = line.split("\t")val url = fields(1)//(url,次數)(url,1)})//聚合val summedUrl = urlAndOne.reduceByKey(_+_)//循環過濾for(u <- urls) {//過濾(值過濾出urls這些的內容)val insRdd = summedUrl.filter(t => {val url = t._1url.startsWith(u)})val result = insRdd.sortBy(_._2, false).take(3)println(result.toBuffer)}sc.stop()} }運行參數配置:
運行結果:
5、將url進行篩選,分類,并通過自定義分區將數據存儲到不同的文件中
package cn.toto.sparkimport java.net.URLimport org.apache.spark.rdd.RDD import org.apache.spark.{Partitioner, SparkConf, SparkContext}import scala.collection.mutable/*** 自定義Partitioner,按照不同的子主機名存儲到不同的分區文件中* Created by toto on 2017/7/8.*/ object UrlCount3 {/*** 如果把每個學員單獨產生的內容都寫入到磁盤文件中* @param args*/def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("UrlCount1").setMaster("local[2]")val sc = new SparkContext(conf)val lines : RDD[String] = sc.textFile("E:\\workspace\\url.log")//splitval urlAndOne = lines.map(line => {val fields = line.split("\t")val url = fields(1)(url,1)})//聚合val summedUrl = urlAndOne.reduceByKey(_+_).cache()val rdd1 = summedUrl.map(t => {val host = new URL(t._1).getHost//(host,(url,出現次數))(host,(t._1,t._2))})val urls = rdd1.map(_._1).distinct().collect()val partitioner = new HostPartitioner(urls)//安裝自定義的分區器重新分區val partitionedRdd = rdd1.partitionBy(partitioner)val result = partitionedRdd.mapPartitions(it => {it.toList.sortBy(_._2._2).reverse.take(3).iterator})result.saveAsTextFile("E:\\workspace\\out")sc.stop()} }class HostPartitioner(urls: Array[String]) extends Partitioner {val rules = new mutable.HashMap[String,Int]()var index = 0for(url <- urls){rules.put(url,index)index += 1}override def getPartition(key: Any): Int = {val url = key.toString//如果取到了值就返回url,否則返回0rules.getOrElse(url,0)}//分區數量override def numPartitions: Int = urls.length }最終的輸出內容是:
總結
以上是生活随笔為你收集整理的获取系统URL访问的前三名(通过Scala方式实现/通过Spark方式实现),Spark将URL访问日志进行分类并通过自定义Partitioner的方式将文件写入到不同分区上的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 港股恒生指数交易时间
- 下一篇: 荣耀v40微信视频美颜怎么设置 其它配置