JAVA hbase groupby_window操作和groupBy操作
window操作
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
window操作是連續流特有的操作,設置時間窗口大小,根據窗口大小來執行groupBy操作等。
看看dataset上的groupBy操作。
groupBy操作
定義:
def groupBy(cols: Column*): RelationalGroupedDataset = {
RelationalGroupedDataset(toDF(), cols.map(_.expr), RelationalGroupedDataset.GroupByType)
}
生成新的RelationalGroupedDataset對象。該對象最重要得方法:
private[this] def toDF(aggExprs: Seq[Expression]): DataFrame = {
val aggregates = if (df.sparkSession.sessionState.conf.dataFrameRetainGroupColumns) {
groupingExprs ++ aggExprs
} else {
aggExprs
}
val aliasedAgg = aggregates.map(alias)
groupType match {
case RelationalGroupedDataset.GroupByType =>
Dataset.ofRows(df.sparkSession, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan))
case RelationalGroupedDataset.RollupType =>
Dataset.ofRows(
df.sparkSession, Aggregate(Seq(Rollup(groupingExprs)), aliasedAgg, df.logicalPlan))
case RelationalGroupedDataset.CubeType =>
Dataset.ofRows(
df.sparkSession, Aggregate(Seq(Cube(groupingExprs)), aliasedAgg, df.logicalPlan))
case RelationalGroupedDataset.PivotType(pivotCol, values) =>
val aliasedGrps = groupingExprs.map(alias)
Dataset.ofRows(
df.sparkSession, Pivot(Some(aliasedGrps), pivotCol, values, aggExprs, df.logicalPlan))
}
}
我們就看一個吧:
Dataset.ofRows(df.sparkSession, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan))
看看它的實現機制是怎樣得?
這里得Aggregate是一種LogicPlan,我們只要看看Aggregate的實現機制就可以了。
Aggregate的實現機制就要涉及到catalyst包里的相關類了。
總結
以上是生活随笔為你收集整理的JAVA hbase groupby_window操作和groupBy操作的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 求顾开头的成语接龙!
- 下一篇: 雪花秀水乳套装有几种?