spark.mllib:GradientBoostedTrees
Gradient-Boosted Trees(GBT或者GBDT) 和 RandomForests 都屬于集成學習的范疇,相比于單個模型有限的表達能力,組合多個base model后表達能力更加豐富。
關于集成學習的理論知識,包括GBT和Random Forests的一些比較好的參考資料:
周志華教授的"Ensemble Methods: Foundations and Algorithms",系統的介紹了集成學習的理論及方法
Random Forests
Greedy Function Approximation: A GradientBoosting Machine
Stochastic GradientBoosting,Spark GBT實現所參考的算法
?
GBT和Random Forests二者的區別:
二者的理論思想在spark.mllib源碼閱讀-bagging方法中從模型的方差和偏差的角度做了一些簡要的介紹,在Spark官方文檔上也有一段關于二者優劣的描述:
1、GBT比RandomForests的訓練成本更高,原因在于GBT中各個模型以序列串行的方式進行訓練,通常我們說的并行GBT是指base model的并行訓練,各個base model之間是無法做到并行的。而Random Forests
中各個子模型之間可以做到并行化。
2、Random Forests的base model越多越有助于降低過擬合,而GBT中base model越多會提高過擬合的程度。
3、二者訓練的時間成本不同,因此調參的成本不同。有限的時間內Random Forests可以實驗更多的參數組合。
4、通常來看,Random Forests的base model會得到一棵規模適中的樹,而GBT為了降低在basemodel數量多時引發的過擬合,會限制其base model的規模。
?
下面來看看Spark中GBT的實現過程,主要包括3部分:GBT模型、GBT參數配置、GBT訓練算法:
GradientBoostedTrees:
GBT的實現過程由GradientBoostedTrees類驅動并向用戶暴露模型的訓練方法。GradientBoostedTrees的2個關鍵方法是train和run,在run中,根據用戶定義的模型配置類boostingStrategy來調用ml包下的GradientBoostedTrees類進行模型的訓練,最后根據訓練得到的參數來新建一個GradientBoostedTreesModel:
def train(
? ? ? ? ? ?input: RDD[LabeledPoint],
? ? ? ? ? ?boostingStrategy: BoostingStrategy): GradientBoostedTreesModel = {
? new GradientBoostedTrees(boostingStrategy, seed = 0).run(input)
? }
def run(input: RDD[LabeledPoint]): GradientBoostedTreesModel = {
? val algo = boostingStrategy.treeStrategy.algo
? //import org.apache.spark.ml.tree.impl.{GradientBoostedTrees => NewGBT}
? val (trees, treeWeights) = NewGBT.run(input.map { point =>
? ? NewLabeledPoint(point.label, point.features.asML)
? }, boostingStrategy, seed.toLong)
? new GradientBoostedTreesModel(algo, trees.map(_.toOld), treeWeights)
}
GradientBoostedTreesModel:
GradientBoostedTreesModel用來保存訓練后的模型,其繼承自TreeEnsembleModel。各個Base model保存在trees數組中,每個base model的權重在treeWeights數組中,
其父類TreeEnsembleModel實現的predict方法即是對各個base model的預測值加權treeWeights 得到最終的預測值。
class GradientBoostedTreesModel @Since("1.2.0") (
? ? @Since("1.2.0") override val algo: Algo, //模型算法:分類 or 回歸
? ? @Since("1.2.0") override val trees: Array[DecisionTreeModel], //base model的數組
? ? @Since("1.2.0") override val treeWeights: Array[Double]) //每個base model的權重
? extends TreeEnsembleModel(algo, trees, treeWeights, combiningStrategy = Sum)
BoostingStrategy
GBT的配置信息類,可配置的信息包括
treeStrategy:base tree的配置信息
Loss:損失函數,默認參數為2分類問題用LogLoss, 回歸問題用SquaredError
numIterations:GBT的迭代次數,默認值為100
learningRate:學習速率,默認值為0.1
validationTol:通過驗證集判斷訓練終止的條件:驗證集上歷史最小的殘差 - 驗證集當前殘差 < validationTol*max(驗證集當前殘差, 0.01) 即提前終止訓練
在訓練GBT時,base tree的參數設置也很重要,base tree的參數由Strategy類維護,Strategy的默認值如下,在訓練GBT時,務必要重新設置Strategy的值,這里我對可以設定的值都做了備注,方便初次使用的同學進行調參:
@Since("1.0.0") @BeanProperty var algo: Algo,//算法的類別:分類還是回歸 {Classification、Regression}
@Since("1.0.0") @BeanProperty var impurity: Impurity,//計算信息增益的準則 分類{基尼指數、信息增益} 回歸{impurity.Variance}
@Since("1.0.0") @BeanProperty var maxDepth: Int, //樹的最大深度
@Since("1.2.0") @BeanProperty var numClasses: Int = 2,//類別數
@Since("1.0.0") @BeanProperty var maxBins: Int = 32,//連續特征離散化的分箱數
@Since("1.0.0") @BeanProperty var quantileCalculationStrategy: QuantileStrategy = Sort,//計算分裂點的算法,待定
@Since("1.0.0") @BeanProperty var categoricalFeaturesInfo: Map[Int, Int] = Map[Int, Int](),//存儲每個分類特征的值數目
@Since("1.2.0") @BeanProperty var minInstancesPerNode: Int = 1,//子結點擁有的最小樣本實例數,一個終止條件
@Since("1.2.0") @BeanProperty var minInfoGain: Double = 0.0,//最小的信息增益值,這個應該是用來控制迭代終止的
@Since("1.0.0") @BeanProperty var maxMemoryInMB: Int = 256,//聚合使用的內存大小。待定
@Since("1.2.0") @BeanProperty var subsamplingRate: Double = 1,//用于訓練數據的抽樣率
@Since("1.2.0") @BeanProperty var useNodeIdCache: Boolean = false,//待定
@Since("1.2.0") @BeanProperty var checkpointInterval: Int = 10 //checkpoint
模型的損失函數在BoostingStrategy類中自動設置,在二分類模型中損失函數被定義為LogLoss(對數損失函數)、在回歸問題中損失函數被定義為SquaredError(平方損失函數)。在Spark2.1.0版本中還沒有實現對多分類GBT的損失函數及多分類GBT模型。對于自定義損失函數,需要繼承org.apache.spark.mllib.tree.loss.Loss這個類,并覆寫gradient和computeError方法。
GradientBoostedTrees:
GradientBoostedTrees類是Spark訓練GBT模型參數的類,模型的訓練主要分為2步:1、將分類問題轉化為回歸問題,在GradientBoostedTrees的run方法中完成:
def run(
? ? input: RDD[LabeledPoint],
? ? boostingStrategy: OldBoostingStrategy,
? ? seed: Long): (Array[DecisionTreeRegressionModel], Array[Double]) = {
? val algo = boostingStrategy.treeStrategy.algo
? //都轉化為回歸問題
? algo match {
? ? case OldAlgo.Regression => GradientBoostedTrees.boost(input, input, boostingStrategy, validate = false, seed)
? ? case OldAlgo.Classification =>
? ? ? // Map labels to -1, +1 so binary classification can be treated as regression.
? ? ? val remappedInput = input.map(x => new LabeledPoint((x.label * 2) - 1, x.features))
? ? ? GradientBoostedTrees.boost(remappedInput, remappedInput, boostingStrategy, validate = false, seed)
? ? case _ => throw new IllegalArgumentException(s"$algo is not supported by gradient boosting.")
? }
}
2、問題統一轉化為回歸問題后,調用GradientBoostedTrees的boost進行參數的訓練,看一下整個訓練過程的核心代碼(在源碼的基礎上有刪減):
// Initialize gradient boosting parameters
val numIterations = boostingStrategy.numIterations //總的迭代次數,決定了生成
val baseLearners = new Array[DecisionTreeRegressionModel](numIterations) //保存每次迭代的base模型的數組
val baseLearnerWeights = new Array[Double](numIterations)//模型權重?
val loss = boostingStrategy.loss //定義的損失函數
val learningRate = boostingStrategy.learningRate
// Prepare strategy for individual trees, which use regression with variance impurity.
val treeStrategy = boostingStrategy.treeStrategy.copy
val validationTol = boostingStrategy.validationTol
treeStrategy.algo = OldAlgo.Regression //org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
treeStrategy.impurity = OldVariance
treeStrategy.assertValid()
// Cache input
val persistedInput = if (input.getStorageLevel == StorageLevel.NONE) {
? input.persist(StorageLevel.MEMORY_AND_DISK)
? true
} else {
? false
}
// Prepare periodic checkpointers 定期Checkpointer
val predErrorCheckpointer = new PeriodicRDDCheckpointer[(Double, Double)](
? treeStrategy.getCheckpointInterval, input.sparkContext)
val validatePredErrorCheckpointer = new PeriodicRDDCheckpointer[(Double, Double)](
? treeStrategy.getCheckpointInterval, input.sparkContext)
?
val firstTree = new DecisionTreeRegressor().setSeed(seed)
//實際是用隨機森林訓練的一棵樹,GBT中樹的深度通常較小
//RandomForest.run(data, oldStrategy, numTrees = 1, featureSubsetStrategy = "all", seed = $(seed), instr = Some(instr), parentUID = Some(uid))
val firstTreeModel = firstTree.train(input, treeStrategy)
val firstTreeWeight = 1.0
baseLearners(0) = firstTreeModel
baseLearnerWeights(0) = firstTreeWeight
//(預測值,誤差值)
//如改成多分類的話應該是(list<pred>, list<Error>) 即每棵樹的預測值和誤差值
var predError: RDD[(Double, Double)] =
? computeInitialPredictionAndError(input, firstTreeWeight, firstTreeModel, loss)
predErrorCheckpointer.update(predError)
var validatePredError: RDD[(Double, Double)] =
? computeInitialPredictionAndError(validationInput, firstTreeWeight, firstTreeModel, loss)
if (validate) validatePredErrorCheckpointer.update(validatePredError)
var bestValidateError = if (validate) validatePredError.values.mean() else 0.0
var bestM = 1
var m = 1
var doneLearning = false
while (m < numIterations && !doneLearning) {
? // Update data with pseudo-residuals
? //predError (預測值,誤差值) 預測值是前m-1輪的預測值之和,誤差值為lable-預測值
? //如改成多分類的話 此時該樣本的loss即可以用logitloss來表示,并對f1~fk都可以算出一個梯度,f1~fk便可以計算出當前輪的殘差,供下一輪迭代學習。
? val data = predError.zip(input).map { case ((pred, _), point) =>
? ? LabeledPoint(-loss.gradient(pred, point.label), point.features)//
? }
? val dt = new DecisionTreeRegressor().setSeed(seed + m)
? val model = dt.train(data, treeStrategy)//訓練下一個base model
? // Update partial model
? baseLearners(m) = model
? // Note: The setting of baseLearnerWeights is incorrect for losses other than SquaredError.
? // ? ? ? Technically, the weight should be optimized for the particular loss.
? // ? ? ? However, the behavior should be reasonable, though not optimal.
? // 這里learningRate是一個固定值,沒有使用shrinkage技術
? baseLearnerWeights(m) = learningRate // learningRate同時作為model的權重
? predError = updatePredictionError(
? ? input, predError, baseLearnerWeights(m), baseLearners(m), loss)
? predErrorCheckpointer.update(predError)
? if (validate) {//驗證集,驗證是否提前終止訓練
? ? // Stop training early if
? ? // 1. Reduction in error is less than the validationTol or
? ? // 2. If the error increases, that is if the model is overfit.
? ? // We want the model returned corresponding to the best validation error.
? ? validatePredError = updatePredictionError(
? ? ? validationInput, validatePredError, baseLearnerWeights(m), baseLearners(m), loss)
? ? validatePredErrorCheckpointer.update(validatePredError)
? ? val currentValidateError = validatePredError.values.mean()
? ? if (bestValidateError - currentValidateError < validationTol * Math.max(
? ? ? currentValidateError, 0.01)) {
? ? ? doneLearning = true
? ? } else if (currentValidateError < bestValidateError) {
? ? ? bestValidateError = currentValidateError
? ? ? bestM = m + 1
? ? }
? }
? m += 1
}
GBT的訓練是一個串行的過程,base treemodel在前一輪迭代殘差的基礎上逐棵生成。每次生成一棵樹之后需要更新整個數據集的殘差,再進行下一輪的訓練。在數據集規模較大,并且迭代輪次比較多時,訓練比較耗時,這在一定程度上增加了模型調參的成本。
?
截至Spark2.0.0,Spark的GBT模型比較初級,在分類問題上目前只支持2分類問題,梯度下降的過程控制也比較簡單,難于適應一些精度要求高的的機器學習任務,因此目前版本下的Spark來做GBT模型并不是一個好的選擇。相比較而言,XGBOOST是一個更好的選擇,當然,有條件的情況下順著Spark GBT的思路做一些改進也能達到接近的效果。
————————————————
版權聲明:本文為CSDN博主「大愚若智_」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/zbc1090549839/article/details/70240989
總結
以上是生活随笔為你收集整理的spark.mllib:GradientBoostedTrees的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 胆囊炎能不能吃吊瓜子
- 下一篇: Tensorflow 入门教程