Spark Dataset Operations
Selecting columns: select
Let's walk through an example:
scala> val df = spark.createDataset(Seq(
("aaa", 1, 2), ("bbb", 3, 4), ("ccc", 3, 5), ("bbb", 4, 6))
).toDF("key1","key2","key3")
df: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 1 more field]
scala> df.printSchema
root
|-- key1: string (nullable = true)
|-- key2: integer (nullable = false)
|-- key3: integer (nullable = false)
scala> df.collect
res34: Array[org.apache.spark.sql.Row] = Array([aaa,1,2], [bbb,3,4], [ccc,3,5], [bbb,4,6])
The code above creates a DataFrame with three columns named key1, key2, and key3, typed string, integer, and integer respectively.
Four records have been created, as shown above.
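As a side note, the same data can also be created as a strongly typed Dataset through a case class instead of toDF. The sketch below is illustrative only; the case class Rec is not part of the original post.

// In spark-shell, spark.implicits._ is already imported; in a standalone app add:
// import spark.implicits._
case class Rec(key1: String, key2: Int, key3: Int)

// A typed Dataset[Rec]; column names and types come from the case class fields
val ds = Seq(Rec("aaa", 1, 2), Rec("bbb", 3, 4), Rec("ccc", 3, 5), Rec("bbb", 4, 6)).toDS()

// The select/filter/groupBy operations shown below work on ds as well;
// ds.toDF converts it back to an untyped DataFrame when needed.

Everything that follows uses the untyped DataFrame df.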
Next, let's look at several ways to select a column:
scala> df.select("key1").collect
res49: Array[org.apache.spark.sql.Row] = Array([aaa], [bbb], [ccc], [bbb])
scala> df.select($"key1").collect
res50: Array[org.apache.spark.sql.Row] = Array([aaa], [bbb], [ccc], [bbb])
scala> df.select(df.col("key1")).collect
res51: Array[org.apache.spark.sql.Row] = Array([aaa], [bbb], [ccc], [bbb])
When plain strings are passed to select, they can only be column names already defined in the DataFrame; SQL-style select expressions are not allowed there. The $"key1" form, however, resolves to the Column-typed overload of select, which does support SQL-like expressions:
scala> df.select(upper($"key1")).collect
res58: Array[org.apache.spark.sql.Row] = Array([AAA], [BBB], [CCC], [BBB])
scala> df.select(upper("key1")).collect
<console>:27: error: type mismatch;
found : String("key1")
required: org.apache.spark.sql.Column
df.select(upper("key1")).collect
Above, the upper function is applied to key1 inside select to convert it to uppercase. Note the $ in front of "key1": this is a bit of Scala syntactic sugar that is worth knowing, since it makes this kind of code much more convenient. Without the $ prefix the call fails with a type mismatch, because a Column is required but a String was supplied.
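For reference, the following forms all produce a Column and are interchangeable in select. This is a quick sketch; the import line is only needed in a standalone application (spark-shell already imports org.apache.spark.sql.functions._ and spark.implicits._).

import org.apache.spark.sql.functions.{col, upper}

df.select(upper($"key1"))      // $-sugar from spark.implicits._
df.select(upper(col("key1")))  // the col() helper from sql.functions
df.select(upper(df("key1")))   // apply() on the DataFrame itself
df.select(upper('key1))        // Scala Symbol, converted by the same implicits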
In this case, select can also be replaced with selectExpr. For example:
scala> df.selectExpr("upper(key1)", "key2 as haha2").show
+-----------+-----+
|upper(key1)|haha2|
+-----------+-----+
| AAA| 1|
| BBB| 3|
| CCC| 3|
| BBB| 4|
+-----------+-----+
key1 is upper-cased and key2 is aliased to haha2; everything works as expected.
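selectExpr accepts any SQL expression, not just a function call or an alias. The line below is an illustrative sketch (the computed columns key_sum and size are hypothetical, not from the original post):

// Arbitrary SQL expressions are parsed by selectExpr
df.selectExpr("key1", "key2 + key3 as key_sum",
  "case when key2 > 2 then 'big' else 'small' end as size").show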
The WHERE clause can be expressed with either the filter function or the where function. They behave identically; the official documentation states that where is simply an alias for filter.
The data is the same Dataset created in the previous post:
scala> val df = spark.createDataset(Seq(
("aaa", 1, 2), ("bbb", 3, 4), ("ccc", 3, 5), ("bbb", 4, 6))
).toDF("key1","key2","key3")
df: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 1 more field]

scala> df.show
+----+----+----+
|key1|key2|key3|
+----+----+----+
| aaa|   1|   2|
| bbb|   3|   4|
| ccc|   3|   5|
| bbb|   4|   6|
+----+----+----+
The filter function
According to the official Spark documentation, filter has the following signatures:
def filter(func: (T) => Boolean): Dataset[T]
def filter(conditionExpr: String): Dataset[T]
def filter(condition: Column): Dataset[T]
So all of the following forms work:
scala> df.filter($"key1">"aaa").show +----+----+----+ |key1|key2|key3| +----+----+----+ | bbb| 3| 4| | ccc| 3| 5| | bbb| 4| 6| +----+----+----+scala> df.filter($"key1"==="aaa").show +----+----+----+ |key1|key2|key3| +----+----+----+ | aaa| 1| 2| +----+----+----+scala> df.filter("key1='aaa'").show +----+----+----+ |key1|key2|key3| +----+----+----+ | aaa| 1| 2| +----+----+----+scala> df.filter("key2=1").show +----+----+----+ |key1|key2|key3| +----+----+----+ | aaa| 1| 2| +----+----+----+scala> df.filter($"key2"===3).show +----+----+----+ |key1|key2|key3| +----+----+----+ | bbb| 3| 4| | ccc| 3| 5| +----+----+----+scala> df.filter($"key2"===$"key3"-1).show +----+----+----+ |key1|key2|key3| +----+----+----+ | aaa| 1| 2| | bbb| 3| 4| +----+----+----+- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
Here, === is an operator defined on the Column class; its not-equal counterpart is =!=.
$"columnName" is syntactic sugar that returns a Column object.
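A quick illustrative sketch of the difference (these lines are not from the original post): the Column operators build expressions that Spark evaluates inside the query, whereas Scala's own == produces a plain Boolean that filter will not accept.

// Column operators build expressions:
df.filter($"key2" === 3)         // equality on a column
df.filter($"key2" =!= 3)         // inequality
df.filter(!($"key1" === "aaa"))  // negation via Column's unary !

// By contrast, $"key2" == 3 is plain Scala equality and yields a Boolean,
// so df.filter($"key2" == 3) does not compile against the Column overload.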
The where function
scala> df.where("key1 = 'bbb'").show +----+----+----+ |key1|key2|key3| +----+----+----+ | bbb| 3| 4| | bbb| 4| 6| +----+----+----+scala> df.where($"key2"=!= 3).show +----+----+----+ |key1|key2|key3| +----+----+----+ | aaa| 1| 2| | bbb| 4| 6| +----+----+----+scala> df.where($"key3">col("key2")).show +----+----+----+ |key1|key2|key3| +----+----+----+ | aaa| 1| 2| | bbb| 3| 4| | ccc| 3| 5| | bbb| 4| 6| +----+----+----+scala> df.where($"key3">col("key2")+1).show +----+----+----+ |key1|key2|key3| +----+----+----+ | ccc| 3| 5| | bbb| 4| 6| +----+----+----+分組,聚合,排序
scala> val df = spark.createDataset(Seq(
("aaa", 1, 2), ("bbb", 3, 4), ("ccc", 3, 5), ("bbb", 4, 6))
).toDF("key1","key2","key3")
df: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 1 more field]

scala> df.printSchema
root
|-- key1: string (nullable = true)
|-- key2: integer (nullable = false)
|-- key3: integer (nullable = false)

scala> df.show
+----+----+----+
|key1|key2|key3|
+----+----+----+
| aaa|   1|   2|
| bbb|   3|   4|
| ccc|   3|   5|
| bbb|   4|   6|
+----+----+----+
Let's start with the simplest group-by count:
/* Equivalent SQL: select key1, count(*) from table group by key1 */
scala> df.groupBy("key1").count.show
+----+-----+
|key1|count|
+----+-----+
| ccc|    1|
| aaa|    1|
| bbb|    2|
+----+-----+
Note that count here is not the total number of records; it counts the rows within each groupBy group. If what you want is the number of distinct groups, use the code below; on this sample dataset the result is 3:
/* Equivalent SQL: select distinct key1 from table */
scala> df.select("key1").distinct.show
+----+
|key1|
+----+
| ccc|
| aaa|
| bbb|
+----+

/* Equivalent SQL: select count(distinct key1) from table */
scala> df.select("key1").distinct.count
res3: Long = 3
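The same number can also be obtained with the countDistinct aggregate from org.apache.spark.sql.functions. This alternative is a sketch and is not shown in the original post; the alias distinct_key1 is illustrative.

// In a standalone app: import org.apache.spark.sql.functions.countDistinct
// Equivalent SQL: select count(distinct key1) from table
df.agg(countDistinct("key1").as("distinct_key1")).show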
If, like me, you are a bit obsessive, you will have noticed that key1 is not sorted in the output above, which is hard to live with. Let's fix that:
/* Equivalent SQL: select key1, count(*) from table group by key1 order by key1 */
scala> df.groupBy("key1").count.sort("key1").show
+----+-----+
|key1|count|
+----+-----+
| aaa|    1|
| bbb|    2|
| ccc|    1|
+----+-----+

/* Equivalent SQL: select key1, count(*) from table group by key1 order by key1 desc */
scala> df.groupBy("key1").count.sort($"key1".desc).show
+----+-----+
|key1|count|
+----+-----+
| ccc|    1|
| bbb|    2|
| aaa|    1|
+----+-----+
Note that the first sort is ascending and the second descending. In the descending case, which matches "select key1, count(*) from table group by key1 order by key1 desc", the desc modifier is applied and key1 carries a $ prefix. As mentioned in the previous post, this is syntactic sugar for col(columnName); from here on the $ prefix will not be explained again.
Let's refine this a bit further. What if we want to order by the group counts in descending order instead? From the earlier show output, the count column is simply named count, so the following naturally comes to mind:
/* Equivalent SQL: select key1, count(*) from table group by key1 order by count(*) desc */
scala> df.groupBy("key1").count.sort($"count".desc).show
+----+-----+
|key1|count|
+----+-----+
| bbb|    2|
| ccc|    1|
| aaa|    1|
+----+-----+
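As a side note, sort has an alias orderBy, and the descending order can also be expressed with the desc() helper from sql.functions. A small illustrative sketch:

// In a standalone app: import org.apache.spark.sql.functions.desc
// Same result as sort($"count".desc)
df.groupBy("key1").count.orderBy(desc("count")).show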
Alternatively, rename the column with withColumnRenamed:
/* Equivalent SQL: select key1, count(*) as cnt from table group by key1 order by cnt desc */
scala> df.groupBy("key1").count.withColumnRenamed("count", "cnt").sort($"cnt".desc).show
+----+---+
|key1|cnt|
+----+---+
| bbb|  2|
| aaa|  1|
| ccc|  1|
+----+---+
A more common approach is to alias count(*) directly, as follows:
/* Equivalent SQL: select key1, count(*) as cnt from table group by key1 */
scala> df.groupBy("key1").agg(count("key1").as("cnt")).show
+----+---+
|key1|cnt|
+----+---+
| ccc|  1|
| aaa|  1|
| bbb|  2|
+----+---+
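To also reproduce an order by cnt desc, the alias can be sorted on in the same chain; an illustrative one-liner:

/* Equivalent SQL: select key1, count(*) as cnt from table group by key1 order by cnt desc */
df.groupBy("key1").agg(count("key1").as("cnt")).sort($"cnt".desc).show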
Here we have introduced the aggregation function agg. It is typically used together with groupBy and is quite flexible. The examples below show its different forms; note the distinction between Column-typed and String-typed arguments:
def agg(expr: Column, exprs: Column*): DataFrame
def agg(exprs: Map[String, String]): DataFrame
def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame

/* Equivalent SQL: select key1, count(key1), max(key2), avg(key3) from table group by key1 */
scala> df.groupBy("key1").agg(count("key1"), max("key2"), avg("key3")).show
+----+-----------+---------+---------+
|key1|count(key1)|max(key2)|avg(key3)|
+----+-----------+---------+---------+
| ccc|          1|        3|      5.0|
| aaa|          1|        1|      2.0|
| bbb|          2|        4|      5.0|
+----+-----------+---------+---------+

scala> df.groupBy("key1").agg("key1"->"count", "key2"->"max", "key3"->"avg").show
+----+-----------+---------+---------+
|key1|count(key1)|max(key2)|avg(key3)|
+----+-----------+---------+---------+
| ccc|          1|        3|      5.0|
| aaa|          1|        1|      2.0|
| bbb|          2|        4|      5.0|
+----+-----------+---------+---------+

scala> df.groupBy("key1").agg(Map(("key1","count"), ("key2","max"), ("key3","avg"))).show
+----+-----------+---------+---------+
|key1|count(key1)|max(key2)|avg(key3)|
+----+-----------+---------+---------+
| ccc|          1|        3|      5.0|
| aaa|          1|        1|      2.0|
| bbb|          2|        4|      5.0|
+----+-----------+---------+---------+

scala> df.groupBy("key1").agg(("key1","count"), ("key2","max"), ("key3","avg")).show
+----+-----------+---------+---------+
|key1|count(key1)|max(key2)|avg(key3)|
+----+-----------+---------+---------+
| ccc|          1|        3|      5.0|
| aaa|          1|        1|      2.0|
| bbb|          2|        4|      5.0|
+----+-----------+---------+---------+

/* Equivalent SQL:
   select key1, count(key1) cnt, max(key2) max_key2, avg(key3) avg_key3
   from table group by key1 order by cnt, max_key2 desc */
scala> df.groupBy("key1").agg(count("key1").as("cnt"), max("key2").as("max_key2"), avg("key3").as("avg_key3")).sort($"cnt",$"max_key2".desc).show
+----+---+--------+--------+
|key1|cnt|max_key2|avg_key3|
+----+---+--------+--------+
| ccc|  1|       3|     5.0|
| aaa|  1|       1|     2.0|
| bbb|  2|       4|     5.0|
+----+---+--------+--------+

Other single-table operations
There are a few miscellaneous operations not covered yet, such as adding columns, dropping columns, and handling NA values; let's run through them briefly here.
We'll use the same dataset as before:
scala> val df = spark.createDataset(Seq(
("aaa", 1, 2), ("bbb", 3, 4), ("ccc", 3, 5), ("bbb", 4, 6))
).toDF("key1","key2","key3")
df: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 1 more field]

scala> df.printSchema
root
|-- key1: string (nullable = true)
|-- key2: integer (nullable = false)
|-- key3: integer (nullable = false)

scala> df.show
+----+----+----+
|key1|key2|key3|
+----+----+----+
| aaa|   1|   2|
| bbb|   3|   4|
| ccc|   3|   5|
| bbb|   4|   6|
+----+----+----+
Now let's add a column. It can be a string or an integer, and it can be a constant or a transformation of an existing column:
/* Add a string column key4, initialized to "new_str_col" for every row. Note the lit() function.
   Since people have asked about lit(), here is a short note: lit() is a built-in Spark function,
   imported from org.apache.spark.sql.functions.
     def lit(literal: Any): Column
   Creates a Column of literal value. The passed in object is returned directly if it is already
   a Column. If the object is a Scala Symbol, it is converted into a Column also. Otherwise,
   a new Column is created to represent the literal value. Since 1.3.0 */
scala> val df_1 = df.withColumn("key4", lit("new_str_col"))
df_1: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 2 more fields]

scala> df_1.printSchema
root
|-- key1: string (nullable = true)
|-- key2: integer (nullable = false)
|-- key3: integer (nullable = false)
|-- key4: string (nullable = false)

scala> df_1.show
+----+----+----+-----------+
|key1|key2|key3|       key4|
+----+----+----+-----------+
| aaa|   1|   2|new_str_col|
| bbb|   3|   4|new_str_col|
| ccc|   3|   5|new_str_col|
| bbb|   4|   6|new_str_col|
+----+----+----+-----------+

/* Similarly, add an Int column key5, initialized to 1024 */
scala> val df_2 = df_1.withColumn("key5", lit(1024))
df_2: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 3 more fields]

scala> df_2.printSchema
root
|-- key1: string (nullable = true)
|-- key2: integer (nullable = false)
|-- key3: integer (nullable = false)
|-- key4: string (nullable = false)
|-- key5: integer (nullable = false)

scala> df_2.show
+----+----+----+-----------+----+
|key1|key2|key3|       key4|key5|
+----+----+----+-----------+----+
| aaa|   1|   2|new_str_col|1024|
| bbb|   3|   4|new_str_col|1024|
| ccc|   3|   5|new_str_col|1024|
| bbb|   4|   6|new_str_col|1024|
+----+----+----+-----------+----+

/* Now a non-constant new column: key6 = key5 * 2 */
scala> val df_3 = df_2.withColumn("key6", $"key5"*2)
df_3: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df_3.show
+----+----+----+-----------+----+----+
|key1|key2|key3|       key4|key5|key6|
+----+----+----+-----------+----+----+
| aaa|   1|   2|new_str_col|1024|2048|
| bbb|   3|   4|new_str_col|1024|2048|
| ccc|   3|   5|new_str_col|1024|2048|
| bbb|   4|   6|new_str_col|1024|2048|
+----+----+----+-----------+----+----+

/* This time using the expr() function */
scala> val df_4 = df_2.withColumn("key6", expr("key5 * 4"))
df_4: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df_4.show
+----+----+----+-----------+----+----+
|key1|key2|key3|       key4|key5|key6|
+----+----+----+-----------+----+----+
| aaa|   1|   2|new_str_col|1024|4096|
| bbb|   3|   4|new_str_col|1024|4096|
| ccc|   3|   5|new_str_col|1024|4096|
| bbb|   4|   6|new_str_col|1024|4096|
+----+----+----+-----------+----+----+
Dropping columns is simpler; just name the columns to drop:
/* Drop column key5 */
scala> val df_5 = df_4.drop("key5")
df_5: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 3 more fields]

scala> df_4.printSchema
root
|-- key1: string (nullable = true)
|-- key2: integer (nullable = false)
|-- key3: integer (nullable = false)
|-- key4: string (nullable = false)
|-- key5: integer (nullable = false)
|-- key6: integer (nullable = false)

scala> df_5.printSchema
root
|-- key1: string (nullable = true)
|-- key2: integer (nullable = false)
|-- key3: integer (nullable = false)
|-- key4: string (nullable = false)
|-- key6: integer (nullable = false)

scala> df_5.show
+----+----+----+-----------+----+
|key1|key2|key3|       key4|key6|
+----+----+----+-----------+----+
| aaa|   1|   2|new_str_col|4096|
| bbb|   3|   4|new_str_col|4096|
| ccc|   3|   5|new_str_col|4096|
| bbb|   4|   6|new_str_col|4096|
+----+----+----+-----------+----+

/* Multiple columns can be dropped at once: key4 and key6 */
scala> val df_6 = df_5.drop("key4", "key6")
df_6: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 1 more field]

/* The columns method returns all column names as an array */
scala> df_6.columns
res23: Array[String] = Array(key1, key2, key3)

scala> df_6.show
+----+----+----+
|key1|key2|key3|
+----+----+----+
| aaa|   1|   2|
| bbb|   3|   4|
| ccc|   3|   5|
| bbb|   4|   6|
+----+----+----+
Let's also cover some handling of null values and other invalid data.
This time we need a different dataset: a table containing null values, loaded from a CSV file.
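A minimal sketch of that step, assuming a hypothetical file nulls.csv with a header row; the column names and fill values below are likewise illustrative, not taken from the original post:

// Hypothetical file nulls.csv with a header row and empty fields for missing values
val dfNull = spark.read
  .option("header", "true")      // first line contains column names
  .option("inferSchema", "true") // let Spark guess the column types
  .csv("nulls.csv")

// Typical DataFrameNaFunctions usage:
dfNull.na.drop().show()                                       // drop rows containing any null
dfNull.na.drop(Seq("key1")).show()                            // drop rows where key1 is null
dfNull.na.fill(Map("key1" -> "unknown", "key2" -> 0)).show()  // fill per column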
Multi-table operations: join
First, the definitions of the two source tables:
scala> val df1 = spark.createDataset(Seq(
("aaa", 1, 2), ("bbb", 3, 4), ("ccc", 3, 5), ("bbb", 4, 6))
).toDF("key1","key2","key3")
df1: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 1 more field]

scala> val df2 = spark.createDataset(Seq(
("aaa", 2, 2), ("bbb", 3, 5), ("ddd", 3, 5), ("bbb", 4, 6), ("eee", 1, 2), ("aaa", 1, 5), ("fff", 5, 6))
).toDF("key1","key2","key4")
df2: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 1 more field]

scala> df1.printSchema
root
|-- key1: string (nullable = true)
|-- key2: integer (nullable = false)
|-- key3: integer (nullable = false)

scala> df2.printSchema
root
|-- key1: string (nullable = true)
|-- key2: integer (nullable = false)
|-- key4: integer (nullable = false)

scala> df1.show()
+----+----+----+
|key1|key2|key3|
+----+----+----+
| aaa|   1|   2|
| bbb|   3|   4|
| ccc|   3|   5|
| bbb|   4|   6|
+----+----+----+

scala> df2.show()
+----+----+----+
|key1|key2|key4|
+----+----+----+
| aaa|   2|   2|
| bbb|   3|   5|
| ddd|   3|   5|
| bbb|   4|   6|
| eee|   1|   2|
| aaa|   1|   5|
| fff|   5|   6|
+----+----+----+

Spark's join support is rich: equi-joins, conditional joins, and natural joins are all available. Join types include inner, outer, left outer, right outer, left semi, and Cartesian (cross) joins.
The examples below cover each in turn, starting with the inner join.
/* Inner join: select * from df1 join df2 on df1.key1 = df2.key1 */
scala> val df3 = df1.join(df2,"key1")
df3: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 3 more fields]

scala> df3.printSchema
root
|-- key1: string (nullable = true)
|-- key2: integer (nullable = false)
|-- key3: integer (nullable = false)
|-- key2: integer (nullable = false)
|-- key4: integer (nullable = false)

scala> df3.show
+----+----+----+----+----+
|key1|key2|key3|key2|key4|
+----+----+----+----+----+
| aaa|   1|   2|   1|   5|
| aaa|   1|   2|   2|   2|
| bbb|   3|   4|   4|   6|
| bbb|   3|   4|   3|   5|
| bbb|   4|   6|   4|   6|
| bbb|   4|   6|   3|   5|
+----+----+----+----+----+

/* Still an inner join, this time with joinWith. The difference from join is that the
   resulting Dataset has a different schema; compare it with the output above. */
scala> val df4 = df1.joinWith(df2, df1("key1")===df2("key1"))
df4: org.apache.spark.sql.Dataset[(org.apache.spark.sql.Row, org.apache.spark.sql.Row)] = [_1: struct<key1: string, key2: int ... 1 more field>, _2: struct<key1: string, key2: int ... 1 more field>]

scala> df4.printSchema
root
|-- _1: struct (nullable = false)
|    |-- key1: string (nullable = true)
|    |-- key2: integer (nullable = false)
|    |-- key3: integer (nullable = false)
|-- _2: struct (nullable = false)
|    |-- key1: string (nullable = true)
|    |-- key2: integer (nullable = false)
|    |-- key4: integer (nullable = false)

scala> df4.show
+---------+---------+
|       _1|       _2|
+---------+---------+
|[aaa,1,2]|[aaa,1,5]|
|[aaa,1,2]|[aaa,2,2]|
|[bbb,3,4]|[bbb,4,6]|
|[bbb,3,4]|[bbb,3,5]|
|[bbb,4,6]|[bbb,4,6]|
|[bbb,4,6]|[bbb,3,5]|
+---------+---------+

Then the outer join:
/* select * from df1 outer join df2 on df1.key1=df2.key1 */
scala> val df5 = df1.join(df2, df1("key1")===df2("key1"), "outer")
df5: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df5.show
+----+----+----+----+----+----+
|key1|key2|key3|key1|key2|key4|
+----+----+----+----+----+----+
|null|null|null| ddd|   3|   5|
| ccc|   3|   5|null|null|null|
| aaa|   1|   2| aaa|   2|   2|
| aaa|   1|   2| aaa|   1|   5|
| bbb|   3|   4| bbb|   3|   5|
| bbb|   3|   4| bbb|   4|   6|
| bbb|   4|   6| bbb|   3|   5|
| bbb|   4|   6| bbb|   4|   6|
|null|null|null| fff|   5|   6|
|null|null|null| eee|   1|   2|
+----+----+----+----+----+----+

Next come the left outer, right outer, and left semi joins:
/* Left outer join */
scala> val df6 = df1.join(df2, df1("key1")===df2("key1"), "left_outer")
df6: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df6.show
+----+----+----+----+----+----+
|key1|key2|key3|key1|key2|key4|
+----+----+----+----+----+----+
| aaa|   1|   2| aaa|   1|   5|
| aaa|   1|   2| aaa|   2|   2|
| bbb|   3|   4| bbb|   4|   6|
| bbb|   3|   4| bbb|   3|   5|
| ccc|   3|   5|null|null|null|
| bbb|   4|   6| bbb|   4|   6|
| bbb|   4|   6| bbb|   3|   5|
+----+----+----+----+----+----+

/* Right outer join */
scala> val df7 = df1.join(df2, df1("key1")===df2("key1"), "right_outer")
df7: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df7.show
+----+----+----+----+----+----+
|key1|key2|key3|key1|key2|key4|
+----+----+----+----+----+----+
| aaa|   1|   2| aaa|   2|   2|
| bbb|   4|   6| bbb|   3|   5|
| bbb|   3|   4| bbb|   3|   5|
|null|null|null| ddd|   3|   5|
| bbb|   4|   6| bbb|   4|   6|
| bbb|   3|   4| bbb|   4|   6|
|null|null|null| eee|   1|   2|
| aaa|   1|   2| aaa|   1|   5|
|null|null|null| fff|   5|   6|
+----+----+----+----+----+----+

/* Left semi join */
scala> val df8 = df1.join(df2, df1("key1")===df2("key1"), "leftsemi")
df8: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 1 more field]

scala> df8.show
+----+----+----+
|key1|key2|key3|
+----+----+----+
| aaa|   1|   2|
| bbb|   3|   4|
| bbb|   4|   6|
+----+----+----+

Cartesian joins are not used that often; the tables people throw at Spark these days are huge, and a full cross join is simply too expensive.
/* Cartesian (cross) join */
scala> val df9 = df1.crossJoin(df2)
df9: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df9.count
res17: Long = 28

/* Show only the first 10 rows */
scala> df9.show(10)
+----+----+----+----+----+----+
|key1|key2|key3|key1|key2|key4|
+----+----+----+----+----+----+
| aaa|   1|   2| aaa|   2|   2|
| aaa|   1|   2| bbb|   3|   5|
| aaa|   1|   2| ddd|   3|   5|
| aaa|   1|   2| bbb|   4|   6|
| aaa|   1|   2| eee|   1|   2|
| aaa|   1|   2| aaa|   1|   5|
| aaa|   1|   2| fff|   5|   6|
| bbb|   3|   4| aaa|   2|   2|
| bbb|   3|   4| bbb|   3|   5|
| bbb|   3|   4| ddd|   3|   5|
+----+----+----+----+----+----+
only showing top 10 rows

The next example is again an equi-join. The difference from the earlier equi-join is that the duplicate columns shared by the two tables are merged, just as in a natural join:
/* Equi-join on the two common columns key1 and key2 */
scala> val df10 = df1.join(df2, Seq("key1","key2"))
df10: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 2 more fields]

scala> df10.show
+----+----+----+----+
|key1|key2|key3|key4|
+----+----+----+----+
| aaa|   1|   2|   5|
| bbb|   3|   4|   5|
| bbb|   4|   6|   6|
+----+----+----+----+

Conditional joins apparently were not supported in older versions of Spark, but they work fine now:
/* select df1.*, df2.* from df1 join df2 on df1.key1=df2.key1 and df1.key2>df2.key2 */
scala> val df11 = df1.join(df2, df1("key1")===df2("key1") && df1("key2")>df2("key2"))
df11: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df11.show
+----+----+----+----+----+----+
|key1|key2|key3|key1|key2|key4|
+----+----+----+----+----+----+
| bbb|   4|   6| bbb|   3|   5|
+----+----+----+----+----+----+
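One practical note on the duplicate key1/key2 columns visible in several join outputs above: the two DataFrames can be aliased so that individual columns remain addressable afterwards. This is an illustrative sketch, not part of the original post:

// Alias each side, then qualify columns through the alias
val joined = df1.as("a").join(df2.as("b"), $"a.key1" === $"b.key1", "outer")
joined.select($"a.key1", $"a.key2", $"b.key2", $"b.key4").show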
Source: 野男孩's CSDN blog. Full article: https://blog.csdn.net/coding_hello/article/details/74853504?utm_source=copy