【Spark-core学习之九】 Spark案例
環境
虛擬機:VMware 10
Linux版本:CentOS-6.5-x86_64
客戶端:Xshell4
FTP:Xftp4
jdk1.8
scala-2.10.4(依賴jdk1.8)
spark-1.6
一、PV & UV
PV是網站分析的一個術語,用以衡量網站用戶訪問的網頁的數量。對于廣告主,PV值可預期它可以帶來多少廣告收入。一般來說,PV與來訪者的數量成正比,但是PV并不直接決定頁面的真實來訪者數量,如同一個來訪者通過不斷的刷新頁面,也可以制造出非常高的PV。
1、什么是PV值
PV(page view)即頁面瀏覽量或點擊量,是衡量一個網站或網頁用戶訪問量。具體的說,PV值就是所有訪問者在24小時(0點到24點)內看了某個網站多少個頁面或某個網頁多少次。PV是指頁面刷新的次數,每一次頁面刷新,就算做一次PV流量。
度量方法就是從瀏覽器發出一個對網絡服務器的請求(Request),網絡服務器接到這個請求后,會將該請求對應的一個網頁(Page)發送給瀏覽器,從而產生了一個PV。那么在這里只要是這個請求發送給了瀏覽器,無論這個頁面是否完全打開(下載完成),那么都是應當計為1個PV。
?
package com.wjy.test;import java.util.List;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction;import scala.Tuple2;public class Pv {public static void main(String[] args) {SparkConf conf = new SparkConf().setMaster("local").setAppName("PV"); JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> rdd = sc.textFile("./data/pvuvdata");//根據PV定義 某個頁面/網址的訪問數量 將每一條記錄根據網址解析出一條訪問量JavaPairRDD<String, Integer> ipwebrdd = rdd.mapToPair(new PairFunction<String, String, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(String line) throws Exception {//7.213.213.208 吉林 2018-03-29 1522294977303 1920936170939152672 www.dangdang.com LoginString[] ss = line.split("\t");return new Tuple2<String, Integer>(ss[5],1);}});//累加頁面訪問量JavaPairRDD<String, Integer> mapToPair = ipwebrdd.reduceByKey(new Function2<Integer, Integer, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1+v2;}}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {private static final long serialVersionUID = 1L;//換個 用于按照整數key排序 @Overridepublic Tuple2<Integer, String> call(Tuple2<String, Integer> tuple)throws Exception {return tuple.swap();}}).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> tuple)throws Exception {return tuple.swap();}});List<Tuple2<String, Integer>> list = mapToPair.take(5);for(Tuple2<String, Integer> t:list){System.out.println(t);}sc.stop();}}結果:
(www.baidu.com,18791) (www.dangdang.com,18751) (www.suning.com,18699) (www.mi.com,18678) (www.taobao.com,18613)?
2、什么是UV值
UV(unique visitor)即獨立訪客數,指訪問某個站點或點擊某個網頁的不同IP地址的人數。在同一天內,UV只記錄第一次進入網站的具有獨立IP的訪問者,在同一天內再次訪問該網站則不計數。UV提供了一定時間內不同觀眾數量的統計指標,而沒有反應出網站的全面活動。
?
package com.wjy.test;import java.util.List;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction;import scala.Tuple2;public class Uv {/*** 根據IP網址來確定唯一用戶訪問 然后排重 累計* @param args*/public static void main(String[] args) {SparkConf conf = new SparkConf().setMaster("local").setAppName("UV");JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> rdd = sc.textFile("./data/pvuvdata");JavaPairRDD<String, Integer> rdd2 = rdd.mapToPair(new PairFunction<String, String, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(String line) throws Exception {String[] split = line.split("\t");return new Tuple2<String, Integer>(split[0]+"_"+split[5],1);}}).distinct().mapToPair(new PairFunction<Tuple2<String,Integer>, String, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(Tuple2<String, Integer> tuple)throws Exception {return new Tuple2<String, Integer>(tuple._1.split("_")[1],1);}});//累加JavaPairRDD<String, Integer> rdd3 = rdd2.reduceByKey(new Function2<Integer, Integer, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1+v2;}}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {private static final long serialVersionUID = 1L;//反轉 數值做KEY 用于排序 @Overridepublic Tuple2<Integer, String> call(Tuple2<String, Integer> tuple)throws Exception {return tuple.swap();}}).sortByKey(false)//降序排序.mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {private static final long serialVersionUID = 1L;//排序之后 反轉回來 @Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> tuple)throws Exception {return tuple.swap();}});//取前5個元素List<Tuple2<String, Integer>> list = rdd3.take(5);for(Tuple2<String, Integer> t:list){System.out.println(t);}sc.stop();}}結果:
(www.baidu.com,15830) (www.suning.com,15764) (www.mi.com,15740) (www.jd.com,15682) (www.dangdang.com,15641)?
二、二次排序
對于兩列以上的數據,要求對第一列排序之后,之后的列也要依次排序,思路就是:先對第一列進行排序,對于第一列數值相同,再對第二列進行排序。
舉例:
待排序數據:secondSort.txt
3 1 5 2 6 5 8 123 1 4 4 123 5 432 3 54 5 121 8 654 3 98 package com.wjy.test;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction;import scala.Tuple2;public class SecondSort{public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local").setAppName("SecondSort");JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> rdd = sc.textFile("./data/secondSort.txt");//轉成K-V格式//PairFunction 入參1-rdd的一行記錄 入參2 入參3是call的出參JavaPairRDD<SecondSortKey, String> mapToPair = rdd.mapToPair(new PairFunction<String, SecondSortKey, String>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<SecondSortKey, String> call(String line)throws Exception {String[] sps = line.split(" ");int first = Integer.valueOf(sps[0]);int second = Integer.valueOf(sps[1]);SecondSortKey ss = new SecondSortKey(first,second);return new Tuple2<SecondSortKey, String>(ss,line);}});//sortByKey 會使用key也就是SecondSortKey的compareTo方法mapToPair.sortByKey(false).foreach(new VoidFunction<Tuple2<SecondSortKey,String>>() {private static final long serialVersionUID = 1L;@Overridepublic void call(Tuple2<SecondSortKey, String> tuple) throws Exception {System.out.println(tuple._2);}});sc.stop();
}}
?
對于KEY自定義類型 實現comparable接口 實現comparTo方法
package com.wjy.test;import java.io.Serializable;public class SecondSortKey implements Serializable ,Comparable<SecondSortKey>{private static final long serialVersionUID = 1L;private int first;private int second;public SecondSortKey(int first,int second){super();this.first=first;this.second=second;}public int getFirst() {return first;}public void setFirst(int first) {this.first = first;}public int getSecond() {return second;}public void setSecond(int second) {this.second = second;}@Overridepublic int compareTo(SecondSortKey o) {//先比較第一個數值 如果相同再比較第二個值 否則直接返回第一個值的比較結果if (getFirst()-o.getFirst() == 0){return getSecond() - o.getSecond();}else{return getFirst()-o.getFirst();}}}?排序結果:
8 654 8 123 6 5 5 432 5 121 5 2 4 123 3 98 3 54 3 1 1 4?
三、分組取topN
對于多組數據,去每一組數據前N個數據,比如列出每個班級的前三名等等問題。
解決的思路:先分組,然后每一組排序,取前N個。
案例:有三個班級的分數清單scores.txt,取出每班前三名。
?groupByKey+排序算法:
package com.wjy.test;import java.util.Iterator;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction;import scala.Tuple2;public class TopNtest {public static void main(String[] args) {SparkConf conf = new SparkConf().setMaster("local").setAppName("TopOs");JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> rdd = sc.textFile("./data/scores.txt");//轉成K-V格式 方便下一步分組和排序//PairFunction 入參1rdd的一行數據 入參2、3是call的出參元素JavaPairRDD<String, Integer> mapToPair = rdd.mapToPair(new PairFunction<String, String, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(String line) throws Exception {String[] ss = line.split("\t");return new Tuple2<String, Integer>(ss[0],Integer.valueOf(ss[1]));}});//使用groupByKey 將相同班級的數據放在一個集合里mapToPair.groupByKey().foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {private static final long serialVersionUID = 1L;@Overridepublic void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception {String classname = tuple._1;Iterator<Integer> it = tuple._2.iterator();Integer[] top3 = new Integer[3];while(it.hasNext()){Integer score = it.next();//排序for (int i = 0; i < top3.length; i++) {if(top3[i] == null){top3[i] = score;break;}else if(score > top3[i]){for (int j = 2; j > i; j--) {top3[j] = top3[j-1];}top3[i] = score;break;}}}System.out.println("classname="+classname);for (Integer i:top3){System.out.println(i);}}}); sc.stop(); } }?
topN 結果:
classname=class3 98 70 70 classname=class1 102 100 99 classname=class2 88 85 85?
轉載于:https://www.cnblogs.com/cac2020/p/10684754.html
總結
以上是生活随笔為你收集整理的【Spark-core学习之九】 Spark案例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何用Kaplan-MeierPlott
- 下一篇: 济南市BSL-2(生物安全等级2级)实验