spark.mllib:Optimizer
Spark中的求解器,根據輸入的訓練數據及設定的迭代次數、正則化項、參數收斂精度等進行迭代求解模型的參數。Spark內部實現來兩類求解器,基于隨機梯度下降(miniBatch選取樣本)的GradientDescent、基于大規模數值優化算法的LBFGS。
在整體架構上,兩個類都繼承自Optimizer,并需要調用Gradient和Updater
GradientDescent
GradientDescent是對隨機梯度下降算法封裝的一個求解器,通過runMiniBatchSGD方法實現模型參數的迭代計算,基本流程是:
1、根據miniBatchFraction參數進行樣本抽樣,獲得一個小樣本集
2、調用Gradient計算在小樣本集上的梯度值
3、調用Updater,根據regParam、stepSize、numIterations等參數值更新模型參數
4、判斷終止條件(精度收斂或者迭代次數達到上限),否則繼續上面步驟。
核心代碼如下;
while (!converged && i <= numIterations) {
? //將參數廣播到各臺機器上,實際上是集群下模型參數的共享和同步
? val bcWeights = data.context.broadcast(weights)
? // Sample a subset (fraction miniBatchFraction) of the total data
? // compute and sum up the subgradients on this subset (this is one map-reduce)
? //在各分區上調用seqOp計算梯度值、誤差值
? //調用combOp對各分區計算的結果進行聚合
? //這樣得到的是各分區計算得到的梯度值得總和,后面會利用miniBatchSize計算平均梯度并傳入updater進行更新
? val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
? ? .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
? ? ? seqOp = (c, v) => {
? ? ? ? // c: (grad, loss, count), v: (label, features)
? ? ? ? val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
? ? ? ? (c._1, c._2 + l, c._3 + 1)
? ? ? },
? ? ? combOp = (c1, c2) => {
? ? ? ? // c: (grad, loss, count)
? ? ? ? (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
? ? ? })
?
? if (miniBatchSize > 0) {
? ? /**
? ? ?* lossSum is computed using the weights from the previous iteration
? ? ?* and regVal is the regularization value computed in the previous iteration as well.
? ? ?*/
? ? stochasticLossHistory += lossSum / miniBatchSize + regVal
? ? val update = updater.compute(
? ? ? weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),
? ? ? stepSize, i, regParam)
? ? weights = update._1
? ? regVal = update._2
?
? ? previousWeights = currentWeights
? ? currentWeights = Some(weights)
? ? if (previousWeights != None && currentWeights != None) {
? ? ? converged = isConverged(previousWeights.get,
? ? ? ? currentWeights.get, convergenceTol)
? ? }
? } else {
? ? logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero")
? }
? i += 1
}
LBFGS
梯度下降法是線性逼近的,在靠近最優點時容易出現震蕩(即在最優點旁邊晃來晃去,就是取不到這個最優值),相對來說,牛頓法、擬牛頓法、LM方法等二階優化方法,由于非線性逼近的特性,收斂速度和收斂精度斗會高于梯度下降法,但這些方法都是相當消耗內存的,比如牛頓法,需要計算Hessian矩陣(二階偏導數矩陣)及其逆矩陣 ,如果模型參數是N維,則Hessian矩陣大小為N*N,存儲和計算逆矩陣斗相當困難,限制來在大規模機器學習中的應用。基于牛頓法和擬牛頓法,LBFGS在計算Hessian矩陣逆矩陣的時候做了一些近似工作。
LBFGS詳細可參考http://mlworks.cn/posts/introduction-to-l-bfgs/
Spark本身沒有實現LBFGS底層算法,而是調用來breeze包,Spark實現了損失函數CostFun,使用CachedDiffFunction類緩存最近的m次輸入變量和梯度變量的差值。
private class CostFun(
? data: RDD[(Double, Vector)],
? gradient: Gradient,
? updater: Updater,
? regParam: Double,
? numExamples: Long) extends DiffFunction[BDV[Double]] {
?
? override def calculate(weights: BDV[Double]): (Double, BDV[Double]) = {
? ? // Have a local copy to avoid the serialization of CostFun object which is not serializable.
? ? val w = Vectors.fromBreeze(weights)
? ? val n = w.size
? ? val bcW = data.context.broadcast(w)
? ? val localGradient = gradient
? ? val (gradientSum, lossSum) = data.treeAggregate((Vectors.zeros(n), 0.0))(
? ? ? ? seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
? ? ? ? ? val l = localGradient.compute(
? ? ? ? ? ? features, label, bcW.value, grad)
? ? ? ? ? (grad, loss + l)
? ? ? ? },
? ? ? ? combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
? ? ? ? ? axpy(1.0, grad2, grad1)
? ? ? ? ? (grad1, loss1 + loss2)
? ? ? ? })
? ? // broadcasted model is not needed anymore
? ? bcW.destroy()
? ? val regVal = updater.compute(w, Vectors.zeros(n), 0, 1, regParam)._2
? ? val loss = lossSum / numExamples + regVal
? ? val gradientTotal = w.copy
? ? axpy(-1.0, updater.compute(w, Vectors.zeros(n), 1, 1, regParam)._1, gradientTotal)
? ? axpy(1.0 / numExamples, gradientSum, gradientTotal)
? ? (loss, gradientTotal.asBreeze.asInstanceOf[BDV[Double]])
? }
}
def runLBFGS(
? ? data: RDD[(Double, Vector)],
? ? gradient: Gradient,
? ? updater: Updater,
? ? numCorrections: Int,
? ? convergenceTol: Double,
? ? maxNumIterations: Int,
? ? regParam: Double,
? ? initialWeights: Vector): (Vector, Array[Double]) = {
? val lossHistory = mutable.ArrayBuilder.make[Double]
? val numExamples = data.count()
? val costFun =
? ? new CostFun(data, gradient, updater, regParam, numExamples)
? val lbfgs = new BreezeLBFGS[BDV[Double]](maxNumIterations, numCorrections, convergenceTol)
? val states =
? ? lbfgs.iterations(new CachedDiffFunction(costFun), initialWeights.asBreeze.toDenseVector)
? var state = states.next()
? while (states.hasNext) {
? ? lossHistory += state.value
? ? state = states.next()
? }
? lossHistory += state.value
? val weights = Vectors.fromBreeze(state.x)
? val lossHistoryArray = lossHistory.result()
? (weights, lossHistoryArray)
}
再回頭看LBFGS幾個內部參數就簡單明了,numCorrections即控制輸入變量和梯度變量的差值要緩存最近的多少次,convergenceTol即收斂精度。
?
分布式機器學習思想主要體現在
1、模型參數的共享和同步(使用Spark broadcast機制實現)。
2、分布式的梯度求解,先求解各分區上單個樣本的梯度值,再聚合得到總的梯度值(通過treeAggregate算子實現)。之后在Drvie端調用updater來更新模型參數。
————————————————
版權聲明:本文為CSDN博主「大愚若智_」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/zbc1090549839/article/details/64920424
總結
以上是生活随笔為你收集整理的spark.mllib:Optimizer的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spark.mllib:bagging方
- 下一篇: 胆囊炎能不能吃鹅蛋