spark項目實戰:電商分析平臺之各個范圍Session步長、訪問時長占比統計(需求一)
項目基本信息,架構,需要一覽各個范圍Session步長、訪問時長占比統計概述各個范圍Session步長、訪問時長占比統計簡要運行流程代碼實現小結
1. 項目基本信息,架構,需要一覽
見項目第一篇文章 spark項目實戰:電商分析平臺之項目概述
代碼在github上
初始代碼: https://github.com/githubIMrLi/spark-commerce_basic
完整代碼:https://github.com/githubIMrLi/spark-commerce
2. 各個范圍Session步長、訪問時長占比統計概述
訪問時長:session的最早時間與最晚時間之差。訪問步長:session中的action個數。統計出符合篩選條件的session中,訪問時長在1s~3s、4s~6s、7s~9s、10s~30s、30s~60s、1m~3m、3m~10m、10m~30m、30m以上,訪問步長在1~3、4~6、…等各個范圍內的各種session的占比
3. 各個范圍Session步長、訪問時長占比統計簡要運行流程
4. 代碼實現
先運行 commerce_basic\mock\src\main\scala\MockDataGenerate.scala 代碼,生成所需表
各個范圍Session步長、訪問時長占比統計代碼
import java
.util
.{Date
, UUID
}import commons
.conf
.ConfigurationManager
import commons
.constant
.Constants
import commons
.model
.{UserInfo
, UserVisitAction
}
import commons
.utils
._
import net
.sf
.json
.JSONObject
import org
.apache
.spark
.SparkConf
import org
.apache
.spark
.rdd
.RDD
import org
.apache
.spark
.sql
.{SaveMode
, SparkSession
}import scala
.collection
.mutable

object SessionStatisticAgg {

  /**
   * Entry point for requirement #1: per-bucket session visit-length and
   * step-length ratio statistics.
   *
   * Reads the task parameters from the configuration, groups user visit
   * actions by session, filters the sessions against the task's criteria
   * while counting bucket hits in an accumulator, and finally writes the
   * bucket ratios to MySQL.
   */
  def main(args: Array[String]): Unit = {
    // Task parameters arrive from the project configuration as a JSON string.
    val jsonStr = ConfigurationManager.config.getString(Constants.TASK_PARAMS)
    val taskParam = JSONObject.fromObject(jsonStr)

    // Unique id for this run; becomes the key of the result row.
    val taskUUID = UUID.randomUUID().toString

    val sparkConf = new SparkConf().setAppName("session").setMaster("local[*]")
    val sparkSession = SparkSession.builder()
      .config(sparkConf)
      .enableHiveSupport()
      .getOrCreate()

    // One record per session: (sessionId, all actions of that session).
    val actionRDD = getActionRDD(sparkSession, taskParam)
    val sessionId2ActionRDD = actionRDD.map(item => (item.session_id, item))
    val sessionId2GroupRDD = sessionId2ActionRDD.groupByKey()
    sessionId2GroupRDD.cache()
    // Debug output; note this prints on the executors, not the driver.
    sessionId2GroupRDD.foreach(println(_))

    // Enrich each session with aggregate fields and the user's profile.
    val sessionId2FullInfoRDD = getFullInfoData(sparkSession, sessionId2GroupRDD)

    // Accumulator that counts sessions per visit-length / step-length bucket.
    val sessionStatAccumulator = new SessionStatAccumulator
    sparkSession.sparkContext.register(sessionStatAccumulator, "sessionAccumulator")

    val sessionId2FilterRDD =
      getFilteredData(taskParam, sessionStatAccumulator, sessionId2FullInfoRDD)
    sessionId2FilterRDD.foreach(println(_))

    for ((k, v) <- sessionStatAccumulator.value) {
      println("k=" + k + ", value=" + v)
    }

    getFinalData(sparkSession, taskUUID, sessionStatAccumulator.value)
  }

