生活随笔
收集整理的這篇文章主要介紹了
Spark RDD编程模型及算子介绍(二)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
文章目錄
常見的Action算子
- countByKey算子:統計Key出現的次數,部分代碼如下:
rdd_file
= sc
.textFile
("../Data/input/words.txt")
rdd_map
= rdd_file
.flatMap
(lambda line
: line
.split
(" ")).map(lambda x
:(x
, 1))
rdd_count
= rdd_map
.countByKey
()
print(rdd_count
)
print(type(rdd_count
))
rdd
= sc
.parallelize
(range(1,10))
rdd_reduce
= rdd
.reduce(lambda a
,b
: a
+b
)
print(rdd_reduce
)
- fold算子:和reduce一樣接收傳入邏輯進行聚合,聚合是帶有初始值的。這個初始值既要作用在分區內,也要作用在分區間,部分代碼如下:
rdd
= sc
.parallelize
(range(1,10),3)
rdd_reduce
= rdd
.fold
(10,lambda a
,b
: a
+b
)
print(rdd_reduce
)
sc
.parallelize
([1,2,3,4]).first
()
sc
.parallelize
([1,2,3,4],3).take
(2)
sc
.parallelize
([1,2,3,4],3).top
(2)
- count算子:計算RDD有多少條數據,返回值為一個數字
sc
.parallelize
([1,2,3,4],3).count
()
- takeSample算子:隨機抽樣RDD的數據,部分代碼如下:
rdd
= sc
.parallelize
([1,2,3,4,5,6,7,6,5,4,3,2,1],1)
rdd_takeSample1
= rdd
.takeSample
(True, 18)
print(rdd_takeSample1
)
rdd_takeSample2
= rdd
.takeSample
(False, 18)
print(rdd_takeSample2
)
- takeOrdered算子:對RDD排序取前N個,部分代碼如下:
rdd
= sc
.parallelize
([1,2,3,4,5,6,7])
rdd_takeOrdered1
= rdd
.takeOrdered
(4)
rdd_takeOrdered2
= rdd
.takeOrdered
(4,lambda x
: -x
)print(rdd_takeOrdered1
)
print(rdd_takeOrdered2
)
- foreach算子:對RDD的每個元素,執行邏輯操作與map類似,但是這個方法沒有返回值。如果想顯示值,只能在里面自行打印(無需經過Driver,直接在Executor打印效率更高)。
rdd
= sc
.parallelize
([1,2,3,4,5,6,7],1)
rdd1
= rdd
.foreach
(lambda x
: 2*x
+1)
rdd2
= rdd
.foreach
(lambda x
: print(2*x
+1))
print(rdd1
)
3
5
7
9
11
13
15
None
- saveAsTextFile算子:保存文件API,分布式執行,不經過Driver,每個分區所在的Executor直接控制數據寫出到目標文件系統中,每個分區產生1個結果文件。
rdd_file
= sc
.textFile
("hdfs://node1:8020/Test/WordCount.txt",3)
rdd_words
= rdd_file
.flatMap
(lambda line
: line
.split
(" "))
rdd_map
= rdd_words
.map(lambda x
:(x
, 1))
rdd_total
= rdd_map
.reduceByKey
(lambda a
,b
: a
+ b
)
rdd_rs
= rdd_total
.saveAsTextFile
("hdfs://node1:8020/Test/word_rs1")
結果如下圖所示在HDFS WebUI上查看
常見分區操作算子
- mapPartitions算子:與map相似,只是一次被傳遞的是一整個分區的數據,雖然在執行次數上與map相同,但是可以因為減少了網絡io的傳輸次數,效率會大大的提高。部分代碼如下:
rdd
= sc
.parallelize
([1,2,3,4,5,6],3)
def func(iter):rs
= list()for it
in iter:rs
.append
(2 * it
+ 1)return rs
rdd_part
= rdd
.mapPartitions
(func
)
rdd_rs
= rdd_part
.collect
()
print(rdd_rs
)
- foreachPartition算子:與普通foreach一樣,只是一次被傳遞的是一整個分區的數據,部分代碼如下:
rdd
= sc
.parallelize
([1,2,3,4,5,6],3)
def func(iter):rs
= list()for it
in iter:rs
.append
(2 * it
+ 1)print(rs
)rdd_part
= rdd
.foreachPartition
(func
)
- partitionBy算子:對RDD進行自定義分區操作,部分代碼如下
rdd
= sc
.parallelize
([('a',1),('b',2),('c',3),('d',4),('e',5),('f',6)])def func(key
):if key
== 'a' or key
== 'b' : return 0if key
== 'c' or key
== 'd' : return 1return 2rdd_part
= rdd
.partitionBy
(3,func
)
rdd_rs
= rdd_part
.glom
().collect
()
print(rdd_rs
)
- repartition算子:對RDD的分區執行重新分區。不建議使用此算子,除非做全局排序的時候,將其設置為1。如果修改盡量減少,不要增加,增加會導致shuffle。不管是增加還是減少都會影響并行計算(內存迭代并行的管道數量),部分代碼如下:
rdd
= sc
.parallelize
([1,2,3,4,5,6],3)
rdd_re1
= rdd
.getNumPartitions
()
print(rdd_re1
)
rdd_re2
= rdd
.repartition
(1).getNumPartitions
()
print(rdd_re2
)
rdd_re3
= rdd
.repartition
(5).getNumPartitions
()
print(rdd_re3
)
- coalesce算子:對分區數量進行增減,部分代碼如下:
rdd_re4
= rdd
.coalesce
(1).getNumPartitions
()
print(rdd_re4
)
rdd_re5
= rdd
.coalesce
(5).getNumPartitions
()
print(rdd_re5
)
rdd_re6
= rdd
.coalesce
(5,shuffle
=True).getNumPartitions
()
print(rdd_re6
)
- 在源碼中我們可以發現reparation算子底層調用的就是coalesce算子,只不過shuffle定義為true。源碼如下:
def repartition(self
, numPartitions
):return self
.coalesce
(numPartitions
, shuffle
=True)
總結
以上是生活随笔為你收集整理的Spark RDD编程模型及算子介绍(二)的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。