Skip to content

Commit

Permalink
Merge pull request apache#315 from rezazadeh/sparsesvd
Browse files Browse the repository at this point in the history
Sparse SVD

# Singular Value Decomposition
Given an *m x n* matrix *A*, compute matrices *U, S, V* such that

*A = U * S * V^T*

There is no restriction on m, but we require n^2 doubles to fit in memory.
Further, n should be less than m.

The decomposition is computed by first computing *A^TA = V S^2 V^T*,
computing svd locally on that (since n x n is small),
from which we recover S and V.
Then we compute U via easy matrix multiplication
as *U =  A * V * S^-1*

Only singular vectors associated with the largest k singular values
If there are k such values, then the dimensions of the return will be:

* *S* is *k x k* and diagonal, holding the singular values on diagonal.
* *U* is *m x k* and satisfies U^T*U = eye(k).
* *V* is *n x k* and satisfies V^TV = eye(k).

All input and output is expected in sparse matrix format, 0-indexed
as tuples of the form ((i,j),value) all in RDDs.

# Testing
Tests included. They test:
- Decomposition promise (A = USV^T)
- For small matrices, output is compared to that of jblas
- Rank 1 matrix test included
- Full Rank matrix test included
- Middle-rank matrix forced via k included

# Example Usage

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.SVD
import org.apache.spark.mllib.linalg.SparseMatrix
import org.apache.spark.mllib.linalg.MatrixyEntry

// Load and parse the data file
val data = sc.textFile("mllib/data/als/test.data").map { line =>
      val parts = line.split(',')
      MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
}
val m = 4
val n = 4

// recover top 1 singular vector
val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), 1)

println("singular values = " + decomposed.S.data.toArray.mkString)

# Documentation
Added to docs/mllib-guide.md
  • Loading branch information
mateiz committed Jan 22, 2014
2 parents 749f842 + 85b95d0 commit d009b17
Show file tree
Hide file tree
Showing 7 changed files with 543 additions and 0 deletions.
51 changes: 51 additions & 0 deletions docs/mllib-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -438,3 +438,54 @@ signals), you can use the trainImplicit method to get better results.
# Build the recommendation model using Alternating Least Squares based on implicit ratings
model = ALS.trainImplicit(ratings, 1, 20)
{% endhighlight %}


# Singular Value Decomposition
Singular Value Decomposition for Tall and Skinny matrices.
Given an *m x n* matrix *A*, we can compute matrices *U, S, V* such that

*A = U * S * V^T*

There is no restriction on m, but we require n^2 doubles to
fit in memory locally on one machine.
Further, n should be less than m.

The decomposition is computed by first computing *A^TA = V S^2 V^T*,
computing SVD locally on that (since n x n is small),
from which we recover S and V.
Then we compute U via easy matrix multiplication
as *U = A * V * S^-1*

Only singular vectors associated with largest k singular values
are recovered. If there are k
such values, then the dimensions of the return will be:

* *S* is *k x k* and diagonal, holding the singular values on diagonal.
* *U* is *m x k* and satisfies U^T*U = eye(k).
* *V* is *n x k* and satisfies V^TV = eye(k).

All input and output is expected in sparse matrix format, 0-indexed
as tuples of the form ((i,j),value) all in
SparseMatrix RDDs. Below is example usage.

{% highlight scala %}

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.SVD
import org.apache.spark.mllib.linalg.SparseMatrix
import org.apache.spark.mllib.linalg.MatrixEntry

// Load and parse the data file
val data = sc.textFile("mllib/data/als/test.data").map { line =>
val parts = line.split(',')
MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
}
val m = 4
val n = 4
val k = 1

// recover largest singular vector
val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), k)
val = decomposed.S.data

println("singular values = " + s.toArray.mkString)
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.mllib

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.SVD
import org.apache.spark.mllib.linalg.MatrixEntry
import org.apache.spark.mllib.linalg.SparseMatrix

/**
* Compute SVD of an example matrix
* Input file should be comma separated, 1 indexed of the form
* i,j,value
* Where i is the column, j the row, and value is the matrix entry
*
* For example input file, see:
* mllib/data/als/test.data (example is 4 x 4)
*/
object SparkSVD {
def main(args: Array[String]) {
if (args.length != 4) {
System.err.println("Usage: SparkSVD <master> <file> m n")
System.exit(1)
}
val sc = new SparkContext(args(0), "SVD",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))

// Load and parse the data file
val data = sc.textFile(args(1)).map { line =>
val parts = line.split(',')
MatrixEntry(parts(0).toInt - 1, parts(1).toInt - 1, parts(2).toDouble)
}
val m = args(2).toInt
val n = args(3).toInt

// recover largest singular vector
val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), 1)
val u = decomposed.U.data
val s = decomposed.S.data
val v = decomposed.V.data

println("singular values = " + s.toArray.mkString)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.linalg

/**
* Class that represents an entry in a sparse matrix of doubles.
*
* @param i row index (0 indexing used)
* @param j column index (0 indexing used)
* @param mval value of entry in matrix
*/
case class MatrixEntry(val i: Int, val j: Int, val mval: Double)
29 changes: 29 additions & 0 deletions mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.linalg

/**
* Class that represents the SV decomposition of a matrix
*
* @param U such that A = USV^T
* @param S such that A = USV^T
* @param V such that A = USV^T
*/
case class MatrixSVD(val U: SparseMatrix,
val S: SparseMatrix,
val V: SparseMatrix)
189 changes: 189 additions & 0 deletions mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.linalg

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}


