Daily Top 3 Hot Search Keyword Statistics Case Study
Data format:
date, user, search keyword, city, platform, version
Requirements:
1. Keep only the records that match the given conditions (city, platform, version).
2. For each day, compute the top 3 search keywords by search UV.
3. Sort the days by the total UV of each day's top 3 keywords, in descending order.
4. Save the result to a Hive table.
Approach:
1. Build the input RDD from the raw data (a file on HDFS).
2. Use the filter operator to keep only the records in the input RDD that match the query conditions.
2.1 Naive approach: reference the external query-condition map directly inside the filter function. The drawback is that the map is then serialized into every single task as a separate copy, which is bad for performance.
2.2 Optimized approach: wrap the query conditions in a broadcast variable and read that variable inside the filter function, so each executor keeps just one copy (a minimal sketch of this pattern follows this list).
3. Map the data to the (date_keyword, user) format and group by that key. Then map again: deduplicate the users who searched each keyword on each day and count them; the count is that keyword's daily UV, yielding (date_keyword, uv).
4. Map the per-day per-keyword UV RDD into an RDD of Row objects and convert it to a DataFrame.
5. Register it as a temporary table and use a Spark SQL window function to pick, for each day, the top 3 keywords by UV together with their UV, producing a new DataFrame.
6. Convert that DataFrame back to an RDD, group it by date, and map each group: sum the UV of the day's top 3 keywords, use that total as the key, and concatenate the date with the day's top 3 keywords and their counts into one string as the value.
7. Sort by each day's total top-3 UV, in descending order.
8. Map the sorted data back into the (date, keyword, uv) format, one row per keyword.
9. Convert it to a DataFrame once more and save the data to Hive.
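As a minimal, self-contained sketch of the broadcast pattern in step 2.2 (Spark 1.x Java API; variable names here are illustrative, not taken from the full listing below):

    SparkConf conf = new SparkConf().setAppName("BroadcastFilterSketch");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    HashMap<String, List<String>> queryConditions = new HashMap<String, List<String>>();
    queryConditions.put("city", Arrays.asList("beijing"));

    // Broadcast once: every executor caches a single read-only copy,
    // instead of a copy being serialized into each task's closure
    final Broadcast<HashMap<String, List<String>>> conditionsBc = jsc.broadcast(queryConditions);

    JavaRDD<String> lines = jsc.textFile("hdfs://Spark01:9000/zjs/daily.txt");
    JavaRDD<String> filtered = lines.filter(new Function<String, Boolean>() {
        @Override
        public Boolean call(String line) throws Exception {
            // Read the executor-local copy inside the task
            String city = line.split(":")[3];
            return conditionsBc.value().get("city").contains(city);
        }
    });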
Input file:
2018-10-1:leo:water:beijing:android:1.0
2018-10-1:leo1:water:beijing:android:1.0
2018-10-1:leo2:water:beijing:android:1.0
2018-10-1:jack:water:beijing:android:1.0
2018-10-1:jack1:water:beijing:android:1.0
2018-10-1:leo:seafood:beijing:android:1.0
2018-10-1:leo1:seafood:beijing:android:1.0
2018-10-1:leo2:seafood:beijing:android:1.0
2018-10-1:leo:food:beijing:android:1.0
2018-10-1:leo1:food:beijing:android:1.0
2018-10-1:leo2:meat:beijing:android:1.0
2018-10-2:leo:water:beijing:android:1.0
2018-10-2:leo1:water:beijing:android:1.0
2018-10-2:leo2:water:beijing:android:1.0
2018-10-2:jack:water:beijing:android:1.0
2018-10-2:leo1:seafood:beijing:android:1.0
2018-10-2:leo2:seafood:beijing:android:1.0
2018-10-2:leo3:seafood:beijing:android:1.0
2018-10-2:leo1:food:beijing:android:1.0
2018-10-2:leo2:food:beijing:android:1.0
2018-10-2:leo:meat:beijing:android:1.0
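For reference, this is how a single line above decomposes under split(":"); the indexes mirror the data format listed at the top (a hypothetical snippet, not part of the original program):

    String[] f = "2018-10-1:leo:water:beijing:android:1.0".split(":");
    // f[0] = date, f[1] = user, f[2] = keyword,
    // f[3] = city, f[4] = platform, f[5] = version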
Code:
package com.bynear.spark_sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

import java.util.*;

public class DailyTop3Keyword {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sqlContext = new HiveContext(jsc.sc());

        // Mock query conditions (in practice these could come from a MySQL database)
        final HashMap<String, List<String>> queryParaMap = new HashMap<String, List<String>>();
        queryParaMap.put("city", Arrays.asList("beijing"));
        queryParaMap.put("platform", Arrays.asList("android"));
        queryParaMap.put("version", Arrays.asList("1.0", "1.2", "2.0", "1.5"));

        // Broadcast the query conditions: one read-only copy per executor
        final Broadcast<HashMap<String, List<String>>> queryParamMapBroadcast = jsc.broadcast(queryParaMap);

        // Read the raw log file
        JavaRDD<String> rowRDD = jsc.textFile("hdfs://Spark01:9000/zjs/daily.txt");

        // Filter with the broadcast conditions
        JavaRDD<String> filterRDD = rowRDD.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String log) throws Exception {
                String[] logSplit = log.split(":");
                String city = logSplit[3];
                String platform = logSplit[4];
                String version = logSplit[5];
                HashMap<String, List<String>> queryParamMap = queryParamMapBroadcast.value();
                List<String> cities = queryParamMap.get("city");
                if (cities.size() > 0 && !cities.contains(city)) {
                    return false;
                }
                List<String> platforms = queryParamMap.get("platform");
                if (!platforms.contains(platform)) {
                    return false;
                }
                List<String> versions = queryParamMap.get("version");
                if (!versions.contains(version)) {
                    return false;
                }
                return true;
            }
        });

        // Map the filtered raw logs to the (date_keyword, user) format
        JavaPairRDD<String, String> dateKeyWordUserRDD = filterRDD.mapToPair(new PairFunction<String, String, String>() {
            @Override
            public Tuple2<String, String> call(String log) throws Exception {
                String[] logSplit = log.split(":");
                String date = logSplit[0];
                String user = logSplit[1];
                String keyword = logSplit[2];
                return new Tuple2<String, String>(date + "_" + keyword, user);
            }
        });

        // Group by key: which users searched each keyword on each day (not deduplicated)
        JavaPairRDD<String, Iterable<String>> dateKeywordUsersRDD = dateKeyWordUserRDD.groupByKey();
        List<Tuple2<String, Iterable<String>>> collect1 = dateKeywordUsersRDD.collect();
        for (Tuple2<String, Iterable<String>> tuple2 : collect1) {
            System.out.println("Grouped users per day per keyword (not deduplicated): " + tuple2._2);
            System.out.println(tuple2);
        }

        // Deduplicate the search users of each keyword per day to get its UV
        JavaPairRDD<String, Long> dateKeywordUvRDD = dateKeywordUsersRDD.mapToPair(
                new PairFunction<Tuple2<String, Iterable<String>>, String, Long>() {
                    @Override
                    public Tuple2<String, Long> call(Tuple2<String, Iterable<String>> dataKeywordUsers) throws Exception {
                        String dateKeyword = dataKeywordUsers._1;
                        Iterator<String> users = dataKeywordUsers._2.iterator();
                        // Deduplicate and count the remaining users
                        List<String> distinctUsers = new ArrayList<String>();
                        while (users.hasNext()) {
                            String user = users.next();
                            if (!distinctUsers.contains(user)) {
                                distinctUsers.add(user);
                            }
                        }
                        // The UV is the number of distinct users
                        long uv = distinctUsers.size();
                        // (date_keyword, distinct user count)
                        return new Tuple2<String, Long>(dateKeyword, uv);
                    }
                });
        List<Tuple2<String, Long>> collect2 = dateKeywordUvRDD.collect();
        for (Tuple2<String, Long> stringLongTuple2 : collect2) {
            System.out.println("Deduplicated search users per day per keyword (UV)");
            System.out.println(stringLongTuple2);
        }

        // Turn the per-day per-keyword UV data into a DataFrame
        JavaRDD<Row> dateKeywordUvRowRDD = dateKeywordUvRDD.map(new Function<Tuple2<String, Long>, Row>() {
            @Override
            public Row call(Tuple2<String, Long> dateKeywordUv) throws Exception {
                String date = dateKeywordUv._1.split("_")[0];
                String keyword = dateKeywordUv._1.split("_")[1];
                long uv = dateKeywordUv._2;
                return RowFactory.create(date, keyword, uv);
            }
        });
        ArrayList<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("date", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("keyword", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("uv", DataTypes.LongType, true));
        StructType structType = DataTypes.createStructType(fields);
        DataFrame dateKeywordUvDF = sqlContext.createDataFrame(dateKeywordUvRowRDD, structType);
        dateKeywordUvDF.registerTempTable("sales");

        // Use the row_number() window function to get, per day, the top 3 keywords by UV
        final DataFrame dailyTop3KeyWordDF = sqlContext.sql(
                "select date, keyword, uv from ("
                        + " select date, keyword, uv,"
                        + " row_number() over (partition by date order by uv desc) rank"
                        + " from sales"
                        + ") tmp_sales where rank <= 3");

        // Convert the DataFrame back to an RDD and map each row to (date, keyword_uv)
        JavaRDD<Row> dailyTop3KeyWordRDD = dailyTop3KeyWordDF.javaRDD();
        JavaPairRDD<String, String> dailyTop3KeywordRDD = dailyTop3KeyWordRDD.mapToPair(new PairFunction<Row, String, String>() {
            @Override
            public Tuple2<String, String> call(Row row) throws Exception {
                String date = String.valueOf(row.get(0));
                String keyword = String.valueOf(row.get(1));
                String uv = String.valueOf(row.get(2));
                // Map to (date, keyword_uv)
                return new Tuple2<String, String>(date, keyword + "_" + uv);
            }
        });
        List<Tuple2<String, String>> collect = dailyTop3KeywordRDD.collect();
        for (Tuple2<String, String> stringStringTuple2 : collect) {
            System.out.println("Window function result");
            System.out.println(stringStringTuple2);
        }

        // Group by date
        JavaPairRDD<String, Iterable<String>> top3DateKeywordsRDD = dailyTop3KeywordRDD.groupByKey();

        // Map each day to (totalUv, "date,keyword1_uv1,keyword2_uv2,keyword3_uv3")
        JavaPairRDD<Long, String> uvDateKeywordsRDD = top3DateKeywordsRDD.mapToPair(
                new PairFunction<Tuple2<String, Iterable<String>>, Long, String>() {
                    @Override
                    public Tuple2<Long, String> call(Tuple2<String, Iterable<String>> tuple) throws Exception {
                        String date = tuple._1;
                        // Iterate over the day's "keyword_uv" strings
                        Iterator<String> keywordUvIterator = tuple._2.iterator();
                        long totalUv = 0L;
                        String dateKeyword = date;
                        while (keywordUvIterator.hasNext()) {
                            String keywordUv = keywordUvIterator.next();
                            Long uv = Long.valueOf(keywordUv.split("_")[1]);
                            totalUv += uv;
                            dateKeyword = dateKeyword + "," + keywordUv;
                        }
                        return new Tuple2<Long, String>(totalUv, dateKeyword);
                    }
                });

        // Sort by each day's total top-3 UV, descending
        JavaPairRDD<Long, String> sortedUvDateKeywordsRDD = uvDateKeywordsRDD.sortByKey(false);
        List<Tuple2<Long, String>> rows = sortedUvDateKeywordsRDD.collect();
        for (Tuple2<Long, String> row : rows) {
            System.out.println(row._2 + " " + row._1);
        }

        // Map the sorted data back into (date, keyword, uv) rows
        JavaRDD<Row> resultRDD = sortedUvDateKeywordsRDD.flatMap(new FlatMapFunction<Tuple2<Long, String>, Row>() {
            @Override
            public Iterable<Row> call(Tuple2<Long, String> tuple) throws Exception {
                String dateKeywords = tuple._2;
                String[] dateKeywordsSplit = dateKeywords.split(",");
                String date = dateKeywordsSplit[0];
                ArrayList<Row> rows = new ArrayList<Row>();
                // Loop instead of hard-coding indexes 1..3, so a day with fewer than
                // three keywords does not throw an ArrayIndexOutOfBoundsException
                for (int i = 1; i < dateKeywordsSplit.length; i++) {
                    String[] keywordUv = dateKeywordsSplit[i].split("_");
                    rows.add(RowFactory.create(date, keywordUv[0], Long.valueOf(keywordUv[1])));
                }
                return rows;
            }
        });
        DataFrame finalDF = sqlContext.createDataFrame(resultRDD, structType);
        List<Row> rows1 = finalDF.javaRDD().collect();
        for (Row row : rows1) {
            System.out.println(row);
        }
        jsc.stop();
    }
}
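One gap worth noting: the listing prints the final DataFrame but never persists it, so step 4 of the requirements (saving to Hive) is left as console output. A minimal sketch of that last step, assuming Spark 1.4+ and a hypothetical table name daily_top3_keyword, would be to add the following before jsc.stop():

    // Hypothetical final step: persist the result into a Hive table
    // (the table name "daily_top3_keyword" is an assumption, not from the original)
    finalDF.write().saveAsTable("daily_top3_keyword");

On Spark 1.3 the equivalent call is finalDF.saveAsTable("daily_top3_keyword"). A smaller design note: the deduplication step walks an ArrayList with contains(), which costs O(n) per lookup; a java.util.HashSet would yield the same UV counts with O(1) lookups.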
Notes:
1. If the sample file is created in a plain-text editor, save it as ANSI rather than UTF-8 (which was tried first). With UTF-8, an invisible byte-order-mark (BOM) character, which looks like a stray space, ends up at the start of the first line, so the first record's key is corrupted and grouping fails for it.
2. The file must not end with a blank line; otherwise split() yields too few fields and the job fails with an ArrayIndexOutOfBoundsException. (A defensive in-code alternative to notes 1 and 2 is sketched after this list.)
3. Window functions require a HiveContext, not a plain SQLContext. HiveContext is built on top of the SparkContext, so pass jsc.sc() to its constructor.
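Rather than relying on how the editor saved the file, notes 1 and 2 can also be handled defensively in the job itself. A sketch, reusing rowRDD from the listing above (this is an alternative, not part of the original code):

    // Strip a UTF-8 BOM if present, and drop blank lines before any split()
    JavaRDD<String> cleanedRDD = rowRDD
            .map(new Function<String, String>() {
                @Override
                public String call(String line) throws Exception {
                    return line.startsWith("\uFEFF") ? line.substring(1) : line;
                }
            })
            .filter(new Function<String, Boolean>() {
                @Override
                public Boolean call(String line) throws Exception {
                    return line.trim().length() > 0;
                }
            });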
Run output:
Grouped users per day per keyword (not deduplicated): [leo]
(2018-10-2_meat,[leo])
Grouped users per day per keyword (not deduplicated): [leo, leo1, leo2, jack]
(2018-10-2_water,[leo, leo1, leo2, jack])
Grouped users per day per keyword (not deduplicated): [leo, leo1]
(2018-10-1_food,[leo, leo1])
Grouped users per day per keyword (not deduplicated): [leo1, leo2, leo3]
(2018-10-2_seafood,[leo1, leo2, leo3])
Grouped users per day per keyword (not deduplicated): [leo, leo1, leo2, jack, jack1]
(2018-10-1_water,[leo, leo1, leo2, jack, jack1])
Grouped users per day per keyword (not deduplicated): [leo2]
(2018-10-1_meat,[leo2])
Grouped users per day per keyword (not deduplicated): [leo, leo1, leo2]
(2018-10-1_seafood,[leo, leo1, leo2])
Grouped users per day per keyword (not deduplicated): [leo1, leo2]
(2018-10-2_food,[leo1, leo2])
Deduplicated search users per day per keyword (UV)
(2018-10-2_meat,1)
Deduplicated search users per day per keyword (UV)
(2018-10-2_water,4)
Deduplicated search users per day per keyword (UV)
(2018-10-1_food,2)
Deduplicated search users per day per keyword (UV)
(2018-10-2_seafood,3)
Deduplicated search users per day per keyword (UV)
(2018-10-1_water,5)
Deduplicated search users per day per keyword (UV)
(2018-10-1_meat,1)
Deduplicated search users per day per keyword (UV)
(2018-10-1_seafood,3)
Deduplicated search users per day per keyword (UV)
(2018-10-2_food,2)
Window function result
(2018-10-1,water_5)
Window function result
(2018-10-1,seafood_3)
Window function result
(2018-10-1,food_2)
Window function result
(2018-10-2,water_4)
Window function result
(2018-10-2,seafood_3)
Window function result
(2018-10-2,food_2)
Final result:
[2018-10-1,water,5]
[2018-10-1,seafood,3]
[2018-10-1,food,2]
[2018-10-2,seafood,3]
[2018-10-2,food,2]