使用spark ml pipeline进行机器学习
一、關(guān)于spark ml?pipeline與機(jī)器學(xué)習(xí)
一個(gè)典型的機(jī)器學(xué)習(xí)構(gòu)建包含若干個(gè)過程
1、源數(shù)據(jù)ETL
2、數(shù)據(jù)預(yù)處理
3、特征選取
4、模型訓(xùn)練與驗(yàn)證
以上四個(gè)步驟可以抽象為一個(gè)包括多個(gè)步驟的流水線式工作,從數(shù)據(jù)收集開始至輸出我們需要的最終結(jié)果。因此,對以上多個(gè)步驟、進(jìn)行抽象建模,簡化為流水線式工作流程則存在著可行性,對利用spark進(jìn)行機(jī)器學(xué)習(xí)的用戶來說,流水線式機(jī)器學(xué)習(xí)比單個(gè)步驟獨(dú)立建模更加高效、易用。
受?scikit-learn?項(xiàng)目的啟發(fā),并且總結(jié)了MLlib在處理復(fù)雜機(jī)器學(xué)習(xí)問題的弊端(主要為工作繁雜,流程不清晰),旨在向用戶提供基于DataFrame 之上的更加高層次的 API 庫,以更加方便的構(gòu)建復(fù)雜的機(jī)器學(xué)習(xí)工作流式應(yīng)用。一個(gè)pipeline 在結(jié)構(gòu)上會(huì)包含一個(gè)或多個(gè)Stage,每一個(gè) Stage 都會(huì)完成一個(gè)任務(wù),如數(shù)據(jù)集處理轉(zhuǎn)化,模型訓(xùn)練,參數(shù)設(shè)置或數(shù)據(jù)預(yù)測等,這樣的Stage 在 ML 里按照處理問題類型的不同都有相應(yīng)的定義和實(shí)現(xiàn)。兩個(gè)主要的stage為Transformer和Estimator。Transformer主要是用來操作一個(gè)DataFrame 數(shù)據(jù)并生成另外一個(gè)DataFrame 數(shù)據(jù),比如svm模型、一個(gè)特征提取工具,都可以抽象為一個(gè)Transformer。Estimator 則主要是用來做模型擬合用的,用來生成一個(gè)Transformer。可能這樣說比較難以理解,下面就以一個(gè)完整的機(jī)器學(xué)習(xí)案例來說明spark ml pipeline是怎么構(gòu)建機(jī)器學(xué)習(xí)工作流的。
二、使用spark ml pipeline構(gòu)建機(jī)器學(xué)習(xí)工作流
在此以Kaggle數(shù)據(jù)競賽Display Advertising Challenge的數(shù)據(jù)集(該數(shù)據(jù)集為利用用戶特征進(jìn)行廣告點(diǎn)擊預(yù)測)開始,利用spark ml pipeline構(gòu)建一個(gè)完整的機(jī)器學(xué)習(xí)工作流程。
Display Advertising Challenge的這份數(shù)據(jù)本身就不多做介紹了,主要包括3部分,numerical型特征集、Categorical類型特征集、類標(biāo)簽。
首先,讀入樣本集,并將樣本集劃分為訓(xùn)練集與測試集:
???????//使用file標(biāo)記文件路徑,允許spark讀取本地文件
????????String fileReadPath = "file:\\D:\\dac_sample\\dac_sample.txt";
????????//使用textFile讀入數(shù)據(jù)
????????SparkContext sc = Contexts.sparkContext;
????????RDD<String> file = sc.textFile(fileReadPath,1);
????????JavaRDD<String> sparkContent = file.toJavaRDD();
????????JavaRDD<Row> sampleRow = sparkContent.map(new Function<String, Row>() {
????????????public Row call(String string) {
????????????????String tempStr = string.replace("\t",",");
????????????????String[] features = tempStr.split(",");
????????????????int intLable= Integer.parseInt(features[0]);
????????????????String intFeature1 ?= features[1];
????????????????String intFeature2 ?= features[2]; ???????????????String CatFeature1 = features[14];
????????????????String CatFeature2 = features[15];
????????????????return RowFactory.create(intLable, intFeature1, intFeature2, CatFeature1, CatFeature2);
????????????}
????????});
?
?
????????double[] weights = {0.8, 0.2};
????????Long seed = 42L;
????????JavaRDD<Row>[] sampleRows = sampleRow.randomSplit(weights,seed);
得到樣本集后,構(gòu)建出?DataFrame格式的數(shù)據(jù)供spark ml pipeline使用:
????????List<StructField> fields = new ArrayList<StructField>();
????????fields.add(DataTypes.createStructField("lable", DataTypes.IntegerType, false));
????????fields.add(DataTypes.createStructField("intFeature1", DataTypes.StringType, true));
????????fields.add(DataTypes.createStructField("intFeature2", DataTypes.StringType, true));
????????fields.add(DataTypes.createStructField("CatFeature1", DataTypes.StringType, true));
????????fields.add(DataTypes.createStructField("CatFeature2", DataTypes.StringType, true));
????????//and so on
?
?
????????StructType schema = DataTypes.createStructType(fields);
????????DataFrame dfTrain = Contexts.hiveContext.createDataFrame(sampleRows[0], schema);//訓(xùn)練數(shù)據(jù)
????????dfTrain.registerTempTable("tmpTable1");
????????DataFrame dfTest = Contexts.hiveContext.createDataFrame(sampleRows[1], schema);//測試數(shù)據(jù)
????????dfTest.registerTempTable("tmpTable2");
由于在dfTrain、dfTest中所有的特征目前都為string類型,而機(jī)器學(xué)習(xí)則要求其特征為numerical類型,在此需要對特征做轉(zhuǎn)換,包括類型轉(zhuǎn)換和缺失值的處理。
首先,將intFeature由string轉(zhuǎn)為double,cast()方法將表中指定列string類型轉(zhuǎn)換為double類型,并生成新列并命名為intFeature1Temp,
之后,需要?jiǎng)h除原來的數(shù)據(jù)列 并將新列重命名為intFeature1,這樣,就將string類型的特征轉(zhuǎn)換得到double類型的特征了。
? ? ? ? //Cast integer features from String to Double
? ? ? ?dfTest = dfTest.withColumn("intFeature1Temp",dfTest.col("intFeature1").cast("double"));
? ? ? ?dfTest = dfTest.drop("intFeature1").withColumnRenamed("intFeature1Temp","intFeature1");
如果intFeature特征是年齡或者特征等類型,則需要進(jìn)行分箱操作,將一個(gè)特征按照指定范圍進(jìn)行劃分:
? ? ? ? /*特征轉(zhuǎn)換,部分特征需要進(jìn)行分箱,比如年齡,進(jìn)行分段成成年未成年等 */
? ? ? ? double[] splitV = {0.0,16.0,Double.MAX_VALUE};
? ? ? ? Bucketizer bucketizer = new Bucketizer().setInputCol("").setOutputCol("").setSplits(splitV);
再次,需要將categorical 類型的特征轉(zhuǎn)換為numerical類型。主要包括兩個(gè)步驟,缺失值處理和編碼轉(zhuǎn)換。
缺失值處理方面,可以使用全局的NA來統(tǒng)一標(biāo)記缺失值:
? ? ? ? /*將categoricalb類型的變量的缺失值使用NA值填充*/
? ? ? ? String[] strCols = {"CatFeature1","CatFeature2"};
? ? ? ? dfTrain = dfTrain.na().fill("NA",strCols);
? ? ? ? dfTest = dfTest.na().fill("NA",strCols);
缺失值處理完成之后,就可以正式的對categorical類型的特征進(jìn)行numerical轉(zhuǎn)換了。在spark ml中,可以借助StringIndexer和oneHotEncoder完成
這一任務(wù):
? ? ? ? // StringIndexer ?oneHotEncoder 將 categorical變量轉(zhuǎn)換為 numerical 變量
? ? ? ? // 如某列特征為星期幾、天氣等等特征,則轉(zhuǎn)換為七個(gè)0-1特征
? ? ? ? StringIndexer cat1Index = new StringIndexer().setInputCol("CatFeature1").setOutputCol("indexedCat1").setHandleInvalid("skip");
? ? ? ? OneHotEncoder cat1Encoder = new OneHotEncoder().setInputCol(cat1Index.getOutputCol()).setOutputCol("CatVector1");
? ? ? ? StringIndexer cat2Index = new StringIndexer().setInputCol("CatFeature2").setOutputCol("indexedCat2");
? ? ? ? OneHotEncoder cat2Encoder = new OneHotEncoder().setInputCol(cat2Index.getOutputCol()).setOutputCol("CatVector2");
至此,特征預(yù)處理步驟基本完成了。由于上述特征都是處于單獨(dú)的列并且列名獨(dú)立,為方便后續(xù)模型進(jìn)行特征輸入,需要將其轉(zhuǎn)換為特征向量,并統(tǒng)一命名,
可以使用VectorAssembler類完成這一任務(wù):
? ? ? ? /*轉(zhuǎn)換為特征向量*/
? ? ? ? String[] vectorAsCols = {"intFeature1","intFeature2","CatVector1","CatVector2"};
? ? ? ? VectorAssembler vectorAssembler = new VectorAssembler().setInputCols(vectorAsCols).setOutputCol("vectorFeature");
通常,預(yù)處理之后獲得的特征有成千上萬維,出于去除冗余特征、消除維數(shù)災(zāi)難、提高模型質(zhì)量的考慮,需要進(jìn)行選擇。在此,使用卡方檢驗(yàn)方法,
利用特征與類標(biāo)簽之間的相關(guān)性,進(jìn)行特征選取:
? ? ? ? /*特征較多時(shí),使用卡方檢驗(yàn)進(jìn)行特征選擇,主要是考察特征與類標(biāo)簽的相關(guān)性*/
? ? ? ? ChiSqSelector chiSqSelector = new ChiSqSelector().setFeaturesCol("vectorFeature").setLabelCol("label").setNumTopFeatures(10)
? ? ? ? ? ? ? ? .setOutputCol("selectedFeature");
在特征預(yù)處理和特征選取完成之后,就可以定義模型及其參數(shù)了。簡單期間,在此使用LogisticRegression模型,并設(shè)定最大迭代次數(shù)、正則化項(xiàng):
? ? ? ? /* 設(shè)置最大迭代次數(shù)和正則化參數(shù) setElasticNetParam=0.0 為L2正則化 setElasticNetParam=1.0為L1正則化*/
? ? ? ? /*設(shè)置特征向量的列名,標(biāo)簽的列名*/
? ? ? ? LogisticRegression logModel = new LogisticRegression().setMaxIter(100).setRegParam(0.1).setElasticNetParam(0.0)
? ? ? ? ? ? ? ? .setFeaturesCol("selectedFeature").setLabelCol("lable");
在上述準(zhǔn)備步驟完成之后,就可以開始定義pipeline并進(jìn)行模型的學(xué)習(xí)了:
? ? ? ? /*將特征轉(zhuǎn)換,特征聚合,模型等組成一個(gè)管道,并調(diào)用它的fit方法擬合出模型*/
? ? ? ? PipelineStage[] pipelineStage = {cat1Index,cat2Index,cat1Encoder,cat2Encoder,vectorAssembler,logModel};
? ? ? ? Pipeline pipline = new Pipeline().setStages(pipelineStage);
? ? ? ? PipelineModel pModle = pipline.fit(dfTrain);
上面pipeline的fit方法得到的是一個(gè)Transformer,我們可以使它作用于訓(xùn)練集得到模型在訓(xùn)練集上的預(yù)測結(jié)果:
? ? ? ? //擬合得到模型的transform方法進(jìn)行預(yù)測
? ? ? ? DataFrame output = pModle.transform(dfTest).select("selectedFeature", "label", "prediction", "rawPrediction", "probability");
? ? ? ? DataFrame prediction = output.select("label", "prediction");
? ? ? ? prediction.show();
分析計(jì)算,得到模型在訓(xùn)練集上的準(zhǔn)確率,看看模型的效果怎么樣:
? ? ? ? /*測試集合上的準(zhǔn)確率*/
? ? ? ? long correct = prediction.filter(prediction.col("label").equalTo(prediction.col("'prediction"))).count();
? ? ? ? long total = prediction.count();
? ? ? ? double accuracy = correct / (double)total;
?
? ? ? ? System.out.println(accuracy);
最后,可以將模型保存下來,下次直接使用就可以了:
? ? ? ? String pModlePath = ""file:\\D:\\dac_sample\\";
? ? ? ? pModle.save(pModlePath);
三,梳理和總結(jié):
上述,借助代碼實(shí)現(xiàn)了基于spark ml pipeline的機(jī)器學(xué)習(xí),包括數(shù)據(jù)轉(zhuǎn)換、特征生成、特征選取、模型定義及模型學(xué)習(xí)等多個(gè)stage,得到的pipeline
模型后,就可以在新的數(shù)據(jù)集上進(jìn)行預(yù)測,總結(jié)為兩部分并用流程圖表示如下:
訓(xùn)練階段:
預(yù)測階段:
借助于Pepeline,在spark上進(jìn)行機(jī)器學(xué)習(xí)的數(shù)據(jù)流向更加清晰,同時(shí)每一stage的任務(wù)也更加明了,因此,無論是在模型的預(yù)測使用上、還是
模型后續(xù)的改進(jìn)優(yōu)化上,都變得更加容易。
————————————————
版權(quán)聲明:本文為CSDN博主「大愚若智_」的原創(chuàng)文章,遵循 CC 4.0 BY-SA 版權(quán)協(xié)議,轉(zhuǎn)載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/zbc1090549839/article/details/50935274
總結(jié)
以上是生活随笔為你收集整理的使用spark ml pipeline进行机器学习的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java I/O体系原理
- 下一篇: 复方栀子止痛膏_功效作用注意事项用药禁忌