Making model selectors robust to failing models #372

Merged 14 commits on Aug 2, 2019
2 changes: 1 addition & 1 deletion core/src/main/scala/com/salesforce/op/OpWorkflow.scala
@@ -423,7 +423,7 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
log.info("Estimate best Model with CV/TS. Stages included in CV are: {}, {}",
during.flatMap(_.map(_._1.stageName)).mkString(", "), modelSelector.uid: Any
)
modelSelector.findBestEstimator(trainFixed, during, persistEveryKStages)
modelSelector.findBestEstimator(trainFixed, Option(during))
val remainingDAG: StagesDAG = (during :+ (Array(modelSelector -> distance): Layer)) ++ after

log.info("Applying DAG after CV/TS. Stages: {}", remainingDAG.flatMap(_.map(_._1.stageName)).mkString(", "))
@@ -124,7 +124,9 @@ class RawFeatureFilter[T]
@transient protected lazy val log = LoggerFactory.getLogger(this.getClass)

/**
* Get binned counts of the feature distribution and empty count for each raw feature
* Get binned counts of the feature distribution and the empty count for each raw feature. This computes all the
* statistics on the training and scoring data. It performs two map-reduce operations: the first produces a Summary
* of each feature, the second produces a binned histogram (Distribution) for each feature based on its Summary.
*
* @param data data frame to compute counts on
* @param features list of raw, non-protected, features contained in the dataframe
@@ -151,6 +153,7 @@ class RawFeatureFilter[T]
val predOut = allPredictors.map(TransientFeature(_))
(respOut, predOut)
}
// process all features based on raw type so that they can be summarized as either text or numeric
val preparedFeatures: RDD[PreparedFeatures] = data.rdd.map(PreparedFeatures(_, responses, predictors, timePeriod))

implicit val sgTuple2Maps = new Tuple2Semigroup[Map[FeatureKey, Summary], Map[FeatureKey, Summary]]()
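
The doc comment added above describes a two-pass design: one map-reduce pass summarizes each raw feature, a second pass bins the values against that summary. Below is a minimal conceptual sketch of that idea; the simplified Summary and Distribution case classes and helper functions are illustrative stand-ins, not the library's actual types.

// Conceptual sketch of the summarize-then-bin approach described above.
// Summary and Distribution here are simplified stand-ins, not the real ones.
object TwoPassBinningSketch {

  // Pass 1 output: range of a numeric feature (the real Summary tracks more than this).
  final case class Summary(min: Double, max: Double)

  // Pass 2 output: histogram counts plus how many values were empty.
  final case class Distribution(binCounts: Array[Long], nullCount: Long)

  // First pass: reduce the raw values of one feature into a Summary.
  def summarize(values: Seq[Option[Double]]): Summary = {
    val present = values.flatten
    Summary(present.min, present.max)
  }

  // Second pass: bin the same values using the range computed in the first pass.
  def distribution(values: Seq[Option[Double]], summary: Summary, bins: Int): Distribution = {
    val width = (summary.max - summary.min) / bins
    val counts = Array.fill(bins)(0L)
    var nullCount = 0L
    values.foreach {
      case Some(v) =>
        val bin = if (width == 0.0) 0 else math.min(((v - summary.min) / width).toInt, bins - 1)
        counts(bin) += 1
      case None =>
        nullCount += 1
    }
    Distribution(counts, nullCount)
  }

  def main(args: Array[String]): Unit = {
    val feature = Seq(Some(1.0), Some(2.5), None, Some(9.0))
    val dist = distribution(feature, summarize(feature), bins = 4)
    println(s"bins = ${dist.binCounts.mkString(",")}, empty = ${dist.nullCount}")
  }
}
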
@@ -40,6 +40,8 @@ import enumeratum.Enum
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.tuning.ParamGridBuilder

import scala.concurrent.duration.Duration


/**
* A factory for Binary Classification Model Selector
@@ -153,6 +155,7 @@ case object BinaryClassificationModelSelector extends ModelSelectorFactory {
* for model selection Seq[(EstimatorType, Array[ParamMap])] where Estimator type must be
* an Estimator that takes in a label (RealNN) and features (OPVector) and returns a
* prediction (Prediction)
* @param maxWait maximum allowable time to wait for a model to finish running (default is 1 day)
* @return Classification Model Selector with a Cross Validation
*/
def withCrossValidation(
@@ -164,10 +167,12 @@ case object BinaryClassificationModelSelector extends ModelSelectorFactory {
stratify: Boolean = ValidatorParamDefaults.Stratify,
parallelism: Int = ValidatorParamDefaults.Parallelism,
modelTypesToUse: Seq[BinaryClassificationModelsToTry] = Defaults.modelTypesToUse,
modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty
modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty,
maxWait: Duration = ValidatorParamDefaults.MaxWait
): ModelSelector[ModelType, EstimatorType] = {
val cv = new OpCrossValidation[ModelType, EstimatorType](
numFolds = numFolds, seed = seed, validationMetric, stratify = stratify, parallelism = parallelism
numFolds = numFolds, seed = seed, evaluator = validationMetric, stratify = stratify,
parallelism = parallelism, maxWait = maxWait
)
selector(cv,
splitter = splitter,
@@ -198,6 +203,7 @@ case object BinaryClassificationModelSelector extends ModelSelectorFactory {
* for model selection Seq[(EstimatorType, Array[ParamMap])] where Estimator type must be
* an Estimator that takes in a label (RealNN) and features (OPVector) and returns a
* prediction (Prediction)
* @param maxWait maximum allowable time to wait for a model to finish running (default is 1 day)
* @return Classification Model Selector with a Train Validation Split
*/
def withTrainValidationSplit(
@@ -209,10 +215,12 @@ case object BinaryClassificationModelSelector extends ModelSelectorFactory {
stratify: Boolean = ValidatorParamDefaults.Stratify,
parallelism: Int = ValidatorParamDefaults.Parallelism,
modelTypesToUse: Seq[BinaryClassificationModelsToTry] = Defaults.modelTypesToUse,
modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty
modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty,
maxWait: Duration = ValidatorParamDefaults.MaxWait
): ModelSelector[ModelType, EstimatorType] = {
val ts = new OpTrainValidationSplit[ModelType, EstimatorType](
trainRatio = trainRatio, seed = seed, validationMetric, stratify = stratify, parallelism = parallelism
trainRatio = trainRatio, seed = seed, validationMetric, stratify = stratify, parallelism = parallelism,
maxWait = maxWait
)
selector(ts,
splitter = splitter,
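
For context, a hedged usage sketch of the new maxWait knob added to BinaryClassificationModelSelector.withCrossValidation above. The import path and the 30-minute value are illustrative assumptions, and the remaining parameters are assumed to keep their defaults; per the javadoc above, maxWait itself defaults to ValidatorParamDefaults.MaxWait (1 day).

import scala.concurrent.duration._

import com.salesforce.op.stages.impl.classification.BinaryClassificationModelSelector

// Sketch: bound how long cross validation waits on any single candidate model fit.
// Per the PR title, a model that exceeds this wait (or otherwise fails) should be
// skipped rather than failing the whole selection.
val selector = BinaryClassificationModelSelector.withCrossValidation(
  maxWait = 30.minutes // default: ValidatorParamDefaults.MaxWait (1 day per the javadoc above)
)
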
@@ -33,13 +33,15 @@ package com.salesforce.op.stages.impl.classification
import com.salesforce.op.evaluators._
import com.salesforce.op.stages.impl.ModelsToTry
import com.salesforce.op.stages.impl.classification.{MultiClassClassificationModelsToTry => MTT}
import com.salesforce.op.stages.impl.selector.{DefaultSelectorParams, ModelSelectorFactory, ModelSelector}
import com.salesforce.op.stages.impl.selector.{DefaultSelectorParams, ModelSelector, ModelSelectorFactory}
import com.salesforce.op.stages.impl.tuning._
import enumeratum.Enum
import com.salesforce.op.stages.impl.selector.ModelSelectorNames.{EstimatorType, ModelType}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.tuning.ParamGridBuilder

import scala.concurrent.duration.Duration


/**
* A factory for Multi Classification Model Selector
@@ -133,6 +135,7 @@ case object MultiClassificationModelSelector extends ModelSelectorFactory {
* for model selection Seq[(EstimatorType, Array[ParamMap])] where Estimator type must be
* an Estimator that takes in a label (RealNN) and features (OPVector) and returns a
* prediction (Prediction)
* @param maxWait maximum allowable time to wait for a model to finish running (default is 1 day)
* @return MultiClassification Model Selector with a Cross Validation
*/
def withCrossValidation(
@@ -144,10 +147,12 @@ case object MultiClassificationModelSelector extends ModelSelectorFactory {
stratify: Boolean = ValidatorParamDefaults.Stratify,
parallelism: Int = ValidatorParamDefaults.Parallelism,
modelTypesToUse: Seq[MultiClassClassificationModelsToTry] = Defaults.modelTypesToUse,
modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty
modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty,
maxWait: Duration = ValidatorParamDefaults.MaxWait
): ModelSelector[ModelType, EstimatorType] = {
val cv = new OpCrossValidation[ModelType, EstimatorType](
numFolds = numFolds, seed = seed, validationMetric, stratify = stratify, parallelism = parallelism
numFolds = numFolds, seed = seed, evaluator = validationMetric, stratify = stratify, parallelism = parallelism,
maxWait = maxWait
)
selector(cv,
splitter = splitter,
@@ -178,6 +183,7 @@ case object MultiClassificationModelSelector extends ModelSelectorFactory {
* for model selection Seq[(EstimatorType, Array[ParamMap])] where Estimator type must be
* an Estimator that takes in a label (RealNN) and features (OPVector) and returns a
* prediction (Prediction)
* @param maxWait maximum allowable time to wait for a model to finish running (default is 1 day)
* @return MultiClassification Model Selector with a Train Validation Split
*/
def withTrainValidationSplit(
@@ -189,7 +195,8 @@ case object MultiClassificationModelSelector extends ModelSelectorFactory {
stratify: Boolean = ValidatorParamDefaults.Stratify,
parallelism: Int = ValidatorParamDefaults.Parallelism,
modelTypesToUse: Seq[MultiClassClassificationModelsToTry] = Defaults.modelTypesToUse,
modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty
modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty,
maxWait: Duration = ValidatorParamDefaults.MaxWait
): ModelSelector[ModelType, EstimatorType] = {
val ts = new OpTrainValidationSplit[ModelType, EstimatorType](
trainRatio = trainRatio, seed = seed, validationMetric, stratify = stratify, parallelism = parallelism
@@ -34,12 +34,14 @@ import com.salesforce.op.evaluators._
import com.salesforce.op.stages.impl.ModelsToTry
import com.salesforce.op.stages.impl.regression.{RegressionModelsToTry => MTT}
import com.salesforce.op.stages.impl.selector.ModelSelectorNames.{EstimatorType, ModelType}
import com.salesforce.op.stages.impl.selector.{DefaultSelectorParams, ModelSelectorFactory, ModelSelector}
import com.salesforce.op.stages.impl.selector.{DefaultSelectorParams, ModelSelector, ModelSelectorFactory}
import com.salesforce.op.stages.impl.tuning._
import enumeratum.Enum
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.tuning.ParamGridBuilder

import scala.concurrent.duration.Duration


/**
* A factory for Regression Model Selector
@@ -57,7 +59,7 @@ case object RegressionModelSelector extends ModelSelectorFactory {
* Note: [[OpDecisionTreeRegressor]] and [[OpXGBoostRegressor]] are off by default
*/
val modelTypesToUse: Seq[RegressionModelsToTry] = Seq(
MTT.OpLinearRegression, MTT.OpRandomForestRegressor, MTT.OpGBTRegressor, MTT.OpGeneralizedLinearRegression
MTT.OpLinearRegression, MTT.OpRandomForestRegressor, MTT.OpGBTRegressor
)

/**
@@ -107,6 +109,7 @@ case object RegressionModelSelector extends ModelSelectorFactory {
val glrParams = new ParamGridBuilder()
.addGrid(glr.fitIntercept, DefaultSelectorParams.FitIntercept)
.addGrid(glr.family, DefaultSelectorParams.DistFamily)
.addGrid(glr.link, DefaultSelectorParams.LinkFunction)
.addGrid(glr.maxIter, DefaultSelectorParams.MaxIterLin)
.addGrid(glr.regParam, DefaultSelectorParams.Regularization)
.addGrid(glr.tol, DefaultSelectorParams.Tol)
@@ -145,6 +148,7 @@ case object RegressionModelSelector extends ModelSelectorFactory {
* for model selection Seq[(EstimatorType, Array[ParamMap])] where Estimator type must be
* an Estimator that takes in a label (RealNN) and features (OPVector) and returns a
* prediction (Prediction)
* @param maxWait maximum allowable time to wait for a model to finish running (default is 1 day)
* @return Regression Model Selector with a Cross Validation
*/
def withCrossValidation(
@@ -155,10 +159,11 @@ case object RegressionModelSelector extends ModelSelectorFactory {
seed: Long = ValidatorParamDefaults.Seed,
parallelism: Int = ValidatorParamDefaults.Parallelism,
modelTypesToUse: Seq[RegressionModelsToTry] = Defaults.modelTypesToUse,
modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty
modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty,
maxWait: Duration = ValidatorParamDefaults.MaxWait
): ModelSelector[ModelType, EstimatorType] = {
val cv = new OpCrossValidation[ModelType, EstimatorType](
numFolds = numFolds, seed = seed, validationMetric, parallelism = parallelism
numFolds = numFolds, seed = seed, evaluator = validationMetric, parallelism = parallelism, maxWait = maxWait
)
selector(cv,
splitter = dataSplitter,
@@ -188,6 +193,7 @@ case object RegressionModelSelector extends ModelSelectorFactory {
* for model selection Seq[(EstimatorType, Array[ParamMap])] where Estimator type must be
* an Estimator that takes in a label (RealNN) and features (OPVector) and returns a
* prediction (Prediction)
* @param maxWait maximum allowable time to wait for a model to finish running (default is 1 day)
* @return Regression Model Selector with a Train Validation Split
*/
def withTrainValidationSplit(
@@ -198,7 +204,8 @@ case object RegressionModelSelector extends ModelSelectorFactory {
seed: Long = ValidatorParamDefaults.Seed,
parallelism: Int = ValidatorParamDefaults.Parallelism,
modelTypesToUse: Seq[RegressionModelsToTry] = Defaults.modelTypesToUse,
modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty
modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty,
maxWait: Duration = ValidatorParamDefaults.MaxWait
): ModelSelector[ModelType, EstimatorType] = {
val ts = new OpTrainValidationSplit[ModelType, EstimatorType](
trainRatio = trainRatio, seed = seed, validationMetric, parallelism = parallelism
@@ -53,7 +53,16 @@ object DefaultSelectorParams {
val RegSolver = Array("auto") // regression solver spark default auto
val FitIntercept = Array(true) // fit intercept spark default true
val NbSmoothing = Array(1.0) // spark default 1.0
val DistFamily = Array("gaussian", "poisson") // generalized linear model link family
val DistFamily = Array("gaussian", "binomial", "poisson", "gamma", "tweedie") // glm distribution family
val LinkFunction = Array("identity", "log", "inverse", "logit", "probit", "cloglog", "sqrt") // glm link function
// Valid link functions for each family are listed below. The first link function of each family
// is the default one.
// - "gaussian" : "identity", "log", "inverse"
// - "binomial" : "logit", "probit", "cloglog"
// - "poisson" : "log", "identity", "sqrt"
// - "gamma" : "inverse", "identity", "log"
// - "tweedie" : power link function specified through "linkPower". The default link power in
// the tweedie family is 1 - variancePower.
val NumRound = Array(100) // number of rounds for xgboost (default 1)
val Eta = Array(0.1 , 0.3) // step size shrinkage for xgboost (default 0.3)
val MinChildWeight = Array(1.0, 5.0, 10.0) // minimum sum of instance weight needed in a child for xgboost (default 1)
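
One note on the widened DistFamily and LinkFunction grids above: crossing every family with every link yields pairings the comment does not list as valid (for example family = "binomial" with link = "identity"), and such grid points can fail at fit time, which is presumably the kind of failing model this PR teaches the selector to tolerate. The sketch below is illustrative only and uses Spark's GeneralizedLinearRegression directly rather than the OP wrapper.

import org.apache.spark.ml.regression.GeneralizedLinearRegression
import org.apache.spark.ml.tuning.ParamGridBuilder

// Build the full family x link cross product, mirroring DistFamily and LinkFunction above.
val glr = new GeneralizedLinearRegression()
val grid = new ParamGridBuilder()
  .addGrid(glr.family, Array("gaussian", "binomial", "poisson", "gamma", "tweedie"))
  .addGrid(glr.link, Array("identity", "log", "inverse", "logit", "probit", "cloglog", "sqrt"))
  .build()

// 5 families x 7 links = 35 grid points, but only the pairings listed in the comment
// above are supported, so fitting the rest is expected to throw and must be handled
// during model selection rather than aborting the search.
println(s"grid points: ${grid.length}")
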