XGBoost4J-Spark基本原理
XGBoost4J-Spark基本原理
XGBoost4J-Spark是一個(gè)項(xiàng)目,旨在通過(guò)使XGBoost適應(yīng)Apache Spark的MLLIB框架,無(wú)縫集成XGBoost和Apache Spark。通過(guò)集成,用戶不僅可以使用XGBoost的高性能算法實(shí)現(xiàn),還可以利用Spark強(qiáng)大的數(shù)據(jù)處理引擎實(shí)現(xiàn)以下功能:
? 特征工程:特征提取,變換,降維和選擇等。
? 管道:構(gòu)造,評(píng)估和調(diào)整ML管道
? 持久性:持久化并加載機(jī)器學(xué)習(xí)模型,甚至整個(gè)管道
本文將介紹使用XGBoost4J-Spark構(gòu)建機(jī)器學(xué)習(xí)管道的端到端過(guò)程。討論
? 使用Spark預(yù)處理數(shù)據(jù)以適合XGBoost / XGBoost4J-Spark的數(shù)據(jù)接口
? 使用XGBoost4J-Spark訓(xùn)練XGBoost模型
? 使用Spark服務(wù)XGBoost模型(預(yù)測(cè))
? 使用XGBoost4J-Spark構(gòu)建機(jī)器學(xué)習(xí)管道
? 在生產(chǎn)中運(yùn)行XGBoost4J-Spark
筆記
XGBoost訓(xùn)練任務(wù)失敗時(shí),默認(rèn)情況下,SparkContext將停止。
XGBoost4J-Spark 1.2.0+公開(kāi)了一個(gè)參數(shù)kill_spark_context_on_worker_failure。將kill_spark_context_on_worker_failure設(shè)置為false,以使SparkContext在訓(xùn)練失敗時(shí)不會(huì)停止。XGBoost4J-Spark不會(huì)引發(fā)SparkContext,而是引發(fā)異常。想要重用SparkContext的用戶應(yīng)將訓(xùn)練代碼包裝在try-catch塊中。
? 使用XGBoost4J-Spark構(gòu)建ML應(yīng)用程序
o 參考XGBoost4J-Spark依賴關(guān)系
o 資料準(zhǔn)備
? 使用Spark的內(nèi)置讀取器讀取數(shù)據(jù)集
? 轉(zhuǎn)換原始虹膜數(shù)據(jù)集
? 處理缺失的價(jià)值
o 訓(xùn)練
? 提前停止
? 使用評(píng)估集進(jìn)行訓(xùn)練
o 預(yù)言
? 批量預(yù)測(cè)
? 單實(shí)例預(yù)測(cè)
o 模型持久性
? 模型和管道持久性
? 與XGBoost的其它綁定進(jìn)行交互
? 使用XGBoost4J-Spark構(gòu)建ML管道
o 基本ML管道
o 具有超參數(shù)調(diào)整功能的管道
? 在生產(chǎn)中運(yùn)行XGBoost4J-Spark
o 并行/分布式訓(xùn)練
o 幫派調(diào)度
o 訓(xùn)練中的檢查點(diǎn)
使用XGBoost4J-Spark構(gòu)建ML應(yīng)用程序
參考XGBoost4J-Spark依賴關(guān)系
在介紹如何使用XGBoost4J-Spark之前,應(yīng)該首先咨詢Maven存儲(chǔ)庫(kù)中的安裝,以便將XGBoost4J-Spark添加為項(xiàng)目的依賴項(xiàng)。提供穩(wěn)定的版本和快照。
筆記
XGBoost4J-Spark需要Apache Spark 2.4+
XGBoost4J-Spark現(xiàn)在需要Apache Spark 2.4+。XGBoost4J-Spark的最新版本廣泛使用了org.apache.spark.ml.param.shared的功能,以提供與Spark MLLIB框架的緊密集成,而這些功能在Spark的早期版本中并不完全可用。
另外,確保直接從Apache網(wǎng)站安裝Spark 。不能保證上游XGBoost可以與Spark的第三方發(fā)行版(例如Cloudera Spark)一起使用。咨詢適當(dāng)?shù)牡谌揭垣@取XGBoost的分發(fā)。
從Maven回購(gòu)安裝
筆記
在XGBoost4J-Spark中使用Python
默認(rèn)情況下,在dmlc-core中使用跟蹤器來(lái)驅(qū)動(dòng)XGBoost4J-Spark的訓(xùn)練。需要Python 2.7+。也有跟蹤器的實(shí)驗(yàn)斯卡拉版本,可以通過(guò)傳遞參數(shù)啟用tracker_conf為scala。
數(shù)據(jù)準(zhǔn)備
如前所述,XGBoost4J-Spark無(wú)縫集成了Spark和XGBoost。該集成使用戶可以通過(guò)便捷而強(qiáng)大的數(shù)據(jù)處理框架Spark,在訓(xùn)練/測(cè)試數(shù)據(jù)集上應(yīng)用各種類型的轉(zhuǎn)換。
在本節(jié)中,以虹膜數(shù)據(jù)集為例,展示如何使用Spark轉(zhuǎn)換原始數(shù)據(jù)集并使之適合XGBoost的數(shù)據(jù)接口。
虹膜數(shù)據(jù)集以CSV格式提供。每個(gè)實(shí)例都包含4個(gè)特征,即“分隔長(zhǎng)度”,“分隔寬度”,“花瓣長(zhǎng)度”和“花瓣寬度”。此外,包含“ class”列,該列實(shí)際上是帶有三個(gè)可能值的標(biāo)簽:“ Iris Setosa”,“ Iris Versicolour”和“ Iris Virginica”。
使用Spark的內(nèi)置讀取器讀取數(shù)據(jù)集
數(shù)據(jù)轉(zhuǎn)換的第一件事是將數(shù)據(jù)集作為Spark的結(jié)構(gòu)化數(shù)據(jù)抽象DataFrame加載。
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
val spark = SparkSession.builder().getOrCreate()
val schema = new StructType(Array(
StructField(“sepal length”, DoubleType, true),
StructField(“sepal width”, DoubleType, true),
StructField(“petal length”, DoubleType, true),
StructField(“petal width”, DoubleType, true),
StructField(“class”, StringType, true)))
val rawInput = spark.read.schema(schema).csv(“input_path”)
在第一行,創(chuàng)建一個(gè)SparkSession實(shí)例,它是任何與DataFrame一起使用的Spark程序的條目。該schema變量定義數(shù)據(jù)幀包裝虹膜數(shù)據(jù)的架構(gòu)。使用此顯式設(shè)置的架構(gòu),可以定義列的名稱及其類型。否則,列名將是Spark派生的默認(rèn)名稱,例如_col0等等。最后,可以使用Spark的內(nèi)置csv閱讀器將Iris csv文件作為名,為DataFrame的DataFrame加載rawInput。
Spark還包含許多其它格式的內(nèi)置閱讀器。最新版本的Spark支持CSV,JSON,Parquet和LIBSVM。
轉(zhuǎn)換原始虹膜數(shù)據(jù)集
為了使Iris數(shù)據(jù)集可識(shí)別為XGBoost,需要
- 將字符串型標(biāo)簽(即“類”)轉(zhuǎn)換為雙型標(biāo)簽。
- 將要素列組裝為向量,以適合Spark ML框架的數(shù)據(jù)接口。
要將String類型的標(biāo)簽轉(zhuǎn)換為Double,可以使用Spark的內(nèi)置功能轉(zhuǎn)換器StringIndexer。
import org.apache.spark.ml.feature.StringIndexer
val stringIndexer = new StringIndexer().
setInputCol(“class”).
setOutputCol(“classIndex”).
fit(rawInput)
val labelTransformed = stringIndexer.transform(rawInput).drop(“class”)
使用新創(chuàng)建的StringIndexer實(shí)例: - 設(shè)置輸入列,即包含字符串類型標(biāo)簽的列
- 設(shè)置輸出列,即該列包含Double-typed標(biāo)簽。
- 然后使用fitStringIndex和輸入DataFrame rawInput,以便Spark內(nèi)部構(gòu)件可以獲取諸如不同值的總數(shù)之類的信息。
現(xiàn)在,有了一個(gè)StringIndexer,可以隨時(shí)將其應(yīng)用于輸入DataFrame。為了執(zhí)行StringIndexer的轉(zhuǎn)換邏輯,transform輸入DataFramerawInput并保持簡(jiǎn)潔的DataFrame,刪除“類”列,僅保留要素列和轉(zhuǎn)換后的Double-typed標(biāo)簽列(在上述代碼段的最后一行) )。
該fit和transform在MLLIB兩個(gè)關(guān)鍵操作。基本上,fit產(chǎn)生一個(gè)“轉(zhuǎn)換器”,例如StringIndexer,并且每個(gè)轉(zhuǎn)換器transform在DataFrame上應(yīng)用方法以添加包含轉(zhuǎn)換后的特征/標(biāo)簽或預(yù)測(cè)結(jié)果等的新列。
類似地,可以使用另一個(gè)轉(zhuǎn)換器VectorAssembler將特征列“分隔長(zhǎng)度”,“分隔寬度”,“花瓣長(zhǎng)度”和“花瓣寬度”組合為向量。
import org.apache.spark.ml.feature.VectorAssembler
val vectorAssembler = new VectorAssembler().
setInputCols(Array(“sepal length”, “sepal width”, “petal length”, “petal width”)).
setOutputCol(“features”)
val xgbInput = vectorAssembler.transform(labelTransformed).select(“features”, “classIndex”)
現(xiàn)在,有一個(gè)僅包含兩列的DataFrame,“功能”包含矢量表示的“分隔長(zhǎng)度”,“分隔寬度”,“花瓣長(zhǎng)度”和“花瓣寬度”以及帶有雙類型標(biāo)簽的“ classIndex”。這樣的DataFrame(包含向量表示的特征和數(shù)字標(biāo)簽)可以直接饋送到XGBoost4J-Spark的訓(xùn)練引擎。
與遺漏值處理
XGBoost默認(rèn)支持缺失值。如果給定SparseVector,則XGBoost會(huì)將SparseVector缺少的任何值視為丟失。還可以指定XGBoost將數(shù)據(jù)集中的特定值視為缺少的值。默認(rèn)情況下,XGBoost會(huì)將NaN視為表示缺失的值。
在XGBoostClassifier中將缺失值(例如-999)設(shè)置為“ missing”參數(shù)的示例:
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
val xgbParam = Map(“eta” -> 0.1f,
“missing” -> -999,
“objective” -> “multi:softprob”,
“num_class” -> 3,
“num_round” -> 100,
“num_workers” -> 2)
val xgbClassifier = new XGBoostClassifier(xgbParam).
setFeaturesCol(“features”).
setLabelCol(“classIndex”)
筆記
Spark的VectorAssembler缺少值
如果給定具有足夠值為0的特征的數(shù)據(jù)集,Spark的VectorAssembler轉(zhuǎn)換器類將返回SparseVector,其中不存在的值表示為0。這與XGBoost的默認(rèn)值(將SparseVector缺少的值視為丟失)相沖突。該模型將有效地將0視為缺失,但不會(huì)聲明為0,這會(huì)在其它平臺(tái)上使用經(jīng)過(guò)訓(xùn)練的模型時(shí)導(dǎo)致混淆。為避免這種情況,如果XGBoost收到SparseVector并且“ missing”參數(shù)未明確設(shè)置為0,將引發(fā)異常。要解決此問(wèn)題,用戶可以使用以下三個(gè)選項(xiàng):
1.將從VectorAssembler返回的Vector顯式轉(zhuǎn)換為DenseVector,以將零返回到數(shù)據(jù)集。如果使用缺少的編碼為NaN的值來(lái)執(zhí)行此操作,則需要在VectorAssembler上進(jìn)行設(shè)置,以將NaN值保留在數(shù)據(jù)集中。然后,可以將“ missing”參數(shù)設(shè)置為任何希望被視為丟失的參數(shù)。但是,如果數(shù)據(jù)集非常稀疏,這可能會(huì)導(dǎo)致大量的內(nèi)存使用。例如:setHandleInvalid = “keep”
val匯編程序= new VectorAssembler()。setInputCols(feature_names.toArray).setOutputCol(“ features”)。setHandleInvalid(“ keep”)
//使用Array()轉(zhuǎn)換為密集向量
val featurePipeline = new Pipeline()。setStages(Array(assembler))val featureModel = featurePipeline.fit(df_training)val featureDf = featureModel.transform(df_training)
val xgbParam = Map(“ eta”-> 0.1f,
“最大深度”-> 2,“目標(biāo)”->“ multi:softprob”,“ num_class”-> 3,“ num_round”-> 100,“ num_workers”-> 2,“ allow_non_zero_for_missing”->“ true”,“ missing” ”-> -999)
val xgb =新的XGBoostClassifier(xgbParam)val xgbclassifier = xgb.fit(featureDf)
2.在調(diào)用VectorAssembler之前,可以將要表示缺失的值轉(zhuǎn)換為不為0,NaN或Null的不規(guī)則值,并將“ missing”參數(shù)設(shè)置為0。理想情況下,應(yīng)將不規(guī)則值選擇為超出范圍功能所具有的價(jià)值。
3.不要使用VectorAssembler類,而應(yīng)使用自定義的構(gòu)造SparseVector的方式,該方式允許指定稀疏度以指示非零值。然后,可以將“ missing”參數(shù)設(shè)置為數(shù)據(jù)集中指示的任何稀疏性。如果采用這種方法,則可以傳遞參數(shù) 以繞過(guò)XGBoost的斷言,即斷言給定SparseVector時(shí)“丟失”必須為零。“allow_non_zero_for_missing_value” -> true
如果內(nèi)存限制不成問(wèn)題,則建議使用選項(xiàng)1。選項(xiàng)3需要更多的工作來(lái)進(jìn)行設(shè)置,但是可以保證為提供正確的結(jié)果,而選項(xiàng)2可以更快地進(jìn)行設(shè)置,但是可能很難找到與特征值不沖突的良好不規(guī)則值。
筆記
使用XGBoost的其它綁定時(shí),使用非默認(rèn)缺少的值。
當(dāng)XGBoost以本機(jī)格式保存時(shí),僅會(huì)保存booster本身,丟失的參數(shù)值不會(huì)與模型一起保存。如果在Spark中使用非默認(rèn)的缺少參數(shù)來(lái)訓(xùn)練模型,則在另一個(gè)綁定中使用保存的模型時(shí),用戶應(yīng)注意使用相同的缺少參數(shù)。
訓(xùn)練
XGBoost支持回歸和分類。雖然在本文中使用Iris數(shù)據(jù)集顯示了如何使用XGBoost / XGBoost4J-Spark解決多類分類問(wèn)題,但回歸中的用法與分類非常相似。
要訓(xùn)練XGBoost模型進(jìn)行分類,需要先聲明XGBoostClassifier:
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
val xgbParam = Map(“eta” -> 0.1f,
“max_depth” -> 2,
“objective” -> “multi:softprob”,
“num_class” -> 3,
“num_round” -> 100,
“num_workers” -> 2)
val xgbClassifier = new XGBoostClassifier(xgbParam).
setFeaturesCol(“features”).
setLabelCol(“classIndex”)
可以找到用于訓(xùn)練XGBoost模型的可用參數(shù)。在XGBoost4J-Spark中,不僅支持默認(rèn)參數(shù)集,還支持這些參數(shù)的駝峰式變體,以與Spark的MLLIB參數(shù)保持一致。
每個(gè)參數(shù)在XGBoost4J-Spark(帶有駝峰式大小寫(xiě))中都具有等效形式。例如,要max_depth為每棵樹(shù)進(jìn)行設(shè)置,可以像在上面的代碼片段(max_depth包裝在Map中)中那樣傳遞參數(shù),也可以通過(guò)XGBoostClassifer中的setter進(jìn)行設(shè)置:
val xgbClassifier = new XGBoostClassifier().
setFeaturesCol(“features”).
setLabelCol(“classIndex”)
xgbClassifier.setMaxDepth(2)
設(shè)置XGBoostClassifier參數(shù)和功能/標(biāo)簽列后,可以通過(guò)將XGBoostClassifier與輸入DataFrame擬合來(lái)構(gòu)建轉(zhuǎn)換器XGBoostClassificationModel。此fit操作本質(zhì)上是訓(xùn)練過(guò)程,然后可以將生成的模型用于預(yù)測(cè)。
val xgbClassificationModel = xgbClassifier.fit(xgbInput)
提前停止
提前停止是一項(xiàng)功能,可以防止不必要的訓(xùn)練重復(fù)。通過(guò)指定num_early_stopping_rounds或直接調(diào)用setNumEarlyStoppingRoundsXGBoostClassifier或XGBoostRegressor,如果評(píng)估指標(biāo)偏離最佳迭代和提前停止訓(xùn)練迭代,可以定義輪數(shù)。
關(guān)于自定義評(píng)估指標(biāo),除了num_early_stopping_rounds,還需要定義maximize_evaluation_metrics或調(diào)用setMaximizeEvaluationMetrics,以指定是否要在訓(xùn)練中最大化或最小化指標(biāo)。對(duì)于內(nèi)置的評(píng)估指標(biāo),XGBoost4J-Spark將自動(dòng)選擇方向。
例如,需要最大化評(píng)估指標(biāo)(設(shè)置maximize_evaluation_metrics為true),并設(shè)置num_early_stopping_rounds為5。第10次迭代的評(píng)估指標(biāo)是迄今為止最大的評(píng)估指標(biāo)。在接下來(lái)的迭代中,如果沒(méi)有大于第10次迭代(最佳)的評(píng)估指標(biāo),則轉(zhuǎn)換將在第15次迭代時(shí)提前停止。
使用評(píng)估集進(jìn)行訓(xùn)練
還可以在訓(xùn)練期間使用多個(gè)評(píng)估數(shù)據(jù)集監(jiān)視模型的性能。通過(guò)指定eval_sets或調(diào)用setEvalSetsXGBoostClassifier或XGBoostRegressor,可以傳入多個(gè)評(píng)估數(shù)據(jù)集,這些評(píng)估數(shù)據(jù)集的類型是從String到DataFrame的Map。
預(yù)測(cè)
XGBoost4j-Spark支持兩種模型服務(wù)方式:批處理預(yù)測(cè)和單實(shí)例預(yù)測(cè)。
批量預(yù)測(cè)
當(dāng)?shù)玫揭粋€(gè)模型(XGBoostClassificationModel或XGBoostRegressionModel)時(shí),它將獲取一個(gè)DataFrame,讀取包含特征向量的列,為每個(gè)特征向量進(jìn)行預(yù)測(cè),并默認(rèn)輸出包含以下列的新DataFrame:
? XGBoostClassificationModel將為每個(gè)可能的標(biāo)簽輸出邊距(rawPredictionCol),概率(probabilityCol)和最終的預(yù)測(cè)標(biāo)簽(predictionCol)。
? XGBoostRegressionModel將輸出預(yù)測(cè)標(biāo)簽(predictionCol)。
批量預(yù)測(cè)期望用戶以DataFrame的形式通過(guò)測(cè)試集。XGBoost4J-Spark為DataFrame的每個(gè)分區(qū)啟動(dòng)一個(gè)XGBoost工作程序以進(jìn)行并行預(yù)測(cè),并批量生成整個(gè)DataFrame的預(yù)測(cè)結(jié)果。
val xgbClassificationModel = xgbClassifier.fit(xgbInput)
val results = xgbClassificationModel.transform(testSet)
使用上面的代碼片段,得到一個(gè)結(jié)果DataFrame,結(jié)果包含邊距,每個(gè)類的概率以及每個(gè)實(shí)例的預(yù)測(cè)
單實(shí)例預(yù)測(cè)
XGBoostClassificationModel或XGBoostRegressionModel支持也可以對(duì)單個(gè)實(shí)例進(jìn)行預(yù)測(cè)。它接受單個(gè)Vector作為特征,并輸出預(yù)測(cè)標(biāo)簽。
但是,由于XGBoost的內(nèi)部開(kāi)銷,單實(shí)例預(yù)測(cè)的開(kāi)銷很高,謹(jǐn)慎使用!
val features = xgbInput.head().getAsVector
val result = xgbClassificationModel.predict(features)
模型持久
型號(hào)和管道持久性
數(shù)據(jù)科學(xué)家將生成一個(gè)ML模型,并將其移交給工程團(tuán)隊(duì)以在生產(chǎn)環(huán)境中進(jìn)行部署。相反,在數(shù)據(jù)探索過(guò)程中,數(shù)據(jù)科學(xué)家可以使用經(jīng)過(guò)訓(xùn)練的模型,例如作為基準(zhǔn)。重要的是要支持模型持久性,以使模型可以跨使用場(chǎng)景和編程語(yǔ)言使用。
XGBoost4j-Spark支持保存和加載XGBoostClassifier / XGBoostClassificationModel和XGBoostRegressor / XGBoostRegressionModel。它還支持保存和加載包含這些估計(jì)器和模型的ML管道。
可以將XGBoostClassificationModel保存到文件系統(tǒng):
val xgbClassificationModelPath = “/tmp/xgbClassificationModel”
xgbClassificationModel.write.overwrite().save(xgbClassificationModelPath)
然后在另一個(gè)會(huì)話中加載模型:
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassificationModel
val xgbClassificationModel2 = XGBoostClassificationModel.load(xgbClassificationModelPath)
xgbClassificationModel2.transform(xgbInput)
關(guān)于ML管道的保存和加載,參閱下一節(jié)。
與XGBoost的其它綁定交互
在大型數(shù)據(jù)集上使用XGBoost4j-Spark訓(xùn)練模型后,有時(shí)想在單機(jī)中進(jìn)行模型服務(wù)或?qū)⑵渑c其它單節(jié)點(diǎn)庫(kù)集成以進(jìn)行進(jìn)一步處理。XGBoost4j-Spark通過(guò)以下方式支持將模型導(dǎo)出到本地:
val nativeModelPath = “/tmp/nativeModel”
xgbClassificationModel.nativeBooster.saveModel(nativeModelPath)
然后,可以使用單節(jié)點(diǎn)Python XGBoost加載此模型:
import xgboost as xgb
bst = xgb.Booster({‘nthread’: 4})
bst.load_model(nativeModelPath)
筆記
使用HDFS和S3通過(guò)nativeBooster.saveModel()導(dǎo)出模型
與其它語(yǔ)言綁定進(jìn)行交互時(shí),XGBoost還支持將模型保存到本地文件系統(tǒng)以及從本地文件系統(tǒng)加載模型。可以通過(guò)分別在路徑前面加上hdfs://和來(lái)使用HDFS和S3 s3://。然而,對(duì)于這種能力,必須做一個(gè)如下:
- 使用描述的步驟構(gòu)建XGBoost4J-Spark ,但是打開(kāi)USE_HDFS(或USE_S3等)。使用這種方法,可以通過(guò)用HDFS路徑替換“ nativeModelPath”來(lái)重用上面的代碼示例。
o 如果使用USE_HDFS等進(jìn)行構(gòu)建,則必須確保將涉及的共享庫(kù)文件(例如libhdfs.so)放入群集的LIBRARY_PATH中。為避免復(fù)雜的集群環(huán)境配置,選擇另一個(gè)選項(xiàng)。 - 使用HDFS,S3等的綁定來(lái)傳遞模型文件。步驟如下(以HDFS為例):
o 創(chuàng)建一個(gè)新文件
o val outputStream = fs.create(“hdfs_path”)
其中“ fs”是Hadoop中org.apache.hadoop.fs.FileSystem類的實(shí)例。
o 第一步,將返回的OutputStream傳遞給nativeBooster.saveModel():
o xgbClassificationModel.nativeBooster.saveModel(outputStream)
o 從HDFS下載其它語(yǔ)言的文件,并加載XGBoost的預(yù)構(gòu)建版本(無(wú)需libhdfs.so)。(函數(shù)“ download_from_hdfs”是用戶要實(shí)現(xiàn)的輔助函數(shù))
o import xgboost as xgb
o bst = xgb.Booster({‘nthread’: 4})
o local_path = download_from_hdfs(“hdfs_path”)
o bst.load_model(local_path)
筆記
XGBoost4J-Spark與其它綁定之間的一致性問(wèn)題
XGBoost4J-Spark與XGBoost的其它語(yǔ)言綁定之間存在一致性問(wèn)題。
當(dāng)用戶使用Spark通過(guò)以下代碼片段以LIBSVM格式加載訓(xùn)練/測(cè)試數(shù)據(jù)時(shí):
spark.read.format(“l(fā)ibsvm”).load(“trainingset_libsvm”)
Spark假定數(shù)據(jù)集正在使用基于1的索引(以1開(kāi)頭的功能索引)。但是,當(dāng)使用XGBoost的其它綁定(例如XGBoost的Python API)進(jìn)行預(yù)測(cè)時(shí),默認(rèn)情況下,XGBoost假定數(shù)據(jù)集使用基于0的索引(功能索引從0開(kāi)始)。它為使用Spark訓(xùn)練模型但在XGBoost的其它綁定中使用相同格式的數(shù)據(jù)集進(jìn)行預(yù)測(cè)的用戶帶來(lái)了陷阱。解決方案是在使用Python API進(jìn)行預(yù)測(cè)之前,或者?indexing_mode=1在使用DMatirx加載時(shí)追加到文件路徑之前,先將數(shù)據(jù)集轉(zhuǎn)換為基于0的索引。例如在Python中:
xgb.DMatrix(‘test.libsvm?indexing_mode=1’)
使用XGBoost4J-Spark構(gòu)建ML管道
基本ML管道
Spark ML管道可以將多種算法或功能組合到一個(gè)管道中。它涵蓋了從特征提取,轉(zhuǎn)換,選擇到模型訓(xùn)練和預(yù)測(cè)的各個(gè)方面。XGBoost4j-Spark使將XGBoost無(wú)縫地嵌入到這樣的管道中變得可行。以下示例顯示了如何構(gòu)建由Spark MLlib功能轉(zhuǎn)換器和XGBoostClassifier估計(jì)器組成的管道。
仍然使用虹膜數(shù)據(jù)集和rawInputDataFrame。首先,需要將數(shù)據(jù)集分為訓(xùn)練和測(cè)試數(shù)據(jù)集。
val Array(training, test) = rawInput.randomSplit(Array(0.8, 0.2), 123)
建立了ML管道,其中包括4個(gè)階段:
? 將所有要素組合到單個(gè)向量列中。
? 從字符串標(biāo)簽到索引雙標(biāo)簽。
? 使用XGBoostClassifier訓(xùn)練分類模型。
? 將索引的雙標(biāo)簽轉(zhuǎn)換回原始字符串標(biāo)簽。
已經(jīng)在前面的部分中顯示了前三個(gè)步驟,最后一步是使用新的轉(zhuǎn)換器IndexToString完成的:
val labelConverter = new IndexToString()
.setInputCol(“prediction”)
.setOutputCol(“realLabel”)
.setLabels(stringIndexer.labels)
需要將這些步驟組織為Spark ML框架中的Pipeline,并評(píng)估整個(gè)管道以獲得PipelineModel:
import org.apache.spark.ml.feature._
import org.apache.spark.ml.Pipeline
val pipeline = new Pipeline()
.setStages(Array(assembler, stringIndexer, booster, labelConverter))
val model = pipeline.fit(training)
獲得PipelineModel之后,可以對(duì)測(cè)試數(shù)據(jù)集進(jìn)行預(yù)測(cè)并評(píng)估模型的準(zhǔn)確性。
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val prediction = model.transform(test)
val evaluator = new MulticlassClassificationEvaluator()
val accuracy = evaluator.evaluate(prediction)
具有超參數(shù)調(diào)優(yōu)的管道
最大化XGBoost功能的最關(guān)鍵操作是為模型選擇最佳參數(shù)。手動(dòng)調(diào)整參數(shù)是一個(gè)繁瑣且費(fèi)力的過(guò)程。使用最新版本的XGBoost4J-Spark,可以利用Spark模型選擇工具來(lái)自動(dòng)執(zhí)行此過(guò)程。
以下示例顯示了使用CrossValidation和MulticlassClassificationEvaluator搜索兩個(gè)XGBoost參數(shù)max_depth和的最佳組合的代碼段eta。
選擇產(chǎn)生由MulticlassClassificationEvaluator定義的最大精度的模型,并將其用于生成測(cè)試集的預(yù)測(cè)。
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.PipelineModel
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassificationModel
val paramGrid = new ParamGridBuilder()
.addGrid(booster.maxDepth, Array(3, 8))
.addGrid(booster.eta, Array(0.2, 0.6))
.build()
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(3)
val cvModel = cv.fit(training)
val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel].stages(2)
.asInstanceOf[XGBoostClassificationModel]
bestModel.extractParamMap()
運(yùn)行XGBoost4J-Spark在生產(chǎn)
XGBoost4J-Spark是使XGBoost輕松進(jìn)入生產(chǎn)環(huán)境的最重要步驟之一。在本節(jié)中,介紹了三個(gè)在生產(chǎn)環(huán)境中運(yùn)行XGBoost4J-Spark的關(guān)鍵功能。
并行/分布式訓(xùn)練
訓(xùn)練數(shù)據(jù)集的龐大規(guī)模是生產(chǎn)環(huán)境中最重要的特征之一。為了確保XGBoost的訓(xùn)練能夠隨數(shù)據(jù)大小擴(kuò)展,XGBoost4J-Spark將Spark的分布式/并行處理框架與XGBoost的并行/分布式訓(xùn)練機(jī)制架起了橋梁。
在XGBoost4J-Spark中,每個(gè)XGBoost工作程序都由一個(gè)Spark任務(wù)包裝,并且Spark內(nèi)存空間中的訓(xùn)練數(shù)據(jù)集以對(duì)用戶透明的方式饋送給XGBoost工作人員。
在構(gòu)建XGBoostClassifier的代碼段中,設(shè)置參數(shù)num_workers(或numWorkers)。此參數(shù)控制在訓(xùn)練XGBoostClassificationModel時(shí)希望擁有多少并行工作器。
筆記
關(guān)于OpenMP優(yōu)化
默認(rèn)情況下,為每個(gè)XGBoost工作者分配一個(gè)核心。每個(gè)XGBoost工作程序中的OpenMP優(yōu)化不會(huì)生效,并且通過(guò)同時(shí)運(yùn)行多個(gè)工作程序(即Spark任務(wù))來(lái)實(shí)現(xiàn)訓(xùn)練的并行化。
如果確實(shí)要優(yōu)化OpenMP,則必須
- nthread創(chuàng)建XGBoostClassifier / XGBoostRegressor時(shí)將其設(shè)置為大于1的值
- spark.task.cpus在Spark中設(shè)置為與nthread
幫派調(diào)度
XGBoost使用AllReduce。一種算法,用于在訓(xùn)練過(guò)程中同步每個(gè)工人的統(tǒng)計(jì)數(shù)據(jù)(例如直方圖值)。XGBoost4J-Spark要求在訓(xùn)練運(yùn)行之前所有內(nèi)核都應(yīng)可用。nthread * numWorkers
在許多用戶共享同一集群的生產(chǎn)環(huán)境中,很難保證XGBoost4J-Spark應(yīng)用程序可以為每次運(yùn)行獲取所有求的資源。默認(rèn)情況下,當(dāng)需要更多資源可用時(shí),XGBoost中的通信層將阻止整個(gè)應(yīng)用程序。此過(guò)程通常會(huì)帶來(lái)不必要的資源浪費(fèi),因?yàn)樗鼤?huì)保留可用資源并嘗試索要更多資源。此外,這通常是無(wú)聲的,不會(huì)引起用戶的注意。
XGBoost4J-Spark允許用戶設(shè)置超時(shí)閾值,以從群集中聲明資源。如果應(yīng)用程序在此時(shí)間段內(nèi)無(wú)法獲得足夠的資源,則該應(yīng)用程序?qū)⑹?#xff0c;而不是浪費(fèi)資源以使其長(zhǎng)時(shí)間掛起。要啟用此功能,可以使用XGBoostClassifier / XGBoostRegressor進(jìn)行設(shè)置:
xgbClassifier.setTimeoutRequestWorkers(60000L)
或通過(guò)在timeout_request_workers在xgbParamMap建XGBoostClassifier時(shí):
val xgbParam = Map(“eta” -> 0.1f,
“max_depth” -> 2,
“objective” -> “multi:softprob”,
“num_class” -> 3,
“num_round” -> 100,
“num_workers” -> 2,
“timeout_request_workers” -> 60000L)
val xgbClassifier = new XGBoostClassifier(xgbParam).
setFeaturesCol(“features”).
setLabelCol(“classIndex”)
如果XGBoost4J-Spark無(wú)法獲得足夠的資源來(lái)運(yùn)行兩個(gè)XGBoost工作程序,則該應(yīng)用程序?qū)⑹ S脩艨梢跃哂型獠繖C(jī)制來(lái)監(jiān)視應(yīng)用程序的狀態(tài)并在這種情況下得到通知。
檢查點(diǎn)訓(xùn)練期間
瞬態(tài)故障在生產(chǎn)環(huán)境中也很常見(jiàn)。為了簡(jiǎn)化XGBoost的設(shè)計(jì),如果任何分布式工作人員失敗,將停止訓(xùn)練。但是,如果經(jīng)過(guò)很長(zhǎng)時(shí)間的訓(xùn)練仍然失敗,那將是對(duì)資源的極大浪費(fèi)。
支持在訓(xùn)練期間創(chuàng)建檢查點(diǎn),以幫助從故障中更有效地恢復(fù)。要啟用此功能,可以使用以下命令設(shè)置構(gòu)建每個(gè)檢查點(diǎn)的迭代次數(shù)setCheckpointInterval以及檢查點(diǎn)的位置setCheckpointPath:
xgbClassifier.setCheckpointInterval(2)
xgbClassifier.setCheckpointPath("/checkpoint_path")
一種等效的方法是在XGBoostClassifier的構(gòu)造函數(shù)中傳遞參數(shù):
val xgbParam = Map(“eta” -> 0.1f,
“max_depth” -> 2,
“objective” -> “multi:softprob”,
“num_class” -> 3,
“num_round” -> 100,
“num_workers” -> 2,
“checkpoint_path” -> “/checkpoints_path”,
“checkpoint_interval” -> 2)
val xgbClassifier = new XGBoostClassifier(xgbParam).
setFeaturesCol(“features”).
setLabelCol(“classIndex”)
如果在這100輪訓(xùn)練中訓(xùn)練失敗,則下一輪訓(xùn)練將從讀取最新的檢查點(diǎn)文件/checkpoints_path開(kāi)始,并從構(gòu)建檢查點(diǎn)的迭代開(kāi)始直到下一次失敗或指定的100輪。
總結(jié)
以上是生活随笔為你收集整理的XGBoost4J-Spark基本原理的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: MLIR: Infrastructure
- 下一篇: 编写可调模板并使用自动调谐器