/**
* Class used to obtain singular value decompositions
*/
class SVD {
private var k: Int = 1

/**
* Set the number of top-k singular vectors to return
*/
def setK(k: Int): SVD = {
this.k = k
this
}

/**
* Compute SVD using the current set parameters
*/
def compute(matrix: SparseMatrix) : MatrixSVD = {
SVD.sparseSVD(matrix, k)
}
}


/**
* Top-level methods for calling Singular Value Decomposition
* NOTE: All matrices are in 0-indexed sparse format RDD[((int, int), value)]
*/
object SVD {
/**
* Singular Value Decomposition for Tall and Skinny matrices.
* Given an m x n matrix A, this will compute matrices U, S, V such that
* A = U * S * V'
*
* There is no restriction on m, but we require n^2 doubles to fit in memory.
* Further, n should be less than m.
*
* The decomposition is computed by first computing A'A = V S^2 V',
* computing svd locally on that (since n x n is small),
* from which we recover S and V.
* Then we compute U via easy matrix multiplication
* as U = A * V * S^-1
*
* Only the k largest singular values and associated vectors are found.
* If there are k such values, then the dimensions of the return will be:
*
* S is k x k and diagonal, holding the singular values on diagonal
* U is m x k and satisfies U'U = eye(k)
* V is n x k and satisfies V'V = eye(k)
*
* All input and output is expected in sparse matrix format, 0-indexed
* as tuples of the form ((i,j),value) all in RDDs using the
* SparseMatrix class
*
* @param matrix sparse matrix to factorize
* @param k Recover k singular values and vectors
* @return Three sparse matrices: U, S, V such that A = USV^T
*/
def sparseSVD(
matrix: SparseMatrix,
k: Int)
: MatrixSVD =
{
val data = matrix.data
val m = matrix.m
val n = matrix.n

if (m < n || m <= 0 || n <= 0) {
throw new IllegalArgumentException("Expecting a tall and skinny matrix")
}

if (k < 1 || k > n) {
throw new IllegalArgumentException("Must request up to n singular values")
}

// Compute A^T A, assuming rows are sparse enough to fit in memory
val rows = data.map(entry =>
(entry.i, (entry.j, entry.mval))).groupByKey()
val emits = rows.flatMap{ case (rowind, cols) =>
cols.flatMap{ case (colind1, mval1) =>
cols.map{ case (colind2, mval2) =>
((colind1, colind2), mval1*mval2) } }
}.reduceByKey(_+_)

// Construct jblas A^T A locally
val ata = DoubleMatrix.zeros(n, n)
for (entry <- emits.toArray) {
ata.put(entry._1._1, entry._1._2, entry._2)
}

// Since A^T A is small, we can compute its SVD directly
val svd = Singular.sparseSVD(ata)
val V = svd(0)
val sigmas = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x > 1e-9)

if (sigmas.size < k) {
throw new Exception("Not enough singular values to return")
}

val sigma = sigmas.take(k)

val sc = data.sparkContext

// prepare V for returning
val retVdata = sc.makeRDD(
Array.tabulate(V.rows, sigma.length){ (i,j) =>
MatrixEntry(i, j, V.get(i,j)) }.flatten)
val retV = SparseMatrix(retVdata, V.rows, sigma.length)

val retSdata = sc.makeRDD(Array.tabulate(sigma.length){
x => MatrixEntry(x, x, sigma(x))})

val retS = SparseMatrix(retSdata, sigma.length, sigma.length)

// Compute U as U = A V S^-1
// turn V S^-1 into an RDD as a sparse matrix
val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length)
{ (i,j) => ((i, j), V.get(i,j) / sigma(j)) }.flatten)

// Multiply A by VS^-1
val aCols = data.map(entry => (entry.j, (entry.i, entry.mval)))
val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
val retUdata = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) )
=> ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_)
.map{ case ((row, col), mval) => MatrixEntry(row, col, mval)}
val retU = SparseMatrix(retUdata, m, sigma.length)

MatrixSVD(retU, retS, retV)
}


def main(args: Array[String]) {
if (args.length < 8) {
println("Usage: SVD <master> <matrix_file> <m> <n> " +
"<k> <output_U_file> <output_S_file> <output_V_file>")
System.exit(1)
}

val (master, inputFile, m, n, k, output_u, output_s, output_v) =
(args(0), args(1), args(2).toInt, args(3).toInt,
args(4).toInt, args(5), args(6), args(7))

val sc = new SparkContext(master, "SVD")

val rawdata = sc.textFile(inputFile)
val data = rawdata.map { line =>
val parts = line.split(',')
MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
}

val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), k)
val u = decomposed.U.data
val s = decomposed.S.data
val v = decomposed.V.data

println("Computed " + s.toArray.length + " singular values and vectors")
u.saveAsTextFile(output_u)
s.saveAsTextFile(output_s)
v.saveAsTextFile(output_v)
System.exit(0)
}
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.linalg

import org.apache.spark.rdd.RDD


/**
* Class that represents a sparse matrix
*
* @param data RDD of nonzero entries
* @param m number of rows
* @param n numner of columns
*/
case class SparseMatrix(val data: RDD[MatrixEntry], val m: Int, val n: Int)
Loading

0 comments on commit d009b17

Please sign in to comment.