ML Pipelines
In this section, we introduce the concept of ML Pipelines. ML Pipelines provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune practical machine learning pipelines.
Table of Contents
- Main concepts in Pipelines
  - DataFrame
  - Pipeline components
    - Transformers
    - Estimators
    - Properties of pipeline components
  - Pipeline
    - How it works
    - Details
  - Parameters
  - ML persistence: Saving and Loading Pipelines
    - Backwards compatibility for ML persistence
- Code examples
  - Example: Estimator, Transformer, and Param
  - Example: Pipeline
  - Model selection (hyperparameter tuning)
Main concepts in Pipelines
MLlib standardizes APIs for machine learning algorithms to make it easier to combine multiple algorithms into a single pipeline, or workflow. This section covers the key concepts introduced by the Pipelines API, where the pipeline concept is mostly inspired by the scikit-learn project.
- DataFrame: This ML API uses DataFrame from Spark SQL as an ML dataset, which can hold a variety of data types. E.g., a DataFrame could have different columns storing text, feature vectors, true labels, and predictions.
- Transformer: A Transformer is an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions.
- Estimator: An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.
- Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.
- Parameter: All Transformers and Estimators now share a common API for specifying parameters.
DataFrame
Machine learning can be applied to a wide variety of data types, such as vectors, text, images, and structured data. This API adopts the DataFrame from Spark SQL in order to support a variety of data types.
DataFrame supports many basic and structured types; see the Spark SQL datatype reference for a list of supported types. In addition to the types listed in the Spark SQL guide, DataFrame can use ML Vector types.
A DataFrame can be created either implicitly or explicitly from a regular RDD. See the code examples below and the Spark SQL programming guide for examples.
Columns in a DataFrame are named. The code examples below use names such as “text”, “features”, and “label”.
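For illustration, here is a minimal sketch of creating an ML-style DataFrame explicitly from a local sequence (this assumes a SparkSession named spark is already in scope, as in the code examples later in this section; the column names are the ones those examples use):

import org.apache.spark.ml.linalg.Vectors

// Build a DataFrame with "label", "features", and "text" columns from an in-memory
// sequence. The "features" column uses the ML Vector type alongside basic types.
val df = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1), "a b c"),
  (0.0, Vectors.dense(2.0, 1.0, -1.0), "d e f")
)).toDF("label", "features", "text")

df.printSchema()  // shows label: double, features: vector, text: string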
Pipeline components
Transformers
A Transformer is an abstraction that includes feature transformers and learned models. Technically, a Transformer implements a method transform(), which converts one DataFrame into another, generally by appending one or more columns. For example:
- A feature transformer might take a DataFrame, read a column (e.g., text), map it into a new column (e.g., feature vectors), and output a new DataFrame with the mapped column appended.
- A learning model might take a DataFrame, read the column containing feature vectors, predict the label for each feature vector, and output a new DataFrame with predicted labels appended as a column.
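As a brief sketch of the first case, a feature transformer such as Tokenizer appends a new column when transform() is called (assuming a SparkSession named spark is in scope; the stage and column names match the Pipeline example later in this section):

import org.apache.spark.ml.feature.Tokenizer

val docs = spark.createDataFrame(Seq(
  (0L, "spark ml pipelines"),
  (1L, "transformers and estimators")
)).toDF("id", "text")

// Tokenizer is a Transformer: transform() reads the "text" column and returns a
// new DataFrame with a "words" column appended; the input DataFrame is unchanged.
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val tokenized = tokenizer.transform(docs)
tokenized.show(false)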
Estimators
An Estimator abstracts the concept of a learning algorithm or any algorithm that fits or trains on data. Technically, an Estimator implements a method fit(), which accepts a DataFrame and produces a Model, which is a Transformer. For example, a learning algorithm such as LogisticRegression is an Estimator, and calling fit() trains a LogisticRegressionModel, which is a Model and hence a Transformer.
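A condensed sketch of the Estimator/Model relationship, reusing the style of training data from the examples below (again assuming a SparkSession named spark is in scope):

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors

val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0))
)).toDF("label", "features")

// LogisticRegression is an Estimator: fit() consumes a DataFrame and produces a
// LogisticRegressionModel, which is a Model and therefore also a Transformer.
val lr = new LogisticRegression().setMaxIter(10)
val model = lr.fit(training)
val predictions = model.transform(training)  // the fitted model transforms DataFrames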
Properties of pipeline components
Transformer.transform()s and Estimator.fit()s are both stateless. In the future, stateful algorithms may be supported via alternative concepts.
Each instance of a Transformer or Estimator has a unique ID, which is useful in specifying parameters (discussed below).
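A small sketch of the unique-ID property; every newly constructed stage gets its own uid, which is what makes the instance-specific parameters discussed later possible:

import org.apache.spark.ml.feature.HashingTF

// Two instances of the same class are distinct stages with distinct unique IDs.
val hashingTF1 = new HashingTF()
val hashingTF2 = new HashingTF()
println(hashingTF1.uid)  // e.g. "hashingTF_..." with a random suffix
println(hashingTF2.uid)  // a different ID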
Pipeline
In machine learning, it is common to run a sequence of algorithms to process and learn from data. E.g., a simple text document processing workflow might include several stages:
- Split each document’s text into words.
- Convert each document’s words into a numerical feature vector.
- Learn a prediction model using the feature vectors and labels.
MLlib represents such a workflow as a Pipeline, which consists of a sequence of PipelineStages (Transformers and Estimators) to be run in a specific order. We will use this simple workflow as a running example in this section.
How it works
A Pipeline is specified as a sequence of stages, and each stage is either a Transformer or an Estimator. These stages are run in order, and the input DataFrame is transformed as it passes through each stage. For Transformer stages, the transform() method is called on the DataFrame. For Estimator stages, the fit() method is called to produce a Transformer (which becomes part of the PipelineModel, or fitted Pipeline), and that Transformer’s transform() method is called on the DataFrame.
We illustrate this for the simple text document workflow. The figure below is for the training time usage of a Pipeline.
Above, the top row represents a Pipeline with three stages. The first two (Tokenizer and HashingTF) are Transformers (blue), and the third (LogisticRegression) is an Estimator (red). The bottom row represents data flowing through the pipeline, where cylinders indicate DataFrames. The Pipeline.fit() method is called on the original DataFrame, which has raw text documents and labels. The Tokenizer.transform() method splits the raw text documents into words, adding a new column with words to the DataFrame. The HashingTF.transform() method converts the words column into feature vectors, adding a new column with those vectors to the DataFrame. Now, since LogisticRegression is an Estimator, the Pipeline first calls LogisticRegression.fit() to produce a LogisticRegressionModel. If the Pipeline had more Estimators, it would call the LogisticRegressionModel’s transform() method on the DataFrame before passing the DataFrame to the next stage.
A Pipeline is an Estimator. Thus, after a Pipeline’s fit() method runs, it produces a PipelineModel, which is a Transformer. This PipelineModel is used at test time; the figure below illustrates this usage.
In the figure above, the PipelineModel has the same number of stages as the original Pipeline, but all Estimators in the original Pipeline have become Transformers. When the PipelineModel’s transform() method is called on a test dataset, the data are passed through the fitted pipeline in order. Each stage’s transform() method updates the dataset and passes it to the next stage.
Pipelines and PipelineModels help to ensure that training and test data go through identical feature processing steps.
Details
DAG Pipelines: A Pipeline’s stages are specified as an ordered array. The examples given here are all for linear Pipelines, i.e., Pipelines in which each stage uses data produced by the previous stage. It is possible to create non-linear Pipelines as long as the data flow graph forms a Directed Acyclic Graph (DAG). This graph is currently specified implicitly based on the input and output column names of each stage (generally specified as parameters). If the Pipeline forms a DAG, then the stages must be specified in topological order.
Runtime checking: Since Pipelines can operate on DataFrames with varied types, they cannot use compile-time type checking. Pipelines and PipelineModels instead do runtime checking before actually running the Pipeline. This type checking is done using the DataFrame schema, a description of the data types of columns in the DataFrame.
Unique Pipeline stages: A Pipeline’s stages should be unique instances. E.g., the same instance myHashingTF should not be inserted into the Pipeline twice since Pipeline stages must have unique IDs. However, different instances myHashingTF1 and myHashingTF2 (both of type HashingTF) can be put into the same Pipeline since different instances will be created with different IDs.
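A short sketch of the unique-instances rule, using the myHashingTF1/myHashingTF2 naming from above (the input and output column names here are illustrative):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.HashingTF

// Two distinct HashingTF instances (with distinct uids) may share a Pipeline.
val myHashingTF1 = new HashingTF().setInputCol("words").setOutputCol("tf1")
val myHashingTF2 = new HashingTF().setInputCol("moreWords").setOutputCol("tf2")
val okPipeline = new Pipeline().setStages(Array(myHashingTF1, myHashingTF2))

// Inserting the same instance twice would give two stages with the same uid,
// which is not allowed:
// new Pipeline().setStages(Array(myHashingTF1, myHashingTF1))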
Parameters
MLlib Estimators and Transformers use a uniform API for specifying parameters.
A Param is a named parameter with self-contained documentation. A ParamMap is a set of (parameter, value) pairs.
There are two main ways to pass parameters to an algorithm:
- Set parameters for an instance. E.g., if lr is an instance of LogisticRegression, one could call lr.setMaxIter(10) to make lr.fit() use at most 10 iterations. This API resembles the API used in spark.mllib package.
- Pass a ParamMap to fit() or transform(). Any parameters in the ParamMap will override parameters previously specified via setter methods.
Parameters belong to specific instances of Estimators and Transformers. For example, if we have two LogisticRegression instances lr1 and lr2, then we can build a ParamMap with both maxIter parameters specified: ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20). This is useful if there are two algorithms with the maxIter parameter in a Pipeline.
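A compact sketch of instance-specific parameters, following the lr1/lr2 example above (no data is needed just to construct the ParamMap):

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap

val lr1 = new LogisticRegression()
val lr2 = new LogisticRegression()

// maxIter is resolved per instance, so both settings coexist in one ParamMap;
// passing this map to fit() would use 10 iterations for lr1 and 20 for lr2.
val paramMap = ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)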
ML persistence: Saving and Loading Pipelines
It is often worthwhile to save a model or a pipeline to disk for later use. In Spark 1.6, a model import/export functionality was added to the Pipeline API. As of Spark 2.3, the DataFrame-based API in spark.ml and pyspark.ml has complete coverage.
ML persistence works across Scala, Java and Python. However, R currently uses a modified format, so models saved in R can only be loaded back in R; this should be fixed in the future and is tracked in SPARK-15572.
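A minimal persistence sketch (the path below is illustrative; the same write()/save() and load() pattern applies to fitted PipelineModels, as shown in the Example: Pipeline section below):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10)
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

// Save the (unfitted) pipeline to disk and load it back.
pipeline.write.overwrite().save("/tmp/unfit-pipeline")
val samePipeline = Pipeline.load("/tmp/unfit-pipeline")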
Backwards compatibility for ML persistence
In general, MLlib maintains backwards compatibility for ML persistence. I.e., if you save an ML model or Pipeline in one version of Spark, then you should be able to load it back and use it in a future version of Spark. However, there are rare exceptions, described below.
Model persistence: Is a model or Pipeline saved using Apache Spark ML persistence in Spark version X loadable by Spark version Y?
- Major versions: No guarantees, but best-effort.
- Minor and patch versions: Yes; these are backwards compatible.
- Note about the format: There are no guarantees for a stable persistence format, but model loading itself is designed to be backwards compatible.
Model behavior: Does a model or Pipeline in Spark version X behave identically in Spark version Y?
- Major versions: No guarantees, but best-effort.
- Minor and patch versions: Identical behavior, except for bug fixes.
For both model persistence and model behavior, any breaking changes across a minor version or patch version are reported in the Spark version release notes. If a breakage is not reported in release notes, then it should be treated as a bug to be fixed.
Code examples
This section gives code examples illustrating the functionality discussed above. For more info, please refer to the API documentation (Scala, Java, and Python).
Example: Estimator, Transformer, and Param
This example covers the concepts of Estimator, Transformer, and Param.
Refer to the Estimator Scala docs, the Transformer Scala docs and the Params Scala docs for details on the API.
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row
// Prepare training data from a list of (label, features) tuples.
val training = spark.createDataFrame(Seq(
(1.0, Vectors.dense(0.0, 1.1, 0.1)),
(0.0, Vectors.dense(2.0, 1.0, -1.0)),
(0.0, Vectors.dense(2.0, 1.3, 1.0)),
(1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")
// Create a LogisticRegression instance. This instance is an Estimator.
val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.
println(s"LogisticRegression parameters:\n ${lr.explainParams()}\n")
// We may set parameters using setter methods.
lr.setMaxIter(10)
.setRegParam(0.01)
// Learn a LogisticRegression model. This uses the parameters stored in lr.
val model1 = lr.fit(training)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
println(s"Model 1 was fit using parameters: ${model1.parent.extractParamMap}")
// We may alternatively specify parameters using a ParamMap,
// which supports several methods for specifying parameters.
val paramMap = ParamMap(lr.maxIter -> 20)
.put(lr.maxIter, 30) // Specify 1 Param. This overwrites the original maxIter.
.put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params.
// One can also combine ParamMaps.
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name.
val paramMapCombined = paramMap ++ paramMap2
// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
val model2 = lr.fit(training, paramMapCombined)
println(s"Model 2 was fit using parameters: ${model2.parent.extractParamMap}")
// Prepare test data.
val test = spark.createDataFrame(Seq(
(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
(0.0, Vectors.dense(3.0, 2.0, -0.1)),
(1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")
// Make predictions on test data using the Transformer.transform() method.
// LogisticRegression.transform will only use the ‘features’ column.
// Note that model2.transform() outputs a ‘myProbability’ column instead of the usual
// ‘probability’ column since we renamed the lr.probabilityCol parameter previously.
model2.transform(test)
.select("features", "label", "myProbability", "prediction")
.collect()
.foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
println(s"($features, $label) -> prob=$prob, prediction=$prediction")
}
Find full example code at “examples/src/main/scala/org/apache/spark/examples/ml/EstimatorTransformerParamExample.scala” in the Spark repo.
Example: Pipeline
This example follows the simple text document Pipeline illustrated in the figures above.
Refer to the Pipeline Scala docs for details on the API.
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
// Fit the pipeline to training documents.
val model = pipeline.fit(training)
// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")
// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")
// And load it back in during production
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")
// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "spark hadoop spark"),
(7L, "apache hadoop")
)).toDF("id", "text")
// Make predictions on test documents.
model.transform(test)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}
Find full example code at “examples/src/main/scala/org/apache/spark/examples/ml/PipelineExample.scala” in the Spark repo.
Model selection (hyperparameter tuning)
A big benefit of using ML Pipelines is hyperparameter optimization. See the ML Tuning Guide for more information on automatic model selection.
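As a hedged sketch only (see the ML Tuning Guide for the full treatment), model selection wraps an Estimator such as the pipeline above in a CrossValidator together with a parameter grid and an evaluator. The pipeline, hashingTF, lr, and training values are assumed to be the ones built in the Example: Pipeline section, and the grid values and fold count are illustrative:

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

// Grid of candidate parameter settings; assumes hashingTF and lr from the
// Example: Pipeline section above.
val paramGrid = new ParamGridBuilder()
  .addGrid(hashingTF.numFeatures, Array(100, 1000))
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .build()

val cv = new CrossValidator()
  .setEstimator(pipeline)                            // the whole Pipeline is the Estimator
  .setEvaluator(new BinaryClassificationEvaluator())
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)                                    // illustrative; use more folds in practice

val cvModel = cv.fit(training)  // picks the best ParamMap and refits the pipeline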