diff --git a/core/src/main/scala/com/salesforce/op/OpWorkflow.scala b/core/src/main/scala/com/salesforce/op/OpWorkflow.scala index 824a3edbeb..78f58e7be9 100644 --- a/core/src/main/scala/com/salesforce/op/OpWorkflow.scala +++ b/core/src/main/scala/com/salesforce/op/OpWorkflow.scala @@ -423,7 +423,7 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore { log.info("Estimate best Model with CV/TS. Stages included in CV are: {}, {}", during.flatMap(_.map(_._1.stageName)).mkString(", "), modelSelector.uid: Any ) - modelSelector.findBestEstimator(trainFixed, during, persistEveryKStages) + modelSelector.findBestEstimator(trainFixed, Option(during)) val remainingDAG: StagesDAG = (during :+ (Array(modelSelector -> distance): Layer)) ++ after log.info("Applying DAG after CV/TS. Stages: {}", remainingDAG.flatMap(_.map(_._1.stageName)).mkString(", ")) diff --git a/core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala b/core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala index dc43fbd315..938fdc330e 100644 --- a/core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala +++ b/core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala @@ -124,7 +124,9 @@ class RawFeatureFilter[T] @transient protected lazy val log = LoggerFactory.getLogger(this.getClass) /** - * Get binned counts of the feature distribution and empty count for each raw feature + * Get binned counts of the feature distribution and empty count for each raw feature. This computes all the + * statistics on the training and scoring data. It does two map reduce operations, the first to produce a Summary + * of each feature, the second to produce a binned histogram (Distribution) for each feature based on the Summary. 
* * @param data data frame to compute counts on * @param features list of raw, non-protected, features contained in the dataframe @@ -151,6 +153,7 @@ class RawFeatureFilter[T] val predOut = allPredictors.map(TransientFeature(_)) (respOut, predOut) } + // process all features based on raw type so that they can be summarized as either text or numeric val preparedFeatures: RDD[PreparedFeatures] = data.rdd.map(PreparedFeatures(_, responses, predictors, timePeriod)) implicit val sgTuple2Maps = new Tuple2Semigroup[Map[FeatureKey, Summary], Map[FeatureKey, Summary]]() diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/classification/BinaryClassificationModelSelector.scala b/core/src/main/scala/com/salesforce/op/stages/impl/classification/BinaryClassificationModelSelector.scala index 4a13050e75..f0953b0ef8 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/classification/BinaryClassificationModelSelector.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/classification/BinaryClassificationModelSelector.scala @@ -40,6 +40,8 @@ import enumeratum.Enum import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.tuning.ParamGridBuilder +import scala.concurrent.duration.Duration + /** * A factory for Binary Classification Model Selector @@ -153,6 +155,7 @@ case object BinaryClassificationModelSelector extends ModelSelectorFactory { * for model selection Seq[(EstimatorType, Array[ParamMap])] where Estimator type must be * an Estimator that takes in a label (RealNN) and features (OPVector) and returns a * prediction (Prediction) + * @param maxWait maximum allowable time to wait for a model to finish running (default is 1 day) * @return Classification Model Selector with a Cross Validation */ def withCrossValidation(
Seq[BinaryClassificationModelsToTry] = Defaults.modelTypesToUse, - modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty + modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty, + maxWait: Duration = ValidatorParamDefaults.MaxWait ): ModelSelector[ModelType, EstimatorType] = { val cv = new OpCrossValidation[ModelType, EstimatorType]( - numFolds = numFolds, seed = seed, validationMetric, stratify = stratify, parallelism = parallelism + numFolds = numFolds, seed = seed, evaluator = validationMetric, stratify = stratify, + parallelism = parallelism, maxWait = maxWait ) selector(cv, splitter = splitter, @@ -198,6 +203,7 @@ case object BinaryClassificationModelSelector extends ModelSelectorFactory { * for model selection Seq[(EstimatorType, Array[ParamMap])] where Estimator type must be * an Estimator that takes in a label (RealNN) and features (OPVector) and returns a * prediction (Prediction) + * @param maxWait maximum allowable time to wait for a model to finish running (default is 1 day) * @return Classification Model Selector with a Train Validation Split */ def withTrainValidationSplit( @@ -209,10 +215,12 @@ case object BinaryClassificationModelSelector extends ModelSelectorFactory { stratify: Boolean = ValidatorParamDefaults.Stratify, parallelism: Int = ValidatorParamDefaults.Parallelism, modelTypesToUse: Seq[BinaryClassificationModelsToTry] = Defaults.modelTypesToUse, - modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty + modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty, + maxWait: Duration = ValidatorParamDefaults.MaxWait ): ModelSelector[ModelType, EstimatorType] = { val ts = new OpTrainValidationSplit[ModelType, EstimatorType]( - trainRatio = trainRatio, seed = seed, validationMetric, stratify = stratify, parallelism = parallelism + trainRatio = trainRatio, seed = seed, validationMetric, stratify = stratify, parallelism = parallelism, + maxWait = maxWait ) selector(ts, splitter = 
splitter, diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/classification/MultiClassificationModelSelector.scala b/core/src/main/scala/com/salesforce/op/stages/impl/classification/MultiClassificationModelSelector.scala index 6707da5097..82bd968947 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/classification/MultiClassificationModelSelector.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/classification/MultiClassificationModelSelector.scala @@ -33,13 +33,15 @@ package com.salesforce.op.stages.impl.classification import com.salesforce.op.evaluators._ import com.salesforce.op.stages.impl.ModelsToTry import com.salesforce.op.stages.impl.classification.{MultiClassClassificationModelsToTry => MTT} -import com.salesforce.op.stages.impl.selector.{DefaultSelectorParams, ModelSelectorFactory, ModelSelector} +import com.salesforce.op.stages.impl.selector.{DefaultSelectorParams, ModelSelector, ModelSelectorFactory} import com.salesforce.op.stages.impl.tuning._ import enumeratum.Enum import com.salesforce.op.stages.impl.selector.ModelSelectorNames.{EstimatorType, ModelType} import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.tuning.ParamGridBuilder +import scala.concurrent.duration.Duration + /** * A factory for Multi Classification Model Selector @@ -133,6 +135,7 @@ case object MultiClassificationModelSelector extends ModelSelectorFactory { * for model selection Seq[(EstimatorType, Array[ParamMap])] where Estimator type must be * an Estimator that takes in a label (RealNN) and features (OPVector) and returns a * prediction (Prediction) + * @param maxWait maximum allowable time to wait for a model to finish running (default is 1 day) * @return MultiClassification Model Selector with a Cross Validation */ def withCrossValidation( @@ -144,10 +147,12 @@ case object MultiClassificationModelSelector extends ModelSelectorFactory { stratify: Boolean = ValidatorParamDefaults.Stratify, parallelism: Int = 
ValidatorParamDefaults.Parallelism, modelTypesToUse: Seq[MultiClassClassificationModelsToTry] = Defaults.modelTypesToUse, - modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty + modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty, + maxWait: Duration = ValidatorParamDefaults.MaxWait ): ModelSelector[ModelType, EstimatorType] = { val cv = new OpCrossValidation[ModelType, EstimatorType]( - numFolds = numFolds, seed = seed, validationMetric, stratify = stratify, parallelism = parallelism + numFolds = numFolds, seed = seed, evaluator = validationMetric, stratify = stratify, parallelism = parallelism, + maxWait = maxWait ) selector(cv, splitter = splitter, @@ -178,6 +183,7 @@ case object MultiClassificationModelSelector extends ModelSelectorFactory { * for model selection Seq[(EstimatorType, Array[ParamMap])] where Estimator type must be * an Estimator that takes in a label (RealNN) and features (OPVector) and returns a * prediction (Prediction) + * @param maxWait maximum allowable time to wait for a model to finish running (default is 1 day) * @return MultiClassification Model Selector with a Train Validation Split */ def withTrainValidationSplit( @@ -189,7 +195,8 @@ case object MultiClassificationModelSelector extends ModelSelectorFactory { stratify: Boolean = ValidatorParamDefaults.Stratify, parallelism: Int = ValidatorParamDefaults.Parallelism, modelTypesToUse: Seq[MultiClassClassificationModelsToTry] = Defaults.modelTypesToUse, - modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty + modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty, + maxWait: Duration = ValidatorParamDefaults.MaxWait ): ModelSelector[ModelType, EstimatorType] = { val ts = new OpTrainValidationSplit[ModelType, EstimatorType]( trainRatio = trainRatio, seed = seed, validationMetric, stratify = stratify, parallelism = parallelism diff --git 
a/core/src/main/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelector.scala b/core/src/main/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelector.scala index 32022383e0..3d17551669 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelector.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelector.scala @@ -34,12 +34,14 @@ import com.salesforce.op.evaluators._ import com.salesforce.op.stages.impl.ModelsToTry import com.salesforce.op.stages.impl.regression.{RegressionModelsToTry => MTT} import com.salesforce.op.stages.impl.selector.ModelSelectorNames.{EstimatorType, ModelType} -import com.salesforce.op.stages.impl.selector.{DefaultSelectorParams, ModelSelectorFactory, ModelSelector} +import com.salesforce.op.stages.impl.selector.{DefaultSelectorParams, ModelSelector, ModelSelectorFactory} import com.salesforce.op.stages.impl.tuning._ import enumeratum.Enum import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.tuning.ParamGridBuilder +import scala.concurrent.duration.Duration + /** * A factory for Regression Model Selector @@ -57,7 +59,7 @@ case object RegressionModelSelector extends ModelSelectorFactory { * Note: [[OpDecisionTreeRegressor]] and [[OpXGBoostRegressor]] are off by default */ val modelTypesToUse: Seq[RegressionModelsToTry] = Seq( - MTT.OpLinearRegression, MTT.OpRandomForestRegressor, MTT.OpGBTRegressor, MTT.OpGeneralizedLinearRegression + MTT.OpLinearRegression, MTT.OpRandomForestRegressor, MTT.OpGBTRegressor ) /** @@ -107,6 +109,7 @@ case object RegressionModelSelector extends ModelSelectorFactory { val glrParams = new ParamGridBuilder() .addGrid(glr.fitIntercept, DefaultSelectorParams.FitIntercept) .addGrid(glr.family, DefaultSelectorParams.DistFamily) + .addGrid(glr.link, DefaultSelectorParams.LinkFunction) .addGrid(glr.maxIter, DefaultSelectorParams.MaxIterLin) .addGrid(glr.regParam, DefaultSelectorParams.Regularization) 
.addGrid(glr.tol, DefaultSelectorParams.Tol) @@ -145,6 +148,7 @@ case object RegressionModelSelector extends ModelSelectorFactory { * for model selection Seq[(EstimatorType, Array[ParamMap])] where Estimator type must be * an Estimator that takes in a label (RealNN) and features (OPVector) and returns a * prediction (Prediction) + * @param maxWait maximum allowable time to wait for a model to finish running (default is 1 day) * @return Regression Model Selector with a Cross Validation */ def withCrossValidation( @@ -155,10 +159,11 @@ case object RegressionModelSelector extends ModelSelectorFactory { seed: Long = ValidatorParamDefaults.Seed, parallelism: Int = ValidatorParamDefaults.Parallelism, modelTypesToUse: Seq[RegressionModelsToTry] = Defaults.modelTypesToUse, - modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty + modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty, + maxWait: Duration = ValidatorParamDefaults.MaxWait ): ModelSelector[ModelType, EstimatorType] = { val cv = new OpCrossValidation[ModelType, EstimatorType]( - numFolds = numFolds, seed = seed, validationMetric, parallelism = parallelism + numFolds = numFolds, seed = seed, evaluator = validationMetric, parallelism = parallelism, maxWait = maxWait ) selector(cv, splitter = dataSplitter, @@ -188,6 +193,7 @@ case object RegressionModelSelector extends ModelSelectorFactory { * for model selection Seq[(EstimatorType, Array[ParamMap])] where Estimator type must be * an Estimator that takes in a label (RealNN) and features (OPVector) and returns a * prediction (Prediction) + * @param maxWait maximum allowable time to wait for a model to finish running (default is 1 day) * @return Regression Model Selector with a Train Validation Split */ def withTrainValidationSplit( @@ -198,7 +204,8 @@ case object RegressionModelSelector extends ModelSelectorFactory { seed: Long = ValidatorParamDefaults.Seed, parallelism: Int = ValidatorParamDefaults.Parallelism, modelTypesToUse: 
Seq[RegressionModelsToTry] = Defaults.modelTypesToUse, - modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty + modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty, + maxWait: Duration = ValidatorParamDefaults.MaxWait ): ModelSelector[ModelType, EstimatorType] = { val ts = new OpTrainValidationSplit[ModelType, EstimatorType]( trainRatio = trainRatio, seed = seed, validationMetric, parallelism = parallelism diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/selector/DefaultSelectorParams.scala b/core/src/main/scala/com/salesforce/op/stages/impl/selector/DefaultSelectorParams.scala index 184b941aa0..46b2ec9708 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/selector/DefaultSelectorParams.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/selector/DefaultSelectorParams.scala @@ -53,7 +53,16 @@ object DefaultSelectorParams { val RegSolver = Array("auto") // regression solver spark default auto val FitIntercept = Array(true) // fit intercept spark default true val NbSmoothing = Array(1.0) // spark default 1.0 - val DistFamily = Array("gaussian", "poisson") // generalized linear model link family + val DistFamily = Array("gaussian", "binomial", "poisson", "gamma", "tweedie") // glm distribution family + val LinkFunction = Array("identity", "log", "inverse", "logit", "probit", "cloglog", "sqrt") // glm link function + // Valid link functions for each family is listed below. The first link function of each family + // is the default one. + // - "gaussian" : "identity", "log", "inverse" + // - "binomial" : "logit", "probit", "cloglog" + // - "poisson" : "log", "identity", "sqrt" + // - "gamma" : "inverse", "identity", "log" + // - "tweedie" : power link function specified through "linkPower". The default link power in + // the tweedie family is 1 - variancePower. 
val NumRound = Array(100) // number of rounds for xgboost (default 1) val Eta = Array(0.1 , 0.3) // step size shrinkage for xgboost (default 0.3) val MinChildWeight = Array(1.0, 5.0, 10.0) // minimum sum of instance weight needed in a child for xgboost (default 1) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelector.scala b/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelector.scala index 432672c652..f32aae608e 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelector.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelector.scala @@ -33,21 +33,18 @@ package com.salesforce.op.stages.impl.selector import com.salesforce.op.UID import com.salesforce.op.evaluators.{EvaluationMetrics, _} import com.salesforce.op.features.types._ -import com.salesforce.op.readers.DataFrameFieldNames import com.salesforce.op.stages._ import com.salesforce.op.stages.base.binary.OpTransformer2 import com.salesforce.op.stages.impl.CheckIsResponseValues import com.salesforce.op.stages.impl.selector.ModelSelectorNames.ModelType -import com.salesforce.op.stages.impl.tuning._ +import com.salesforce.op.stages.impl.tuning.{BestEstimator, _} import com.salesforce.op.stages.sparkwrappers.generic.SparkWrapperParams import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictorWrapperModel, SparkModelConverter} -import com.salesforce.op.utils.spark.RichDataset._ import com.salesforce.op.utils.spark.RichMetadata._ import com.salesforce.op.utils.spark.RichParamMap._ import com.salesforce.op.utils.stages.FitStagesUtil._ import org.apache.spark.ml.param._ import org.apache.spark.ml.{Estimator, Model, PredictionModel} -import org.apache.spark.sql.functions._ import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import scala.reflect.runtime.universe._ @@ -93,37 +90,48 @@ E <: Estimator[_] with OpPipelineStage2[RealNN, OPVector, Prediction]] } @transient private[op] var bestEstimator: 
Option[BestEstimator[E]] = None - @transient private lazy val modelsUse = models.map{case (e, p) => + // TODO allow smart modification of these values + @transient private lazy val modelsUse = models.map{ case (e, p) => val est = e.setOutputFeatureName(getOutputFeatureName) val par = if (p.isEmpty) Array(new ParamMap) else p est -> par } + lazy val labelColName: String = in1.name + + override protected[op] def outputsColNamesMap: Map[String, String] = + Map(ModelSelectorNames.outputParamName -> getOutputFeatureName) + /** * Find best estimator with validation on a workflow level. Executed when workflow level Cross Validation is on * (see [[com.salesforce.op.OpWorkflow.withWorkflowCV]]) * * @param data data to validate * @param dag dag done inside the Cross-validation/Train-validation split - * @param persistEveryKStages frequency of persisting the DAG's stages * @param spark Spark Session * @return Updated Model Selector with best model along with best paramMap */ - protected[op] def findBestEstimator(data: DataFrame, dag: StagesDAG, persistEveryKStages: Int = 0) - (implicit spark: SparkSession): Unit = { - val PrevalidationVal(_, dataSetOpt) = prepareForValidation(data, in1.name) - val theBestEstimator = validator.validate(modelInfo = modelsUse, dataset = dataSetOpt.getOrElse(data), - label = in1.name, features = in2.name, dag = Option(dag), splitter = splitter, - stratifyCondition = validator.isClassification + protected[op] def findBestEstimator(data: DataFrame, dag: Option[StagesDAG] = None) + (implicit spark: SparkSession): (BestEstimator[E], Option[SplitterSummary], DataFrame) = { + + val PrevalidationVal(splitterSummary, dataOpt) = prepareForValidation(data, labelColName) + val dataUse = dataOpt.getOrElse(data) + + val theBestEstimator = validator.validate(modelInfo = modelsUse, dataset = dataUse, + label = labelColName, features = in2.name, dag = dag, splitter = splitter ) bestEstimator = Option(theBestEstimator) + (theBestEstimator, splitterSummary, 
dataUse) } - lazy val labelColName: String = in1.name - override protected[op] def outputsColNamesMap: Map[String, String] = - Map(ModelSelectorNames.outputParamName -> getOutputFeatureName) + private def prepareForValidation(data: DataFrame, labelColName: String): PrevalidationVal = { + splitter + .map(_.withLabelColumnName(labelColName).preValidationPrepare(data)) + .getOrElse(PrevalidationVal(None, Option(data))) + } + /** * Splits the data into training test and test set, balances the training set and selects the best model @@ -135,25 +143,13 @@ E <: Estimator[_] with OpPipelineStage2[RealNN, OPVector, Prediction]] final override def fit(dataset: Dataset[_]): SelectedModel = { implicit val spark = dataset.sparkSession - - val datasetWithIDPre = - if (dataset.columns.contains(DataFrameFieldNames.KeyFieldName)) { - dataset.select(in1.name, in2.name, DataFrameFieldNames.KeyFieldName) - } else { - dataset.select(in1.name, in2.name) - .withColumn(ModelSelectorNames.idColName, monotonically_increasing_id()) - } - require(!datasetWithIDPre.isEmpty, "Dataset cannot be empty") - - val PrevalidationVal(splitterSummary, dataSetWithIDOpt) = prepareForValidation(datasetWithIDPre, in1.name) - val datasetWithID = dataSetWithIDOpt.getOrElse(datasetWithIDPre) - val BestEstimator(name, estimator, summary) = bestEstimator.getOrElse { - setInputSchema(dataset.schema).transformSchema(dataset.schema) - val best = validator - .validate(modelInfo = modelsUse, dataset = datasetWithID, label = in1.name, features = in2.name) - bestEstimator = Some(best) - best - } + setInputSchema(dataset.schema).transformSchema(dataset.schema) + require(!dataset.isEmpty, "Dataset cannot be empty") + val data = dataset.select(labelColName, in2.name) + val (BestEstimator(name, estimator, summary), splitterSummary, datasetWithID) = bestEstimator.map{ e => + val PrevalidationVal(summary, dataOpt) = prepareForValidation(data, labelColName) + (e, summary, dataOpt.getOrElse(data)) + }.getOrElse{ 
findBestEstimator(data.toDF()) } val preparedData = splitter.map(_.validationPrepare(datasetWithID)).getOrElse(datasetWithID) val bestModel = estimator.fit(preparedData).asInstanceOf[M] @@ -162,7 +158,7 @@ E <: Estimator[_] with OpPipelineStage2[RealNN, OPVector, Prediction]] log.info(s"With parameters : ${bestEst.extractParamMap()}") // set input and output params - outputsColNamesMap.foreach { case (pname, pvalue) => bestModel.set(bestModel.getParam(pname), pvalue) } + outputsColNamesMap.foreach{ case (pname, pvalue) => bestModel.set(bestModel.getParam(pname), pvalue) } // get eval results for metadata val trainingEval = evaluate(bestModel.transform(preparedData)) @@ -195,11 +191,6 @@ E <: Estimator[_] with OpPipelineStage2[RealNN, OPVector, Prediction]] .setEvaluators(evaluators) } - private def prepareForValidation(data: DataFrame, labelColName: String): PrevalidationVal = { - splitter - .map(_.withLabelColumnName(labelColName).preValidationPrepare(data)) - .getOrElse(PrevalidationVal(None, Option(data))) - } } /** diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelectorFactory.scala b/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelectorFactory.scala index 685e79a98b..1956237e4f 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelectorFactory.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelectorFactory.scala @@ -33,10 +33,12 @@ package com.salesforce.op.stages.impl.selector import com.salesforce.op.evaluators.{EvaluationMetrics, OpEvaluatorBase} import com.salesforce.op.stages.impl.ModelsToTry import com.salesforce.op.stages.impl.selector.ModelSelectorNames.{EstimatorType, ModelType} -import com.salesforce.op.stages.impl.tuning.{OpValidator, Splitter} +import com.salesforce.op.stages.impl.tuning.{OpValidator, Splitter, ValidatorParamDefaults} import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.tuning.ParamGridBuilder +import 
scala.concurrent.duration.Duration + /** * Creates the model selector class */ diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpCrossValidation.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpCrossValidation.scala index b3d857f92e..cd6cb76c14 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpCrossValidation.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpCrossValidation.scala @@ -44,7 +44,8 @@ private[op] class OpCrossValidation[M <: Model[_], E <: Estimator[_]] val seed: Long = ValidatorParamDefaults.Seed, val evaluator: OpEvaluatorBase[_], val stratify: Boolean = ValidatorParamDefaults.Stratify, - val parallelism: Int = ValidatorParamDefaults.Parallelism + val parallelism: Int = ValidatorParamDefaults.Parallelism, + val maxWait: Duration = ValidatorParamDefaults.MaxWait ) extends OpValidator[M, E] { val validationName: String = ModelSelectorNames.CrossValResults @@ -115,7 +116,7 @@ private[op] class OpCrossValidation[M <: Model[_], E <: Estimator[_]] } } // Await for all the evaluations to complete - val modelSummaries = SparkThreadUtils.utils.awaitResult(Future.sequence(modelSummariesFuts.toSeq), Duration.Inf) + val modelSummaries = SparkThreadUtils.utils.awaitResult(Future.sequence(modelSummariesFuts.toSeq), maxWait) // Find the best model & return it val groupedSummary = modelSummaries.flatten.groupBy(_.model).map { case (_, folds) => findBestModel(folds) }.toArray diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpTrainValidationSplit.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpTrainValidationSplit.scala index c2fdf4b4e1..e33616c3db 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpTrainValidationSplit.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpTrainValidationSplit.scala @@ -29,6 +29,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Dataset, Row, SparkSession} 
import scala.concurrent.ExecutionContext +import scala.concurrent.duration.Duration private[op] class OpTrainValidationSplit[M <: Model[_], E <: Estimator[_]] @@ -37,7 +38,8 @@ private[op] class OpTrainValidationSplit[M <: Model[_], E <: Estimator[_]] val seed: Long = ValidatorParamDefaults.Seed, val evaluator: OpEvaluatorBase[_], val stratify: Boolean = ValidatorParamDefaults.Stratify, - val parallelism: Int = ValidatorParamDefaults.Parallelism + val parallelism: Int = ValidatorParamDefaults.Parallelism, + val maxWait: Duration = ValidatorParamDefaults.MaxWait ) extends OpValidator[M, E] { val validationName: String = ModelSelectorNames.TrainValSplitResults diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpValidator.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpValidator.scala index fbe55c8897..7c872d3b60 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpValidator.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/OpValidator.scala @@ -42,13 +42,13 @@ import org.apache.log4j.{Level, LogManager} import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.functions.monotonically_increasing_id import org.apache.spark.sql.{Dataset, Row, SparkSession, functions} import org.apache.spark.util.SparkThreadUtils import org.slf4j.{Logger, LoggerFactory} import scala.concurrent.duration.Duration import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success, Try} /** @@ -105,6 +105,8 @@ private[op] trait OpValidator[M <: Model[_], E <: Estimator[_]] extends Serializ def parallelism: Int + def maxWait: Duration + private[op] final def isClassification = evaluator match { case _: OpBinaryClassificationEvaluatorBase[_] => true case _: OpMultiClassificationEvaluatorBase[_] => true @@ -200,6 +202,15 @@ private[op] trait OpValidator[M <: Model[_], E <: Estimator[_]] extends Serializ 
splitter: Option[Splitter] ): Array[(RDD[Row], RDD[Row])] + /** + * Creates RDD grouped by classes (0, 1, 2, 3, ..., K) for stratified sampling + * @param dataset dataset to prepare + * @param message message to log + * @param label label name + * @param splitter data splitter (processor) for pre modeling data manipulation + * @tparam T + * @return Sequence of RDDs grouped by label class + */ protected def prepareStratification[T]( dataset: Dataset[T], message: String, @@ -225,6 +236,17 @@ private[op] trait OpValidator[M <: Model[_], E <: Estimator[_]] extends Serializ datasetsByClass.map(_.toDF().rdd) } + /** + * Transform data in train or test up to the model selector + * @param dag Stages to be applied in DAG + * @param training training data + * @param validation validation data + * @param label label name + * @param features feature name + * @param splitter data splitter (processor) for pre modeling data manipulation + * @param sparkSession + * @return transformed training and test data + */ protected def applyDAG( dag: StagesDAG, training: Dataset[Row], @@ -239,13 +261,10 @@ private[op] trait OpValidator[M <: Model[_], E <: Estimator[_]] extends Serializ train = training, test = validation, hasTest = true, - indexOfLastEstimator = Some(-1) + indexOfLastEstimator = Option(-1) ) val selectTrain = newTrain.select(label, features) - .withColumn(ModelSelectorNames.idColName, monotonically_increasing_id()) - val selectTest = newTest.select(label, features) - .withColumn(ModelSelectorNames.idColName, monotonically_increasing_id()) val (balancedTrain, balancedTest) = splitter.map(s => ( s.validationPrepare(selectTrain), @@ -267,6 +286,17 @@ private[op] trait OpValidator[M <: Model[_], E <: Estimator[_]] extends Serializ result } + /** + * Does the model fitting for all the models and their accompanying hyperparameter grids + * @param modelInfo Sequence of estimators and grids to try + * @param label label column + * @param features features column + * @param train 
training data + * @param test test data + * @param ec + * @tparam T + * @return Array of fit models and their metrics + */ protected def getSummary[T]( modelInfo: Seq[(E, Array[ParamMap])], label: String, features: String, train: Dataset[T], test: Dataset[T] )(implicit ec: ExecutionContext): Array[ValidatedModel[E]] = { @@ -276,9 +306,8 @@ private[op] trait OpValidator[M <: Model[_], E <: Estimator[_]] extends Serializ val name = estimator.getClass.getSimpleName estimator match { case e: OpPipelineStage2[RealNN, OPVector, Prediction]@unchecked => - val (labelFeat, Array(featuresFeat: Feature[OPVector]@unchecked, _)) = - FeatureBuilder.fromDataFrame[RealNN](train.toDF(), response = label, - nonNullable = Set(features, ModelSelectorNames.idColName)) + val (labelFeat, Array(featuresFeat: Feature[OPVector]@unchecked)) = + FeatureBuilder.fromDataFrame[RealNN](train.toDF(), response = label, nonNullable = Set(features)) e.setInput(labelFeat, featuresFeat) evaluator.setPredictionCol(e.getOutput()) case _ => // otherwise it is a spark estimator @@ -286,26 +315,43 @@ private[op] trait OpValidator[M <: Model[_], E <: Estimator[_]] extends Serializ val pi2 = estimator.getParam(ModelSelectorNames.inputParam2Name) estimator.set(pi1, label).set(pi2, features) } - Future { - val numModels = params.length - val metrics = new Array[Double](params.length) - log.info(s"Train split with multiple sets of parameters.") - val models = estimator.fit(train, params).asInstanceOf[Seq[M]] - for {i <- 0 until numModels} { - val metric = evaluator.evaluate(models(i).transform(test, params(i))) - log.info(s"Got metric $metric for model $name trained with ${params(i)}.") - metrics(i) = metric + + val paramsMetricsF = params.seq.map { p => + Future { + val model = estimator.fit(train, p).asInstanceOf[M] + val metric = evaluator.evaluate(model.transform(test, p)) + log.info(s"Got metric $metric for model $name trained with $p.") + Option(p -> metric) + }.recover({ case e: Throwable => + 
log.warn(s"Model $name attempted in model selector failed with following issue: \n", e) + None }) } - val (bestMetric, bestIndex) = - if (evaluator.isLargerBetter) metrics.zipWithIndex.maxBy(_._1) - else metrics.zipWithIndex.minBy(_._1) - log.info(s"Best set of parameters:\n${params(bestIndex)} for $name") - log.info(s"Best train validation split metric: $bestMetric.") - ValidatedModel(estimator, bestIndex, metrics, params) - } + Future.sequence(paramsMetricsF).map { paramsMetrics => + val (goodParams, metrics) = paramsMetrics.flatten.unzip + + val (bestMetric, bestIndex) = + if (evaluator.isLargerBetter) metrics.zipWithIndex.maxBy(_._1) + else metrics.zipWithIndex.minBy(_._1) + + log.info(s"Best set of parameters:\n${params(bestIndex)} for $name") + log.info(s"Best train validation split metric: $bestMetric.") + ValidatedModel(estimator, bestIndex, metrics.toArray, goodParams.toArray) + } + } + + val summaryOfAttempts = summaryFuts.map { f => f.map(Option(_)).recover { + case e: Throwable => + log.warn("Model attempted in model selector failed with following issue: \n", e) + None }} + val summary = SparkThreadUtils.utils.awaitResult(Future.sequence(summaryOfAttempts), maxWait).flatten.toArray + if (summary.isEmpty) { + throw new RuntimeException( + s"All models failed model selector or failed to finish within $maxWait!!! 
Models tried were: \n" + + s"${modelInfo.map(m => s"${m._1.getClass.getSimpleName} -> ${m._2.mkString(", ")}" ).mkString("\n")}") } - val summary = SparkThreadUtils.utils.awaitResult(Future.sequence(summaryFuts), Duration.Inf).toArray train.unpersist() test.unpersist() summary @@ -330,5 +376,6 @@ object ValidatorParamDefaults { val TrainRatio = 0.75 val Stratify = false val Parallelism = 8 + val MaxWait = Duration(1L, "day") } diff --git a/core/src/test/scala/com/salesforce/op/OpWorkflowCVTest.scala b/core/src/test/scala/com/salesforce/op/OpWorkflowCVTest.scala index 4cb0563190..ef8fe635c6 100644 --- a/core/src/test/scala/com/salesforce/op/OpWorkflowCVTest.scala +++ b/core/src/test/scala/com/salesforce/op/OpWorkflowCVTest.scala @@ -305,6 +305,7 @@ class OpWorkflowCVTest extends FlatSpec with PassengerSparkFixtureTest { val lrParams = new ParamGridBuilder() .addGrid(lr.regParam, Array(0.0, 0.001, 0.1)) .build() + val models = Seq(lr -> lrParams) val pred1 = BinaryClassificationModelSelector.withCrossValidation( @@ -344,10 +345,11 @@ class OpWorkflowCVTest extends FlatSpec with PassengerSparkFixtureTest { val summary2 = model2.modelInsights(pred2) log.info("model2.summary: \n{}", summary2) - summary1.selectedModelInfo.get.validationResults - .forall(_.metricValues.asInstanceOf[SingleMetric].value < 0.81) shouldBe true - summary2.selectedModelInfo.get.validationResults - .forall(_.metricValues.asInstanceOf[SingleMetric].value < 0.81) shouldBe false + summary1.selectedModelInfo.get.validationResults.zip( + summary2.selectedModelInfo.get.validationResults + ).forall{ case (v1, v2) => + v1.metricValues.asInstanceOf[SingleMetric].value < v2.metricValues.asInstanceOf[SingleMetric].value + } shouldBe true } def compare( diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala index 2d0331fcf6..16f7ee4c09 100644 --- 
a/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala @@ -36,12 +36,13 @@ import com.salesforce.op.features.{Feature, FeatureBuilder} import com.salesforce.op.stages.impl.CompareParamGrid import com.salesforce.op.stages.impl.regression.{RegressionModelsToTry => RMT} import com.salesforce.op.stages.impl.selector.ModelSelectorNames.EstimatorType -import com.salesforce.op.stages.impl.selector.ModelSelectorSummary +import com.salesforce.op.stages.impl.selector.{DefaultSelectorParams, ModelSelectorSummary} import com.salesforce.op.stages.impl.tuning.BestEstimator import com.salesforce.op.test.TestSparkContext import com.salesforce.op.utils.spark.RichDataset._ import com.salesforce.op.utils.spark.RichMetadata._ import ml.dmlc.xgboost4j.scala.spark.OpXGBoostQuietLogging +import org.apache.spark.SparkException import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.param.ParamPair import org.apache.spark.ml.tuning.ParamGridBuilder @@ -51,6 +52,10 @@ import org.junit.runner.RunWith import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner +import scala.concurrent.TimeoutException +import scala.concurrent.duration.Duration +import scala.util.Random + @RunWith(classOf[JUnitRunner]) class RegressionModelSelectorTest extends FlatSpec with TestSparkContext @@ -59,9 +64,10 @@ class RegressionModelSelectorTest extends FlatSpec with TestSparkContext val stageNames = "label_prediction" import spark.implicits._ + val rand = new Random(seed) - val rawData: Seq[(Double, Vector)] = List.range(0, 100, 1).map(i => - (i.toDouble, Vectors.dense(2 * i, 4 * i))) + val rawData: Seq[(Double, Vector)] = List.range(-100, 100, 1).map(i => + (i.toDouble, Vectors.dense(2 * i, 4 * i + 5, rand.nextFloat()))) val data = sc.parallelize(rawData).toDF("label", "features") @@ -213,6 +219,69 @@ class RegressionModelSelectorTest 
extends FlatSpec with TestSparkContext justScores.length shouldEqual transformedData.count() } + it should "fit and predict even when some models fail" in { + val testEstimator = RegressionModelSelector + .withCrossValidation( + numFolds = 4, + validationMetric = Evaluators.Regression.mse(), + seed = 10L, + modelTypesToUse = Seq(RegressionModelsToTry.OpLinearRegression, + RegressionModelsToTry.OpGeneralizedLinearRegression) + ) + .setInput(label, features) + + + val model = testEstimator.fit(data) + model.evaluateModel(data) + + // evaluation metrics from train set should be in metadata + val metaData = ModelSelectorSummary.fromMetadata(model.getMetadata().getSummaryMetadata()) + RegressionEvalMetrics.values.foreach(metric => + assert(metaData.trainEvaluation.toJson(false).contains(s"${metric.entryName}"), + s"Metric ${metric.entryName} is not present in metadata: " + metaData) + ) + metaData.validationResults.foreach(println(_)) + metaData.validationResults.size shouldBe 42 + } + + + it should "fail when all models fail due to inappropriate data" in { + + val glr = new OpGeneralizedLinearRegression() + // GLR poisson cannot take negative values so will fail this test + val glrParams = new ParamGridBuilder() + .addGrid(glr.family, Seq("poisson")) + .addGrid(glr.maxIter, DefaultSelectorParams.MaxIterLin) + .build() + + val testEstimator = RegressionModelSelector + .withCrossValidation( + numFolds = 4, + validationMetric = Evaluators.Regression.mse(), + seed = 10L, + modelsAndParameters = Seq(glr -> glrParams) + ) + .setInput(label, features) + + + intercept[SparkException](testEstimator.fit(data)) + } + + it should "fail when maxWait is set too low" in { + val testEstimator = RegressionModelSelector + .withCrossValidation( + numFolds = 4, + validationMetric = Evaluators.Regression.mse(), + seed = 10L, + modelTypesToUse = Seq(RegressionModelsToTry.OpLinearRegression), + maxWait = Duration(1L, "microsecond") + ) + .setInput(label, features) + + + 
intercept[TimeoutException](testEstimator.fit(data)) + } + it should "fit and predict with a train validation split even if there is no split between training and test" in { val testEstimator = RegressionModelSelector