update NaiveBayes to support sparse data
mengxr committed Mar 31, 2014
1 parent 0f8759b commit 3432e84
Showing 3 changed files with 61 additions and 74 deletions.
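As context for the diff below, here is a minimal usage sketch of the trainer after this change. It assumes a live SparkContext `sc` and the `Vectors.sparse` factory from the same sparse-data work; the toy data and the expected prediction are illustrative, not part of the commit.

    import org.apache.spark.mllib.classification.NaiveBayes
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    // A sparse vector carries (size, active indices, active values),
    // so zero entries are neither stored nor scanned during training.
    val training = sc.parallelize(Seq(
      LabeledPoint(0.0, Vectors.sparse(2, Array(1), Array(1.0))),
      LabeledPoint(1.0, Vectors.sparse(2, Array(0), Array(1.0)))))

    val model = NaiveBayes.train(training, lambda = 1.0)
    model.predict(Vectors.sparse(2, Array(0), Array(2.0)))  // expected: 1.0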
mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -247,6 +247,7 @@ class PythonMLLibAPI extends Serializable {
})
val model = NaiveBayes.train(data, lambda)
val ret = new java.util.LinkedList[java.lang.Object]()
ret.add(serializeDoubleVector(model.labels))
ret.add(serializeDoubleVector(model.pi))
ret.add(serializeDoubleMatrix(model.theta))
ret
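(The new `labels` entry lets a caller map an argmax index back to the original class label; with it, labels no longer need to be the consecutive integers 0..C-1 that the old `pi(label)` / `theta(label)` indexing assumed.)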
mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -17,39 +17,43 @@

package org.apache.spark.mllib.classification

import scala.collection.mutable
import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum}

import org.jblas.DoubleMatrix
import breeze.linalg.{Vector => BV}

import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

/**
* Model for Naive Bayes Classifiers.
*
* @param pi Log of class priors, whose dimension is C.
* @param theta Log of class conditional probabilities, whose dimension is CxD.
*/
class NaiveBayesModel(val pi: Array[Double], val theta: Array[Array[Double]])
extends ClassificationModel with Serializable {

// Create a column vector that can be used for predictions
private val _pi = new DoubleMatrix(pi.length, 1, pi: _*)
private val _theta = new DoubleMatrix(theta)
class NaiveBayesModel(
val labels: Array[Double],
val pi: Array[Double],
val theta: Array[Array[Double]]) extends ClassificationModel with Serializable {

private val brzPi = new BDV[Double](pi)
private val brzTheta = new BDM[Double](theta.length, theta(0).length)

var i = 0
while (i < theta.length) {
var j = 0
while (j < theta(i).length) {
brzTheta(i, j) = theta(i)(j)
j += 1
}
i += 1
}

override def predict(testData: RDD[Vector]): RDD[Double] = testData.map(predict)

override def predict(testData: Vector): Double = predict(testData.toArray)

private def predict(testData: Array[Double]): Double = {
val dataMatrix = new DoubleMatrix(testData.length, 1, testData: _*)
val result = _pi.add(_theta.mmul(dataMatrix))
result.argmax()
override def predict(testData: Vector): Double = {
labels(brzArgmax(brzPi + brzTheta * testData.toBreeze))
}
}
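Written out, the new prediction rule is the usual log-space MAP decision, with \pi and \theta already stored as logarithms:

    \hat{y} = \mathrm{labels}\left[ \arg\max_{c} \left( \pi_c + \sum_{j=1}^{D} \theta_{cj}\, x_j \right) \right]

The Breeze expression `brzPi + brzTheta * testData.toBreeze` evaluates all C unnormalized log posteriors in one matrix-vector product; accepting sparse as well as dense `testData` here is the point of the switch from jblas to Breeze.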

@@ -61,9 +65,8 @@ class NaiveBayesModel(val pi: Array[Double], val theta: Array[Array[Double]])
* document classification. By making every vector a 0-1 vector, it can also be used as
* Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
*/
class NaiveBayes private (var lambda: Double)
extends Serializable with Logging
{
class NaiveBayes private (var lambda: Double) extends Serializable with Logging {

def this() = this(1.0)

/** Set the smoothing parameter. Default: 1.0. */
@@ -78,57 +81,37 @@ class NaiveBayes private (var lambda: Double)
* @param data RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
*/
def run(data: RDD[LabeledPoint]) = {
val agg = data.map(p => (p.label, p.features)).combineByKey[(Long, BV[Double])](
// Aggregates term frequencies per label.
val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, BDV[Double])](
createCombiner = (v: Vector) => (1L, v.toBreeze.toDenseVector),
mergeValue = (c: (Long, BV[Double]), v: Vector) => (c._1 + 1L, c._2 += v.toBreeze),
mergeCombiners = (c1: (Long, BV[Double]), c2: (Long, BV[Double])) =>
mergeValue = (c: (Long, BDV[Double]), v: Vector) => (c._1 + 1L, c._2 += v.toBreeze),
mergeCombiners = (c1: (Long, BDV[Double]), c2: (Long, BDV[Double])) =>
(c1._1 + c2._1, c1._2 += c2._2)
).collect()
val numLabels = agg.size
}

/**
* Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries.
*
* @param data RDD of (label, array of features) pairs.
*/
private def runRaw(data: RDD[(Double, Array[Double])]) = {
// Aggregates all sample points to driver side to get sample count and summed feature vector
// for each label. The shape of `zeroCombiner` & `aggregated` is:
//
// label: Int -> (count: Int, featuresSum: DoubleMatrix)
val zeroCombiner = mutable.Map.empty[Int, (Int, DoubleMatrix)]
val aggregated = data.aggregate(zeroCombiner)({ (combiner, point) =>
point match {
case (label, features) =>
val (count, featuresSum) = combiner.getOrElse(label.toInt, (0, DoubleMatrix.zeros(1)))
val fs = new DoubleMatrix(features.length, 1, features: _*)
combiner += label.toInt -> (count + 1, featuresSum.addi(fs))
}
}, { (lhs, rhs) =>
for ((label, (c, fs)) <- rhs) {
val (count, featuresSum) = lhs.getOrElse(label, (0, DoubleMatrix.zeros(1)))
lhs(label) = (count + c, featuresSum.addi(fs))
val numLabels = aggregated.length
var numExamples = 0L
aggregated.foreach { case (_, (n, _)) =>
numExamples += n
}
val numFeatures = aggregated.head match { case (_, (_, v)) => v.size }
val labels = new Array[Double](numLabels)
val pi = new Array[Double](numLabels)
val theta = Array.fill(numLabels)(new Array[Double](numFeatures))
val piLogDenom = math.log(numExamples + numLabels * lambda)
var i = 0
aggregated.foreach { case (label, (n, sum)) =>
labels(i) = label
val thetaLogDenom = math.log(brzSum(sum) + numFeatures * lambda)
pi(i) = math.log(n + lambda) - piLogDenom
var j = 0
while (j < numFeatures) {
theta(i)(j) = math.log(sum(j) + lambda) - thetaLogDenom
j += 1
}
lhs
})

// Kinds of label
val C = aggregated.size
// Total sample count
val N = aggregated.values.map(_._1).sum

val pi = new Array[Double](C)
val theta = new Array[Array[Double]](C)
val piLogDenom = math.log(N + C * lambda)

for ((label, (count, fs)) <- aggregated) {
val thetaLogDenom = math.log(fs.sum() + fs.length * lambda)
pi(label) = math.log(count + lambda) - piLogDenom
theta(label) = fs.toArray.map(f => math.log(f + lambda) - thetaLogDenom)
i += 1
}

new NaiveBayesModel(pi, theta)
new NaiveBayesModel(labels, pi, theta)
}
}
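The `combineByKey` step ships one (count, feature-sum) pair per label to the driver, and the loops above convert those sums into additively smoothed log estimates. With N total examples, N_c of them labeled c, S_{cj} the sum of feature j over the examples of class c, C labels, D features, and smoothing parameter \lambda:

    \pi_c = \log\frac{N_c + \lambda}{N + C\lambda}
    \qquad
    \theta_{cj} = \log\frac{S_{cj} + \lambda}{\sum_{j'=1}^{D} S_{cj'} + D\lambda}

These are exactly the quantities `piLogDenom` and `thetaLogDenom` factor out; \lambda = 1 gives classic Laplace smoothing.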

@@ -178,8 +161,9 @@ object NaiveBayes {
} else {
NaiveBayes.train(data, args(2).toDouble)
}
println("Pi: " + model.pi.mkString("[", ", ", "]"))
println("Theta:\n" + model.theta.map(_.mkString("[", ", ", "]")).mkString("[", "\n ", "]"))

println("Pi\n: " + model.pi)
println("Theta:\n" + model.theta)

sc.stop()
}
python/pyspark/mllib/classification.py (12 changes: 7 additions & 5 deletions)
@@ -87,18 +87,19 @@ class NaiveBayesModel(object):
>>> data = array([0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 1.0, 1.0, 0.0]).reshape(3,3)
>>> model = NaiveBayes.train(sc.parallelize(data))
>>> model.predict(array([0.0, 1.0]))
0
0.0
>>> model.predict(array([1.0, 0.0]))
1
1.0
"""

def __init__(self, pi, theta):
def __init__(self, labels, pi, theta):
self.labels = labels
self.pi = pi
self.theta = theta

def predict(self, x):
"""Return the most likely class for a data vector x"""
return numpy.argmax(self.pi + dot(x, self.theta))
return self.labels[numpy.argmax(self.pi + dot(self.theta, x))]

class NaiveBayes(object):
@classmethod
@@ -122,7 +123,8 @@ def train(cls, data, lambda_=1.0):
ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_)
return NaiveBayesModel(
_deserialize_double_vector(ans[0]),
_deserialize_double_matrix(ans[1]))
_deserialize_double_vector(ans[1]),
_deserialize_double_matrix(ans[2]))
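The deserialization order mirrors the `LinkedList` assembled in `PythonMLLibAPI.trainNaiveBayes` above: `ans[0]` holds `labels`, `ans[1]` holds `pi`, and `ans[2]` holds `theta`.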


def _test():
