Skip to content

Commit

Permalink
merge VectorRDDStatistics into RowMatrix
Browse files Browse the repository at this point in the history
  • Loading branch information
yinxusen committed Apr 10, 2014
1 parent 48ee053 commit 4eaf28a
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 306 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.mllib.linalg.distributed

import java.util

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd}
import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd}
import breeze.numerics.{sqrt => brzSqrt}
import com.github.fommil.netlib.BLAS.{getInstance => blas}

Expand All @@ -29,7 +29,171 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.Logging

/**
 * :: Experimental ::
 * Column-wise summary statistics of an RDD[Vector]: row count, mean, sample variance, number of
 * non-zero entries, maximum and minimum. Every Vector-valued member holds one entry per column.
 */
trait VectorRDDStatisticalSummary {

  /** Number of vectors (rows) in the RDD[Vector]. */
  def count: Long

  /** Mean of each column of the RDD[Vector]. */
  def mean: Vector

  /** Sample variance of each column of the RDD[Vector]. */
  def variance: Vector

  /** Number of non-zero elements found in each column of the RDD[Vector]. */
  def numNonZeros: Vector

  /** Largest value of each column of the RDD[Vector]. */
  def max: Vector

  /** Smallest value of each column of the RDD[Vector]. */
  def min: Vector
}


/**
 * Mutable accumulator behind [[org.apache.spark.mllib.linalg.distributed.VectorRDDStatisticalSummary
 * VectorRDDStatisticalSummary]]. Rows are folded in one at a time with add() (online variance
 * update) and partial results are combined with merge() (parallel variance update); see
 * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]].
 *
 * To keep the per-row cost at O(nnz) instead of O(n), add() and merge() only look at the non-zero
 * entries: `currMean` and `currM2n` are the mean and the sum of squared deviations of the
 * '''non-zero''' entries of each column. The implicit zeros are folded back in lazily by the
 * `mean`, `variance`, `max` and `min` accessors, using `nnz` and `totalCnt`.
 *
 * @param currMean per-column mean of the non-zero entries seen so far
 * @param currM2n per-column sum of squared deviations of the non-zero entries from `currMean`
 * @param totalCnt number of rows seen so far (zero rows included)
 * @param nnz per-column count of non-zero entries seen so far
 * @param currMax per-column maximum of the non-zero entries seen so far
 * @param currMin per-column minimum of the non-zero entries seen so far
 */
