[SPARK-4894][mllib] Added Bernoulli option to NaiveBayes model in mllib #4087

Status: Closed (wants to merge 31 commits)

Changes from 1 commit

Commits (31)
ce73c63
added Bernoulli option to naive bayes model in mllib, added optional …
leahmcguire Jan 16, 2015
4a3676d
Updated changes re-comments. Got rid of verbose populateMatrix method…
leahmcguire Jan 21, 2015
0313c0c
fixed style error in NaiveBayes.scala
leahmcguire Jan 21, 2015
76e5b0f
removed unnecessary sort from test
leahmcguire Jan 26, 2015
d9477ed
removed old inaccurate comment from test suite for mllib naive bayes
leahmcguire Feb 26, 2015
3891bf2
synced with apache spark and resolved merge conflict
leahmcguire Feb 27, 2015
5a4a534
fixed scala style error in NaiveBayes
leahmcguire Feb 27, 2015
b61b5e2
added back compatible constructor to NaiveBayesModel to fix MIMA test…
leahmcguire Mar 2, 2015
3730572
modified NB model type to be more Java-friendly
jkbradley Mar 3, 2015
b93aaf6
Merge pull request #1 from jkbradley/nb-model-type
leahmcguire Mar 5, 2015
7622b0c
added comments and fixed style as per rb
leahmcguire Mar 5, 2015
dc65374
integrated model type fix
leahmcguire Mar 5, 2015
85f298f
Merge remote-tracking branch 'upstream/master'
leahmcguire Mar 5, 2015
e016569
updated test suite with model type fix
leahmcguire Mar 5, 2015
ea09b28
Merge remote-tracking branch 'upstream/master'
leahmcguire Mar 5, 2015
900b586
fixed model call so that uses type argument
leahmcguire Mar 5, 2015
b85b0c9
Merge remote-tracking branch 'upstream/master'
leahmcguire Mar 5, 2015
c298e78
fixed scala style errors
leahmcguire Mar 5, 2015
2d0c1ba
fixed typo in NaiveBayes
leahmcguire Mar 5, 2015
e2d925e
fixed nonserializable error that was causing naivebayes test failures
leahmcguire Mar 7, 2015
fb0a5c7
removed typo
leahmcguire Mar 9, 2015
01baad7
made fixes from code review
leahmcguire Mar 11, 2015
bea62af
put back in constructor for NaiveBayes
leahmcguire Mar 12, 2015
18f3219
removed private from naive bayes constructor for lambda only
leahmcguire Mar 12, 2015
a22d670
changed NaiveBayesModel modelType parameter back to NaiveBayes.ModelT…
leahmcguire Mar 17, 2015
852a727
merged with upstream master
leahmcguire Mar 21, 2015
6a8f383
Added new model save/load format 2.0 for NaiveBayesModel after modelT…
jkbradley Mar 22, 2015
9ad89ca
removed old code
jkbradley Mar 22, 2015
2224b15
Merge pull request #2 from jkbradley/leahmcguire-master
leahmcguire Mar 24, 2015
acb69af
removed enum type and replaced all modelType parameters with strings
leahmcguire Mar 28, 2015
f3c8994
changed checks on model type to requires
leahmcguire Mar 31, 2015
File: NaiveBayes.scala
@@ -17,43 +17,67 @@

package org.apache.spark.mllib.classification

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum}
import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum, Axis}
import org.apache.spark.mllib.classification.NaiveBayesModels.NaiveBayesModels

import org.apache.spark.{SparkException, Logging}
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD


/**
*
Review comment (Member): Add documentation

*/
object NaiveBayesModels extends Enumeration {
type NaiveBayesModels = Value
val Multinomial, Bernoulli = Value
}
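
Review note: responding to the "Add documentation" comment above, a possible Scaladoc for this enumeration could read as follows (hypothetical wording, not part of this commit):

/**
 * Supported Naive Bayes model types:
 *  - Multinomial: features are term counts/frequencies (e.g. TF-IDF vectors).
 *  - Bernoulli: features are treated as 0/1 presence indicators.
 */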

/**
* Model for Naive Bayes Classifiers.
*
* @param labels list of labels
* @param pi log of class priors, whose dimension is C, number of labels
* @param theta log of class conditional probabilities, whose dimension is C-by-D,
* where D is number of features
* @param model The type of NB model to fit from the enumeration NaiveBayesModels, can be
* Multinomial or Bernoulli
*/

Review comment (Member): remove blank line

class NaiveBayesModel private[mllib] (
val labels: Array[Double],
val pi: Array[Double],
val theta: Array[Array[Double]]) extends ClassificationModel with Serializable {

private val brzPi = new BDV[Double](pi)
private val brzTheta = new BDM[Double](theta.length, theta(0).length)
val theta: Array[Array[Double]],
val model: NaiveBayesModels) extends ClassificationModel with Serializable {

{
// Need to put an extra pair of braces to prevent Scala treating `i` as a member.
def populateMatrix(arrayIn: Array[Array[Double]],
Review comment (Contributor): This function seems excessive. Does the Breeze library support element-wise log/exp and addition/subtraction with matrices? If so, that would be cleaner and less verbose. (A minimal sketch follows after the brzNegTheta block below.)

matrixIn: BDM[Double],
transformation: (Double) => Double = (x) => x) = {
var i = 0
while (i < theta.length) {
while (i < arrayIn.length) {
var j = 0
while (j < theta(i).length) {
brzTheta(i, j) = theta(i)(j)
while (j < arrayIn(i).length) {
matrixIn(i, j) = transformation(theta(i)(j))
j += 1
}
i += 1
}
}

private val brzPi = new BDV[Double](pi)
private val brzTheta = new BDM[Double](theta.length, theta(0).length)
populateMatrix(theta, brzTheta)

private val brzNegTheta: Option[BDM[Double]] = model match {
case NaiveBayesModels.Multinomial => None
case NaiveBayesModels.Bernoulli =>
val negTheta = new BDM[Double](theta.length, theta(0).length)
populateMatrix(theta, negTheta, (x) => math.log(1.0 - math.exp(x)))
Option(negTheta)
}
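
Following the reviewer's suggestion on populateMatrix, a minimal sketch of an element-wise Breeze alternative (assuming DenseMatrix.tabulate and DenseMatrix.map; untested against this branch):

  // Build the log-theta matrix directly from the nested array.
  private val brzTheta = BDM.tabulate(theta.length, theta(0).length)((i, j) => theta(i)(j))

  // Bernoulli only: log(1 - theta), computed element-wise from log(theta).
  private val brzNegTheta: Option[BDM[Double]] = model match {
    case NaiveBayesModels.Multinomial => None
    case NaiveBayesModels.Bernoulli => Some(brzTheta.map(x => math.log1p(-math.exp(x))))
  }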

override def predict(testData: RDD[Vector]): RDD[Double] = {
val bcModel = testData.context.broadcast(this)
testData.mapPartitions { iter =>
@@ -63,7 +87,14 @@ class NaiveBayesModel private[mllib] (
}

override def predict(testData: Vector): Double = {
labels(brzArgmax(brzPi + brzTheta * testData.toBreeze))
model match {
case NaiveBayesModels.Multinomial =>
labels (brzArgmax (brzPi + brzTheta * testData.toBreeze) )
case NaiveBayesModels.Bernoulli =>
labels (brzArgmax (brzPi +
(brzTheta - brzNegTheta.get) * testData.toBreeze +
brzSum(brzNegTheta.get, Axis._1)))
Review comment (Member): This sum could be precomputed. I'm also wondering: Is there some normalization going on here which isn't needed to get the argmax?

}
}
}
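
For reference, the Bernoulli branch above is a direct rearrangement of the Bernoulli NB log-likelihood (a worked form; notation mine, not from the diff):

  log p(y | x) ∝ log(pi_y) + Σ_j [ x_j · log(θ_yj) + (1 − x_j) · log(1 − θ_yj) ]
              = log(pi_y) + Σ_j x_j · (log(θ_yj) − log(1 − θ_yj)) + Σ_j log(1 − θ_yj)

With theta stored in log space, the three terms map to brzPi, (brzTheta - brzNegTheta.get) * testData.toBreeze, and brzSum(brzNegTheta.get, Axis._1). The last term depends on the class, so it does affect the argmax, but as the reviewer notes it only depends on the model and could be precomputed once rather than on every prediction.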

@@ -75,16 +106,26 @@ class NaiveBayesModel private[mllib] (
* document classification. By making every vector a 0-1 vector, it can also be used as
* Bernoulli NB ([[http://tinyurl.com/p7c96j6]]). The input feature values must be nonnegative.
*/
class NaiveBayes private (private var lambda: Double) extends Serializable with Logging {
class NaiveBayes private (private var lambda: Double,
var model: NaiveBayesModels) extends Serializable with Logging {
Review comment (Contributor): Model should probably be a val, not a var.


def this() = this(1.0)
def this(lambda: Double) = this(lambda, NaiveBayesModels.Multinomial)

def this() = this(1.0, NaiveBayesModels.Multinomial)
Review comment (Contributor): I suggest removing the default model for the internal API. Backwards compatibility only matters for public API.


/** Set the smoothing parameter. Default: 1.0. */
def setLambda(lambda: Double): NaiveBayes = {
this.lambda = lambda
this
}

/** Set the model type. Default: Multinomial. */
def setModelType(model: NaiveBayesModels): NaiveBayes = {
this.model = model
this
}


Review comment (Member): remove extra space

/**
* Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries.
*
@@ -118,21 +159,27 @@ class NaiveBayes private (private var lambda: Double) extends Serializable with
mergeCombiners = (c1: (Long, BDV[Double]), c2: (Long, BDV[Double])) =>
(c1._1 + c2._1, c1._2 += c2._2)
).collect()

val numLabels = aggregated.length
var numDocuments = 0L
aggregated.foreach { case (_, (n, _)) =>
numDocuments += n
}
val numFeatures = aggregated.head match { case (_, (_, v)) => v.size }

val labels = new Array[Double](numLabels)
val pi = new Array[Double](numLabels)
val theta = Array.fill(numLabels)(new Array[Double](numFeatures))

val piLogDenom = math.log(numDocuments + numLabels * lambda)
var i = 0
aggregated.foreach { case (label, (n, sumTermFreqs)) =>
labels(i) = label
val thetaLogDenom = math.log(brzSum(sumTermFreqs) + numFeatures * lambda)
pi(i) = math.log(n + lambda) - piLogDenom
val thetaLogDenom = model match {
case NaiveBayesModels.Multinomial => math.log(brzSum(sumTermFreqs) + numFeatures * lambda)
case NaiveBayesModels.Bernoulli => math.log(n + 2.0 * lambda)
}
var j = 0
while (j < numFeatures) {
theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
@@ -141,7 +188,7 @@ class NaiveBayes private (private var lambda: Double) extends Serializable with
i += 1
}

new NaiveBayesModel(labels, pi, theta)
new NaiveBayesModel(labels, pi, theta, model)
}
}
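
For context, the two thetaLogDenom branches above correspond to the usual additive-smoothing estimates (a worked form of the code; notation mine):

  Multinomial: θ_yj = (N_yj + λ) / (Σ_j' N_yj' + D·λ),   D = numFeatures
  Bernoulli:   θ_yj = (N_yj + λ) / (N_y + 2·λ),          N_y = number of documents with label y

Both share the numerator log(sumTermFreqs(j) + lambda); only the log-denominator differs, which is why the match selects thetaLogDenom alone.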

@@ -154,8 +201,7 @@ object NaiveBayes {
*
* This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of
* discrete data. For example, by converting documents into TF-IDF vectors, it can be used for
* document classification. By making every vector a 0-1 vector, it can also be used as
* Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
* document classification.
*
* This version of the method uses a default smoothing parameter of 1.0.
*
@@ -171,8 +217,7 @@ object NaiveBayes {
*
* This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of
* discrete data. For example, by converting documents into TF-IDF vectors, it can be used for
* document classification. By making every vector a 0-1 vector, it can also be used as
* Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
* document classification.
*
* @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency
* vector or a count vector.
@@ -181,4 +226,25 @@
def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel = {
new NaiveBayes(lambda).run(input)
}


/**
* Trains a Naive Bayes model given an RDD of `(label, features)` pairs.
*
* This is by default the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle
Review comment (Member): Update documentation

* all kinds of discrete data. For example, by converting documents into TF-IDF vectors,
* it can be used for document classification. By making every vector a 0-1 vector and
* setting the model type to NaiveBayesModels.Bernoulli, it fits and predicts as
* Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
*
* @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency
* vector or a count vector.
* @param lambda The smoothing parameter
*
* @param model The type of NB model to fit from the enumeration NaiveBayesModels, can be
* Multinomial or Bernoulli
*/
def train(input: RDD[LabeledPoint], lambda: Double, model: NaiveBayesModels): NaiveBayesModel = {
new NaiveBayes(lambda, model).run(input)
}
}
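
Taken together, a minimal usage sketch of the API introduced in this commit (illustrative only; assumes a live SparkContext named sc, and 0/1 feature vectors for the Bernoulli case):

import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModels}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val training = sc.parallelize(Seq(
  LabeledPoint(0.0, Vectors.dense(1.0, 0.0, 0.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 1.0, 1.0))))

// Existing behaviour: Multinomial NB with lambda = 1.0.
val multinomialModel = NaiveBayes.train(training, 1.0, NaiveBayesModels.Multinomial)

// New in this PR: Bernoulli NB over 0/1 feature vectors.
val bernoulliModel = NaiveBayes.train(training, 1.0, NaiveBayesModels.Bernoulli)
val prediction = bernoulliModel.predict(Vectors.dense(1.0, 0.0, 1.0))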
File: NaiveBayesSuite.scala
@@ -17,6 +17,10 @@

package org.apache.spark.mllib.classification

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum, Axis}
import breeze.stats.distributions.Multinomial
import org.apache.spark.mllib.classification.NaiveBayesModels.NaiveBayesModels

import scala.util.Random

import org.scalatest.FunSuite
@@ -39,10 +43,12 @@ object NaiveBayesSuite {

// Generate input of the form Y = (theta * x).argmax()
def generateNaiveBayesInput(
pi: Array[Double], // 1XC
theta: Array[Array[Double]], // CXD
nPoints: Int,
seed: Int): Seq[LabeledPoint] = {
pi: Array[Double], // 1XC
theta: Array[Array[Double]], // CXD
nPoints: Int,
seed: Int,
dataModel: NaiveBayesModels = NaiveBayesModels.Multinomial,
sample: Int = 10): Seq[LabeledPoint] = {
val D = theta(0).length
val rnd = new Random(seed)

@@ -51,8 +57,17 @@ object NaiveBayesSuite {

for (i <- 0 until nPoints) yield {
val y = calcLabel(rnd.nextDouble(), _pi)
val xi = Array.tabulate[Double](D) { j =>
if (rnd.nextDouble() < _theta(y)(j)) 1 else 0
val xi = dataModel match {
case NaiveBayesModels.Bernoulli => Array.tabulate[Double] (D) {j =>
if (rnd.nextDouble () < _theta(y)(j) ) 1 else 0
}
case NaiveBayesModels.Multinomial =>
val mult = Multinomial(BDV(_theta(y)))
val emptyMap = (0 until D).map(x => (x, 0.0)).toMap
val counts = emptyMap ++ mult.sample(sample).groupBy(x => x).map {
case (index, reps) => (index, reps.size.toDouble)
}
counts.toArray.sortBy(_._1).map(_._2)
}

LabeledPoint(y, Vectors.dense(xi))
@@ -71,23 +86,68 @@ class NaiveBayesSuite extends FunSuite with MLlibTestSparkContext {
assert(numOfPredictions < input.length / 5)
}

test("Naive Bayes") {
def validateModelFit(piData: Array[Double], thetaData: Array[Array[Double]], model: NaiveBayesModel) = {
Review comment (Member): line too long (limit to <= 100 chars). (This may not be checked by Scala style since it's a test suite, but it's still nice to follow.)

def closeFit(d1: Double, d2: Double, precision: Double): Boolean = {
(d1 - d2).abs <= precision
}
val modelIndex = (0 until piData.length).zip(model.labels.map(_.toInt))
for (i <- modelIndex) {
assert(closeFit(math.exp(piData(i._2)), math.exp(model.pi(i._1)), 0.05))
}
for (i <- modelIndex) {
val sortedData = thetaData(i._2).sorted
val sortedModel = model.theta(i._1).sorted
for (j <- 0 until sortedData.length) {
assert(closeFit(math.exp(sortedData(j)), math.exp(sortedModel(j)), 0.05))
}
}
}

test("Naive Bayes Multinomial") {
val nPoints = 1000

val pi = Array(0.5, 0.1, 0.4).map(math.log)
val theta = Array(
Array(0.70, 0.10, 0.10, 0.10), // label 0
Array(0.10, 0.70, 0.10, 0.10), // label 1
Array(0.10, 0.10, 0.70, 0.10) // label 2
).map(_.map(math.log))

val testData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 42, NaiveBayesModels.Multinomial)
Review comment (Member): line too long

val testRDD = sc.parallelize(testData, 2)
testRDD.cache()

val model = NaiveBayes.train(testRDD, 1.0, NaiveBayesModels.Multinomial)
validateModelFit(pi, theta, model)

val validationData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 17, NaiveBayesModels.Multinomial)
Review comment (Member): line too long

val validationRDD = sc.parallelize(validationData, 2)

// Test prediction on RDD.
validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)

// Test prediction on Array.
validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
}

test("Naive Bayes Bernoulli") {
val nPoints = 10000

val pi = Array(0.5, 0.3, 0.2).map(math.log)
val theta = Array(
Array(0.91, 0.03, 0.03, 0.03), // label 0
Array(0.03, 0.91, 0.03, 0.03), // label 1
Array(0.03, 0.03, 0.91, 0.03) // label 2
Array(0.50, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.40), // label 0
Array(0.02, 0.70, 0.10, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02), // label 1
Array(0.02, 0.02, 0.60, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.30) // label 2
).map(_.map(math.log))

val testData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 42)
val testData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 45, NaiveBayesModels.Bernoulli)
Review comment (Member): line too long

val testRDD = sc.parallelize(testData, 2)
testRDD.cache()

val model = NaiveBayes.train(testRDD)
val model = NaiveBayes.train(testRDD, 1.0, NaiveBayesModels.Bernoulli) ///!!! this gives same result on both models check the math
validateModelFit(pi, theta, model)

val validationData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 17)
val validationData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 20, NaiveBayesModels.Bernoulli)
Review comment (Member): line too long

val validationRDD = sc.parallelize(validationData, 2)

// Test prediction on RDD.