diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index f340f6feae30e..c6d5fe5bc678c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -80,13 +80,11 @@ sealed trait Vector extends Serializable { /** * Applies a function `f` to all the active elements of dense and sparse vector. * - * @param f the function takes (Int, Double) as input where the first element - * in the tuple is the index, and the second element is the corresponding value. - * @param skippingZeros if true, skipping zero elements explicitly. It will be useful when - * iterating through dense vector which has lots of zero elements to be - * skipped. Default is false. + * @param f the function takes two parameters where the first parameter is the index of + * the vector with type `Int`, and the second parameter is the corresponding value + * with type `Double`. */ - private[spark] def foreach(skippingZeros: Boolean = false)(f: ((Int, Double)) => Unit) + private[spark] def foreachActive(f: (Int, Double) => Unit) } /** @@ -285,23 +283,14 @@ class DenseVector(val values: Array[Double]) extends Vector { new DenseVector(values.clone()) } - private[spark] override def foreach(skippingZeros: Boolean = false)(f: ((Int, Double)) => Unit) { + private[spark] override def foreachActive(f: (Int, Double) => Unit) = { var i = 0 val localValuesSize = values.size val localValues = values - if (skippingZeros) { - while (i < localValuesSize) { - if (localValues(i) != 0.0) { - f(i, localValues(i)) - } - i += 1 - } - } else { - while (i < localValuesSize) { - f(i, localValues(i)) - i += 1 - } + while (i < localValuesSize) { + f(i, localValues(i)) + i += 1 } } } @@ -341,24 +330,15 @@ class SparseVector( private[mllib] override def toBreeze: BV[Double] = new BSV[Double](indices, values, size) - private[spark] override def foreach(skippingZeros: Boolean = false)(f: ((Int, Double)) => Unit) { + private[spark] override def foreachActive(f: (Int, Double) => Unit) = { var i = 0 val localValuesSize = values.size val localIndices = indices val localValues = values - if (skippingZeros) { - while (i < localValuesSize) { - if (localValues(i) != 0.0) { - f(localIndices(i), localValues(i)) - } - i += 1 - } - } else { - while (i < localValuesSize) { - f(localIndices(i), localValues(i)) - i += 1 - } + while (i < localValuesSize) { + f(localIndices(i), localValues(i)) + i += 1 } } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala index 417564b597914..198b66a005962 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala @@ -17,10 +17,8 @@ package org.apache.spark.mllib.stat -import breeze.linalg.{DenseVector => BDV} - import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vectors, Vector} +import org.apache.spark.mllib.linalg.{Vectors, Vector} /** * :: DeveloperApi :: @@ -40,35 +38,14 @@ import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vectors, Vector class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with Serializable { private var n = 0 - private var currMean: BDV[Double] = _ - private var currM2n: BDV[Double] = _ - private var currM2: BDV[Double] = _ - private var currL1: BDV[Double] = _ + private var currMean: Array[Double] = _ + private var currM2n: Array[Double] = _ + private var currM2: Array[Double] = _ + private var currL1: Array[Double] = _ private var totalCnt: Long = 0 - private var nnz: BDV[Double] = _ - private var currMax: BDV[Double] = _ - private var currMin: BDV[Double] = _ - - /** - * Adds input value to position i. - */ - private[this] def add(i: Int, value: Double) = { - if (currMax(i) < value) { - currMax(i) = value - } - if (currMin(i) > value) { - currMin(i) = value - } - - val prevMean = currMean(i) - val diff = value - prevMean - currMean(i) = prevMean + diff / (nnz(i) + 1.0) - currM2n(i) += (value - currMean(i)) * diff - currM2(i) += value * value - currL1(i) += math.abs(value) - - nnz(i) += 1.0 - } + private var nnz: Array[Double] = _ + private var currMax: Array[Double] = _ + private var currMin: Array[Double] = _ /** * Add a new sample to this summarizer, and update the statistical summary. @@ -81,19 +58,37 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S require(sample.size > 0, s"Vector should have dimension larger than zero.") n = sample.size - currMean = BDV.zeros[Double](n) - currM2n = BDV.zeros[Double](n) - currM2 = BDV.zeros[Double](n) - currL1 = BDV.zeros[Double](n) - nnz = BDV.zeros[Double](n) - currMax = BDV.fill(n)(Double.MinValue) - currMin = BDV.fill(n)(Double.MaxValue) + currMean = Array.ofDim[Double](n) + currM2n = Array.ofDim[Double](n) + currM2 = Array.ofDim[Double](n) + currL1 = Array.ofDim[Double](n) + nnz = Array.ofDim[Double](n) + currMax = Array.fill[Double](n)(Double.MinValue) + currMin = Array.fill[Double](n)(Double.MaxValue) } require(n == sample.size, s"Dimensions mismatch when adding new sample." + s" Expecting $n but got ${sample.size}.") - sample.foreach(true)(x => add(x._1, x._2)) + sample.foreachActive((index, value) => { + if(value != 0.0){ + if (currMax(index) < value) { + currMax(index) = value + } + if (currMin(index) > value) { + currMin(index) = value + } + + val prevMean = currMean(index) + val diff = value - prevMean + currMean(index) = prevMean + diff / (nnz(index) + 1.0) + currM2n(index) += (value - currMean(index)) * diff + currM2(index) += value * value + currL1(index) += math.abs(value) + + nnz(index) += 1.0 + } + }) totalCnt += 1 this @@ -135,14 +130,14 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S } } else if (totalCnt == 0 && other.totalCnt != 0) { this.n = other.n - this.currMean = other.currMean.copy - this.currM2n = other.currM2n.copy - this.currM2 = other.currM2.copy - this.currL1 = other.currL1.copy + this.currMean = other.currMean.clone + this.currM2n = other.currM2n.clone + this.currM2 = other.currM2.clone + this.currL1 = other.currL1.clone this.totalCnt = other.totalCnt - this.nnz = other.nnz.copy - this.currMax = other.currMax.copy - this.currMin = other.currMin.copy + this.nnz = other.nnz.clone + this.currMax = other.currMax.clone + this.currMin = other.currMin.clone } this } @@ -150,19 +145,19 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S override def mean: Vector = { require(totalCnt > 0, s"Nothing has been added to this summarizer.") - val realMean = BDV.zeros[Double](n) + val realMean = Array.ofDim[Double](n) var i = 0 while (i < n) { realMean(i) = currMean(i) * (nnz(i) / totalCnt) i += 1 } - Vectors.fromBreeze(realMean) + Vectors.dense(realMean) } override def variance: Vector = { require(totalCnt > 0, s"Nothing has been added to this summarizer.") - val realVariance = BDV.zeros[Double](n) + val realVariance = Array.ofDim[Double](n) val denominator = totalCnt - 1.0 @@ -177,8 +172,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S i += 1 } } - - Vectors.fromBreeze(realVariance) + Vectors.dense(realVariance) } override def count: Long = totalCnt @@ -186,7 +180,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S override def numNonzeros: Vector = { require(totalCnt > 0, s"Nothing has been added to this summarizer.") - Vectors.fromBreeze(nnz) + Vectors.dense(nnz) } override def max: Vector = { @@ -197,7 +191,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S if ((nnz(i) < totalCnt) && (currMax(i) < 0.0)) currMax(i) = 0.0 i += 1 } - Vectors.fromBreeze(currMax) + Vectors.dense(currMax) } override def min: Vector = { @@ -208,25 +202,25 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S if ((nnz(i) < totalCnt) && (currMin(i) > 0.0)) currMin(i) = 0.0 i += 1 } - Vectors.fromBreeze(currMin) + Vectors.dense(currMin) } override def normL2: Vector = { require(totalCnt > 0, s"Nothing has been added to this summarizer.") - val realMagnitude = BDV.zeros[Double](n) + val realMagnitude = Array.ofDim[Double](n) var i = 0 while (i < currM2.size) { realMagnitude(i) = math.sqrt(currM2(i)) i += 1 } - - Vectors.fromBreeze(realMagnitude) + Vectors.dense(realMagnitude) } override def normL1: Vector = { require(totalCnt > 0, s"Nothing has been added to this summarizer.") - Vectors.fromBreeze(currL1) + + Vectors.dense(currL1) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala index 81e886b4bd4ee..41b6d7f518e2d 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala @@ -178,58 +178,19 @@ class VectorsSuite extends FunSuite { val dv = Vectors.dense(0.0, 1.2, 3.1, 0.0) val sv = Vectors.sparse(4, Seq((1, 1.2), (2, 3.1), (3, 0.0))) - val dvMap0 = scala.collection.mutable.Map[Int, Double]() - dv.foreach() { - case (index: Int, value: Double) => dvMap0.put(index, value) - } - assert(dvMap0.size === 4) - assert(dvMap0.get(0) === Some(0.0)) - assert(dvMap0.get(1) === Some(1.2)) - assert(dvMap0.get(2) === Some(3.1)) - assert(dvMap0.get(3) === Some(0.0)) - - val dvMap1 = scala.collection.mutable.Map[Int, Double]() - dv.foreach(false) { - case (index, value) => dvMap1.put(index, value) - } - assert(dvMap1.size === 4) - assert(dvMap1.get(0) === Some(0.0)) - assert(dvMap1.get(1) === Some(1.2)) - assert(dvMap1.get(2) === Some(3.1)) - assert(dvMap1.get(3) === Some(0.0)) - - val dvMap2 = scala.collection .mutable.Map[Int, Double]() - dv.foreach(true) { - case (index, value) => dvMap2.put(index, value) - } - assert(dvMap2.size === 2) - assert(dvMap2.get(1) === Some(1.2)) - assert(dvMap2.get(2) === Some(3.1)) - - val svMap0 = scala.collection.mutable.Map[Int, Double]() - sv.foreach() { - case (index, value) => svMap0.put(index, value) - } - assert(svMap0.size === 3) - assert(svMap0.get(1) === Some(1.2)) - assert(svMap0.get(2) === Some(3.1)) - assert(svMap0.get(3) === Some(0.0)) - - val svMap1 = scala.collection.mutable.Map[Int, Double]() - sv.foreach(false) { - case (index, value) => svMap1.put(index, value) - } - assert(svMap1.size === 3) - assert(svMap1.get(1) === Some(1.2)) - assert(svMap1.get(2) === Some(3.1)) - assert(svMap1.get(3) === Some(0.0)) - - val svMap2 = scala.collection.mutable.Map[Int, Double]() - sv.foreach(true) { - case (index, value) => svMap2.put(index, value) - } - assert(svMap2.size === 2) - assert(svMap2.get(1) === Some(1.2)) - assert(svMap2.get(2) === Some(3.1)) + val dvMap = scala.collection.mutable.Map[Int, Double]() + dv.foreachActive((index, value) => dvMap.put(index, value)) + assert(dvMap.size === 4) + assert(dvMap.get(0) === Some(0.0)) + assert(dvMap.get(1) === Some(1.2)) + assert(dvMap.get(2) === Some(3.1)) + assert(dvMap.get(3) === Some(0.0)) + + val svMap = scala.collection.mutable.Map[Int, Double]() + sv.foreachActive((index, value) => svMap.put(index, value)) + assert(svMap.size === 3) + assert(svMap.get(1) === Some(1.2)) + assert(svMap.get(2) === Some(3.1)) + assert(svMap.get(3) === Some(0.0)) } }