private class VectorRDDStatisticsAggregator(
    val currMean: BDV[Double],
    val currM2n: BDV[Double],
    var totalCnt: Double,
    val nnz: BDV[Double],
    val currMax: BDV[Double],
    val currMin: BDV[Double])
  extends VectorRDDStatisticalSummary with Serializable {

  /**
   * Per-column mean over all rows. The non-zero mean is rescaled by nnz / totalCnt so that the
   * skipped zero entries are accounted for. Every entry is NaN when no row has been added.
   */
  override def mean: Vector = {
    val realMean = BDV.zeros[Double](currMean.length)
    var i = 0
    while (i < currMean.length) {
      realMean(i) = currMean(i) * nnz(i) / totalCnt
      i += 1
    }
    Vectors.fromBreeze(realMean)
  }

  /**
   * Per-column sample variance (divisor totalCnt - 1) over all rows. Returns all zeros when fewer
   * than two rows have been added, because the sample variance is not defined in that case.
   */
  override def variance: Vector = {
    val realVariance = BDV.zeros[Double](currM2n.length)

    val denominator = totalCnt - 1.0

    // Guard with `>` rather than `!=`: for an empty aggregator the denominator is -1.0 and the
    // formula below would evaluate 0.0 / 0.0, yielding NaN instead of the intended 0.0.
    if (denominator > 0.0) {
      var i = 0
      while (i < currM2n.length) {
        // currM2n only covers the non-zero entries. The second term merges in the group of
        // (totalCnt - nnz) implicit zeros, whose mean is 0 and whose M2 is 0.
        realVariance(i) =
          currM2n(i) + currMean(i) * currMean(i) * nnz(i) * (totalCnt - nnz(i)) / totalCnt
        realVariance(i) /= denominator
        i += 1
      }
    }

    Vectors.fromBreeze(realVariance)
  }

  /** Number of rows added so far. */
  override def count: Long = totalCnt.toLong

  /** Per-column number of non-zero entries added so far. */
  override def numNonZeros: Vector = Vectors.fromBreeze(nnz)

  /**
   * Per-column maximum over all rows. A column that contains at least one skipped zero
   * (nnz < totalCnt) cannot have a negative maximum, so it is clamped to 0.0. The adjustment is
   * applied to a copy, leaving the aggregator state untouched.
   */
  override def max: Vector = {
    val realMax = currMax.copy
    var i = 0
    while (i < nnz.length) {
      if ((nnz(i) < totalCnt) && (realMax(i) < 0.0)) realMax(i) = 0.0
      i += 1
    }
    Vectors.fromBreeze(realMax)
  }

  /**
   * Per-column minimum over all rows. A column that contains at least one skipped zero
   * (nnz < totalCnt) cannot have a positive minimum, so it is clamped to 0.0. The adjustment is
   * applied to a copy, leaving the aggregator state untouched.
   */
  override def min: Vector = {
    val realMin = currMin.copy
    var i = 0
    while (i < nnz.length) {
      if ((nnz(i) < totalCnt) && (realMin(i) > 0.0)) realMin(i) = 0.0
      i += 1
    }
    Vectors.fromBreeze(realMin)
  }

  /**
   * Aggregate function used for aggregating elements in a worker together: folds one row into
   * this aggregator.
   *
   * @param currData the row to add; only its active, non-zero entries are visited
   * @return this aggregator, updated in place
   */
  def add(currData: BV[Double]): this.type = {
    currData.activeIterator.foreach {
      // Explicitly stored zeros are treated exactly like implicit ones: skipped here and
      // accounted for later through totalCnt.
      case (_, 0.0) =>
      case (id, value) =>
        if (currMax(id) < value) currMax(id) = value
        if (currMin(id) > value) currMin(id) = value

        // Online update of mean and M2 over the non-zero entries of column `id`.
        val tmpPrevMean = currMean(id)
        currMean(id) = (currMean(id) * nnz(id) + value) / (nnz(id) + 1.0)
        currM2n(id) += (value - currMean(id)) * (value - tmpPrevMean)

        nnz(id) += 1.0
    }

    totalCnt += 1.0
    this
  }

  /**
   * Combine function used for combining intermediate results together from every worker: merges
   * the statistics gathered by `other` into this aggregator.
   *
   * @param other aggregator over the same number of columns; it is not modified
   * @return this aggregator, updated in place
   */
  def merge(other: VectorRDDStatisticsAggregator): this.type = {
    require(currMean.length == other.currMean.length,
      s"Dimension mismatch when merging: ${currMean.length} vs ${other.currMean.length}.")

    totalCnt += other.totalCnt

    var i = 0
    while (i < other.currMean.length) {
      val thisNnz = nnz(i)
      val otherNnz = other.nnz(i)
      val totalNnz = thisNnz + otherNnz
      // Must be taken before currMean(i) is overwritten below.
      val deltaMean = currMean(i) - other.currMean(i)

      // Merge mean together. The guard has to look at the other side's count, not at its mean:
      // non-zero values that average to exactly 0.0 (e.g. 1.0 and -1.0) still carry weight.
      if (otherNnz != 0.0) {
        currMean(i) = (currMean(i) * thisNnz + other.currMean(i) * otherNnz) / totalNnz
      }

      // Merge m2n together (parallel variance combination over the non-zero entries).
      if (totalNnz != 0.0) {
        currM2n(i) += other.currM2n(i) + deltaMean * deltaMean * thisNnz * otherNnz / totalNnz
      }

      if (currMax(i) < other.currMax(i)) currMax(i) = other.currMax(i)

      if (currMin(i) > other.currMin(i)) currMin(i) = other.currMin(i)

      i += 1
    }

    nnz += other.nnz
    this
  }
}

/**
* Represents a row-oriented distributed Matrix with no meaningful row indices.
*
* @param rows rows stored as an RDD[Vector]
Expand Down Expand Up @@ -240,6 +404,24 @@ class RowMatrix(
}
}

/**
 * Computes the full set of column-wise statistics of `rows` in a single aggregate pass. Each
 * row is converted to a Breeze vector and folded into a VectorRDDStatisticsAggregator; the
 * per-partition aggregators are then merged into the returned summary.
 *
 * NOTE(review): the aggregator is sized from `nCols`, whose initialisation is not visible
 * here -- confirm it already holds the real column count when this method is called.
 */
def multiVariateSummaryStatistics(): VectorRDDStatisticalSummary = {
  val dim = nCols

  // Neutral starting state: empty counters, and max/min seeded with the extreme doubles so that
  // the first non-zero value of each column replaces them.
  val emptyAggregator = new VectorRDDStatisticsAggregator(
    currMean = BDV.zeros[Double](dim),
    currM2n = BDV.zeros[Double](dim),
    totalCnt = 0.0,
    nnz = BDV.zeros[Double](dim),
    currMax = BDV.fill(dim)(Double.MinValue),
    currMin = BDV.fill(dim)(Double.MaxValue))

  val breezeRows = rows.map(_.toBreeze)

  breezeRows.aggregate[VectorRDDStatisticsAggregator](emptyAggregator)(
    (partial, row) => partial.add(row),
    (left, right) => left.merge(right)
  )
}

/**
* Multiply this matrix by a local matrix on the right.
*
Expand Down
Loading

0 comments on commit 4eaf28a

Please sign in to comment.