
minor updates
mengxr committed Jan 28, 2015
1 parent 24ec7b8 commit e1d3ee8
Showing 1 changed file with 48 additions and 43 deletions.
@@ -52,17 +52,17 @@ private[mllib] class GridPartitioner(
    * Returns the index of the partition the input coordinate belongs to.
    *
    * @param key The coordinate (i, j) or a tuple (i, j, k), where k is the inner index used in
-   *            multiplication.
+   *            multiplication. k is ignored in computing partitions.
    * @return The index of the partition, which the coordinate belongs to.
    */
   override def getPartition(key: Any): Int = {
     key match {
       case (i: Int, j: Int) =>
         getPartitionId(i, j)
-      case (i: Int, j: Int, _) =>
+      case (i: Int, j: Int, _: Int) =>
         getPartitionId(i, j)
       case _ =>
-        throw new IllegalArgumentException(s"Unrecognized key: $key")
+        throw new IllegalArgumentException(s"Unrecognized key: $key.")
     }
   }
 
@@ -73,7 +73,6 @@ private[mllib] class GridPartitioner(
     i / rowsPerPart + j / colsPerPart * rowPartitions
   }
 
-  /** Checks whether the partitioners have the same characteristics */
   override def equals(obj: Any): Boolean = {
     obj match {
       case r: GridPartitioner =>
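getPartitionId above lays block coordinates out in column-major order over the grid of partitions, and a multiplication key (i, j, k) maps to the same partition as (i, j) because k is ignored. A minimal standalone sketch of that arithmetic (illustrative names only; GridPartitioner itself is private[mllib], and rowPartitions is assumed here to be the number of partition rows, roughly ceil(rows / rowsPerPart)):

    object GridPartitionIdSketch {
      // Mirrors the expression in the diff: i / rowsPerPart + j / colsPerPart * rowPartitions.
      def partitionId(i: Int, j: Int, rowsPerPart: Int, colsPerPart: Int, rowPartitions: Int): Int =
        i / rowsPerPart + j / colsPerPart * rowPartitions

      def main(args: Array[String]): Unit = {
        // A 6 x 6 grid of blocks, grouped 2 x 2 per partition, so rowPartitions = ceil(6 / 2) = 3.
        println(partitionId(0, 0, 2, 2, 3)) // 0: top-left group
        println(partitionId(5, 1, 2, 2, 3)) // 2: last group in the first column of groups
        println(partitionId(1, 4, 2, 2, 3)) // 6: first row, third column of groups
      }
    }
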
@@ -87,10 +86,12 @@ private[mllib] class GridPartitioner(
 
 private[mllib] object GridPartitioner {
 
+  /** Creates a new [[GridPartitioner]] instance. */
   def apply(rows: Int, cols: Int, rowsPerPart: Int, colsPerPart: Int): GridPartitioner = {
     new GridPartitioner(rows, cols, rowsPerPart, colsPerPart)
   }
 
+  /** Creates a new [[GridPartitioner]] instance with the input suggested number of partitions. */
   def apply(rows: Int, cols: Int, suggestedNumPartitions: Int): GridPartitioner = {
     require(suggestedNumPartitions > 0)
     val scale = 1.0 / math.sqrt(suggestedNumPartitions)
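The hunk is cut off right after the scale line, so only the heuristic it hints at is illustrated here: scale each grid dimension by 1 / sqrt(p) so that roughly p partitions result. A standalone sketch under that assumption (not the elided body of this apply method):

    object GridSizingSketch {
      // Illustrative only: derive per-partition block counts from a suggested partition count p.
      def sizes(rows: Int, cols: Int, suggestedNumPartitions: Int): (Int, Int) = {
        require(suggestedNumPartitions > 0)
        val scale = 1.0 / math.sqrt(suggestedNumPartitions)
        val rowsPerPart = math.max(1, math.ceil(rows * scale).toInt)
        val colsPerPart = math.max(1, math.ceil(cols * scale).toInt)
        (rowsPerPart, colsPerPart)
      }

      def main(args: Array[String]): Unit = {
        // A 10 x 10 block grid with 25 suggested partitions: scale = 0.2, so each partition
        // holds a 2 x 2 group of blocks, giving 5 * 5 = 25 partitions.
        println(sizes(10, 10, 25)) // (2, 2)
      }
    }
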
@@ -103,24 +104,25 @@ private[mllib] object GridPartitioner {
 /**
  * Represents a distributed matrix in blocks of local matrices.
  *
- * @param rdd The RDD of SubMatrices (local matrices) that form this matrix
- * @param nRows Number of rows of this matrix. If the supplied value is less than or equal to zero,
- *              the number of rows will be calculated when `numRows` is invoked.
- * @param nCols Number of columns of this matrix. If the supplied value is less than or equal to
- *              zero, the number of columns will be calculated when `numCols` is invoked.
+ * @param blocks The RDD of sub-matrix blocks (blockRowIndex, blockColIndex, sub-matrix) that form
+ *               this distributed matrix.
  * @param rowsPerBlock Number of rows that make up each block. The blocks forming the final
  *                     rows are not required to have the given number of rows
  * @param colsPerBlock Number of columns that make up each block. The blocks forming the final
  *                     columns are not required to have the given number of columns
+ * @param nRows Number of rows of this matrix. If the supplied value is less than or equal to zero,
+ *              the number of rows will be calculated when `numRows` is invoked.
+ * @param nCols Number of columns of this matrix. If the supplied value is less than or equal to
+ *              zero, the number of columns will be calculated when `numCols` is invoked.
  */
 class BlockMatrix(
-    val rdd: RDD[((Int, Int), Matrix)],
-    private var nRows: Long,
-    private var nCols: Long,
+    val blocks: RDD[((Int, Int), Matrix)],
     val rowsPerBlock: Int,
-    val colsPerBlock: Int) extends DistributedMatrix with Logging {
+    val colsPerBlock: Int,
+    private var nRows: Long,
+    private var nCols: Long) extends DistributedMatrix with Logging {
 
-  private type SubMatrix = ((Int, Int), Matrix) // ((blockRowIndex, blockColIndex), matrix)
+  private type MatrixBlock = ((Int, Int), Matrix) // ((blockRowIndex, blockColIndex), sub-matrix)
 
   /**
    * Alternate constructor for BlockMatrix without the input of the number of rows and columns.
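For context, a small usage sketch of the reordered constructor, assuming a hypothetical local-mode Spark setup; only the three-argument form shown in the diff (blocks RDD, rowsPerBlock, colsPerBlock) is relied on:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.linalg.Matrices
    import org.apache.spark.mllib.linalg.distributed.BlockMatrix

    object BlockMatrixConstructionSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))
        // Two 2 x 2 blocks laid side by side: a 2 x 4 matrix overall.
        val blocks = sc.parallelize(Seq(
          ((0, 0), Matrices.dense(2, 2, Array(1.0, 2.0, 3.0, 4.0))),
          ((0, 1), Matrices.dense(2, 2, Array(5.0, 6.0, 7.0, 8.0)))))
        // nRows and nCols are omitted (the 3-arg constructor passes 0L, 0L),
        // so the dimensions are estimated lazily from block indices and block sizes.
        val mat = new BlockMatrix(blocks, 2, 2)
        println((mat.numRows(), mat.numCols())) // (2, 4)
        sc.stop()
      }
    }
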
@@ -135,45 +137,48 @@ class BlockMatrix(
       rdd: RDD[((Int, Int), Matrix)],
       rowsPerBlock: Int,
       colsPerBlock: Int) = {
-    this(rdd, 0L, 0L, rowsPerBlock, colsPerBlock)
+    this(rdd, rowsPerBlock, colsPerBlock, 0L, 0L)
   }
 
-  private lazy val dims: (Long, Long) = getDim
-
   override def numRows(): Long = {
-    if (nRows <= 0L) nRows = dims._1
+    if (nRows <= 0L) estimateDim()
     nRows
   }
 
   override def numCols(): Long = {
-    if (nCols <= 0L) nCols = dims._2
+    if (nCols <= 0L) estimateDim()
     nCols
   }
 
   val numRowBlocks = math.ceil(numRows() * 1.0 / rowsPerBlock).toInt
   val numColBlocks = math.ceil(numCols() * 1.0 / colsPerBlock).toInt
 
   private[mllib] var partitioner: GridPartitioner =
-    GridPartitioner(numRowBlocks, numColBlocks, suggestedNumPartitions = rdd.partitions.size)
-
-  /** Returns the dimensions of the matrix. */
-  private def getDim: (Long, Long) = {
-    val (rows, cols) = rdd.map { case ((blockRowIndex, blockColIndex), mat) =>
-      (blockRowIndex * rowsPerBlock + mat.numRows, blockColIndex * colsPerBlock + mat.numCols)
-    }.reduce((x0, x1) => (math.max(x0._1, x1._1), math.max(x0._2, x1._2)))
-
-    (math.max(rows, nRows), math.max(cols, nCols))
+    GridPartitioner(numRowBlocks, numColBlocks, suggestedNumPartitions = blocks.partitions.size)
+
+  /** Estimates the dimensions of the matrix. */
+  private def estimateDim(): Unit = {
+    val (rows, cols) = blocks.map { case ((blockRowIndex, blockColIndex), mat) =>
+      (blockRowIndex.toLong * rowsPerBlock + mat.numRows,
+        blockColIndex.toLong * colsPerBlock + mat.numCols)
+    }.reduce { (x0, x1) =>
+      (math.max(x0._1, x1._1), math.max(x0._2, x1._2))
+    }
+    if (nRows <= 0L) nRows = rows
+    assert(rows <= nRows, s"The number of rows $rows is more than claimed $nRows.")
+    if (nCols <= 0L) nCols = cols
+    assert(cols <= nCols, s"The number of columns $cols is more than claimed $nCols.")
   }
 
-  /** Cache the underlying RDD. */
-  def cache(): BlockMatrix = {
-    rdd.cache()
+  /** Caches the underlying RDD. */
+  def cache(): this.type = {
+    blocks.cache()
     this
   }
 
-  /** Set the storage level for the underlying RDD. */
-  def persist(storageLevel: StorageLevel): BlockMatrix = {
-    rdd.persist(storageLevel)
+  /** Persists the underlying RDD with the specified storage level. */
+  def persist(storageLevel: StorageLevel): this.type = {
+    blocks.persist(storageLevel)
     this
   }
 
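The added estimateDim takes, over all blocks, the maximum of blockRowIndex * rowsPerBlock + mat.numRows (and the analogous column expression), with .toLong guarding against Int overflow for large block indices. A quick plain-Scala check of that bound with made-up block sizes:

    object EstimateDimSketch {
      def main(args: Array[String]): Unit = {
        val rowsPerBlock = 1024
        val colsPerBlock = 1024
        // ((blockRowIndex, blockColIndex), (numRows, numCols)); the trailing row and column
        // of blocks are allowed to be smaller than the block size.
        val blocks = Seq(((0, 0), (1024, 1024)), ((1, 0), (500, 1024)), ((0, 1), (1024, 30)))
        val (rows, cols) = blocks.map { case ((bi, bj), (r, c)) =>
          (bi.toLong * rowsPerBlock + r, bj.toLong * colsPerBlock + c)
        }.reduce((x0, x1) => (math.max(x0._1, x1._1), math.max(x0._2, x1._2)))
        println((rows, cols)) // (1524, 1054): 1 * 1024 + 500 rows, 1 * 1024 + 30 columns
      }
    }
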
@@ -185,22 +190,22 @@
       s"Int.MaxValue. Currently numCols: ${numCols()}")
     require(numRows() * numCols() < Int.MaxValue, "The length of the values array must be " +
       s"less than Int.MaxValue. Currently numRows * numCols: ${numRows() * numCols()}")
-    val nRows = numRows().toInt
-    val nCols = numCols().toInt
-    val mem = nRows * nCols / 125000
+    val m = numRows().toInt
+    val n = numCols().toInt
+    val mem = m * n / 125000
     if (mem > 500) logWarning(s"Storing this matrix will require $mem MB of memory!")
 
-    val parts = rdd.collect()
-    val values = new Array[Double](nRows * nCols)
-    parts.foreach { case ((blockRowIndex, blockColIndex), block) =>
+    val localBlocks = blocks.collect()
+    val values = new Array[Double](m * n)
+    localBlocks.foreach { case ((blockRowIndex, blockColIndex), submat) =>
       val rowOffset = blockRowIndex * rowsPerBlock
       val colOffset = blockColIndex * colsPerBlock
-      block.foreachActive { (i, j, v) =>
-        val indexOffset = (j + colOffset) * nRows + rowOffset + i
+      submat.foreachActive { (i, j, v) =>
+        val indexOffset = (j + colOffset) * m + rowOffset + i
         values(indexOffset) = v
       }
     }
-    new DenseMatrix(nRows, nCols, values)
+    new DenseMatrix(m, n, values)
   }
 
   /** Collects data and assembles a local dense breeze matrix (for test only). */
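The index expression (j + colOffset) * m + rowOffset + i places each block entry into the column-major values array of the assembled m x n matrix. A standalone check of that layout (illustrative, not the surrounding method):

    object ColumnMajorIndexSketch {
      def main(args: Array[String]): Unit = {
        val m = 4 // total rows
        val n = 4 // total columns
        val values = new Array[Double](m * n)
        // Write a single 2 x 2 block whose top-left corner sits at global position (2, 2).
        val (rowOffset, colOffset) = (2, 2)
        val block = Seq(((0, 0), 1.0), ((1, 0), 2.0), ((0, 1), 3.0), ((1, 1), 4.0))
        block.foreach { case ((i, j), v) =>
          values((j + colOffset) * m + rowOffset + i) = v
        }
        // Column-major: entry (2, 2) of the result lives at index 2 * 4 + 2 = 10.
        println(values(10)) // 1.0
        println(values(15)) // 4.0, the global (3, 3) entry
      }
    }
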
