diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala index de80367ba9211..0ab74ba294535 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala @@ -52,17 +52,17 @@ private[mllib] class GridPartitioner( * Returns the index of the partition the input coordinate belongs to. * * @param key The coordinate (i, j) or a tuple (i, j, k), where k is the inner index used in - * multiplication. + * multiplication. k is ignored in computing partitions. * @return The index of the partition, which the coordinate belongs to. */ override def getPartition(key: Any): Int = { key match { case (i: Int, j: Int) => getPartitionId(i, j) - case (i: Int, j: Int, _) => + case (i: Int, j: Int, _: Int) => getPartitionId(i, j) case _ => - throw new IllegalArgumentException(s"Unrecognized key: $key") + throw new IllegalArgumentException(s"Unrecognized key: $key.") } } @@ -73,7 +73,6 @@ private[mllib] class GridPartitioner( i / rowsPerPart + j / colsPerPart * rowPartitions } - /** Checks whether the partitioners have the same characteristics */ override def equals(obj: Any): Boolean = { obj match { case r: GridPartitioner => @@ -87,10 +86,12 @@ private[mllib] class GridPartitioner( private[mllib] object GridPartitioner { + /** Creates a new [[GridPartitioner]] instance. */ def apply(rows: Int, cols: Int, rowsPerPart: Int, colsPerPart: Int): GridPartitioner = { new GridPartitioner(rows, cols, rowsPerPart, colsPerPart) } + /** Creates a new [[GridPartitioner]] instance with the input suggested number of partitions. 
*/ def apply(rows: Int, cols: Int, suggestedNumPartitions: Int): GridPartitioner = { require(suggestedNumPartitions > 0) val scale = 1.0 / math.sqrt(suggestedNumPartitions) @@ -103,24 +104,25 @@ private[mllib] object GridPartitioner { /** * Represents a distributed matrix in blocks of local matrices. * - * @param rdd The RDD of SubMatrices (local matrices) that form this matrix - * @param nRows Number of rows of this matrix. If the supplied value is less than or equal to zero, - * the number of rows will be calculated when `numRows` is invoked. - * @param nCols Number of columns of this matrix. If the supplied value is less than or equal to - * zero, the number of columns will be calculated when `numCols` is invoked. + * @param blocks The RDD of sub-matrix blocks ((blockRowIndex, blockColIndex), sub-matrix) that form + * this distributed matrix. * @param rowsPerBlock Number of rows that make up each block. The blocks forming the final * rows are not required to have the given number of rows * @param colsPerBlock Number of columns that make up each block. The blocks forming the final * columns are not required to have the given number of columns + * @param nRows Number of rows of this matrix. If the supplied value is less than or equal to zero, + * the number of rows will be calculated when `numRows` is invoked. + * @param nCols Number of columns of this matrix. If the supplied value is less than or equal to + * zero, the number of columns will be calculated when `numCols` is invoked. 
*/ class BlockMatrix( - val rdd: RDD[((Int, Int), Matrix)], - private var nRows: Long, - private var nCols: Long, + val blocks: RDD[((Int, Int), Matrix)], val rowsPerBlock: Int, - val colsPerBlock: Int) extends DistributedMatrix with Logging { + val colsPerBlock: Int, + private var nRows: Long, + private var nCols: Long) extends DistributedMatrix with Logging { - private type SubMatrix = ((Int, Int), Matrix) // ((blockRowIndex, blockColIndex), matrix) + private type MatrixBlock = ((Int, Int), Matrix) // ((blockRowIndex, blockColIndex), sub-matrix) /** * Alternate constructor for BlockMatrix without the input of the number of rows and columns. @@ -135,18 +137,16 @@ class BlockMatrix( rdd: RDD[((Int, Int), Matrix)], rowsPerBlock: Int, colsPerBlock: Int) = { - this(rdd, 0L, 0L, rowsPerBlock, colsPerBlock) + this(rdd, rowsPerBlock, colsPerBlock, 0L, 0L) } - private lazy val dims: (Long, Long) = getDim - override def numRows(): Long = { - if (nRows <= 0L) nRows = dims._1 + if (nRows <= 0L) estimateDim() nRows } override def numCols(): Long = { - if (nCols <= 0L) nCols = dims._2 + if (nCols <= 0L) estimateDim() nCols } @@ -154,26 +154,31 @@ class BlockMatrix( val numColBlocks = math.ceil(numCols() * 1.0 / colsPerBlock).toInt private[mllib] var partitioner: GridPartitioner = - GridPartitioner(numRowBlocks, numColBlocks, suggestedNumPartitions = rdd.partitions.size) - - /** Returns the dimensions of the matrix. */ - private def getDim: (Long, Long) = { - val (rows, cols) = rdd.map { case ((blockRowIndex, blockColIndex), mat) => - (blockRowIndex * rowsPerBlock + mat.numRows, blockColIndex * colsPerBlock + mat.numCols) - }.reduce((x0, x1) => (math.max(x0._1, x1._1), math.max(x0._2, x1._2))) - - (math.max(rows, nRows), math.max(cols, nCols)) + GridPartitioner(numRowBlocks, numColBlocks, suggestedNumPartitions = blocks.partitions.size) + + /** Estimates the dimensions of the matrix. 
*/ + private def estimateDim(): Unit = { + val (rows, cols) = blocks.map { case ((blockRowIndex, blockColIndex), mat) => + (blockRowIndex.toLong * rowsPerBlock + mat.numRows, + blockColIndex.toLong * colsPerBlock + mat.numCols) + }.reduce { (x0, x1) => + (math.max(x0._1, x1._1), math.max(x0._2, x1._2)) + } + if (nRows <= 0L) nRows = rows + assert(rows <= nRows, s"The number of rows $rows is more than claimed $nRows.") + if (nCols <= 0L) nCols = cols + assert(cols <= nCols, s"The number of columns $cols is more than claimed $nCols.") } - /** Cache the underlying RDD. */ - def cache(): BlockMatrix = { - rdd.cache() + /** Caches the underlying RDD. */ + def cache(): this.type = { + blocks.cache() this } - /** Set the storage level for the underlying RDD. */ - def persist(storageLevel: StorageLevel): BlockMatrix = { - rdd.persist(storageLevel) + /** Persists the underlying RDD with the specified storage level. */ + def persist(storageLevel: StorageLevel): this.type = { + blocks.persist(storageLevel) this } @@ -185,22 +190,22 @@ class BlockMatrix( s"Int.MaxValue. Currently numCols: ${numCols()}") require(numRows() * numCols() < Int.MaxValue, "The length of the values array must be " + s"less than Int.MaxValue. 
Currently numRows * numCols: ${numRows() * numCols()}") - val nRows = numRows().toInt - val nCols = numCols().toInt - val mem = nRows * nCols / 125000 + val m = numRows().toInt + val n = numCols().toInt + val mem = m * n / 125000 if (mem > 500) logWarning(s"Storing this matrix will require $mem MB of memory!") - val parts = rdd.collect() - val values = new Array[Double](nRows * nCols) - parts.foreach { case ((blockRowIndex, blockColIndex), block) => + val localBlocks = blocks.collect() + val values = new Array[Double](m * n) + localBlocks.foreach { case ((blockRowIndex, blockColIndex), submat) => val rowOffset = blockRowIndex * rowsPerBlock val colOffset = blockColIndex * colsPerBlock - block.foreachActive { (i, j, v) => - val indexOffset = (j + colOffset) * nRows + rowOffset + i + submat.foreachActive { (i, j, v) => + val indexOffset = (j + colOffset) * m + rowOffset + i values(indexOffset) = v } } - new DenseMatrix(nRows, nCols, values) + new DenseMatrix(m, n, values) } /** Collects data and assembles a local dense breeze matrix (for test only). */