From 589fbb65478851d88ea5a7f5bf54c1fa8d53f055 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Fri, 14 Nov 2014 11:16:09 -0800
Subject: [PATCH] [SPARK-3974] Code review feedback addressed

---
 .../linalg/distributed/BlockMatrix.scala      | 71 ++++++++++---------
 1 file changed, 39 insertions(+), 32 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index 3c85af9ae6536..6b8931e9f2b1b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -32,10 +32,10 @@ import org.apache.spark.util.Utils
  * @param blockIdCol The column index of this block
  * @param mat The underlying local matrix
  */
-case class BlockPartition(blockIdRow: Int, blockIdCol: Int, mat: DenseMatrix) extends Serializable
+case class SubMatrix(blockIdRow: Int, blockIdCol: Int, mat: DenseMatrix) extends Serializable
 
 /**
- * Information about the BlockMatrix maintained on the driver
+ * Information about the submatrices of the BlockMatrix, maintained on the driver
  *
  * @param partitionId The id of the partition the block is found in
  * @param blockIdRow The row index of this block
@@ -45,7 +45,7 @@ case class BlockPartition(blockIdRow: Int, blockIdCol: Int, mat: DenseMatrix) ex
  * @param startCol The starting column index with respect to the distributed BlockMatrix
  * @param numCols The number of columns in this block
  */
-case class BlockPartitionInfo(
+case class SubMatrixInfo(
     partitionId: Int,
     blockIdRow: Int,
     blockIdCol: Int,
@@ -67,6 +67,13 @@ abstract class BlockMatrixPartitioner(
     val colPerBlock: Int) extends Partitioner {
   val name: String
 
+  /**
+   * Returns the index of the partition the SubMatrix belongs to.
+   *
+   * @param key The key for the SubMatrix. Can be its row index, column index or position in the
+   *            grid.
+   * @return The index of the partition that the SubMatrix belongs to.
+   */
   override def getPartition(key: Any): Int = {
     Utils.nonNegativeMod(key.asInstanceOf[Int], numPartitions)
   }
@@ -91,6 +98,7 @@ class GridPartitioner(
 
   override val numPartitions = numRowBlocks * numColBlocks
 
+  /** Checks whether the partitioners have the same characteristics */
   override def equals(obj: Any): Boolean = {
     obj match {
       case r: GridPartitioner =>
@@ -118,6 +126,7 @@ class RowBasedPartitioner(
 
   override val name = "row"
 
+  /** Checks whether the partitioners have the same characteristics */
   override def equals(obj: Any): Boolean = {
     obj match {
       case r: RowBasedPartitioner =>
@@ -145,6 +154,7 @@ class ColumnBasedPartitioner(
 
   override val name = "column"
 
+  /** Checks whether the partitioners have the same characteristics */
   override def equals(obj: Any): Boolean = {
     obj match {
       case p: ColumnBasedPartitioner =>
@@ -163,19 +173,19 @@ class ColumnBasedPartitioner(
  *
  * @param numRowBlocks Number of blocks that form the rows of this matrix
  * @param numColBlocks Number of blocks that form the columns of this matrix
- * @param rdd The RDD of BlockPartitions (local matrices) that form this matrix
- * @param partitioner A partitioner that specifies how BlockPartitions are stored in the cluster
+ * @param rdd The RDD of SubMatrices (local matrices) that form this matrix
+ * @param partitioner A partitioner that specifies how SubMatrices are stored in the cluster
  */
 class BlockMatrix(
     val numRowBlocks: Int,
     val numColBlocks: Int,
-    val rdd: RDD[BlockPartition],
+    val rdd: RDD[SubMatrix],
     val partitioner: BlockMatrixPartitioner) extends DistributedMatrix with Logging {
 
   // A key-value pair RDD is required to partition properly
-  private var matrixRDD: RDD[(Int, BlockPartition)] = keyBy()
+  private var matrixRDD: RDD[(Int, SubMatrix)] = keyBy()
 
-  @transient var blockInfo_ : Map[(Int, Int), BlockPartitionInfo] = null
+  @transient var blockInfo_ : Map[(Int, Int), SubMatrixInfo] = null
 
   private lazy val dims: (Long, Long) = getDim
 
@@ -184,40 +194,36 @@ class BlockMatrix(
 
   if (partitioner.name.equals("column")) {
     require(numColBlocks == partitioner.numPartitions, "The number of column blocks should match" +
-      " the number of partitions of the column partitioner.")
+      s" the number of partitions of the column partitioner. numColBlocks: $numColBlocks, " +
+      s"partitioner.numPartitions: ${partitioner.numPartitions}")
   } else if (partitioner.name.equals("row")) {
     require(numRowBlocks == partitioner.numPartitions, "The number of row blocks should match" +
-      " the number of partitions of the row partitioner.")
+      s" the number of partitions of the row partitioner. numRowBlocks: $numRowBlocks, " +
+      s"partitioner.numPartitions: ${partitioner.numPartitions}")
   } else if (partitioner.name.equals("grid")) {
     require(numRowBlocks * numColBlocks == partitioner.numPartitions, "The number of blocks " +
-      "should match the number of partitions of the grid partitioner.")
+      s"should match the number of partitions of the grid partitioner. numRowBlocks * " +
+      s"numColBlocks: ${numRowBlocks * numColBlocks}, " +
+      s"partitioner.numPartitions: ${partitioner.numPartitions}")
   } else {
     throw new IllegalArgumentException("Unrecognized partitioner.")
   }
 
-  /* Returns the dimensions of the matrix. */
+  /** Returns the dimensions of the matrix. */
   def getDim: (Long, Long) = {
     val bi = getBlockInfo
     val xDim = bi.map { x =>
       (x._1._1, x._2.numRows.toLong)
-    }.groupBy(x => x._1).values.map { x =>
-      x.head._2.toLong
-    }.reduceLeft {
-      _ + _
-    }
+    }.groupBy(x => x._1).values.map(_.head._2.toLong).reduceLeft(_ + _)
     val yDim = bi.map { x =>
       (x._1._2, x._2.numCols.toLong)
-    }.groupBy(x => x._1).values.map { x =>
-      x.head._2.toLong
-    }.reduceLeft {
-      _ + _
-    }
+    }.groupBy(x => x._1).values.map(_.head._2.toLong).reduceLeft(_ + _)
     (xDim, yDim)
   }
 
-  /* Calculates the information for each block and collects it on the driver */
+  /** Calculates the information for each block and collects it on the driver */
   private def calculateBlockInfo(): Unit = {
     // collect may cause akka frameSize errors
     val blockStartRowColsParts = matrixRDD.mapPartitionsWithIndex { case (partId, iter) =>
@@ -243,38 +249,38 @@ class BlockMatrix(
     }.toMap
 
     blockInfo_ = blockStartRowCols.map{ case ((rowId, colId), (partId, numRow, numCol)) =>
-      ((rowId, colId), new BlockPartitionInfo(partId, rowId, colId, cumulativeRowSum(rowId),
+      ((rowId, colId), new SubMatrixInfo(partId, rowId, colId, cumulativeRowSum(rowId),
         numRow, cumulativeColSum(colId), numCol))
     }.toMap
   }
 
-  /* Returns a map of the information of the blocks that form the distributed matrix. */
-  def getBlockInfo: Map[(Int, Int), BlockPartitionInfo] = {
+  /** Returns a map of the information of the blocks that form the distributed matrix. */
+  def getBlockInfo: Map[(Int, Int), SubMatrixInfo] = {
     if (blockInfo_ == null) {
       calculateBlockInfo()
     }
     blockInfo_
   }
 
-  /* Returns the Frobenius Norm of the matrix */
+  /** Returns the Frobenius norm of the matrix */
   def normFro(): Double = {
     math.sqrt(rdd.map(lm => lm.mat.values.map(x => math.pow(x, 2)).sum).reduce(_ + _))
   }
 
-  /* Cache the underlying RDD. */
+  /** Cache the underlying RDD. */
   def cache(): DistributedMatrix = {
     matrixRDD.cache()
     this
   }
 
-  /* Set the storage level for the underlying RDD. */
+  /** Set the storage level for the underlying RDD. */
   def persist(storageLevel: StorageLevel): DistributedMatrix = {
     matrixRDD.persist(storageLevel)
     this
   }
 
-  /* Add a key to the underlying rdd for partitioning and joins. */
-  private def keyBy(part: BlockMatrixPartitioner = partitioner): RDD[(Int, BlockPartition)] = {
+  /** Add a key to the underlying RDD for partitioning and joins. */
+  private def keyBy(part: BlockMatrixPartitioner = partitioner): RDD[(Int, SubMatrix)] = {
     rdd.map { block =>
       part match {
         case r: RowBasedPartitioner => (block.blockIdRow, block)
@@ -296,7 +302,7 @@ class BlockMatrix(
     this
   }
 
-  /* Collect the distributed matrix on the driver. */
+  /** Collect the distributed matrix on the driver. */
   def collect(): DenseMatrix = {
     val parts = rdd.map(x => ((x.blockIdRow, x.blockIdCol), x.mat)).
       collect().sortBy(x => (x._1._2, x._1._1))
@@ -324,6 +330,7 @@ class BlockMatrix(
     new DenseMatrix(nRows, nCols, values)
   }
 
+  /** Collects data and assembles a local dense breeze matrix (for test only). */
   private[mllib] def toBreeze(): BDM[Double] = {
     val localMat = collect()
     new BDM[Double](localMat.numRows, localMat.numCols, localMat.values)
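
Note for reviewers: below is a minimal usage sketch (not part of the patch) showing how the
renamed API fits together on this branch. It assumes GridPartitioner's constructor takes
(numRowBlocks, numColBlocks, rowPerBlock, colPerBlock) in that order, which the hunks above do
not show in full; SubMatrix, BlockMatrix, getDim, normFro, and collect are as defined in the
diff.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.linalg.DenseMatrix
    import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, GridPartitioner, SubMatrix}

    object BlockMatrixExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("BlockMatrixExample").setMaster("local[4]")
        val sc = new SparkContext(conf)
        // A 4x4 identity matrix stored as a 2x2 grid of 2x2 blocks.
        // DenseMatrix stores values in column-major order.
        val blocks = for (i <- 0 until 2; j <- 0 until 2) yield {
          val values = if (i == j) Array(1.0, 0.0, 0.0, 1.0) else Array.fill(4)(0.0)
          SubMatrix(i, j, new DenseMatrix(2, 2, values))
        }
        // One partition per block, matching the grid partitioner below.
        val rdd = sc.parallelize(blocks, numSlices = 4)
        // Assumed argument order: (numRowBlocks, numColBlocks, rowPerBlock, colPerBlock).
        val partitioner = new GridPartitioner(2, 2, 2, 2)
        val mat = new BlockMatrix(2, 2, rdd, partitioner)

        println(mat.getDim)     // (4,4): dimensions summed from per-block info on the driver
        println(mat.normFro())  // 2.0 for the 4x4 identity (sqrt of the sum of squares)
        val local: DenseMatrix = mat.collect()  // assemble the full matrix on the driver
        sc.stop()
      }
    }

Because getDim goes through getBlockInfo, the first call triggers calculateBlockInfo and hence
a job that collects per-block metadata to the driver; normFro and collect each run their own
job over the block RDD.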