
Commit

Added Bernoulli option to naive Bayes model in MLlib, with an optional model type parameter for training. When Bernoulli is given, Bernoulli smoothing is used for fitting and for prediction (http://nlp.stanford.edu/IR-book/html/htmledition/the-bernoulli-model-1.html).
leahmcguire committed Jan 16, 2015
1 parent a79a9f9 commit ce73c63
Showing 2 changed files with 158 additions and 32 deletions.
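For orientation, a minimal usage sketch of the API this commit adds (a sketch only: it assumes a live SparkContext `sc` and made-up data values, and relies only on the `NaiveBayesModels` enumeration and the three-argument `train` overload shown in the diff below):

import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModels}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Toy 0/1 feature vectors; under the Bernoulli model each feature is treated as present or absent.
val data = sc.parallelize(Seq(
  LabeledPoint(0.0, Vectors.dense(1.0, 0.0, 0.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 1.0, 1.0))))

// Train with smoothing parameter lambda = 1.0 and the new Bernoulli model type.
val model = NaiveBayes.train(data, 1.0, NaiveBayesModels.Bernoulli)

// Predict the label of a single vector.
val prediction = model.predict(Vectors.dense(0.0, 1.0, 0.0))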
NaiveBayes.scala
@@ -17,43 +17,67 @@

package org.apache.spark.mllib.classification

-import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum}
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum, Axis}
+import org.apache.spark.mllib.classification.NaiveBayesModels.NaiveBayesModels

import org.apache.spark.{SparkException, Logging}
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD


/**
* Model types supported by Naive Bayes: Multinomial and Bernoulli.
*/
object NaiveBayesModels extends Enumeration {
type NaiveBayesModels = Value
val Multinomial, Bernoulli = Value
}

/**
* Model for Naive Bayes Classifiers.
*
* @param labels list of labels
* @param pi log of class priors, whose dimension is C, number of labels
* @param theta log of class conditional probabilities, whose dimension is C-by-D,
* where D is number of features
* @param model The type of NB model to fit, from the enumeration NaiveBayesModels:
* either Multinomial or Bernoulli
*/

class NaiveBayesModel private[mllib] (
val labels: Array[Double],
val pi: Array[Double],
-val theta: Array[Array[Double]]) extends ClassificationModel with Serializable {
-
-private val brzPi = new BDV[Double](pi)
-private val brzTheta = new BDM[Double](theta.length, theta(0).length)
+val theta: Array[Array[Double]],
+val model: NaiveBayesModels) extends ClassificationModel with Serializable {

-{
-// Need to put an extra pair of braces to prevent Scala treating `i` as a member.
+def populateMatrix(arrayIn: Array[Array[Double]],
+    matrixIn: BDM[Double],
+    transformation: (Double) => Double = (x) => x) = {
var i = 0
-while (i < theta.length) {
+while (i < arrayIn.length) {
var j = 0
-while (j < theta(i).length) {
-brzTheta(i, j) = theta(i)(j)
+while (j < arrayIn(i).length) {
+matrixIn(i, j) = transformation(arrayIn(i)(j))
j += 1
}
i += 1
}
}

private val brzPi = new BDV[Double](pi)
private val brzTheta = new BDM[Double](theta.length, theta(0).length)
populateMatrix(theta, brzTheta)

private val brzNegTheta: Option[BDM[Double]] = model match {
case NaiveBayesModels.Multinomial => None
case NaiveBayesModels.Bernoulli =>
val negTheta = new BDM[Double](theta.length, theta(0).length)
populateMatrix(theta, negTheta, (x) => math.log(1.0 - math.exp(x)))
Option(negTheta)
}

override def predict(testData: RDD[Vector]): RDD[Double] = {
val bcModel = testData.context.broadcast(this)
testData.mapPartitions { iter =>
@@ -63,7 +87,14 @@ class NaiveBayesModel private[mllib] (
}

override def predict(testData: Vector): Double = {
-labels(brzArgmax(brzPi + brzTheta * testData.toBreeze))
+model match {
+  case NaiveBayesModels.Multinomial =>
+    labels(brzArgmax(brzPi + brzTheta * testData.toBreeze))
+  case NaiveBayesModels.Bernoulli =>
+    labels(brzArgmax(brzPi +
+      (brzTheta - brzNegTheta.get) * testData.toBreeze +
+      brzSum(brzNegTheta.get, Axis._1)))
+}
}
}
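A note on the Bernoulli branch of predict() above. For 0/1 features the class-conditional log-likelihood is sum_j [ x_j * log(theta_cj) + (1 - x_j) * log(1 - theta_cj) ], which rearranges to sum_j x_j * (log(theta_cj) - log(1 - theta_cj)) + sum_j log(1 - theta_cj); that is exactly (brzTheta - brzNegTheta.get) * x + brzSum(brzNegTheta.get, Axis._1), with brzNegTheta holding log(1 - theta) element-wise. The same computation in per-feature form, as a hypothetical helper that is not part of the commit:

// Sketch only: Bernoulli log-likelihood of one class for a 0/1 vector x, assuming
// logTheta(j) = log(theta_cj) and logNegTheta(j) = log(1 - theta_cj).
def bernoulliLogLikelihood(logTheta: Array[Double], logNegTheta: Array[Double], x: Array[Double]): Double = {
  var ll = 0.0
  var j = 0
  while (j < x.length) {
    ll += (if (x(j) > 0.0) logTheta(j) else logNegTheta(j))
    j += 1
  }
  ll
}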

@@ -75,16 +106,26 @@ class NaiveBayesModel private[mllib] (
* document classification. By making every vector a 0-1 vector, it can also be used as
* Bernoulli NB ([[http://tinyurl.com/p7c96j6]]). The input feature values must be nonnegative.
*/
-class NaiveBayes private (private var lambda: Double) extends Serializable with Logging {
+class NaiveBayes private (private var lambda: Double,
+    var model: NaiveBayesModels) extends Serializable with Logging {

-def this() = this(1.0)
+def this(lambda: Double) = this(lambda, NaiveBayesModels.Multinomial)

+def this() = this(1.0, NaiveBayesModels.Multinomial)

/** Set the smoothing parameter. Default: 1.0. */
def setLambda(lambda: Double): NaiveBayes = {
this.lambda = lambda
this
}

/** Set the model type. Default: Multinomial. */
def setModelType(model: NaiveBayesModels): NaiveBayes = {
this.model = model
this
}


/**
* Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries.
*
@@ -118,21 +159,27 @@ class NaiveBayes private (private var lambda: Double) extends Serializable with
mergeCombiners = (c1: (Long, BDV[Double]), c2: (Long, BDV[Double])) =>
(c1._1 + c2._1, c1._2 += c2._2)
).collect()

val numLabels = aggregated.length
var numDocuments = 0L
aggregated.foreach { case (_, (n, _)) =>
numDocuments += n
}
val numFeatures = aggregated.head match { case (_, (_, v)) => v.size }

val labels = new Array[Double](numLabels)
val pi = new Array[Double](numLabels)
val theta = Array.fill(numLabels)(new Array[Double](numFeatures))

val piLogDenom = math.log(numDocuments + numLabels * lambda)
var i = 0
aggregated.foreach { case (label, (n, sumTermFreqs)) =>
labels(i) = label
-val thetaLogDenom = math.log(brzSum(sumTermFreqs) + numFeatures * lambda)
pi(i) = math.log(n + lambda) - piLogDenom
+val thetaLogDenom = model match {
+  case NaiveBayesModels.Multinomial => math.log(brzSum(sumTermFreqs) + numFeatures * lambda)
+  case NaiveBayesModels.Bernoulli => math.log(n + 2.0 * lambda)
+}
var j = 0
while (j < numFeatures) {
theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
@@ -141,7 +188,7 @@ class NaiveBayes private (private var lambda: Double) extends Serializable with
i += 1
}

-new NaiveBayesModel(labels, pi, theta)
+new NaiveBayesModel(labels, pi, theta, model)
}
}
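The only change to fitting is the smoothing denominator. Both branches store theta(i)(j) = log(N_cj + lambda) - thetaLogDenom. For Multinomial the denominator is log(sum_j N_cj + numFeatures * lambda), where N_cj is the total count of feature j aggregated over class c; for Bernoulli it is log(n + 2.0 * lambda), where n is the number of documents in class c and N_cj is (for 0/1 input) the number of class-c documents containing feature j. Each feature is a two-outcome event under the Bernoulli model, hence the 2.0 * lambda. As a worked example, with lambda = 1, a class of 10 documents of which 4 contain feature j gives a Bernoulli estimate of (4 + 1) / (10 + 2) ≈ 0.417, stored as its logarithm.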

@@ -154,8 +201,7 @@ object NaiveBayes {
*
* This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of
* discrete data. For example, by converting documents into TF-IDF vectors, it can be used for
-* document classification. By making every vector a 0-1 vector, it can also be used as
-* Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
+* document classification.
*
* This version of the method uses a default smoothing parameter of 1.0.
*
@@ -171,8 +217,7 @@ object NaiveBayes {
*
* This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of
* discrete data. For example, by converting documents into TF-IDF vectors, it can be used for
-* document classification. By making every vector a 0-1 vector, it can also be used as
-* Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
+* document classification.
*
* @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency
* vector or a count vector.
@@ -181,4 +226,25 @@
def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel = {
new NaiveBayes(lambda).run(input)
}


/**
* Trains a Naive Bayes model given an RDD of `(label, features)` pairs.
*
* This is by default the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle
* all kinds of discrete data. For example, by converting documents into TF-IDF vectors,
* it can be used for document classification. By making every vector a 0-1 vector and
* setting the model type to NaiveBayesModels.Bernoulli, it fits and predicts as
* Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
*
* @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency
* vector or a count vector.
* @param lambda The smoothing parameter
*
* @param model The type of NB model to fit, from the enumeration NaiveBayesModels:
* either Multinomial or Bernoulli
*/
def train(input: RDD[LabeledPoint], lambda: Double, model: NaiveBayesModels): NaiveBayesModel = {
new NaiveBayes(lambda, model).run(input)
}
}
NaiveBayesSuite.scala
@@ -17,6 +17,10 @@

package org.apache.spark.mllib.classification

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum, Axis}
import breeze.stats.distributions.Multinomial
import org.apache.spark.mllib.classification.NaiveBayesModels.NaiveBayesModels

import scala.util.Random

import org.scalatest.FunSuite
@@ -39,10 +43,12 @@ object NaiveBayesSuite {

// Generate input of the form Y = (theta * x).argmax()
def generateNaiveBayesInput(
pi: Array[Double], // 1XC
theta: Array[Array[Double]], // CXD
nPoints: Int,
-seed: Int): Seq[LabeledPoint] = {
+seed: Int,
+dataModel: NaiveBayesModels = NaiveBayesModels.Multinomial,
+sample: Int = 10): Seq[LabeledPoint] = {
val D = theta(0).length
val rnd = new Random(seed)

@@ -51,8 +57,17 @@

for (i <- 0 until nPoints) yield {
val y = calcLabel(rnd.nextDouble(), _pi)
-val xi = Array.tabulate[Double](D) { j =>
-if (rnd.nextDouble() < _theta(y)(j)) 1 else 0
+val xi = dataModel match {
+  case NaiveBayesModels.Bernoulli => Array.tabulate[Double](D) { j =>
+    if (rnd.nextDouble() < _theta(y)(j)) 1 else 0
+  }
+  case NaiveBayesModels.Multinomial =>
+    val mult = Multinomial(BDV(_theta(y)))
+    val emptyMap = (0 until D).map(x => (x, 0.0)).toMap
+    val counts = emptyMap ++ mult.sample(sample).groupBy(x => x).map {
+      case (index, reps) => (index, reps.size.toDouble)
+    }
+    counts.toArray.sortBy(_._1).map(_._2)
}

LabeledPoint(y, Vectors.dense(xi))
@@ -71,23 +86,68 @@ class NaiveBayesSuite extends FunSuite with MLlibTestSparkContext {
assert(numOfPredictions < input.length / 5)
}

test("Naive Bayes") {
def validateModelFit(piData: Array[Double], thetaData: Array[Array[Double]], model: NaiveBayesModel) = {
def closeFit(d1: Double, d2: Double, precision: Double): Boolean = {
(d1 - d2).abs <= precision
}
val modelIndex = (0 until piData.length).zip(model.labels.map(_.toInt))
for (i <- modelIndex) {
assert(closeFit(math.exp(piData(i._2)), math.exp(model.pi(i._1)), 0.05))
}
for (i <- modelIndex) {
val sortedData = thetaData(i._2).sorted
val sortedModel = model.theta(i._1).sorted
for (j <- 0 until sortedData.length) {
assert(closeFit(math.exp(sortedData(j)), math.exp(sortedModel(j)), 0.05))
}
}
}

test("Naive Bayes Multinomial") {
val nPoints = 1000

val pi = Array(0.5, 0.1, 0.4).map(math.log)
val theta = Array(
Array(0.70, 0.10, 0.10, 0.10), // label 0
Array(0.10, 0.70, 0.10, 0.10), // label 1
Array(0.10, 0.10, 0.70, 0.10) // label 2
).map(_.map(math.log))

val testData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 42, NaiveBayesModels.Multinomial)
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()

val model = NaiveBayes.train(testRDD, 1.0, NaiveBayesModels.Multinomial)
validateModelFit(pi, theta, model)

val validationData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 17, NaiveBayesModels.Multinomial)
val validationRDD = sc.parallelize(validationData, 2)

// Test prediction on RDD.
validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)

// Test prediction on Array.
validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
}

test("Naive Bayes Bernoulli") {
val nPoints = 10000

val pi = Array(0.5, 0.3, 0.2).map(math.log)
val theta = Array(
-Array(0.91, 0.03, 0.03, 0.03), // label 0
-Array(0.03, 0.91, 0.03, 0.03), // label 1
-Array(0.03, 0.03, 0.91, 0.03) // label 2
+Array(0.50, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.40), // label 0
+Array(0.02, 0.70, 0.10, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02), // label 1
+Array(0.02, 0.02, 0.60, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.30) // label 2
).map(_.map(math.log))

-val testData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 42)
+val testData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 45, NaiveBayesModels.Bernoulli)
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()

-val model = NaiveBayes.train(testRDD)
+val model = NaiveBayes.train(testRDD, 1.0, NaiveBayesModels.Bernoulli)
validateModelFit(pi, theta, model)

-val validationData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 17)
+val validationData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 20, NaiveBayesModels.Bernoulli)
val validationRDD = sc.parallelize(validationData, 2)

// Test prediction on RDD.