  def
getFinalData(sparkSession: SparkSession,
               taskUUID: String,
               value: mutable.HashMap[String, Int]) = {
    // Turns the raw bucket counts into per-bucket ratios and appends one
    // result row to the MySQL table "session_ration_0308".
    //
    // Total number of sessions that passed the filter. Defaults to 1 so the
    // ratio divisions below can never divide by zero.
    val session_count = value.getOrElse(Constants.SESSION_COUNT, 1).toDouble

    // Fraction of all counted sessions that fell into `bucket`, rounded to
    // two decimal places. Replaces 31 copy-pasted getOrElse/formatDouble
    // statements from the original.
    def ratio(bucket: String) =
      NumberUtils.formatDouble(value.getOrElse(bucket, 0) / session_count, 2)

    // One result row holding every visit-length and step-length ratio,
    // in the field order SessionAggrStat expects.
    val stat = SessionAggrStat(
      taskUUID,
      session_count.toInt,
      ratio(Constants.TIME_PERIOD_1s_3s),
      ratio(Constants.TIME_PERIOD_4s_6s),
      ratio(Constants.TIME_PERIOD_7s_9s),
      ratio(Constants.TIME_PERIOD_10s_30s),
      ratio(Constants.TIME_PERIOD_30s_60s),
      ratio(Constants.TIME_PERIOD_1m_3m),
      ratio(Constants.TIME_PERIOD_3m_10m),
      ratio(Constants.TIME_PERIOD_10m_30m),
      ratio(Constants.TIME_PERIOD_30m),
      ratio(Constants.STEP_PERIOD_1_3),
      ratio(Constants.STEP_PERIOD_4_6),
      ratio(Constants.STEP_PERIOD_7_9),
      ratio(Constants.STEP_PERIOD_10_30),
      ratio(Constants.STEP_PERIOD_30_60),
      ratio(Constants.STEP_PERIOD_60)
    )

    // Persist the single-row result via the JDBC data source.
    val statRDD = sparkSession.sparkContext.makeRDD(Array(stat))
    import sparkSession.implicits._
    statRDD.toDF().write
      .format("jdbc")
      .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
      .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
      .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
      .option("dbtable", "session_ration_0308")
      .mode(SaveMode.Append)
      .save()
  }

  def
calculateVisitLength(visitLength: Long,
                       sessionStatisticAccumulator: SessionStatAccumulator) = {
    // Classify the session's visit length (in seconds) into its time bucket
    // and bump the matching accumulator counter. Lengths below one second
    // fall through unbucketed, exactly as in the original if/else chain.
    visitLength match {
      case v if v >= 1 && v <= 3     => sessionStatisticAccumulator.add(Constants.TIME_PERIOD_1s_3s)
      case v if v >= 4 && v <= 6     => sessionStatisticAccumulator.add(Constants.TIME_PERIOD_4s_6s)
      case v if v >= 7 && v <= 9     => sessionStatisticAccumulator.add(Constants.TIME_PERIOD_7s_9s)
      case v if v >= 10 && v <= 30   => sessionStatisticAccumulator.add(Constants.TIME_PERIOD_10s_30s)
      case v if v > 30 && v <= 60    => sessionStatisticAccumulator.add(Constants.TIME_PERIOD_30s_60s)
      case v if v > 60 && v <= 180   => sessionStatisticAccumulator.add(Constants.TIME_PERIOD_1m_3m)
      case v if v > 180 && v <= 600  => sessionStatisticAccumulator.add(Constants.TIME_PERIOD_3m_10m)
      case v if v > 600 && v <= 1800 => sessionStatisticAccumulator.add(Constants.TIME_PERIOD_10m_30m)
      case v if v > 1800             => sessionStatisticAccumulator.add(Constants.TIME_PERIOD_30m)
      case _                         => // below 1 second: not counted
    }
  }

  def
calculateStepLength(stepLength: Long,
                      sessionStatisticAccumulator: SessionStatAccumulator): Unit = {
    // Classify the session's step length (number of actions) into its bucket
    // and bump the matching accumulator counter. Zero-length sessions fall
    // through unbucketed, exactly as in the original if/else chain.
    stepLength match {
      case s if s >= 1 && s <= 3   => sessionStatisticAccumulator.add(Constants.STEP_PERIOD_1_3)
      case s if s >= 4 && s <= 6   => sessionStatisticAccumulator.add(Constants.STEP_PERIOD_4_6)
      case s if s >= 7 && s <= 9   => sessionStatisticAccumulator.add(Constants.STEP_PERIOD_7_9)
      case s if s >= 10 && s <= 30 => sessionStatisticAccumulator.add(Constants.STEP_PERIOD_10_30)
      case s if s > 30 && s <= 60  => sessionStatisticAccumulator.add(Constants.STEP_PERIOD_30_60)
      case s if s > 60             => sessionStatisticAccumulator.add(Constants.STEP_PERIOD_60)
      case _                       => // zero or negative: not counted
    }
  }

