From 10a63a6e6b583e6e79ced58ea9b73937656f5a24 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Sat, 20 Dec 2014 00:43:52 -0800 Subject: [PATCH] [SPARK-4409] Fourth pass of code review --- .../apache/spark/mllib/linalg/Matrices.scala | 145 ++++++++++-------- .../spark/mllib/linalg/MatricesSuite.scala | 14 +- 2 files changed, 92 insertions(+), 67 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala index e4f90f3171b0b..4a6cfde0634cf 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala @@ -19,7 +19,7 @@ package org.apache.spark.mllib.linalg import java.util.{Arrays, Random} -import scala.collection.mutable.{ArrayBuffer, ArrayBuilder, Map => MutableMap} +import scala.collection.mutable.{ArrayBuilder => MArrayBuilder, HashSet => MHashSet, ArrayBuffer} import breeze.linalg.{CSCMatrix => BSM, DenseMatrix => BDM, Matrix => BM} @@ -150,11 +150,10 @@ class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double]) /** Generate a `SparseMatrix` from the given `DenseMatrix`. */ def toSparse(): SparseMatrix = { - val spVals: ArrayBuilder[Double] = new ArrayBuilder.ofDouble + val spVals: MArrayBuilder[Double] = new MArrayBuilder.ofDouble val colPtrs: Array[Int] = new Array[Int](numCols + 1) - val rowIndices: ArrayBuilder[Int] = new ArrayBuilder.ofInt + val rowIndices: MArrayBuilder[Int] = new MArrayBuilder.ofInt var nnz = 0 - var lastCol = -1 var j = 0 while (j < numCols) { var i = 0 @@ -164,19 +163,12 @@ class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double]) if (v != 0.0) { rowIndices += i spVals += v - while (j != lastCol) { - colPtrs(lastCol + 1) = nnz - lastCol += 1 - } nnz += 1 } i += 1 } j += 1 - } - while (numCols > lastCol) { - colPtrs(lastCol + 1) = nnz - lastCol += 1 + colPtrs(j) = nnz } new SparseMatrix(numRows, numCols, colPtrs, rowIndices.result(), spVals.result()) } @@ -362,10 +354,11 @@ object SparseMatrix { /** * Generate a `SparseMatrix` from Coordinate List (COO) format. Input must be an array of - * (row, column, value) tuples. + * (i, j, value) tuples. Entries that have duplicate values of i and j are + * added together. Tuples where value is equal to zero will be omitted. * @param numRows number of rows of the matrix * @param numCols number of columns of the matrix - * @param entries Array of (row, column, value) tuples + * @param entries Array of (i, j, value) tuples * @return The corresponding `SparseMatrix` */ def fromCOO(numRows: Int, numCols: Int, entries: Array[(Int, Int, Double)]): SparseMatrix = { @@ -373,19 +366,42 @@ object SparseMatrix { val colPtrs = new Array[Int](numCols + 1) var nnz = 0 var lastCol = -1 - val values = sortedEntries.map { case (i, j, v) => - while (j != lastCol) { - colPtrs(lastCol + 1) = nnz - lastCol += 1 + var lastIndex = -1 + sortedEntries.foreach { case (i, j, v) => + require(i >= 0 && j >= 0, "Negative indices given. Please make sure all indices are " + + s"greater than or equal to zero. i: $i, j: $j, value: $v") + if (v != 0.0) { + while (j != lastCol) { + colPtrs(lastCol + 1) = nnz + lastCol += 1 + } + val index = j * numRows + i + if (lastIndex != index) { + nnz += 1 + lastIndex = index + } } - nnz += 1 - v } while (numCols > lastCol) { colPtrs(lastCol + 1) = nnz lastCol += 1 } - new SparseMatrix(numRows, numCols, colPtrs.toArray, sortedEntries.map(_._1), values) + val values = new Array[Double](nnz) + val rowIndices = new Array[Int](nnz) + lastIndex = -1 + var cnt = -1 + sortedEntries.foreach { case (i, j, v) => + if (v != 0.0) { + val index = j * numRows + i + if (lastIndex != index) { + cnt += 1 + lastIndex = index + } + values(cnt) += v + rowIndices(cnt) = i + } + } + new SparseMatrix(numRows, numCols, colPtrs.toArray, rowIndices, values) } /** @@ -397,57 +413,54 @@ object SparseMatrix { new SparseMatrix(n, n, (0 to n).toArray, (0 until n).toArray, Array.fill(n)(1.0)) } - /** Generates a `SparseMatrix` with a given random number generator and `method`, which - * specifies the distribution. */ + /** Generates the skeleton of a random `SparseMatrix` with a given random number generator. */ private def genRandMatrix( numRows: Int, numCols: Int, density: Double, - rng: Random, - method: Random => Double): SparseMatrix = { + rng: Random): SparseMatrix = { require(density >= 0.0 && density <= 1.0, "density must be a double in the range " + s"0.0 <= d <= 1.0. Currently, density: $density") val length = math.ceil(numRows * numCols * density).toInt - val entries = MutableMap[(Int, Int), Double]() var i = 0 if (density == 0.0) { return new SparseMatrix(numRows, numCols, new Array[Int](numCols + 1), Array[Int](), Array[Double]()) } else if (density == 1.0) { + val rowIndices = Array.tabulate(numCols, numRows)((j, i) => i).flatten return new SparseMatrix(numRows, numCols, (0 to numRows * numCols by numRows).toArray, - (0 until numRows * numCols).toArray, Array.fill(numRows * numCols)(method(rng))) + rowIndices, new Array[Double](numRows * numCols)) } - // Expected number of iterations is less than 1.5 * length - if (density < 0.34) { - while (i < length) { - var rowIndex = rng.nextInt(numRows) - var colIndex = rng.nextInt(numCols) - while (entries.contains((rowIndex, colIndex))) { - rowIndex = rng.nextInt(numRows) - colIndex = rng.nextInt(numCols) - } - entries += (rowIndex, colIndex) -> method(rng) - i += 1 + if (density < 0.34) { // Expected number of iterations is less than 1.5 * length + val entries = MHashSet[(Int, Int)]() + while (entries.size < length) { + entries += ((rng.nextInt(numRows), rng.nextInt(numCols))) } + val entryList = entries.toArray.map(v => (v._1, v._2, 1.0)) + SparseMatrix.fromCOO(numRows, numCols, entryList) } else { // selection - rejection method var j = 0 val pool = numRows * numCols - // loop over columns so that the sort in fromCOO requires less sorting + val rowIndexBuilder = new MArrayBuilder.ofInt + val colPtrs = new Array[Int](numCols + 1) while (i < length && j < numCols) { var passedInPool = j * numRows var r = 0 while (i < length && r < numRows) { if (rng.nextDouble() < 1.0 * (length - i) / (pool - passedInPool)) { - entries += (r, j) -> method(rng) + rowIndexBuilder += r i += 1 } r += 1 passedInPool += 1 } j += 1 + colPtrs(j) = i } + val rowIndices = rowIndexBuilder.result() + new SparseMatrix(numRows, numCols, colPtrs, rowIndices, new Array[Double](rowIndices.size)) } - SparseMatrix.fromCOO(numRows, numCols, entries.toArray.map(v => (v._1._1, v._1._2, v._2))) + } /** @@ -461,8 +474,8 @@ object SparseMatrix { * @return `SparseMatrix` with size `numRows` x `numCols` and values in U(0, 1) */ def sprand(numRows: Int, numCols: Int, density: Double, rng: Random): SparseMatrix = { - def method(rand: Random): Double = rand.nextDouble() - genRandMatrix(numRows, numCols, density, rng, method) + val mat = genRandMatrix(numRows, numCols, density, rng) + mat.update(i => rng.nextDouble()) } /** @@ -474,8 +487,8 @@ object SparseMatrix { * @return `SparseMatrix` with size `numRows` x `numCols` and values in N(0, 1) */ def sprandn(numRows: Int, numCols: Int, density: Double, rng: Random): SparseMatrix = { - def method(rand: Random): Double = rand.nextGaussian() - genRandMatrix(numRows, numCols, density, rng, method) + val mat = genRandMatrix(numRows, numCols, density, rng) + mat.update(i => rng.nextGaussian()) } /** @@ -488,11 +501,10 @@ object SparseMatrix { val n = vector.size vector match { case sVec: SparseVector => - val indices = sVec.indices - SparseMatrix.fromCOO(n, n, indices.zip(sVec.values).map(v => (v._1, v._1, v._2))) + SparseMatrix.fromCOO(n, n, sVec.indices.zip(sVec.values).map(v => (v._1, v._1, v._2))) case dVec: DenseVector => - val values = dVec.values.zipWithIndex - val nnzVals = values.filter(v => v._1 != 0.0) + val entries = dVec.values.zipWithIndex + val nnzVals = entries.filter(v => v._1 != 0.0) SparseMatrix.fromCOO(n, n, nnzVals.map(v => (v._2, v._2, v._1))) } } @@ -645,11 +657,11 @@ object Matrices { return matrices(0) } val numRows = matrices(0).numRows - var rowsMatch = true var hasSparse = false var numCols = 0 matrices.foreach { mat => - if (numRows != mat.numRows) rowsMatch = false + require(numRows == mat.numRows, "The number of rows of the matrices in this sequence, " + + "don't match!") mat match { case sparse: SparseMatrix => hasSparse = true case dense: DenseMatrix => // empty on purpose @@ -658,8 +670,6 @@ object Matrices { } numCols += mat.numCols } - require(rowsMatch, "The number of rows of the matrices in this sequence, don't match!") - if (!hasSparse) { new DenseMatrix(numRows, numCols, matrices.flatMap(_.toArray)) } else { @@ -723,12 +733,12 @@ object Matrices { return matrices(0) } val numCols = matrices(0).numCols - var colsMatch = true var hasSparse = false var numRows = 0 var valsLength = 0 matrices.foreach { mat => - if (numCols != mat.numCols) colsMatch = false + require(numCols == mat.numCols, "The number of rows of the matrices in this sequence, " + + "don't match!") mat match { case sparse: SparseMatrix => hasSparse = true @@ -741,15 +751,26 @@ object Matrices { numRows += mat.numRows } - require(colsMatch, "The number of rows of the matrices in this sequence, don't match!") - if (!hasSparse) { - val matData = matrices.zipWithIndex.flatMap { case (mat, ind) => + val allValues = new Array[Double](numRows * numCols) + var startRow = 0 + matrices.foreach { mat => + var j = 0 + val nRows = mat.numRows val values = mat.toArray - for (j <- 0 until numCols) yield (j, ind, - values.slice(j * mat.numRows, (j + 1) * mat.numRows)) - }.sortBy(x => (x._1, x._2)) - new DenseMatrix(numRows, numCols, matData.flatMap(_._3)) + while (j < numCols) { + var i = 0 + val indStart = j * numRows + startRow + val subMatStart = j * nRows + while (i < nRows) { + allValues(indStart + i) = values(subMatStart + i) + i += 1 + } + j += 1 + } + startRow += nRows + } + new DenseMatrix(numRows, numCols, allValues) } else { var startRow = 0 val entries: Array[(Int, Int, Double)] = matrices.flatMap { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala index bdb0f9e34bfd9..a35d0fe389fdd 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala @@ -54,11 +54,12 @@ class MatricesSuite extends FunSuite { assert(mat.colPtrs.eq(colPtrs), "should not copy data") assert(mat.rowIndices.eq(rowIndices), "should not copy data") - val entries: Array[(Int, Int, Double)] = Array((1, 0, 1.0), (2, 0, 2.0), - (1, 2, 4.0), (2, 2, 5.0)) + val entries: Array[(Int, Int, Double)] = Array((2, 2, 3.0), (1, 0, 1.0), (2, 0, 2.0), + (1, 2, 2.0), (2, 2, 2.0), (1, 2, 2.0), (0, 0, 0.0)) val mat2 = SparseMatrix.fromCOO(m, n, entries) assert(mat.toBreeze === mat2.toBreeze) + assert(mat2.values.length == 4) } test("sparse matrix construction with wrong number of elements") { @@ -308,12 +309,15 @@ class MatricesSuite extends FunSuite { test("sprand") { val rng = mock[Random] when(rng.nextInt(4)).thenReturn(0, 1, 1, 3, 2, 2, 0, 1, 3, 0) - when(rng.nextDouble()).thenReturn(1.0, 2.0, 3.0, 4.0) + when(rng.nextDouble()).thenReturn(1.0, 2.0, 3.0, 4.0, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0) val mat = SparseMatrix.sprand(4, 4, 0.25, rng) assert(mat.numRows === 4) assert(mat.numCols === 4) assert(mat.rowIndices.toSeq === Seq(3, 0, 2, 1)) - assert(mat.values.toSeq === Seq(4.0, 1.0, 3.0, 2.0)) + assert(mat.values.toSeq === Seq(1.0, 2.0, 3.0, 4.0)) + val mat2 = SparseMatrix.sprand(2, 3, 1.0, rng) + assert(mat2.rowIndices.toSeq === Seq(0, 1, 0, 1, 0, 1)) + assert(mat2.colPtrs.toSeq === Seq(0, 2, 4, 6)) } test("sprandn") { @@ -324,6 +328,6 @@ class MatricesSuite extends FunSuite { assert(mat.numRows === 4) assert(mat.numCols === 4) assert(mat.rowIndices.toSeq === Seq(3, 0, 2, 1)) - assert(mat.values.toSeq === Seq(4.0, 1.0, 3.0, 2.0)) + assert(mat.values.toSeq === Seq(1.0, 2.0, 3.0, 4.0)) } }