【PySpark入门】手把手实现PySpark机器学习项目-回归算法
摘要??
?PySpark作為工業界常用于處理大數據以及分布式計算的工具,特別是在算法建模時起到了非常大的作用。PySpark如何建模呢?這篇文章手把手帶你入門PySpark,提前感受工業界的建模過程!
任務簡介??
在電商中,了解用戶在不同品類的各個產品的購買力是非常重要的!這將有助于他們為不同產品的客戶創建個性化的產品。在這篇文章中,筆者在真實的數據集中手把手實現如何預測用戶在不同品類的各個產品的購買行為。
如果有興趣和筆者一步步實現項目,可以先根據上一篇文章的介紹中安裝PySpark,并在網站中下載數據。
https://datahack.analyticsvidhya.com/contest/black-friday/
數據集簡介??
某零售公司想要了解針對不同類別的各種產品的顧客購買行為(購買量)。他們為上個月選定的大批量產品分享了各種客戶的購買匯總。該數據集還包含客戶人口統計信息(age, gender, marital status, city_type, stay_in_current_city),產品詳細信息(product_id and product category)以及上個月的purchase_amount總數。現在,他們希望建立一個模型來預測客戶對各種產品的購買量,這將有助于他們為不同產品的客戶創建個性化的產品。
手把手實戰項目??
1. 導入數據
這里我們使用PySpark的讀數據接口read.csv讀取數據,和pandas讀取數據接口迷之相似。
from pyspark.sql import SparkSessionspark = SparkSession \.builder \.appName("test") \.config("spark.some.config.option", "setting") \.getOrCreate()train = spark.read.csv('./BlackFriday/train.csv', header=True, inferSchema=True) test = spark.read.csv('./BlackFriday/test.csv', header=True, inferSchema=True2. 分析數據的類型
要查看Dataframe中列的類型,可以使用printSchema()方法。讓我們在train上應用printSchema(),它將以樹格式打印模式。
train.printSchema() """ root|-- User_ID: integer (nullable = true)|-- Product_ID: string (nullable = true)|-- Gender: string (nullable = true)|-- Age: string (nullable = true)|-- Occupation: integer (nullable = true)|-- City_Category: string (nullable = true)|-- Stay_In_Current_City_Years: string (nullable = true)|-- Marital_Status: integer (nullable = true)|-- Product_Category_1: integer (nullable = true)|-- Product_Category_2: integer (nullable = true)|-- Product_Category_3: integer (nullable = true)|-- Purchase: integer (nullable = true) """3.?預覽數據集
在PySpark中,我們使用head()方法預覽數據集以查看Dataframe的前n行,就像python中的pandas一樣。我們需要在head方法中提供一個參數(行數)。讓我們看一下train的前5行。
train.head(5) """ [Row(User_ID=1000001, Product_ID='P00069042', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370), Row(User_ID=1000001, Product_ID='P00248942', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200), Row(User_ID=1000001, Product_ID='P00087842', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=12, Product_Category_2=None, Product_Category_3=None, Purchase=1422), Row(User_ID=1000001, Product_ID='P00085442', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=12, Product_Category_2=14, Product_Category_3=None, Purchase=1057), Row(User_ID=1000002,?Product_ID='P00285442',?Gender='M',?Age='55+',?Occupation=16,?City_Category='C',?Stay_In_Current_City_Years='4+',?Marital_Status=0,?Product_Category_1=8,?Product_Category_2=None,?Product_Category_3=None,?Purchase=7969)] """要查看數據框架中的行數,我們需要調用方法count()。讓我們核對一下train上的行數。Pandas和Spark的count方法是不同的。
4.?插補缺失值
通過調用drop()方法,可以檢查train上非空數值的個數,并進行測試。默認情況下,drop()方法將刪除包含任何空值的行。我們還可以通過設置參數“all”,當且僅當該行所有參數都為null時以刪除該行。這與pandas上的drop方法類似。
train.na.drop('any').count(),test.na.drop('any').count() """ (166821, 71037) """在這里,為了填充簡單,我使用-1來填充train和test的null值。雖然這不是一個很好的填充方法,你可以選擇其他的填充方式。
train = train.fillna(-1) test = test.fillna(-1)5.?分析數值特征
我們還可以使用describe()方法查看Dataframe列的各種匯總統計信息,它顯示了數字變量的統計信息。要顯示結果,我們需要調用show()方法。
train.describe().show() """ +-------+------------------+----------+------+------+------------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+ |summary| User_ID|Product_ID|Gender| Age| Occupation|City_Category|Stay_In_Current_City_Years| Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3| Purchase| +-------+------------------+----------+------+------+------------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+ | count| 550068| 550068|550068|550068| 550068| 550068| 550068| 550068| 550068| 550068| 550068| 550068| | mean|1003028.8424013031| null| null| null| 8.076706879876669| null| 1.468494139793958|0.40965298835780306| 5.404270017525106| 6.419769919355425| 3.145214773446192|9263.968712959126| | stddev| 1727.591585530871| null| null| null|6.5226604873418115| null| 0.989086680757309| 0.4917701263173259| 3.936211369201324| 6.565109781181374| 6.681038828257864|5023.065393820593| | min| 1000001| P00000142| F| 0-17| 0| A| 0| 0| 1| -1| -1| 12| | max| 1006040| P0099942| M| 55+| 20| C| 4+| 1| 20| 18| 18| 23961| +-------+------------------+----------+------+------+------------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+ """上面看起來好像比較亂,這里我們選擇某一列來看看
讓我們從一個列中選擇一個名為“User_ID”的列,我們需要調用一個方法select并傳遞我們想要選擇的列名。select方法將顯示所選列的結果。我們還可以通過提供用逗號分隔的列名,從數據框架中選擇多個列。
train.select('User_ID','Age').show(5) """ +-------+----+ |User_ID| Age| +-------+----+ |1000001|0-17| |1000001|0-17| |1000001|0-17| |1000001|0-17| |1000002| 55+| +-------+----+ only showing top 5 rows """ 6.?分析categorical特征為了建立一個模型,我們需要在“train”和“test”中看到分類特征的分布。這里我只對Product_ID顯示這個,但是我們也可以對任何分類特性執行相同的操作。讓我們看看在“train”和“test”中Product_ID的不同類別的數量。這可以通過應用distinct()和count()方法來實現。
train.select('Product_ID').distinct().count(), test.select('Product_ID').distinct().count() """ (3631, 3491) """在計算“train”和“test”的不同值的數量后,我們可以看到“train”和“test”有更多的類別。讓我們使用相減方法檢查Product_ID的類別,這些類別正在"test"中,但不在“train”中。我們也可以對所有的分類特征做同樣的處理。
diff_cat_in_train_test=test.select('Product_ID').subtract(train.select('Product_ID'))diff_cat_in_train_test.distinct().count() """ (46, None) """diff_cat_in_train_test.distinct().show(5) """ +----------+ |Product_ID| +----------+ | P00322642| | P00300142| | P00077642| | P00249942| | P00294942| +----------+ only?showing?top?5?rows """以上你可以看到46個不同的類別是在"test"中,而不在"train"中。在這種情況下,我們要么收集更多關于它們的數據,要么跳過那些類別(無效類別)的“test”。
7.?將分類變量轉換為標簽我們還需要通過在Product_ID上應用StringIndexer轉換將分類列轉換為標簽,該轉換將標簽的Product_ID列編碼為標簽索引的列。
from pyspark.ml.feature import StringIndexer plan_indexer = StringIndexer(inputCol = 'Product_ID', outputCol = 'product_id_trans') labeller = plan_indexer.fit(train)在上面,我們將fit()方法應用于“train”數據框架上,構建了一個標簽。稍后我們將使用這個標簽來轉換我們的"train"和“test”。讓我們在labeller的幫助下轉換我們的train和test的Dataframe。我們需要調用transform方法。我們將把轉換結果存儲在Train1和Test1中.
Train1 = labeller.transform(train) Test1 = labeller.transform(test) Train1.show(2) """ +-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+ |User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|product_id_trans| +-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+ |1000001| P00069042| F|0-17| 10| A| 2| 0| 3| -1| -1| 8370| 766.0| |1000001| P00248942| F|0-17| 10| A| 2| 0| 1| 6| 14| 15200| 183.0| +-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+ only?showing?top?2?rows """Train1.select('product_id_trans').show(2) """ +----------------+ |product_id_trans| +----------------+ | 766.0| | 183.0| +----------------+ only showing top 2 rows """上面已經顯示了我們在以前的"train" Dataframe中成功的添加了一個轉化后的列“product_id_trans”,("Train1" Dataframe)。
8.?選擇特征來構建機器學習模型
首先,我們需要從pyspark.ml.feature導入RFormula;然后,我們需要在這個公式中指定依賴和獨立的列;我們還必須為為features列和label列指定名稱。
from pyspark.ml.feature import RFormula formula = RFormula(formula="Purchase ~ Age+ Occupation +City_Category+Stay_In_Current_City_Years+Product_Category_1+Product_Category_2+ Gender",featuresCol="features",labelCol="label")在創建了這個公式之后,我們需要將這個公式應用到我們的Train1上,并通過這個公式轉換Train1,Test1。讓我們看看如何做到這一點,在擬合變換train1之后,
t1 = formula.fit(Train1) train1 = t1.transform(Train1) test1 = t1.transform(Test1) train1.show(2) """ +-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+--------------------+-------+ |User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|product_id_trans| features| label| +-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+--------------------+-------+ |1000001| P00069042| F|0-17| 10| A| 2| 0| 3| -1| -1| 8370| 766.0|(16,[6,10,13,14],...| 8370.0| |1000001| P00248942| F|0-17| 10| A| 2| 0| 1| 6| 14| 15200| 183.0|(16,[6,10,13,14],...|15200.0| +-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+--------------------+-------+ only showing top 2 rows """在應用了這個公式之后,我們可以看到train1和test1有兩個額外的列,稱為features和label,并對我們在公式中指定的列進行標記(featuresCol= features和labelCol= label)。直觀上,train1和test1中的features列中的所有分類變量都被轉換為數值,數值變量與之前應用ML時相同。我們還可以查看train1和test1中的列特性和標簽。
train1.select('features').show(2) """ +--------------------+ | features| +--------------------+ |(16,[6,10,13,14],...| |(16,[6,10,13,14],...| +--------------------+ only showing top 2 rows """train1.select('label').show(2) """ +-------+ | label| +-------+ | 8370.0| |15200.0| +-------+ only showing top 2 rows """9.?建立機器學習模型
在應用RFormula和轉換Dataframe之后,我們現在需要根據這些數據開發機器學習模型。我想為這個任務應用一個隨機森林回歸。讓我們導入一個在pyspark.ml中定義的隨機森林回歸器。然后建立一個叫做rf的模型。我將使用隨機森林算法的默認參數。
from pyspark.ml.regression import RandomForestRegressor rf = RandomForestRegressor()在創建一個模型rf之后,我們需要將train1數據劃分為train_cv和test_cv進行交叉驗證。這里,我們將train1數據區域劃分為train_cv的70%和test_cv的30%。
(train_cv, test_cv) = train1.randomSplit([0.7, 0.3])在train_cv上建立模型,在test_cv上進行預測。結果將保存在predictions中。
model1 = rf.fit(train_cv) predictions = model1.transform(test_cv)10. 模型效果評估
讓我們評估對test_cv的預測,看看rmse和mse是多少。
為了評估模型,我們需要從pyspark.ml.evaluation中導入RegressionEvaluator。我們必須為此創建一個對象。有一種方法叫 evaluate for evaluator ,它對模型求值。我們需要為此指定度量標準。
from pyspark.ml.evaluation import RegressionEvaluator evaluator = RegressionEvaluator() mse = evaluator.evaluate(predictions,{evaluator.metricName:"mse" }) import numpy as np np.sqrt(mse), mse """ (3832.4796474051345, 14687900.247774584) """經過計算,我們可以看到我們的rmse是3827.767295494888。
現在,我們將在所有的train1數據集上再次訓練一個模型。
model = rf.fit(train1) predictions1 = model.transform(test1)預測之后,我們得到測試集預測結果,并將其保存成csv文件。
df = predictions1.selectExpr("User_ID as User_ID", "Product_ID as Product_ID", 'prediction as Purchase') df.toPandas().to_csv('./BlackFriday/submission.csv')寫入csv文件后(submission.csv)。我們可以上傳我們的第一個解決方案來查看分數,我得到的分數是3844.20920145983。
總結??
在本文中,我以一個真實案例介紹了PySpark建模流程。這只是本系列文章的開始。在接下來的幾周,我將繼續分享PySpark使用的教程。同時,如果你有任何問題,或者你想對我要講的內容提出任何建議,歡迎留言。
個人微信:加時請注明 (昵稱+公司/學校+方向)
? 歷史精品文章推薦???
知否?知否?一文看懂深度文本分類之DPCNN原理與代碼
機器學習入門方法和資料合集
撩一發深度文本分類之RNN via Attention
15分鐘帶你入門sklearn與機器學習——分類算法篇
如何為你的回歸問題選擇最合適的機器學習方法?
PySpark 安裝、配置之使用初體驗
總結
以上是生活随笔為你收集整理的【PySpark入门】手把手实现PySpark机器学习项目-回归算法的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 方程组的几何解释 [MIT线代第一课pd
- 下一篇: 如何科学的打开 Leetcode