pyspark学习
由于公司的項目需要用pyspark做數(shù)據(jù)清洗等工作,于是現(xiàn)學現(xiàn)用,也有很多不懂的地方,如果文章里面有什么總結得有問題的,歡迎大家指出。
更詳細的介紹也可以參考PySpark教程:使用Python學習Apache Spark
-
一. pyspark簡介
1. pyspark是什么
要學習pyspark,肯定首先要知道pyspark是什么。
Apache Spark是用Scala編程語言編寫的。為了讓Spark支持Python,Apache Spark社區(qū)發(fā)布了一個工具PySpark,從而可以以交互的方式使用Python編寫Spark程序。
然后需要了解RDD是怎么一回事。
2. RDD是什么
這部分內(nèi)容參考了另一篇文章:pyspark的使用和操作(基礎整理)
pyspark里最核心的模塊是SparkContext(簡稱sc),最重要的數(shù)據(jù)載體是RDD。RDD就像一個NumPy array或者一個Pandas Series(NumPy和Pandas都是Python的包),可以視作一個有序的item集合。只不過這些item并不存在driver端的內(nèi)存里,而是被分割成很多個partitions,每個partition的數(shù)據(jù)存在集群的executor的內(nèi)存中。
總結一下,在pyspark中創(chuàng)建RDD有兩種方法,一種是并行化一個列表,一種是直接讀取文件
但是所有工作的前提是初始化SparkSession,SparkSession是Spark 2引入的新概念。SparkSession為用戶提供了統(tǒng)一的切入點,來讓用戶學習spark的各項功能。spark2將SparkConf、SparkConText、SQLContext和HiveContext和StreamingContext進行了組合。所以在SQLContext和HiveContext等上可用的API在SparkSession上同樣是可以使用的。SparkSession內(nèi)部封裝了SparkContext,所以計算實際上是由SparkContext完成的。
我的平臺裝的是spark2.2,可以用spark2的寫法,但spark1的寫法也不會報錯。
#spark2的寫法 from pyspark.sql import SparkSession from pyspark import SparkContext spark = SparkSession.builder.appName("Project").getOrCreate() sc = SparkContext.getOrCreate()#spark1的寫法 #from pyspark import SparkContext as sc #from pyspark import SparkConf #conf = SparkConf().setAppName("Project").setMaster("local[*]")#代碼只在本地運行 ##conf = SparkConf().setAppName("lg").setMaster("spark://192.168.10.10:7077")#代碼在master上運行,設置master的ip和端口 #sc = sc.getOrCreate(conf)##方法一 并行化一個集合 #使用sc.parallelize可以把Python list,NumPy array或者Pandas Series,Pandas DataFrame轉成Spark RDD。 rdd1 = sc.parallelize([('蘋果', 8), ('香蕉', 4), ('葡萄',6), ('梨', 3), ('火龍果', 9)]) rdd1 #ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:195##方法二 讀取文件 rdd2 = sc.textFile("c1.py")#讀入一個文件 rdd3 = sc.wholeTextFiles("c1.py")#讀入整個文件夾的所有文件,RDD中的每個item實際上是一個形如(文件名,文件所有內(nèi)容)的元組。 rdd2 #Output:c1.py MapPartitionsRDD[4] at textFile at NativeMethodAccessorImpl.java:0#getNumPartitions()方法可以查看list被分成了幾部分 rdd1.getNumPartitions() #Output:8#glom().collect()查看分區(qū)狀況 #小數(shù)據(jù)量情況下,可以直接將分布式的RDD通過轉換函數(shù)collect()轉換成一個數(shù)組,大數(shù)據(jù)量如上BT文件讀入會爆掉內(nèi)存…… rdd1.glom().collect() #Output:[[], [('蘋果', 8)], [], [('香蕉', 4)], [('葡萄', 6)], [], [('梨', 3)], [('火龍果', 9)]] #如果是大數(shù)據(jù)量的rdd,可以用take(n)來選取n個數(shù)據(jù)查看 rdd1.take(3) #Output:[('蘋果', 8), ('香蕉', 4), ('葡萄', 6)]#first()方法取讀入的rdd數(shù)據(jù)第一個item rdd1.first() #Output:('蘋果', 8)通過以下命令查看CPU個數(shù):
import psutil print(u'cpu個數(shù):',psutil.cpu_count()) #Output:cpu個數(shù): 8CPU個數(shù)為8,用list創(chuàng)建RDD時,就被分成了8個部分,可見分區(qū)數(shù)是按CPU個數(shù)決定的。
更多RDD的內(nèi)容繼續(xù)參考另一篇文章,寫得挺不錯的(嬉笑臉):pyspark的使用和操作(基礎整理)
通過上面的內(nèi)容應該對pyspark和RDD有所了解了吧,知道是怎么回事之后我們就繼續(xù)開始后面的工作。
-
二. pyspark的安裝
1.準備spark和python的環(huán)境
以下內(nèi)容都是在Linux環(huán)境下實現(xiàn)。
pyspark既然是python+spark,那么肯定需要spark和python了,我這里是在這之前已經(jīng)搭建好了大數(shù)據(jù)集群,下面是我的集群:
?
如果還沒有spark環(huán)境的,網(wǎng)上有很多相關文章,這里就不做過多的說明。
也可以參考我寫的《大數(shù)據(jù)平臺環(huán)境部署手冊》,我做了完整的總結:https://www.jianshu.com/p/bb09da06e045
Linux系統(tǒng)是自帶了python的,不過是python2,現(xiàn)在基本上都用python3了,未來python2會停止維護。
安裝Anaconda,Anaconda是個超級好用的工具,關于Anaconda的安裝,網(wǎng)上有很多相關文章,這里也不做過多的說明。
2.安裝pyspark
在Anaconda中使用pyspark,直接執(zhí)行如下命令,就可以直接導入pyspark模塊了,可以拿上面RDD的代碼做測試。
conda install pyspark接下來要開始我重要工作了。
-
三. RDD轉換sparkDataFrame
本來我的寫到一半了,結果看到了有其他的博主寫了pyspark學習系列的文章:
pyspark學習系列(一)創(chuàng)建RDD
pyspark學習系列(二)讀取CSV文件 為RDD或者DataFrame進行數(shù)據(jù)處理
……這學習系列還有很多,基本上滿足了我對于pyspark的所有學習需求
看到這些文章,我就又忍不住想偷懶了(@~@),不過我還是繼續(xù)簡單的記錄一下自己學到的東西吧,跟緊大佬的步伐。
我們知道DataFrame是Python中Pandas庫中的一種重要的數(shù)據(jù)結構,操作起數(shù)據(jù)來相當方便。spark也有DataFrame,概念差不多,RDD是最重要的數(shù)據(jù)載體。但RDD是無schema的數(shù)據(jù)結構,DataFrame是有schema的數(shù)據(jù)結構。
所以RDD想轉換成DataFrame就是在RDD基礎上加上schema,如果沒有提前定義好schema的名稱,轉化過程中默認schema為:_1,_2,_3
RDD轉換成DataFrame:
from pyspark.sql import SQLContext sqlContest = SQLContext(sc)data1 = spark.createDataFrame(rdd1) data2 = sqlContest.createDataFrame(rdd1)然后toPandas()就可以查看結果:
data1.toPandas()同樣 dataframe也可以轉換成rdd: rdd.map(lambda x: -----)
-
四.?讀取CSV文件 為RDD或者DataFrame進行數(shù)據(jù)處理
1. 本地csv文件讀取:
最簡單的方法:
import pandas as pd file = pd.read_csv(file) df = sqlContest.createDataFrame(file)?2. hdfs上的csv文件讀取:
1)采用先讀為RDD再轉換的形式
2)采用sqlContext.read.format()
有興趣的可以參考下這個,不過我還沒有實踐過不知道能不能成功:python對hdfs/spark讀寫操作(hdfs/pyspark)
五. 其他
還可以利用SQL進行查詢
#用createOrReplaceTempView方法創(chuàng)建臨時表 df.createOrReplaceTempView("table") #用SQL語句對這個臨時表進行查詢統(tǒng)計 spark.sql("select * from table").show()?RDD還有很多跟Python的dataframe差不多的方法:
map() 對RDD的每一個item都執(zhí)行同一個操作 flatMap() 對RDD中的item執(zhí)行同一個操作以后得到一個list,然后以平鋪的方式把這些list里所有的結果組成新的list filter() 篩選出來滿足條件的item distinct() 對RDD中的item去重 sample() 從RDD中的item中采樣一部分出來,有放回或者無放回 sortBy() 對RDD中的item進行排序將RDD轉化為dataframe之后,大多數(shù)就可以用dataframe的方法進行數(shù)據(jù)清洗了:去重,刪除空值,統(tǒng)計等
這篇文章暫時就總結到這里,有問題的歡迎指正,也歡迎一起探討一起學習哦。
?
“當你的才華還撐不起你的野心的時候,你就應該靜下心來學習”
以后這句話將會出現(xiàn)在我的每一篇博文中,用于提醒我自己,靜下來好好學習。
總結
- 上一篇: VPI使用过程中遇到的问题
- 下一篇: android 技术亮点,Android