Spark MLlib 机器学习
本章導讀
機器學習(machine learning, ML)是一門涉及概率論、統計學、逼近論、凸分析、算法復雜度理論等多領域的交叉學科。ML專注于研究計算機模擬或實現人類的學習行為,以獲取新知識、新技能,并重組已學習的知識結構使之不斷改善自身。
MLlib是Spark提供的可擴展的機器學習庫。MLlib已經集成了大量機器學習的算法,由于MLlib涉及的算法眾多,筆者只對部分算法進行了分析,其余算法只是簡單列出公式,讀者如果想要對公式進行推理,需要自己尋找有關概率論、數理統計、數理分析等方面的專門著作。本章更側重于機器學習API的使用,基本能夠滿足大多數讀者的需要。
1.?機器學習概率
機器學習也屬于人工智能的范疇,該領域主要研究的對象是人工智能,尤其是如何在經驗學習中改善具體算法。機器學習是人工智能研究較為年輕的分支,它的發展過程大致可分為如下4個階段:
- ?第一階段:20世紀50年代中葉至60年代中葉,屬于熱烈時期。
- 第二階段:20世紀60年代中葉至70年代中葉,稱為冷靜時期。
- 第三階段:20世紀70年代中葉至80年代中葉,稱為復興時期。
- 第四階段:從1986年開始至今。
(1)?機器學習的組成
機器學習的基本結構由環境、知識庫和執行部分三部分組成。環境向學習部分(屬于知識庫的一部分)提供某些信息,學習部分利用這些信息修改知識庫,以增進執行部分完成任務的效能,執行部分根據知識庫完成任務,同時把獲得的信息反饋給學習部分。
(2)?學習策略
學習策略是指機器學習過程中所采用的推理策略。學習系統一般由學習和環境兩部分組成。環境(如書本或教師)提供信息,學習部分則實現信息轉換、存儲,并從中獲取有用的信息。學習過程中,學生(學習部分)使用的推理越少,他對教師(環境)的依賴就越大,教師的負擔也就越重。根據學生實現信息轉換所需推理的多少和難易程度,以從簡單到復雜,從少到多的次序可以將學習策略分為以下6種基本類型:
- 機械學習(rote learning):學習者不需要任何推理或轉換,直接獲取環境所提供的信息。屬于此類的如塞繆爾的跳棋程序。
- 示教學習(learning from instruction):學習者從環境獲取信息,把知識轉換成內部可使用的表示形式,并將新知識和原有知識有機地合為一體。此種學習策略需要學生有一定程度的推理能力,但環境仍要做大量的工作。典型應用是FOO程序。
- 演繹學習(learning by deduction):學習者通過推理獲取有用的知識。典型應用是宏操作(macro-operation)學習。
- 類比學習(learning by analogy):學習者根據兩個不同領域(源域、目標域)中的知識相似性,通過類比,從源域的知識推導出目標域的相應知識。此類應用如盧瑟福類比。
- 基于解釋的學習(explanation-based learning, EBL):學習者根據教師提供的目標概念和此概念的例子、領域理論及可操作準則,首先給出解釋說明為什么該例子滿足目標概念,然后將解釋推廣未目標概念的一個滿足可操作準則的充分條件。著名的EBL系統由迪喬恩(G.DeJong)的GENESIS等。
- 歸納學習(learning from induction):由環境提供某概念的一些實例或反例,讓學習者通過歸納推理得出該概念的一般描述。歸納學習是最基本的,發展也較為成熟的學習方法,在人工智能領域中已得到廣泛的研究和應用。
學習策略還可以從所獲取知識的表示形式、應用領域等維度分類。
(3)?應用領域
目前,機器學習廣泛應用于數據挖掘、計算機視覺、自然語言處理、生物特征識別、搜索引擎、醫學診斷、檢測信用卡欺詐、證券市場分析、DNA序列測序、語音和手寫識別、戰略游戲和機器人等領域。
2.?Spark MLlib總體設計
MLlib(machine learning library)是Spark提供的可擴展的機器學習庫。MLlib中已經包含了一些通用的學習算法和工具,如:分類、回歸、聚類、協同過濾、降維以及底層的優化原語等算法和工具。
MLlib提供的API主要分為以下兩類:
- spark.mllib包中提供的主要API。
- spark.ml包中提供的構建機器學習工作流的高層次的API。
3.?數據類型
MLlib支持存儲在一臺機器上的局部向量和矩陣以及由一個或多個RDD支持的分布式矩陣。局部向量和局部矩陣是提供公共接口的簡單數據模型。Breeze和jblas提供了底層的線性代數運算。Breeze提供了一組線性代數和數字計算的庫,具體信息訪問http://www.scalanlp.org/。jblas提供了使用Java開發的線性代數庫,具體信息訪問http://jblas.org/。
3.1?局部向量
MLlib支持兩種局部向量類型:密集向量(dense)和稀疏向量(sparse)。密集向量由double類型的數組支持,而稀疏向量則由兩個平行數組支持。例如,向量(1.0,0.0,3.0)由密集向量表示的格式為[1.0,0.0,3.0],由稀疏向量表示的格式為(3,[0,2],[1.0,3.0])。
注意:這里對稀疏向量做些解釋。3是向量(1.0,0.0,3.0)的長度,除去0值外,其他兩個值的索引和值分別構成了數組[0,2]和數組[1.0,3.0]。
有關向量的類如圖所示。
Vector是所有局部向量的基類,Dense-Vector和SparseVector都是Vector的具體實現。
Spark官方推薦使用Vectors中實現的工廠方法創建局部向量,就像下面這樣:
import org.apache.spark.mllib.linalg.{Vector, Vectors} //創建密集向量(1.0, 0.0, 3.0) val dv: Vector = Vectors.dense(1.0, 0.0, 3.0) //給向量(1.0, 0.0, 3.0)創建疏向量 val svl: Vector= Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)) //通過指定非0的項目,創建稀疏向量(1.0, 0.0, 3.0) val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))注意: Scala默認會導入scala.collection.immutable.Vector,所以必須顯式導入org.apache.spark.mllib.linalg.Vector才能使用MLlib才能使用MLlib提供的Vector。
上面例子中以數組為參數,調用Vectors的sparse接口,見如下代碼。使用Seq創建稀疏向量,其本質依然是使用數組,見如下代碼。
3.2?標記點
標記點是將密集向量或者稀疏向量與應答標簽相關聯。在MLlib中,標記點用于監督學習算法。MLlib使用double類型存儲標簽,所以我們能在回歸和分類中使用標記點。如果只有兩種分類,可以使用二分法,一個標簽要么是1.0,要么是0.0。如果有很多分類,標簽應該從零開始:0、1、2....
標記點由樣例類LabeledPoint來表示,其使用方式如下。
import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.regression.LabeledPoint //使用標簽1.0和一個密集向量創建一個標記點 val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)) //使用標簽0.0和一個疏向量創建一個標記點 val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))用稀疏的訓練數據做練習是很常見的,好在MLlib支持讀取存儲在LIBSVM格式中的訓練例子。LIBSVM格式是一種每一行表示一個標簽稀疏特征向量的文本格式,其格式如下:
label index1:value1 index2:value2 ...LIBSVM是林智仁教授等開發設計的一個簡單、易用和快速有效的SVM模式識別與回歸的軟件包。MLlib已經提供了MLUtils.loadLibSVMFile方法讀取存儲在LIBSVM格式文本文件中的訓練數據,見如下代碼:
import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDDval examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")3.3?局部矩陣
MLlib支持數據存儲在單個double類型數組的密矩陣。先來看這樣一個矩陣:
這個矩陣是如何存儲的?它只是存儲到一維數組[1.0, 3.0, 5.0, 2.0, 4.0, 6.0],這個矩陣的尺寸是3*2,即3行2列。
有關局部矩陣的類如下圖所示。局部矩陣的基類是Matrix,目前有一個實現類DenseMatrix。Spark官方推薦使用Matrices中實現的工廠方法創建局部矩陣,例如:
import org.apache.spark.mllib.linalg.{Matrix, Matrices} //創建密矩陣((1.0,2.0),(3.0, 4.0),(5.0, 6.0)) val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))3.4?分布式矩陣
?分布式矩陣分布式地存儲在一個或者多個RDD中。如何存儲數據量很大的分布式矩陣?最重要的在于選擇一個正確的格式。如果將分布式矩陣轉換為不同格式,可能需要全局的shuffle,成本非常昂貴。
有關分布式矩陣的類如圖所示:
迄今為止,MLlib已經實現了4種類型的分布式矩陣:
- RowMatrix:最基本的分布式矩陣類型,是面向行且行索引無意義的分布式矩陣。RowMatrix的行實際是多個局部向量的RDD,列受限于integer的范圍大小。RowMatrix適用于列數不大以便單個局部向量可以合理地傳遞給Driver,也能在單個節點上存儲和操作的情況。
下面展示了可以使用RDD[Vector]實例來構建RowMatrix的例子。
import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.linalg.distributed.RowMatrix val rows: RDD[Vector] = ... val mat: RowMatrix = new RowMatrix(rows) val m = mat.numRows() val n = mat.numCols()- IndexedRowMatrix:與RowMatrix類似,但卻面向索引的分布式矩陣。IndexedRowMatrix常用于識別行或者用于執行連接操作。可以使用RDD[IndexedRow]實例創建IndexedRowMatrix。IndexedRow的實現如下:
通過刪除IndexedRowMatrix的行索引,可以將IndexedRowMatrix轉換為RowMatrix。下面的例子演示了如何使用IndexedRowMatrix。
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowEntry} val rows: RDD[IndexedRow] = ... val mat: IndexedRowMatrix = new IndexedRowMatrix(rows) val m = mat.numRows() val n = mat.numCols() val rowMat: RowMatrix = mat.toRowMatrix()- CoordinateMatrix:使用坐標列表(COO)格式存儲的分布式矩陣。支持CoordinateMatrix的RDD實際是(i: Long,j: Long, value: Double)這樣的三元組,i是行索引,j是列索引,value是實際存儲的值。CoordinateMatrix適用于行和列都很大且矩陣很稀疏的情況。
可以使用RDD[MatrixEntry]實例創建CoordinateMatrix。MatrixEntry的實現如下。
@Experimental case class MatrixEntry(i: Long, j: Long, value: Double)通過調用CoordinateMatrix的toIndexedRowMatrix方法,可以將CoordinateMatrix轉換為IndexedRowMatrix。下面的例子演示了CoordinateMatrix的使用。
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} val entries: RDD[MatriEntry] = ... val mat: CoordinateMatrix = new CoordinateMatrix(entries) val m = mat.numRows() val n = mat.numCols() val indexedRowMatrix = mat.toIndexedRowMatrix()- BlockMatrix:由RDD[MatrixBlock]支持的分布式矩陣。MatrixBlock實際是((Int, Int), Matrix)這樣的二元組,(Int, Int)是Block的索引,Matrix是記錄塊大小的子矩陣。BlockMatrix支持與其他BlockMatrix的add和multiply,還提供validate方法用于校驗當前BlockMatrix是否恰當構建。
通過調用IndexedRowRowMatrix或者CoordinateMatrix的toBlockMatrix方法,可以方便轉換為BlockMatrix。toBlockMatrix方法創建的Block的默認大小是1024 x 1024。可以使用toBlockMatrix(rowsPerBlock,colsPerBlock)方法改變Block的大小。下面的例子演示了BlockMatrix的使用。
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry} val entries: RDD[MatrixEntry] = ... val coordMat: CoordinateMatrix = new CoordinateMatrix(entries) val matA: BlockMatrix = coordMat.toBlockMatrix().cache() matA.validate() val ata = matA.transpose.multiply(matA)注意:由于MLlib會緩存矩陣的大小,所以支持分布式矩陣的RDD必須要有明確的類型,否則會導致出錯。
4.?基礎統計
MLlib提供了很多統計方法,包括摘要統計、相關統計、分層抽樣、假設校驗、隨機數生成等。這些都涉及統計學、概率論的專業知識。
4.1?摘要統計
調用Statistics類的colStats方法,可以獲得RDD[Vector]的列的摘要統計。colStats方法返回了MultivariateStatisticalSummary對象,MultivariateStatisticalSummary對象包含了列的最大值、最小值、平均值、方差、非零元素的數量以及總數。下面的例子演示了如何使用colStats。
import org.apache.spark.mllib.linalg.Vector import org.apache.spark.stat.{MultivariateStatisticalSummary, Statistics} val observations: RDD[Vector] = ... val summary: MultivariateStatisticalSummary = Statistics.colStats(observations) println(summary.mean) //每個列值組成的密集向量 println(summary.variance) //列向量方差 println(summary.numNonzeros) //每個列的非零值個數?colStats實際使用了RowMatrix的computeColumnSummaryStatistics方法,見代碼如下:
4.2?相關統計
計算兩個序列之間的相關性是統計中通用的操作。MLlib提供了計算多個序列之間相關統計的靈活性。目前支持的關聯方法運用了皮爾森相關系數(Pearson correlation coefficient)和斯皮爾森相關系統(Spearman's rank correlation coefficient)。
1.?皮爾森相關系數
皮爾森相關系數也稱為皮爾森積矩相關系數(Pearson product-moment correlation coefficient),是一種線性相關系數。皮爾森相關系數是用來反映兩個變量線性相關程度的統計量。
相關系數用r表示,其中n為樣本量,xi,yi,sx,sy?分別為兩個變量的觀測值和均值。r描述的是兩個變量間線性相關強弱的程度。r的取值在-1與+1之間,若r>0,表明兩個變量是正相關,即一個變量的值越大,另一個變量的值也會越大;若r<0,表明兩個變量是負相關,即一個變量的值越大另一個變量的值反而會越小。r的絕對值越大表明相關性越強,要注意的是這里并不存在因果關系。若r=0,表明兩個變量間不是線性相關,但有可能是其他方式的相關(比如曲線方式)。
2.?斯皮爾森秩相關系數
斯皮爾森秩相關系數也稱為Spearman的p,是由Charles Spearman命名的,一般用希臘字母ps(rho)或rs表示。Spearman秩相關系數是一種無參數(與分布無關)的校驗方法,用于度量變量之間聯系的強弱。在沒有重復數據的情況下,如果一個變量是另外一個變量的嚴格單調函數,則Spearman秩相關系數就是+1或-1,稱變量完全Spearman秩相關。注意和Pearson完全相關的區別,只有當兩變量存在線性關系時,Pearson相關系數才為+1或-1。
Spearman秩相關系數為:
Statistics提供了計算序列之間相關性的方法,默認情況下使用皮爾森相關系數,使用方法如下:
import org.apache.spark.SparkContext import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.stat.Statisticsval sc: SparkContext = ... val seriesX: RDD[Double] = ... //a series val seriesY: RDD[Double] = ... //和seriesX必須有相同的分區和基數 val correlation:Double = Statistics.corr(seriesX, seriesY, "pearson") val data: RDD[Vector] = ... //每個向量必須是行,不能是列 val correlMatrix: Matrix = Statistics.corr(data, "pearson")Statistics中相關性的實現,見代碼如下:
其實質是代理了Correlations,Correlations中相關性的實現見代碼如下:
4.3?分層抽樣
分層抽樣(Stratified sampling)是先將總體按某種特征分為若干次級(層),然后再從每一層內進行獨立取樣,組成一個樣本的統計學計算方法。為了對分層抽樣有更直觀的感受,請看下面的例子:
某市現有機動車共1萬輛,其中大巴車500輛,小轎車6000輛,中拔車1000輛,越野車2000輛,工程車500輛。現在要了解這些車輛的使用年限,決定采用分層抽樣方式抽取100個樣本。按照車輛占比,各類車輛的抽樣數量分別為5,60,10,20,5.
摘要統計和相關統計都集成Statistics中,而分層抽樣只需要調用RDD[(K,V)]的sampleByKey和sampleByKeyExact即可。為了分層抽樣,其中的鍵可以被認為是標簽,值是具體的屬性。sampleByKey方法采用擲硬幣的方式來決定是否將一個觀測值作為采樣,因此需要一個預期大小的樣本數據。sampleByKeyExact則需要更多更有效的資源,但是樣本數據的大小是確定的。sampleByKeyExact方法允許用戶采用符合[fk * nk] V k ∈ K,其中fk是鍵k的函數,nk是RDD[(K,V)]中鍵為k的(K,V)對,K是鍵的集合。下例演示了如何使用分層抽樣。
import org.apache.spark.SparkContext import org.apache.spark.SparkContext import org.apache.spark.rdd.PairRDDFunctionsval sc: SparkContext = ... val data = ... //an RDD[(K,V)] of any key value pairs val fractions: Map[K. Double] = ... //specify the exact fraction desired from each key val exactSample = data.sampleByKeyExact(withReplacement = false, fractions)4.4?假設校驗
假設校驗(hypothesis testing)?是數理統計學中根據一定假設條件由樣本推斷總體的一種方法。
如果對總體的某種假設是真實的,那么不利于或不能支持這一假設的事件A(小概率事件)在一次試驗中幾乎不可能發生;要是在一次試驗中A竟然發生了,就有理由懷疑該假設的真實性,拒絕這一假設。小概率原理可以用圖表示。
H0表示原假設,H1表示備選假設。常見的假設校驗有如下幾種:
- 雙邊校驗:H0:u = u0,H1:u=/u0
- 右側單邊校驗:H0:u<=u0,H1:u>u0
- 左側單邊校驗:H0:u>=u0,H1:u<u0
假設校驗是一個強大的工具,無論結果是否偶然的,都可以決定結果是否具有統計特征。MLlib目前支持皮爾森卡方測試。輸入數據的類型決定了是做卡方適合度檢測還是獨立性檢測。卡方適合度檢測的輸入數據類型應當是向量,而卡方獨立性檢測需要的數據類型是矩陣。RDD[LabeledPoint]可以作為卡方檢測的輸入類型。下列演示了如何使用假設校驗。
import org.apache.spark.SparkContext import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.stat.Statistics._val sc: SparkContext = ... val vec: Vector = ... //事件的頻率組成的vector val goodnessOfFitTestResult = Statistics.chiSqTest(vec) println(goodnessOfFitTestResult) val mat: Matrix = ... //偶然性matrix val independenceTestResult = Statistics.chiSqTest(mat) println(independenceTestResult) val obs:RDD[LabeledPoint] = ... //(feature, label) pairs val featureTestResults: Arra[ChiSqTestResult] = Statistics.chiSqTest(obs) var i = 1 featureTestResults.foreach{ result =>println(s"Column $i:\n result")i += 1 } //summary of the test4.5?隨機數生成
隨機數可以看做隨機變量,什么是隨機變量?將一枚質地均勻的硬幣拋擲3次,記錄它的結果,有:
其中u代表正面朝上,d代表反面朝上,整個集合Ω是拋擲3次硬幣的樣本空間。正面朝上的次數可能是0,1,2,3。由于樣本空間Ω中的結果都是隨機發生的,所以出現正面的次數X是隨機的,X即為隨機變量。如果拋擲硬幣,直到出現正面的拋擲次數為Y,那么Y的取值可能是0,1,2,3...。如果隨機變量的取值是有限的(比如X)或者是可列的(比如Y),那么就稱為離散隨機變量。
剛才說的拋擲3次硬幣的情況下,使用P(X =?取值)的方式表達每種取值的概率,我們不難得出:
如果樣本空間上隨機變量的取值用x1,x2,x3,....表示,那么存在滿足p(x1) = P(X = xi)和Σip(xi) = 1?的函數p。這個函數p稱為隨機變量X的概率質量函數或者頻率函數。
如果X取值累積某個范圍的值,那么其累積分布函數定義如下:
累積分布函數滿足:
同一樣本空間上兩個離散隨機變量X和Y的可能取值分別為x1,x2,x3,...和y1,y2,y3,...,如果對所有i和j,滿足:
則X和Y是獨立的。將此定義推廣到兩個以上離散隨機變量的情形,如果對所有i, j?和?k,滿足:
則X、Y和Z是相互獨立的。
剛才所說的X、Y的取值都是離散的,還有一種情況下取值是連續的。以人的壽命為例,可以是任意的正實數值。與頻率函數相對的是密度函數?(x)。?(x)有這些性質:?(x)?≥ 0,?分段連續且∫∞-∞?(x)dx = 1。如果X是具有密度函數?的隨機變量,那么對于任意的a < b,X落在區間(a, b)上的概率是密度函數從a到b的下方面積:
隨機數生成對于隨機算法、隨機協議和隨機性能測試都很有用。MLlib支持均勻分布、標準正態分布、泊松分布等生成隨機RDD。
MLlib有關隨機數的類如圖所示:
以泊松分布為例,先看看它的數學定義。參數為λ(λ>0)的泊松頻率函數是
當λ = 0.1、1、5、10時的泊松分布如圖所示:
RandomRDDs提供了工廠方法創建RandomRDD和RandomVectorRDD。下面的例子中生成了一個包含100萬個double類型隨機數的RDD[double],其值符合標準正太分布N(0,1),分布于10個分區,然后將其映射到N(1, 4)。
import org.apache.spark.SparkContext import org.apache.spark.mllib.random.RandomRDDs._val sc: SparkContext = ... val u = normalRDD(sc, 1000000L, 10) val v = u.map(x => 1.0 + 2.0 * x)5. 分類和回歸
MLlib支持多種多樣的分析方法,例如,二元分類、多元分類和回歸。表11-1列出了各類問題的支持算法。
5.1?數學公式
許多標準的機器學習方法都可以配制成凸優化問題,即找到一個極小的凸函數?依賴于一個d項的可變向量w。形式上,我們可以寫為優化問題minwεR d?(w),其中所述目標函數的形式為:
這里的向量xi?ε Rd?是訓練數據,1?≤?i?≤ n?并且yi?ε??Rd?是想要預測數據的相應的標簽。如果L(w; xi, yi)能表示為?wτX和y的函數,我們就說這個方法是線性的。幾個MLlib的分類和回歸算法都屬于這一類,并在這里討論。
目標函數?有兩個部分:控制該模型的復雜的正則化部分和用于在訓練數據上測量模型的誤差的損失部分。損失函數L(w)是典型的基于w的凸函數。固定的正則化參數λ?≥ 0定義了最小損失(即訓練誤差)和最小化模型的復雜性(即避免過度擬合)這兩個目標之間的權衡。
?(1)?損失函數
在統計學,統計決策理論和經濟學中,損失函數是指一種將一個事件(在一個樣本空間中的一個元素)映射到一個表達與其事件相關的經濟成本和機會成本的實數上的一種函數。通常而言,損失函數由損失項和正則項組成。表11-2列出了常用的損失函數。
這里對表11-2中的一些內容做些說明:
- Hinge loss:常用于軟間隔支持向量機的損失函數;
- Logistic loss:常用于邏輯回歸的損失函數;
- Squared loss:常用于最小二乘的損失函數;
- Gradient or sub gradient:梯度與次梯度
(2)?正規化
正規化的目的是鼓勵簡單的模型,并避免過度擬合。MLlib支持以下正規化,如表11-3所示:
這里的sign(w)是由向量w中所有項的符號(±1)組成的向量。平滑度L2正規化問題一般比L1正規化容易解決。然而L1正規化能幫助促進稀疏權重,導致更小、更可解釋的模型,其中后者于特征選擇是有用的。沒有任何正規化,特別是當訓練實例的數目是小的,不建議訓練模型。
(3)?優化
線性方法使用凸優化來優化目標函數。MLlib使用兩種方法:新元和L-BFGS來描述優化部分。目前,大多數算法的API支持隨機梯度下降(SGD),并有一些支持L-BFGS。
5.2?線性回歸
線性回歸是一類簡單的指導學習方法。線性回歸是預測定量響應變量的有用工具。很多統計學習方法都是從線性回歸推廣和擴展得到的,所以我們有必要重點理解它。
1.簡單線性回歸
簡單線性回歸非常簡單,只根據單一的預測變量X預測定量響應變量Y。它假定X與Y之間存在線性關系。其數學關系如下:
≈表示近似。這種線性關系可以描述為Y對X的回歸。β0和β1是兩個未知的常量,被稱為線性模型的系數,它們分別表示線性模型中的截距和斜率。
β0和β1怎么得到呢?通過大量樣本數據估算出估計值。假如樣本數據如下:
(x1, y1),(x2, y2),....,(x3, y3)
此時問題轉換為在坐標中尋找一條與所有點的距離最大程度接近的直線問題,如圖11-7所示
使用最小二乘方法最終求得的估計值(β’0,β’1)。
實際情況,所有的樣本或者真實數據不可能真的都在一條直線上,每個坐標都會有誤差,所以可以表示為如下關系:
上式也稱為總體回歸直線,是對X和Y之間真實關系的最佳線性近似。
2.多元線性回歸
相比簡單線性回歸,實踐中常常不止一個預測變量,這就要求對簡單線性回歸進行擴展。雖然可以給每個預測變量單獨建立一個簡單線性回歸模型,但無法做出單一的預測。更好的方法是擴展簡單線性回歸模型,使它可以直接包含多個預測變量。一般情況下,假設有p個不同的預測變量,多元線性回歸模型為:
其中Xj代表第j個預測變量,βj代表第j個預測變量和響應變量之間的關聯。
5.3?分類
5.2節的線性回歸模型中假設響應變量Y是定量的,但很多時候,Y卻是定性的。比如杯子的材質是定性變量,可以是玻璃、塑料或不銹鋼等。定性變量也叫分類變量。預測定性響應值是指對觀測分類。
分類的目標是劃分項目分類。最常見的分類類型是二元分類,二元分類有兩種分類,通常命名為正和負。如果有兩個以上的分類,它被稱為多元分類。MLlib支持兩種線性方法分類:線性支持向量機和邏輯回歸。線性支持向量機僅支持二元分類,而邏輯回歸對二元分類和多元分類都支持。對于這兩種方法,MLlib支持L1和L2正規化變體。MLlib中使用RDD[LabeledPoint]代表訓練數據集,其中標簽引從0開始,如0,1,2,...。對于二元標簽γ在MLlib中使用0表示負,使用+1表示正。
1.線性支持向量機
線性支持向量機(SVM)是用于大規模分類任務的標準方法。正是在介紹損失函數時提到的:
默認情況下,線性支持向量機使用L2正規化訓練。MLlib也支持選擇L1正規化,在這種情況下,問題就變成了線性問題。線性支持向量機算法輸出SVM模型。給定一個新的數據點,記為X,該模型基于wτx的值做預測。默認情況下,如果wτx?≥ 0則結果是正的,否則為負。
下例展示了如何加載樣本數據集,執行訓練算法。
import org.apache.spark.mllib.classification.{SVMModel,SVMWithSGD} import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics import org.apache.spark.mllib.util.MLUtils //加載LIBSVM格式的訓練數據 val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsbvm_data.txt") //將數據切分為訓練數據(60%)和測試數據(40%) val splits = data.randomSplit(Array(0.6,0.4),seed = 11L) val training = splits(0).cache() val test = splits(1) //運行訓練算法構建模型 val numIterations = 100 val model = SVMWithSGD.train(training, numIterations) //在測試數據上計算原始分數 val scoreAndLabels = test.map{ point => val score = model.predict(point.features)(score, point.label) } //獲取評估指標 val metrics = new BinaryClassificationMetrics(scoreAndLabels) val auROC = metrics.areaUnderROC() println("Area under ROC = " + auROC) //保存和加載模型 model.save(sc, "myModelPath") val sameModel = SVMModel.load(sc, "myModelPath")SVMWithSGD.train默認執行L2正規化,可以設置正則化參數為1.0為執行L1正規化。配置及優化SVMWithSGD的代碼如下:
import org.apache.spark.mllib.optimization.L1Updater val svmAlg = new SVMWithSGD() svmAlg.optimizer.setNumIterations(200).setRegParam(0.1).setUpdater(new L1Updater) val modelL1 = svmAlg.run(training)2.邏輯回歸
?邏輯回歸被廣泛用于預測二元響應。它正是在介紹損失函數時提到的:
對于二元分類問題,該算法輸出二元邏輯回歸模型。給定一個新的數據點,記為X,該模型基于應用邏輯函數
做預測,其中z =?wτx。默認情況下,如果?(wτx) > 0.5,輸出為正,否則為負。雖然不像線性支持向量機,邏輯回歸模型中,?(z)的原始輸出具有一概率解釋(即X是正概率)。
二元邏輯回歸可以推廣到多元邏輯回歸來訓練和預測多元分類問題。對于多元分類問題,該算法將輸出一個多元邏輯回歸模型,其中包含K-1個二元邏輯回歸模型。MLlib實現了兩種算法來解決邏輯回歸分析:小批量梯度下降和L-BFGS。Spark官方推薦L-BFGS,因為它比小批梯度下降的收斂更快。
下例演示了如何使用邏輯回歸。
//運行訓練算法構建模型 val model = new LogisticRegressionWithLBFGS().setNumClasses(10).run(training) //在測試數據上計算原始分數 val predictionAndLabels = test.map{ case LabeledPoint(label, features) => val prediction = model.predict(features)(prediction, label) } //獲取評估指標 val metrics = new MulticlassMetrics(predictionAndLabels) val precision =metrics.precision println("Precision = " + precision) //保存和加載模型 model.save(sc, "myModelPath") val samleModel = LogisticRegressionModel.load(sc, "myModelPath")5.4?回歸
1.線性最小二乘、套索和嶺回歸
線性最小二乘公式是回歸問題最常見的公式。在介紹損失函數時也提到過它的公式:
多種多樣的回歸方法通過使用不同的正規化類型,都派生自線性最小二乘。例如,普通最小二乘或線性最小二乘使用非正規化:嶺回歸使用L2正規化;套索使用L1正規化。對于所有這些模型的損失和訓練誤差:
就是均方誤差。
下面的例子演示了如何使用線性回歸。
//加載解析數據 val data = sc.textFile("data/mllib/ridge-data/lpsa.data") val parsedData = data.map { line => val parts = line.split(',')LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble))).cache() //構建模型 val numIterations = 100 val model = LinearRegressionWithSGD.train(parsedData, numIterations) //使用訓練樣本計算模型并且計算訓練誤差 val valueAndPreds = parsedData.map { point =>val prediction = model.predict(point.features)(point.label, prediction) } val MSE = valueAndPreds.map{case(v, p) => math.pow((v- p), 2)}.mean() println("training Mean Squared Error = " + MSE) //保存與加載模型 model.save(sc, "myModelPath") val sameModel = LinearRegressionModel.load(sc, "myModelPath") }2.流線性回歸
流式數據可以適用于線上的回歸模型,每當有新數據到達時,更新模型的參數。MLlib目前使用普通最小二乘法支持流線性回歸。除了每批數據到達時,模型更新最新的數據外,實際與線下的執行是類似的。
下面的例子,假設已經初始化好了StreamingContext ssc來演示流線性回歸。
val numFeatures = 3 val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures)) model.trainOn(trainingData) model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print() ssc.start() ssc.awaitTermination()6.?決策樹
決策樹是分類和回歸的機器學習任務中常用的方法。決策樹廣泛使用,因為它們很容易解釋,處理分類的功能,延伸到多元分類設置,不需要縮放功能,并能捕捉到非線性和功能的交互。
MLlib使用連續和分類功能支持決策樹的二元和多元的分類和回歸。通過行實現分區數據,允許分布式訓練數以百萬計的實例。
6.1?基本算法
決策樹是一個貪心算法,即在特性空間上執行遞歸的二元分割。決策樹為每個最底部(葉)分區預測相同的標簽。為了在每個樹節點上獲得最大的信息,每個分區是從一組可能的劃分中選擇的最佳分裂。
1.節點不純度和信息增益
節點不純度是節點上標簽的均勻性的量度。當前實現提供了兩種分類不純度測量的方法(基尼不純度和嫡)和一種回歸不純度測量的方法(方差),如表11-4所示:
信息增益是父節點不純度與兩個子節點不純度的加權總和之間的差。假設將有s個分區,大小為N的數據集D劃分為兩個數據集Dleft和Dright,那么信息增益為:
2.劃分候選人
(1)?連續特征
對于單機上實現的小數據集,給每個連續特征劃分的候選人在此特征上有唯一值。有些實現了對特征值排序,為了加速計算,使用這些有序的唯一值劃分候選人。對于大的分布式數據集,排序是很昂貴的。通過在樣本數據分數上執行位計算,實現了計算近似的劃分候選人集合。有序劃分創建了“箱”,可使用maxBins參數指定這樣的容器的最大數量。
(2)?分類特征
對于有M種可能值的分類特征,將會有2M-1-1個劃分候選人。對于二元(0/1)分類和回歸,我們可以通過平均標簽排序的分類特征值,減少劃分候選人至M-1多得數據量。例如,對于一個有A、B和C三個分類的分類特征的二元分類問題,其相應的標簽1的比例是0.2,0.6和0.4時,分類特征是有序的A、C、B。兩個劃分候選人分別是A|C,B和A,C|B,其中|標記劃分。
在多元分類中共有2M-1-1種可能的劃分,無論何時都可能被使用。當2M-1-1比參數maxBins大時,我們使用與二元分類和回歸相類似的方法。M種分類特征用不純度排序,最終得到需要考慮的M-1個劃分候選人。
3.停止規則
遞歸樹的構建當滿足下面三個條件之一時會停在一個節點。
- 節點的深度與maxBins相等;
- 沒有劃分候選人導致信息增益大于minInfoGain;
- 沒有劃分候選人產生的子節點都至少有minInstancesPerNode個訓練實例。
6.2?使用例子
下面的例子演示了使用基尼不純度作為不純度算法且樹深為5的決策樹執行分類。
import org.apache.spark.mllib.tree.DecisionTree import org.apache.spark.mllib.tree.model.DecisionTreeModel import org.apache.spark.mllib.util.MLUtils val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt") val splits = data.randomSplit(Array(0.7, 0.3)) //訓練決策樹模型 //空categoricalFeaturesInfo說明所有的特征是連續的 val numClasses = 2 val categoricalFeaturesInfo = Map[Int, Int]() val impurity = "gini" val maxDepth = 5 val maxBins = 32 // trainingData是訓練數據 val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins) //在測試實例上計算 val labelAndPreds = testData.map{ point =>val prediction = model.predict(point.features)(point.label, prediction) } val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count() println("Test Error = " + testErr) println("Learned classification tree model: \n" + model.toDebugString) model.save(sc, "myModelPath") val sameModel = DecisionTreeModel.load(sc, "myModelPath")下面的例子演示了使用方差作為不純度算法且樹深為5的決策樹執行分類
val categoricalFeaturesInfo = Map[Int, Int]() val impurity = "variance" val maxDepth =5 val maxBins = 32 val model = DecisionTree.trainRegressor(trainingData, categoricalFeaturesInfo, impurity, maxDepth, maxBins) val labelsAndPredictions = testData.map{ point =>val prediction = model.predict(point.features)(point.label, prediction) } val testMSE = labelsAndPredictions.map{ case(v, p) => math.pow((v - p), 2)}.mean() println("Test Mean Squared Error =" + testMSE) println("Learned regression tree model:\n" + model.toDebugString) model.save(sc, "myModelPath") val sameModel = DecisionTreeModel.load(sc, "myModelPath")7.?隨機森林
?合奏是一個創建由其他模型的集合組合而成的模型的學習算法。MLlib支持兩個主要的合奏算法:梯度提升決策樹和隨機森林,它們都使用決策樹作為其基礎模型。梯度提升決策樹和隨機森林雖然都是決策樹合奏的學習算法,但是訓練過程是不同的。關于合奏有以下幾個權衡點:
- GBT每次都要訓練一顆樹,所以它們比隨機森林需要更長的時間來訓練。隨機森林可以平行地訓練多棵樹。另一方面,GBT往往比隨機森林更合理地使用更小(淺)的樹并且訓練小樹會花費更少的時間。
- 隨機森林更不易發生過度擬合。隨機森林訓練更多的樹會減少多半的過度擬合,而GBT訓練更多的樹會增加過度擬合。(在統計語言中,隨機森林通過使用更多的樹木減少方差,而GBT通過使用更多的樹木減少偏差。)
- 隨機森林可以更容易調整,因為性能與樹木的數量是單調增加的。但如果GBT樹木的數量增長過大,性能可能開始下降。
總之,兩種算法都是有效的,具體選擇應取決于特定的數據集。
隨機森林是分類與回歸中最成功的機器學習模型之一。為了減少過度擬合的風險,隨機森林將很多決策樹結合起來。和決策樹相似,隨機森林處理分類的功能,延伸到多元分類設置,不需要縮放功能,并能捕捉到非線性和功能的交互。
MLlib使用連續和分類功能支持隨機森林的二元和多元的分類和回歸。
7.1?基本算法
隨機森林訓練一個決策樹的集合,所以訓練可以并行。該算法隨機性注入訓練過程,使每個決策樹會有一點不同。結合每棵樹的預測降低了預測的方差,改進了測試數據的性能。
1.隨機注入
算法隨機性注入訓練的過程包括:
1)?每次迭代對原始數據集進行二次采樣獲得不同的訓練集,即引導。
2)?考慮在樹的每個節點上將特征的不同隨機子集分割。
除了這些隨機性,每個決策樹個體都以同樣的方法訓練。
2.預測
對隨機森林做預測,就必須聚合它的決策樹集合的預測。分類和回歸的聚合是不同的:
- 分類采用多數表決。每棵樹的預測作為對分類的一次投票,收到最多投票的分類就是預測結果。
- 回歸采用平均值。每棵樹都有一個預測值,這些樹的預測值的平均值就是預測結果。
7.2?使用例子
下面例子演示了使用隨機森林執行分類。
//訓練隨機森林模型 //空categoricalFeaturesInfo說明所有特征是連續的 val numClasses = 2 val categoricalFeaturesInfo = Map[Int, Int]() val numTrees = 3 //use more practice val featureSubsetStrategy = "auto" //Let the algorithm choose. val impurity = "gini" val maxDepth = 4 val maxBins = 32 val model = RandomForest.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins) val labelAndPreds = testData.map{ point => val prediction = model.predict(point.features)(point.label, prediction) }val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count() println("Test Error = " + testErr) println("Learned classification forest model:\n" + model.toDebugString) model.save(sc, "myModelPath") val sameModel = RandomForestModel.load(sc, "myModelPath")下面例子演示了使用隨機森林執行回歸。
val numClasses = 2 val categoricalFeaturesInfo = Map[Int, Int]() val numTrees = 3 //Use more in practice. val featureSubsetStrategy = "auto" //Let the algorithm choose val impurity = "variance" val maxDepth = 4 val maxBins = 32 val model = RandomForest.trainRegressor(trainingData, categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins) val labelAndPredictions = testData.map{ point => val prediction = model.predict(point.features)(point.label, prediction) } val testMSE = labelAndPredictions.map{ case(v, p) => math.pow((v -p), 2)}.mean() println("Test Mean Squared Error = " + testMSE) println("Learned regression forest model:\n" + model.toDebugString) model.save(sc, "myModelPath") val sameModel = RandomForestModel.load(sc, "myModelPath")8.?梯度提升決策樹
GBT迭代訓練決策樹,以便最小化損失函數。和決策樹相似,隨機森林處理分類的功能,延伸到多元分類設置,不需要縮放功能,并能捕捉到非線性和功能的交互。
MLlib使用連續和分類功能支持梯度提升決策樹的二元和多元的分類和回歸。
8.1?基本算法
GBT迭代訓練一個決策樹的序列。在每次迭代中,算法使用當前合奏來預測每個訓練實例的標簽,然后將預測與真實的標簽進行比較。數據集被重新貼上標簽,將重點放在預測不佳的訓練實例上。因此,在下一迭代中,決策樹將幫助糾正先前的錯誤。重貼標簽的具體機制是由損失函數定義的。隨著每次迭代,GBT進一步減少訓練數據上的損失函數。表11-5列出了MLlib中GBT支持的損失函數。請注意,每個損失只適用于分類或回歸之一。其中N表示實例數量,y1表示實例i的標簽,xi表示實例i的特征,F(Xi)表示實例i的模型預測標簽。
8.2?使用例子
下例演示了用LogLoss作為損失函數,使用GBT執行分類的例子。
//訓練GradientBoostedTree模型 //默認使用LogLoss val boostingStrategy = BoostingStrategy.defaultParams("Classification") boostingStrategy.numIterations = 3 //Note:Use more iterations in practice. boostingStrategy.treeStrategy.numClasses = 2 boostingStrategy.treeStrategy.maxDepth = 5 //空categoricalFeaturesInfo說明所有特征是連續的 boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]() val model = GradientBoostedTrees.train(trainingData, boostingStrategy) val labelAndPreds = testData.map{ point => val prediction = model.predict(point.features)(point.label, prediction) } val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count() println("Test Error = " + testErr) println("Learned classification GBT model:\n" + model.toDebugString) model.save(sc, "myModelPath") val sameModel = GradientBoostedTreesModel.load(sc, "myModelPath")下例演示了用Squared Error?作為損失函數,使用GBT執行回歸的例子。
//訓練GradientBoostedTrees模型 // defaultParams指定了Regression, 默認使用SquaredError val boostingStrategy = BoostingStrategy.defaultParams("Regression") boostingStrategy.numIterations = 3 //Note: Use more iterations in practice. boostingStrategy.treeStrategy.maxDepth = 5 //空categoricalFeaturesInfo說明所有特征是連續的 boostingStrategy。treeStrategy.categoricalFeaturesInfo = Map [Int, Int]() val model = GradientBoostedTrees.train(trainingData, boostingStrategy) val labelsAndPredictions = testData.map{ point =>val prediction = model.predict(point.features)(point.label, prediction) } val testMSE = labelsAndPredictions.map{case(v, p) => math.pow((v - p), 2)}.mean() println("Test Mean Squared Error = " + testMSE) println("Learned regression GBT model:\n" + model.toDebugString) model.save(sc, "myModelPath") val sameModel = GradientBoostedTreesModel.load(sc, "myModelPath")9.?樸素貝葉斯
9.1?算法原理
j我們首先來介紹一些數學中的理論,然后來看樸素貝葉斯。
條件概率:A和B表示兩個事件,且P(B)?≠ 0(B事件發生的概率不等于0),則給定事件B發生的條件下事件A發生的條件概率定義為:
使用條件概率推導出乘法定律:A和B表示兩個事件,且P(B)?≠ 0(B事件發生的概率不等于0)。那么:
將乘法定律擴展為全概率定律:事件B1,B2,...,Bn滿足Uni=1 Bi=Ω,Bi∩Bj=?,i?≠ j,且對所有的i,P(Bi) > 0。那么,對于任意的A,滿足:
貝葉斯公式:事件事件A, B1,B2,...,Bn,?其中Bi不相交,Uni=1?Bi=Ω,且對所有的i,P(Bi) > 0。那么:
樸素貝葉斯分類算法是一種基于每對特征之間獨立性的假設的簡單的多元分類算法。樸素貝葉斯的思想:對于給出的待分類項,求解在此項出現的條件下各個類別出現的概率,哪個最大,就認為此待分類項屬于哪個類別。樸素貝葉斯分類的定義如下:
1)?設x = {a1,a2,...,am}為待分類項,每個ai為x的一個特征屬性;
2)類別集合C = {y1,y2,...,yn};
3)?計算P(y1|x),P(y2|x),...,P(yn|x) ;
4)?如果P(yk|x) = max{(y1|x),P(y2|x),...,P(yn|x)?},則x?ε?yk 。
關鍵在第三步:
1)?找到一個已知分類的待分類項集合,這個集合叫做訓練樣本集。
2)?統計得到在各類別下各個特征屬性的條件概率估計。即P(a1|y1),P(a2|y1),...,P(am|y1);?P(a1|y2),P(a2|y2),... ,P(am|y2);...;P(a1|yn),P(a2|yn),...,P(am|yn) 。
3)?如果各個特征屬性都是獨立的,則根據貝葉斯公式可以得到以下推導:
樸素貝葉斯能夠被非常有效的訓練。它被單獨傳給訓練數據,計算給定標簽特征的條件概率分布并給出觀察結果用于預測。
MLlib支持多項樸素貝葉斯和伯努利樸素貝葉斯。這些模型典型的應用是文檔分類。在這方面,每個觀察是一個文檔,每個特征代表一個條件,其值是條件的頻率(在多項樸素貝葉斯中)或一個由零個或一個指示該條件是否在文檔中找到(在伯努利樸素貝葉斯中)。特征值必須是非負的。模型類型選擇使用可選的參數"多項"或“伯努利”。“多項”作為默認模型。通過設置參數λ(默認為1.0)添加劑平滑。為文檔分類,輸入特征向量通常是稀疏的,因為稀疏向量能利用稀疏性的優勢。因為訓練數據只使用一次,所以沒有必要緩存它。
9.2?使用例子
下面的例子演示了如何使用多項樸素貝葉斯。
import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel} import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint val data = sc.textFile("data/mllib/sample_naive_bayes_data.txt") val parsedData = data.map{ line => val parts = line.split(',')LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble))) } val splits = parsedData.randomSplit(Array(0.6, 0.4), seed = 11L) val training = splits(0) val test = splits(1) val model = NaiveBayes.train(training, lambda = 1.0, modelType = "multinomial") val predictionAndLabel = test.map(p => (model.predict(p.features), p.label)) val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count() model.save(sc, "myModelPath") val sameModel = GradientBoostedTreesModel.load(sc, "myModelPath")10.?保序回歸
10.1?算法原理
保序回歸屬于回歸算法,其定義為:給定一個有限的實數集合Y =??{y1,y2,...,yn}?表示觀測響應,?X =??{x1,x2,...,xn}?表示未知的響應值,進行擬合找到一個最小化函數:
?并使用x1≤x2≤...≤xn?對目標排序,其中ωi?是大于0的權重。最終的函數被稱為保序回歸,并且它是唯一的。它可以看做是排序限制下的最小二乘問題。基本上保序回歸是擬合原始數據點最佳的單調函數。
MLlib支持PAVA,此算法使用一種辦法來平行化保序回歸。保序回歸有一個可選參數isotonic,默認值是true。此參數指定保序回歸是保序的(單調增加)還是不保序的(單調減少)。
保序回歸的結果被視為分段線性函數。因此,預測的規則是:
1)?如果預測輸入能準備匹配訓練特征,那么返回相關預測。如果有多個預測匹配訓練特征,那么會返回其中之一。
2)?如果預測輸入比所有的訓練特征低或者高,那么最低和最高的訓練特征各自返回。如果有多個預測比所有的訓練特征低或者高,那么都會返回。
3)?如果預測輸入介于兩個訓練特征,那么預測會被視為分段線性函數和從最接近的訓練特征中計算得到的插值。
10.2?使用例子
下面的例子演示了如何使用保序回歸。
import org.apache.spark.mllib.regression.{IsotonicRegression, IsotonicRegressionModel} //省略數據加載及樣本劃分的代碼 val model = new IsotonicRegression().setIsotonic(true).run(training) val predictionAndLabel = test.map { point => val predictedLabel = model.predict(point._2)(predictedLabel, point._1) } val meanSquareError = predictionAndLabel.map{case(p, 1) => math.pow((p-1), 2)}.mean() println("Mean Squared Error = " + meanSquaredError) model.save(sc, "myModelPath") val sameModel = IsotonicRegressionModel.load(sc, "myModelPath")11.?協同過濾
協同過濾通常用于推薦系統。這些技術旨在填補用戶關聯矩陣的缺失項。MLlib支持基于模型的協同過濾,用戶和產品可以預測缺失項的潛在因素的小集合來描述。MLlib采用交替最小二乘算法來學習這些潛在的因素。
矩陣分解的標準方法基于協同過濾處理用戶項矩陣的條目是明確的。現實世界的用例只能訪問隱式反饋是更常見的(例如瀏覽、點擊、購買、喜歡、股份等)。
下面的例子演示了如何使用協同過濾。
import org.apache.spark.mllib.recommendation.ALS import org.apache.spark.mllib.recommendation.MatrixFactorizationModel import org.apache.spark.mllib.recommendation.Ratingval data = sc.textFile("data/mllib/als/test.data") val ratings = data.map(_.split(',') match { case Array(user, item, rate) =>Rating(user.toInt, item.toInt, rate.toDouble) }) //使用ALS構建推薦模型 val rank = 10 val numIterations = 20 val model = ALS.train(ratings, rank, numIterations, 0.01) //模型計算 val usersProducts = ratings.map{ case Rating(user, product, rate) => (user, product) } val predictions = model.predict(usersProducts).map { case Rating(user, product, rate) =>((user, product), rate) } val ratesAndPreds = ratings.map { case Rating(user, product, rate) => ((user, product), rate) }.join(predictions) val MSE = ratesAndPres.map { case ((user, product), (r1, r2)) =>val err = (r1 - r2)err * err}.mean() println("Mean Squared Error = " + MSE) model.save(sc, "myModelPath") val sameModel = MatrixFactorizationModel.load(sc, "myModelPath")12.?聚類
聚類分析又稱群分析,它是研究(樣品或指標)分類問題的一種統計分析方法。聚類分析以相似性為基礎,在一個聚類中的模式之間比不在同一聚類中的模式之間具有更多的相似性。MLlib支持的聚類算法如下:
- K-means;
- 高斯混合(Gaussian mixture);
- power iteration clustering(PIC);
- latent?Dirichlet allocation(LDA);
- 流式K-means.
12.1?K-means
K-means算法是硬聚類算法,是典型的基于原型的目標函數聚類方法的代表,它是數據點到原型的某種距離作為優化的目標函數,利用函數求極值的方法得到迭代運算的調整規則。
聚類屬于無監督學習,以往的回歸、樸素貝葉斯、SVM等都是有類別標簽y的,也就是說,樣本中已經給出了樣本的分類。而聚類的樣本中卻沒有給定y,只有特征x,比如假設宇宙中的星星可以表示成三維空間中點集(x, y, z)。聚類的目的是找到每個樣本x潛在的類別y,并將同類別y的樣本x放在一起。
在聚類問題中,訓練樣本X =??{x1,x2,...,xm},每個xi?ε Rn,K-means算法是將樣本聚類成k個簇,具體算法描述如下:
1)?隨機選取k個聚類質心點為μ1,μ2,...,μk?ε Rn;
2)?重復下面過程直到收斂。
對每一個樣本i計算它應該屬于的類
對于每一個類重新計算該類的質心
k是我們事先給定的聚類數,ci代表樣本i與k個類中距離最近的那個類,ci的值是1到k中的一個。質心uj代表我們對屬于同一個類的樣本中心點的猜測,拿星團模型來解釋就是要將所有的星星聚成k個星團,首先隨機選取k個宇宙中的點(或者k個星星)作為k個星團的質心,然后第一步對于每一個星星計算其到k個質心中每一個的距離,接著選取距離最近的那個星團作為ci,這樣經過第一步每一個星星都有了所屬的星團;第二步對于每一個星團,重新計算它的質心uj(對里面所有的星星坐標求平均)。重復迭代第一步和第二步直到質心不變或者變化很小。圖11-8演示了以上過程。
?
?下面的例子演示了K-means算法的使用。
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel} import org.apache.spark.mllib.linalg.Vectorsval data = sc.textFile("data/mllib/kmeans_data.txt") val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache() val numClusters =2 val numIterations = 20 val clusters = KMeans.train(parsedData, numClusters, numIterations) val WSSSE = clusters.computeCost(parsedData) println("Within Set Sum of Squared Errors = " + WSSSE) clusters.save(sc, "myModelPath") val sameModel = KMeansModel.load(sc, "myModelPath")12.2?高斯混合
K-means的結果是每個數據點被分配到其中某一個cluster了,而高斯混合則給出這些數據點被分配到每個cluster的概率。高斯混合的算法與K-means算法類似。MLlib中高斯混合的使用例子
import org.apache.spark.mllib.clustering.GaussianMixture import org.apache.spark.mllib.clustering.GaussianMixtureModel import org.apache.spark.mllib.linalg.Vectorsval data = sc.textFile("data/mllib/gmm_data.txt") val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache() val gmm = new GaussianMixture().setK(2).run(parsedData) gmm.save(sc, "myGMMModel") val sameModel = GaussianMixtureModel.load(sc, "myGMMModel") for(i <- 0 until gmm.k){println("weight=%f\nmu=%s\nsigma=\n%s\n" format(gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma)) }12.3?快速迭代聚類
快速迭代聚類是一種簡單可擴展的圖聚類方法。其使用例子如下:
?
import org.apache.spark.mllib.clustering.{PowerIterationClustering, PowerIterationClusteringModel} import org.apache.spark.mllib.linalg.Vectors val similarities: RDD[(Long, Long, Double)] = ... val pic = new PowerIterationClustering().setK(3).setMaxIterations(20) val model = pic.run(similarities) model.assignments.foreach{ a => println(s"${a.id} -> ${a.cluster}") } model.save(sc, "myModelPath") val sameModel = PowerIterationClusteringModel.load(sc, "myModelPath")12.4?latent Dirichlet allocation
latent Dirichlet allocation(LDA)是一個三層貝葉斯概率模型,包含詞、主題和文檔三層結構。文檔到主題服從Dirichlet分布,主題到詞服從多項式分布。
LDA是一種非監督機器學習技術,可以用來識別大規模文檔集或語料庫中潛藏的主題信息。它采用了詞袋的方法,這種方法將每一篇文檔視為一個詞頻向量,從而將文本信息轉化為了易于建模的數字信息。但是詞袋方法沒有考慮詞與詞之間的順序,這簡化了問題的復雜性,同時也為模型的改進提供了契機。每一篇文檔代表了一些主題所構成的一個概率分布,而每一個主題又代表了很多單詞所構成的一個概率分布。由于Dirichlet分布隨機向量各分量間的弱相關性(之所以還有點“相關”,是因為各分量之和必須為1),使得我們假想的潛在主題之間也幾乎是不相關的,這與很多實際問題并不相符,從而造成了LDA的又一個遺留問題。
對于語料庫中的每篇文檔,LDA定義了如下生成過程:
1)?對每一篇文檔,從主題分布中抽取一個主題;
2)?從上述被抽到的主題所對應的單詞分布中抽取一個單詞;
3)?重復上述過程直至遍歷文檔中的每一個單詞。
下例演示了LDA的使用。
12.5?流式K-means
當數據流到達,我們可能想要動態地估算cluster,并更新它們。該算法采用了小批量的K-means更新規則。對每一批數據,將所有的點分配到最近的cluster,并計算最新的cluster中心,然后更新每個cluster的公式為:
ci是前一次計算得到的cluster中心,ni是已經分配到cluster的點數,xi是從當前批次得到的cluster的新中心,mi是當前批次加入cluster的點數。衰減因子a可被用于忽略過去的數據;a=1時所有數據都從一開始就被使用;a=0時只有最近的數據將被使用。這類似于一個指數加權移動平均值。
下面的例子演示了流失K-means的使用。
13.?維數減縮
維數減縮是減少所考慮變量的數量的過程。維數減縮有兩種方式:
- 奇異值分解;
- 主成分分析。
13.1?奇異值分解
奇異值分解將一個矩陣因子分解為三個矩陣U、Σ?和?V:
其中U是正交矩陣,其列被稱為左奇異向量;Σ是對角矩陣,其對角線是非負的且以降序排列,因此被稱為奇異值;V也是正交矩陣,其列被稱為右奇異向量。
對于大的矩陣,除了頂部奇異值和它的關聯奇異值,我們不需要完全分解。這樣可以節省存儲、去噪聲和恢復矩陣的低秩結構。如果我們保持前7個頂部奇異值,那么最終的低秩矩陣為:
假設n小于m。奇異值和右奇異值向量來源于特征值和Gramian矩陣AτA的特征向量。矩陣存儲左奇異向量U,通過矩陣乘法U = A(VS-1)計算。使用的實際方法基于計算成本自動被定義:
1)?如果(n < 100)?或者(k > n/2),我們首先計算Gramian矩陣,然后再計算其頂部特征向量并將特征向量本地化到Driver。這需要單次傳遞,在每個Executor和Driver上使用O(n2)的存儲,并花費Driver上O(n2k)的時間。
2)?否則,將使用分布式的方式計算AτA的值,并且發送給ARPACK計算頂部特征向量。這需要O(k)次傳遞,每個Executor上使用O(n)的存儲,在Driver上使用O(nk)的存儲。
下面的例子演示了SVD的使用。
13.2?主成分分析
主成分分析是一種統計方法,此方法找到一個旋轉,使得第一坐標具有可能的最大方差,并且每個隨后的坐標都具有可能的最大方差。旋轉矩陣的列被稱為主成分。下面的例子演示了使用RowMatrix計算主成分。
14.?特征提取與轉型
14.1?術語頻率反轉
術語頻率反轉是一個反映文集的文檔中的術語的重要性,廣泛應用于文本挖掘的特征矢量化方法。術語表示為t,文檔表示為d,文集表示為D。術語頻率TF(t,d)表示術語t在文檔d中出現的頻率,文檔頻率DF(t, D)表示包含術語t的文檔數量。如果我們僅使用術語頻率來測量重要性,則很容易過度強調術語出現的很頻繁,而攜帶的關于文檔的信息很少,例如a、the?和of等。如果術語非常頻繁地跨文集出現,這意味著它并沒有攜帶文檔的特定信息。反轉文檔頻率是一個術語提供了多少信息的數值度量:
|D|?表示文集中的文檔總數。因為使用了對數,如果一個術語出現于所有的文檔中,它的IDF值變0。需要注意的是,應用一個平滑項,以避免被零除。TF-IDF方法基于TF與IDF,它的公式如下:
這里有一些術語頻率和文檔頻率定義的變種。在MLlib中,為了使TF和IDF更靈活,將它們分開了,MLlib實現術語頻率時使用了哈希。應用歐冠哈希函數將原始特征映射到了索引(術語),術語頻率因此依賴于map的索引計算。這種方法避免了需要計算一個全局術語到索引圖,這對于大型語料庫開銷會很大。但由于不同的原始特征在哈希后可能變為同樣的術語,所以存在潛在的哈希沖突。為了降低碰撞的機會,我們可以增加目標特征的維度(即哈希表中的桶數)。默認的特征維度是220 = 1048576.
下面的例子演示了HashingTF的使用。
14.2?單詞向量轉換
Word2Vec計算由單詞表示的分布式向量。分布式表征的主要優點是,類似的單詞在矢量空間是接近的,這使得泛化小說模式更容易和模型估計更穩健。分布式向量表示被證實在許多自然語言處理應用中有用,例如,命名實體識別、消歧、解析、標記和機器翻譯。
MLlib的Word2Vec實現采用了skip-gram模型。skip-gram的訓練目標是學習單詞的向量表示,其善于在同一個句子預測其上下文。給定了單詞序列w1,w2,...,wτ,skip-gram模型的目標是最大化平均對數似然,公式如下:
其中k是訓練窗口的大小。每個單詞w與兩個向量uw和vw關聯,并且由單獨的向量來表示。通過給定的單詞wj正確預測wi的概率是由softmax模型決定的。softmax模型如下:
w表示詞匯量。
由于計算logp(wi|wj)的成本與V成正比(V很容易就達到百萬以上),所以softmax這種skip-gram模型是很昂貴的。為了加速Word2Vec計算,MLlib使用分級softmax,它可以減少計算logp(wi|wj)?復雜度到O(log(V))。
下面的例子演示了Word2Vec的使用,
14.3?標準尺度
通過縮放到單位方差和/或通過在訓練集的樣本上使用列摘要統計溢出均值使特征標準化,這是常見的預處理步驟。例如,當所有的特征都有單位方差和/或零均值時,支持向量機的RBF核或者L1和L2正規化線性模型通常能更好地工作。標準化可以提高在優化過程中的收斂速度,并且還可以防止在模型訓練期間,非常大的差異會對特征發揮過大的影響。
StandardScaler的構造器有兩個參數:
- withMean:默認false。用于縮放前求均值,這將建立一個密集的輸出,所以不能在稀疏輸入上正常工作,并將引發異常。
- withStd:默認true。縮放數據到標準單位誤差。
以下例子演示了StandardScaler的使用。
14.4?正規化尺度
正規化尺度把樣本劃分為單位Lp范式,即維度。這是一種常見的對文本分類或集群化的操作。例如,兩個L2正規化TF-IDF向量的點積是這些向量的余弦近似值。
設二維空間內有兩個向量a和b,它們的夾角為θ(0≤θ≤π),則點積定義為以下實數:
MLlib提供Normalizer支持正規化,Normalizer有以下構造參數:
p:?正規化到Lp空間,默認為2。
下面的例子演示了Normalizer的使用。
14.5?卡方特征選擇器
ChiSqSelector用于卡方特征選擇。它運轉在具有分類特征的標簽數據上。ChiSqSelector對基于分類進行獨立卡方測試的特征排序,并且過濾(選擇)最接近標簽的頂部特征。
ChiSqSelector有以下構造器參數:
numTopFeatures:選擇器將要過濾(選擇)的頂部特征數量。
下邊的例子演示了ChiSqSelector的使用。
14.6?Hadamard積
ElementwiseProduct采用逐個相乘的方式,使用給定的權重與每個輸入向量相乘。換言之,它采用一個標量乘法器擴展數據集的每一列。這表示Hadamard積對輸入向量v,使用轉換向量w,最終生成一個結果向量。Hadamard積可由以下公式表示:
ElementwiseProduct的構造器參數為:
w:轉換向量。
下面代碼演示了ElementwiseProduct的使用。
15.?頻繁模式挖掘
分析大規模數據集的第一個步驟通常是挖掘頻繁項目、項目集、亞序列或其他子結構,這在數據挖掘中作為一個活躍的研究主題已多年了。其數學原理讀者可以取維基百科了解。MLlib提供了頻繁模式挖掘的并行實現——FP-growth算法。
FP-growth
給定一個交易數據集,FP-growth的第一步驟是計算項目的頻率,并確定頻繁項目。FP-growth雖然與Apriori類算法有相同的設計目的,但是FP-growth的第二步使用后綴樹(FP樹)結構對交易數據編碼且不會顯式生成候選集(生成候選集通常開銷很大)。第二步之后,就可以從FP樹中抽取頻繁項目集。MLlib中實現了FP-growth的平行版本,叫做PFP。PFP可以將FP-growth的工作分發到其他機器,比單機運行有更好的擴展性。
FPGrowth有以下參數:
- minSupport:項目集被確定為頻繁的最小數量。
- numPartitions:分發任務的數量。
下面的例子演示了FPGrowth的使用。
16.?預言模型標記語言
預言模型標記是一種基于XML的語言,它能夠定義和共享應用程序之間的預測模型。
MLlib支持將模型導出為預言模型標記語言。表11-6列出了MLlib模型導出為PMML的相應模型。
下面的例子演示了將KMeamsModel導出為PMML格式。
17.?管道
Spark1.2增加了一個新包spark.ml,目的是提供一套高層次的API,幫助用戶創建、調試機器學習的管道。spark.ml的標準化API用于將多種機器學習算法組合到一個管道或工作流中。下面列出了Spark ML API的主要概念:
- ML?DataSet:由Hive table或者數據源的數據構成的可容納各種數據類型的DataFrame作為數據集。例如,數據集可以由不同的列分別存儲文本、特征向量、標簽和預測值。
- Transformer:是一種將DataFrame轉換為另一個DataFrame的算法。例如,ML模型是一個將特征RDD轉換為預測值RDD的Transformer。
- Estimator:適用于DataFrame,并生成一個Transformer。例如,學習算法是一個在數據集上訓練并生成一個模型的Estimator。
- Pipeline:鏈接多個Transformer和Estimator,一起構成ML的工作流。
- Param:所有Transformer和Estimator用于指定參數的通用API。
17.1?管道工作原理
機器學習中,運行一系列的算法取處理數據或者從數據學習的場景是很常見的。例如,一個簡單的文本文檔處理工作流可能包含以下階段:
1)?將文檔文本切分成單詞;
2)?將文檔的單詞轉換為數字化的特征向量;
3)?使用特征向量和標簽學習一個預測模型。
Spark ML以一系列按序運行的PipelineStage組成的管道來表示這樣的工作流。這一系列的Stage要么是Transformer,要么是Estimator。數據集通過管道中的每個Stage都會被修改。比如Transformer的transform()方法將在數據集上被調用,Estimator的fit()方法被調用生成一個Transformer,然后此Transformer的transform()方法也將在數據集上被調用。圖11-9展示了簡單文本文檔工作流例子使用管道的處理流程。
圖11-9說明整個管道由3個Stage組成。Tokenizer和HashingTF都是Transformer,LogisticRegression是Estimator。每個圓柱體都說明它本身是一個DataFrame。整個處理流程如下:
1)?在由原生文本文檔構成的原始數據集上應用Pipeline.fit()方法。
2) Tokenizer.transform()將原生文本文檔切分為單詞,并向數據集增加單詞列。
3) HashingTF.transform()將單詞列轉換為特征向量,并向數據集增加向量列。
4)?因為LogisticRegression是Estimator,所以管道第一次調用LogisticRegression.fit()生成了LogisticRegressionModel。如果管道中還有更多的Stage,將會傳遞數據集到下一個Stage之前在數據集上調用LogisticRegressionModel的transform()。
管道本身是一個Estimator。因此調用Pipeline的fit()方法最后生成了PipelineModel,PipelineModel也是一個Transformer。這個PipelineModel會在測試時間使用,測試過程如圖11-10所示。
圖11-10說明PipelineModel的測試過程與圖11-9的管道有相同的Stage數量。但是圖11-9的管道中的所有Estimator在此時都已經變為Transformer。當在測試數據集上調用PipelineModel的transform()方法時,數據在管道中按序通過。每個Stage的transform()方法都會更新數據集,并將數據集傳遞給下一個Stage。
剛才介紹的例子中,管道是線性的,即每個stage都使用由上一個stage生產的數據。只要數據流圖構成了DAG,它就有可能不是線性的。如果管道構成了DAG,那么這些Stage就必須指定拓撲順序。
17.2?管道API介紹
Spark ML的Transformer和Estimator指定參數具有統一的API。有兩種方式指定參數:
- 給實例設置參數。例如,Ir是LogisticRegression的實例,可以調用lr.setMaxIter(10)使得調用lr.fit()時最多迭代10次。
- 傳遞ParamMap給fit()或者transform()方法。通過這種方式指定的參數值將會覆蓋所有由set方式指定的參數值。
下面的例子演示了Estimator、Transformer和Param的使用。
17.3?交叉驗證
?模型選擇是Spark ML中很重要的課題。通過對整個管道的調整,而不是對管道中的每個元素的調整,促成對管道模型的選擇。當前spark.ml使用CrossValidator支持模型選擇。CrossValidator本身攜帶一個Estimator、一組ParamMap以及一個Evaluator。CrossValidator開始先將數據集劃分為多組,每組都由訓練數據集和測試數據集組成。例如,需要劃分3組,那么CrossValidator將生成三個數據集對(訓練,測試)。每一對都使用2/3的數據用于訓練,1/3的數據用于測試。CrossValidator會迭代ParamMap的集合。對于每個ParamMap,它都會訓練給定的Estimator并使用給定的Evaluator計算。ParamMap將會產出最佳的計算模型(對多個數據集對求平均),CrossValidator最終使用這個最佳的ParamMap和整個數據集擬合Estimator。下邊的例子演示了CrossValidator的使用。使用ParamGridBuilder構造網格參數:hashingTF.numFeatures有3個值,r.regParam有2個值。這個網格將會有3*2=6個參數設置供CrossValidator選擇。使用了2組數據集對,那么一共有(3*2)*2=12種不同的模型被訓練。
下面的代碼演示了交叉驗證的使用。
轉載于:https://www.cnblogs.com/swordfall/p/9456222.html
總結
以上是生活随笔為你收集整理的Spark MLlib 机器学习的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: php 其他格式数据与数组互转
- 下一篇: python基础(十二):正则、re模块