From 1a173f00bd96dcfbfc11866b1f2ad76a50ffc399 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Thu, 26 Dec 2013 15:01:03 -0500 Subject: [PATCH 01/46] Initial files - no tests --- .../apache/spark/mllib/linalg/sparsesvd.scala | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala new file mode 100644 index 0000000000000..384bf9d33f64c --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala @@ -0,0 +1,68 @@ +import org.apache.spark.SparkContext + +import org.jblas.{DoubleMatrix, Singular, MatrixFunctions} + +// arguments +val MIN_SVALUE = 0.01 // minimum singular value to recover +val m = 100000 +val n = 10 +// and a 1-indexed spase matrix. + +// TODO: check min svalue +// TODO: check dimensions + +// Load and parse the data file +/*val rawdata = sc.textFile("mllib/data/als/test.data") +val data = rawdata.map { line => + val parts = line.split(',') + ((parts(0).toInt, parts(1).toInt), parts(2).toDouble) +}*/ + +val data = sc.makeRDD(Array.tabulate(m,n){ (a,b)=> ((a+1,b+1),a*b%37) }.flatten ) + + +// Compute A^T A, assuming rows are sparse enough to fit in memory +val rows = data.map(entry => + (entry._1._1, (entry._1._2, entry._2))).groupByKey().cache() +val emits = rows.flatMap{ case (rowind, cols) => + cols.flatMap{ case (colind1, mval1) => + cols.map{ case (colind2, mval2) => + ((colind1, colind2), mval1*mval2) } } +}.reduceByKey(_+_) + + +// Constructi jblas A^T A locally +val ata = DoubleMatrix.zeros(n, n) +for(entry <- emits.toArray) { + ata.put(entry._1._1-1, entry._1._2-1, entry._2) +} + +// Since A^T A is small, we can compute its SVD directly +val svd = Singular.sparseSVD(ata) +val V = svd(0) +val sigma = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x >= MIN_SVALUE) + +// threshold s values +if(sigma.isEmpty) { + // TODO: return empty +} + +// prepare V for returning +val retV = sc.makeRDD( + Array.tabulate(V.rows, sigma.length){ (i,j) => + ((i+1, j+1), V.get(i,j)) }.flatten) + +val retS = sc.makeRDD(sigma) + + +// Compute U as U = A V S^-1 +// turn V S^-1 into an RDD as a sparse matrix and cache it +val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length) + { (i,j) => ((i+1, j+1), V.get(i,j)/sigma(j)) }.flatten).cache() + +// Multiply A by VS^-1 +val aCols = data.map(entry => (entry._1._2, (entry._1._1, entry._2))) +val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2))) +val retU = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) ) + => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_) + From 6e740cc90131b29ebc17e32c66ea16727e5dcc9f Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Thu, 26 Dec 2013 16:12:40 -0500 Subject: [PATCH 02/46] Some documentation --- .../apache/spark/mllib/linalg/sparsesvd.scala | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala index 384bf9d33f64c..99a1785074daa 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala @@ -1,7 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg + import org.apache.spark.SparkContext import org.jblas.{DoubleMatrix, Singular, MatrixFunctions} + +/** + * Singular Value Decomposition for Tall and Skinny matrices. + * Given an m x n matrix A, this will compute matrices U, S, V such that + * A = U * S * V' + * + * There is no restriction on m, but we require n doubles to be held in memory. + * Further, n should be less than m. + * + * This is computed by first computing A'A = V S^2 V', + * computing locally on that (since n x n is small), + * from which we recover S and V. + * Then we compute U via easy matrix multiplication + * as U = A * V * S^-1 + * + * Only singular vectors associated with singular values + * greater or equal to MIN_SVALUE are recovered. If there are k + * such values, then the dimensions of the return will be: + * + * S is k x k and diagonal, holding the singular values on diagonal + * U is m x k and satisfies U'U = eye(k,k) + * V is n x k and satisfies V'V = eye(k,k) + * + * All input and output is expected in sparse matrix format, 1-indexed + * as tuples of the form ((i,j),value) all in RDDs + */ + + // arguments val MIN_SVALUE = 0.01 // minimum singular value to recover val m = 100000 From 6c3674cd235558ec09e6b97382bb541b379a3f8f Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Thu, 26 Dec 2013 17:39:25 -0500 Subject: [PATCH 03/46] Object to hold the svd methods --- .../apache/spark/mllib/linalg/sparsesvd.scala | 132 ++++++++++-------- 1 file changed, 74 insertions(+), 58 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala index 99a1785074daa..f9b9a04f19b08 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala @@ -18,6 +18,8 @@ package org.apache.spark.mllib.linalg import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD import org.jblas.{DoubleMatrix, Singular, MatrixFunctions} @@ -49,67 +51,81 @@ import org.jblas.{DoubleMatrix, Singular, MatrixFunctions} */ -// arguments -val MIN_SVALUE = 0.01 // minimum singular value to recover -val m = 100000 -val n = 10 -// and a 1-indexed spase matrix. 
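As a self-contained sketch of the pipeline the documentation above describes — form the small Gramian A'A, decompose it locally, recover U with one multiply — here is the same flow using only jblas, which these patches already depend on; the matrix values are made up for illustration and this is not part of the patch itself:

    import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}

    // A is tall and skinny: m = 3 rows, n = 2 columns
    val a = new DoubleMatrix(Array(
      Array(1.0, 2.0),
      Array(3.0, 4.0),
      Array(5.0, 6.0)))

    // the n x n Gramian is small enough to hold on one machine
    val ata = a.transpose.mmul(a)

    // SVD of A'A = V * S^2 * V' yields V and the squared singular values of A
    val svd = Singular.sparseSVD(ata)
    val v = svd(0)
    val s = MatrixFunctions.sqrt(svd(1))

    // U = A * V * S^-1 costs only one multiply against the original data
    val sinv = DoubleMatrix.diag(new DoubleMatrix(s.toArray.map(1.0 / _)))
    val u = a.mmul(v).mmul(sinv)

    // sanity check: U * S * V' reconstructs A up to floating-point error
    println(u.mmul(DoubleMatrix.diag(s)).mmul(v.transpose))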
- -// TODO: check min svalue -// TODO: check dimensions - -// Load and parse the data file -/*val rawdata = sc.textFile("mllib/data/als/test.data") -val data = rawdata.map { line => - val parts = line.split(',') - ((parts(0).toInt, parts(1).toInt), parts(2).toDouble) -}*/ - -val data = sc.makeRDD(Array.tabulate(m,n){ (a,b)=> ((a+1,b+1),a*b%37) }.flatten ) - - -// Compute A^T A, assuming rows are sparse enough to fit in memory -val rows = data.map(entry => - (entry._1._1, (entry._1._2, entry._2))).groupByKey().cache() -val emits = rows.flatMap{ case (rowind, cols) => - cols.flatMap{ case (colind1, mval1) => - cols.map{ case (colind2, mval2) => - ((colind1, colind2), mval1*mval2) } } -}.reduceByKey(_+_) - - -// Constructi jblas A^T A locally -val ata = DoubleMatrix.zeros(n, n) -for(entry <- emits.toArray) { - ata.put(entry._1._1-1, entry._1._2-1, entry._2) +object SVD { + def sparseSVD( + data: RDD[((Int, Int), Double)], + m: Int, + n: Int, + min_svalue: Double) + : ( RDD[((Int, Int), Double)], + RDD[((Int, Int), Double)], + RDD[((Int, Int), Double)]) = + { + val sc = data.sparkContext + + // Compute A^T A, assuming rows are sparse enough to fit in memory + val rows = data.map(entry => + (entry._1._1, (entry._1._2, entry._2))).groupByKey().cache() + val emits = rows.flatMap{ case (rowind, cols) => + cols.flatMap{ case (colind1, mval1) => + cols.map{ case (colind2, mval2) => + ((colind1, colind2), mval1*mval2) } } + }.reduceByKey(_+_) + + + // Constructi jblas A^T A locally + val ata = DoubleMatrix.zeros(n, n) + for(entry <- emits.toArray) { + ata.put(entry._1._1-1, entry._1._2-1, entry._2) + } + + // Since A^T A is small, we can compute its SVD directly + val svd = Singular.sparseSVD(ata) + val V = svd(0) + val sigma = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x >= min_svalue) + + // threshold s values + if(sigma.isEmpty) { + // TODO: return empty + } + + // prepare V for returning + val retV = sc.makeRDD( + Array.tabulate(V.rows, sigma.length){ (i,j) => + ((i+1, j+1), V.get(i,j)) }.flatten) + + val retS = sc.makeRDD(Array.tabulate(sigma.length){x=>((x+1,x+1),sigma(x))}) + + // Compute U as U = A V S^-1 + // turn V S^-1 into an RDD as a sparse matrix and cache it + val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length) + { (i,j) => ((i+1, j+1), V.get(i,j)/sigma(j)) }.flatten).cache() + + // Multiply A by VS^-1 + val aCols = data.map(entry => (entry._1._2, (entry._1._1, entry._2))) + val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2))) + val retU = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) ) + => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_) + + (retU, retS, retV) + } + + def main(args: Array[String]) { + if (args.length < 4) { + println("Usage: KMeans []") + System.exit(1) + } + val (master, inputFile, k, iters) = (args(0), args(1), args(2).toInt, args(3).toInt) + val runs = if (args.length >= 5) args(4).toInt else 1 + val sc = new SparkContext(master, "KMeans") + val data = sc.textFile(inputFile).map(line => line.split(' ').map(_.toDouble)).cache() + println("Cost: ") + System.exit(0) + //val data = sc.makeRDD(Array.tabulate(m,n){ (a,b)=> ((a+1,b+1),a*b%37) }.flatten ) + } } -// Since A^T A is small, we can compute its SVD directly -val svd = Singular.sparseSVD(ata) -val V = svd(0) -val sigma = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x >= MIN_SVALUE) - -// threshold s values -if(sigma.isEmpty) { - // TODO: return empty -} - -// prepare V for returning -val retV = sc.makeRDD( - Array.tabulate(V.rows, 
sigma.length){ (i,j) => - ((i+1, j+1), V.get(i,j)) }.flatten) - -val retS = sc.makeRDD(sigma) -// Compute U as U = A V S^-1 -// turn V S^-1 into an RDD as a sparse matrix and cache it -val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length) - { (i,j) => ((i+1, j+1), V.get(i,j)/sigma(j)) }.flatten).cache() -// Multiply A by VS^-1 -val aCols = data.map(entry => (entry._1._2, (entry._1._1, entry._2))) -val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2))) -val retU = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) ) - => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_) From 1a21ba29672074692be6c006a8938bfa86330a19 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Thu, 26 Dec 2013 18:09:33 -0500 Subject: [PATCH 04/46] new main file --- .../apache/spark/mllib/linalg/sparsesvd.scala | 29 ++++++++++++------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala index f9b9a04f19b08..be8ccff9cc278 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala @@ -110,22 +110,31 @@ object SVD { (retU, retS, retV) } + def main(args: Array[String]) { - if (args.length < 4) { - println("Usage: KMeans []") + if (args.length < 8) { + println("Usage: SVD ") System.exit(1) } - val (master, inputFile, k, iters) = (args(0), args(1), args(2).toInt, args(3).toInt) - val runs = if (args.length >= 5) args(4).toInt else 1 - val sc = new SparkContext(master, "KMeans") - val data = sc.textFile(inputFile).map(line => line.split(' ').map(_.toDouble)).cache() - println("Cost: ") + val (master, inputFile, m, n, min_svalue, output_u, output_s, output_v) = + (args(0), args(1), args(2).toInt, args(3).toInt, args(4).toDouble, args(5), args(6), args(7)) + + val sc = new SparkContext(master, "SVD") + + val rawdata = sc.textFile(inputFile) + val data = rawdata.map { line => + val parts = line.split(',') + ((parts(0).toInt, parts(1).toInt), parts(2).toDouble) + } + + val (u, s, v) = SVD.sparseSVD(data, m, n, min_svalue) + println("Computed " + s.size + " singular values and vectors") + u.saveAsText(output_u) + s.saveAsText(output_s) + v.saveAsText(output_v) System.exit(0) - //val data = sc.makeRDD(Array.tabulate(m,n){ (a,b)=> ((a+1,b+1),a*b%37) }.flatten ) } } - - From fe1a132d403e301a674da6f4af7163df210ec2af Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Thu, 26 Dec 2013 18:13:21 -0500 Subject: [PATCH 05/46] Main method added for svd --- .../scala/org/apache/spark/mllib/linalg/sparsesvd.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala index be8ccff9cc278..7bb393de8b35f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala @@ -128,10 +128,10 @@ object SVD { } val (u, s, v) = SVD.sparseSVD(data, m, n, min_svalue) - println("Computed " + s.size + " singular values and vectors") - u.saveAsText(output_u) - s.saveAsText(output_s) - v.saveAsText(output_v) + println("Computed " + s.toArray.length + " singular values and vectors") + u.saveAsTextFile(output_u) + s.saveAsTextFile(output_s) + v.saveAsTextFile(output_v) System.exit(0) } } From 16de5268e3652498b47b0600fe5cf9cf10d0dd83 Mon Sep 17 
00:00:00 2001 From: Reza Zadeh Date: Thu, 26 Dec 2013 23:21:57 -0500 Subject: [PATCH 06/46] full rank matrix test added --- .../org/apache/spark/mllib/linalg/sparsesvd.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala index 7bb393de8b35f..2ce0df1e5db1e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala @@ -61,6 +61,14 @@ object SVD { RDD[((Int, Int), Double)], RDD[((Int, Int), Double)]) = { + if (m < n) { + throw new IllegalArgumentException("Expecting a tall and skinny matrix") + } + + if (min_svalue < 1.0e-9) { + throw new IllegalArgumentException("Minimum singular value must be greater than 1e-9") + } + val sc = data.sparkContext // Compute A^T A, assuming rows are sparse enough to fit in memory @@ -86,7 +94,7 @@ object SVD { // threshold s values if(sigma.isEmpty) { - // TODO: return empty + throw new Exception("All singular values are smaller than min_svalue: " + min_svalue) } // prepare V for returning From fa1e8d8cbf916f963e1ea000683a11d83551f870 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Fri, 27 Dec 2013 00:34:59 -0500 Subject: [PATCH 07/46] test for truncated svd --- .../apache/spark/mllib/linalg/sparsesvd.scala | 101 +++++++++--------- 1 file changed, 50 insertions(+), 51 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala index 2ce0df1e5db1e..a799aa3280355 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala @@ -57,65 +57,65 @@ object SVD { m: Int, n: Int, min_svalue: Double) - : ( RDD[((Int, Int), Double)], - RDD[((Int, Int), Double)], - RDD[((Int, Int), Double)]) = + : ( RDD[((Int, Int), Double)], + RDD[((Int, Int), Double)], + RDD[((Int, Int), Double)]) = { - if (m < n) { + if (m < n || m <= 0 || n <= 0) { throw new IllegalArgumentException("Expecting a tall and skinny matrix") } - if (min_svalue < 1.0e-9) { + if (min_svalue < 1.0e-9) { throw new IllegalArgumentException("Minimum singular value must be greater than 1e-9") } - val sc = data.sparkContext + val sc = data.sparkContext - // Compute A^T A, assuming rows are sparse enough to fit in memory - val rows = data.map(entry => - (entry._1._1, (entry._1._2, entry._2))).groupByKey().cache() - val emits = rows.flatMap{ case (rowind, cols) => - cols.flatMap{ case (colind1, mval1) => - cols.map{ case (colind2, mval2) => - ((colind1, colind2), mval1*mval2) } } - }.reduceByKey(_+_) + // Compute A^T A, assuming rows are sparse enough to fit in memory + val rows = data.map(entry => + (entry._1._1, (entry._1._2, entry._2))).groupByKey().cache() + val emits = rows.flatMap{ case (rowind, cols) => + cols.flatMap{ case (colind1, mval1) => + cols.map{ case (colind2, mval2) => + ((colind1, colind2), mval1*mval2) } } + }.reduceByKey(_+_) - // Constructi jblas A^T A locally - val ata = DoubleMatrix.zeros(n, n) - for(entry <- emits.toArray) { - ata.put(entry._1._1-1, entry._1._2-1, entry._2) - } + // Constructi jblas A^T A locally + val ata = DoubleMatrix.zeros(n, n) + for(entry <- emits.toArray) { + ata.put(entry._1._1-1, entry._1._2-1, entry._2) + } - // Since A^T A is small, we can compute its SVD directly - val svd = Singular.sparseSVD(ata) - val V = svd(0) - val 
sigma = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x >= min_svalue) + // Since A^T A is small, we can compute its SVD directly + val svd = Singular.sparseSVD(ata) + val V = svd(0) + val sigma = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x >= min_svalue) - // threshold s values - if(sigma.isEmpty) { + // threshold s values + if(sigma.isEmpty) { throw new Exception("All singular values are smaller than min_svalue: " + min_svalue) - } + } - // prepare V for returning - val retV = sc.makeRDD( - Array.tabulate(V.rows, sigma.length){ (i,j) => - ((i+1, j+1), V.get(i,j)) }.flatten) + // prepare V for returning + val retV = sc.makeRDD( + Array.tabulate(V.rows, sigma.length){ (i,j) => + ((i+1, j+1), V.get(i,j)) }.flatten) - val retS = sc.makeRDD(Array.tabulate(sigma.length){x=>((x+1,x+1),sigma(x))}) + val retS = sc.makeRDD(Array.tabulate(sigma.length){x=>((x+1,x+1),sigma(x))}) - // Compute U as U = A V S^-1 - // turn V S^-1 into an RDD as a sparse matrix and cache it - val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length) + // Compute U as U = A V S^-1 + // turn V S^-1 into an RDD as a sparse matrix and cache it + val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length) { (i,j) => ((i+1, j+1), V.get(i,j)/sigma(j)) }.flatten).cache() - // Multiply A by VS^-1 - val aCols = data.map(entry => (entry._1._2, (entry._1._1, entry._2))) - val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2))) - val retU = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) ) + // Multiply A by VS^-1 + val aCols = data.map(entry => (entry._1._2, (entry._1._1, entry._2))) + val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2))) + val retU = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) ) => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_) - - (retU, retS, retV) + + (retU, retS, retV) } @@ -125,24 +125,23 @@ object SVD { System.exit(1) } val (master, inputFile, m, n, min_svalue, output_u, output_s, output_v) = - (args(0), args(1), args(2).toInt, args(3).toInt, args(4).toDouble, args(5), args(6), args(7)) + (args(0), args(1), args(2).toInt, args(3).toInt, args(4).toDouble, args(5), args(6), args(7)) val sc = new SparkContext(master, "SVD") - val rawdata = sc.textFile(inputFile) - val data = rawdata.map { line => - val parts = line.split(',') - ((parts(0).toInt, parts(1).toInt), parts(2).toDouble) - } + val rawdata = sc.textFile(inputFile) + val data = rawdata.map { line => + val parts = line.split(',') + ((parts(0).toInt, parts(1).toInt), parts(2).toDouble) + } - val (u, s, v) = SVD.sparseSVD(data, m, n, min_svalue) + val (u, s, v) = SVD.sparseSVD(data, m, n, min_svalue) println("Computed " + s.toArray.length + " singular values and vectors") - u.saveAsTextFile(output_u) - s.saveAsTextFile(output_s) - v.saveAsTextFile(output_v) + u.saveAsTextFile(output_u) + s.saveAsTextFile(output_s) + v.saveAsTextFile(output_v) System.exit(0) } } - From bdb5037987d472c31f4d7891e60a8997e0e9bdcc Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Fri, 27 Dec 2013 00:36:41 -0500 Subject: [PATCH 08/46] add all tests --- .../apache/spark/mllib/linalg/SVDSuite.scala | 142 ++++++++++++++++++ 1 file changed, 142 insertions(+) create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala new file mode 100644 index 0000000000000..726650af0ab88 --- /dev/null +++ 
b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg + +import scala.util.Random + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.FunSuite + +import org.jblas.{DoubleMatrix, Singular, MatrixFunctions} + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD + +import org.jblas._ + +class SVDSuite extends FunSuite with BeforeAndAfterAll { + @transient private var sc: SparkContext = _ + + override def beforeAll() { + sc = new SparkContext("local", "test") + } + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + val EPSILON = 1e-4 + + // Return jblas matrix from sparse matrix RDD + def getDenseMatrix(matrix:RDD[((Int, Int), Double)], m:Int, n:Int) : DoubleMatrix = { + val ret = DoubleMatrix.zeros(m, n) + matrix.toArray.map(x => ret.put(x._1._1-1, x._1._2-1, x._2)) + ret + } + + def assertMatrixEquals(a:DoubleMatrix, b:DoubleMatrix) { + assert(a.rows == b.rows && a.columns == b.columns, "dimension mismatch") + val diff = DoubleMatrix.zeros(a.rows, a.columns) + Array.tabulate(a.rows, a.columns){(i,j) => + diff.put(i,j, + Math.min(Math.abs(a.get(i,j)-b.get(i,j)), + Math.abs(a.get(i,j)+b.get(i,j)))) } + assert(diff.norm1 < EPSILON, "matrix mismatch: " + diff.norm1) + } + + test("full rank matrix svd") { + val m = 10 + val n = 3 + val data = sc.makeRDD(Array.tabulate(m,n){ (a,b)=> + ((a+1,b+1), (a+2).toDouble*(b+1)/(1+a+b)) }.flatten ) + val min_svalue = 1.0e-8 + + val (u, s, v) = SVD.sparseSVD(data, m, n, min_svalue) + + val densea = getDenseMatrix(data, m, n) + val svd = Singular.sparseSVD(densea) + + val retu = getDenseMatrix(u,m,n) + val rets = getDenseMatrix(s,n,n) + val retv = getDenseMatrix(v,n,n) + + // check individual decomposition + assertMatrixEquals(retu, svd(0)) + assertMatrixEquals(rets, DoubleMatrix.diag(svd(1))) + assertMatrixEquals(retv, svd(2)) + + // check multiplication guarantee + assertMatrixEquals(retu.mmul(rets).mmul(retv.transpose), densea) + } + + test("rank one matrix svd") { + val m = 10 + val n = 3 + val data = sc.makeRDD(Array.tabulate(m,n){ (a,b)=> + ((a+1,b+1), 1.0) }.flatten ) + val min_svalue = 1.0e-4 + + val (u, s, v) = SVD.sparseSVD(data, m, n, min_svalue) + val retrank = s.toArray.length + + assert(retrank == 1, "rank returned not one") + + val densea = getDenseMatrix(data, m, n) + val svd = Singular.sparseSVD(densea) + + val retu = getDenseMatrix(u,m,retrank) + val rets = getDenseMatrix(s,retrank,retrank) + val retv = getDenseMatrix(v,n,retrank) + + // check individual decomposition + assertMatrixEquals(retu, svd(0).getColumn(0)) + assertMatrixEquals(rets, 
DoubleMatrix.diag(svd(1).getRow(0))) + assertMatrixEquals(retv, svd(2).getColumn(0)) + + // check multiplication guarantee + assertMatrixEquals(retu.mmul(rets).mmul(retv.transpose), densea) + } + + test("truncated with min singular value") { + val m = 10 + val n = 3 + val data = sc.makeRDD(Array.tabulate(m,n){ (a,b)=> + ((a+1,b+1), (a+2).toDouble*(b+1)/(1+a+b)) }.flatten ) + + val min_svalue = 5.0 // only one svalue above this + + val (u, s, v) = SVD.sparseSVD(data, m, n, min_svalue) + val retrank = s.toArray.length + + val densea = getDenseMatrix(data, m, n) + val svd = Singular.sparseSVD(densea) + + val retu = getDenseMatrix(u,m,retrank) + val rets = getDenseMatrix(s,retrank,retrank) + val retv = getDenseMatrix(v,n,retrank) + + assert(retrank == 1, "rank returned not one") + + // check individual decomposition + assertMatrixEquals(retu, svd(0).getColumn(0)) + assertMatrixEquals(rets, DoubleMatrix.diag(svd(1).getRow(0))) + assertMatrixEquals(retv, svd(2).getColumn(0)) + } +} From 3369c2d48795d831acd841b8ecb67b5a84083883 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Fri, 27 Dec 2013 00:41:46 -0500 Subject: [PATCH 09/46] cleanup documentation --- .../main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala index a799aa3280355..1c9f67e2653b3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala @@ -29,7 +29,7 @@ import org.jblas.{DoubleMatrix, Singular, MatrixFunctions} * Given an m x n matrix A, this will compute matrices U, S, V such that * A = U * S * V' * - * There is no restriction on m, but we require n doubles to be held in memory. + * There is no restriction on m, but we require n^2 doubles to fit in memory. * Further, n should be less than m. 
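(For scale, with 8-byte doubles: n = 10,000 columns already makes A'A an n^2 = 10^8-entry matrix, roughly 800 MB, so the tall-and-skinny requirement effectively caps n at a few thousand.)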
* * This is computed by first computing A'A = V S^2 V', @@ -81,7 +81,7 @@ object SVD { }.reduceByKey(_+_) - // Constructi jblas A^T A locally + // Construct jblas A^T A locally val ata = DoubleMatrix.zeros(n, n) for(entry <- emits.toArray) { ata.put(entry._1._1-1, entry._1._2-1, entry._2) From 642ab5c1e1ba98833265447447702c3c39fb2d40 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Fri, 27 Dec 2013 01:51:19 -0500 Subject: [PATCH 10/46] initial large scale testing begin --- ec2/spark_ec2.py | 12 +----------- .../org/apache/spark/mllib/linalg/sparsesvd.scala | 8 ++++---- 2 files changed, 5 insertions(+), 15 deletions(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index a2b0e7e7f4748..5e8b381a4d62f 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -113,16 +113,6 @@ def parse_args(): # Boto config check # http://boto.cloudhackers.com/en/latest/boto_config_tut.html home_dir = os.getenv('HOME') - if home_dir == None or not os.path.isfile(home_dir + '/.boto'): - if not os.path.isfile('/etc/boto.cfg'): - if os.getenv('AWS_ACCESS_KEY_ID') == None: - print >> stderr, ("ERROR: The environment variable AWS_ACCESS_KEY_ID " + - "must be set") - sys.exit(1) - if os.getenv('AWS_SECRET_ACCESS_KEY') == None: - print >> stderr, ("ERROR: The environment variable AWS_SECRET_ACCESS_KEY " + - "must be set") - sys.exit(1) return (opts, action, cluster_name) @@ -646,7 +636,7 @@ def get_partition(total, num_partitions, current_partitions): def real_main(): (opts, action, cluster_name) = parse_args() try: - conn = ec2.connect_to_region(opts.region) + conn = ec2.connect_to_region(opts.region,aws_access_key_id="AKIAI2EGAQ7GYNL4LRAA", aws_secret_access_key="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") except Exception as e: print >> stderr, (e) sys.exit(1) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala index 1c9f67e2653b3..edf715dc196bf 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala @@ -32,8 +32,8 @@ import org.jblas.{DoubleMatrix, Singular, MatrixFunctions} * There is no restriction on m, but we require n^2 doubles to fit in memory. * Further, n should be less than m. * - * This is computed by first computing A'A = V S^2 V', - * computing locally on that (since n x n is small), + * The decomposition is computed by first computing A'A = V S^2 V', + * computing svd locally on that (since n x n is small), * from which we recover S and V. 
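(The step the comment leaves implicit: writing A = U S V' with U'U = I gives A'A = (U S V')'(U S V') = V S U'U S V' = V S^2 V', so the eigenvectors of the Gramian are exactly the right singular vectors V, and its eigenvalues are the squared singular values.)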
* Then we compute U via easy matrix multiplication * as U = A * V * S^-1 @@ -43,8 +43,8 @@ import org.jblas.{DoubleMatrix, Singular, MatrixFunctions} * such values, then the dimensions of the return will be: * * S is k x k and diagonal, holding the singular values on diagonal - * U is m x k and satisfies U'U = eye(k,k) - * V is n x k and satisfies V'V = eye(k,k) + * U is m x k and satisfies U'U = eye(k) + * V is n x k and satisfies V'V = eye(k) * * All input and output is expected in sparse matrix format, 1-indexed * as tuples of the form ((i,j),value) all in RDDs From ae5102acc08f9cbe07caba9b95d59f928bc0e16a Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Fri, 27 Dec 2013 04:15:13 -0500 Subject: [PATCH 11/46] large scale considerations --- .../main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala index edf715dc196bf..0ab05de8726b3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala @@ -58,8 +58,8 @@ object SVD { n: Int, min_svalue: Double) : ( RDD[((Int, Int), Double)], - RDD[((Int, Int), Double)], - RDD[((Int, Int), Double)]) = + RDD[((Int, Int), Double)], + RDD[((Int, Int), Double)]) = { if (m < n || m <= 0 || n <= 0) { throw new IllegalArgumentException("Expecting a tall and skinny matrix") From dd0d3f008b5dd478fdfb6d20c53713ca0c7c2be1 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Wed, 1 Jan 2014 19:53:04 -0800 Subject: [PATCH 12/46] New documentation --- docs/mllib-guide.md | 52 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md index c1ff9c417c353..8c86369ae6a86 100644 --- a/docs/mllib-guide.md +++ b/docs/mllib-guide.md @@ -210,3 +210,55 @@ at each iteration. Available algorithms for gradient descent: * [GradientDescent](api/mllib/index.html#org.apache.spark.mllib.optimization.GradientDescent) + + + +# Singular Value Decomposition +Singular Value Decomposition for Tall and Skinny matrices. +Given an m x n matrix A, this will compute matrices U, S, V such that +A = U * S * V^T + +There is no restriction on m, but we require n^2 doubles to fit in memory. +Further, n should be less than m. + +The decomposition is computed by first computing A^TA = V S^2 V^T, +computing svd locally on that (since n x n is small), +from which we recover S and V. +Then we compute U via easy matrix multiplication +as U = A * V * S^-1 + +Only singular vectors associated with singular values +greater or equal to MIN_SVALUE are recovered. 
If there are k +such values, then the dimensions of the return will be: + +S is k x k and diagonal, holding the singular values on diagonal +U is m x k and satisfies U^T*U = eye(k) +V is n x k and satisfies V^TV = eye(k) + +All input and output is expected in sparse matrix format, 1-indexed +as tuples of the form ((i,j),value) all in RDDs + +{% highlight scala %} + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.linalg.SVD + +// Load and parse the data file +val data = sc.textFile("mllib/data/als/test.data").map { line => + val parts = line.split(',') + ((parts(0).toInt, parts(1).toInt), parts(2).toDouble) +} +val m = 4 +val n = 4 + +// recover singular vectors for singular values at or above 1e-5 +val (u, s, v) = SVD.sparseSVD(data, m, n, 1e-5) + +println("singular values = " + s.toArray.mkString) + +{% endhighlight %} + + + + + From 185c882606112a49c1d7359cc1de00bd273c3050 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Wed, 1 Jan 2014 19:53:14 -0800 Subject: [PATCH 13/46] tweaks to docs --- .../scala/org/apache/spark/mllib/linalg/sparsesvd.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala index 0ab05de8726b3..83b2178c0920d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala @@ -65,12 +65,10 @@ object SVD { throw new IllegalArgumentException("Expecting a tall and skinny matrix") } - if (min_svalue < 1.0e-9) { - throw new IllegalArgumentException("Minimum singular value must be greater than 1e-9") + if (min_svalue < 1.0e-8) { + throw new IllegalArgumentException("Minimum singular value requested must be greater than 1e-9") } - val sc = data.sparkContext - // Compute A^T A, assuming rows are sparse enough to fit in memory val rows = data.map(entry => (entry._1._1, (entry._1._2, entry._2))).groupByKey().cache() @@ -80,7 +78,6 @@ object SVD { ((colind1, colind2), mval1*mval2) } } }.reduceByKey(_+_) - // Construct jblas A^T A locally val ata = DoubleMatrix.zeros(n, n) for(entry <- emits.toArray) { @@ -97,6 +94,8 @@ object SVD { throw new Exception("All singular values are smaller than min_svalue: " + min_svalue) } + val sc = data.sparkContext + // prepare V for returning val retV = sc.makeRDD( Array.tabulate(V.rows, sigma.length){ (i,j) => From b941b6f7b0131b4382b09740d56916574901fd55 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Wed, 1 Jan 2014 20:01:13 -0800 Subject: [PATCH 14/46] doc tweaks --- docs/mllib-guide.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md index 8c86369ae6a86..08d6d7485387f 100644 --- a/docs/mllib-guide.md +++ b/docs/mllib-guide.md @@ -231,12 +231,12 @@ Only singular vectors associated with singular values greater or equal to MIN_SVALUE are recovered. If there are k such values, then the dimensions of the return will be: -S is k x k and diagonal, holding the singular values on diagonal -U is m x k and satisfies U^T*U = eye(k) -V is n x k and satisfies V^TV = eye(k) +* *S* is *k x k* and diagonal, holding the singular values on diagonal. +* *U* is *m x k* and satisfies U^T*U = eye(k). +* *V* is *n x k* and satisfies V^TV = eye(k). All input and output is expected in sparse matrix format, 1-indexed -as tuples of the form ((i,j),value) all in RDDs +as tuples of the form ((i,j),value) all in RDDs. 
Below is example usage. {% highlight scala %} From 97dc527849b836703811acdbd6767685585099df Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Wed, 1 Jan 2014 20:02:37 -0800 Subject: [PATCH 15/46] doc tweak --- docs/mllib-guide.md | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md index 08d6d7485387f..8c490eba69f98 100644 --- a/docs/mllib-guide.md +++ b/docs/mllib-guide.md @@ -215,17 +215,18 @@ Available algorithms for gradient descent: # Singular Value Decomposition Singular Value Decomposition for Tall and Skinny matrices. -Given an m x n matrix A, this will compute matrices U, S, V such that -A = U * S * V^T +Given an *m x n* matrix *A*, this will compute matrices *U, S, V* such that + +*A = U * S * V^T* There is no restriction on m, but we require n^2 doubles to fit in memory. Further, n should be less than m. -The decomposition is computed by first computing A^TA = V S^2 V^T, +The decomposition is computed by first computing *A^TA = V S^2 V^T*, computing svd locally on that (since n x n is small), from which we recover S and V. Then we compute U via easy matrix multiplication -as U = A * V * S^-1 +as *U = A * V * S^-1* Only singular vectors associated with singular values greater or equal to MIN_SVALUE are recovered. If there are k From 53ccf65362d935f89fb9e27b4a3485454fa4c882 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Wed, 1 Jan 2014 20:03:47 -0800 Subject: [PATCH 16/46] doc tweaks --- docs/mllib-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md index 8c490eba69f98..711187fbea5b8 100644 --- a/docs/mllib-guide.md +++ b/docs/mllib-guide.md @@ -215,7 +215,7 @@ Available algorithms for gradient descent: # Singular Value Decomposition Singular Value Decomposition for Tall and Skinny matrices. 
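A note on the input the guide example above expects: each line of the text file is one nonzero entry in 1-indexed "i,j,value" form, matching the line.split(',') parser. A made-up file for the 4 x 4 case could look like:

    1,1,5.0
    1,2,1.0
    2,2,3.0
    4,4,2.0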
-Given an *m x n* matrix *A*, this will compute matrices *U, S, V* such that +Given an *m x n* matrix *A*, we can compute matrices *U, S, V* such that *A = U * S * V^T* From 0c3797dd15c1323d046e4eae36c2914470c8701e Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Wed, 1 Jan 2014 20:05:03 -0800 Subject: [PATCH 17/46] remove accidental changes to ec2 script --- ec2/spark_ec2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 5e8b381a4d62f..ac309cc1f412c 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -636,7 +636,7 @@ def get_partition(total, num_partitions, current_partitions): def real_main(): (opts, action, cluster_name) = parse_args() try: - conn = ec2.connect_to_region(opts.region,aws_access_key_id="AKIAI2EGAQ7GYNL4LRAA", aws_secret_access_key="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") + conn = ec2.connect_to_region(opts.region) except Exception as e: print >> stderr, (e) sys.exit(1) From c868d71b0ba122906f9041823ef2442578fa06b9 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Wed, 1 Jan 2014 20:08:01 -0800 Subject: [PATCH 18/46] old version of spark_ec2 --- ec2/spark_ec2.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index ac309cc1f412c..a2b0e7e7f4748 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -113,6 +113,16 @@ def parse_args(): # Boto config check # http://boto.cloudhackers.com/en/latest/boto_config_tut.html home_dir = os.getenv('HOME') + if home_dir == None or not os.path.isfile(home_dir + '/.boto'): + if not os.path.isfile('/etc/boto.cfg'): + if os.getenv('AWS_ACCESS_KEY_ID') == None: + print >> stderr, ("ERROR: The environment variable AWS_ACCESS_KEY_ID " + + "must be set") + sys.exit(1) + if os.getenv('AWS_SECRET_ACCESS_KEY') == None: + print >> stderr, ("ERROR: The environment variable AWS_SECRET_ACCESS_KEY " + + "must be set") + sys.exit(1) return (opts, action, cluster_name) From 915d53f8acb1f7ab14894b1255eb334b0812d9d3 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Wed, 1 Jan 2014 20:20:16 -0800 Subject: [PATCH 19/46] javadoc for sparsesvd --- .../org/apache/spark/mllib/linalg/sparsesvd.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala index 83b2178c0920d..19173fd26a9da 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala @@ -24,6 +24,7 @@ import org.apache.spark.rdd.RDD import org.jblas.{DoubleMatrix, Singular, MatrixFunctions} +object SVD { /** * Singular Value Decomposition for Tall and Skinny matrices. 
* Given an m x n matrix A, this will compute matrices U, S, V such that @@ -48,10 +49,13 @@ import org.jblas.{DoubleMatrix, Singular, MatrixFunctions} * * All input and output is expected in sparse matrix format, 1-indexed * as tuples of the form ((i,j),value) all in RDDs + * + * @param data RDD Matrix in sparse 1-index format ((int, int), value) + * @param m number of rows + * @param n number of columns + * @param min_svalue Recover singular values greater or equal to min_svalue + * @return Three sparse matrices: U, S, V such that A = USV^T */ - - -object SVD { def sparseSVD( data: RDD[((Int, Int), Double)], m: Int, From 2612164f85ae3249c78c130fc51427ace33b3580 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Wed, 1 Jan 2014 20:22:29 -0800 Subject: [PATCH 20/46] more docs yay --- .../main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala index 19173fd26a9da..2c82c6b958b5c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala @@ -23,7 +23,10 @@ import org.apache.spark.rdd.RDD import org.jblas.{DoubleMatrix, Singular, MatrixFunctions} - +/** + * Top-level methods for calling Singular Value Decomposition + * NOTE: All matrices are in 1-indexed sparse format RDD[((int, int), value)] + */ object SVD { /** * Singular Value Decomposition for Tall and Skinny matrices. From e617ae2dad20950e5358c15fa1290d52ca03a874 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Thu, 2 Jan 2014 01:51:38 -0800 Subject: [PATCH 21/46] fix error message --- .../main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala index 2c82c6b958b5c..2198e6a1a2f6a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala @@ -73,7 +73,7 @@ object SVD { } if (min_svalue < 1.0e-8) { - throw new IllegalArgumentException("Minimum singular value requested must be greater than 1e-9") + throw new IllegalArgumentException("Minimum singular value requested is too small") } // Compute A^T A, assuming rows are sparse enough to fit in memory From b059a2a00c1e4a46dacbd63cacbfe0a06f3c61fa Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Fri, 3 Jan 2014 21:54:57 -0800 Subject: [PATCH 22/46] New matrix entry file --- .../spark/mllib/linalg/MatrixEntry.scala | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala new file mode 100644 index 0000000000000..c7f2abab9750c --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg + +/** + * Class that represents an entry in a sparse matrix of doubles. + * + * @param i row index (1 indexing used) + * @param j column index (1 indexing used) + * @param mval value of entry in matrix + */ +case class MatrixEntry(val i: Int, val j: Int, val mval: Double) From 6bcdb762a107c82ef095553ab31284623475cb2c Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Fri, 3 Jan 2014 21:55:38 -0800 Subject: [PATCH 23/46] rename sparsesvd.scala --- .../org/apache/spark/mllib/linalg/{sparsesvd.scala => SVD.scala} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename mllib/src/main/scala/org/apache/spark/mllib/linalg/{sparsesvd.scala => SVD.scala} (100%) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala similarity index 100% rename from mllib/src/main/scala/org/apache/spark/mllib/linalg/sparsesvd.scala rename to mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala From 7f631dd2a9e2467871167da1514be9863485a96f Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Fri, 3 Jan 2014 22:17:24 -0800 Subject: [PATCH 24/46] start using matrixentry --- .../org/apache/spark/mllib/linalg/SVD.scala | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala index 2198e6a1a2f6a..08af2c855a604 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala @@ -23,6 +23,8 @@ import org.apache.spark.rdd.RDD import org.jblas.{DoubleMatrix, Singular, MatrixFunctions} +import org.apache.spark.linalg.MatrixEntry + /** * Top-level methods for calling Singular Value Decomposition * NOTE: All matrices are in 1-indexed sparse format RDD[((int, int), value)] @@ -60,13 +62,13 @@ object SVD { * @return Three sparse matrices: U, S, V such that A = USV^T */ def sparseSVD( - data: RDD[((Int, Int), Double)], + data: RDD[MatrixEntry], m: Int, n: Int, min_svalue: Double) - : ( RDD[((Int, Int), Double)], - RDD[((Int, Int), Double)], - RDD[((Int, Int), Double)]) = + : ( RDD[MatrixEntry], + RDD[MatrixEntry], + RDD[MatrixEntry]) = { if (m < n || m <= 0 || n <= 0) { throw new IllegalArgumentException("Expecting a tall and skinny matrix") @@ -78,7 +80,7 @@ object SVD { // Compute A^T A, assuming rows are sparse enough to fit in memory val rows = data.map(entry => - (entry._1._1, (entry._1._2, entry._2))).groupByKey().cache() + (entry.i, (entry.j, entry.mval))).groupByKey().cache() val emits = rows.flatMap{ case (rowind, cols) => cols.flatMap{ case (colind1, mval1) => cols.map{ case (colind2, mval2) => @@ -106,9 +108,9 @@ object SVD { // prepare V for returning val retV = sc.makeRDD( Array.tabulate(V.rows, sigma.length){ (i,j) => - ((i+1, j+1), V.get(i,j)) }.flatten) + MatrixEntry(i+1, j+1, 
V.get(i,j)) }.flatten) - val retS = sc.makeRDD(Array.tabulate(sigma.length){x=>((x+1,x+1),sigma(x))}) + val retS = sc.makeRDD(Array.tabulate(sigma.length){x=>MatrixEntry(x+1,x+1,sigma(x))}) // Compute U as U = A V S^-1 // turn V S^-1 into an RDD as a sparse matrix and cache it @@ -120,6 +122,7 @@ object SVD { val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2))) val retU = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) ) => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_) + .map( case (row, col, mval) => MatrixEntry(row, col, mval)) (retU, retS, retV) } @@ -127,11 +130,13 @@ object SVD { def main(args: Array[String]) { if (args.length < 8) { - println("Usage: SVD ") + println("Usage: SVD + ") System.exit(1) } val (master, inputFile, m, n, min_svalue, output_u, output_s, output_v) = - (args(0), args(1), args(2).toInt, args(3).toInt, args(4).toDouble, args(5), args(6), args(7)) + (args(0), args(1), args(2).toInt, args(3).toInt, + args(4).toDouble, args(5), args(6), args(7)) val sc = new SparkContext(master, "SVD") From d2d5e5e062e8aab5c3f019fbf97ad5e673a3f75f Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Sat, 4 Jan 2014 00:15:04 -0800 Subject: [PATCH 25/46] new return struct --- .../mllib/linalg/SVDecomposedMatrix.scala | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala new file mode 100644 index 0000000000000..c3ec428c3fac6 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.linalg + +import org.apache.spark.rdd.RDD + +import org.apache.spark.linalg.MatrixEntry + +/** + * Class that represents the SV decomposition of a matrix + * + * @param U such that A = USV^T + * @param S such that A = USV^T + * @param V such that A = USV^T + */ +case class SVDecomposedMatrix(val U: RDD[MatrixEntry], + val S: RDD[MatrixEntry], + val V: RDD[MatrixEntry]) From 26a74f0c4131d506384b94a913b8c6e1a30be9a4 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Sat, 4 Jan 2014 00:38:53 -0800 Subject: [PATCH 26/46] using decomposed matrix struct now --- .../org/apache/spark/mllib/linalg/SVD.scala | 14 ++++++-------- .../spark/mllib/linalg/SVDecomposedMatrix.scala | 2 -- .../apache/spark/mllib/linalg/SVDSuite.scala | 17 ++++++++++------- 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala index 08af2c855a604..ac9178e78c826 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala @@ -23,7 +23,6 @@ import org.apache.spark.rdd.RDD import org.jblas.{DoubleMatrix, Singular, MatrixFunctions} -import org.apache.spark.linalg.MatrixEntry /** * Top-level methods for calling Singular Value Decomposition @@ -66,9 +65,7 @@ object SVD { m: Int, n: Int, min_svalue: Double) - : ( RDD[MatrixEntry], - RDD[MatrixEntry], - RDD[MatrixEntry]) = + : SVDecomposedMatrix = { if (m < n || m <= 0 || n <= 0) { throw new IllegalArgumentException("Expecting a tall and skinny matrix") @@ -118,16 +115,16 @@ object SVD { { (i,j) => ((i+1, j+1), V.get(i,j)/sigma(j)) }.flatten).cache() // Multiply A by VS^-1 - val aCols = data.map(entry => (entry._1._2, (entry._1._1, entry._2))) + val aCols = data.map(entry => (entry.j, (entry.i, entry.mval))) val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2))) val retU = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) ) => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_) - .map( case (row, col, mval) => MatrixEntry(row, col, mval)) + .map{ case ((row, col), mval) => MatrixEntry(row, col, mval)} - (retU, retS, retV) + SVDecomposedMatrix(retU, retS, retV) } - +/* def main(args: Array[String]) { if (args.length < 8) { println("Usage: SVD @@ -153,6 +150,7 @@ object SVD { v.saveAsTextFile(output_v) System.exit(0) } +*/ } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala index c3ec428c3fac6..e0bcdab2d2856 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala @@ -19,8 +19,6 @@ package org.apache.spark.mllib.linalg import org.apache.spark.rdd.RDD -import org.apache.spark.linalg.MatrixEntry - /** * Class that represents the SV decomposition of a matrix * diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala index 726650af0ab88..71749ff72934c 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala @@ -45,9 +45,9 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll { val EPSILON = 1e-4 // Return jblas matrix from sparse matrix RDD - def getDenseMatrix(matrix:RDD[((Int, Int), 
Double)], m:Int, n:Int) : DoubleMatrix = { + def getDenseMatrix(matrix:RDD[MatrixEntry], m:Int, n:Int) : DoubleMatrix = { val ret = DoubleMatrix.zeros(m, n) - matrix.toArray.map(x => ret.put(x._1._1-1, x._1._2-1, x._2)) + matrix.toArray.map(x => ret.put(x.i-1, x.j-1, x.mval)) ret } @@ -65,11 +65,14 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll { val m = 10 val n = 3 val data = sc.makeRDD(Array.tabulate(m,n){ (a,b)=> - ((a+1,b+1), (a+2).toDouble*(b+1)/(1+a+b)) }.flatten ) + MatrixEntry(a+1,b+1, (a+2).toDouble*(b+1)/(1+a+b)) }.flatten ) val min_svalue = 1.0e-8 - val (u, s, v) = SVD.sparseSVD(data, m, n, min_svalue) - + val decomposed = SVD.sparseSVD(data, m, n, min_svalue) + val u = decomposed.U + val v = decomposed.V + val s = decomposed.S + val densea = getDenseMatrix(data, m, n) val svd = Singular.sparseSVD(densea) @@ -85,7 +88,7 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll { // check multiplication guarantee assertMatrixEquals(retu.mmul(rets).mmul(retv.transpose), densea) } - +/* test("rank one matrix svd") { val m = 10 val n = 3 @@ -138,5 +141,5 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll { assertMatrixEquals(retu, svd(0).getColumn(0)) assertMatrixEquals(rets, DoubleMatrix.diag(svd(1).getRow(0))) assertMatrixEquals(retv, svd(2).getColumn(0)) - } + }*/ } From 73daa700bd2acff7ff196c9262dffb2d8b9354bf Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Sat, 4 Jan 2014 01:52:28 -0800 Subject: [PATCH 27/46] add k parameter --- docs/mllib-guide.md | 5 ++-- .../org/apache/spark/mllib/linalg/SVD.scala | 24 +++++++++---------- .../apache/spark/mllib/linalg/SVDSuite.scala | 3 +-- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md index 711187fbea5b8..abeb55d081001 100644 --- a/docs/mllib-guide.md +++ b/docs/mllib-guide.md @@ -251,9 +251,10 @@ val data = sc.textFile("mllib/data/als/test.data").map { line => } val m = 4 val n = 4 +val k = 1 -// recover singular vectors for singular values at or above 1e-5 -val (u, s, v) = SVD.sparseSVD(data, m, n, 1e-5) +// recover largest singular vector +val (u, s, v) = SVD.sparseSVD(data, m, n, 1) println("singular values = " + s.toArray.mkString) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala index ac9178e78c826..465fc746ed621 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala @@ -43,9 +43,8 @@ object SVD { * Then we compute U via easy matrix multiplication * as U = A * V * S^-1 * - * Only singular vectors associated with singular values - * greater or equal to MIN_SVALUE are recovered. If there are k - * such values, then the dimensions of the return will be: + * Only the k largest singular values and associated vectors are found. 
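From the caller's side, the reworked k-based entry point of the last few patches reads as below — a sketch assuming a local SparkContext, with a tiny illustrative matrix, not taken from the patches:

    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.linalg.{MatrixEntry, SVD}

    val sc = new SparkContext("local", "svd-sketch")

    // 3 x 2 matrix in 1-indexed MatrixEntry form
    val entries = sc.makeRDD(Seq(
      MatrixEntry(1, 1, 2.0),
      MatrixEntry(2, 2, 3.0),
      MatrixEntry(3, 1, 4.0)))

    // keep only the largest singular value and its vectors
    val decomposed = SVD.sparseSVD(entries, 3, 2, 1)
    println("rank = " + decomposed.S.toArray.length)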
+ * If there are k such values, then the dimensions of the return will be:
  *
  * S is k x k and diagonal, holding the singular values on diagonal
  * U is m x k and satisfies U'U = eye(k)
@@ -57,22 +56,22 @@ object SVD {
   * @param data RDD Matrix in sparse 1-index format ((int, int), value)
   * @param m number of rows
   * @param n number of columns
-  * @param min_svalue Recover singular values greater or equal to min_svalue
+  * @param k Recover k singular values and vectors
   * @return Three sparse matrices: U, S, V such that A = USV^T
   */
   def sparseSVD(
       data: RDD[MatrixEntry],
       m: Int,
       n: Int,
-      min_svalue: Double)
+      k: Int)
     : SVDecomposedMatrix =
   {
     if (m < n || m <= 0 || n <= 0) {
       throw new IllegalArgumentException("Expecting a tall and skinny matrix")
     }
 
-    if (min_svalue < 1.0e-8) {
-      throw new IllegalArgumentException("Minimum singular value requested is too small")
+    if (k < 1 || k > n) {
+      throw new IllegalArgumentException("Must request up to n singular values")
     }
 
     // Compute A^T A, assuming rows are sparse enough to fit in memory
@@ -93,12 +92,13 @@ object SVD {
     // Since A^T A is small, we can compute its SVD directly
     val svd = Singular.sparseSVD(ata)
     val V = svd(0)
-    val sigma = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x >= min_svalue)
+    val sigmas = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x > 1e-9)
 
-    // threshold s values
-    if(sigma.isEmpty) {
-      throw new Exception("All singular values are smaller than min_svalue: " + min_svalue)
-    }
+    if(sigmas.size < k) {
+      throw new Exception("Not enough singular values to return")
+    }
+
+    val sigma = sigmas.take(k)
 
     val sc = data.sparkContext
 
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
index 71749ff72934c..dc4e9239a2ddc 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
@@ -66,9 +66,8 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
     val n = 3
     val data = sc.makeRDD(Array.tabulate(m,n){ (a,b)=>
       MatrixEntry(a+1,b+1, (a+2).toDouble*(b+1)/(1+a+b)) }.flatten )
-    val min_svalue = 1.0e-8
 
-    val decomposed = SVD.sparseSVD(data, m, n, min_svalue)
+    val decomposed = SVD.sparseSVD(data, m, n, n)
     val u = decomposed.U
     val v = decomposed.V
     val s = decomposed.S

From 35adc72794f25223502562f2dc0077f61d91cb79 Mon Sep 17 00:00:00 2001
From: Reza Zadeh
Date: Sat, 4 Jan 2014 11:30:36 -0800
Subject: [PATCH 28/46] set methods

---
 .../org/apache/spark/mllib/linalg/SVD.scala   | 59 ++++++++++++++++---
 1 file changed, 52 insertions(+), 7 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
index 465fc746ed621..9703e84312759 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
@@ -24,6 +24,50 @@ import org.apache.spark.rdd.RDD
 
 import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}
 
+
+/**
+ * Class used to obtain singular value decompositions
+ * @param data Matrix in sparse matrix format
+ * @param m number of rows
+ * @param n number of columns
+ */
+class GradientDescent(var data: RDD[MatrixEntry], var m: Int, var n: Int) {
+  private var k: Int = 1
+
+  /**
+   * Set the number of top-k singular vectors to return
+   */
+  def setK(k: Int): this.type = {
+    this.k = k
+    this
+  }
+
+  /**
+   * Set matrix to be used for SVD
+   */
+  def setDatadata(data: RDD[MatrixEntry]): this.type = {
+    this.data = data
+    this
+  }
+
+  /**
+   * Set dimensions of matrix: rows
+   */
+  def setNumRows(m: Int): this.type = {
+    this.m = m
+    this
+  }
+
+  /**
+   * Set dimensions of matrix: columns
+   */
+  def setNumCols(n: Int): this.type = {
+    this.n = n
+    this
+  }
+}
+
+
 /**
  * Top-level methods for calling Singular Value Decomposition
  * NOTE: All matrices are in 1-indexed sparse format RDD[((int, int), value)]
@@ -76,7 +120,7 @@ object SVD {
 
     // Compute A^T A, assuming rows are sparse enough to fit in memory
     val rows = data.map(entry =>
-            (entry.i, (entry.j, entry.mval))).groupByKey().cache()
+            (entry.i, (entry.j, entry.mval))).groupByKey()
     val emits = rows.flatMap{ case (rowind, cols)  =>
       cols.flatMap{ case (colind1, mval1) =>
                     cols.map{ case (colind2, mval2) =>
@@ -85,7 +129,7 @@ object SVD {
 
     // Construct jblas A^T A locally
     val ata = DoubleMatrix.zeros(n, n)
-    for(entry <- emits.toArray) {
+    for (entry <- emits.toArray) {
       ata.put(entry._1._1-1, entry._1._2-1, entry._2)
     }
 
@@ -94,7 +138,7 @@ object SVD {
     val V = svd(0)
     val sigmas = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x > 1e-9)
 
-    if(sigmas.size < k) {
+    if (sigmas.size < k) {
       throw new Exception("Not enough singular values to return")
     }
 
@@ -105,14 +149,15 @@ object SVD {
     // prepare V for returning
     val retV = sc.makeRDD(
             Array.tabulate(V.rows, sigma.length){ (i,j) =>
-                    MatrixEntry(i+1, j+1, V.get(i,j)) }.flatten)
+                    MatrixEntry(i + 1, j + 1, V.get(i,j)) }.flatten)
 
-    val retS = sc.makeRDD(Array.tabulate(sigma.length){x=>MatrixEntry(x+1,x+1,sigma(x))})
+    val retS = sc.makeRDD(Array.tabulate(sigma.length){
+            x => MatrixEntry(x + 1,x + 1, sigma(x))})
 
     // Compute U as U = A V S^-1
-    // turn V S^-1 into an RDD as a sparse matrix and cache it
+    // turn V S^-1 into an RDD as a sparse matrix
     val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length)
-            { (i,j) => ((i+1, j+1), V.get(i,j)/sigma(j)) }.flatten).cache()
+            { (i,j) => ((i + 1, j + 1), V.get(i,j)/sigma(j)) }.flatten)

From 8bfcce1ad81348a5eac3e3d332ddc293380c041a Mon Sep 17 00:00:00 2001
From: Reza Zadeh
Date: Sat, 4 Jan 2014 11:52:42 -0800
Subject: [PATCH 29/46] fix tests

---
 .../org/apache/spark/mllib/linalg/SVD.scala   | 28 +++++++++++++------
 .../apache/spark/mllib/linalg/SVDSuite.scala  | 28 +++++++++++--------
 2 files changed, 36 insertions(+), 20 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
index 9703e84312759..d1ee6c64896f9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
@@ -31,7 +31,7 @@ import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}
  * @param m number of rows
  * @param n number of columns
  */
-class GradientDescent(var data: RDD[MatrixEntry], var m: Int, var n: Int) {
+class SVD(var data: RDD[MatrixEntry], var m: Int, var n: Int) {
   private var k: Int = 1
 
   /**
@@ -65,6 +65,13 @@ class GradientDescent(var data: RDD[MatrixEntry], var m: Int, var n: Int) {
     this.n = n
     this
   }
+
+  /**
+   * Compute SVD using the current set parameters
+   */
+  def computeSVD() : SVDecomposedMatrix = {
+    SVD.sparseSVD(data, m, n, k)
+  }
 }
 
 
@@ -169,33 +176,36 @@ object SVD {
     SVDecomposedMatrix(retU, retS, retV)
   }
 
-/*
+
   def main(args: Array[String]) {
     if (args.length < 8) {
-      println("Usage: SVD <master> <matrix_file> <m> <n>
-              <min_svalue> <output_u> <output_s> <output_v>")
+      println("Usage: SVD <master> <matrix_file> <m> <n> " +
+              "<k> <output_u> <output_s> <output_v>")
       System.exit(1)
     }
 
-    val (master, inputFile, m, n, min_svalue, output_u, output_s, output_v) =
+    val (master, inputFile, m, n, k, output_u, output_s, output_v) =
       (args(0), args(1), args(2).toInt, args(3).toInt,
-       args(4).toDouble, args(5), args(6), args(7))
+       args(4).toInt, args(5), args(6), args(7))
 
     val sc = new SparkContext(master, "SVD")
 
     val rawdata = sc.textFile(inputFile)
     val data = rawdata.map { line =>
       val parts = line.split(',')
-      ((parts(0).toInt, parts(1).toInt), parts(2).toDouble)
+      MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
     }
 
-    val (u, s, v) = SVD.sparseSVD(data, m, n, min_svalue)
+    val decomposed = SVD.sparseSVD(data, m, n, k)
+    val u = decomposed.U
+    val s = decomposed.S
+    val v = decomposed.V
+
     println("Computed " + s.toArray.length + " singular values and vectors")
     u.saveAsTextFile(output_u)
     s.saveAsTextFile(output_s)
     v.saveAsTextFile(output_v)
     System.exit(0)
   }
-*/
 }

diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
index dc4e9239a2ddc..a83d9d015b6e7 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
@@ -87,15 +87,18 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
     // check multiplication guarantee
     assertMatrixEquals(retu.mmul(rets).mmul(retv.transpose), densea)
   }
-/*
+
   test("rank one matrix svd") {
     val m = 10
     val n = 3
-    val data = sc.makeRDD(Array.tabulate(m,n){ (a,b)=>
-      ((a+1,b+1), 1.0) }.flatten )
-    val min_svalue = 1.0e-4
+    val data = sc.makeRDD(Array.tabulate(m, n){ (a,b) =>
+      MatrixEntry(a + 1, b + 1, 1.0) }.flatten )
+    val k = 1
 
-    val (u, s, v) = SVD.sparseSVD(data, m, n, min_svalue)
+    val decomposed = SVD.sparseSVD(data, m, n, k)
+    val u = decomposed.U
+    val s = decomposed.S
+    val v = decomposed.V
 
     val retrank = s.toArray.length
     assert(retrank == 1, "rank returned not one")
@@ -116,15 +119,18 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
     assertMatrixEquals(retu.mmul(rets).mmul(retv.transpose), densea)
   }
 
-  test("truncated with min singular value") {
+  test("truncated with k") {
     val m = 10
     val n = 3
-    val data = sc.makeRDD(Array.tabulate(m,n){ (a,b)=>
-      ((a+1,b+1), (a+2).toDouble*(b+1)/(1+a+b)) }.flatten )
+    val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) =>
+      MatrixEntry(a + 1, b + 1, (a + 2).toDouble*(b + 1)/(1 + a + b)) }.flatten )
 
-    val min_svalue = 5.0 // only one svalue above this
+    val k = 1 // only one svalue above this
 
-    val (u, s, v) = SVD.sparseSVD(data, m, n, min_svalue)
+    val decomposed = SVD.sparseSVD(data, m, n, k)
+    val u = decomposed.U
+    val s = decomposed.S
+    val v = decomposed.V
 
     val retrank = s.toArray.length
 
     val densea = getDenseMatrix(data, m, n)
     val svd = Singular.sparseSVD(densea)
@@ -140,5 +146,5 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
     assertMatrixEquals(retu, svd(0).getColumn(0))
     assertMatrixEquals(rets, DoubleMatrix.diag(svd(1).getRow(0)))
     assertMatrixEquals(retv, svd(2).getColumn(0))
-  }*/
+  }
 }
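A quick sketch of how the fluent API introduced by the two patches above is meant to be used. This is illustrative only and not part of the series; it assumes an existing RDD[MatrixEntry] named entries with dimensions m x n, and uses the API exactly as it stands after PATCH 29 (1-indexed entries, factors returned as RDDs of MatrixEntry):

    // Illustrative sketch, not part of the patch series.
    // `entries`, `m` and `n` are assumed to be defined elsewhere.
    val svd = new SVD(entries, m, n).setK(2)   // setters return this, so calls chain
    val decomposed = svd.computeSVD()          // runs SVD.sparseSVD(entries, m, n, 2)
    val rank = decomposed.S.toArray.length     // number of singular values returned
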
From e9bd6cb51dce9222a5a284cd171b299b0169852b Mon Sep 17 00:00:00 2001
From: Reza Zadeh
Date: Sat, 4 Jan 2014 12:33:22 -0800
Subject: [PATCH 30/46] new example file

---
 .../org/apache/spark/examples/SparkSVD.scala  | 58 +++++++++++++++++++
 .../org/apache/spark/mllib/linalg/SVD.scala   |  1 -
 2 files changed, 58 insertions(+), 1 deletion(-)
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala

diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala b/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala
new file mode 100644
index 0000000000000..5590ee728a843
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples
+
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.linalg.SVD
+import org.apache.spark.mllib.linalg.MatrixEntry
+
+/**
+ * Compute SVD of an example matrix
+ * Input file should be comma separated, 1 indexed of the form
+ * i,j,value
+ * Where i is the row, j the column, and value is the matrix entry
+ *
+ * For example input file, see:
+ * mllib/data/als/test.data
+ */
+object SparkSVD {
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println("Usage: SVD <master> <file>")
+      System.exit(1)
+    }
+    val sc = new SparkContext(args(0), "SVD",
+      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+
+    // Load and parse the data file
+    val data = sc.textFile(args(1)).map { line =>
+      val parts = line.split(',')
+      MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
+    }
+    val m = 4
+    val n = 4
+
+    // recover largest singular vector
+    val decomposed = SVD.sparseSVD(data, m, n, 1)
+    val u = decomposed.U
+    val s = decomposed.S
+    val v = decomposed.V
+
+    println("singular values = " + s.toArray.mkString)
+  }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
index d1ee6c64896f9..e58b8e8fbdeb9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
@@ -24,7 +24,6 @@ import org.apache.spark.rdd.RDD
 
 import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}
 
-
 /**
  * Class used to obtain singular value decompositions
  * @param data Matrix in sparse matrix format

From cdff9fc858b9b83eb1119ec2a6d1d3c9a66f47a9 Mon Sep 17 00:00:00 2001
From: Reza Zadeh
Date: Sat, 4 Jan 2014 12:44:04 -0800
Subject: [PATCH 31/46] prettify

---
 .../org/apache/spark/mllib/linalg/SVD.scala   |  9 ++---
 .../apache/spark/mllib/linalg/SVDSuite.scala  | 34 +++++++++----------
 2 files changed, 22 insertions(+), 21 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
index e58b8e8fbdeb9..31990b0223c42 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
@@ -136,7 +136,7 @@ object SVD {
     // Construct jblas A^T A locally
     val ata = DoubleMatrix.zeros(n, n)
     for (entry <- emits.toArray) {
-      ata.put(entry._1._1-1, entry._1._2-1, entry._2)
+      ata.put(entry._1._1 - 1, entry._1._2 - 1, entry._2)
     }
 
@@ -158,12 +158,12 @@ object SVD {
                     MatrixEntry(i + 1, j + 1, V.get(i,j)) }.flatten)
 
     val retS = sc.makeRDD(Array.tabulate(sigma.length){
-            x => MatrixEntry(x + 1,x + 1, sigma(x))})
+            x => MatrixEntry(x + 1, x + 1, sigma(x))})
 
     // Compute U as U = A V S^-1
     // turn V S^-1 into an RDD as a sparse matrix
     val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length)
-            { (i,j) => ((i + 1, j + 1), V.get(i,j)/sigma(j)) }.flatten)
+            { (i,j) => ((i + 1, j + 1), V.get(i,j) / sigma(j)) }.flatten)
 
@@ -178,10 +178,11 @@ object SVD {
 
   def main(args: Array[String]) {
     if (args.length < 8) {
-      println("Usage: SVD <master> <matrix_file> <m> <n>" +
+      println("Usage: SVD <master> <matrix_file> <m> <n> " +
              "<k> <output_u> <output_s> <output_v>")
       System.exit(1)
     }
+
     val (master, inputFile, m, n, k, output_u, output_s, output_v) =
       (args(0), args(1), args(2).toInt, args(3).toInt,
        args(4).toInt, args(5), args(6), args(7))

diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
index a83d9d015b6e7..4126e819e3296 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
@@ -47,25 +47,25 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
   // Return jblas matrix from sparse matrix RDD
   def getDenseMatrix(matrix:RDD[MatrixEntry], m:Int, n:Int) : DoubleMatrix = {
     val ret = DoubleMatrix.zeros(m, n)
-    matrix.toArray.map(x => ret.put(x.i-1, x.j-1, x.mval))
+    matrix.toArray.map(x => ret.put(x.i - 1, x.j - 1, x.mval))
     ret
   }
 
   def assertMatrixEquals(a:DoubleMatrix, b:DoubleMatrix) {
     assert(a.rows == b.rows && a.columns == b.columns, "dimension mismatch")
     val diff = DoubleMatrix.zeros(a.rows, a.columns)
-    Array.tabulate(a.rows, a.columns){(i,j) =>
-      diff.put(i,j,
-        Math.min(Math.abs(a.get(i,j)-b.get(i,j)),
-          Math.abs(a.get(i,j)+b.get(i,j)))) }
+    Array.tabulate(a.rows, a.columns){(i, j) =>
+      diff.put(i, j,
+        Math.min(Math.abs(a.get(i, j) - b.get(i, j)),
+          Math.abs(a.get(i, j) + b.get(i, j)))) }
     assert(diff.norm1 < EPSILON, "matrix mismatch: " + diff.norm1)
   }
 
   test("full rank matrix svd") {
     val m = 10
     val n = 3
-    val data = sc.makeRDD(Array.tabulate(m,n){ (a,b)=>
-      MatrixEntry(a+1,b+1, (a+2).toDouble*(b+1)/(1+a+b)) }.flatten )
+    val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) =>
+      MatrixEntry(a + 1, b + 1, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten )
 
     val decomposed = SVD.sparseSVD(data, m, n, n)
     val u = decomposed.U
@@ -75,9 +75,9 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
     val densea = getDenseMatrix(data, m, n)
     val svd = Singular.sparseSVD(densea)
 
-    val retu = getDenseMatrix(u,m,n)
-    val rets = getDenseMatrix(s,n,n)
-    val retv = getDenseMatrix(v,n,n)
+    val retu = getDenseMatrix(u, m, n)
+    val rets = getDenseMatrix(s, n, n)
+    val retv = getDenseMatrix(v, n, n)
 
     // check individual decomposition
     assertMatrixEquals(retu, svd(0))
@@ -106,9 +106,9 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
     val densea = getDenseMatrix(data, m, n)
     val svd = Singular.sparseSVD(densea)
 
-    val retu = getDenseMatrix(u,m,retrank)
-    val rets = getDenseMatrix(s,retrank,retrank)
-    val retv = getDenseMatrix(v,n,retrank)
+    val retu = getDenseMatrix(u, m, retrank)
+    val rets = getDenseMatrix(s, retrank, retrank)
+    val retv = getDenseMatrix(v, n, retrank)
 
     // check individual decomposition
     assertMatrixEquals(retu, svd(0).getColumn(0))
@@ -123,7 +123,7 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
     val m = 10
     val n = 3
     val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) =>
-      MatrixEntry(a + 1, b + 1, (a + 2).toDouble*(b + 1)/(1 + a + b)) }.flatten )
+      MatrixEntry(a + 1, b + 1, (a + 2).toDouble * (b + 1)/(1 + a + b)) }.flatten )
 
     val k = 1 // only one svalue above this
 
@@ -136,9 +136,9 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
     val densea = getDenseMatrix(data, m, n)
     val svd = Singular.sparseSVD(densea)
 
-    val retu = getDenseMatrix(u,m,retrank)
-    val rets = getDenseMatrix(s,retrank,retrank)
-    val retv = getDenseMatrix(v,n,retrank)
+    val retu = getDenseMatrix(u, m, retrank)
+    val rets = getDenseMatrix(s, retrank, retrank)
+    val retv = getDenseMatrix(v, n, retrank)
 
     assert(retrank == 1, "rank returned not one")

From 06c0f7628a213a08ef5adeab903160b806680acf Mon Sep 17 00:00:00 2001
From: Reza Zadeh
Date: Sat, 4 Jan 2014 14:28:07 -0800
Subject: [PATCH 32/46] use SparseMatrix everywhere

---
 .../org/apache/spark/examples/SparkSVD.scala  |  9 +--
 .../org/apache/spark/mllib/linalg/SVD.scala   | 67 ++++++-------------
 .../mllib/linalg/SVDecomposedMatrix.scala     |  8 +--
 .../spark/mllib/linalg/SparseMatrix.scala     | 30 +++++++++
 .../apache/spark/mllib/linalg/SVDSuite.scala  | 50 ++++++++------
 5 files changed, 89 insertions(+), 75 deletions(-)
 create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala

diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala b/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala
index 5590ee728a843..4b9e674c68c4e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.examples
 import org.apache.spark.SparkContext
 import org.apache.spark.mllib.linalg.SVD
 import org.apache.spark.mllib.linalg.MatrixEntry
+import org.apache.spark.mllib.linalg.SparseMatrix
 
 /**
  * Compute SVD of an example matrix
@@ -48,10 +49,10 @@ object SparkSVD {
     val n = 4
 
     // recover largest singular vector
-    val decomposed = SVD.sparseSVD(data, m, n, 1)
-    val u = decomposed.U
-    val s = decomposed.S
-    val v = decomposed.V
+    val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), 1)
+    val u = decomposed.U.data
+    val s = decomposed.S.data
+    val v = decomposed.V.data
 
     println("singular values = " + s.toArray.mkString)
   }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
index 31990b0223c42..a8efdc787e270 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
@@ -26,11 +26,8 @@ import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}
 
 /**
  * Class used to obtain singular value decompositions
- * @param data Matrix in sparse matrix format
- * @param m number of rows
- * @param n number of columns
  */
-class SVD(var data: RDD[MatrixEntry], var m: Int, var n: Int) {
+class SVD {
   private var k: Int = 1
 
   /**
@@ -41,35 +38,11 @@ class SVD {
     this
   }
 
-  /**
-   * Set matrix to be used for SVD
-   */
-  def setDatadata(data: RDD[MatrixEntry]): this.type = {
-    this.data = data
-    this
-  }
-
-  /**
-   * Set dimensions of matrix: rows
-   */
-  def setNumRows(m: Int): this.type = {
-    this.m = m
-    this
-  }
-
-  /**
-   * Set dimensions of matrix: columns
-   */
-  def setNumCols(n: Int): this.type = {
-    this.n = n
-    this
-  }
-
   /**
    * Compute SVD using the current set parameters
    */
-  def computeSVD() : SVDecomposedMatrix = {
-    SVD.sparseSVD(data, m, n, k)
+  def computeSVD(matrix: SparseMatrix) : SVDecomposedMatrix = {
+    SVD.sparseSVD(matrix, k)
   }
 }
 
@@ -103,19 +76,19 @@ object SVD {
    * All input and output is expected in sparse matrix format, 1-indexed
    * as tuples of the form ((i,j),value) all in RDDs
    *
-   * @param data RDD Matrix in sparse 1-index format ((int, int), value)
-   * @param m number of rows
-   * @param n number of columns
+   * @param matrix sparse matrix to factorize
    * @param k Recover k singular values and vectors
    * @return Three sparse matrices: U, S, V such that A = USV^T
    */
   def sparseSVD(
-      data: RDD[MatrixEntry],
-      m: Int,
-      n: Int,
+      matrix: SparseMatrix,
       k: Int)
     : SVDecomposedMatrix =
   {
+    val data = matrix.data
+    val m = matrix.m
+    val n = matrix.n
+
     if (m < n || m <= 0 || n <= 0) {
       throw new IllegalArgumentException("Expecting a tall and skinny matrix")
     }
@@ -153,13 +126,16 @@ object SVD {
     val sc = data.sparkContext
 
     // prepare V for returning
-    val retV = sc.makeRDD(
+    val retVdata = sc.makeRDD(
             Array.tabulate(V.rows, sigma.length){ (i,j) =>
                     MatrixEntry(i + 1, j + 1, V.get(i,j)) }.flatten)
-
-    val retS = sc.makeRDD(Array.tabulate(sigma.length){
+    val retV = SparseMatrix(retVdata, V.rows, sigma.length)
+
+    val retSdata = sc.makeRDD(Array.tabulate(sigma.length){
             x => MatrixEntry(x + 1, x + 1, sigma(x))})
 
+    val retS = SparseMatrix(retSdata, sigma.length, sigma.length)
+
     // Compute U as U = A V S^-1
     // turn V S^-1 into an RDD as a sparse matrix
     val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length)
@@ -168,10 +144,11 @@ object SVD {
     // Multiply A by VS^-1
     val aCols = data.map(entry => (entry.j, (entry.i, entry.mval)))
     val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
-    val retU = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) )
+    val retUdata = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) )
             => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_)
             .map{ case ((row, col), mval) => MatrixEntry(row, col, mval)}
-
+    val retU = SparseMatrix(retUdata, m, sigma.length)
+
     SVDecomposedMatrix(retU, retS, retV)
   }
 
@@ -195,10 +172,10 @@ object SVD {
       MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
     }
 
-    val decomposed = SVD.sparseSVD(data, m, n, k)
-    val u = decomposed.U
-    val s = decomposed.S
-    val v = decomposed.V
+    val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), k)
+    val u = decomposed.U.data
+    val s = decomposed.S.data
+    val v = decomposed.V.data
 
     println("Computed " + s.toArray.length + " singular values and vectors")
     u.saveAsTextFile(output_u)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala
index e0bcdab2d2856..622003576d474 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.mllib.linalg
 
-import org.apache.spark.rdd.RDD
-
 /**
  * Class that represents the SV decomposition of a matrix
  *
@@ -26,6 +24,6 @@ import org.apache.spark.rdd.RDD
  * @param S such that A = USV^T
  * @param V such that A = USV^T
  */
-case class SVDecomposedMatrix(val U: RDD[MatrixEntry],
-                              val S: RDD[MatrixEntry],
-                              val V: RDD[MatrixEntry])
+case class SVDecomposedMatrix(val U: SparseMatrix,
+                              val S: SparseMatrix,
+                              val V: SparseMatrix)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala
new file mode 100644
index 0000000000000..cbd1a2a5a4bd8
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import org.apache.spark.rdd.RDD
+
+
+/**
+ * Class that represents a sparse matrix
+ *
+ * @param data RDD of nonzero entries
+ * @param m number of rows
+ * @param n number of columns
+ */
+case class SparseMatrix(val data: RDD[MatrixEntry], val m: Int, val n: Int)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
index 4126e819e3296..f239e8505ff1a 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
@@ -45,9 +45,12 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
   val EPSILON = 1e-4
 
   // Return jblas matrix from sparse matrix RDD
-  def getDenseMatrix(matrix:RDD[MatrixEntry], m:Int, n:Int) : DoubleMatrix = {
+  def getDenseMatrix(matrix:SparseMatrix) : DoubleMatrix = {
+    val data = matrix.data
+    val m = matrix.m
+    val n = matrix.n
     val ret = DoubleMatrix.zeros(m, n)
-    matrix.toArray.map(x => ret.put(x.i - 1, x.j - 1, x.mval))
+    matrix.data.toArray.map(x => ret.put(x.i - 1, x.j - 1, x.mval))
     ret
   }
 
@@ -67,24 +70,26 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
     val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) =>
       MatrixEntry(a + 1, b + 1, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten )
 
-    val decomposed = SVD.sparseSVD(data, m, n, n)
+    val a = SparseMatrix(data, m, n)
+
+    val decomposed = SVD.sparseSVD(a, n)
     val u = decomposed.U
     val v = decomposed.V
-    val s = decomposed.S 
+    val s = decomposed.S
 
-    val densea = getDenseMatrix(data, m, n)
+    val densea = getDenseMatrix(a)
     val svd = Singular.sparseSVD(densea)
 
-    val retu = getDenseMatrix(u, m, n)
-    val rets = getDenseMatrix(s, n, n)
-    val retv = getDenseMatrix(v, n, n)
+    val retu = getDenseMatrix(u)
+    val rets = getDenseMatrix(s)
+    val retv = getDenseMatrix(v)
 
     // check individual decomposition
     assertMatrixEquals(retu, svd(0))
     assertMatrixEquals(rets, DoubleMatrix.diag(svd(1)))
     assertMatrixEquals(retv, svd(2))
 
-    // check multiplication guarantee 
+    // check multiplication guarantee
     assertMatrixEquals(retu.mmul(rets).mmul(retv.transpose), densea)
   }
 
@@ -95,20 +100,22 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
     val m = 10
     val n = 3
     val data = sc.makeRDD(Array.tabulate(m, n){ (a,b) =>
       MatrixEntry(a + 1, b + 1, 1.0) }.flatten )
     val k = 1
 
-    val decomposed = SVD.sparseSVD(data, m, n, k)
+    val a = SparseMatrix(data, m, n)
+
+    val decomposed = SVD.sparseSVD(a, k)
     val u = decomposed.U
     val s = decomposed.S
     val v = decomposed.V
 
-    val retrank = s.toArray.length
+    val retrank = s.data.toArray.length
 
     assert(retrank == 1, "rank returned not one")
 
-    val densea = getDenseMatrix(data, m, n)
+    val densea = getDenseMatrix(a)
     val svd = Singular.sparseSVD(densea)
 
-    val retu = getDenseMatrix(u, m, retrank)
-    val rets = getDenseMatrix(s, retrank, retrank)
-    val retv = getDenseMatrix(v, n, retrank)
+    val retu = getDenseMatrix(u)
+    val rets = getDenseMatrix(s)
+    val retv = getDenseMatrix(v)
 
     // check individual decomposition
     assertMatrixEquals(retu, svd(0).getColumn(0))
@@ -124,21 +131,22 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
     val n = 3
     val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) =>
       MatrixEntry(a + 1, b + 1, (a + 2).toDouble * (b + 1)/(1 + a + b)) }.flatten )
+    val a = SparseMatrix(data, m, n)
 
     val k = 1 // only one svalue above this
 
-    val decomposed = SVD.sparseSVD(data, m, n, k)
+    val decomposed = SVD.sparseSVD(a, k)
     val u = decomposed.U
     val s = decomposed.S
     val v = decomposed.V
 
-    val retrank = s.toArray.length
+    val retrank = s.data.toArray.length
 
-    val densea = getDenseMatrix(data, m, n)
+    val densea = getDenseMatrix(a)
     val svd = Singular.sparseSVD(densea)
 
-    val retu = getDenseMatrix(u, m, retrank)
-    val rets = getDenseMatrix(s, retrank, retrank)
-    val retv = getDenseMatrix(v, n, retrank)
+    val retu = getDenseMatrix(u)
+    val rets = getDenseMatrix(s)
+    val retv = getDenseMatrix(v)
 
     assert(retrank == 1, "rank returned not one")

From 746148bc18d5e25ea93f5ff17a6cb4da9b671b75 Mon Sep 17 00:00:00 2001
From: Reza Zadeh
Date: Sun, 5 Jan 2014 18:03:57 -0800
Subject: [PATCH 33/46] fix docs to use SparseMatrix

---
 docs/mllib-guide.md | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md
index abeb55d081001..653848b6d4757 100644
--- a/docs/mllib-guide.md
+++ b/docs/mllib-guide.md
@@ -243,18 +243,21 @@ as tuples of the form ((i,j),value) all in RDDs. Below is example usage.
 
 import org.apache.spark.SparkContext
 import org.apache.spark.mllib.linalg.SVD
+import org.apache.spark.mllib.linalg.SparseMatrix
+import org.apache.spark.mllib.linalg.MatrixEntry
 
 // Load and parse the data file
 val data = sc.textFile("mllib/data/als/test.data").map { line =>
       val parts = line.split(',')
-      ((parts(0).toInt, parts(1).toInt), parts(2).toDouble)
+      MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
 }
 val m = 4
 val n = 4
 val k = 1
 
 // recover largest singular vector
-val (u, s, v) = SVD.sparseSVD(data, m, n, 1)
+val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), k)
+val s = decomposed.S.data
 
 println("singular values = " + s.toArray.mkString)
From 7d7490b67b8d0ff7f731e9ff6328ed0fca3f43c1 Mon Sep 17 00:00:00 2001
From: Reza Zadeh
Date: Tue, 7 Jan 2014 17:16:17 -0800
Subject: [PATCH 34/46] More sparse matrix usage.

---
 mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
index a8efdc787e270..6590e8f357d70 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
@@ -74,7 +74,8 @@ object SVD {
    * V is n x k and satisfies V'V = eye(k)
    *
    * All input and output is expected in sparse matrix format, 1-indexed
-   * as tuples of the form ((i,j),value) all in RDDs
+   * as tuples of the form ((i,j),value) all in RDDs using the
+   * SparseMatrix class
    *
    * @param matrix sparse matrix to factorize
    * @param k Recover k singular values and vectors

From 4f38b6fab5bf633a205b9039db9d4a26ed28ec89 Mon Sep 17 00:00:00 2001
From: Reza Zadeh
Date: Tue, 7 Jan 2014 17:19:28 -0800
Subject: [PATCH 35/46] documentation for sparsematrix

---
 docs/mllib-guide.md | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md
index 653848b6d4757..44e6c8f58b973 100644
--- a/docs/mllib-guide.md
+++ b/docs/mllib-guide.md
@@ -228,8 +228,8 @@ from which we recover S and V.
 Then we compute U via easy matrix multiplication
 as *U = A * V * S^-1*
 
-Only singular vectors associated with singular values
-greater or equal to MIN_SVALUE are recovered. If there are k
+Only singular vectors associated with largest k singular values
+are recovered. If there are k
 such values, then the dimensions of the return will be:
 
 * *S* is *k x k* and diagonal, holding the singular values on diagonal.
@@ -237,7 +237,8 @@ such values, then the dimensions of the return will be:
 * *V* is *n x k* and satisfies V^TV = eye(k).
 
 All input and output is expected in sparse matrix format, 1-indexed
-as tuples of the form ((i,j),value) all in RDDs. Below is example usage.
+as tuples of the form ((i,j),value) all in
+SparseMatrix RDDs. Below is example usage.
 
 {% highlight scala %}

From cf5bd4ab2e9db72d3d9164053523e9e872d85b94 Mon Sep 17 00:00:00 2001
From: Reza Zadeh
Date: Thu, 9 Jan 2014 22:39:41 -0800
Subject: [PATCH 36/46] fix example

---
 .../src/main/scala/org/apache/spark/examples/SparkSVD.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala b/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala
index 4b9e674c68c4e..d9c672f14063c 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala
@@ -33,8 +33,8 @@ import org.apache.spark.mllib.linalg.SparseMatrix
  */
 object SparkSVD {
   def main(args: Array[String]) {
-    if (args.length < 3) {
-      System.err.println("Usage: SVD <master> <file>")
+    if (args.length != 2) {
+      System.err.println("Usage: SparkSVD <master> <file>")
       System.exit(1)
     }
     val sc = new SparkContext(args(0), "SVD",

From 1afdeaeb2f436084a6fbe8d73690f148f7b462c4 Mon Sep 17 00:00:00 2001
From: Reza Zadeh
Date: Fri, 10 Jan 2014 21:30:54 -0800
Subject: [PATCH 37/46] add dimension parameters to example

---
 .../scala/org/apache/spark/examples/SparkSVD.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala b/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala
index d9c672f14063c..ce7c1c48b5910 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala
@@ -29,12 +29,12 @@ import org.apache.spark.mllib.linalg.SparseMatrix
  * Where i is the row, j the column, and value is the matrix entry
  *
  * For example input file, see:
- * mllib/data/als/test.data
+ * mllib/data/als/test.data  (example is 4 x 4)
 */
 object SparkSVD {
   def main(args: Array[String]) {
-    if (args.length != 2) {
-      System.err.println("Usage: SparkSVD <master> <file>")
+    if (args.length != 4) {
+      System.err.println("Usage: SparkSVD <master> <file> m n")
       System.exit(1)
     }
     val sc = new SparkContext(args(0), "SVD",
@@ -45,8 +45,8 @@ object SparkSVD {
       val parts = line.split(',')
       MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
     }
-    val m = 4
-    val n = 4
+    val m = args(2).toInt
+    val n = args(3).toInt
 
     // recover largest singular vector
     val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), 1)

From d28bf4182758f08862d5838c918756801a9d7327 Mon Sep 17 00:00:00 2001
From: Reza Zadeh
Date: Fri, 17 Jan 2014 13:39:40 -0800
Subject: [PATCH 38/46] changes from PR

---
 docs/mllib-guide.md                                       | 5 +++--
 .../org/apache/spark/examples/{ => mllib}/SparkSVD.scala  | 2 +-
 2 files changed, 4 insertions(+), 3 deletions(-)
 rename examples/src/main/scala/org/apache/spark/examples/{ => mllib}/SparkSVD.scala (98%)

diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md
index a140ecb618607..26350ce1069c7 100644
--- a/docs/mllib-guide.md
+++ b/docs/mllib-guide.md
@@ -445,11 +445,12 @@ Given an *m x n* matrix *A*, we can compute matrices *U, S, V* such that
 
 *A = U * S * V^T*
 
-There is no restriction on m, but we require n^2 doubles to fit in memory.
+There is no restriction on m, but we require n^2 doubles to
+fit in memory locally on one machine.
 Further, n should be less than m.
 
 The decomposition is computed by first computing *A^TA = V S^2 V^T*,
-computing svd locally on that (since n x n is small),
+computing SVD locally on that (since n x n is small),
 from which we recover S and V.
 Then we compute U via easy matrix multiplication
 as *U = A * V * S^-1*
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala
similarity index 98%
rename from examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala
rename to examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala
index ce7c1c48b5910..50e5f5bd879e9 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkSVD.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.examples
+package org.apache.spark.examples.mllib
 
 import org.apache.spark.SparkContext
 import org.apache.spark.mllib.linalg.SVD

From cb13b15a60ce8eb55b2d2971a57ac8d4bd5c7574 Mon Sep 17 00:00:00 2001
From: Reza Zadeh
Date: Fri, 17 Jan 2014 13:55:42 -0800
Subject: [PATCH 39/46] use 0-indexing

---
 docs/mllib-guide.md                                  |  4 ++--
 .../org/apache/spark/mllib/linalg/MatrixEntry.scala  |  4 ++--
 .../{SVDecomposedMatrix.scala => MatrixSVD.scala}    |  0
 .../scala/org/apache/spark/mllib/linalg/SVD.scala    | 12 ++++++------
 .../org/apache/spark/mllib/linalg/SVDSuite.scala     |  8 ++++----
 5 files changed, 14 insertions(+), 14 deletions(-)
 rename mllib/src/main/scala/org/apache/spark/mllib/linalg/{SVDecomposedMatrix.scala => MatrixSVD.scala} (100%)

diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md
index 26350ce1069c7..89ac64a086ea0 100644
--- a/docs/mllib-guide.md
+++ b/docs/mllib-guide.md
@@ -476,8 +476,8 @@ import org.apache.spark.mllib.linalg.MatrixEntry
 
 // Load and parse the data file
 val data = sc.textFile("mllib/data/als/test.data").map { line =>
-      val parts = line.split(',')
-      MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
+    val parts = line.split(',')
+    MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
 }
 val m = 4
 val n = 4
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala
index c7f2abab9750c..416996fcbe760 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala
@@ -20,8 +20,8 @@ package org.apache.spark.mllib.linalg
 /**
  * Class that represents an entry in a sparse matrix of doubles.
  *
- * @param i row index (1 indexing used)
- * @param j column index (1 indexing used)
+ * @param i row index (0 indexing used)
+ * @param j column index (0 indexing used)
  * @param mval value of entry in matrix
  */
 case class MatrixEntry(val i: Int, val j: Int, val mval: Double)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala
similarity index 100%
rename from mllib/src/main/scala/org/apache/spark/mllib/linalg/SVDecomposedMatrix.scala
rename to mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
index 6590e8f357d70..ba7a0fde77d1c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
@@ -49,7 +49,7 @@ class SVD {
 
 /**
  * Top-level methods for calling Singular Value Decomposition
- * NOTE: All matrices are in 1-indexed sparse format RDD[((int, int), value)]
+ * NOTE: All matrices are in 0-indexed sparse format RDD[((int, int), value)]
 */
 object SVD {
 /**
@@ -73,7 +73,7 @@ object SVD {
    * U is m x k and satisfies U'U = eye(k)
    * V is n x k and satisfies V'V = eye(k)
    *
-   * All input and output is expected in sparse matrix format, 1-indexed
+   * All input and output is expected in sparse matrix format, 0-indexed
    * as tuples of the form ((i,j),value) all in RDDs using the
    * SparseMatrix class
    *
@@ -110,7 +110,7 @@ object SVD {
     // Construct jblas A^T A locally
     val ata = DoubleMatrix.zeros(n, n)
     for (entry <- emits.toArray) {
-      ata.put(entry._1._1 - 1, entry._1._2 - 1, entry._2)
+      ata.put(entry._1._1, entry._1._2, entry._2)
     }
 
@@ -129,18 +129,18 @@ object SVD {
     // prepare V for returning
     val retVdata = sc.makeRDD(
             Array.tabulate(V.rows, sigma.length){ (i,j) =>
-                    MatrixEntry(i + 1, j + 1, V.get(i,j)) }.flatten)
+                    MatrixEntry(i, j, V.get(i,j)) }.flatten)
     val retV = SparseMatrix(retVdata, V.rows, sigma.length)
 
     val retSdata = sc.makeRDD(Array.tabulate(sigma.length){
-            x => MatrixEntry(x + 1, x + 1, sigma(x))})
+            x => MatrixEntry(x, x, sigma(x))})
 
     val retS = SparseMatrix(retSdata, sigma.length, sigma.length)
 
     // Compute U as U = A V S^-1
     // turn V S^-1 into an RDD as a sparse matrix
     val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length)
-            { (i,j) => ((i + 1, j + 1), V.get(i,j) / sigma(j)) }.flatten)
+            { (i,j) => ((i, j), V.get(i,j) / sigma(j)) }.flatten)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
index f239e8505ff1a..12b3801722e93 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
@@ -50,7 +50,7 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
     val m = matrix.m
     val n = matrix.n
     val ret = DoubleMatrix.zeros(m, n)
-    matrix.data.toArray.map(x => ret.put(x.i - 1, x.j - 1, x.mval))
+    matrix.data.toArray.map(x => ret.put(x.i, x.j, x.mval))
     ret
   }
 
@@ -68,7 +68,7 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
     val m = 10
     val n = 3
     val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) =>
-      MatrixEntry(a + 1, b + 1, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten )
+      MatrixEntry(a, b, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten )
 
     val a = SparseMatrix(data, m, n)
 
@@ -97,7 +97,7 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
     val m = 10
     val n = 3
     val data = sc.makeRDD(Array.tabulate(m, n){ (a,b) =>
-      MatrixEntry(a + 1, b + 1, 1.0) }.flatten )
+      MatrixEntry(a, b, 1.0) }.flatten )
     val k = 1
 
@@ -130,7 +130,7 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
     val m = 10
     val n = 3
     val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) =>
-      MatrixEntry(a + 1, b + 1, (a + 2).toDouble * (b + 1)/(1 + a + b)) }.flatten )
+      MatrixEntry(a, b, (a + 2).toDouble * (b + 1)/(1 + a + b)) }.flatten )
     val a = SparseMatrix(data, m, n)
 
     val k = 1 // only one svalue above this

From eb2d8c431f3fa3a5634fe94ef85ed78a08393a25 Mon Sep 17 00:00:00 2001
From: Reza Zadeh
Date: Fri, 17 Jan 2014 13:57:27 -0800
Subject: [PATCH 40/46] replace this.type with SVD

---
 mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
index ba7a0fde77d1c..83fcb01205829 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
@@ -33,7 +33,7 @@ class SVD {
   /**
   * Set the number of top-k singular vectors to return
   */
-  def setK(k: Int): this.type = {
+  def setK(k: Int): SVD = {
     this.k = k
     this
   }

From dbec69bbf40db65563b754f2802a384de0c568e5 Mon Sep 17 00:00:00 2001
From: Reza Zadeh
Date: Fri, 17 Jan 2014 13:59:05 -0800
Subject: [PATCH 41/46] add rename computeSVD

---
 mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
index 83fcb01205829..91c622033120a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
@@ -41,7 +41,7 @@ class SVD {
   /**
   * Compute SVD using the current set parameters
   */
-  def computeSVD(matrix: SparseMatrix) : SVDecomposedMatrix = {
+  def compute(matrix: SparseMatrix) : SVDecomposedMatrix = {
     SVD.sparseSVD(matrix, k)
   }
 }
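After the rename above, the class-based entry point is compute rather than computeSVD, and setK returns SVD so the calls still chain. A one-line sketch under the assumption that matrix is an existing SparseMatrix (illustrative only, not part of the series):

    // Illustrative sketch, not part of the patch series.
    val decomposed = new SVD().setK(1).compute(matrix)
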
From c9b4845bc120b8b63d5e033dd1d506d84c420b20 Mon Sep 17 00:00:00 2001
From: Reza Zadeh
Date: Fri, 17 Jan 2014 14:14:29 -0800
Subject: [PATCH 42/46] prettify

---
 .../test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
index 12b3801722e93..32f3f141cd652 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
@@ -45,7 +45,7 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
   val EPSILON = 1e-4
 
   // Return jblas matrix from sparse matrix RDD
-  def getDenseMatrix(matrix:SparseMatrix) : DoubleMatrix = {
+  def getDenseMatrix(matrix: SparseMatrix) : DoubleMatrix = {
     val data = matrix.data
     val m = matrix.m
     val n = matrix.n
@@ -54,7 +54,7 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
     ret
   }
 
-  def assertMatrixEquals(a:DoubleMatrix, b:DoubleMatrix) {
+  def assertMatrixEquals(a: DoubleMatrix, b: DoubleMatrix) {
     assert(a.rows == b.rows && a.columns == b.columns, "dimension mismatch")
     val diff = DoubleMatrix.zeros(a.rows, a.columns)
     Array.tabulate(a.rows, a.columns){(i, j) =>

From 5c639d70df3da48bb52841aa57074ec151bb61cf Mon Sep 17 00:00:00 2001
From: Reza Zadeh
Date: Fri, 17 Jan 2014 14:31:39 -0800
Subject: [PATCH 43/46] 0index docs

---
 docs/mllib-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md
index 89ac64a086ea0..5be8ce1ebe970 100644
--- a/docs/mllib-guide.md
+++ b/docs/mllib-guide.md
@@ -463,7 +463,7 @@ such values, then the dimensions of the return will be:
 
 * *U* is *m x k* and satisfies U^T*U = eye(k).
 * *V* is *n x k* and satisfies V^TV = eye(k).
 
-All input and output is expected in sparse matrix format, 1-indexed
+All input and output is expected in sparse matrix format, 0-indexed
 as tuples of the form ((i,j),value) all in
 SparseMatrix RDDs. Below is example usage.

From 4e96757793e7aee165381f80a60b3f46f60c9ebc Mon Sep 17 00:00:00 2001
From: Reza Zadeh
Date: Fri, 17 Jan 2014 14:33:03 -0800
Subject: [PATCH 44/46] make example 0-indexed

---
 .../main/scala/org/apache/spark/examples/mllib/SparkSVD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala
index 50e5f5bd879e9..19676fcc1a2b0 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala
@@ -43,7 +43,7 @@ object SparkSVD {
     // Load and parse the data file
     val data = sc.textFile(args(1)).map { line =>
       val parts = line.split(',')
-      MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
+      MatrixEntry(parts(0).toInt - 1, parts(1).toInt - 1, parts(2).toDouble)
     }
     val m = args(2).toInt
     val n = args(3).toInt

From fa3299835bd52faf766929987e1aa4686730e2b4 Mon Sep 17 00:00:00 2001
From: Reza Zadeh
Date: Fri, 17 Jan 2014 14:39:30 -0800
Subject: [PATCH 45/46] rename to MatrixSVD

---
 .../scala/org/apache/spark/mllib/linalg/MatrixSVD.scala | 6 +++---
 .../scala/org/apache/spark/mllib/linalg/SVD.scala       | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala
index 622003576d474..319f82b449096 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala
@@ -24,6 +24,6 @@ package org.apache.spark.mllib.linalg
  * @param S such that A = USV^T
  * @param V such that A = USV^T
 */
-case class SVDecomposedMatrix(val U: SparseMatrix,
-                              val S: SparseMatrix,
-                              val V: SparseMatrix)
+case class MatrixSVD(val U: SparseMatrix,
+                     val S: SparseMatrix,
+                     val V: SparseMatrix)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
index 91c622033120a..cab98b33a16ac 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
@@ -150,7 +150,7 @@ object SVD {
             .map{ case ((row, col), mval) => MatrixEntry(row, col, mval)}
     val retU = SparseMatrix(retUdata, m, sigma.length)
 
-    SVDecomposedMatrix(retU, retS, retV)
+    MatrixSVD(retU, retS, retV)
   }

From 85b95d039ddfc7a2b2b27f506852859181ed16c1 Mon Sep 17 00:00:00 2001
From: Reza Zadeh
Date: Fri, 17 Jan 2014 14:40:51 -0800
Subject: [PATCH 46/46] rename to MatrixSVD

---
 mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
index cab98b33a16ac..e476b534503dc 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
@@ -41,7 +41,7 @@ class SVD {
   /**
   * Compute SVD using the current set parameters
   */
-  def compute(matrix: SparseMatrix) : SVDecomposedMatrix = {
+  def compute(matrix: SparseMatrix) : MatrixSVD = {
     SVD.sparseSVD(matrix, k)
   }
 }
@@ -84,7 +84,7 @@ object SVD {
   def sparseSVD(
       matrix: SparseMatrix,
       k: Int)
-    : SVDecomposedMatrix =
+    : MatrixSVD =
   {
     val data = matrix.data
     val m = matrix.m
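For orientation, an end-to-end sketch of the API the 46 patches converge on — illustrative only, mirroring the SparkSVD example with the final names (MatrixSVD, 0-indexed MatrixEntry). The input path and the values of sc, m, n and k are assumptions, and input lines are "i,j,value" with 1-indexed indices on disk, shifted to 0-indexed as in PATCH 44:

    // Illustrative sketch, not part of the patch series.
    // `sc`, `m`, `n` and `k` are assumed to be defined elsewhere.
    val data = sc.textFile("input.txt").map { line =>
      val parts = line.split(',')
      MatrixEntry(parts(0).toInt - 1, parts(1).toInt - 1, parts(2).toDouble)
    }
    val decomposed: MatrixSVD = SVD.sparseSVD(SparseMatrix(data, m, n), k)
    println("singular values = " +
      decomposed.S.data.map(_.mval).collect().mkString(" "))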