Skip to content

Commit

Permalink
test sparse vector RDD
Browse files Browse the repository at this point in the history
  • Loading branch information
yinxusen committed Apr 10, 2014
1 parent 18cf072 commit dc77e38
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ private class Aggregator(
val deltaMean = currMean
var i = 0
while(i < currM2n.size) {
currM2n(i) -= deltaMean(i) * deltaMean(i) * nnz(i) * (nnz(i)-totalCnt) / totalCnt
currM2n(i) += deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt-nnz(i)) / totalCnt
currM2n(i) /= totalCnt
i += 1
}
Expand All @@ -61,15 +61,15 @@ private class Aggregator(
override lazy val numNonZeros: Vector = Vectors.fromBreeze(nnz)

override lazy val max: Vector = {
nnz.activeIterator.foreach {
nnz.iterator.foreach {
case (id, count) =>
if ((count == 0.0) || ((count < totalCnt) && (currMax(id) < 0.0))) currMax(id) = 0.0
}
Vectors.fromBreeze(currMax)
}

override lazy val min: Vector = {
nnz.activeIterator.foreach {
nnz.iterator.foreach {
case (id, count) =>
if ((count == 0.0) || ((count < totalCnt) && (currMin(id) > 0.0))) currMin(id) = 0.0
}
Expand All @@ -88,7 +88,7 @@ private class Aggregator(
if (currMin(id) > value) currMin(id) = value

val tmpPrevMean = currMean(id)
currMean(id) = (currMean(id) * totalCnt + value) / (totalCnt + 1.0)
currMean(id) = (currMean(id) * nnz(id) + value) / (nnz(id) + 1.0)
currM2n(id) += (value - currMean(id)) * (value - tmpPrevMean)

nnz(id) += 1.0
Expand All @@ -114,11 +114,14 @@ private class Aggregator(
(currMean(id) * nnz(id) + other.currMean(id) * other.nnz(id)) / (nnz(id) + other.nnz(id))
}

other.currM2n.activeIterator.foreach {
case (id, 0.0) =>
case (id, value) =>
currM2n(id) +=
value + deltaMean(id) * deltaMean(id) * nnz(id) * other.nnz(id) / (nnz(id)+other.nnz(id))
var i = 0
while(i < currM2n.size) {
(nnz(i), other.nnz(i)) match {
case (0.0, 0.0) =>
case _ => currM2n(i) +=
other.currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * other.nnz(i) / (nnz(i)+other.nnz(i))
}
i += 1
}

other.currMax.activeIterator.foreach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,54 +38,59 @@ class VectorRDDFunctionsSuite extends FunSuite with LocalSparkContext {
Vectors.dense(7.0, 8.0, 9.0)
)

val sparseData = ArrayBuffer(Vectors.sparse(20, Seq((0, 1.0), (9, 2.0), (10, 7.0))))
for (i <- 0 until 100) sparseData += Vectors.sparse(20, Seq((9, 0.0)))
sparseData += Vectors.sparse(20, Seq((0, 5.0), (9, 13.0), (16, 2.0)))
sparseData += Vectors.sparse(20, Seq((3, 5.0), (9, 13.0), (18, 2.0)))
val sparseData = ArrayBuffer(Vectors.sparse(3, Seq((0, 1.0))))
for (i <- 0 until 97) sparseData += Vectors.sparse(3, Seq((2, 0.0)))
sparseData += Vectors.sparse(3, Seq((0, 5.0)))
sparseData += Vectors.sparse(3, Seq((1, 5.0)))

test("full-statistics") {
test("dense statistical summary") {
val data = sc.parallelize(localData, 2)
val (summary, denseTime) =
time(data.summarizeStatistics())
val summary = data.summarizeStatistics()

assert(equivVector(summary.mean, Vectors.dense(4.0, 5.0, 6.0)),
"Column mean do not match.")
"Dense column mean do not match.")

assert(equivVector(summary.variance, Vectors.dense(6.0, 6.0, 6.0)),
"Column variance do not match.")
"Dense column variance do not match.")

assert(summary.totalCount === 3, "Column cnt do not match.")
assert(summary.totalCount === 3, "Dense column cnt do not match.")

assert(equivVector(summary.numNonZeros, Vectors.dense(3.0, 3.0, 3.0)),
"Column nnz do not match.")
"Dense column nnz do not match.")

assert(equivVector(summary.max, Vectors.dense(7.0, 8.0, 9.0)),
"Column max do not match.")
"Dense column max do not match.")

assert(equivVector(summary.min, Vectors.dense(1.0, 2.0, 3.0)),
"Column min do not match.")
"Dense column min do not match.")
}

test("sparse statistical summary") {
val dataForSparse = sc.parallelize(sparseData.toSeq, 2)
val (_, sparseTime) = time(dataForSparse.summarizeStatistics())
val summary = dataForSparse.summarizeStatistics()

assert(equivVector(summary.mean, Vectors.dense(0.06, 0.05, 0.0)),
"Sparse column mean do not match.")

assert(equivVector(summary.variance, Vectors.dense(0.2564, 0.2475, 0.0)),
"Sparse column variance do not match.")

assert(summary.totalCount === 100, "Sparse column cnt do not match.")

assert(equivVector(summary.numNonZeros, Vectors.dense(2.0, 1.0, 0.0)),
"Sparse column nnz do not match.")

println(s"dense time is $denseTime, sparse time is $sparseTime.")
assert(equivVector(summary.max, Vectors.dense(5.0, 5.0, 0.0)),
"Sparse column max do not match.")

assert(equivVector(summary.min, Vectors.dense(0.0, 0.0, 0.0)),
"Sparse column min do not match.")
}
}

/**
 * Helper functions for the vector RDD statistics tests: a wall-clock timer, an approximate
 * vector comparison, and a relative comparison of two timings.
 */
object VectorRDDFunctionsSuite {
/**
 * Evaluates `block` once and measures how long the evaluation took.
 *
 * @param block by-name expression to evaluate; it is run exactly once, between the two
 *              `System.nanoTime()` readings
 * @tparam R result type of `block`
 * @return a pair of the block's result and the elapsed time in seconds
 */
def time[R](block: => R): (R, Double) = {
val t0 = System.nanoTime()
val result = block
val t1 = System.nanoTime()
// nanoTime readings are in nanoseconds; dividing by 1e9 converts the difference to seconds.
(result, (t1 - t0).toDouble / 1.0e9)
}

/**
 * Approximate equality of two vectors.
 *
 * @param lhs first vector
 * @param rhs second vector
 * @return true when `norm(2)` of the element-wise difference is below 1e-9
 */
def equivVector(lhs: Vector, rhs: Vector): Boolean = {
// NOTE(review): presumably toBreeze yields Breeze vectors and norm(2) is the Euclidean
// norm; both are defined outside this view, so confirm against their declarations.
(lhs.toBreeze - rhs.toBreeze).norm(2) < 1e-9
}

/**
 * Checks whether two values differ by less than 30% of the larger one.
 *
 * @param lhs first value
 * @param rhs second value
 * @return true when `|lhs - rhs| / max(lhs, rhs)` is below 0.3
 */
def relativeTime(lhs: Double, rhs: Double): Boolean = {
// When both inputs are 0.0 the ratio is 0.0 / 0.0 = NaN, so the comparison yields false.
val denominator = math.max(lhs, rhs)
math.abs(lhs - rhs) / denominator < 0.3
}
}

0 comments on commit dc77e38

Please sign in to comment.