diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala index 23b54e51f5b7a..49419f7654e67 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala @@ -25,13 +25,10 @@ import org.apache.spark.annotation.Experimental /** * :: Experimental :: - * Represents eigenvalue decomposition factors. + * Compute eigen-decomposition. */ @Experimental -case class EigenValueDecomposition[VType](s: Vector, V: VType) - -@Experimental -object EigenValueDecomposition { +private[mllib] object EigenValueDecomposition { /** * Compute the leading k eigenvalues and eigenvectors on a symmetric square matrix using ARPACK. * The caller needs to ensure that the input matrix is real symmetric. This function requires @@ -41,6 +38,7 @@ object EigenValueDecomposition { * @param n dimension of the square matrix (maximum Int.MaxValue). * @param k number of leading eigenvalues required, 0 < k < n. * @param tol tolerance of the eigs computation. + * @param maxIterations the maximum number of Arnoldi update iterations. * @return a dense vector of eigenvalues in descending order and a dense matrix of eigenvectors * (columns of the matrix). * @note The number of computed eigenvalues might be smaller than k when some Ritz values do not @@ -48,8 +46,12 @@ object EigenValueDecomposition { * for more details). The maximum number of Arnoldi update iterations is set to 300 in this * function. */ - private[mllib] def symmetricEigs(mul: DenseVector => DenseVector, n: Int, k: Int, tol: Double) - : (BDV[Double], BDM[Double]) = { + private[mllib] def symmetricEigs( + mul: DenseVector => DenseVector, + n: Int, + k: Int, + tol: Double, + maxIterations: Int): (BDV[Double], BDM[Double]) = { // TODO: remove this function and use eigs in breeze when switching breeze version require(n > k, s"Number of required eigenvalues $k must be smaller than matrix dimension $n") @@ -73,7 +75,7 @@ object EigenValueDecomposition { // use exact shift in each iteration iparam(0) = 1 // maximum number of Arnoldi update iterations, or the actual number of iterations on output - iparam(2) = 300 + iparam(2) = maxIterations // Mode 1: A*x = lambda*x, A symmetric iparam(6) = 1 @@ -132,8 +134,8 @@ object EigenValueDecomposition { // number of computed eigenvalues, might be smaller than k val computed = iparam(4) - val eigenPairs = java.util.Arrays.copyOfRange(d, 0, computed).zipWithIndex.map { - r => (r._1, java.util.Arrays.copyOfRange(z, r._2 * n, r._2 * n + n)) + val eigenPairs = java.util.Arrays.copyOfRange(d, 0, computed).zipWithIndex.map { r => + (r._1, java.util.Arrays.copyOfRange(z, r._2 * n, r._2 * n + n)) } // sort the eigen-pairs in descending order @@ -141,15 +143,16 @@ object EigenValueDecomposition { // copy eigenvectors in descending order of eigenvalues val sortedU = BDM.zeros[Double](n, computed) - sortedEigenPairs.zipWithIndex.foreach { - r => { + sortedEigenPairs.zipWithIndex.foreach { r => { val b = r._2 * n - for (i <- 0 until n) { + var i = 0 + while (i < n) { sortedU.data(b + i) = r._1._2(i) + i += 1 } } } - (BDV(sortedEigenPairs.map(_._1)), sortedU) + (BDV[Double](sortedEigenPairs.map(_._1)), sortedU) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index fbe822c53e1ce..1c4ae0b0ccafd 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -218,6 +218,9 @@ class RowMatrix( rBrz match { case _: BDV[_] => brzAxpy(a, rBrz.asInstanceOf[BDV[Double]], U) case _: BSV[_] => brzAxpy(a, rBrz.asInstanceOf[BSV[Double]], U) + case _ => + throw new UnsupportedOperationException( + s"Do not support vector operation from type ${rBrz.getClass.getName}.") } U }, @@ -247,43 +250,70 @@ class RowMatrix( } /** - * Computes the singular value decomposition of this matrix, using default tolerance (1e-9). + * Computes singular value decomposition of this matrix using dense implementation. + * Denote this matrix by A (m x n), this will compute matrices U, S, V such that A ~= U * S * V', + * where S contains the leading singular values, U and V contain the corresponding singular + * vectors. + * + * This approach requires `n^2` doubles to fit in memory and `O(n^3)` time on the master node. + * Further, n should be less than m. For problems with small n (e.g. n < 100 and n << m), the + * dense implementation might be faster than the sparse implementation. + * + * At most k largest non-zero singular values and associated vectors are returned. + * If there are k such values, then the dimensions of the return will be: + * + * U is a RowMatrix of size m x k that satisfies U'U = eye(k), + * s is a Vector of size k, holding the singular values in descending order, + * and V is a Matrix of size n x k that satisfies V'V = eye(k). * - * @param k number of singular values to keep. We might return less than k if there are - * numerically zero singular values. See rCond. - * @param computeU whether to compute U + * @param k number of leading singular values to keep (0 < k <= n). We might return less than + * k if there are numerically zero singular values. See rCond. + * @param computeU whether to compute U. * @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0) * are treated as zero, where sigma(0) is the largest singular value. * @return SingularValueDecomposition(U, s, V) */ def computeSVD( k: Int, - computeU: Boolean = false, - rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = { - if (numCols() < 100) { - computeSVD(k, computeU, rCond, 1e-9, true) - } else { - computeSVD(k, computeU, rCond, 1e-9, false) - } + computeU: Boolean, + rCond: Double): SingularValueDecomposition[RowMatrix, Matrix] = { + val n = numCols().toInt + require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.") + val G = computeGramianMatrix() + val (uFull: BDM[Double], sigmaSquaresFull: BDV[Double], vFull: BDM[Double]) = + brzSvd(G.toBreeze.asInstanceOf[BDM[Double]]) + computeSVDEffectiveRank(k, n, computeU, rCond, sigmaSquaresFull, uFull) + } + + /** + * Computes singular value decomposition of this matrix using dense implementation with default + * reciprocal condition number (1e-9). See computeSVD for more details. + * + * @param k number of leading singular values to keep (0 < k <= n). We might return less than + * k if there are numerically zero singular values. + * @param computeU whether to compute U. + * @return SingularValueDecomposition(U, s, V) + */ + def computeSVD( + k: Int, + computeU: Boolean = false): SingularValueDecomposition[RowMatrix, Matrix] = { + computeSVD(k, computeU, 1e-9) } /** - * Computes the singular value decomposition of this matrix. + * Computes singular value decomposition of this matrix using sparse implementation. * Denote this matrix by A (m x n), this will compute matrices U, S, V such that A ~= U * S * V', * where S contains the leading singular values, U and V contain the corresponding singular * vectors. * - * There is no restriction on m, but we require `n*(6*k+4)` doubles to fit in memory on the master - * node. Further, n should be less than m. - * * The decomposition is computed by providing a function that multiples a vector with A'A to * ARPACK, and iteratively invoking ARPACK-dsaupd on master node, from which we recover S and V. * Then we compute U via easy matrix multiplication as U = A * (V * S^{-1}). * Note that this approach requires approximately `O(k * nnz(A))` time. * - * ARPACK requires k to be strictly less than n. Thus when the requested eigenvalues k = n, a - * non-sparse implementation will be used, which requires `n^2` doubles to fit in memory and - * `O(n^3)` time on the master node. + * There is no restriction on m, but we require `n*(6*k+4)` doubles to fit in memory on the master + * node. Further, n should be less than m, and ARPACK requires k to be strictly less than n. If + * the requested k = n, please use the dense implementation computeSVD. * * At most k largest non-zero singular values and associated vectors are returned. * If there are k such values, then the dimensions of the return will be: @@ -292,46 +322,69 @@ class RowMatrix( * s is a Vector of size k, holding the singular values in descending order, * and V is a Matrix of size n x k that satisfies V'V = eye(k). * - * @param k number of singular values to keep. We might return less than k if there are - * numerically zero singular values. See rCond. - * @param computeU whether to compute U + * @param k number of leading singular values to keep (0 < k < n). We might return less than + * k if there are numerically zero singular values. See rCond. + * @param computeU whether to compute U. * @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0) * are treated as zero, where sigma(0) is the largest singular value. - * @param tol the numerical tolerance of svd computation. Larger tolerance means fewer iterations, + * @param tol the numerical tolerance of SVD computation. Larger tolerance means fewer iterations, * but less accurate result. - * @param isDenseSVD invoke dense SVD implementation when isDenseSVD = true. This requires - * `O(n^2)` memory and `O(n^3)` time. For a skinny matrix (m >> n) with small n, - * dense implementation might be faster. + * @param maxIterations the maximum number of Arnoldi update iterations. * @return SingularValueDecomposition(U, s, V) */ - def computeSVD( + def computeSparseSVD( k: Int, computeU: Boolean, rCond: Double, tol: Double, - isDenseSVD: Boolean): SingularValueDecomposition[RowMatrix, Matrix] = { + maxIterations: Int): SingularValueDecomposition[RowMatrix, Matrix] = { val n = numCols().toInt - require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.") + require(k > 0 && k < n, s"Request up to n - 1 singular values k=$k n=$n. " + + s"For full SVD (i.e. k = n), please use dense implementation computeSVD.") + val (sigmaSquares: BDV[Double], u: BDM[Double]) = + EigenValueDecomposition.symmetricEigs(multiplyGramianMatrixBy, n, k, tol, maxIterations) + computeSVDEffectiveRank(k, n, computeU, rCond, sigmaSquares, u) + } - val (sigmaSquares: BDV[Double], u: BDM[Double]) = if (!isDenseSVD && k < n) { - EigenValueDecomposition.symmetricEigs(multiplyGramianMatrixBy, n, k, tol) - } else { - if (!isDenseSVD && k == n) { - logWarning(s"Request full SVD (k = n = $k), while ARPACK requires k strictly less than " + - s"n. Using non-sparse implementation.") - } - val G = computeGramianMatrix() - val (uFull: BDM[Double], sigmaSquaresFull: BDV[Double], vFull: BDM[Double]) = - brzSvd(G.toBreeze.asInstanceOf[BDM[Double]]) - (sigmaSquaresFull, uFull) - } + /** + * Computes singular value decomposition of this matrix using sparse implementation with default + * reciprocal condition number (1e-9), tolerance (1e-10), and maximum number of Arnoldi iterations + * (300). See computeSparseSVD for more details. + * + * @param k number of leading singular values to keep (0 < k < n). We might return less than + * k if there are numerically zero singular values. + * @param computeU whether to compute U. + * @return SingularValueDecomposition(U, s, V) + */ + def computeSparseSVD( + k: Int, + computeU: Boolean = false): SingularValueDecomposition[RowMatrix, Matrix] = { + computeSparseSVD(k, computeU, 1e-9, 1e-10, 300) + } + + /** + * Determine effective rank of SVD result and compute left singular vectors if required. + */ + private def computeSVDEffectiveRank( + k: Int, + n: Int, + computeU: Boolean, + rCond: Double, + sigmaSquares: BDV[Double], + u: BDM[Double]): SingularValueDecomposition[RowMatrix, Matrix] = { val sigmas: BDV[Double] = brzSqrt(sigmaSquares) // Determine effective rank. val sigma0 = sigmas(0) val threshold = rCond * sigma0 var i = 0 - while (i < k && sigmas(i) >= threshold) { + // sigmas might have a length smaller than k, if some Ritz values do not satisfy the + // convergence criterion specified by tol after maxIterations. + // Thus use i < min(k, sigmas.length) instead of i < k + if (sigmas.length < k) { + logWarning(s"Requested $k singular values but only found ${sigmas.length} converged.") + } + while (i < math.min(k, sigmas.length) && sigmas(i) >= threshold) { i += 1 } val sk = i diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index e2ff423ca7f79..23466f2fff1a0 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -101,7 +101,13 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { val (localU, localSigma, localVt) = brzSvd(localMat) val localV: BDM[Double] = localVt.t.toDenseMatrix for (k <- 1 to n) { - val svd = mat.computeSVD(k, true, 1e-9, 1e-9, denseSVD) + val svd = if (k < n) { + if (denseSVD) mat.computeSVD(k, computeU = true) + else mat.computeSparseSVD(k, computeU = true) + } else { + // when k = n, always use dense SVD + mat.computeSVD(k, computeU = true) + } val U = svd.U val s = svd.s val V = svd.V @@ -114,7 +120,8 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k) assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k))) } - val svdWithoutU = mat.computeSVD(n - 1, false, 1e-9, 1e-9, denseSVD) + val svdWithoutU = if (denseSVD) mat.computeSVD(n - 1, computeU = false) + else mat.computeSparseSVD(n - 1, computeU = false) assert(svdWithoutU.U === null) } } @@ -124,7 +131,8 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { for (denseSVD <- Seq(true, false)) { val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0, 1.0)), 2) val mat = new RowMatrix(rows, 4, 3) - val svd = mat.computeSVD(2, true, 1e-9, 1e-9, denseSVD) + val svd = if (denseSVD) mat.computeSVD(2, computeU = true) + else mat.computeSparseSVD(2, computeU = true) assert(svd.s.size === 1, "should not return zero singular values") assert(svd.U.numRows() === 4) assert(svd.U.numCols() === 1)