Recommendation Algorithm Engineering Notes: A PySpark Feature Engineering Primer
PySpark Feature Tool
1. Data Preparation
We first define a small piece of test data so the behaviour of each function can be verified easily; for most beginners, knowing exactly what a function takes as input and what it returns is the key to understanding and using these feature functions:
df = spark.createDataFrame([
    ('zhu',   "Hi I heard about pySpark"),
    ('xiang', "I wish python could use case classes"),
    ('yu',    "Logistic regression models are neat")
], ["id", "sentence"])

# functionTestData
+-----+------------------------------------+
|id   |sentence                            |
+-----+------------------------------------+
|zhu  |Hi I heard about pySpark            |
|xiang|I wish python could use case classes|
|yu   |Logistic regression models are neat |
+-----+------------------------------------+

2. Reading the Data
# !/usr/bin/env python
# -*- coding: utf-8 -*-
########################################################################################################################
# Creator         : Zhu Xiangyu.DOTA
# Creation Time   : 2020-2-17 12:45:09
# Description     : PySpark feature engineering toolkit
# Modify By       :
# Modify Time     :
# Modify Content  :
# Script Version  : 2.0.0.9
########################################################################################################################
import math
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('DOTAd_Features_Tool').enableHiveSupport().getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", 1000)
spark.conf.set("spark.default.parallelism", 2000)

def get_params():
    return {
        # Functions that can be used
        'column1' : "TFIDF",            # Term frequency-inverse document frequency
        'column2' : "Word2Vec",
        'column3' : "CountVectorizer",
        'column4' : "OneHotEncoder",
        'column5' : "StringIndexer",
        'column6' : "IndexToString",
        'column7' : "PCA",
        'column8' : "Binarizer",
        'column9' : "Tokenizer",
        'column10': "StopWordsRemover",
        'column11': "NGram",
        'column12': "DCT",              # Discrete cosine transform
        'column13': "ChiSqSelector",    # Chi-squared feature selection
        'column14': "PearsonCorr",      # Pearson correlation coefficient
    }

def main():
    # Reset params
    ########################################################################################
    ## Source table: database.table
    dataset_Name = ""
    dataset = spark.sql("select * from {dataset_Name}".format(dataset_Name=dataset_Name)).fillna(0)
    ## Target table for the results: database.table
    saveAsTable_Name = ""
    ## Apply the given function to the given column: {col: function}
    params = {'sentence': "TFIDF"}
    ########################################################################################
    # functionTestData
    df = spark.createDataFrame([
        ('zhu',   "Hi I heard about pySpark"),
        ('xiang', "I wish python could use case classes"),
        ('yu',    "Logistic regression models are neat")
    ], ["id", "sentence"])
    # Feature Transform
    features = featureTool(dataset, params)  # Test-Model : dataset = df
    features.show(5)
    # Save Feature as table
    saveResult(features, saveAsTable_Name)

3. Saving the Data
# SaveTableAs
def saveResult(res, saveAsTable_Name='dota_tmp.dota_features_tool_save_result',
               saveFormat="orc", saveMode="overwrite"):
    res.write.saveAsTable(name=saveAsTable_Name, format=saveFormat, mode=saveMode)

4. Feature Functions
def featureTool(df, params):
    dataCols, targetCols = df.columns, params.keys()
    exeColumns = list(params.keys())[0]
    exeDefFunction = params[exeColumns]
    print(exeColumns + "-->" + exeDefFunction + "(df,{exeColumns})".format(exeColumns=exeColumns))
    exeOrder = "feat={exeDef}(df,'{exeCols}','{outputCol}')".format(
        exeCols=exeColumns, exeDef=exeDefFunction,
        outputCol=exeDefFunction + '_' + exeColumns)
    print("exeOrder : " + exeOrder)
    # In Python 3, exec() inside a function cannot create new local variables,
    # so the generated assignment is run in an explicit namespace instead.
    scope = {'df': df}
    exec(exeOrder, globals(), scope)
    return scope['feat']
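A quick usage example against the test DataFrame from section 1 — a minimal sketch, assuming the feature functions defined in the following sections are available in the same module:

# Apply TF-IDF to the `sentence` column of the test DataFrame.
features = featureTool(df, {'sentence': "TFIDF"})
features.show(3, truncate=False)   # adds a column named TFIDF_sentence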
4.1 TFIDF

This weighting scheme is often combined with cosine similarity in the vector space model to judge how similar two documents are, and TF-IDF is the model actually in wide use in search engines and other real-world applications. Its main idea: if a word w occurs frequently in a document d but rarely in other documents, then w has strong discriminating power and is well suited to separating d from the rest of the corpus.
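For reference, Spark MLlib computes the IDF term with add-one smoothing; a sketch of the documented formulas, where |D| is the number of documents and DF(t, D) the number of documents containing term t:

IDF(t, D) = log((|D| + 1) / (DF(t, D) + 1))
TFIDF(t, d, D) = TF(t, d) * IDF(t, D)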
def TFIDF(df, inputCol="sentence", outputCol="tfidf", numFeatures=20):
    """
    Term frequency-inverse document frequency (TF-IDF) is a feature-vectorization method widely used
    in text mining; it reflects how important a word in a document is to the corpus as a whole.
    In short: the more often a word occurs in an article, and the less often it occurs across all
    documents, the better it represents that article.
    """
    from pyspark.ml.feature import HashingTF, IDF, Tokenizer
    tokenizerX = Tokenizer(inputCol=inputCol, outputCol="words")
    wordsDataX = tokenizerX.transform(df)
    hashingTFX = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=numFeatures)
    featurizedData = hashingTFX.transform(wordsDataX)
    idfX = IDF(inputCol="rawFeatures", outputCol=outputCol)
    idfModel = idfX.fit(featurizedData)
    tfidfRes = idfModel.transform(featurizedData).drop('words', 'rawFeatures')
    return tfidfRes

The output of the code above is as follows:
TFIDF Output
+-----+------------------------------------+-------------------------------------------------------------------------------------+
|id   |sentence                            |tfidf                                                                                |
+-----+------------------------------------+-------------------------------------------------------------------------------------+
|zhu  |Hi I heard about pySpark            |(20,[0,9,17],[0.6931471805599453,0.5753641449035617,1.3862943611198906])             |
|xiang|I wish python could use case classes|(20,[2,9,13,15],[0.6931471853,1.15072071234,0.285178085,0.28768207245178085])        |
|yu   |Logistic regression models are neat |(20,[4,6,13,15,18],[0.69314718053,0.693147453,0.287682078085,0.2876085,0.693149453]) |
+-----+------------------------------------+-------------------------------------------------------------------------------------+

4.2 Word2Vec
The word2vec model is essentially a simplified neural network that vectorizes text data. Word vectors have good semantic properties and are a common way to represent word features: the value in each dimension of a word vector corresponds to a feature with some semantic or syntactic interpretation, so each dimension can be regarded as one word feature. Word vectors come in several forms; the distributed representation is one of them — a dense, low-dimensional real-valued vector in which every dimension represents a latent feature of the word that captures useful syntactic and semantic properties. The word "distributed" reflects exactly this characteristic: the different syntactic and semantic features of a word are distributed across all dimensions of its vector.
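A minimal sketch (not part of the tool itself) of the similarity use case mentioned above: fit a Word2Vec model directly and query its nearest neighbours. It assumes the test DataFrame df from section 1.

from pyspark.ml.feature import Word2Vec
from pyspark.sql.functions import split

words_df = df.withColumn("words", split(df["sentence"], " "))
w2v_model = Word2Vec(vectorSize=20, minCount=1, inputCol="words", outputCol="w2v").fit(words_df)
w2v_model.getVectors().show(truncate=False)   # one learned vector per vocabulary word
w2v_model.findSynonyms("python", 2).show()    # two closest words, with cosine similarity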
def Word2Vec(df, inputCol="sentence", outputCol="w2v", vectorSize=100, minCount=5, numPartitions=1,
             stepSize=0.025, maxIter=1, seed=None, windowSize=5, maxSentenceLength=1000):
    """
    Word2vec maps a word to a vector. A "word" here is any entity whose order carries meaning, such as
    the words of a document or the items a user clicks in sequence. The resulting vectors can be used to
    measure similarity between entities and, on top of that, for classification, clustering,
    recommendation, sentence vectors and short-text classification.

    Two training schemes exist:
      - Skip-gram: takes a word as input and predicts its surrounding context.
      - CBOW     : takes the context as input and predicts the word itself.

    Spark's Word2Vec is an Estimator that trains a Word2VecModel on sequences of words representing
    documents (Spark implements the Skip-gram model). The model maps every word to a fixed-size vector
    and converts a document to a vector by averaging the vectors of its words; that vector can then be
    used as a feature for prediction, document-similarity computation and so on.
    """
    from pyspark.ml.feature import Word2Vec
    from pyspark.sql.functions import split
    # Input data: each row is a bag of words from a sentence or document.
    df = df.withColumn("words", split(df[inputCol], ' '))
    word2VecX = Word2Vec(vectorSize=vectorSize, minCount=minCount, inputCol="words", outputCol=outputCol,
                         numPartitions=numPartitions, stepSize=stepSize, maxIter=maxIter, seed=seed,
                         windowSize=windowSize, maxSentenceLength=maxSentenceLength)
    w2vModel = word2VecX.fit(df)
    w2vRes = w2vModel.transform(df).drop('words')
    return w2vRes

The output of the code above is as follows:
Word2Vec Output
+-----+--------------------+--------------------+
|   id|            sentence|                 w2v|
+-----+--------------------+--------------------+
|  zhu|Hi I heard about ...|[0.08936496693640...|
|xiang|I wish python cou...|[7.36715538161141...|
|   yu|Logistic regressi...|[-0.0063562680035...|
+-----+--------------------+--------------------+

4.3 CountVectorizer
CountVectorizer converts a document into a vector of term counts.
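A small sketch, assuming the test DataFrame df from section 1, of how the indices in the sparse output can be mapped back to actual words through the fitted model's vocabulary:

from pyspark.ml.feature import CountVectorizer
from pyspark.sql.functions import split

words_df = df.withColumn("words", split(df["sentence"], " "))
cv_model = CountVectorizer(inputCol="words", outputCol="cv", vocabSize=1000, minDF=1.0).fit(words_df)
print(cv_model.vocabulary[:5])   # index i in the sparse vector corresponds to vocabulary[i]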
def CountVectorizer(df, inputCol="sentence", outputCol="cv", vectorSize=200000, minCount=1.0):
    """
    CountVectorizer converts a document into a vector of term counts. When no a-priori dictionary is
    available, it acts as an Estimator that extracts the vocabulary and produces a CountVectorizerModel.
    The model yields a sparse representation of the documents over the vocabulary, which can then be
    passed on to other algorithms such as LDA.

    During fitting, CountVectorizer keeps the vocabSize most frequent terms in the corpus. The optional
    parameter minDF also affects fitting: it is the minimum number (or fraction) of documents a term must
    appear in to be included in the vocabulary. Another optional binary toggle controls the output vector:
    if set to true, all non-zero counts become 1, which is useful for discrete binary probabilistic models.
    """
    from pyspark.ml.feature import CountVectorizer
    from pyspark.sql.functions import split
    df = df.withColumn("words", split(df[inputCol], ' '))
    CountVectorizerX = CountVectorizer(inputCol="words", outputCol=outputCol, vocabSize=vectorSize, minDF=minCount)
    cvModelX = CountVectorizerX.fit(df)
    cvRes = cvModelX.transform(df).drop('words')
    return cvRes

The output of the code above is as follows:
CountVectorizer Output
+-----+------------------------------------+----------------------------------------------------+
|id   |sentence                            |cv                                                  |
+-----+------------------------------------+----------------------------------------------------+
|zhu  |Hi I heard about pySpark            |(16,[0,2,4,12,13],[1.0,1.0,1.0,1.0,1.0])            |
|xiang|I wish python could use case classes|(16,[0,3,5,6,8,10,14],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|yu   |Logistic regression models are neat |(16,[1,7,9,11,15],[1.0,1.0,1.0,1.0,1.0])            |
+-----+------------------------------------+----------------------------------------------------+

4.4 OneHotEncoder
Maps a categorical feature to a binary vector with a single active value (1 for the present category, 0 everywhere else).
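Note that the helper below targets the Spark 2.x API, where OneHotEncoder is a plain Transformer. On Spark 3.x it became an Estimator and has to be fitted first — a hedged sketch of the newer call pattern:

from pyspark.ml.feature import OneHotEncoder

# Spark 3.x: `indexed` is the output of StringIndexer, as in the helper below.
encoder = OneHotEncoder(inputCols=["categoryIndex"], outputCols=["categoryVec"])
encoded = encoder.fit(indexed).transform(indexed)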
def OneHotEncoder(df, inputCol="category", outputCol="categoryVec"):
    """
    Maps a categorical feature to a binary vector with a single active value
    (1 for the present category, 0 everywhere else).
    """
    from pyspark.ml.feature import OneHotEncoder, StringIndexer
    stringIndexerX = StringIndexer(inputCol=inputCol, outputCol="categoryIndex")
    modelX = stringIndexerX.fit(df)
    indexed = modelX.transform(df)
    encoderX = OneHotEncoder(inputCol="categoryIndex", outputCol=outputCol)
    encodedX = encoderX.transform(indexed).drop("categoryIndex")
    return encodedX

The output of the code above is as follows:
OneHotEncoder Output
+-------+--------+-------------+-------------+
|     id|category|categoryIndex|  categoryVec|
+-------+--------+-------------+-------------+
|    zhu|       a|          0.0|(2,[0],[1.0])|
|xiangyu|       b|          2.0|    (2,[],[])|
|     yu|       c|          1.0|(2,[1],[1.0])|
|     is|       a|          0.0|(2,[0],[1.0])|
| coming|       a|          0.0|(2,[0],[1.0])|
|    now|       c|          1.0|(2,[1],[1.0])|
+-------+--------+-------------+-------------+

4.5 StringIndexer
Indexes a string label column; index values are assigned by descending label frequency (the most frequent label gets index 0).
def StringIndexer(df, inputCol="category", outputCol="categoryVec"):
    """
    Indexes a string label column; index values are assigned by descending label frequency
    (the most frequent label gets index 0).
    """
    from pyspark.ml.feature import StringIndexer
    indexerX = StringIndexer(inputCol=inputCol, outputCol=outputCol)
    indexedX = indexerX.fit(df).transform(df)
    return indexedX

The output of the code above is as follows:
StringIndexer Output
+-----+--------------------+-----------+
|   id|            sentence|categoryVec|
+-----+--------------------+-----------+
|  zhu|Hi I heard about ...|        2.0|
|xiang|I wish python cou...|        0.0|
|   yu|Logistic regressi...|        1.0|
+-----+--------------------+-----------+

4.6 IndexToString
The counterpart of StringIndexer: IndexToString maps indexed labels back to their original strings.
def IndexToString(df, inputCol="categoryVec", outputCol="category"):
    """
    The counterpart of StringIndexer: IndexToString maps indexed labels back to their original strings.
    """
    from pyspark.ml.feature import IndexToString
    converterX = IndexToString(inputCol=inputCol, outputCol=outputCol)
    convertedX = converterX.transform(df)
    return convertedX

The output of the code above is as follows:
IndexToString Output
# IndexToString(StringIndexer(df, "sentence"))
+-----+--------------------+-----------+--------------------+
|   id|            sentence|categoryVec|            category|
+-----+--------------------+-----------+--------------------+
|  zhu|Hi I heard about ...|        2.0|Hi I heard about ...|
|xiang|I wish python cou...|        0.0|I wish python cou...|
|   yu|Logistic regressi...|        1.0|Logistic regressi...|
+-----+--------------------+-----------+--------------------+

4.7 PCA
Principal component analysis is a statistical method that applies a rotation to the data. In essence, it performs a change of basis in a linear space so that the variance of the data projected onto the new "axes" is maximised; the axes with very little variance after the transformation are then discarded, and the remaining axes, called principal components, describe the original data as faithfully as possible in a lower-dimensional subspace.
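The PCA helper expects a single vector-typed features column. A minimal sketch, assuming a hypothetical DataFrame numeric_df with plain numeric columns, of how such a column is usually assembled:

from pyspark.ml.feature import VectorAssembler

# x1..x5 are hypothetical numeric column names in numeric_df.
assembler = VectorAssembler(inputCols=["x1", "x2", "x3", "x4", "x5"], outputCol="features")
vector_df = assembler.transform(numeric_df)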
PCA Input
+--------------------+
|            features|
+--------------------+
| (5,[1,3],[1.0,7.0])|
|[2.0,0.0,3.0,4.0,...|
|[4.0,0.0,0.0,6.0,...|
+--------------------+

Feeding this input into the PCA function below:
def PCA(df, vectorSize=3, inputCol="features", outputCol="pcaFeatures"):
    """
    Principal component analysis rotates the data: it performs a change of basis in a linear space so
    that the variance of the data projected onto the new "axes" is maximised, then drops the axes with
    very little variance. The remaining axes are the principal components, which describe the original
    data as faithfully as possible in a lower-dimensional subspace.
    """
    from pyspark.ml.feature import PCA
    pcaX = PCA(k=vectorSize, inputCol=inputCol, outputCol=outputCol)
    modelX = pcaX.fit(df)
    pcaRes = modelX.transform(df)
    return pcaRes

The output of the code above is as follows:
PCA Output
+-----------------------------------------------------------+
|pcaFeatures                                                |
+-----------------------------------------------------------+
|[1.6485728230883807,-4.013282700516296,-5.524543751369388] |
|[-4.645104331781534,-1.1167972663619026,-5.524543751369387]|
|[-6.428880535676489,-5.337951427775355,-5.524543751369389] |
+-----------------------------------------------------------+

4.8 Binarizer
Binarizes a numerical feature against a threshold: values greater than the threshold become 1.0, the rest become 0.0.
def Binarizer(df, threshold=0.5, inputCol="feature", outputCol="binarized_feature"):
    """
    Binarizes a numerical feature against a threshold: values greater than the threshold become 1.0,
    the rest become 0.0.
    """
    from pyspark.ml.feature import Binarizer
    binarizerX = Binarizer(threshold=threshold, inputCol=inputCol, outputCol=outputCol)
    binarizedX = binarizerX.transform(df)
    return binarizedX

The output of the code above is as follows:
Binarizer Output
+---+-------+-----------------+
| id|feature|binarized_feature|
+---+-------+-----------------+
|  0|    0.1|              0.0|
|  1|    0.8|              1.0|
|  2|    0.2|              0.0|
+---+-------+-----------------+

4.9 Tokenizer
Tokenizer: Spark offers both a default (whitespace) tokenizer and a regular-expression-based tokenizer.
def Tokenizer(df, inputCol="sentence", outputCol="words", pattern="\\W"):
    """
    Tokenizer: Spark offers both a default (whitespace) tokenizer and a regular-expression-based
    tokenizer; this helper uses RegexTokenizer.
    """
    from pyspark.ml.feature import RegexTokenizer
    regexTokenizer = RegexTokenizer(inputCol=inputCol, outputCol=outputCol, pattern=pattern)
    regexTokenized = regexTokenizer.transform(df)
    return regexTokenized

The output of the code above is as follows:
Tokenizer Output
+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,neat|[logistic, regression, models, are, neat] |5     |
+-----------------------------------+------------------------------------------+------+
(The tokens column is simply the word count per row, shown for illustration; the helper itself returns only the words column.)

4.10 StopWordsRemover
Stop-word removal.
def StopWordsRemover(df, inputCol="words", outputCol="words2", add_stopwords=[]):
    """
    Stop-word removal.
    """
    from pyspark.ml.feature import StopWordsRemover
    remover = StopWordsRemover(inputCol=inputCol, outputCol=outputCol)
    if add_stopwords:
        # Extend the default English stop-word list with user-supplied words, e.g. ["saw", "Mary"]
        # (calling setStopWords with an empty list would wipe out the defaults).
        remover = remover.setStopWords(remover.getStopWords() + add_stopwords)
    removed = remover.transform(df)
    return removed

The output of the code above is as follows:
StopWordsRemover Output
+---+----------------------------+--------------------+
|id |words                       |words2              |
+---+----------------------------+--------------------+
|0  |[I, saw, the, red, balloon] |[saw, red, balloon] |
|1  |[Mary, had, a, little, lamb]|[Mary, little, lamb]|
+---+----------------------------+--------------------+

4.11 NGram
Turns a word sequence into a sequence of consecutive n-grams.
def NGram(df, n=2, inputCol="words", outputCol="ngrams"):
    """
    Turns a word sequence into a sequence of consecutive n-grams.
    """
    from pyspark.ml.feature import NGram
    ngram = NGram(n=n, inputCol=inputCol, outputCol=outputCol)  # use the n argument instead of a hard-coded 2
    ngramDF = ngram.transform(df)
    return ngramDF

The output of the code above is as follows:
NGram Output
+---+--------------------------------------------+----------------------------------------------------------------------+
|id |words                                       |ngrams                                                                |
+---+--------------------------------------------+----------------------------------------------------------------------+
|0  |[Hi, I, heard, about, Spark]                |[Hi I, I heard, heard about, about Spark]                             |
|1  |[I, wish, python, could, use, case, classes]|[I wish, wish python, python could, could use, use case, case classes]|
|2  |[Logistic, regression, models, are, neat]   |[Logistic regression, regression models, models are, are neat]        |
+---+--------------------------------------------+----------------------------------------------------------------------+

4.12 DCT
The discrete cosine transform converts an N-dimensional real-valued sequence from the time domain into an N-dimensional real-valued sequence in the frequency domain (similar in spirit to the discrete Fourier transform).
def DCT(df, inverse=False, inputCol="features", outputCol="featuresDCT"):
    """
    The discrete cosine transform converts an N-dimensional real-valued sequence from the time domain
    into an N-dimensional real-valued sequence in the frequency domain (similar in spirit to the
    discrete Fourier transform).
    """
    from pyspark.ml.feature import DCT
    dct = DCT(inverse=inverse, inputCol=inputCol, outputCol=outputCol)
    dctDf = dct.transform(df)
    return dctDf
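The original notes include no sample input/output for DCT; a minimal sketch of calling the helper above on a hand-built vector column (the vector values are illustrative only):

from pyspark.ml.linalg import Vectors

dct_input = spark.createDataFrame(
    [(Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
     (Vectors.dense([-1.0, 2.0, 4.0, -7.0]),)],
    ["features"])
DCT(dct_input).show(truncate=False)   # adds the featuresDCT column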
4.13 ChiSqSelector

ChiSqSelector performs chi-squared feature selection: based on a chi-squared test of independence, it selects the features that the class label depends on most. Supported selectorType options: numTopFeatures (default), percentile and fpr.
- numTopFeatures: keeps the Top-N most predictive features according to the chi-squared test.
- percentile: like the above, but keeps a fraction of the features rather than a fixed number.
- fpr: keeps the features whose p-value is below a threshold, thereby controlling the false positive rate of the selection.
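The rows shown in the output table further below can be reproduced with the following sketch, which rebuilds the corresponding input DataFrame and keeps the single most predictive feature (numTopFeatures=1) using the helper defined next:

from pyspark.ml.linalg import Vectors

chi_df = spark.createDataFrame([
    (7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0),
    (8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0),
    (9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0)], ["id", "features", "label"])
ChiSqSelector(chi_df, numTopFeatures=1).show()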
def ChiSqSelector(df, featuresCol='features', labelCol='label', numTopFeatures=50,
                  outputCol="selectedFeatures", selectorType='numTopFeatures', percentile=0.1, fpr=0.05):
    """
    ChiSqSelector performs chi-squared feature selection: based on a chi-squared test of independence,
    it keeps the features that the class label depends on most.
    """
    # selectorType supported options: numTopFeatures (default), percentile and fpr.
    #   1. numTopFeatures: keep the Top-N most predictive features according to the chi-squared test.
    #   2. percentile    : like the above, but keep a fraction of the features rather than a fixed number.
    #   3. fpr           : keep features whose p-value is below the threshold, controlling the false positive rate.
    from pyspark.ml.feature import ChiSqSelector
    selector = ChiSqSelector(numTopFeatures=numTopFeatures, featuresCol=featuresCol, outputCol=outputCol,
                             labelCol=labelCol, selectorType=selectorType, percentile=percentile, fpr=fpr)
    result = selector.fit(df).transform(df)
    print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
    return result

The output of the code above is as follows:
# ChiSqSelector output with top 1 features selected
+---+------------------+-----+----------------+
| id|          features|label|selectedFeatures|
+---+------------------+-----+----------------+
|  7|[0.0,0.0,18.0,1.0]|  1.0|          [18.0]|
|  8|[0.0,1.0,12.0,0.0]|  0.0|          [12.0]|
|  9|[1.0,0.0,15.0,0.1]|  0.0|          [15.0]|
+---+------------------+-----+----------------+

4.14 PearsonCorr
The Pearson correlation coefficient measures the (linear) correlation between two variables X and Y; its value lies between -1 and 1.
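df.corr works on exactly two numeric columns; for a full correlation matrix over a vector column, pyspark.ml.stat.Correlation can be used instead — a minimal sketch, with num_df standing in for a hypothetical DataFrame that has numeric columns feature and label:

from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

vec_df = VectorAssembler(inputCols=["feature", "label"], outputCol="vec").transform(num_df)
print(Correlation.corr(vec_df, "vec", "pearson").head()[0])   # 2x2 Pearson correlation matrix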
def PearsonCorr(df, featureCol='feature', labelCol='label'):
    """
    The Pearson correlation coefficient measures the (linear) correlation between two variables
    X and Y; its value lies between -1 and 1.
    """
    # DataFrame.corr currently only supports Pearson; method=None defaults to it.
    return df.corr(featureCol, labelCol, method=None)

Summary

The helpers above wrap the most commonly used pyspark.ml.feature transformers behind a uniform (df, inputCol, outputCol) style of interface, so that featureTool can dispatch them from a simple {column: function} mapping and the results can be written back to Hive with saveResult.