From 4cfbadf963c04dd88d8677a784492a4adf84a57f Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Tue, 1 Apr 2014 16:47:25 +0800 Subject: [PATCH] fix bug of min max --- .../spark/mllib/rdd/VectorRDDFunctions.scala | 40 ++++++++++++------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala index 8ab93630a2463..a39b6f81cf6ed 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala @@ -20,7 +20,6 @@ import breeze.linalg.{Vector => BV} import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.rdd.RDD -import breeze.linalg.axpy case class VectorRDDStatisticalSummary( mean: Vector, @@ -35,8 +34,8 @@ private case class VectorRDDStatisticalRing( fakeM2n: BV[Double], totalCnt: Double, nnz: BV[Double], - max: BV[Double], - min: BV[Double]) + fakeMax: BV[Double], + fakeMin: BV[Double]) /** * Extra functions available on RDDs of [[org.apache.spark.mllib.linalg.Vector Vector]] through an @@ -58,7 +57,9 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable { * with the size of Vector as input parameter. */ - private def seqOp(aggregator: VectorRDDStatisticalRing, currData: BV[Double]): VectorRDDStatisticalRing = { + private def seqOp( + aggregator: VectorRDDStatisticalRing, + currData: BV[Double]): VectorRDDStatisticalRing = { aggregator match { case VectorRDDStatisticalRing(prevMean, prevM2n, cnt, nnzVec, maxVec, minVec) => currData.activeIterator.foreach { @@ -73,7 +74,8 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable { nnzVec(id) += 1.0 } - VectorRDDStatisticalRing(prevMean, + VectorRDDStatisticalRing( + prevMean, prevM2n, cnt + 1.0, nnzVec, @@ -82,7 +84,9 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable { } } - private def combOp(statistics1: VectorRDDStatisticalRing, statistics2: VectorRDDStatisticalRing): VectorRDDStatisticalRing = { + private def combOp( + statistics1: VectorRDDStatisticalRing, + statistics2: VectorRDDStatisticalRing): VectorRDDStatisticalRing = { (statistics1, statistics2) match { case (VectorRDDStatisticalRing(mean1, m2n1, cnt1, nnz1, max1, min1), VectorRDDStatisticalRing(mean2, m2n2, cnt2, nnz2, max2, min2)) => @@ -111,18 +115,26 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable { BV.fill(size)(Double.MinValue), BV.fill(size)(Double.MaxValue)) - val breezeVectors = self.collect().map(_.toBreeze) - val VectorRDDStatisticalRing(fakeMean, fakeM2n, totalCnt, nnz, max, min) = breezeVectors.aggregate(zeroValue)(seqOp, combOp) + val breezeVectors = self.map(_.toBreeze) + val VectorRDDStatisticalRing(fakeMean, fakeM2n, totalCnt, nnz, fakeMax, fakeMin) = + breezeVectors.aggregate(zeroValue)(seqOp, combOp) // solve real mean val realMean = fakeMean :* nnz :/ totalCnt // solve real variance val deltaMean = fakeMean :- 0.0 val realVar = fakeM2n - ((deltaMean :* deltaMean) :* (nnz :* (nnz :- totalCnt)) :/ totalCnt) - // max, min process, in case of a column is all zero. - // max :+= 0.0 - // min :+= 0.0 - + // max, min + val max = Vectors.sparse(size, fakeMax.activeIterator.map { case (id, value) => + if ((value == Double.MinValue) && (realMean(id) != Double.MinValue)) (id, 0.0) + else (id, value) + }.toSeq) + val min = Vectors.sparse(size, fakeMin.activeIterator.map { case (id, value) => + if ((value == Double.MaxValue) && (realMean(id) != Double.MaxValue)) (id, 0.0) + else (id, value) + }.toSeq) + + // get variance realVar :/= totalCnt VectorRDDStatisticalSummary( @@ -130,7 +142,7 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable { Vectors.fromBreeze(realVar), totalCnt.toLong, Vectors.fromBreeze(nnz), - Vectors.fromBreeze(max), - Vectors.fromBreeze(min)) + max, + min) } }