  def
getFilteredData(taskParam: JSONObject,
                  sessionStatAccumulator: SessionStatAccumulator,
                  sessionId2FullInfoRDD: RDD[(String, String)]) = {
    // Keeps only the sessions matching the task's filter criteria, counting
    // each surviving session (and its visit/step-length bucket) in the
    // accumulator as a side effect. Returns the filtered RDD.
    //
    // Extract the (possibly null) filter criteria from the task parameters.
    val startAge = ParamUtils.getParam(taskParam, Constants.PARAM_START_AGE)
    val endAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE)
    val professionals = ParamUtils.getParam(taskParam, Constants.PARAM_PROFESSIONALS)
    val cities = ParamUtils.getParam(taskParam, Constants.PARAM_CITIES)
    val sex = ParamUtils.getParam(taskParam, Constants.PARAM_SEX)
    val keywords = ParamUtils.getParam(taskParam, Constants.PARAM_KEYWORDS)
    val categoryIds = ParamUtils.getParam(taskParam, Constants.PARAM_CATEGORY_IDS)

    // Concatenate the non-null criteria as "key=value" pairs separated by "|".
    // BUG FIX: the original tried to strip a trailing separator with
    // filterInfo.endsWith("\\|"), which tests for the literal two characters
    // "\|" and therefore never matched, leaving a dangling "|". Building the
    // string with mkString never produces a trailing separator at all.
    val filterInfo = Seq(
      Constants.PARAM_START_AGE -> startAge,
      Constants.PARAM_END_AGE -> endAge,
      Constants.PARAM_PROFESSIONALS -> professionals,
      Constants.PARAM_CITIES -> cities,
      Constants.PARAM_SEX -> sex,
      Constants.PARAM_KEYWORDS -> keywords,
      Constants.PARAM_CATEGORY_IDS -> categoryIds
    ).collect { case (key, v) if v != null => key + "=" + v }
      .mkString("|")

    val sessionId2FilterRDD = sessionId2FullInfoRDD.filter { case (sessionId, fullInfo) =>
      // A session passes only if it satisfies every configured criterion.
      val success =
        ValidUtils.between(fullInfo, Constants.FIELD_AGE, filterInfo,
          Constants.PARAM_START_AGE, Constants.PARAM_END_AGE) &&
        ValidUtils.in(fullInfo, Constants.FIELD_PROFESSIONAL, filterInfo, Constants.PARAM_PROFESSIONALS) &&
        ValidUtils.in(fullInfo, Constants.FIELD_CITY, filterInfo, Constants.PARAM_CITIES) &&
        ValidUtils.equal(fullInfo, Constants.FIELD_SEX, filterInfo, Constants.PARAM_SEX) &&
        ValidUtils.in(fullInfo, Constants.FIELD_SEARCH_KEYWORDS, filterInfo, Constants.PARAM_KEYWORDS) &&
        ValidUtils.in(fullInfo, Constants.FIELD_CATEGORY_ID, filterInfo, Constants.PARAM_CATEGORY_IDS)

      if (success) {
        // Count the surviving session and bucket its visit and step lengths.
        sessionStatAccumulator.add(Constants.SESSION_COUNT)
        val visitLength = StringUtils
          .getFieldFromConcatString(fullInfo, "\\|", Constants.FIELD_VISIT_LENGTH).toLong
        val stepLength = StringUtils
          .getFieldFromConcatString(fullInfo, "\\|", Constants.FIELD_STEP_LENGTH).toLong
        calculateVisitLength(visitLength, sessionStatAccumulator)
        calculateStepLength(stepLength, sessionStatAccumulator)
      }
      success
    }
    sessionId2FilterRDD
  }

  def
