python连接spark_python如何通过pyspark的API操作spark
park安裝略,下載解壓配置下就OK?我使用的是spark-2.2.0-bin-hadoop2.7
安裝完畢后需要配置一下SPARK_HOME:
SPARK_HOME=C:\spark\spark-2.2.0-bin-hadoop2.7
Path里也要記得添加一下:
Path=XXXX;%SPARK_HOME%\bin;
Python與Spark交互主要用到pyspark這個模塊,所以需要準(zhǔn)備好擴(kuò)展包,詳細(xì)請參考《
Whl安裝好后,能得到一個py4j文件夾,但是還需要pyspark模塊這個文件夾里的內(nèi)容,pyspark的獲得更簡單,直接去復(fù)制spark-2.2.0-bin-hadoop2.7/python/pyspark就好了。
PS:在某些版本的pyspark調(diào)用時會出現(xiàn),自己稍微查下原因,網(wǎng)上都有配套的py文件可以覆蓋,這里不是本文的重點(diǎn),所以略過。
我們在《Spark原理詳解》中介紹過,RDD分為轉(zhuǎn)化(transformation)和動作(action)兩種操作。RDD是基于當(dāng)前的partitions生成新的partitions;動作是基于當(dāng)前的partitions生成返回對象(數(shù)值、集合、字典等)。所以在通過python調(diào)用spark的API時需要搞清楚返回值是什么。如果返回的是partitions,調(diào)用collect()函數(shù)可以拿到封裝后的數(shù)據(jù)集,分區(qū)部分對客戶端是透明的,也可以調(diào)用glom()來關(guān)心具體的分區(qū)情況。如果調(diào)用的是action那么就簡單得多,API直接返回結(jié)果內(nèi)容。
Map、Reduce API:
最典型,也是最基本的入門API
from pyspark import SparkContext
sc = SparkContext('local')
#第二個參數(shù)2代表的是分區(qū)數(shù),默認(rèn)為1
old=sc.parallelize([1,2,3,4,5],2)
newMap = old.map(lambda x:(x,x**2))
newReduce = old.reduce(lambda a,b : a+b)
print(newMap.glom().collect())
print(newReduce)
[[(1, 1), (2, 4)], [(3, 9), (4, 16), (5, 25)]]
15
SparkContext是代碼的核心,初始化時需要設(shè)置spark的啟動類型,分為local、Mesos、YARN、Standalone模式(詳見
Map和reduce里都要設(shè)置一個function,我們這里用了lambda匿名函數(shù)來實(shí)現(xiàn)。從結(jié)果可以看將前兩和后三個分別放在了1個分區(qū)中,reduce是個action直接返回的是key的sum。
預(yù)留問題:能否reduce按第二行進(jìn)行求和合并,how?
flatMap、filter、distinc API:
數(shù)據(jù)的拆分、過濾和去重
sc = SparkContext('local')
old=sc.parallelize([1,2,3,4,5])
#新的map里將原來的每個元素拆成了3個
newFlatPartitions = old.flatMap(lambda x : (x, x+1, x*2))
#過濾,只保留小于6的元素
newFilterPartitions = newFlatPartitions.filter(lambda x: x<6)
#去重
newDiscinctPartitions = newFilterPartitions.distinct()
print(newFlatPartitions.collect())
print(newFilterPartitions.collect())
print(newDiscinctPartitions.collect())
[1, 2, 2, 2, 3, 4, 3, 4, 6, 4, 5, 8, 5, 6, 10]
[1, 2, 2, 2, 3, 4, 3, 4, 4, 5, 5]
[1, 2, 3, 4, 5]
Sample、taskSample、sampleByKey API:
數(shù)據(jù)的抽樣,在機(jī)器學(xué)習(xí)中十分實(shí)用的功能,而它們有的是傳輸有的是動作,需要留意這個區(qū)別。
代碼:
sc = SparkContext('local')
old=sc.parallelize(range(8))
samplePartition = [old.sample(withReplacement=True, fraction=0.5) for i in range(5)]
for num, element in zip(range(len(samplePartition)), samplePartition) :
print('sample: %s y=%s' %(str(num),str(element.collect())))
taskSamplePartition = [old.takeSample(withReplacement=False, num=4) for i in range(5)]
for num, element in zip(range(len(taskSamplePartition)), taskSamplePartition) :
#注意因?yàn)槭莂ction,所以element是集合對象,而不是rdd的分區(qū)
print('taskSample: %s y=%s' %(str(num),str(element)))
mapRdd = sc.parallelize([('B',1),('A',2),('C',3),('D',4),('E',5)])
y = [mapRdd.sampleByKey(withReplacement=False,
fractions={'A':0.5, 'B':1, 'C':0.2, 'D':0.6, 'E':0.8}) for i in range(5)]
for num, element in zip(range(len(y)), y) :
#注意因?yàn)槭莂ction,所以element是集合對象,而不是rdd的分區(qū)
print('y: %s y=%s' %(str(num),str(element.collect())))
sample: 0 y=[2, 5]
sample: 1 y=[0, 3, 3, 6]
sample: 2 y=[0, 4, 7]
sample: 3 y=[1, 3, 3, 3, 6, 7]
sample: 4 y=[2, 4, 6]
taskSample: 0 y=[3, 4, 1, 6]
taskSample: 1 y=[2, 5, 3, 4]
taskSample: 2 y=[7, 1, 2, 5]
taskSample: 3 y=[6, 3, 1, 2]
taskSample: 4 y=[4, 6, 5, 0]
y: 0 y=[('B', 1)]
y: 1 y=[('B', 1), ('D', 4), ('E', 5)]
y: 2 y=[('B', 1), ('A', 2), ('C', 3), ('E', 5)]
y: 3 y=[('B', 1), ('A', 2), ('D', 4), ('E', 5)]
y: 4 y=[('B', 1), ('A', 2), ('C', 3), ('E', 5)]
有幾個參數(shù)需要說明下:
withReplacement代表取值后是否重新放回元素池,也就決定了某元素能否重復(fù)出現(xiàn)。
Fraction代表每個元素被取出來的概率。
Num代表取出元素的個數(shù)。
交集intersection、并集union、排序sortBy API:
sc = SparkContext('local')
rdd1 = sc.parallelize(['C','A','B','B'])
rdd2 = sc.parallelize(['A','A','D','E','B'])
rdd3 = rdd1.union(rdd2)
rdd4 = rdd1.intersection(rdd2)
print(rdd3.collect())
print(rdd4.collect())
print(rdd3.sortBy(lambda x : x[0]).collect())
['C', 'A', 'B', 'B', 'A', 'A', 'D', 'E', 'B']
['A', 'B']
['A', 'A', 'A', 'B', 'B', 'B', 'C', 'D', 'E']
flod折疊、aggregate聚合API:
這倆都是action,雖然pyspark提供了max、min、sum、count、mean、stdev(標(biāo)準(zhǔn)差,反應(yīng)平均值的離散程度)、sampleStdev(與stdev意義相同,stdev分母N-1,sampleStdev分母N)、sampleVariance(方差,所有值平方和除N-1)、top、countByValue、first、collectAsMap等內(nèi)置的統(tǒng)計(jì)函數(shù),但是在某型特殊場景下還是希望能人工訂制聚合的公式,需要用到這兩個動作。
代碼:
sc = SparkContext('local')
rdd1 = sc.parallelize([2,4,6,1])
rdd2 = sc.parallelize([2,4,6,1],4)
zeroValue = 0
foldResult = rdd1.fold(zeroValue,lambda element, accumulate : accumulate+element)
zeroValue = (1,2)
seqOp = lambda accumulate,element : (accumulate[0] + element, accumulate[1] * element)
combOp = lambda accumulate,element : (accumulate[0]+element[0], accumulate[1] * element[1])
aggregateResult = rdd1.aggregate(zeroValue,seqOp,combOp)
print(foldResult)
print(aggregateResult)
aggregateResult = rdd2.aggregate(zeroValue,seqOp,combOp)
print(foldResult)
print(aggregateResult)
13
(15, 192)
13
(18, 1536)
Fold略簡單,但是agregate的理解非常難,不同的分區(qū)場景會得到不同的結(jié)果,這里用圖來解釋說明下:
默認(rèn)1個partition的情況:
4個partition的情況:
reduceByKey、reduceByKeyLocal API:
這兩個要計(jì)算的效果是一樣的,但是前者是傳輸,后者是動作,使用時候需要注意:
sc = SparkContext('local')
oldRdd=sc.parallelize([('Key1',1),('Key3',2),('Key1',3),('Key2',4),('Key2',5)])
newRdd = oldRdd.reduceByKey(lambda accumulate,ele : accumulate+ele)
newActionResult = oldRdd.reduceByKeyLocally(lambda accumulate,ele : accumulate+ele)
print(newRdd.collect())
print(newActionResult)
[('Key1', 4), ('Key3', 2), ('Key2', 9)]
{'Key1': 4, 'Key3': 2, 'Key2': 9}
回到前面map、reduce尾巴留的那個思考題,實(shí)現(xiàn)的方式不止一種,我這里給出兩種解題思路:
方案A:
sc = SparkContext('local')
#第二個參數(shù)2代表的是分區(qū)數(shù),默認(rèn)為1
old=sc.parallelize([1,2,3,4,5])
newMapRdd = old.map(lambda x : (str(x),x**2))
print(newMapRdd.collect())
mergeRdd = newMapRdd.values()
print(mergeRdd.sum())
sc = SparkContext('local')
oldRdd=sc.parallelize([1,2,3,4,5])
newListRdd = oldRdd.map(lambda x : x**2)
newMapRdd = oldRdd.zip(newListRdd)
print(newMapRdd.values().sum())
之所以給出這些思路,是因?yàn)槲覀冊谑褂?pyspark?的時候,除了要關(guān)心?transformation?和?action?之分,還需要注意你要處理的?rdd?里的數(shù)據(jù)是?list?還是?map?,因?yàn)閷τ谒麄儗?shí)用的方法又是不同的。如果有必要,可以像這樣做?list?和?map?的轉(zhuǎn)換。
總結(jié)
以上是生活随笔為你收集整理的python连接spark_python如何通过pyspark的API操作spark的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: led显示屏p10参数设置_LED显示屏
- 下一篇: 线性回归数据_数据科学笔记(三)——线性