Skip to content

Commit

Permalink
[SPARK-4409] Fourth pass of code review
Browse files Browse the repository at this point in the history
  • Loading branch information
brkyvz committed Dec 20, 2014
1 parent f62d6c7 commit 10a63a6
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 67 deletions.
145 changes: 83 additions & 62 deletions mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.mllib.linalg

import java.util.{Arrays, Random}

import scala.collection.mutable.{ArrayBuffer, ArrayBuilder, Map => MutableMap}
import scala.collection.mutable.{ArrayBuilder => MArrayBuilder, HashSet => MHashSet, ArrayBuffer}

import breeze.linalg.{CSCMatrix => BSM, DenseMatrix => BDM, Matrix => BM}

Expand Down Expand Up @@ -150,11 +150,10 @@ class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double])

/** Generate a `SparseMatrix` from the given `DenseMatrix`. */
def toSparse(): SparseMatrix = {
val spVals: ArrayBuilder[Double] = new ArrayBuilder.ofDouble
val spVals: MArrayBuilder[Double] = new MArrayBuilder.ofDouble
val colPtrs: Array[Int] = new Array[Int](numCols + 1)
val rowIndices: ArrayBuilder[Int] = new ArrayBuilder.ofInt
val rowIndices: MArrayBuilder[Int] = new MArrayBuilder.ofInt
var nnz = 0
var lastCol = -1
var j = 0
while (j < numCols) {
var i = 0
Expand All @@ -164,19 +163,12 @@ class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double])
if (v != 0.0) {
rowIndices += i
spVals += v
while (j != lastCol) {
colPtrs(lastCol + 1) = nnz
lastCol += 1
}
nnz += 1
}
i += 1
}
j += 1
}
while (numCols > lastCol) {
colPtrs(lastCol + 1) = nnz
lastCol += 1
colPtrs(j) = nnz
}
new SparseMatrix(numRows, numCols, colPtrs, rowIndices.result(), spVals.result())
}
Expand Down Expand Up @@ -362,30 +354,54 @@ object SparseMatrix {

/**
* Generate a `SparseMatrix` from Coordinate List (COO) format. Input must be an array of
* (row, column, value) tuples.
* (i, j, value) tuples. Entries that have duplicate values of i and j are
* added together. Tuples where value is equal to zero will be omitted.
* @param numRows number of rows of the matrix
* @param numCols number of columns of the matrix
* @param entries Array of (row, column, value) tuples
* @param entries Array of (i, j, value) tuples
* @return The corresponding `SparseMatrix`
*/
def fromCOO(numRows: Int, numCols: Int, entries: Array[(Int, Int, Double)]): SparseMatrix = {
val sortedEntries = entries.sortBy(v => (v._2, v._1))
val colPtrs = new Array[Int](numCols + 1)
var nnz = 0
var lastCol = -1
val values = sortedEntries.map { case (i, j, v) =>
while (j != lastCol) {
colPtrs(lastCol + 1) = nnz
lastCol += 1
var lastIndex = -1
sortedEntries.foreach { case (i, j, v) =>
require(i >= 0 && j >= 0, "Negative indices given. Please make sure all indices are " +
s"greater than or equal to zero. i: $i, j: $j, value: $v")
if (v != 0.0) {
while (j != lastCol) {
colPtrs(lastCol + 1) = nnz
lastCol += 1
}
val index = j * numRows + i
if (lastIndex != index) {
nnz += 1
lastIndex = index
}
}
nnz += 1
v
}
while (numCols > lastCol) {
colPtrs(lastCol + 1) = nnz
lastCol += 1
}
new SparseMatrix(numRows, numCols, colPtrs.toArray, sortedEntries.map(_._1), values)
val values = new Array[Double](nnz)
val rowIndices = new Array[Int](nnz)
lastIndex = -1
var cnt = -1
sortedEntries.foreach { case (i, j, v) =>
if (v != 0.0) {
val index = j * numRows + i
if (lastIndex != index) {
cnt += 1
lastIndex = index
}
values(cnt) += v
rowIndices(cnt) = i
}
}
new SparseMatrix(numRows, numCols, colPtrs.toArray, rowIndices, values)
}

/**
Expand All @@ -397,57 +413,54 @@ object SparseMatrix {
new SparseMatrix(n, n, (0 to n).toArray, (0 until n).toArray, Array.fill(n)(1.0))
}

/** Generates a `SparseMatrix` with a given random number generator and `method`, which
* specifies the distribution. */
/** Generates the skeleton of a random `SparseMatrix` with a given random number generator. */
private def genRandMatrix(
numRows: Int,
numCols: Int,
density: Double,
rng: Random,
method: Random => Double): SparseMatrix = {
rng: Random): SparseMatrix = {
require(density >= 0.0 && density <= 1.0, "density must be a double in the range " +
s"0.0 <= d <= 1.0. Currently, density: $density")
val length = math.ceil(numRows * numCols * density).toInt
val entries = MutableMap[(Int, Int), Double]()
var i = 0
if (density == 0.0) {
return new SparseMatrix(numRows, numCols, new Array[Int](numCols + 1),
Array[Int](), Array[Double]())
} else if (density == 1.0) {
val rowIndices = Array.tabulate(numCols, numRows)((j, i) => i).flatten
return new SparseMatrix(numRows, numCols, (0 to numRows * numCols by numRows).toArray,
(0 until numRows * numCols).toArray, Array.fill(numRows * numCols)(method(rng)))
rowIndices, new Array[Double](numRows * numCols))
}
// Expected number of iterations is less than 1.5 * length
if (density < 0.34) {
while (i < length) {
var rowIndex = rng.nextInt(numRows)
var colIndex = rng.nextInt(numCols)
while (entries.contains((rowIndex, colIndex))) {
rowIndex = rng.nextInt(numRows)
colIndex = rng.nextInt(numCols)
}
entries += (rowIndex, colIndex) -> method(rng)
i += 1
if (density < 0.34) { // Expected number of iterations is less than 1.5 * length
val entries = MHashSet[(Int, Int)]()
while (entries.size < length) {
entries += ((rng.nextInt(numRows), rng.nextInt(numCols)))
}
val entryList = entries.toArray.map(v => (v._1, v._2, 1.0))
SparseMatrix.fromCOO(numRows, numCols, entryList)
} else { // selection - rejection method
var j = 0
val pool = numRows * numCols
// loop over columns so that the sort in fromCOO requires less sorting
val rowIndexBuilder = new MArrayBuilder.ofInt
val colPtrs = new Array[Int](numCols + 1)
while (i < length && j < numCols) {
var passedInPool = j * numRows
var r = 0
while (i < length && r < numRows) {
if (rng.nextDouble() < 1.0 * (length - i) / (pool - passedInPool)) {
entries += (r, j) -> method(rng)
rowIndexBuilder += r
i += 1
}
r += 1
passedInPool += 1
}
j += 1
colPtrs(j) = i
}
val rowIndices = rowIndexBuilder.result()
new SparseMatrix(numRows, numCols, colPtrs, rowIndices, new Array[Double](rowIndices.size))
}
SparseMatrix.fromCOO(numRows, numCols, entries.toArray.map(v => (v._1._1, v._1._2, v._2)))

}

/**
Expand All @@ -461,8 +474,8 @@ object SparseMatrix {
* @return `SparseMatrix` with size `numRows` x `numCols` and values in U(0, 1)
*/
def sprand(numRows: Int, numCols: Int, density: Double, rng: Random): SparseMatrix = {
def method(rand: Random): Double = rand.nextDouble()
genRandMatrix(numRows, numCols, density, rng, method)
val mat = genRandMatrix(numRows, numCols, density, rng)
mat.update(i => rng.nextDouble())
}

/**
Expand All @@ -474,8 +487,8 @@ object SparseMatrix {
* @return `SparseMatrix` with size `numRows` x `numCols` and values in N(0, 1)
*/
def sprandn(numRows: Int, numCols: Int, density: Double, rng: Random): SparseMatrix = {
def method(rand: Random): Double = rand.nextGaussian()
genRandMatrix(numRows, numCols, density, rng, method)
val mat = genRandMatrix(numRows, numCols, density, rng)
mat.update(i => rng.nextGaussian())
}

/**
Expand All @@ -488,11 +501,10 @@ object SparseMatrix {
val n = vector.size
vector match {
case sVec: SparseVector =>
val indices = sVec.indices
SparseMatrix.fromCOO(n, n, indices.zip(sVec.values).map(v => (v._1, v._1, v._2)))
SparseMatrix.fromCOO(n, n, sVec.indices.zip(sVec.values).map(v => (v._1, v._1, v._2)))
case dVec: DenseVector =>
val values = dVec.values.zipWithIndex
val nnzVals = values.filter(v => v._1 != 0.0)
val entries = dVec.values.zipWithIndex
val nnzVals = entries.filter(v => v._1 != 0.0)
SparseMatrix.fromCOO(n, n, nnzVals.map(v => (v._2, v._2, v._1)))
}
}
Expand Down Expand Up @@ -645,11 +657,11 @@ object Matrices {
return matrices(0)
}
val numRows = matrices(0).numRows
var rowsMatch = true
var hasSparse = false
var numCols = 0
matrices.foreach { mat =>
if (numRows != mat.numRows) rowsMatch = false
require(numRows == mat.numRows, "The number of rows of the matrices in this sequence, " +
"don't match!")
mat match {
case sparse: SparseMatrix => hasSparse = true
case dense: DenseMatrix => // empty on purpose
Expand All @@ -658,8 +670,6 @@ object Matrices {
}
numCols += mat.numCols
}
require(rowsMatch, "The number of rows of the matrices in this sequence, don't match!")

if (!hasSparse) {
new DenseMatrix(numRows, numCols, matrices.flatMap(_.toArray))
} else {
Expand Down Expand Up @@ -723,12 +733,12 @@ object Matrices {
return matrices(0)
}
val numCols = matrices(0).numCols
var colsMatch = true
var hasSparse = false
var numRows = 0
var valsLength = 0
matrices.foreach { mat =>
if (numCols != mat.numCols) colsMatch = false
require(numCols == mat.numCols, "The number of rows of the matrices in this sequence, " +
"don't match!")
mat match {
case sparse: SparseMatrix =>
hasSparse = true
Expand All @@ -741,15 +751,26 @@ object Matrices {
numRows += mat.numRows

}
require(colsMatch, "The number of rows of the matrices in this sequence, don't match!")

if (!hasSparse) {
val matData = matrices.zipWithIndex.flatMap { case (mat, ind) =>
val allValues = new Array[Double](numRows * numCols)
var startRow = 0
matrices.foreach { mat =>
var j = 0
val nRows = mat.numRows
val values = mat.toArray
for (j <- 0 until numCols) yield (j, ind,
values.slice(j * mat.numRows, (j + 1) * mat.numRows))
}.sortBy(x => (x._1, x._2))
new DenseMatrix(numRows, numCols, matData.flatMap(_._3))
while (j < numCols) {
var i = 0
val indStart = j * numRows + startRow
val subMatStart = j * nRows
while (i < nRows) {
allValues(indStart + i) = values(subMatStart + i)
i += 1
}
j += 1
}
startRow += nRows
}
new DenseMatrix(numRows, numCols, allValues)
} else {
var startRow = 0
val entries: Array[(Int, Int, Double)] = matrices.flatMap {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ class MatricesSuite extends FunSuite {
assert(mat.colPtrs.eq(colPtrs), "should not copy data")
assert(mat.rowIndices.eq(rowIndices), "should not copy data")

val entries: Array[(Int, Int, Double)] = Array((1, 0, 1.0), (2, 0, 2.0),
(1, 2, 4.0), (2, 2, 5.0))
val entries: Array[(Int, Int, Double)] = Array((2, 2, 3.0), (1, 0, 1.0), (2, 0, 2.0),
(1, 2, 2.0), (2, 2, 2.0), (1, 2, 2.0), (0, 0, 0.0))

val mat2 = SparseMatrix.fromCOO(m, n, entries)
assert(mat.toBreeze === mat2.toBreeze)
assert(mat2.values.length == 4)
}

test("sparse matrix construction with wrong number of elements") {
Expand Down Expand Up @@ -308,12 +309,15 @@ class MatricesSuite extends FunSuite {
test("sprand") {
val rng = mock[Random]
when(rng.nextInt(4)).thenReturn(0, 1, 1, 3, 2, 2, 0, 1, 3, 0)
when(rng.nextDouble()).thenReturn(1.0, 2.0, 3.0, 4.0)
when(rng.nextDouble()).thenReturn(1.0, 2.0, 3.0, 4.0, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0)
val mat = SparseMatrix.sprand(4, 4, 0.25, rng)
assert(mat.numRows === 4)
assert(mat.numCols === 4)
assert(mat.rowIndices.toSeq === Seq(3, 0, 2, 1))
assert(mat.values.toSeq === Seq(4.0, 1.0, 3.0, 2.0))
assert(mat.values.toSeq === Seq(1.0, 2.0, 3.0, 4.0))
val mat2 = SparseMatrix.sprand(2, 3, 1.0, rng)
assert(mat2.rowIndices.toSeq === Seq(0, 1, 0, 1, 0, 1))
assert(mat2.colPtrs.toSeq === Seq(0, 2, 4, 6))
}

test("sprandn") {
Expand All @@ -324,6 +328,6 @@ class MatricesSuite extends FunSuite {
assert(mat.numRows === 4)
assert(mat.numCols === 4)
assert(mat.rowIndices.toSeq === Seq(3, 0, 2, 1))
assert(mat.values.toSeq === Seq(4.0, 1.0, 3.0, 2.0))
assert(mat.values.toSeq === Seq(1.0, 2.0, 3.0, 4.0))
}
}

0 comments on commit 10a63a6

Please sign in to comment.