getFullInfoData(sparkSession: SparkSession,
                  sessionId2GroupRDD: RDD[(String, Iterable[UserVisitAction])]) = {
    // For each session, aggregates its actions into a "|"-separated info
    // string (keywords, categories, visit length, step length, start time),
    // then joins in the user's profile fields. Returns (sessionId, fullInfo).
    val userId2AggrInfoRDD = sessionId2GroupRDD.map { case (sid, iterableAction) =>
      var startTime: Date = null
      var endTime: Date = null
      var userId = -1L
      val searchKeywords = new StringBuilder("")
      val clickCategories = new StringBuilder("")
      var stepLength = 0

      for (action <- iterableAction) {
        // All actions of a session share one user; take the first seen.
        if (userId == -1) {
          userId = action.user_id
        }
        // Track the earliest and latest action time of the session.
        val actionTime = DateUtils.parseTime(action.action_time)
        if (startTime == null || startTime.after(actionTime))
          startTime = actionTime
        if (endTime == null || endTime.before(actionTime))
          endTime = actionTime

        val searchKeyword = action.search_keyword
        val clickCategory = action.click_category_id
        if (StringUtils.isNotEmpty(searchKeyword) &&
            !searchKeywords.toString.contains(searchKeyword))
          searchKeywords.append(searchKeyword + ",")
        // BUG FIX: the original called contains(clickCategory) with a Long,
        // which resolves to the Seq[Char] overload and is effectively always
        // false, so duplicate category ids were appended. Compare against the
        // id's string form instead.
        if (clickCategory != -1 &&
            !clickCategories.toString.contains(clickCategory.toString))
          clickCategories.append(clickCategory + ",")
        stepLength += 1
      }

      val searchKw = StringUtils.trimComma(searchKeywords.toString)
      val clickCg = StringUtils.trimComma(clickCategories.toString)
      // Visit length in seconds: latest action minus earliest action.
      val visitLength = (endTime.getTime - startTime.getTime) / 1000

      val aggrInfo = Constants.FIELD_SESSION_ID + "=" + sid + "|" +
        Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKw + "|" +
        Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCg + "|" +
        Constants.FIELD_VISIT_LENGTH + "=" + visitLength + "|" +
        Constants.FIELD_STEP_LENGTH + "=" + stepLength + "|" +
        Constants.FIELD_START_TIME + "=" + DateUtils.formatTime(startTime)
      (userId, aggrInfo)
    }

    // Join the per-session aggregate with the user's profile from Hive.
    val sql = "select * from user_info"
    import sparkSession.implicits._
    val userInfoRDD = sparkSession.sql(sql).as[UserInfo].rdd
      .map(item => (item.user_id, item))

    userId2AggrInfoRDD.join(userInfoRDD).map { case (userId, (aggrInfo, userInfo)) =>
      val fullInfo = aggrInfo + "|" +
        Constants.FIELD_AGE + "=" + userInfo.age + "|" +
        Constants.FIELD_PROFESSIONAL + "=" + userInfo.professional + "|" +
        Constants.FIELD_SEX + "=" + userInfo.sex + "|" +
        Constants.FIELD_CITY + "=" + userInfo.city
      // Re-key by session id for the downstream filter stage.
      val sessionId = StringUtils
        .getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_SESSION_ID)
      (sessionId, fullInfo)
    }
  }

  def
getActionRDD(sparkSession: SparkSession, taskParam: JSONObject) = {
    // Loads the user_visit_action rows whose date falls inside the task's
    // [startDate, endDate] range, as an RDD of UserVisitAction.
    val startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE)
    val endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE)

    // NOTE(review): the query is built by string concatenation; the values
    // come from the job's own configuration here, but parameterization would
    // be safer if they ever came from user input.
    val sql = "select * from user_visit_action where date>='" + startDate +
      "' and date<='" + endDate + "'"

    import sparkSession.implicits._
    sparkSession.sql(sql).as[UserVisitAction].rdd
  }
}
自定義累加器代碼:
import org
.apache
.spark
.util
.AccumulatorV2
import scala
.collection
.mutable
/**
 * Spark AccumulatorV2 that counts, per bucket name, how many sessions fell
 * into that bucket. The IN type is the bucket name (String); the OUT type is
 * the complete name -> count map.
 */
class SessionStatAccumulator extends AccumulatorV2[String, mutable.HashMap[String, Int]] {

  // Backing store: bucket name -> number of sessions counted so far.
  val countMap = new mutable.HashMap[String, Int]()

  // The accumulator is "zero" when no bucket has been counted yet.
  override def isZero: Boolean = countMap.isEmpty

  // Independent copy carrying a snapshot of the current counts.
  override def copy(): AccumulatorV2[String, mutable.HashMap[String, Int]] = {
    val replica = new SessionStatAccumulator
    replica.countMap ++= this.countMap
    replica
  }

  override def reset(): Unit = countMap.clear()

  // Increment the counter for bucket `v`, starting it at 0 on first sight.
  override def add(v: String): Unit =
    countMap.update(v, countMap.getOrElse(v, 0) + 1)

  // Fold every count from `other` into this accumulator. As in the original,
  // only SessionStatAccumulator instances are matched.
  override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Int]]): Unit =
    other match {
      case acc: SessionStatAccumulator =>
        for ((bucket, count) <- acc.countMap)
          countMap.update(bucket, countMap.getOrElse(bucket, 0) + count)
    }

  override def value: mutable.HashMap[String, Int] = this.countMap
}
結果存儲到mysql里面,結果如下
5. 小結
總結
以上是生活随笔為你收集整理的spark项目实战:电商分析平台之各个范围Session步长、访问时长占比统计(需求一)的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。