XGBoost 与 Apache Spark 的全面集成
引言
2016年3月,我们发布了 XGBoost4J 的第一个版本,它提供了一组 XGBoost 的 Java/Scala 接口,并集成了流行的基于 JVM 的分布式数据处理平台,如 Spark/Flink。
与 Spark/Flink 的集成,即 XGBoost4J-Spark 和 XGBoost-Flink,获得了社区的巨大积极反馈。它使用户能够构建统一的流水线,将 XGBoost 嵌入到基于广泛部署框架(如 Spark)的数据处理系统中。下图展示了使用第一个版本的 XGBoost4J-Spark 构建的这种流水线的通用架构,其中数据处理基于低级别的 弹性分布式数据集 (RDD) 抽象。
在过去的几个月里,我们与用户进行了大量交流,对用户的最新使用场景和需求有了更深入的了解
-
XGBoost 在生产环境中的部署和在机器学习竞赛中的应用越来越多 链接。
-
虽然 Spark 在大多数场景中仍然是主流的数据处理工具,但越来越多的用户正在将其基于 RDD 的 Spark 程序迁移到 DataFrame/Dataset API,因为其精心设计的接口可用于操作结构化数据,并且具有 显著的性能提升。
-
Spark 本身已经提出了清晰的路线图,DataFrame/Dataset 将成为最新和未来功能的基础,例如最新版本的 ML pipeline 和 Structured Streaming。
基于用户的这些反馈,我们观察到原始基于 RDD 的 XGBoost4J-Spark 与用户的最新使用场景以及 Spark 生态系统的未来发展方向之间存在差距。为了弥补这一差距,我们于九月开始着手集成 XGBoost 和 Spark 的 DataFrame/Dataset 抽象。在这篇博客中,我们将介绍最新版本的 XGBoost4J-Spark,它允许用户直接使用 DataFrame/Dataset,并将 XGBoost 无缝地嵌入到 Spark 的 ML pipeline 中。
XGBoost 和 DataFrame/Dataset 的全面集成
下图展示了使用最新 XGBoost4J-Spark 的新流水线架构。
与以前的版本不同,用户可以使用 Spark 中的低级和高级内存抽象,即 RDD 和 DataFrame/Dataset。DataFrame/Dataset 抽象使用户能够操作结构化数据集,并利用 Spark 的内置例程或用户定义函数 (UDF) 在将数据馈送到流水线中的机器学习阶段之前探索列中的值分布。在下面的示例中,结构化的销售记录可以保存在 JSON 文件中,通过 Spark 的 API 解析为 DataFrame,并仅用两行 Scala 代码来训练 XGBoost 模型。
// load sales records saved in json files
val salesDF = spark.read.json("sales.json")
// call XGBoost API to train with the DataFrame-represented training set
val xgboostModel = XGBoost.trainWithDataFrame(
salesDF, paramMap, numRound, nWorkers, useExternalMemory)
通过与 DataFrame/Dataset 集成,XGBoost4J-Spark 不仅使用户能够直接调用 DataFrame/Dataset API,而且还将基于 DataFrame/Dataset 的 Spark 功能提供给 XGBoost 用户,例如 ML Package。
与 ML Package 的集成
Spark 的 ML Package 提供了一套方便的工具,用于特征提取/转换/选择。此外,借助 ML Package 中的模型选择工具,用户可以通过使用 ML Package API 定义的自动参数搜索过程来选择最佳模型。在与 DataFrame/Dataset 抽象集成后,ML Package 中的这些吸引人的功能也对 XGBoost 用户可用。
特征提取/转换/选择
下面的示例展示了一个特征转换器,它将字符串类型的 storeType 特征转换为数字类型的 storeTypeIndex。然后将转换后的 DataFrame 用于训练 XGBoost 模型。
import org.apache.spark.ml.feature.StringIndexer
// load sales records saved in json files
val salesDF = spark.read.json("sales.json")
// transfrom the string-represented storeType feature to numeric storeTypeIndex
val indexer = new StringIndexer()
.setInputCol("storeType")
.setOutputCol("storeTypeIndex")
// drop the extra column
val indexed = indexer.fit(salesDF).transform(df).drop("storeType")
// use the transformed dataframe as training dataset
val xgboostModel = XGBoost.trainWithDataFrame(
indexed, paramMap, numRound, nWorkers, useExternalMemory)
构建流水线
Spark ML Package 允许用户构建从特征提取/转换/选择到模型训练的完整流水线。我们将 XGBoost 与 ML Package 集成,使其能够无缝地嵌入到此类流水线中。下面的示例展示了如何构建一个包含特征转换器和 XGBoost 估计器的流水线。
import org.apache.spark.ml.feature.StringIndexer
// load sales records saved in json files
val salesDF = spark.read.json("sales.json")
// transfrom the string-represented storeType feature to numeric storeTypeIndex
val indexer = new StringIndexer()
.setInputCol("storeType")
.setOutputCol("storeTypeIndex")
// assemble the columns in dataframe into a vector
val vectorAssembler = new VectorAssembler()
.setInputCols(Array("storeId", "storeTypeIndex", ...))
.setOutputCol("features")
// construct the pipeline
val pipeline = new Pipeline().setStages(
Array(storeTypeIndexer, ..., vectorAssembler, new XGBoostEstimator(Map[String, Any]("num_rounds" -> 100)))
// use the transformed dataframe as training dataset
val xgboostModel = pipeline.fit(salesDF)
// predict with the trained model
val salesTestDF = spark.read.json("sales_test.json")
val salesRecordsWithPred = xgboostModel.transform(salesTestDF)
模型选择
要最大化 XGBoost 的能力,最关键的操作是选择模型的最佳参数。手动调参是一个乏味且耗时的过程。借助最新版本的 XGBoost4J-Spark,我们可以利用 Spark 模型选择工具来自动化此过程。下面的示例展示了如何利用 TrainValidationSplit 和 RegressionEvaluator 来搜索两个 XGBoost 参数 [max_depth 和 eta] (https://github.com/dmlc/xgboost/blob/master/doc/parameter.md) 的最佳组合的代码片段。选择由 RegressionEvaluator 定义的产生最小成本函数值的模型,并用于生成测试集的预测。
// create XGBoostEstimator
val xgbEstimator = new XGBoostEstimator(xgboostParam).setFeaturesCol("features").
setLabelCol("sales")
val paramGrid = new ParamGridBuilder()
.addGrid(xgbEstimator.maxDepth, Array(5, 6))
.addGrid(xgbEstimator.eta, Array(0.1, 0.4))
.build()
val tv = new TrainValidationSplit()
.setEstimator(xgbEstimator)
.setEvaluator(new RegressionEvaluator().setLabelCol("sales"))
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.8)
val salesTestDF = spark.read.json("sales_test.json")
val salesRecordsWithPred = xgboostModel.transform(salesTestDF)
总结
通过最新的 XGBoost4J-Spark,XGBoost 用户可以构建一个更高效的数据处理流水线,该流水线使用 DataFrame/Dataset API 来处理结构化数据并获得出色的性能,同时利用强大的 XGBoost 从数据集中挖掘洞察并将其转化为行动。此外,XGBoost4J-Spark 将 XGBoost 与 Spark ML package 无缝连接,使得特征提取/转换/选择和模型参数调整比以前容易得多。
最新版本的 XGBoost4J-Spark 已在 [GitHub 仓库] (https://github.com/dmlc/xgboost) 中提供,最新的 API 文档在此处。
可移植机器学习系统
XGBoost 是 分布式机器学习社区 (DMLC) 孵化的项目之一,该社区还创建了其他几个流行的机器学习系统项目 (链接),例如最流行的深度学习框架之一 MXNet。我们坚信机器学习解决方案不应局限于特定的语言或平台。我们在 XGBoost 和 MXNet 等几个项目中实现了这一设计理念。我们乐意看到社区在这一方向做出更多贡献。
延伸阅读
如果您有兴趣了解更多关于 XGBoost 的信息,可以在以下资源中找到丰富的内容: