From 3971c931d18dfaea0ea66e0cbb19b61dbf310a66 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 19 Dec 2014 01:44:31 -0800 Subject: [PATCH] [SPARK-4409] Third pass of code review --- .../apache/spark/mllib/linalg/Matrices.scala | 236 +++++++++++------- .../spark/mllib/linalg/JavaMatricesSuite.java | 19 ++ .../spark/mllib/linalg/MatricesSuite.scala | 31 ++- 3 files changed, 189 insertions(+), 97 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala index fc2f2fd9ca92b..9d2611e0ed8e9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala @@ -19,7 +19,7 @@ package org.apache.spark.mllib.linalg import java.util.{Arrays, Random} -import scala.collection.mutable.{ArrayBuffer, Map} +import scala.collection.mutable.{ArrayBuffer, ArrayBuilder, Map} import breeze.linalg.{CSCMatrix => BSM, DenseMatrix => BDM, Matrix => BM} @@ -150,31 +150,35 @@ class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double]) /** Generate a `SparseMatrix` from the given `DenseMatrix`. */ def toSparse(): SparseMatrix = { - val sparseA: ArrayBuffer[Double] = new ArrayBuffer() - val sCols: ArrayBuffer[Int] = new ArrayBuffer(numCols + 1) - val sRows: ArrayBuffer[Int] = new ArrayBuffer() - var i = 0 + val spVals: ArrayBuilder[Double] = new ArrayBuilder.ofDouble + val colPtrs: Array[Int] = new Array[Int](numCols + 1) + val rowIndices: ArrayBuilder[Int] = new ArrayBuilder.ofInt var nnz = 0 var lastCol = -1 - values.foreach { v => - val r = i % numRows - val c = (i - r) / numRows - if (v != 0.0) { - sRows.append(r) - sparseA.append(v) - while (c != lastCol) { - sCols.append(nnz) - lastCol += 1 + var j = 0 + while (j < numCols) { + var i = 0 + val indStart = j * numRows + while (i < numRows) { + val v = values(indStart + i) + if (v != 0.0) { + rowIndices += i + spVals += v + while (j != lastCol) { + colPtrs(lastCol + 1) = nnz + lastCol += 1 + } + nnz += 1 } - nnz += 1 + i += 1 } - i += 1 + j += 1 } while (numCols > lastCol) { - sCols.append(sparseA.length) + colPtrs(lastCol + 1) = nnz lastCol += 1 } - new SparseMatrix(numRows, numCols, sCols.toArray, sRows.toArray, sparseA.toArray) + new SparseMatrix(numRows, numCols, colPtrs, rowIndices.result(), spVals.result()) } } @@ -358,35 +362,30 @@ object SparseMatrix { /** * Generate a `SparseMatrix` from Coordinate List (COO) format. Input must be an array of - * (row, column, value) tuples. Array must be sorted first by *column* index and then by row - * index. + * (row, column, value) tuples. * @param numRows number of rows of the matrix * @param numCols number of columns of the matrix - * @param entries Array of ((row, column), value) tuples + * @param entries Array of (row, column, value) tuples * @return The corresponding `SparseMatrix` */ - def fromCOO(numRows: Int, numCols: Int, entries: Array[((Int, Int), Double)]): SparseMatrix = { - val colPtrs = new ArrayBuffer[Int](numCols + 1) - colPtrs.append(0) + def fromCOO(numRows: Int, numCols: Int, entries: Array[(Int, Int, Double)]): SparseMatrix = { + val sortedEntries = entries.sortBy(v => (v._2, v._1)) + val colPtrs = new Array[Int](numCols + 1) var nnz = 0 - var lastCol = 0 - val values = entries.map { case ((i, j), v) => + var lastCol = -1 + val values = sortedEntries.map { case (i, j, v) => while (j != lastCol) { - colPtrs.append(nnz) + colPtrs(lastCol + 1) = nnz lastCol += 1 - if (lastCol > numCols) { - throw new IndexOutOfBoundsException("Please make sure that the entries array is " + - "sorted by COLUMN index first and then by row index.") - } } nnz += 1 v } while (numCols > lastCol) { - colPtrs.append(nnz) + colPtrs(lastCol + 1) = nnz lastCol += 1 } - new SparseMatrix(numRows, numCols, colPtrs.toArray, entries.map(_._1._1), values) + new SparseMatrix(numRows, numCols, colPtrs.toArray, sortedEntries.map(_._1), values) } /** @@ -411,17 +410,42 @@ object SparseMatrix { val length = math.ceil(numRows * numCols * density).toInt val entries = Map[(Int, Int), Double]() var i = 0 - while (i < length) { - var rowIndex = rng.nextInt(numRows) - var colIndex = rng.nextInt(numCols) - while (entries.contains((rowIndex, colIndex))) { - rowIndex = rng.nextInt(numRows) - colIndex = rng.nextInt(numCols) + // Expected number of iterations is less than 1.5 * length + if (density < 0.34) { + while (i < length) { + var rowIndex = rng.nextInt(numRows) + var colIndex = rng.nextInt(numCols) + while (entries.contains((rowIndex, colIndex))) { + rowIndex = rng.nextInt(numRows) + colIndex = rng.nextInt(numCols) + } + entries += (rowIndex, colIndex) -> method(rng) + i += 1 + } + } else { // selection - rejection method + var j = 0 + val triesPerCol = math.ceil(length * 1.0 / numCols).toInt + val pool = numRows * numCols + // loop over columns so that the sort in fromCOO requires less sorting + while (i < length && j < numCols) { + var k = 0 + val leftFromPool = (numCols - j) * numRows + while (k < triesPerCol) { + if (rng.nextDouble() < 1.0 * (length - i) / (pool - leftFromPool)) { + var rowIndex = rng.nextInt(numRows) + val colIndex = j + while (entries.contains((rowIndex, colIndex))) { + rowIndex = rng.nextInt(numRows) + } + entries += (rowIndex, colIndex) -> method(rng) + i += 1 + } + k += 1 + } + j += 1 } - entries += (rowIndex, colIndex) -> method(rng) - i += 1 } - SparseMatrix.fromCOO(numRows, numCols, entries.toArray.sortBy(v => (v._1._2, v._1._1))) + SparseMatrix.fromCOO(numRows, numCols, entries.toArray.map(v => (v._1._1, v._1._2, v._2))) } /** @@ -462,12 +486,12 @@ object SparseMatrix { val n = vector.size vector match { case sVec: SparseVector => - val indices = sVec.indices.map(i => (i, i)) - SparseMatrix.fromCOO(n, n, indices.zip(sVec.values)) + val indices = sVec.indices + SparseMatrix.fromCOO(n, n, indices.zip(sVec.values).map(v => (v._1, v._1, v._2))) case dVec: DenseVector => val values = dVec.values.zipWithIndex val nnzVals = values.filter(v => v._1 != 0.0) - SparseMatrix.fromCOO(n, n, nnzVals.map(v => ((v._2, v._2), v._1))) + SparseMatrix.fromCOO(n, n, nnzVals.map(v => (v._2, v._2, v._1))) } } } @@ -613,21 +637,20 @@ object Matrices { * @return a single `Matrix` composed of the matrices that were horizontally concatenated */ def horzcat(matrices: Array[Matrix]): Matrix = { - if (matrices.size == 1) { - return matrices(0) - } else if (matrices.size == 0) { + if (matrices.isEmpty) { return new DenseMatrix(0, 0, Array[Double]()) + } else if (matrices.size == 1) { + return matrices(0) } val numRows = matrices(0).numRows var rowsMatch = true - var hasDense = false var hasSparse = false var numCols = 0 matrices.foreach { mat => if (numRows != mat.numRows) rowsMatch = false mat match { case sparse: SparseMatrix => hasSparse = true - case dense: DenseMatrix => hasDense = true + case dense: DenseMatrix => // empty on purpose case _ => throw new IllegalArgumentException("Unsupported matrix format. Expected " + s"SparseMatrix or DenseMatrix. Instead got: ${mat.getClass}") } @@ -635,36 +658,49 @@ object Matrices { } require(rowsMatch, "The number of rows of the matrices in this sequence, don't match!") - if (!hasSparse && hasDense) { + if (!hasSparse) { new DenseMatrix(numRows, numCols, matrices.flatMap(_.toArray)) } else { var startCol = 0 - val entries: Array[((Int, Int), Double)] = matrices.flatMap { + val entries: Array[(Int, Int, Double)] = matrices.flatMap { case spMat: SparseMatrix => var j = 0 - var cnt = 0 - val ptr = spMat.colPtrs - val data = spMat.rowIndices.zip(spMat.values).map { case (i, v) => - cnt += 1 - if (cnt <= ptr(j + 1)) { - ((i, j + startCol), v) - } else { - while (ptr(j + 1) < cnt) { - j += 1 - } - ((i, j + startCol), v) + val colPtrs = spMat.colPtrs + val rowIndices = spMat.rowIndices + val values = spMat.values + val data = new Array[(Int, Int, Double)](values.length) + val nCols = spMat.numCols + while (j < nCols) { + var idx = colPtrs(j) + while (idx < colPtrs(j + 1)) { + val i = rowIndices(idx) + val v = values(idx) + data(idx) = (i, j + startCol, v) + idx += 1 } + j += 1 } - startCol += spMat.numCols + startCol += nCols data case dnMat: DenseMatrix => - val nnzValues = dnMat.values.zipWithIndex.filter(v => v._1 != 0.0) - val data = nnzValues.map { case (v, i) => - val rowIndex = i % dnMat.numRows - val colIndex = i / dnMat.numRows - ((rowIndex, colIndex + startCol), v) + val data = new ArrayBuffer[(Int, Int, Double)]() + var j = 0 + val nCols = dnMat.numCols + val nRows = dnMat.numRows + val values = dnMat.values + while (j < nCols) { + var i = 0 + val indStart = j * nRows + while (i < nRows) { + val v = values(indStart + i) + if (v != 0.0) { + data.append((i, j + startCol, v)) + } + i += 1 + } + j += 1 } - startCol += dnMat.numCols + startCol += nCols data } SparseMatrix.fromCOO(numRows, numCols, entries) @@ -679,14 +715,13 @@ object Matrices { * @return a single `Matrix` composed of the matrices that were vertically concatenated */ def vertcat(matrices: Array[Matrix]): Matrix = { - if (matrices.size == 1) { - return matrices(0) - } else if (matrices.size == 0) { + if (matrices.isEmpty) { return new DenseMatrix(0, 0, Array[Double]()) + } else if (matrices.size == 1) { + return matrices(0) } val numCols = matrices(0).numCols var colsMatch = true - var hasDense = false var hasSparse = false var numRows = 0 var valsLength = 0 @@ -697,7 +732,6 @@ object Matrices { hasSparse = true valsLength += sparse.values.length case dense: DenseMatrix => - hasDense = true valsLength += dense.values.length case _ => throw new IllegalArgumentException("Unsupported matrix format. Expected " + s"SparseMatrix or DenseMatrix. Instead got: ${mat.getClass}") @@ -707,7 +741,7 @@ object Matrices { } require(colsMatch, "The number of rows of the matrices in this sequence, don't match!") - if (!hasSparse && hasDense) { + if (!hasSparse) { val matData = matrices.zipWithIndex.flatMap { case (mat, ind) => val values = mat.toArray for (j <- 0 until numCols) yield (j, ind, @@ -716,34 +750,46 @@ object Matrices { new DenseMatrix(numRows, numCols, matData.flatMap(_._3)) } else { var startRow = 0 - val entries: Array[((Int, Int), Double)] = matrices.flatMap { + val entries: Array[(Int, Int, Double)] = matrices.flatMap { case spMat: SparseMatrix => var j = 0 - var cnt = 0 - val ptr = spMat.colPtrs - val data = spMat.rowIndices.zip(spMat.values).map { case (i, v) => - cnt += 1 - if (cnt <= ptr(j + 1)) { - ((i + startRow, j), v) - } else { - while (ptr(j + 1) < cnt) { - j += 1 - } - ((i + startRow, j), v) + val colPtrs = spMat.colPtrs + val rowIndices = spMat.rowIndices + val values = spMat.values + val data = new Array[(Int, Int, Double)](values.length) + while (j < numCols) { + var idx = colPtrs(j) + while (idx < colPtrs(j + 1)) { + val i = rowIndices(idx) + val v = values(idx) + data(idx) = (i + startRow, j, v) + idx += 1 } + j += 1 } startRow += spMat.numRows data case dnMat: DenseMatrix => - val nnzValues = dnMat.values.zipWithIndex.filter(v => v._1 != 0.0) - val data = nnzValues.map { case (v, i) => - val rowIndex = i % dnMat.numRows - val colIndex = i / dnMat.numRows - ((rowIndex + startRow, colIndex), v) + val data = new ArrayBuffer[(Int, Int, Double)]() + var j = 0 + val nCols = dnMat.numCols + val nRows = dnMat.numRows + val values = dnMat.values + while (j < nCols) { + var i = 0 + val indStart = j * nRows + while (i < nRows) { + val v = values(indStart + i) + if (v != 0.0) { + data.append((i + startRow, j, v)) + } + i += 1 + } + j += 1 } - startRow += dnMat.numRows + startRow += nRows data - }.sortBy(d => (d._1._2, d._1._1)) + } SparseMatrix.fromCOO(numRows, numCols, entries) } } diff --git a/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaMatricesSuite.java b/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaMatricesSuite.java index df811933a080b..704d484d0b585 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaMatricesSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaMatricesSuite.java @@ -99,6 +99,25 @@ public void zerosMatrixConstruction() { assertArrayEquals(done.toArray(), new double[]{1.0, 1.0, 1.0, 1.0}, 0.0); } + @Test + public void sparseDenseConversion() { + int m = 3; + int n = 2; + double[] values = new double[]{1.0, 2.0, 4.0, 5.0}; + double[] allValues = new double[]{1.0, 2.0, 0.0, 0.0, 4.0, 5.0}; + int[] colPtrs = new int[]{0, 2, 4}; + int[] rowIndices = new int[]{0, 1, 1, 2}; + + SparseMatrix spMat1 = new SparseMatrix(m, n, colPtrs, rowIndices, values); + DenseMatrix deMat1 = new DenseMatrix(m, n, allValues); + + SparseMatrix spMat2 = deMat1.toSparse(); + DenseMatrix deMat2 = spMat1.toDense(); + + assertArrayEquals(spMat1.toArray(), spMat2.toArray(), 0.0); + assertArrayEquals(deMat1.toArray(), deMat2.toArray(), 0.0); + } + @Test public void concatenateMatrices() { int m = 3; diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala index f5933ddcfca2e..bdb0f9e34bfd9 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala @@ -54,8 +54,8 @@ class MatricesSuite extends FunSuite { assert(mat.colPtrs.eq(colPtrs), "should not copy data") assert(mat.rowIndices.eq(rowIndices), "should not copy data") - val entries: Array[((Int, Int), Double)] = Array(((1, 0), 1.0), ((2, 0), 2.0), - ((1, 2), 4.0), ((2, 2), 5.0)) + val entries: Array[(Int, Int, Double)] = Array((1, 0, 1.0), (2, 0, 2.0), + (1, 2, 4.0), (2, 2, 5.0)) val mat2 = SparseMatrix.fromCOO(m, n, entries) assert(mat.toBreeze === mat2.toBreeze) @@ -123,6 +123,24 @@ class MatricesSuite extends FunSuite { assert(sparseMat.values(2) === 10.0) } + test("toSparse, toDense") { + val m = 3 + val n = 2 + val values = Array(1.0, 2.0, 4.0, 5.0) + val allValues = Array(1.0, 2.0, 0.0, 0.0, 4.0, 5.0) + val colPtrs = Array(0, 2, 4) + val rowIndices = Array(0, 1, 1, 2) + + val spMat1 = new SparseMatrix(m, n, colPtrs, rowIndices, values) + val deMat1 = new DenseMatrix(m, n, allValues) + + val spMat2 = deMat1.toSparse() + val deMat2 = spMat1.toDense() + + assert(spMat1.toBreeze === spMat2.toBreeze) + assert(deMat1.toBreeze === deMat2.toBreeze) + } + test("map, update") { val m = 3 val n = 2 @@ -162,6 +180,8 @@ class MatricesSuite extends FunSuite { val spHorz3 = Matrices.horzcat(Array(deMat1, spMat2)) val deHorz1 = Matrices.horzcat(Array(deMat1, deMat2)) + val deHorz2 = Matrices.horzcat(Array[Matrix]()) + assert(deHorz1.numRows === 3) assert(spHorz2.numRows === 3) assert(spHorz3.numRows === 3) @@ -170,6 +190,9 @@ class MatricesSuite extends FunSuite { assert(spHorz2.numCols === 5) assert(spHorz3.numCols === 5) assert(spHorz.numCols === 5) + assert(deHorz2.numRows === 0) + assert(deHorz2.numCols === 0) + assert(deHorz2.toArray.length === 0) assert(deHorz1.toBreeze.toDenseMatrix === spHorz2.toBreeze.toDenseMatrix) assert(spHorz2.toBreeze === spHorz3.toBreeze) @@ -200,6 +223,7 @@ class MatricesSuite extends FunSuite { val deVert1 = Matrices.vertcat(Array(deMat1, deMat3)) val spVert2 = Matrices.vertcat(Array(spMat1, deMat3)) val spVert3 = Matrices.vertcat(Array(deMat1, spMat3)) + val deVert2 = Matrices.vertcat(Array[Matrix]()) assert(deVert1.numRows === 5) assert(spVert2.numRows === 5) @@ -209,6 +233,9 @@ class MatricesSuite extends FunSuite { assert(spVert2.numCols === 2) assert(spVert3.numCols === 2) assert(spVert.numCols === 2) + assert(deVert2.numRows === 0) + assert(deVert2.numCols === 0) + assert(deVert2.toArray.length === 0) assert(deVert1.toBreeze.toDenseMatrix === spVert2.toBreeze.toDenseMatrix) assert(spVert2.toBreeze === spVert3.toBreeze)