feat: add singleton dataset mode for faster performance
imatiach-msft committed Jun 1, 2021
1 parent 6aecdf1 commit 9e9ff1a
Showing 14 changed files with 811 additions and 364 deletions.
@@ -49,7 +49,6 @@ class SharedSingleton[T: ClassTag](constructor: => T) extends AnyRef with Serial
   }
 
   def get: T = instance
-
 }
 
 object SharedSingleton {
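For context, a minimal sketch (not part of this commit) of how SharedSingleton can share one object across all tasks on an executor: the by-name constructor argument is evaluated at most once per JVM, and later tasks reuse the cached instance. The counter, data, and object name are illustrative, and the import of SharedSingleton itself is omitted since its file path is not shown in this view.

import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.sql.SparkSession

object SharedSingletonSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("shared-singleton-sketch").getOrCreate()
    // Evaluated at most once per executor JVM; tasks that deserialize the
    // wrapper reuse the cached instance rather than rebuilding it.
    val counter = new SharedSingleton(new AtomicLong(0L))
    val total = spark.sparkContext.parallelize(1 to 1000, 8)
      .mapPartitions { rows =>
        val shared = counter.get // same AtomicLong for every task in this JVM
        rows.map(_ => shared.incrementAndGet())
      }.count()
    println(s"processed $total rows through a shared per-executor counter")
    spark.stop()
  }
}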
20 changes: 15 additions & 5 deletions src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMBase.scala
@@ -5,6 +5,7 @@ package com.microsoft.ml.spark.lightgbm
 
 import com.microsoft.ml.spark.core.utils.ClusterUtil
 import com.microsoft.ml.spark.lightgbm.booster.LightGBMBooster
+import com.microsoft.ml.spark.lightgbm.dataset.DatasetUtils
 import com.microsoft.ml.spark.lightgbm.params.{DartModeParams, ExecutionParams, LightGBMParams,
   ObjectiveParams, TrainParams}
 import com.microsoft.ml.spark.logging.BasicLogging
@@ -169,7 +170,7 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
     val featuresSchema = schema.fields(schema.fieldIndex(getFeaturesCol))
     val metadata = AttributeGroup.fromStructField(featuresSchema)
     if (metadata.attributes.isDefined) {
-      val slotNamesOpt = TrainUtils.getSlotNames(df.schema,
+      val slotNamesOpt = DatasetUtils.getSlotNames(df.schema,
         columnParams.featuresColumn, metadata.attributes.get.length, trainParams)
       val pattern = new Regex("[\",:\\[\\]{}]")
       slotNamesOpt.foreach(slotNames => {
@@ -193,10 +194,14 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
 
   /**
     * Constructs the ExecutionParams.
+    *
+    * @param isLocalMode True if spark is run in local mode, on one machine.
+    * @param numTasksPerExec The number of tasks per executor.
     * @return ExecutionParams object containing parameters related to LightGBM execution.
     */
-  protected def getExecutionParams(): ExecutionParams = {
-    ExecutionParams(getChunkSize, getMatrixType)
+  protected def getExecutionParams(isLocalMode: Boolean, numTasksPerExec: Int): ExecutionParams = {
+    val useSingleDatasetMode = if (isLocalMode || numTasksPerExec == 1) false else getUseSingleDatasetMode
+    ExecutionParams(getChunkSize, getMatrixType, useSingleDatasetMode)
   }
 
   /**
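The guard in getExecutionParams above disables the new mode exactly when sharing cannot help. A tiny standalone sketch of that decision rule (chooseSingleDatasetMode is a hypothetical name; the real logic lives inline in getExecutionParams):

object SingleDatasetModeRule {
  // Mirrors the guard above: local mode and single-task executors gain
  // nothing from sharing one native Dataset, so fall back to per-task mode.
  def chooseSingleDatasetMode(isLocalMode: Boolean, numTasksPerExec: Int, requested: Boolean): Boolean =
    if (isLocalMode || numTasksPerExec == 1) false else requested

  def main(args: Array[String]): Unit = {
    assert(!chooseSingleDatasetMode(isLocalMode = true, numTasksPerExec = 4, requested = true))
    assert(!chooseSingleDatasetMode(isLocalMode = false, numTasksPerExec = 1, requested = true))
    assert(chooseSingleDatasetMode(isLocalMode = false, numTasksPerExec = 4, requested = true))
    assert(!chooseSingleDatasetMode(isLocalMode = false, numTasksPerExec = 4, requested = false))
  }
}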
@@ -238,7 +243,7 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
       */
     val encoder = Encoders.kryo[LightGBMBooster]
 
-    val trainParams = getTrainParams(numTasks, getCategoricalIndexes(df), dataset)
+    val trainParams = getTrainParams(numTasks, getCategoricalIndexes(df), dataset, numTasksPerExec)
     log.info(s"LightGBM parameters: ${trainParams.toString()}")
     val networkParams = NetworkParams(getDefaultListenPort, inetAddress, port, getUseBarrierExecutionMode)
     val (trainingData, validationData) =
@@ -278,9 +283,14 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
 
   /** Gets the training parameters.
     *
+    * @param numTasks The total number of tasks.
+    * @param categoricalIndexes The indexes of the categorical slots in the features vector.
+    * @param dataset The training dataset.
+    * @param numTasksPerExec The number of tasks per executor.
     * @return train parameters.
     */
-  protected def getTrainParams(numTasks: Int, categoricalIndexes: Array[Int], dataset: Dataset[_]): TrainParams
+  protected def getTrainParams(numTasks: Int, categoricalIndexes: Array[Int], dataset: Dataset[_],
+                               numTasksPerExec: Int): TrainParams
 
   protected def stringFromTrainedModel(model: TrainedModel): String
 
@@ -40,11 +40,13 @@ class LightGBMClassifier(override val uid: String)
   def getIsUnbalance: Boolean = $(isUnbalance)
   def setIsUnbalance(value: Boolean): this.type = set(isUnbalance, value)
 
-  def getTrainParams(numTasks: Int, categoricalIndexes: Array[Int], dataset: Dataset[_]): TrainParams = {
+  def getTrainParams(numTasks: Int, categoricalIndexes: Array[Int],
+                     dataset: Dataset[_], numTasksPerExec: Int): TrainParams = {
     /* The native code for getting numClasses is always 1 unless it is multiclass-classification problem
      * so we infer the actual numClasses from the dataset here
      */
     val actualNumClasses = getNumClasses(dataset)
+    val isLocal = dataset.sparkSession.sparkContext.isLocal
     val modelStr = if (getModelString == null || getModelString.isEmpty) None else get(modelString)
     ClassifierTrainParams(getParallelism, getTopK, getNumIterations, getLearningRate, getNumLeaves, getMaxBin,
       getBinSampleCount, getBaggingFraction, getPosBaggingFraction, getNegBaggingFraction,
@@ -53,7 +55,7 @@ class LightGBMClassifier(override val uid: String)
       getIsUnbalance, getVerbosity, categoricalIndexes, actualNumClasses, getBoostFromAverage,
       getBoostingType, getLambdaL1, getLambdaL2, getIsProvideTrainingMetric,
       getMetric, getMinGainToSplit, getMaxDeltaStep, getMaxBinByFeature, getMinDataInLeaf, getSlotNames,
-      getDelegate, getDartParams(), getExecutionParams(), getObjectiveParams())
+      getDelegate, getDartParams(), getExecutionParams(isLocal, numTasksPerExec), getObjectiveParams())
   }
 
   def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): LightGBMClassificationModel = {
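For end users, the new behavior is controlled by a Param on the estimator. A hedged usage sketch, assuming a setUseSingleDatasetMode setter paired with the getUseSingleDatasetMode getter used in LightGBMBase (the setter name is not shown in this diff):

import com.microsoft.ml.spark.lightgbm.LightGBMClassifier

object SingleDatasetModeUsage {
  def buildClassifier(): LightGBMClassifier =
    new LightGBMClassifier()
      .setLabelCol("label")
      .setFeaturesCol("features")
      // Opt in to one shared native Dataset per executor rather than one per
      // task; the guard in getExecutionParams forces this off in local mode
      // or when there is only one task per executor.
      .setUseSingleDatasetMode(true)
}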
@@ -51,16 +51,18 @@ class LightGBMRanker(override val uid: String)
   def getEvalAt: Array[Int] = $(evalAt)
   def setEvalAt(value: Array[Int]): this.type = set(evalAt, value)
 
-  def getTrainParams(numTasks: Int, categoricalIndexes: Array[Int], dataset: Dataset[_]): TrainParams = {
+  def getTrainParams(numTasks: Int, categoricalIndexes: Array[Int],
+                     dataset: Dataset[_], numTasksPerExec: Int): TrainParams = {
+    val isLocal = dataset.sparkSession.sparkContext.isLocal
     val modelStr = if (getModelString == null || getModelString.isEmpty) None else get(modelString)
     RankerTrainParams(getParallelism, getTopK, getNumIterations, getLearningRate, getNumLeaves,
       getMaxBin, getBinSampleCount, getBaggingFraction, getPosBaggingFraction, getNegBaggingFraction,
       getBaggingFreq, getBaggingSeed, getEarlyStoppingRound, getImprovementTolerance,
       getFeatureFraction, getMaxDepth, getMinSumHessianInLeaf, numTasks, modelStr,
       getVerbosity, categoricalIndexes, getBoostingType, getLambdaL1, getLambdaL2, getMaxPosition, getLabelGain,
       getIsProvideTrainingMetric, getMetric, getEvalAt, getMinGainToSplit, getMaxDeltaStep,
-      getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate, getDartParams(), getExecutionParams(),
-      getObjectiveParams())
+      getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate, getDartParams(),
+      getExecutionParams(isLocal, numTasksPerExec), getObjectiveParams())
   }
 
   def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): LightGBMRankerModel = {
@@ -58,15 +58,17 @@ class LightGBMRegressor(override val uid: String)
   def getTweedieVariancePower: Double = $(tweedieVariancePower)
   def setTweedieVariancePower(value: Double): this.type = set(tweedieVariancePower, value)
 
-  def getTrainParams(numTasks: Int, categoricalIndexes: Array[Int], dataset: Dataset[_]): TrainParams = {
+  def getTrainParams(numTasks: Int, categoricalIndexes: Array[Int],
+                     dataset: Dataset[_], numTasksPerExec: Int): TrainParams = {
+    val isLocal = dataset.sparkSession.sparkContext.isLocal
     val modelStr = if (getModelString == null || getModelString.isEmpty) None else get(modelString)
     RegressorTrainParams(getParallelism, getTopK, getNumIterations, getLearningRate, getNumLeaves,
       getAlpha, getTweedieVariancePower, getMaxBin, getBinSampleCount, getBaggingFraction, getPosBaggingFraction,
       getNegBaggingFraction, getBaggingFreq, getBaggingSeed, getEarlyStoppingRound, getImprovementTolerance,
       getFeatureFraction, getMaxDepth, getMinSumHessianInLeaf, numTasks, modelStr, getVerbosity, categoricalIndexes,
       getBoostFromAverage, getBoostingType, getLambdaL1, getLambdaL2, getIsProvideTrainingMetric, getMetric,
       getMinGainToSplit, getMaxDeltaStep, getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate,
-      getDartParams(), getExecutionParams(), getObjectiveParams())
+      getDartParams(), getExecutionParams(isLocal, numTasksPerExec), getObjectiveParams())
   }
 
   def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): LightGBMRegressionModel = {
41 changes: 19 additions & 22 deletions src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMUtils.scala
@@ -17,7 +17,7 @@ import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.ml.PipelineModel
 import org.apache.spark.ml.attribute._
 import org.apache.spark.ml.linalg.SparseVector
-import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.{DataFrame, Dataset}
 import org.slf4j.Logger
 
 import scala.collection.immutable.HashSet
@@ -65,15 +65,6 @@ object LightGBMUtils {
     featurizer.fit(dataset)
   }
 
-  def getBoosterPtrFromModelString(lgbModelString: String): SWIGTYPE_p_void = {
-    val boosterOutPtr = lightgbmlib.voidpp_handle()
-    val numItersOut = lightgbmlib.new_intp()
-    LightGBMUtils.validate(
-      lightgbmlib.LGBM_BoosterLoadModelFromString(lgbModelString, numItersOut, boosterOutPtr),
-      "Booster LoadFromString")
-    lightgbmlib.voidpp_value(boosterOutPtr)
-  }
-
   def getCategoricalIndexes(df: DataFrame,
                             featuresCol: String,
                             slotNames: Array[String],
@@ -187,11 +178,10 @@ object LightGBMUtils {
     (host, port, f)
   }
 
-  /** Returns an integer ID for the current node.
-    *
-    * @return In cluster, returns the executor id. In local case, returns the task id.
+  /** Returns an integer ID for the current worker.
+    * @return In cluster, returns the executor id. In local case, returns the partition id.
     */
-  def getId(): Int = {
+  def getWorkerId(): Int = {
     val executorId = SparkEnv.get.executorId
     val ctx = TaskContext.get
     val partId = ctx.partitionId
@@ -201,14 +191,21 @@ object LightGBMUtils {
     idAsInt
   }
 
-  def generateData(numRows: Int, rowsAsDoubleArray: Array[Array[Double]]):
-      (SWIGTYPE_p_void, SWIGTYPE_p_double) = {
-    val numCols = rowsAsDoubleArray.head.length
-    val data = lightgbmlib.new_doubleArray(numCols.toLong * numRows.toLong)
-    rowsAsDoubleArray.zipWithIndex.foreach(ri =>
-      ri._1.zipWithIndex.foreach(value =>
-        lightgbmlib.doubleArray_setitem(data, (value._2 + (ri._2 * numCols)).toLong, value._1)))
-    (lightgbmlib.double_to_voidp_ptr(data), data)
+  /** Returns true if spark is run in local mode.
+    * @return True if spark is run in local mode.
+    */
+  def isLocalExecution(): Boolean = {
+    val executorId = SparkEnv.get.executorId
+    executorId == "driver"
+  }
+
+  /** Returns a unique task Id for the current task run on the executor.
+    * @return A unique task id.
+    */
+  def getTaskId(): Long = {
+    val ctx = TaskContext.get
+    val taskId = ctx.taskAttemptId()
+    taskId
   }
 
   def getNumRowsForChunksArray(numRows: Int, chunkSize: Int): SWIGTYPE_p_int = {
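These helpers are what make per-executor sharing addressable: getWorkerId identifies the executor-level scope, while getTaskId stays unique per task attempt. A hedged sketch of how a shared per-worker resource might be keyed (the map, object, and build function are illustrative, not part of this commit; it must run inside a Spark task so TaskContext is available):

import java.util.concurrent.ConcurrentHashMap
import com.microsoft.ml.spark.lightgbm.LightGBMUtils

object PerWorkerSharingSketch {
  // One entry per worker id; all tasks on the same executor see the same value.
  private val sharedState = new ConcurrentHashMap[Int, AnyRef]()

  def getOrCreate(build: () => AnyRef): AnyRef = {
    val workerId = LightGBMUtils.getWorkerId()
    val taskId = LightGBMUtils.getTaskId()
    // computeIfAbsent guarantees build() runs once per executor even when
    // several tasks race to initialize; the losers just reuse the result.
    sharedState.computeIfAbsent(workerId, _ => {
      println(s"task $taskId initializing shared state for worker $workerId")
      build()
    })
  }
}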