fix bug of min max
yinxusen committed Apr 10, 2014
1 parent 4e4fbd1 commit 4cfbadf
Showing 1 changed file with 26 additions and 14 deletions.
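The bug this commit fixes: max and min were accumulated only over each vector's active (nonzero) entries, so a column that is zero in every row is never visited and keeps its Double.MinValue / Double.MaxValue sentinel instead of reporting 0.0. A minimal, self-contained breeze sketch of the failure mode (the vectors and values are illustrative, not taken from the commit):

import breeze.linalg.{DenseVector, SparseVector}

// Two rows over two columns; column 1 is zero in both rows.
val rows = Seq(
  SparseVector(2)(0 -> 3.0),   // (3.0, 0.0)
  SparseVector(2)(0 -> 5.0))   // (5.0, 0.0)

// Fold the running max over active (nonzero) entries only, as seqOp does.
val max = DenseVector.fill(2)(Double.MinValue)
rows.foreach(_.activeIterator.foreach { case (id, v) =>
  max(id) = math.max(max(id), v)
})
// max(0) == 5.0, but max(1) is still Double.MinValue: column 1 has no nonzero
// entries, so it was never updated. The commit patches such columns back to 0.0
// after the aggregation.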
@@ -20,7 +20,6 @@ import breeze.linalg.{Vector => BV}

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
-import breeze.linalg.axpy

case class VectorRDDStatisticalSummary(
mean: Vector,
@@ -35,8 +34,8 @@ private case class VectorRDDStatisticalRing(
fakeM2n: BV[Double],
totalCnt: Double,
nnz: BV[Double],
-max: BV[Double],
-min: BV[Double])
+fakeMax: BV[Double],
+fakeMin: BV[Double])

/**
* Extra functions available on RDDs of [[org.apache.spark.mllib.linalg.Vector Vector]] through an
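The renamed fields follow the existing "fake" convention of fakeMean and fakeM2n: these accumulators are built from nonzero entries only and corrected once the aggregation finishes. For the mean, the correction that appears later in this diff (realMean = fakeMean :* nnz :/ totalCnt) rescales by the nonzero count; an illustrative scalar example, not code from the commit:

// One column with values (3.0, 0.0, 0.0) over three rows.
val fakeMean = 3.0                         // mean over the single nonzero entry
val nnz      = 1.0                         // nonzero entries in the column
val totalCnt = 3.0                         // total number of rows
val realMean = fakeMean * nnz / totalCnt   // 1.0, the true column mean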
@@ -58,7 +57,9 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
* with the size of Vector as input parameter.
*/

-private def seqOp(aggregator: VectorRDDStatisticalRing, currData: BV[Double]): VectorRDDStatisticalRing = {
+private def seqOp(
+aggregator: VectorRDDStatisticalRing,
+currData: BV[Double]): VectorRDDStatisticalRing = {
aggregator match {
case VectorRDDStatisticalRing(prevMean, prevM2n, cnt, nnzVec, maxVec, minVec) =>
currData.activeIterator.foreach {
@@ -73,7 +74,8 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
nnzVec(id) += 1.0
}

-VectorRDDStatisticalRing(prevMean,
+VectorRDDStatisticalRing(
+prevMean,
prevM2n,
cnt + 1.0,
nnzVec,
@@ -82,7 +84,9 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
}
}

-private def combOp(statistics1: VectorRDDStatisticalRing, statistics2: VectorRDDStatisticalRing): VectorRDDStatisticalRing = {
+private def combOp(
+statistics1: VectorRDDStatisticalRing,
+statistics2: VectorRDDStatisticalRing): VectorRDDStatisticalRing = {
(statistics1, statistics2) match {
case (VectorRDDStatisticalRing(mean1, m2n1, cnt1, nnz1, max1, min1),
VectorRDDStatisticalRing(mean2, m2n2, cnt2, nnz2, max2, min2)) =>
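The body of combOp is collapsed in this view, so the following is only a generic illustration of what merging two per-partition partial summaries involves, not the commit's actual implementation: counts and nonzero counts add, max/min merge elementwise, and means merge weighted by their nonzero counts. The Partial case class and merge function below are hypothetical names for this sketch.

import breeze.linalg.{DenseVector => BDV}

// Hypothetical, simplified partial summary used only for this sketch.
case class Partial(mean: BDV[Double], nnz: BDV[Double],
                   max: BDV[Double], min: BDV[Double], cnt: Double)

def merge(a: Partial, b: Partial): Partial = {
  val n = a.mean.length
  Partial(
    mean = BDV.tabulate(n) { i =>
      val nz = a.nnz(i) + b.nnz(i)
      // Weight each partial mean by its nonzero count; guard against 0 / 0.
      if (nz == 0.0) 0.0 else (a.mean(i) * a.nnz(i) + b.mean(i) * b.nnz(i)) / nz
    },
    nnz = a.nnz + b.nnz,
    max = BDV.tabulate(n)(i => math.max(a.max(i), b.max(i))),
    min = BDV.tabulate(n)(i => math.min(a.min(i), b.min(i))),
    cnt = a.cnt + b.cnt)
}

Merging the second central moments (fakeM2n) additionally needs a cross term for the difference of the two partial means, which is omitted here.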
@@ -111,26 +115,34 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
BV.fill(size)(Double.MinValue),
BV.fill(size)(Double.MaxValue))

-val breezeVectors = self.collect().map(_.toBreeze)
-val VectorRDDStatisticalRing(fakeMean, fakeM2n, totalCnt, nnz, max, min) = breezeVectors.aggregate(zeroValue)(seqOp, combOp)
+val breezeVectors = self.map(_.toBreeze)
+val VectorRDDStatisticalRing(fakeMean, fakeM2n, totalCnt, nnz, fakeMax, fakeMin) =
+breezeVectors.aggregate(zeroValue)(seqOp, combOp)

// solve real mean
val realMean = fakeMean :* nnz :/ totalCnt
// solve real variance
val deltaMean = fakeMean :- 0.0
val realVar = fakeM2n - ((deltaMean :* deltaMean) :* (nnz :* (nnz :- totalCnt)) :/ totalCnt)
-// max, min process, in case of a column is all zero.
-// max :+= 0.0
-// min :+= 0.0

+// max, min
+val max = Vectors.sparse(size, fakeMax.activeIterator.map { case (id, value) =>
+if ((value == Double.MinValue) && (realMean(id) != Double.MinValue)) (id, 0.0)
+else (id, value)
+}.toSeq)
+val min = Vectors.sparse(size, fakeMin.activeIterator.map { case (id, value) =>
+if ((value == Double.MaxValue) && (realMean(id) != Double.MaxValue)) (id, 0.0)
+else (id, value)
+}.toSeq)

// get variance
realVar :/= totalCnt

VectorRDDStatisticalSummary(
Vectors.fromBreeze(realMean),
Vectors.fromBreeze(realVar),
totalCnt.toLong,
Vectors.fromBreeze(nnz),
-Vectors.fromBreeze(max),
-Vectors.fromBreeze(min))
+max,
+min)
}
}
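To see the final fix-up end to end, here is a self-contained sketch of the same post-processing on plain breeze vectors; the sizes and numbers are made up for illustration, and only the formulas mirror the diff.

import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.mllib.linalg.Vectors

val size = 3
val totalCnt = 4.0
// Per-column results of the aggregation over 4 rows; column 2 was zero in
// every row, so its max still holds the Double.MinValue sentinel.
val nnz      = BDV(4.0, 2.0, 0.0)
val fakeMean = BDV(2.5, 6.0, 0.0)
val fakeMax  = BDV(4.0, 7.0, Double.MinValue)

val realMean = fakeMean :* nnz :/ totalCnt   // BDV(2.5, 3.0, 0.0)
val max = Vectors.sparse(size, fakeMax.activeIterator.map { case (id, value) =>
  if ((value == Double.MinValue) && (realMean(id) != Double.MinValue)) (id, 0.0)
  else (id, value)
}.toSeq)
// max is now (4.0, 7.0, 0.0): the all-zero column reports 0.0 rather than
// Double.MinValue, which is exactly the bug this commit fixes. The min side
// is symmetric with Double.MaxValue.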
