PySpark机器学习 ML
5 Introducing the ML Package
在前面,我們使用了Spark中嚴(yán)格基于RDD的MLlib包。 在這里,我們將基于DataFrame使用MLlib包。 另外,根據(jù)Spark文檔,現(xiàn)在主要的Spark機(jī)器學(xué)習(xí)API是spark.ml包中基于DataFrame的一套模型。
5.1 ML包的介紹
從頂層上看,ML包主要包含三大抽象類:轉(zhuǎn)換器、預(yù)測(cè)器和工作流。
5.1.1 轉(zhuǎn)換器(Transformer)
從Transformer抽象類派生出來的每一個(gè)新的Transformer都需要實(shí)現(xiàn)一個(gè).transform(…) 方法,該方法可以將一個(gè)DataFrame轉(zhuǎn)換成另一個(gè)DataFrame。
在spark.ml.feature中有許多Transformer:
Binarizer :給定一個(gè)閾值,該方法需要一個(gè)連續(xù)的變量將其轉(zhuǎn)換為二進(jìn)制。
Bucketizer:分箱(分段處理):將連續(xù)數(shù)值轉(zhuǎn)換為離散類別比如特征是年齡,是一個(gè)連續(xù)數(shù)值,需要將其轉(zhuǎn)換為離散類別(未成年人、青年人、中年人、老年人),就要用到Bucketizer了。
ChiSqSelector:對(duì)于分類目標(biāo)變量(考慮到分類模型),此方法允許你預(yù)定義數(shù)量的特征(通過numTopFeatures參數(shù)指定)。 選擇完成后,如方法的名稱所示,使用卡方檢驗(yàn)。 需要兩步:首先,你需要.fit(…) 數(shù)據(jù)(為了這個(gè)方法可以計(jì)算卡方檢驗(yàn))。然后,調(diào)用.fit(…)方法(將你的DataFrame作為參數(shù)傳遞)返回一個(gè)可以用.transform(…)轉(zhuǎn)換的ChiSqSelectorModel對(duì)象。
CountVectorizer:將文本文檔轉(zhuǎn)換為單詞計(jì)數(shù)的向量。當(dāng)不存在先驗(yàn)字典時(shí),Countvectorizer作為Estimator提取詞匯進(jìn)行訓(xùn)練,并生成一個(gè)CountVectorizerModel用于存儲(chǔ)相應(yīng)的詞匯向量空間。該模型產(chǎn)生文檔關(guān)于詞語的稀疏表示,其表示可以傳遞給其他算法,
HashingTF : 生成詞頻率向量。它采用詞集合并將這些集合轉(zhuǎn)換成固定長(zhǎng)度的特征向量。在文本處理中,“一組詞”可能是一袋詞。 HashingTF使用散列技巧。通過應(yīng)用散列函數(shù)將原始要素映射到索引,然后基于映射的索引來計(jì)算項(xiàng)頻率。
IDF : 此方法計(jì)算逆文檔頻率。需要注意的是文本首先要用向量表示,可以用HashingTF 或者 CountVectorizer。
MinMaxScaler:最大-最小規(guī)范化,將所有特征向量線性變換到用戶指定最大-最小值之間。但注意在計(jì)算時(shí)還是一個(gè)一個(gè)特征向量分開計(jì)算的。通常將最大,最小值設(shè)置為1和0,這樣就歸一化到[0,1]。Spark中可以對(duì)min和max進(jìn)行設(shè)置,默認(rèn)就是[0,1]。(不移動(dòng)中心點(diǎn))
MaxAbsScaler:同樣對(duì)某一個(gè)特征操作,各特征值除以最大絕對(duì)值,因此縮放到[-1,1]之間。且不移動(dòng)中心點(diǎn)。不會(huì)將稀疏矩陣變得稠密。
NGram :返回 n-grams,兩個(gè)連續(xù)的序列詞,或三個(gè)或更多。例如 [‘good’,’morning’, ‘Robin’, ‘Williams’] ,返回兩個(gè)即 [‘good morning’, ‘morning Robin’, ‘Robin Williams’] 。
Normalizer : 將某個(gè)特征向量(由所有樣本某一個(gè)特征組成的向量)計(jì)算其p-范數(shù),然后對(duì)該每個(gè)元素除以p-范數(shù)。將原始特征Normalizer以后可以使得機(jī)器學(xué)習(xí)算法有更好的表現(xiàn)。(默認(rèn)是L2)。
PCA : 數(shù)據(jù)降維。
RegexTokenizer:使用正則表達(dá)式。
StandardScaler :將某個(gè)特征向量(由所有樣本某一個(gè)特征組成的向量)進(jìn)行標(biāo)準(zhǔn)化,使數(shù)據(jù)均值為0,方差為1。Spark中可以選擇是帶或者不帶均值和方差。
StopWordsRemover :移除停用詞。
Tokenizer:這是默認(rèn)的標(biāo)記器,它將字符串轉(zhuǎn)換為小寫,然后在空間上分割。
VectorAssembler:這是一個(gè)非常有用的轉(zhuǎn)換器,將多個(gè)數(shù)值(包含向量)列整合到一個(gè)用向量表示的列中。
VectorSlicer:適用于特征向量,無論是稠密還是稀疏:給定一個(gè)索引列表,它從特征向量中提取值。
Word2Vec :該方法將一個(gè)句子(字符串)作為輸入,并將其轉(zhuǎn)換為{string,vector}格式的映射,這種格式在自然語言處理中非常有用。
String<->Index 相互轉(zhuǎn)換:
VectorIndexer:提高決策樹或隨機(jī)森林等ML方法的分類效果。VectorIndexer是對(duì)數(shù)據(jù)集特征向量中的類別(離散值)特征(index categorical features categorical features )進(jìn)行編號(hào)。它能夠自動(dòng)判斷哪些特征是離散值型的特征,并對(duì)他們進(jìn)行編號(hào),具體做法是通過設(shè)置一個(gè)maxCategories,特征向量中某一個(gè)特征不重復(fù)取值個(gè)數(shù)小于maxCategories,則被重新編號(hào)為0~K(K<=maxCategories-1)。某一個(gè)特征不重復(fù)取值個(gè)數(shù)大于maxCategories,則該特征視為連續(xù)值,不會(huì)重新編號(hào)(不會(huì)發(fā)生任何改變)。
StringIndexer:按label出現(xiàn)的頻次,轉(zhuǎn)換成0~num numOfLabels-1(分類個(gè)數(shù)),頻次最高的轉(zhuǎn)換為0,
IndexToString:有StringIndexer,就應(yīng)該有IndexToString。在應(yīng)用StringIndexer對(duì)labels進(jìn)行重新編號(hào)后,帶著這些編號(hào)后的label對(duì)數(shù)據(jù)進(jìn)行了訓(xùn)練,并接著對(duì)其他數(shù)據(jù)進(jìn)行了預(yù)測(cè),得到預(yù)測(cè)結(jié)果,預(yù)測(cè)結(jié)果的label也是重新編號(hào)過的,因此需要轉(zhuǎn)換回來。
5.1.2 預(yù)測(cè)器(Estimators)
預(yù)測(cè)器可以被認(rèn)為是需要評(píng)估的統(tǒng)計(jì)模型,來進(jìn)行預(yù)測(cè)或?qū)τ^測(cè)結(jié)果進(jìn)行分類。
如果派生自抽象的Estimator類,則新模型必須實(shí)現(xiàn).fit(…)方法,該方法給DataFrame中的數(shù)據(jù)以及一些默認(rèn)或用戶指定的參數(shù)泛化模型。
一、分類
ML包提供了七種分類模型,這里介紹四種常用的模型。
LogisticRegression:邏輯回歸是分類的基本模型。邏輯回歸使用logit函數(shù)來計(jì)算觀測(cè)到屬于特定類別的概率。
DecisionTreeClassifier :構(gòu)建一棵決策樹以預(yù)測(cè)觀察類別的分類器。maxDepth指定參數(shù)限制樹的生長(zhǎng)深度,minInstancePerNode確定進(jìn)一步拆分所需的樹節(jié)點(diǎn)中觀察值的最小數(shù)目,maxBins參數(shù)指定連續(xù)變量將被分割的最大數(shù)量的區(qū)間, impurity 指定測(cè)量和計(jì)算來自分割的信息增益的度量。
RandomForestClassifier:這個(gè)模型產(chǎn)生多個(gè)決策樹(因此稱為森林),并使用這些決策樹的模式輸出分類結(jié)果。 RandomForestClassifier支持二元和多元標(biāo)簽。
NaiveBayes:基于貝葉斯定理,這個(gè)模型使用條件概率來分類觀測(cè)。 PySpark ML中的NaiveBayes模型支持二元和多元標(biāo)簽。
二、回歸
PySpark ML包中有七種模型可用于回歸任務(wù)。這里只介紹兩種模型,如后續(xù)需要用可查閱官方手冊(cè)。
LinearRegression:最簡(jiǎn)單的回歸模型,它假定了特征和連續(xù)標(biāo)簽之間的線性關(guān)系,以及誤差項(xiàng)的正態(tài)性。
DecisionTreeRegressor:與分類模型類似,標(biāo)簽是連續(xù)的而不是二元或多元的。
三、聚類
聚類是一種無監(jiān)督的模型。PySpark ML包提供了四種模型。
BisectingKMeans :k-means 聚類和層次聚類的組合。該算法以單個(gè)簇中的所有觀測(cè)值開始,并將數(shù)據(jù)迭代地分成k個(gè)簇。
KMeans : 將數(shù)據(jù)分成k個(gè)簇,隨機(jī)生成k個(gè)初始點(diǎn)作為質(zhì)心,將數(shù)據(jù)集中的數(shù)據(jù)按照距離質(zhì)心的遠(yuǎn)近分到各個(gè)簇中,將各個(gè)簇中的數(shù)據(jù)求平均值,作為新的質(zhì)心,重復(fù)上一步,直到所有的簇不再改變。
GaussianMixture:這個(gè)方法使用k個(gè)未知的高斯分布參數(shù)來剖析數(shù)據(jù)集。使用期望最大化算法,通過最大化對(duì)數(shù)似然函數(shù)來找到高斯參數(shù)。
LDA:此模型用于自然語言處理應(yīng)用程序中的主題建模。
5.1.3 管道/工作流(Pipeline)
一個(gè)管道串起多個(gè)轉(zhuǎn)換器和預(yù)測(cè)器,明確一個(gè)機(jī)器學(xué)習(xí)工作流。
預(yù)測(cè)上一篇中的嬰兒存活率。
載入數(shù)據(jù):
import pyspark.sql.types as typ
labels = [
(‘INFANT_ALIVE_AT_REPORT’, typ.IntegerType()),
(‘BIRTH_PLACE’, typ.StringType()),
(‘MOTHER_AGE_YEARS’, typ.IntegerType()),
(‘FATHER_COMBINED_AGE’, typ.IntegerType()),
(‘CIG_BEFORE’, typ.IntegerType()),
(‘CIG_1_TRI’, typ.IntegerType()),
(‘CIG_2_TRI’, typ.IntegerType()),
(‘CIG_3_TRI’, typ.IntegerType()),
(‘MOTHER_HEIGHT_IN’, typ.IntegerType()),
(‘MOTHER_PRE_WEIGHT’, typ.IntegerType()),
(‘MOTHER_DELIVERY_WEIGHT’, typ.IntegerType()),
(‘MOTHER_WEIGHT_GAIN’, typ.IntegerType()),
(‘DIABETES_PRE’, typ.IntegerType()),
(‘DIABETES_GEST’, typ.IntegerType()),
(‘HYP_TENS_PRE’, typ.IntegerType()),
(‘HYP_TENS_GEST’, typ.IntegerType()),
(‘PREV_BIRTH_PRETERM’, typ.IntegerType())
]
# Specifying the schema of the DataFrame
schema = typ.StructType([
typ.StructField(e[0], e[1], False) for e in labels
])
births = spark.read.csv(‘births_transformed.csv.gz’,
header=True,
schema=schema)
創(chuàng)建轉(zhuǎn)換器
因?yàn)榻y(tǒng)計(jì)模型只能對(duì)數(shù)值數(shù)據(jù)進(jìn)行操作,我們必須對(duì)birth_place變量進(jìn)行編碼。
import pyspark.ml.feature as ft
# Casting the column to an IntegerType
births = births
.withColumn(‘BIRTH_PLACE_INT’, births[‘BIRTH_PLACE’]
.cast(typ.IntegerType()))
# Using the OneHotEncoder to encode
encoder = ft.OneHotEncoder(
inputCol=‘BIRTH_PLACE_INT’,
outputCol=‘BIRTH_PLACE_VEC’)
# Using the VectorAssembler to create a single column with all the features collated together.
featuresCreator = ft.VectorAssembler(
inputCols=[col[0] for col in labels[2:]] +
[encoder.getOutputCol()],
outputCol=‘features’
)
創(chuàng)建預(yù)測(cè)器
import pyspark.ml.classification as cl
logistic = cl.LogisticRegression(
maxIter=10,
regParam=0.01,
labelCol=‘INFANT_ALIVE_AT_REPORT’)
創(chuàng)建一個(gè)工作流
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[
encoder,
featuresCreator,
logistic
])
訓(xùn)練模型
births_train, births_test = births
.randomSplit([0.7, 0.3], seed=666)
model = pipeline.fit(births_train)
test_model = model.transform(births_test)
評(píng)估
import pyspark.ml.evaluation as ev
evaluator = ev.BinaryClassificationEvaluator(
rawPredictionCol=‘probability’,
labelCol=‘INFANT_ALIVE_AT_REPORT’)
print(evaluator.evaluate(test_model,
{evaluator.metricName: ‘a(chǎn)reaUnderROC’}))
print(evaluator.evaluate(test_model,
{evaluator.metricName: ‘a(chǎn)reaUnderPR’}))
保存模型
pipelinePath = ‘./infant_oneHotEncoder_Logistic_Pipeline’
pipeline.write().overwrite().save(pipelinePath)
# You can load it up later and use it straight away to .fit(…) and predict:
loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline
.fit(births_train)
.transform(births_test)
總結(jié)
以上是生活随笔為你收集整理的PySpark机器学习 ML的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 七大设计原理
- 下一篇: Connection reset by