[SPARK-1594][MLLIB] Cleaning up MLlib APIs and guide
Final pass before the v1.0 release.

* Remove `VectorRDDs`.
* Move `BinaryClassificationMetrics` from `evaluation.binary` to `evaluation`.
* Change the default value of `addIntercept` to false and allow adding an intercept in Ridge and Lasso.
* Clean up the `DecisionTree` package doc and test suite.
* Mark model constructors `private[mllib]`.
* Rename `loadLibSVMData` to `loadLibSVMFile` and hide `LabelParser` from users.
* Add `saveAsLibSVMFile`.
* Add `appendBias` to `MLUtils` (see the sketch after this list).
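For reference, a minimal sketch of how the renamed and newly added `MLUtils` helpers fit together after this pass; the app name, input path, and output directory below are hypothetical, not part of the change:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils

object MLUtilsSketch extends App {
  val sc = new SparkContext(new SparkConf().setAppName("MLUtilsSketch"))

  // loadLibSVMData is now loadLibSVMFile, and LabelParser is hidden
  // behind a simple `multiclass` flag ("data/sample.txt" is hypothetical).
  val examples = MLUtils.loadLibSVMFile(sc, "data/sample.txt", multiclass = true)

  // appendBias appends a constant 1.0 entry to each feature vector, which
  // is handy now that `addIntercept` defaults to false.
  val withBias = examples.map(p => LabeledPoint(p.label, MLUtils.appendBias(p.features)))

  // saveAsLibSVMFile writes labeled points back out in LIBSVM format.
  MLUtils.saveAsLibSVMFile(withBias, "out/sample-libsvm")

  sc.stop()
}
```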

Author: Xiangrui Meng <meng@databricks.com>

Closes #524 from mengxr/mllib-cleaning and squashes the following commits:

295dc8b [Xiangrui Meng] update loadLibSVMFile doc
1977ac1 [Xiangrui Meng] fix doc of appendBias
649fcf0 [Xiangrui Meng] rename loadLibSVMData to loadLibSVMFile; hide LabelParser from user APIs
54b812c [Xiangrui Meng] add appendBias
a71e7d0 [Xiangrui Meng] add saveAsLibSVMFile
d976295 [Xiangrui Meng] Merge branch 'master' into mllib-cleaning
b7e5cec [Xiangrui Meng] remove some experimental annotations and make model constructors private[mllib]
9b02b93 [Xiangrui Meng] minor code style update
a593ddc [Xiangrui Meng] fix python tests
fc28c18 [Xiangrui Meng] mark more classes experimental
f6cbbff [Xiangrui Meng] fix Java tests
0af70b0 [Xiangrui Meng] minor
6e139ef [Xiangrui Meng] Merge branch 'master' into mllib-cleaning
94e6dce [Xiangrui Meng] move BinaryLabelCounter and BinaryConfusionMatrixImpl to evaluation.binary
df34907 [Xiangrui Meng] clean DecisionTreeSuite to use LocalSparkContext
c81807f [Xiangrui Meng] set the default value of AddIntercept to false
03389c0 [Xiangrui Meng] allow to add intercept in Ridge and Lasso
c66c56f [Xiangrui Meng] move tree md to package object doc
a2695df [Xiangrui Meng] update guide for BinaryClassificationMetrics
9194f4c [Xiangrui Meng] move BinaryClassificationMetrics one level up
1c1a0e3 [Xiangrui Meng] remove VectorRDDs because it only contains one function that is not necessary for us to maintain

(cherry picked from commit 98750a7)
Signed-off-by: Matei Zaharia <matei@databricks.com>
mengxr authored and mateiz committed May 6, 2014
1 parent a5f765c commit 32c960a
Showing 39 changed files with 390 additions and 329 deletions.
2 changes: 1 addition & 1 deletion docs/mllib-linear-methods.md
@@ -180,7 +180,7 @@ error.
{% highlight scala %}
import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.SVMWithSGD
-import org.apache.spark.mllib.evaluation.binary.BinaryClassificationMetrics
+import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
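After the package move, only the import line changes; construction and use of the metrics are unchanged. A short sketch, where `scoreAndLabels` is a hypothetical `RDD[(Double, Double)]` of (score, label) pairs:

```scala
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()              // area under the ROC curve
val precision = metrics.precisionByThreshold()  // RDD of (threshold, precision)
metrics.unpersist()                             // release intermediate RDDs
```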
examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala
@@ -22,7 +22,7 @@ import scopt.OptionParser

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.classification.{LogisticRegressionWithSGD, SVMWithSGD}
-import org.apache.spark.mllib.evaluation.binary.BinaryClassificationMetrics
+import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.optimization.{SquaredL2Updater, L1Updater}

@@ -96,7 +96,7 @@ object BinaryClassification {

Logger.getRootLogger.setLevel(Level.WARN)

-val examples = MLUtils.loadLibSVMData(sc, params.input).cache()
+val examples = MLUtils.loadLibSVMFile(sc, params.input).cache()

val splits = examples.randomSplit(Array(0.8, 0.2))
val training = splits(0).cache()
examples/src/main/scala/org/apache/spark/examples/mllib/LinearRegression.scala
@@ -22,7 +22,7 @@ import scopt.OptionParser

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
-import org.apache.spark.mllib.util.{MulticlassLabelParser, MLUtils}
+import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.optimization.{SimpleUpdater, SquaredL2Updater, L1Updater}

/**
@@ -82,7 +82,7 @@ object LinearRegression extends App {

Logger.getRootLogger.setLevel(Level.WARN)

-val examples = MLUtils.loadLibSVMData(sc, params.input, MulticlassLabelParser).cache()
+val examples = MLUtils.loadLibSVMFile(sc, params.input, multiclass = true).cache()

val splits = examples.randomSplit(Array(0.8, 0.2))
val training = splits(0).cache()
examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala
@@ -75,8 +75,8 @@ object SparseNaiveBayes {
val minPartitions =
if (params.minPartitions > 0) params.minPartitions else sc.defaultMinPartitions

-val examples = MLUtils.loadLibSVMData(sc, params.input, MulticlassLabelParser,
-  params.numFeatures, minPartitions)
+val examples =
+  MLUtils.loadLibSVMFile(sc, params.input, multiclass = true, params.numFeatures, minPartitions)
// Cache examples because it will be used in both training and evaluation.
examples.cache()

mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
@@ -17,6 +17,7 @@

package org.apache.spark.mllib.classification

+import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.regression._
@@ -29,26 +30,30 @@ import org.apache.spark.rdd.RDD
* @param weights Weights computed for every feature.
* @param intercept Intercept computed for this model.
*/
-class LogisticRegressionModel(
+class LogisticRegressionModel private[mllib] (
override val weights: Vector,
override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable {

private var threshold: Option[Double] = Some(0.5)

/**
+ * :: Experimental ::
 * Sets the threshold that separates positive predictions from negative predictions. An example
 * with a prediction score greater than or equal to this threshold is identified as positive,
 * and negative otherwise. The default value is 0.5.
 */
+@Experimental
def setThreshold(threshold: Double): this.type = {
this.threshold = Some(threshold)
this
}

/**
+ * :: Experimental ::
 * Clears the threshold so that `predict` will output raw prediction scores.
 */
+@Experimental
def clearThreshold(): this.type = {
threshold = None
this
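Since `setThreshold` and `clearThreshold` are now tagged `@Experimental`, a quick sketch of how they are meant to be used; `training` (an `RDD[LabeledPoint]`) and `features` (a `Vector`) are hypothetical:

```scala
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

val model = LogisticRegressionWithSGD.train(training, 100)  // 100 iterations

model.setThreshold(0.7)    // scores >= 0.7 are predicted as 1.0
val label = model.predict(features)

model.clearThreshold()     // predict now returns the raw score
val score = model.predict(features)
```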
mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -19,24 +19,21 @@ package org.apache.spark.mllib.classification

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum}

-import org.apache.spark.annotation.Experimental
import org.apache.spark.Logging
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

/**
- * :: Experimental ::
* Model for Naive Bayes Classifiers.
*
* @param labels list of labels
* @param pi log of class priors, whose dimension is C, number of labels
* @param theta log of class conditional probabilities, whose dimension is C-by-D,
* where D is number of features
*/
-@Experimental
-class NaiveBayesModel(
+class NaiveBayesModel private[mllib] (
val labels: Array[Double],
val pi: Array[Double],
val theta: Array[Array[Double]]) extends ClassificationModel with Serializable {
@@ -124,6 +121,9 @@ class NaiveBayes private (private var lambda: Double) extends Serializable with
}
}

+/**
+ * Top-level methods for calling naive Bayes.
+ */
object NaiveBayes {
/**
* Trains a Naive Bayes model given an RDD of `(label, features)` pairs.
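The new doc comment points users at the companion object's `train` methods; a minimal sketch, with `training` and `features` hypothetical as before:

```scala
import org.apache.spark.mllib.classification.NaiveBayes

val model = NaiveBayes.train(training, lambda = 1.0)  // additive smoothing
val predicted = model.predict(features)               // most likely class label
```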
mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
@@ -17,6 +17,7 @@

package org.apache.spark.mllib.classification

+import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.regression._
@@ -29,26 +30,30 @@ import org.apache.spark.rdd.RDD
* @param weights Weights computed for every feature.
* @param intercept Intercept computed for this model.
*/
-class SVMModel(
+class SVMModel private[mllib] (
override val weights: Vector,
override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable {

private var threshold: Option[Double] = Some(0.0)

/**
+ * :: Experimental ::
 * Sets the threshold that separates positive predictions from negative predictions. An example
 * with a prediction score greater than or equal to this threshold is identified as positive,
 * and negative otherwise. The default value is 0.0.
 */
+@Experimental
def setThreshold(threshold: Double): this.type = {
this.threshold = Some(threshold)
this
}

/**
+ * :: Experimental ::
 * Clears the threshold so that `predict` will output raw prediction scores.
 */
+@Experimental
def clearThreshold(): this.type = {
threshold = None
this
mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer

import breeze.linalg.{DenseVector => BDV, Vector => BV, norm => breezeNorm}

+import org.apache.spark.annotation.Experimental
import org.apache.spark.Logging
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.{Vector, Vectors}
@@ -81,6 +82,7 @@ class KMeans private (
* this many times with random starting conditions (configured by the initialization mode), then
* return the best clustering found over any run. Default: 1.
*/
+@Experimental
def setRuns(runs: Int): KMeans = {
if (runs <= 0) {
throw new IllegalArgumentException("Number of runs must be positive")
mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -24,7 +24,7 @@ import org.apache.spark.mllib.linalg.Vector
/**
* A clustering model for K-means. Each point belongs to the cluster with the closest center.
*/
-class KMeansModel(val clusterCenters: Array[Vector]) extends Serializable {
+class KMeansModel private[mllib] (val clusterCenters: Array[Vector]) extends Serializable {

/** Total number of clusters. */
def k: Int = clusterCenters.length
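With `KMeansModel`'s constructor now `private[mllib]`, models come from `KMeans.train` or the builder-style `run`. A sketch where `sc` is an existing SparkContext and the points are hypothetical:

```scala
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val data = sc.parallelize(Seq(
  Vectors.dense(0.0, 0.0), Vectors.dense(0.1, 0.1),
  Vectors.dense(9.0, 9.0), Vectors.dense(9.1, 9.1)))

val model = new KMeans()
  .setK(2)
  .setMaxIterations(20)
  .setRuns(5)                // now marked @Experimental
  .run(data)

val cluster = model.predict(Vectors.dense(0.05, 0.05))  // index of nearest center
val cost = model.computeCost(data)                      // within-set sum of squared errors
```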
mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala (moved from evaluation/binary/)
@@ -15,83 +15,22 @@
* limitations under the License.
*/

-package org.apache.spark.mllib.evaluation.binary
+package org.apache.spark.mllib.evaluation

-import org.apache.spark.rdd.{UnionRDD, RDD}
-import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.evaluation.AreaUnderCurve
+import org.apache.spark.annotation.Experimental
import org.apache.spark.Logging
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.evaluation.binary._
+import org.apache.spark.rdd.{RDD, UnionRDD}

/**
* Implementation of [[org.apache.spark.mllib.evaluation.binary.BinaryConfusionMatrix]].
*
* @param count label counter for labels with scores greater than or equal to the current score
* @param totalCount label counter for all labels
*/
private case class BinaryConfusionMatrixImpl(
count: LabelCounter,
totalCount: LabelCounter) extends BinaryConfusionMatrix with Serializable {

/** number of true positives */
override def numTruePositives: Long = count.numPositives

/** number of false positives */
override def numFalsePositives: Long = count.numNegatives

/** number of false negatives */
override def numFalseNegatives: Long = totalCount.numPositives - count.numPositives

/** number of true negatives */
override def numTrueNegatives: Long = totalCount.numNegatives - count.numNegatives

/** number of positives */
override def numPositives: Long = totalCount.numPositives

/** number of negatives */
override def numNegatives: Long = totalCount.numNegatives
}

/**
* :: Experimental ::
* Evaluator for binary classification.
*
* @param scoreAndLabels an RDD of (score, label) pairs.
*/
class BinaryClassificationMetrics(scoreAndLabels: RDD[(Double, Double)])
extends Serializable with Logging {

private lazy val (
cumulativeCounts: RDD[(Double, LabelCounter)],
confusions: RDD[(Double, BinaryConfusionMatrix)]) = {
// Create a bin for each distinct score value, count positives and negatives within each bin,
// and then sort by score values in descending order.
val counts = scoreAndLabels.combineByKey(
createCombiner = (label: Double) => new LabelCounter(0L, 0L) += label,
mergeValue = (c: LabelCounter, label: Double) => c += label,
mergeCombiners = (c1: LabelCounter, c2: LabelCounter) => c1 += c2
).sortByKey(ascending = false)
val agg = counts.values.mapPartitions({ iter =>
val agg = new LabelCounter()
iter.foreach(agg += _)
Iterator(agg)
}, preservesPartitioning = true).collect()
val partitionwiseCumulativeCounts =
agg.scanLeft(new LabelCounter())((agg: LabelCounter, c: LabelCounter) => agg.clone() += c)
val totalCount = partitionwiseCumulativeCounts.last
logInfo(s"Total counts: $totalCount")
val cumulativeCounts = counts.mapPartitionsWithIndex(
(index: Int, iter: Iterator[(Double, LabelCounter)]) => {
val cumCount = partitionwiseCumulativeCounts(index)
iter.map { case (score, c) =>
cumCount += c
(score, cumCount.clone())
}
}, preservesPartitioning = true)
cumulativeCounts.persist()
val confusions = cumulativeCounts.map { case (score, cumCount) =>
(score, BinaryConfusionMatrixImpl(cumCount, totalCount).asInstanceOf[BinaryConfusionMatrix])
}
(cumulativeCounts, confusions)
}
@Experimental
class BinaryClassificationMetrics(scoreAndLabels: RDD[(Double, Double)]) extends Logging {

/** Unpersist intermediate RDDs used in the computation. */
def unpersist() {
@@ -154,6 +93,41 @@ class BinaryClassificationMetrics(scoreAndLabels: RDD[(Double, Double)])
/** Returns the (threshold, recall) curve. */
def recallByThreshold(): RDD[(Double, Double)] = createCurve(Recall)

private lazy val (
cumulativeCounts: RDD[(Double, BinaryLabelCounter)],
confusions: RDD[(Double, BinaryConfusionMatrix)]) = {
// Create a bin for each distinct score value, count positives and negatives within each bin,
// and then sort by score values in descending order.
val counts = scoreAndLabels.combineByKey(
createCombiner = (label: Double) => new BinaryLabelCounter(0L, 0L) += label,
mergeValue = (c: BinaryLabelCounter, label: Double) => c += label,
mergeCombiners = (c1: BinaryLabelCounter, c2: BinaryLabelCounter) => c1 += c2
).sortByKey(ascending = false)
val agg = counts.values.mapPartitions({ iter =>
val agg = new BinaryLabelCounter()
iter.foreach(agg += _)
Iterator(agg)
}, preservesPartitioning = true).collect()
val partitionwiseCumulativeCounts =
agg.scanLeft(new BinaryLabelCounter())(
(agg: BinaryLabelCounter, c: BinaryLabelCounter) => agg.clone() += c)
val totalCount = partitionwiseCumulativeCounts.last
logInfo(s"Total counts: $totalCount")
val cumulativeCounts = counts.mapPartitionsWithIndex(
(index: Int, iter: Iterator[(Double, BinaryLabelCounter)]) => {
val cumCount = partitionwiseCumulativeCounts(index)
iter.map { case (score, c) =>
cumCount += c
(score, cumCount.clone())
}
}, preservesPartitioning = true)
cumulativeCounts.persist()
val confusions = cumulativeCounts.map { case (score, cumCount) =>
(score, BinaryConfusionMatrixImpl(cumCount, totalCount).asInstanceOf[BinaryConfusionMatrix])
}
(cumulativeCounts, confusions)
}

/** Creates a curve of (threshold, metric). */
private def createCurve(y: BinaryClassificationMetricComputer): RDD[(Double, Double)] = {
confusions.map { case (s, c) =>
@@ -170,35 +144,3 @@ class BinaryClassificationMetrics(scoreAndLabels: RDD[(Double, Double)])
}
}
}

/**
* A counter for positives and negatives.
*
* @param numPositives number of positive labels
* @param numNegatives number of negative labels
*/
private class LabelCounter(
var numPositives: Long = 0L,
var numNegatives: Long = 0L) extends Serializable {

/** Processes a label. */
def +=(label: Double): LabelCounter = {
// Though we assume 1.0 for positive and 0.0 for negative, the following check will handle
// -1.0 for negative as well.
if (label > 0.5) numPositives += 1L else numNegatives += 1L
this
}

/** Merges another counter. */
def +=(other: LabelCounter): LabelCounter = {
numPositives += other.numPositives
numNegatives += other.numNegatives
this
}

override def clone: LabelCounter = {
new LabelCounter(numPositives, numNegatives)
}

override def toString: String = s"{numPos: $numPositives, numNeg: $numNegatives}"
}
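The relocated `lazy val` above computes cumulative label counts without a global scan: it aggregates each partition, `scanLeft`s the per-partition totals into starting offsets, then prefix-sums within each partition from its offset. A tiny RDD-free sketch of the same trick on plain collections, with hypothetical numbers:

```scala
// Hypothetical sorted "partitions" of per-bin counts.
val partitions = Seq(Seq(1L, 2L), Seq(3L), Seq(4L, 5L))

// Per-partition totals, then running offsets before each partition.
val offsets = partitions.map(_.sum).scanLeft(0L)(_ + _)

// Prefix-sum within each partition, starting from its offset.
val cumulative = partitions.zip(offsets).map { case (part, offset) =>
  part.scanLeft(offset)(_ + _).tail
}
// cumulative == Seq(Seq(1L, 3L), Seq(6L), Seq(10L, 15L))
```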
mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala
@@ -39,3 +39,32 @@ private[evaluation] trait BinaryConfusionMatrix {
/** number of negatives */
def numNegatives: Long = numFalsePositives + numTrueNegatives
}

/**
* Implementation of [[org.apache.spark.mllib.evaluation.binary.BinaryConfusionMatrix]].
*
* @param count label counter for labels with scores greater than or equal to the current score
* @param totalCount label counter for all labels
*/
private[evaluation] case class BinaryConfusionMatrixImpl(
count: BinaryLabelCounter,
totalCount: BinaryLabelCounter) extends BinaryConfusionMatrix {

/** number of true positives */
override def numTruePositives: Long = count.numPositives

/** number of false positives */
override def numFalsePositives: Long = count.numNegatives

/** number of false negatives */
override def numFalseNegatives: Long = totalCount.numPositives - count.numPositives

/** number of true negatives */
override def numTrueNegatives: Long = totalCount.numNegatives - count.numNegatives

/** number of positives */
override def numPositives: Long = totalCount.numPositives

/** number of negatives */
override def numNegatives: Long = totalCount.numNegatives
}
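All four confusion-matrix cells fall out of just two counters: the cumulative counts for scores at or above the current threshold, and the totals. A standalone sketch of the same arithmetic with hypothetical numbers:

```scala
case class Counts(pos: Long, neg: Long)

val atOrAbove = Counts(pos = 80, neg = 10)  // `count`: labels with score >= threshold
val total = Counts(pos = 100, neg = 100)    // `totalCount`: all labels

val tp = atOrAbove.pos            // 80 predicted and actually positive
val fp = atOrAbove.neg            // 10 negatives scored above the threshold
val fn = total.pos - tp           // 20 positives left below the threshold
val tn = total.neg - fp           // 90 negatives correctly below it

val precision = tp.toDouble / (tp + fp)  // 80 / 90 ≈ 0.889
val recall = tp.toDouble / total.pos     // 80 / 100 = 0.8
```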
