[SPARK-3974] Code review feedback addressed
Burak Yavuz committed Nov 14, 2014
1 parent aa8f086 commit 589fbb6
Showing 1 changed file with 39 additions and 32 deletions.
@@ -32,10 +32,10 @@ import org.apache.spark.util.Utils
* @param blockIdCol The column index of this block
* @param mat The underlying local matrix
*/
-case class BlockPartition(blockIdRow: Int, blockIdCol: Int, mat: DenseMatrix) extends Serializable
+case class SubMatrix(blockIdRow: Int, blockIdCol: Int, mat: DenseMatrix) extends Serializable

/**
- * Information about the BlockMatrix maintained on the driver
+ * Information about the submatrices of the BlockMatrix, maintained on the driver
*
* @param partitionId The id of the partition the block is found in
* @param blockIdRow The row index of this block
@@ -45,7 +45,7 @@ case class BlockPartition(blockIdRow: Int, blockIdCol: Int, mat: DenseMatrix) ex
* @param startCol The starting column index with respect to the distributed BlockMatrix
* @param numCols The number of columns in this block
*/
-case class BlockPartitionInfo(
+case class SubMatrixInfo(
partitionId: Int,
blockIdRow: Int,
blockIdCol: Int,
@@ -67,6 +67,13 @@ abstract class BlockMatrixPartitioner(
val colPerBlock: Int) extends Partitioner {
val name: String

+/**
+ * Returns the index of the partition the SubMatrix belongs to.
+ *
+ * @param key The key for the SubMatrix. Can be its row index, column index, or position in the
+ *            grid.
+ * @return The index of the partition that the SubMatrix belongs to.
+ */
override def getPartition(key: Any): Int = {
Utils.nonNegativeMod(key.asInstanceOf[Int], numPartitions)
}
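The getPartition implementation above reduces any integer key to a partition index via a non-negative modulo. A minimal standalone sketch, assuming Utils.nonNegativeMod is equivalent to the usual ((x % mod) + mod) % mod (Scala's plain % can return negative values for negative keys):

    // Hypothetical stand-in for org.apache.spark.util.Utils.nonNegativeMod.
    def nonNegativeMod(x: Int, mod: Int): Int = ((x % mod) + mod) % mod

    val numPartitions = 6
    // Every key, including negative ones, lands in [0, numPartitions).
    Seq(-7, -1, 0, 5, 13).foreach { key =>
      println(s"key $key -> partition ${nonNegativeMod(key, numPartitions)}")
    }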
@@ -91,6 +98,7 @@ class GridPartitioner(

override val numPartitions = numRowBlocks * numColBlocks

+/** Checks whether the partitioners have the same characteristics */
override def equals(obj: Any): Boolean = {
obj match {
case r: GridPartitioner =>
@@ -118,6 +126,7 @@ class RowBasedPartitioner(

override val name = "row"

+/** Checks whether the partitioners have the same characteristics */
override def equals(obj: Any): Boolean = {
obj match {
case r: RowBasedPartitioner =>
@@ -145,6 +154,7 @@ class ColumnBasedPartitioner(

override val name = "column"

+/** Checks whether the partitioners have the same characteristics */
override def equals(obj: Any): Boolean = {
obj match {
case p: ColumnBasedPartitioner =>
@@ -163,19 +173,19 @@ class ColumnBasedPartitioner(
*
* @param numRowBlocks Number of blocks that form the rows of this matrix
* @param numColBlocks Number of blocks that form the columns of this matrix
- * @param rdd The RDD of BlockPartitions (local matrices) that form this matrix
- * @param partitioner A partitioner that specifies how BlockPartitions are stored in the cluster
+ * @param rdd The RDD of SubMatrix blocks (local matrices) that form this matrix
+ * @param partitioner A partitioner that specifies how SubMatrix blocks are stored in the cluster
*/
class BlockMatrix(
val numRowBlocks: Int,
val numColBlocks: Int,
-val rdd: RDD[BlockPartition],
+val rdd: RDD[SubMatrix],
val partitioner: BlockMatrixPartitioner) extends DistributedMatrix with Logging {

// A key-value pair RDD is required to partition properly
-private var matrixRDD: RDD[(Int, BlockPartition)] = keyBy()
+private var matrixRDD: RDD[(Int, SubMatrix)] = keyBy()

-@transient var blockInfo_ : Map[(Int, Int), BlockPartitionInfo] = null
+@transient var blockInfo_ : Map[(Int, Int), SubMatrixInfo] = null

private lazy val dims: (Long, Long) = getDim

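Taken together, a BlockMatrix is an RDD of SubMatrix blocks plus a partitioner. A hedged construction sketch: the GridPartitioner argument list (numRowBlocks, numColBlocks, rowPerBlock, colPerBlock) is inferred from the BlockMatrixPartitioner fields above rather than confirmed by this diff, and an active SparkContext named sc is assumed:

    import org.apache.spark.mllib.linalg.DenseMatrix

    // Four 2x2 blocks in a 2x2 grid, i.e. a 4x4 distributed matrix.
    // DenseMatrix values are column-major, matching the constructor used in collect().
    val blocks = for {
      i <- 0 until 2
      j <- 0 until 2
    } yield SubMatrix(i, j, new DenseMatrix(2, 2, Array.fill(4)((i * 2 + j).toDouble)))

    val rdd = sc.parallelize(blocks)  // `sc` is an assumed SparkContext
    // Assumed signature: GridPartitioner(numRowBlocks, numColBlocks, rowPerBlock, colPerBlock).
    val mat = new BlockMatrix(2, 2, rdd, new GridPartitioner(2, 2, 2, 2))
    assert(mat.getDim == (4L, 4L))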
@@ -184,40 +194,36 @@ class BlockMatrix(

if (partitioner.name.equals("column")) {
require(numColBlocks == partitioner.numPartitions, "The number of column blocks should match" +
-" the number of partitions of the column partitioner.")
+s" the number of partitions of the column partitioner. numColBlocks: $numColBlocks, " +
+s"partitioner.numPartitions: ${partitioner.numPartitions}")
} else if (partitioner.name.equals("row")) {
require(numRowBlocks == partitioner.numPartitions, "The number of row blocks should match" +
-" the number of partitions of the row partitioner.")
+s" the number of partitions of the row partitioner. numRowBlocks: $numRowBlocks, " +
+s"partitioner.numPartitions: ${partitioner.numPartitions}")
} else if (partitioner.name.equals("grid")) {
require(numRowBlocks * numColBlocks == partitioner.numPartitions, "The number of blocks " +
-"should match the number of partitions of the grid partitioner.")
+s"should match the number of partitions of the grid partitioner. numRowBlocks * " +
+s"numColBlocks: ${numRowBlocks * numColBlocks}, " +
+s"partitioner.numPartitions: ${partitioner.numPartitions}")
} else {
throw new IllegalArgumentException("Unrecognized partitioner.")
}

-/* Returns the dimensions of the matrix. */
+/** Returns the dimensions of the matrix. */
def getDim: (Long, Long) = {
val bi = getBlockInfo
val xDim = bi.map { x =>
(x._1._1, x._2.numRows.toLong)
-}.groupBy(x => x._1).values.map { x =>
-x.head._2.toLong
-}.reduceLeft {
-_ + _
-}
+}.groupBy(x => x._1).values.map(_.head._2.toLong).reduceLeft(_ + _)

val yDim = bi.map { x =>
(x._1._2, x._2.numCols.toLong)
-}.groupBy(x => x._1).values.map { x =>
-x.head._2.toLong
-}.reduceLeft {
-_ + _
-}
+}.groupBy(x => x._1).values.map(_.head._2.toLong).reduceLeft(_ + _)

(xDim, yDim)
}

-/* Calculates the information for each block and collects it on the driver */
+/** Calculates the information for each block and collects it on the driver */
private def calculateBlockInfo(): Unit = {
// collect may cause akka frameSize errors
val blockStartRowColsParts = matrixRDD.mapPartitionsWithIndex { case (partId, iter) =>
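The condensed getDim above keeps one representative size per block-row (and per block-column) and sums them. The same pipeline can be exercised on a plain Map with hypothetical block sizes:

    // Hypothetical block info: (rowBlockId, colBlockId) -> (numRows, numCols).
    val bi = Map((0, 0) -> (2L, 3L), (0, 1) -> (2L, 4L),
                 (1, 0) -> (5L, 3L), (1, 1) -> (5L, 4L))

    // One representative row count per block-row, summed: 2 + 5 = 7.
    val xDim = bi.map { case ((row, _), (nRows, _)) => (row, nRows) }
      .groupBy(_._1).values.map(_.head._2).reduceLeft(_ + _)

    // One representative column count per block-column, summed: 3 + 4 = 7.
    val yDim = bi.map { case ((_, col), (_, nCols)) => (col, nCols) }
      .groupBy(_._1).values.map(_.head._2).reduceLeft(_ + _)

    assert((xDim, yDim) == (7L, 7L))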
@@ -243,38 +249,38 @@ class BlockMatrix(
}.toMap

blockInfo_ = blockStartRowCols.map{ case ((rowId, colId), (partId, numRow, numCol)) =>
-((rowId, colId), new BlockPartitionInfo(partId, rowId, colId, cumulativeRowSum(rowId),
+((rowId, colId), new SubMatrixInfo(partId, rowId, colId, cumulativeRowSum(rowId),
numRow, cumulativeColSum(colId), numCol))
}.toMap
}

-/* Returns a map of the information of the blocks that form the distributed matrix. */
-def getBlockInfo: Map[(Int, Int), BlockPartitionInfo] = {
+/** Returns a map of the information of the blocks that form the distributed matrix. */
+def getBlockInfo: Map[(Int, Int), SubMatrixInfo] = {
if (blockInfo_ == null) {
calculateBlockInfo()
}
blockInfo_
}

-/* Returns the Frobenius Norm of the matrix */
+/** Returns the Frobenius Norm of the matrix */
def normFro(): Double = {
math.sqrt(rdd.map(lm => lm.mat.values.map(x => math.pow(x, 2)).sum).reduce(_ + _))
}

-/* Cache the underlying RDD. */
+/** Cache the underlying RDD. */
def cache(): DistributedMatrix = {
matrixRDD.cache()
this
}

-/* Set the storage level for the underlying RDD. */
+/** Set the storage level for the underlying RDD. */
def persist(storageLevel: StorageLevel): DistributedMatrix = {
matrixRDD.persist(storageLevel)
this
}

-/* Add a key to the underlying rdd for partitioning and joins. */
-private def keyBy(part: BlockMatrixPartitioner = partitioner): RDD[(Int, BlockPartition)] = {
+/** Add a key to the underlying rdd for partitioning and joins. */
+private def keyBy(part: BlockMatrixPartitioner = partitioner): RDD[(Int, SubMatrix)] = {
rdd.map { block =>
part match {
case r: RowBasedPartitioner => (block.blockIdRow, block)
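normFro in the hunk above is the square root of the sum of squared entries over all blocks. A tiny local check of the formula, independent of Spark:

    // Frobenius norm of [[3, 0], [4, 0]]: sqrt(3^2 + 4^2) = 5.
    val values = Array(3.0, 4.0, 0.0, 0.0)  // column-major, as in DenseMatrix
    val norm = math.sqrt(values.map(x => x * x).sum)
    assert(norm == 5.0)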
@@ -296,7 +302,7 @@ class BlockMatrix(
this
}

-/* Collect the distributed matrix on the driver. */
+/** Collect the distributed matrix on the driver. */
def collect(): DenseMatrix = {
val parts = rdd.map(x => ((x.blockIdRow, x.blockIdCol), x.mat)).
collect().sortBy(x => (x._1._2, x._1._1))
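collect() sorts the blocks so that block-columns come first, then writes each block into a single column-major array. A hedged sketch of that index arithmetic for two 2x2 blocks stacked in one block-column (illustrative only; the patch's actual assembly loop is elided from this hunk):

    val top    = Array(1.0, 2.0, 5.0, 6.0)   // 2x2 block, column-major
    val bottom = Array(3.0, 4.0, 7.0, 8.0)   // 2x2 block, column-major
    val nRows = 4
    val nCols = 2
    val values = new Array[Double](nRows * nCols)
    for (j <- 0 until nCols; i <- 0 until 2) {
      values(j * nRows + i)     = top(j * 2 + i)      // rows 0-1 of column j
      values(j * nRows + 2 + i) = bottom(j * 2 + i)   // rows 2-3 of column j
    }
    // Column-major 4x2 result: column 0 is (1,2,3,4), column 1 is (5,6,7,8).
    assert(values.sameElements(Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0)))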
@@ -324,6 +330,7 @@ class BlockMatrix(
new DenseMatrix(nRows, nCols, values)
}

+/** Collects data and assembles a local dense breeze matrix (for test only). */
private[mllib] def toBreeze(): BDM[Double] = {
val localMat = collect()
new BDM[Double](localMat.numRows, localMat.numCols, localMat.values)
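toBreeze simply rewraps the collected column-major values: Breeze's DenseMatrix uses the same layout, so no reordering is needed. A minimal sketch:

    import breeze.linalg.{DenseMatrix => BDM}

    // Entry (0, 1) of a column-major 2x2 matrix sits at index 1 * 2 + 0 = 2.
    val bdm = new BDM[Double](2, 2, Array(1.0, 2.0, 3.0, 4.0))
    assert(bdm(0, 1) == 3.0)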
