[SPARK-5486] Added validate method to BlockMatrix
brkyvz committed Jan 29, 2015
1 parent a3dc618 commit 0aa519a
Showing 2 changed files with 152 additions and 1 deletion.
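
A minimal usage sketch of the new API (illustrative only, not part of this commit): assuming an existing SparkContext `sc`, it builds a small BlockMatrix with the same constructor exercised in the test below and calls validate(), which in this version prints a diagnosis rather than throwing.

import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

// A 2x2 grid of 2x2 blocks with consistent dimensions and unique indices.
val exampleBlocks: Seq[((Int, Int), Matrix)] = Seq(
  ((0, 0), DenseMatrix.eye(2)),
  ((0, 1), DenseMatrix.zeros(2, 2)),
  ((1, 0), DenseMatrix.zeros(2, 2)),
  ((1, 1), DenseMatrix.eye(2)))
val mat = new BlockMatrix(sc.parallelize(exampleBlocks), 2, 2)
mat.validate()  // prints "There are no problems with this BlockMatrix!"
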
BlockMatrix.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
import breeze.linalg.{DenseMatrix => BDM}

import org.apache.spark.{Logging, Partitioner}
-import org.apache.spark.mllib.linalg.{SparseMatrix, DenseMatrix, Matrix}
+import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

@@ -172,6 +172,96 @@ class BlockMatrix(
    assert(cols <= nCols, s"The number of columns $cols is more than claimed $nCols.")
  }

  def validate(): Unit = {
    validator match {
      case (ValidationError.DIMENSION_MISMATCH, e: AssertionError) =>
        println(s"$e\nPlease instantiate a new BlockMatrix with the correct dimensions.")
      case (ValidationError.DIMENSION_MISMATCH, exc: Exception) =>
        println(s"There was an error while calculating the dimensions for this matrix.\n$exc")
      case (ValidationError.DUPLICATE_INDEX, size: Int) =>
        println(s"There are $size MatrixBlocks with duplicate indices. Please remove blocks with " +
          s"duplicate indices. You may call reduceByKey on the underlying RDD and sum the " +
          s"duplicates. You may convert the matrices to Breeze before summing them up.")
      case (ValidationError.DUPLICATE_INDEX, duplicates: Map[(Int, Int), Long]) =>
        println(s"The following indices have more than one Matrix:")
        duplicates.foreach(index => println(s"Index: ${index._1}, count: ${index._2}"))
        println("Please remove these blocks with duplicate indices. You may call reduceByKey on " +
          "the underlying RDD and sum the duplicates. You may convert the matrices to Breeze " +
          "before summing them up.")
      case (ValidationError.DUPLICATE_INDEX, exc: Exception) =>
        println(s"There was an error while looking for duplicate indices.\n$exc")
      case (ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH, size: Long) =>
        println(s"There are $size MatrixBlocks with dimensions different than " +
          s"rowsPerBlock: $rowsPerBlock, and colsPerBlock: $colsPerBlock. You may use the " +
          s"repartition method to fix this issue.")
      case (ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH,
          mismatches: Array[((Int, Int), (Int, Int))]) =>
        println(s"The following MatrixBlocks have dimensions different than " +
          s"(rowsPerBlock, colsPerBlock): ($rowsPerBlock, $colsPerBlock)")
        mismatches.foreach(index => println(s"Index: ${index._1}, dimensions: ${index._2}"))
        println("You may use the repartition method to fix this issue.")
      case (ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH, exc: Exception) =>
        println(s"There was an error while looking for MatrixBlock dimension mismatches.\n$exc")
      case (ValidationError.NO_ERROR, _) => println("There are no problems with this BlockMatrix!")
    }
  }

  private[mllib] def validator: (ValidationError.Value, Any) = {
    logDebug("Validating BlockMatrix...")
    // check if the matrix is larger than the claimed dimensions
    try {
      estimateDim()
      logDebug("BlockMatrix dimensions are okay...")
    } catch {
      case exc: AssertionError => return (ValidationError.DIMENSION_MISMATCH, exc)
      case e: Exception =>
        logError(s"${e.getMessage}\n${e.getStackTraceString}")
        return (ValidationError.DIMENSION_MISMATCH, e)
    }
    try {
      // Check if there are multiple MatrixBlocks with the same index.
      val indexCounts = blocks.countByKey().filter(p => p._2 > 1)
      if (indexCounts.size > 50) {
        return (ValidationError.DUPLICATE_INDEX, indexCounts.size)
      } else if (indexCounts.size > 0) {
        return (ValidationError.DUPLICATE_INDEX, indexCounts)
      }
      logDebug("MatrixBlock indices are okay...")
    } catch {
      case e: Exception =>
        logError(s"${e.getMessage}\n${e.getStackTraceString}")
        return (ValidationError.DUPLICATE_INDEX, e)
    }
    try {
      // Check if each MatrixBlock (except edges) has the dimensions rowsPerBlock x colsPerBlock
      // The first tuple is the index and the second tuple is the dimensions of the MatrixBlock
      val blockDimensionMismatches = blocks.filter { case ((blockRowIndex, blockColIndex), block) =>
        if ((blockRowIndex == numRowBlocks - 1) || (blockColIndex == numColBlocks - 1)) {
          false // neglect edge blocks
        } else {
          // include it if the dimensions don't match
          !(block.numRows == rowsPerBlock && block.numCols == colsPerBlock)
        }
      }.map { case ((blockRowIndex, blockColIndex), mat) =>
        ((blockRowIndex, blockColIndex), (mat.numRows, mat.numCols))
      }
      val dimensionMismatchCount = blockDimensionMismatches.count()
      // Don't send whole list if there are more than 50 matrices with the wrong dimensions
      if (dimensionMismatchCount > 50) {
        return (ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH, dimensionMismatchCount)
      } else if (dimensionMismatchCount > 0) {
        return (ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH, blockDimensionMismatches.collect())
      }
      logDebug("MatrixBlock dimensions are okay...")
      logDebug("BlockMatrix is valid!")
      (ValidationError.NO_ERROR, null)
    } catch {
      case e: Exception =>
        logError(s"${e.getMessage}\n${e.getStackTraceString}")
        (ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH, e)
    }
  }

  /** Caches the underlying RDD. */
  def cache(): this.type = {
    blocks.cache()
@@ -238,3 +328,8 @@ class BlockMatrix(
    new BDM[Double](localMat.numRows, localMat.numCols, localMat.toArray)
  }
}

private[mllib] object ValidationError extends Enumeration {
  type ValidationError = Value
  val NO_ERROR, DUPLICATE_INDEX, MATRIX_BLOCK_DIMENSION_MISMATCH, DIMENSION_MISMATCH, OTHER = Value
}
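
Because validator is private[mllib], code inside the org.apache.spark.mllib package can also branch on the returned (error, info) pair instead of relying on the printed output. A hedged sketch (not part of the commit), assuming a BlockMatrix named blockMat:

// Sketch only: fail fast when validation reports a problem.
blockMat.validator match {
  case (ValidationError.NO_ERROR, _) =>
    // safe to proceed with further operations
  case (err, info) =>
    throw new IllegalArgumentException(s"Invalid BlockMatrix: $err, details: $info")
}
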
BlockMatrixSuite.scala
@@ -146,4 +146,60 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
    assert(gridBasedMat.toLocalMatrix() === dense)
    assert(gridBasedMat.toBreeze() === expected)
  }

test("validator") {
// No error
val (error0, info0) = gridBasedMat.validator
assert(error0 === ValidationError.NO_ERROR)

// Wrong MatrixBlock dimensions
val blocks: Seq[((Int, Int), Matrix)] = Seq(
((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
((1, 0), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
val rdd = sc.parallelize(blocks, numPartitions)
val wrongRowPerParts = new BlockMatrix(rdd, rowPerPart + 1, colPerPart)
val wrongColPerParts = new BlockMatrix(rdd, rowPerPart, colPerPart + 1)
val (error, information) = wrongRowPerParts.validator
assert(error === ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH)
assert(information.isInstanceOf[Array[((Int, Int),(Int, Int))]])
val (error2, information2) = wrongColPerParts.validator
assert(error2 === ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH)
assert(information2.isInstanceOf[Array[((Int, Int),(Int, Int))]])
// Large number of mismatching MatrixBlock dimensions
val manyBlocks = for (i <- 0 until 60) yield ((i, 0), DenseMatrix.eye(1))
val manyWrongDims = new BlockMatrix(sc.parallelize(manyBlocks, numPartitions), 2, 2, 140, 4)
val (error3, information3) = manyWrongDims.validator
assert(error3 === ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH)
assert(information3.isInstanceOf[Long])

// Wrong BlockMatrix dimensions
val wrongRowSize = new BlockMatrix(rdd, rowPerPart, colPerPart, 4, 4)
val (error4, information4) = wrongRowSize.validator
assert(error4 === ValidationError.DIMENSION_MISMATCH)
assert(information4.isInstanceOf[AssertionError])
val wrongColSize = new BlockMatrix(rdd, rowPerPart, colPerPart, 5, 2)
val (error5, information5) = wrongColSize.validator
assert(error5 === ValidationError.DIMENSION_MISMATCH)
assert(information5.isInstanceOf[AssertionError])

// Duplicate indices
val duplicateBlocks: Seq[((Int, Int), Matrix)] = Seq(
((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
((0, 0), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
((1, 1), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
val dupMatrix = new BlockMatrix(sc.parallelize(duplicateBlocks, numPartitions), 2, 2)
val (error6, information6) = dupMatrix.validator
assert(error6 === ValidationError.DUPLICATE_INDEX)
assert(information6.isInstanceOf[Map[(Int, Int), Long]])
val duplicateBlocks2 = for (i <- 0 until 110) yield ((i / 2, i / 2), DenseMatrix.eye(1))
val largeDupMatrix = new BlockMatrix(sc.parallelize(duplicateBlocks2, numPartitions), 1, 1)
val (error7, information7) = largeDupMatrix.validator
assert(error7 === ValidationError.DUPLICATE_INDEX)
assert(information7.isInstanceOf[Int])
}
}
