Skip to content

Commit

Permalink
[SPARK-5486] Added validate method to BlockMatrix
Browse files Browse the repository at this point in the history
The `validate` method will allow users to debug their `BlockMatrix`, if operations like `add` or `multiply` return unexpected results. It checks the following properties in a `BlockMatrix`:
- Are the dimensions of the `BlockMatrix` consistent with what the user entered: (`nRows`, `nCols`)
- Are the dimensions of each `MatrixBlock` consistent with what the user entered: (`rowsPerBlock`, `colsPerBlock`)
- Are there blocks with duplicate indices

Author: Burak Yavuz <brkyvz@gmail.com>

Closes apache#4279 from brkyvz/SPARK-5486 and squashes the following commits:

c152a73 [Burak Yavuz] addressed code review v2
598c583 [Burak Yavuz] merged master
b55ac5c [Burak Yavuz] addressed code review v1
25f083b [Burak Yavuz] simplify implementation
0aa519a [Burak Yavuz] [SPARK-5486] Added validate method to BlockMatrix
  • Loading branch information
brkyvz authored and mengxr committed Jan 30, 2015
1 parent 0a95085 commit 6ee8338
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import scala.collection.mutable.ArrayBuffer

import breeze.linalg.{DenseMatrix => BDM}

import org.apache.spark.{Logging, Partitioner}
import org.apache.spark.mllib.linalg.{SparseMatrix, DenseMatrix, Matrix}
import org.apache.spark.{SparkException, Logging, Partitioner}
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

Expand Down Expand Up @@ -158,11 +158,13 @@ class BlockMatrix(
private[mllib] var partitioner: GridPartitioner =
GridPartitioner(numRowBlocks, numColBlocks, suggestedNumPartitions = blocks.partitions.size)

private lazy val blockInfo = blocks.mapValues(block => (block.numRows, block.numCols)).cache()

/** Estimates the dimensions of the matrix. */
private def estimateDim(): Unit = {
val (rows, cols) = blocks.map { case ((blockRowIndex, blockColIndex), mat) =>
(blockRowIndex.toLong * rowsPerBlock + mat.numRows,
blockColIndex.toLong * colsPerBlock + mat.numCols)
val (rows, cols) = blockInfo.map { case ((blockRowIndex, blockColIndex), (m, n)) =>
(blockRowIndex.toLong * rowsPerBlock + m,
blockColIndex.toLong * colsPerBlock + n)
}.reduce { (x0, x1) =>
(math.max(x0._1, x1._1), math.max(x0._2, x1._2))
}
Expand All @@ -172,6 +174,41 @@ class BlockMatrix(
assert(cols <= nCols, s"The number of columns $cols is more than claimed $nCols.")
}

def validate(): Unit = {
logDebug("Validating BlockMatrix...")
// check if the matrix is larger than the claimed dimensions
estimateDim()
logDebug("BlockMatrix dimensions are okay...")

// Check if there are multiple MatrixBlocks with the same index.
blockInfo.countByKey().foreach { case (key, cnt) =>
if (cnt > 1) {
throw new SparkException(s"Found multiple MatrixBlocks with the indices $key. Please " +
"remove blocks with duplicate indices.")
}
}
logDebug("MatrixBlock indices are okay...")
// Check if each MatrixBlock (except edges) has the dimensions rowsPerBlock x colsPerBlock
// The first tuple is the index and the second tuple is the dimensions of the MatrixBlock
val dimensionMsg = s"dimensions different than rowsPerBlock: $rowsPerBlock, and " +
s"colsPerBlock: $colsPerBlock. Blocks on the right and bottom edges can have smaller " +
s"dimensions. You may use the repartition method to fix this issue."
blockInfo.foreach { case ((blockRowIndex, blockColIndex), (m, n)) =>
if ((blockRowIndex < numRowBlocks - 1 && m != rowsPerBlock) ||
(blockRowIndex == numRowBlocks - 1 && (m <= 0 || m > rowsPerBlock))) {
throw new SparkException(s"The MatrixBlock at ($blockRowIndex, $blockColIndex) has " +
dimensionMsg)
}
if ((blockColIndex < numColBlocks - 1 && n != colsPerBlock) ||
(blockColIndex == numColBlocks - 1 && (n <= 0 || n > colsPerBlock))) {
throw new SparkException(s"The MatrixBlock at ($blockRowIndex, $blockColIndex) has " +
dimensionMsg)
}
}
logDebug("MatrixBlock dimensions are okay...")
logDebug("BlockMatrix is valid!")
}

/** Caches the underlying RDD. */
def cache(): this.type = {
blocks.cache()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import scala.util.Random
import breeze.linalg.{DenseMatrix => BDM}
import org.scalatest.FunSuite

import org.apache.spark.SparkException
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix}
import org.apache.spark.mllib.util.MLlibTestSparkContext

Expand Down Expand Up @@ -147,6 +148,47 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
assert(gridBasedMat.toBreeze() === expected)
}

test("validate") {
// No error
gridBasedMat.validate()
// Wrong MatrixBlock dimensions
val blocks: Seq[((Int, Int), Matrix)] = Seq(
((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
((1, 0), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
val rdd = sc.parallelize(blocks, numPartitions)
val wrongRowPerParts = new BlockMatrix(rdd, rowPerPart + 1, colPerPart)
val wrongColPerParts = new BlockMatrix(rdd, rowPerPart, colPerPart + 1)
intercept[SparkException] {
wrongRowPerParts.validate()
}
intercept[SparkException] {
wrongColPerParts.validate()
}
// Wrong BlockMatrix dimensions
val wrongRowSize = new BlockMatrix(rdd, rowPerPart, colPerPart, 4, 4)
intercept[AssertionError] {
wrongRowSize.validate()
}
val wrongColSize = new BlockMatrix(rdd, rowPerPart, colPerPart, 5, 2)
intercept[AssertionError] {
wrongColSize.validate()
}
// Duplicate indices
val duplicateBlocks: Seq[((Int, Int), Matrix)] = Seq(
((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
((0, 0), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
((1, 1), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
val dupMatrix = new BlockMatrix(sc.parallelize(duplicateBlocks, numPartitions), 2, 2)
intercept[SparkException] {
dupMatrix.validate()
}
}

test("transpose") {
val expected = BDM(
(1.0, 0.0, 3.0, 0.0, 0.0),
Expand Down

0 comments on commit 6ee8338

Please sign in to comment.