Spark MLlib
1. 機器學習
機器學習可以看做是一門人工智能的科學,該領域的主要研究對象是人工智能。機器學習利用數據或以往的經驗,以此優化計算機程序的性能標準。
機器學習強調三個關鍵詞:算法、經驗、性能,其處理過程如上圖所示。在數據的基礎上,通過算法構建出模型并對模型進行評估。評估的性能如果達到要求,就用該模型來測試其他的數據;如果達不到要求,就要調整算法來重新建立模型,再次進行評估。如此循環往復,最終獲得滿意的經驗來處理其他的數據。機器學習技術和方法已經被成功應用到多個領域,比如個性推薦系統,金融反欺詐,語音識別,自然語言處理和機器翻譯,模式識別,智能控制等。
2. 基于大數據的機器學習
傳統的機器學習算法,由于技術和單機存儲的限制,只能在少量數據上使用。即以前的統計/機器學習依賴于數據抽樣。但實際過程中樣本往往很難做好隨機,導致學習的模型不是很準確,在測試數據上的效果也可能不太好。隨著 HDFS(Hadoop Distributed File System) 等分布式文件系統出現,存儲海量數據已經成為可能。在全量數據上進行機器學習也成為了可能,這順便也解決了統計隨機性的問題。然而,由于 MapReduce 自身的限制,使得使用 MapReduce 來實現分布式機器學習算法非常耗時和消耗磁盤IO。因為通常情況下機器學習算法參數學習的過程都是迭代計算的,即本次計算的結果要作為下一次迭代的輸入,這個過程中,如果使用 MapReduce,我們只能把中間結果存儲磁盤,然后在下一次計算的時候從新讀取,這對于迭代 頻發的算法顯然是致命的性能瓶頸。
在大數據上進行機器學習,需要處理全量數據并進行大量的迭代計算,這要求機器學習平臺具備強大的處理能力。Spark 立足于內存計算,天然的適應于迭代式計算。即便如此,對于普通開發者來說,實現一個分布式機器學習算法仍然是一件極具挑戰的事情。幸運的是,Spark提供了一個基于海量數據的機器學習庫,它提供了常用機器學習算法的分布式實現,開發者只需要有 Spark 基礎并且了解機器學習算法的原理,以及方法相關參數的含義,就可以輕松的通過調用相應的 API 來實現基于海量數據的機器學習過程。其次,Spark-Shell的即席查詢也是一個關鍵。算法工程師可以邊寫代碼邊運行,邊看結果。spark提供的各種高效的工具正使得機器學習過程更加直觀便捷。比如通過sample函數,可以非常方便的進行抽樣。當然,Spark發展到后面,擁有了實時批計算,批處理,算法庫,SQL、流計算等模塊等,基本可以看做是全平臺的系統。把機器學習作為一個模塊加入到Spark中,也是大勢所趨。
3. Spark 機器學習庫MLLib
MLlib是Spark的機器學習(Machine Learning)庫,旨在簡化機器學習的工程實踐工作,并方便擴展到更大規模。MLlib由一些通用的學習算法和工具組成,包括分類、回歸、聚類、協同過濾、降維等,同時還包括底層的優化原語和高層的管道API。具體來說,其主要包括以下幾方面的內容:特征化公交:特征提取、轉化、降維,和選擇公交;
管道(Pipeline):用于構建、評估和調整機器學習管道的工具;
持久性:保存和加載算法,模型和管道;
實用工具:線性代數,統計,數據處理等工具。
spark.ml 則提供了基于DataFrames 高層次的API,可以用來構建機器學習工作流(PipeLine)。ML Pipeline 彌補了原始 MLlib 庫的不足,向用戶提供了一個基于 DataFrame 的機器學習工作流式 API 套件。
使用 ML Pipeline API可以很方便的把數據處理,特征轉換,正則化,以及多個機器學習算法聯合起來,構建一個單一完整的機器學習流水線。這種方式給我們提供了更靈活的方法,更符合機器學習過程的特點,也更容易從其他語言遷移。Spark官方推薦使用spark.ml。如果新的算法能夠適用于機器學習管道的概念,就應該將其放到spark.ml包中,如:特征提取器和轉換器。開發者需要注意的是,從Spark2.0開始,基于RDD的API進入維護模式(即不增加任何新的特性),并預期于3.0版本的時候被移除出MLLib。
Spark在機器學習方面的發展非常快,目前已經支持了主流的統計和機器學習算法。縱觀所有基于分布式架構的開源機器學習庫,MLlib可以算是計算效率最高的。MLlib目前支持4種常見的機器學習問題: 分類、回歸、聚類和協同過濾。下表列出了目前MLlib支持的主要的機器學習算法:
MLlib 是spark的可以擴展的機器學習庫,由以下部分組成:通用的學習算法和工具類,包括分類,回歸,聚類,協同過濾,降維,當然也包括調優的部分
- Data types
- Basic statistics (基本統計)
- summary statistics 概括統計
- correlations 相關性
- stratified sampling 分層取樣
- hypothesis testing 假設檢驗
- random data generation 隨機數生成
- Classification and regression (分類一般針對離散型數據而言的,回歸是針對連續型數據的。本質上是一樣的)
- linear models (SVMs, logistic regression, linear regression)線性模型(支持向量機,邏輯回歸,線性回歸)
- naive Bayes 貝葉斯算法
- decision trees 決策樹
- ensembles of trees (Random Forests and Gradient-Boosted Trees)
多種樹(隨機森林和梯度增強樹)
- Collaborative filtering 協同過濾
- alternating least squares (ALS) (交替最小二乘法(ALS) )
- Clustering 聚類
- k-means k均值算法
- Dimensionality reduction (降維)
- singular value decomposition (SVD) 奇異值分解
- principal component analysis (PCA) 主成分分析
- Feature extraction and transformation 特征提取和轉化
- Optimization (developer) 優化部分
- stochastic gradient descent 隨機梯度下降
- limited-memory BFGS (L-BFGS) 短時記憶的BFGS (擬牛頓法中的一種,解決非線性問題)
4. 線性回歸實例
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaDoubleRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.evaluation.RegressionMetrics; import org.apache.spark.mllib.linalg.Vectors; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.regression.LinearRegressionModel; import org.apache.spark.mllib.regression.LinearRegressionWithSGD; import scala.Tuple2; public class SparkMLlibLinearRegression { public static void main(String[] args) { String path = "file:///data/hadoop/spark-2.0.0-bin-hadoop2.7/data/mllib/ridge-data/lpsa.data"; SparkConf conf = new SparkConf(); JavaSparkContext sc = new JavaSparkContext(args[0], "Spark", conf); JavaRDD<String> data = sc.textFile(path); JavaRDD<LabeledPoint> parsedData = data.map(new Function<String, LabeledPoint>() { @Override public LabeledPoint call(String line) throws Exception { String[] parts = line.split(","); String[] features = parts[1].split(" "); double[] v = new double[features.length]; for (int i = 0; i < v.length; i++) { v[i] = Double.parseDouble(features[i]); } return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v)); } }); parsedData.cache(); // Building the model int numIterations = 100; double stepSize = 0.00000001; final LinearRegressionModel model = LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations, stepSize); // Evaluate model on training examples and compute training error JavaRDD<Tuple2<Double, Double>> valuesAndPreds = parsedData.map(new Function<LabeledPoint, Tuple2<Double, Double>>(){ @Override public Tuple2<Double, Double> call(LabeledPoint point) throws Exception { double prediction = model.predict(point.features()); return new Tuple2<Double, Double>(prediction, point.label()); } }); double MSE = new JavaDoubleRDD(valuesAndPreds.map( new Function<Tuple2<Double, Double>, Object>() { public Object call(Tuple2<Double, Double> pair) { return Math.pow(pair._1() - pair._2(), 2.0); } } ).rdd()).mean(); System.out.println("training Mean Squared Error = " + MSE); //模型評測 JavaRDD<Tuple2<Object, Object>> valuesAndPreds2= parsedData.map(new Function<LabeledPoint, Tuple2<Object, Object>>(){ @Override public Tuple2<Object, Object> call(LabeledPoint point) throws Exception { double prediction = model.predict(point.features()); return new Tuple2<Object, Object>(prediction, point.label()); } }); RegressionMetrics metrics = new RegressionMetrics(JavaRDD.toRDD(valuesAndPreds2)); System.out.println("R2(決定系數)= "+metrics.r2()); System.out.println("MSE(均方差、方差) = "+metrics.meanSquaredError()); System.out.println("RMSE(均方根、標準差) "+metrics.rootMeanSquaredError()); System.out.println("MAE(平均絕對差值)= "+metrics.meanAbsoluteError()); // Save and load model model.save(sc.sc(), "target/tmp/javaLinearRegressionWithSGDModel"); LinearRegressionModel sameModel = LinearRegressionModel.load(sc.sc(), "target/tmp/javaLinearRegressionWithSGDModel"); double result = sameModel.predict(Vectors.dense(-0.132431544081234,2.68769877553723,1.09253092365124,1.53428167116843,-0.522940888712441,-0.442797990776478,0.342627053981254,-0.687186906466865)); System.out.println(sameModel.weights()); System.out.println("save predict result="+ result); result = model.predict(Vectors.dense(-0.132431544081234,2.68769877553723,1.09253092365124,1.53428167116843,-0.522940888712441,-0.442797990776478,0.342627053981254,-0.687186906466865)); System.out.println(model.weights()); System.out.println("predict result="+ result); } }總結
以上是生活随笔為你收集整理的Spark MLlib的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: openssl https证书
- 下一篇: JS基础操作