2021年大数据Spark(二十九):SparkSQL案例四开窗函数
目錄
?
案例四:開窗函數
概述
介紹
聚合函數和開窗函數
開窗函數分類
???????聚合開窗函數
排序開窗函數
?ROW_NUMBER順序排序
???????RANK跳躍排序
????????DENSE_RANK連續排序
???????NTILE分組排名[了解]
???????代碼演示
???????案例四:開窗函數
概述
https://www.cnblogs.com/qiuting/p/7880500.html
介紹
開窗函數的引入是為了既顯示聚集前的數據,又顯示聚集后的數據。即在每一行的最后一列添加聚合函數的結果。
開窗用于為行定義一個窗口(這里的窗口是指運算將要操作的行的集合),它對一組值進行操作,不需要使用 GROUP BY 子句對數據進行分組,能夠在同一行中同時返回基礎行的列和聚合列。
?
聚合函數和開窗函數
聚合函數是將多行變成一行,count,avg....
開窗函數是將一行變成多行;
聚合函數如果要顯示其他的列必須將列加入到group by中
開窗函數可以不使用group by,直接將所有信息顯示出來
?
開窗函數分類
1.聚合開窗函數
聚合函數(列) OVER(選項),這里的選項可以是PARTITION BY 子句,但不可以是 ORDER BY 子句。
2.排序開窗函數
排序函數(列) OVER(選項),這里的選項可以是ORDER BY 子句,也可以是 OVER(PARTITION BY 子句 ORDER BY 子句),但不可以是 PARTITION BY 子句。
?
???????聚合開窗函數
示例1
OVER 關鍵字表示把聚合函數當成聚合開窗函數而不是聚合函數。
SQL標準允許將所有聚合函數用做聚合開窗函數。
spark.sql("select ?count(name) ?from scores").show
spark.sql("select name, class, score, count(name) over()?name_count from scores").show
?
查詢結果如下所示:
+----+-----+-----+----------+ ??????????????????????????????????????????????????
|name|class|score|name_count|
+----+-----+-----+----------+
| ?a1| ???1| ??80| ???????11|
| ?a2| ???1| ??78| ???????11|
| ?a3| ???1| ??95| ???????11|
| ?a4| ???2| ??74| ???????11|
| ?a5| ???2| ??92| ???????11|
| ?a6| ???3| ??99| ???????11|
| ?a7| ???3| ??99| ???????11|
| ?a8| ???3| ??45| ???????11|
| ?a9| ???3| ??55| ???????11|
| a10| ???3| ??78| ???????11|
| a11| ???3| ?100| ???????11|
+----+-----+-----+----------+
?示例2
OVER 關鍵字后的括號中還可以添加選項用以改變進行聚合運算的窗口范圍。
如果 OVER 關鍵字后的括號中的選項為空,則開窗函數會對結果集中的所有行進行聚合運算。
開窗函數的 OVER 關鍵字后括號中的可以使用 PARTITION BY 子句來定義行的分區來供進行聚合計算。
與 GROUP BY 子句不同,PARTITION BY 子句創建的分區是獨立于結果集的,創建的分區只是供進行聚合計算的,而且不同的開窗函數所創建的分區也不互相影響。
下面的 SQL 語句用于顯示按照班級分組后每組的人數:
OVER(PARTITION BY class)表示對結果集按照 class 進行分區,并且計算當前行所屬的組的聚合計算結果。
spark.sql("select name, class, score, count(name)?over(partition by class) name_count from scores").show
查詢結果如下所示:
+----+-----+-----+----------+ ??????????????????????????????????????????????????
|name|class|score|name_count|
+----+-----+-----+----------+
| ?a1| ???1| ??80| ????????3|
| ?a2| ???1| ??78| ????????3|
| ?a3| ???1| ??95| ????????3|
| ?a6| ???3| ??99| ????????6|
| ?a7| ???3| ??99| ????????6|
| ?a8| ???3| ??45| ????????6|
| ?a9| ???3| ??55| ????????6|
| a10| ???3| ??78| ????????6|
| a11| ???3| ?100| ????????6|
| ?a4| ???2| ??74| ????????2|
| ?a5| ???2| ??92| ????????2|
+----+-----+-----+----------+
?
排序開窗函數
?ROW_NUMBER順序排序
row_number() over(order by score) as rownum 表示按score 升序的方式來排序,并得出排序結果的序號
注意:
在排序開窗函數中使用 PARTITION ?BY 子句需要放置在ORDER ?BY 子句之前。
?●示例1
spark.sql("select name, class, score, row_number()?over(order by score)?rank?from scores").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
| ?a8| ???3| ??45| ??1|
| ?a9| ???3| ??55| ??2|
| ?a4| ???2| ??74| ??3|
| ?a2| ???1| ??78| ??4|
| a10| ???3| ??78| ??5|
| ?a1| ???1| ??80| ??6|
| ?a5| ???2| ??92| ??7|
| ?a3| ???1| ??95| ??8|
| ?a6| ???3| ??99| ??9|
| ?a7| ???3| ??99| ?10|
| a11| ???3| ?100| ?11|
+----+-----+-----+----+
spark.sql("select name, class, score, row_number() over(partition by class order by score) rank from scores").show()
+----+-----+-----+----+ ????????????????????????????????????????????????????????
|name|class|score|rank|
+----+-----+-----+----+
| ?a2| ???1| ??78| ??1|
| ?a1| ???1| ??80| ??2|
| ?a3| ???1| ??95| ??3|
| ?a8| ???3| ??45| ??1|
| ?a9| ???3| ??55| ??2|
| a10| ???3| ??78| ??3|
| ?a6| ???3| ??99| ??4|
| ?a7| ???3| ??99| ??5|
| a11| ???3| ?100| ??6|
| ?a4| ???2| ??74| ??1|
| ?a5| ???2| ??92| ??2|
+----+-----+-----+----+
?
???????RANK跳躍排序
rank() over(order by score) as rank表示按 score升序的方式來排序,并得出排序結果的排名號。
這個函數求出來的排名結果可以并列,并列排名之后的排名將是并列的排名加上并列數
簡單說每個人只有一種排名,然后出現兩個并列第一名的情況,這時候排在兩個第一名后面的人將是第三名,也就是沒有了第二名,但是有兩個第一名
●示例2
spark.sql("select name, class, score, rank() over(order by score) rank?from scores").show() ????????????????????????????????????????????????????
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
| ?a8| ???3| ??45| ??1|
| ?a9| ???3| ??55| ??2|
| ?a4| ???2| ??74| ??3|
| a10| ???3| ??78| ??4|
| ?a2| ???1| ??78| ??4|
| ?a1| ???1| ??80| ??6|
| ?a5| ???2| ??92| ??7|
| ?a3| ???1| ??95| ??8|
| ?a6| ???3| ??99| ??9|
| ?a7| ???3| ??99| ??9|
| a11| ???3| ?100| ?11|
+----+-----+-----+----+
spark.sql("select name, class, score, rank() over(partition by class order by score) rank from scores").show()
+----+-----+-----+----+ ????????????????????????????????????????????????????????
|name|class|score|rank|
+----+-----+-----+----+
| ?a2| ???1| ??78| ??1|
| ?a1| ???1| ??80| ??2|
| ?a3| ???1| ??95| ??3|
| ?a8| ???3| ??45| ??1|
| ?a9| ???3| ??55| ??2|
| a10| ???3| ??78| ??3|
| ?a6| ???3| ??99| ??4|
| ?a7| ???3| ??99| ??4|
| a11| ???3| ?100| ??6|
| ?a4| ???2| ??74| ??1|
| ?a5| ???2| ??92| ??2|
+----+-----+-----+----+
?
????????DENSE_RANK連續排序
dense_rank() over(order by ?score) as ?dense_rank 表示按score 升序的方式來排序,并得出排序結果的排名號。
這個函數并列排名之后的排名只是并列排名加1
簡單說每個人只有一種排名,然后出現兩個并列第一名的情況,這時候排在兩個第一名后面的人將是第二名,也就是兩個第一名,一個第二名
●示例3
spark.sql("select name, class, score,?dense_rank() over(order by score) rank from scores").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
| ?a8| ???3| ??45| ??1|
| ?a9| ???3| ??55| ??2|
| ?a4| ???2| ??74| ??3|
| ?a2| ???1| ??78| ??4|
| a10| ???3| ??78| ??4|
| ?a1| ???1| ??80| ??5|
| ?a5| ???2| ??92| ??6|
| ?a3| ???1| ??95| ??7|
| ?a6| ???3| ??99| ??8|
| ?a7| ???3| ??99| ??8|
| a11| ???3| ?100| ??9|
+----+-----+-----+----+
spark.sql("select name, class, score, dense_rank() over(partition by class order by score) rank?from scores").show()
+----+-----+-----+----+ ????????????????????????????????????????????????????????
|name|class|score|rank|
+----+-----+-----+----+
| ?a2| ???1| ??78| ??1|
| ?a1| ???1| ??80| ??2|
| ?a3| ???1| ??95| ??3|
| ?a8| ???3| ??45| ??1|
| ?a9| ???3| ??55| ??2|
| a10| ???3| ??78| ??3|
| ?a6| ???3| ??99| ??4|
| ?a7| ???3| ??99| ??4|
| a11| ???3| ?100| ??5|
| ?a4| ???2| ??74| ??1|
| ?a5| ???2| ??92| ??2|
+----+-----+-----+----+
?
???????NTILE分組排名[了解]
ntile(6) over(order by score)as ntile表示按 score 升序的方式來排序,然后 6 等分成 6 個組,并顯示所在組的序號。
?示例4
spark.sql("select name, class, score, ntile(6) over(order by score) rank?from scores").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
| ?a8| ???3| ??45| ??1|
| ?a9| ???3| ??55| ??1|
| ?a4| ???2| ??74| ??2|
| ?a2| ???1| ??78| ??2|
| a10| ???3| ??78| ??3|
| ?a1| ???1| ??80| ??3|
| ?a5| ???2| ??92| ??4|
| ?a3| ???1| ??95| ??4|
| ?a6| ???3| ??99| ??5|
| ?a7| ???3| ??99| ??5|
| a11| ???3| ?100| ??6|
+----+-----+-----+----+
spark.sql("select name, class, score, ntile(6) over(partition by class order by score)?rank from scores").show()
+----+-----+-----+----+ ????????????????????????????????????????????????????????
|name|class|score|rank|
+----+-----+-----+----+
| ?a2| ???1| ??78| ??1|
| ?a1| ???1| ??80| ??2|
| ?a3| ???1| ??95| ??3|
| ?a8| ???3| ??45| ??1|
| ?a9| ???3| ??55| ??2|
| a10| ???3| ??78| ??3|
| ?a6| ???3| ??99| ??4|
| ?a7| ???3| ??99| ??5|
| a11| ???3| ?100| ??6|
| ?a4| ???2| ??74| ??1|
| ?a5| ???2| ??92| ??2|
+----+-----+-----+----+
?
???????代碼演示
package cn.itcast.sqlimport org.apache.spark.SparkContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}/*** Author itcast* Date 2020/9/21 9:33* Desc 使用SparkSQL支持的開窗函數/窗口函數完成對各個班級的學生成績的排名*/
object RowNumberDemo {case class Score(name: String, clazz: Int, score: Int)def main(args: Array[String]): Unit = {//1.準備環境val spark: SparkSession = SparkSession.builder().appName("WordCount").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._//2.加載數據val scoreDF: DataFrame = sc.makeRDD(Array(Score("a1", 1, 80),Score("a2", 1, 78),Score("a3", 1, 95),Score("a4", 2, 74),Score("a5", 2, 92),Score("a6", 3, 99),Score("a7", 3, 99),Score("a8", 3, 45),Score("a9", 3, 55),Score("a10", 3, 78),Score("a11", 3, 100))).toDF("name", "class", "score")scoreDF.createOrReplaceTempView("t_scores")scoreDF.show()/*+----+-----+-----+|name|class|score|num+----+-----+-----+| ?a1| ???1| ??80|| ?a2| ???1| ??78|| ?a3| ???1| ??95|| ?a4| ???2| ??74|| ?a5| ???2| ??92|| ?a6| ???3| ??99|| ?a7| ???3| ??99|| ?a8| ???3| ??45|| ?a9| ???3| ??55|| a10| ???3| ??78|| a11| ???3| ?100|+----+-----+-----+*///使用ROW_NUMBER順序排序spark.sql("select name, class, score, row_number() over(partition by class order by score) num from t_scores").show()//使用RANK跳躍排序spark.sql("select name, class, score, rank() over(partition by class order by score) num from t_scores").show()//使用DENSE_RANK連續排序spark.sql("select name, class, score, dense_rank() over(partition by class order by score) num from t_scores").show()/*
ROW_NUMBER順序排序--1234
+----+-----+-----+---+
|name|class|score|num|
+----+-----+-----+---+
| ?a2| ???1| ??78| ?1|
| ?a1| ???1| ??80| ?2|
| ?a3| ???1| ??95| ?3|
| ?a8| ???3| ??45| ?1|
| ?a9| ???3| ??55| ?2|| a10| ???3| ??78| ?3|
| ?a6| ???3| ??99| ?4|
| ?a7| ???3| ??99| ?5|
| a11| ???3| ?100| ?6|| ?a4| ???2| ??74| ?1|
| ?a5| ???2| ??92| ?2|
+----+-----+-----+---+使用RANK跳躍排序--1224
+----+-----+-----+---+
|name|class|score|num|
+----+-----+-----+---+
| ?a2| ???1| ??78| ?1|
| ?a1| ???1| ??80| ?2|
| ?a3| ???1| ??95| ?3|
| ?a8| ???3| ??45| ?1|
| ?a9| ???3| ??55| ?2|| a10| ???3| ??78| ?3|
| ?a6| ???3| ??99| ?4|
| ?a7| ???3| ??99| ?4|
| a11| ???3| ?100| ?6|| ?a4| ???2| ??74| ?1|
| ?a5| ???2| ??92| ?2|
+----+-----+-----+---+DENSE_RANK連續排序--1223
+----+-----+-----+---+
|name|class|score|num|
+----+-----+-----+---+
| ?a2| ???1| ??78| ?1|
| ?a1| ???1| ??80| ?2|
| ?a3| ???1| ??95| ?3|
| ?a8| ???3| ??45| ?1|
| ?a9| ???3| ??55| ?2|| a10| ???3| ??78| ?3|
| ?a6| ???3| ??99| ?4|
| ?a7| ???3| ??99| ?4|
| a11| ???3| ?100| ?5|| ?a4| ???2| ??74| ?1|
| ?a5| ???2| ??92| ?2|
+----+-----+-----+---+*//*val sql ="""|select 字段1,字段2,字段n,|row_number() over(partition by 字段1 order by 字段2 desc) num|from 表名|having num <= 3|""".stripMarginimport org.apache.spark.sql.functions._df.withColumn("num",row_number().over(Window.partitionBy('字段1).orderBy('字段2.desc))).filter('num <= 3).show(false)*/}
}
?
總結
以上是生活随笔為你收集整理的2021年大数据Spark(二十九):SparkSQL案例四开窗函数的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Spark(四十四):S
- 下一篇: 2021年大数据Spark(四十五):S