From f378e163b04dad88f6e4fe309e45a5a632aa4101 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Mon, 10 Nov 2014 21:26:34 -0800
Subject: [PATCH] [SPARK-3974] Block Matrix Abstractions ready

---
 .../linalg/distributed/BlockMatrix.scala      | 183 ++++++++----
 1 file changed, 85 insertions(+), 98 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index a98671f431153..0a4837c29de3e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -20,18 +20,32 @@ package org.apache.spark.mllib.linalg.distributed
 import breeze.linalg.{DenseMatrix => BDM}
 
 import org.apache.spark._
-import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices }
+import org.apache.spark.mllib.linalg.DenseMatrix
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
 
-case class BlockPartition(
-    blockIdRow: Int,
-    blockIdCol: Int,
-    mat: DenseMatrix) extends Serializable
+/**
+ * Represents a local matrix that makes up one block of a distributed BlockMatrix.
+ *
+ * @param blockIdRow The row index of this block
+ * @param blockIdCol The column index of this block
+ * @param mat The underlying local matrix
+ */
+case class BlockPartition(blockIdRow: Int, blockIdCol: Int, mat: DenseMatrix) extends Serializable
 
-// Information about BlockMatrix maintained on the driver
+/**
+ * Information about the BlockMatrix maintained on the driver.
+ *
+ * @param partitionId The id of the partition the block is found in
+ * @param blockIdRow The row index of this block
+ * @param blockIdCol The column index of this block
+ * @param startRow The starting row index with respect to the distributed BlockMatrix
+ * @param numRows The number of rows in this block
+ * @param startCol The starting column index with respect to the distributed BlockMatrix
+ * @param numCols The number of columns in this block
+ */
 case class BlockPartitionInfo(
     partitionId: Int,
     blockIdRow: Int,
@@ -41,6 +55,13 @@ case class BlockPartitionInfo(
     startCol: Long,
     numCols: Int) extends Serializable
 
+/**
+ * A partitioner that decides how the matrix is distributed in the cluster.
+ *
+ * @param numPartitions Number of partitions.
+ * @param rowPerBlock Number of rows that make up each block.
+ * @param colPerBlock Number of columns that make up each block.
+ */
 abstract class BlockMatrixPartitioner(
     override val numPartitions: Int,
     val rowPerBlock: Int,
@@ -52,6 +73,14 @@ abstract class BlockMatrixPartitioner(
   }
 }
 
+/**
+ * A grid partitioner, which stores every block in a separate partition.
+ *
+ * @param numRowBlocks Number of blocks that form the rows of the matrix.
+ * @param numColBlocks Number of blocks that form the columns of the matrix.
+ * @param rowPerBlock Number of rows that make up each block.
+ * @param colPerBlock Number of columns that make up each block.
+ */
 class GridPartitioner(
     val numRowBlocks: Int,
     val numColBlocks: Int,
     override val rowPerBlock: Int,
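
For intuition (illustration only, not part of the patch): the grid scheme gives every block its own partition by assigning the column-major key blockIdRow + numRowBlocks * blockIdCol, the same expression the keyBy helper later in this file uses. A standalone sketch, with a hypothetical linearKey helper and an arbitrary 2 x 3 grid of blocks:

    object GridKeyDemo extends App {
      // Column-major linear key, mirroring the grid case of BlockMatrix.keyBy:
      def linearKey(blockIdRow: Int, blockIdCol: Int, numRowBlocks: Int): Int =
        blockIdRow + numRowBlocks * blockIdCol

      // For 2 row blocks and 3 column blocks the keys run down each column:
      // (0,0) -> 0, (1,0) -> 1, (0,1) -> 2, (1,1) -> 3, (0,2) -> 4, (1,2) -> 5
      assert(linearKey(1, 2, numRowBlocks = 2) == 5)
    }
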
@@ -74,6 +103,14 @@ class GridPartitioner(
   }
 }
 
+/**
+ * A specialized partitioner that stores all blocks in the same row in just one partition.
+ *
+ * @param numPartitions Number of partitions. Should be set as the number of blocks that form
+ *                      the rows of the matrix.
+ * @param rowPerBlock Number of rows that make up each block.
+ * @param colPerBlock Number of columns that make up each block.
+ */
 class RowBasedPartitioner(
     override val numPartitions: Int,
     override val rowPerBlock: Int,
@@ -93,6 +130,14 @@ class RowBasedPartitioner(
   }
 }
 
+/**
+ * A specialized partitioner that stores all blocks in the same column in just one partition.
+ *
+ * @param numPartitions Number of partitions. Should be set as the number of blocks that form
+ *                      the columns of the matrix.
+ * @param rowPerBlock Number of rows that make up each block.
+ * @param colPerBlock Number of columns that make up each block.
+ */
 class ColumnBasedPartitioner(
     override val numPartitions: Int,
     override val rowPerBlock: Int,
@@ -114,39 +159,44 @@ class ColumnBasedPartitioner(
   }
 }
 
+/**
+ * Represents a distributed matrix in blocks of local matrices.
+ *
+ * @param numRowBlocks Number of blocks that form the rows of this matrix
+ * @param numColBlocks Number of blocks that form the columns of this matrix
+ * @param rdd The RDD of BlockPartitions (local matrices) that form this matrix
+ * @param partitioner A partitioner that specifies how BlockPartitions are stored in the cluster
+ */
 class BlockMatrix(
     val numRowBlocks: Int,
     val numColBlocks: Int,
     val rdd: RDD[BlockPartition],
     val partitioner: BlockMatrixPartitioner) extends DistributedMatrix with Logging {
 
-  // We need a key-value pair RDD to partition properly
-  private var matrixRDD = rdd.map { block =>
-    partitioner match {
-      case r: RowBasedPartitioner => (block.blockIdRow, block)
-      case c: ColumnBasedPartitioner => (block.blockIdCol, block)
-      case g: GridPartitioner => (block.blockIdRow + numRowBlocks * block.blockIdCol, block)
-      case _ => throw new IllegalArgumentException("Unrecognized partitioner")
-    }
-  }
+  // A key-value pair RDD is required to partition properly
+  private var matrixRDD: RDD[(Int, BlockPartition)] = keyBy()
 
   @transient var blockInfo_ : Map[(Int, Int), BlockPartitionInfo] = null
 
-  lazy val dims: (Long, Long) = getDim
+  private lazy val dims: (Long, Long) = getDim
 
   override def numRows(): Long = dims._1
   override def numCols(): Long = dims._2
 
   if (partitioner.name.equals("column")) {
-    require(numColBlocks == partitioner.numPartitions)
+    require(numColBlocks == partitioner.numPartitions, "The number of column blocks should match" +
+      " the number of partitions of the column partitioner.")
   } else if (partitioner.name.equals("row")) {
-    require(numRowBlocks == partitioner.numPartitions)
+    require(numRowBlocks == partitioner.numPartitions, "The number of row blocks should match" +
+      " the number of partitions of the row partitioner.")
   } else if (partitioner.name.equals("grid")) {
-    require(numRowBlocks * numColBlocks == partitioner.numPartitions)
+    require(numRowBlocks * numColBlocks == partitioner.numPartitions, "The number of blocks " +
+      "should match the number of partitions of the grid partitioner.")
   } else {
     throw new IllegalArgumentException("Unrecognized partitioner.")
   }
 
+  /* Returns the dimensions of the matrix. */
   def getDim: (Long, Long) = {
     val bi = getBlockInfo
     val xDim = bi.map { x =>
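
To make the consistency checks above concrete (illustration only, not part of the patch; it assumes GridPartitioner takes the four constructor arguments its scaladoc lists): a matrix cut into 3 row blocks and 4 column blocks of 2 x 2 entries each admits the following partitioners, and any other pairing of block counts and numPartitions fails the corresponding require:

    import org.apache.spark.mllib.linalg.distributed.{ColumnBasedPartitioner, GridPartitioner, RowBasedPartitioner}

    object PartitionerSizes extends App {
      // A 3 x 4 grid of 2 x 2 blocks, i.e. a 6 x 8 matrix.
      val grid = new GridPartitioner(3, 4, 2, 2)      // one partition per block: 3 * 4 = 12
      val byRow = new RowBasedPartitioner(3, 2, 2)    // one partition per row of blocks
      val byCol = new ColumnBasedPartitioner(4, 2, 2) // one partition per column of blocks
    }
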
@@ -194,11 +244,12 @@ class BlockMatrix(
     }.toMap
 
     blockInfo_ = blockStartRowCols.map{ case ((rowId, colId), (partId, numRow, numCol)) =>
-      ((rowId, colId), new BlockPartitionInfo(partId, rowId, colId, cumulativeRowSum(rowId), numRow,
-        cumulativeColSum(colId), numCol))
+      ((rowId, colId), new BlockPartitionInfo(partId, rowId, colId, cumulativeRowSum(rowId),
+        numRow, cumulativeColSum(colId), numCol))
     }.toMap
   }
 
+  /* Returns a map of the information of the blocks that form the distributed matrix. */
   def getBlockInfo: Map[(Int, Int), BlockPartitionInfo] = {
     if (blockInfo_ == null) {
       calculateBlockInfo()
@@ -206,6 +257,7 @@ class BlockMatrix(
     }
     blockInfo_
   }
 
+  /* Returns the Frobenius norm of the matrix. */
   def normFro(): Double = {
     math.sqrt(rdd.map(lm => lm.mat.values.map(x => math.pow(x, 2)).sum).reduce(_ + _))
   }
@@ -222,8 +274,19 @@ class BlockMatrix(
     this
   }
 
+  private def keyBy(part: BlockMatrixPartitioner = partitioner): RDD[(Int, BlockPartition)] = {
+    rdd.map { block =>
+      part match {
+        case r: RowBasedPartitioner => (block.blockIdRow, block)
+        case c: ColumnBasedPartitioner => (block.blockIdCol, block)
+        case g: GridPartitioner => (block.blockIdRow + numRowBlocks * block.blockIdCol, block)
+        case _ => throw new IllegalArgumentException("Unrecognized partitioner")
+      }
+    }
+  }
+
   def repartition(part: BlockMatrixPartitioner = partitioner): DistributedMatrix = {
-    matrixRDD = matrixRDD.partitionBy(part)
+    matrixRDD = keyBy(part).partitionBy(part)
     this
   }
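
A minimal end-to-end sketch of the resulting API (illustration only, not part of the patch; the SparkContext is supplied by the caller, and the four-argument GridPartitioner constructor is the one assumed above):

    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.linalg.DenseMatrix
    import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, BlockPartition, GridPartitioner}

    object BlockMatrixDemo {
      def run(sc: SparkContext): Unit = {
        // Four 2 x 2 identity blocks in a 2 x 2 grid: a 4 x 4 identity matrix.
        val blocks = for (i <- 0 until 2; j <- 0 until 2) yield
          new BlockPartition(i, j, new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 1.0)))

        val partitioner = new GridPartitioner(2, 2, 2, 2)
        val mat = new BlockMatrix(2, 2, sc.parallelize(blocks), partitioner)

        assert(mat.numRows() == 4 && mat.numCols() == 4)
        println(mat.normFro()) // eight unit entries in total, so sqrt(8)
      }
    }
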
@@ -259,80 +322,4 @@ class BlockMatrix(
     val localMat = collect()
     new BDM[Double](localMat.numRows, localMat.numCols, localMat.values)
   }
-
-  def add(other: DistributedMatrix): DistributedMatrix = {
-    other match {
-      // We really need a function to check if two matrices are partitioned similarly
-      case otherBlocked: BlockMatrix =>
-        if (checkPartitioning(otherBlocked, OperationNames.add)){
-          val addedBlocks = rdd.zip(otherBlocked.rdd).map{ case (a, b) =>
-            val result = a.mat.toBreeze + b.mat.toBreeze
-            new BlockPartition(a.blockIdRow, a.blockIdCol,
-              Matrices.fromBreeze(result).asInstanceOf[DenseMatrix])
-          }
-          new BlockMatrix(numRowBlocks, numColBlocks, addedBlocks, partitioner)
-        } else {
-          throw new SparkException(
-            "Cannot add matrices with non-matching partitioners")
-        }
-      case _ =>
-        throw new IllegalArgumentException("Cannot add matrices of different types")
-    }
-  }
-
-  def multiply(other: DistributedMatrix): BlockMatrix = {
-    other match {
-      case otherBlocked: BlockMatrix =>
-        if (checkPartitioning(otherBlocked, OperationNames.multiply)){
-
-          val resultPartitioner = new GridPartitioner(numRowBlocks, otherBlocked.numColBlocks,
-            partitioner.rowPerBlock, otherBlocked.partitioner.colPerBlock)
-
-          val multiplyBlocks = matrixRDD.join(otherBlocked.matrixRDD, partitioner).
-            map { case (key, (mat1, mat2)) =>
-              val C = mat1.mat multiply mat2.mat
-              (mat1.blockIdRow + numRowBlocks * mat2.blockIdCol, C.toBreeze)
-            }.reduceByKey(resultPartitioner, (a, b) => a + b)
-
-          val newBlocks = multiplyBlocks.map{ case (index, mat) =>
-            val colId = index / numRowBlocks
-            val rowId = index - colId * numRowBlocks
-            new BlockPartition(rowId, colId, Matrices.fromBreeze(mat).asInstanceOf[DenseMatrix])
-          }
-          new BlockMatrix(numRowBlocks, otherBlocked.numColBlocks, newBlocks, resultPartitioner)
-        } else {
-          throw new SparkException(
-            "Cannot multiply matrices with non-matching partitioners")
-        }
-      case _ =>
-        throw new IllegalArgumentException("Cannot add matrices of different types")
-    }
-  }
-
-  private def checkPartitioning(other: BlockMatrix, operation: Int): Boolean = {
-    val otherPartitioner = other.partitioner
-    operation match {
-      case OperationNames.add =>
-        partitioner.equals(otherPartitioner)
-      case OperationNames.multiply =>
-        partitioner.name == "column" && otherPartitioner.name == "row" &&
-          partitioner.numPartitions == otherPartitioner.numPartitions &&
-          partitioner.colPerBlock == otherPartitioner.rowPerBlock &&
-          numColBlocks == other.numRowBlocks
-      case _ =>
-        throw new IllegalArgumentException("Unsupported operation")
-    }
-  }
-}
-
-/**
- * Maintains supported and default block matrix operation names.
- *
- * Currently supported operations: `add`, `multiply`.
- */
-private object OperationNames {
-
-  val add: Int = 1
-  val multiply: Int = 2
-
 }
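
The removed multiply encoded each result block's position as the same column-major index (rowId + numRowBlocks * colId) and decoded it again after the reduce step. A standalone round-trip sketch of that bookkeeping (illustration only; all names are hypothetical):

    object BlockIndexRoundTrip extends App {
      val numRowBlocks = 3

      // Encode as in the removed multiply: index = rowId + numRowBlocks * colId.
      def encode(rowId: Int, colId: Int): Int = rowId + numRowBlocks * colId

      // Decode as in the removed newBlocks step.
      def decode(index: Int): (Int, Int) = {
        val colId = index / numRowBlocks
        (index - colId * numRowBlocks, colId)
      }

      // Every position in a 3 x 4 grid of blocks survives the round trip.
      for (r <- 0 until numRowBlocks; c <- 0 until 4) {
        val (rowBack, colBack) = decode(encode(r, c))
        assert(rowBack == r && colBack == c)
      }
    }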