[SPARK-3974] Removed SubMatrixInfo and added constructor without partitioner

brkyvz committed Nov 15, 2014
1 parent 49b9586 · commit d033861
Showing 2 changed files with 48 additions and 118 deletions.
BlockMatrix.scala
@@ -21,47 +21,28 @@ import breeze.linalg.{DenseMatrix => BDM}

import org.apache.spark._
import org.apache.spark.mllib.linalg.DenseMatrix
import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

/**
 * Represents a local matrix that makes up one block of a distributed BlockMatrix
 *
- * @param blockRowIndex The row index of this block
- * @param blockColIndex The column index of this block
+ * @param blockRowIndex The row index of this block. Must be zero based.
+ * @param blockColIndex The column index of this block. Must be zero based.
 * @param mat The underlying local matrix
 */
case class SubMatrix(blockRowIndex: Int, blockColIndex: Int, mat: DenseMatrix) extends Serializable

-/**
- * Information of the submatrices of the BlockMatrix maintained on the driver
- *
- * @param partitionId The id of the partition the block is found in
- * @param blockRowIndex The row index of this block
- * @param blockColIndex The column index of this block
- * @param startRow The starting row index with respect to the distributed BlockMatrix
- * @param numRows The number of rows in this block
- * @param startCol The starting column index with respect to the distributed BlockMatrix
- * @param numCols The number of columns in this block
- */
-case class SubMatrixInfo(
-    partitionId: Int,
-    blockRowIndex: Int,
-    blockColIndex: Int,
-    startRow: Long,
-    numRows: Int,
-    startCol: Long,
-    numCols: Int) extends Serializable

/**
 * A partitioner that decides how the matrix is distributed in the cluster
 *
 * @param numPartitions Number of partitions
 * @param rowPerBlock Number of rows that make up each block.
 * @param colPerBlock Number of columns that make up each block.
 */
-abstract class BlockMatrixPartitioner(
+private[mllib] abstract class BlockMatrixPartitioner(
    override val numPartitions: Int,
    val rowPerBlock: Int,
    val colPerBlock: Int) extends Partitioner {
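
A SubMatrix, as documented above, simply pairs a zero-based block coordinate with a local DenseMatrix. A minimal sketch of constructing one (the 2x2 values here are illustrative, not part of the commit):

    import org.apache.spark.mllib.linalg.DenseMatrix

    // A 2x2 local block holding values in column-major order,
    // placed at block coordinate (0, 0) of the distributed matrix.
    val localBlock = new DenseMatrix(2, 2, Array(1.0, 2.0, 3.0, 4.0))
    val block = SubMatrix(blockRowIndex = 0, blockColIndex = 0, mat = localBlock)
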
@@ -173,20 +154,30 @@ class ColumnBasedPartitioner(
 *
 * @param numRowBlocks Number of blocks that form the rows of this matrix
 * @param numColBlocks Number of blocks that form the columns of this matrix
- * @param rdd The RDD of SubMatrixs (local matrices) that form this matrix
- * @param partitioner A partitioner that specifies how SubMatrixs are stored in the cluster
+ * @param rdd The RDD of SubMatrices (local matrices) that form this matrix
+ * @param partitioner A partitioner that specifies how SubMatrices are stored in the cluster
 */
class BlockMatrix(
    val numRowBlocks: Int,
    val numColBlocks: Int,
    val rdd: RDD[SubMatrix],
    val partitioner: BlockMatrixPartitioner) extends DistributedMatrix with Logging {

+  /**
+   * Alternate constructor for BlockMatrix without the input of a partitioner. Will use a Grid
+   * Partitioner by default.
+   *
+   * @param numRowBlocks Number of blocks that form the rows of this matrix
+   * @param numColBlocks Number of blocks that form the columns of this matrix
+   * @param rdd The RDD of SubMatrices (local matrices) that form this matrix
+   */
+  def this(numRowBlocks: Int, numColBlocks: Int, rdd: RDD[SubMatrix]) = {
+    this(numRowBlocks, numColBlocks, rdd, new GridPartitioner(numRowBlocks, numColBlocks,
+      rdd.first().mat.numRows, rdd.first().mat.numCols))
+  }
+
  // A key-value pair RDD is required to partition properly
  private var matrixRDD: RDD[(Int, SubMatrix)] = keyBy()

-  @transient var blockInfo_ : Map[(Int, Int), SubMatrixInfo] = null
-
  private lazy val dims: (Long, Long) = getDim

  override def numRows(): Long = dims._1
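
The new auxiliary constructor builds a GridPartitioner from the dimensions of the first block, so it assumes all blocks share one block size, and the rdd.first() calls trigger Spark jobs. A hypothetical usage sketch (sc and the 3x3 zero blocks are assumed, not from the commit):

    // Four 3x3 blocks laid out on a 2 x 2 block grid.
    val blocks: RDD[SubMatrix] = sc.parallelize(for {
      i <- 0 until 2
      j <- 0 until 2
    } yield SubMatrix(i, j, new DenseMatrix(3, 3, new Array[Double](9))))

    // No partitioner supplied; a GridPartitioner is derived from the first block's shape.
    val mat = new BlockMatrix(2, 2, blocks)
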
@@ -211,55 +202,26 @@ class BlockMatrix(

  /** Returns the dimensions of the matrix. */
  def getDim: (Long, Long) = {
-    val bi = getBlockInfo
-    val xDim = bi.map { x =>
-      (x._1._1, x._2.numRows.toLong)
-    }.groupBy(x => x._1).values.map(_.head._2.toLong).reduceLeft(_ + _)
-
-    val yDim = bi.map { x =>
-      (x._1._2, x._2.numCols.toLong)
-    }.groupBy(x => x._1).values.map(_.head._2.toLong).reduceLeft(_ + _)
-
-    (xDim, yDim)
-  }
-
-  /** Calculates the information for each block and collects it on the driver */
-  private def calculateBlockInfo(): Unit = {
-    // collect may cause akka frameSize errors
-    val blockStartRowColsParts = matrixRDD.mapPartitionsWithIndex { case (partId, iter) =>
-      iter.map { case (id, block) =>
-        ((block.blockRowIndex, block.blockColIndex), (partId, block.mat.numRows, block.mat.numCols))
+    val firstRowColumn = rdd.filter(block => block.blockRowIndex == 0 || block.blockColIndex == 0).
+      map { block =>
+        ((block.blockRowIndex, block.blockColIndex), (block.mat.numRows, block.mat.numCols))
      }
-    }.collect()
-    val blockStartRowCols = blockStartRowColsParts.sortBy(_._1)
-
-    // Group blockInfo by rowId, pick the first row and sort on rowId
-    val rowReps = blockStartRowCols.groupBy(_._1._1).values.map(_.head).toSeq.sortBy(_._1._1)
-
-    // Group blockInfo by columnId, pick the first column and sort on columnId
-    val colReps = blockStartRowCols.groupBy(_._1._2).values.map(_.head).toSeq.sortBy(_._1._2)
-
-    // Calculate startRows
-    val cumulativeRowSum = rowReps.scanLeft((0, 0L)) { case (x1, x2) =>
-      (x1._1 + 1, x1._2 + x2._2._2)
-    }.toMap
-
-    val cumulativeColSum = colReps.scanLeft((0, 0L)) { case (x1, x2) =>
-      (x1._1 + 1, x1._2 + x2._2._3)
-    }.toMap
-
-    blockInfo_ = blockStartRowCols.map{ case ((rowId, colId), (partId, numRow, numCol)) =>
-      ((rowId, colId), new SubMatrixInfo(partId, rowId, colId, cumulativeRowSum(rowId),
-        numRow, cumulativeColSum(colId), numCol))
-    }.toMap
-  }
-
-  /** Returns a map of the information of the blocks that form the distributed matrix. */
-  def getBlockInfo: Map[(Int, Int), SubMatrixInfo] = {
-    if (blockInfo_ == null) {
-      calculateBlockInfo()
-    }
-    blockInfo_
+
+    firstRowColumn.treeAggregate((0L, 0L))(
+      seqOp = (c, v) => (c, v) match { case ((x_dim, y_dim), ((indX, indY), (nRow, nCol))) =>
+        if (indX == 0 && indY == 0) {
+          (x_dim + nRow, y_dim + nCol)
+        } else if (indX == 0) {
+          (x_dim, y_dim + nCol)
+        } else {
+          (x_dim + nRow, y_dim)
+        }
+      },
+      combOp = (c1, c2) => (c1, c2) match {
+        case ((x_dim1, y_dim1), (x_dim2, y_dim2)) =>
+          (x_dim1 + x_dim2, y_dim1 + y_dim2)
+      })
  }

  /** Returns the Frobenius Norm of the matrix */
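
The rewritten getDim no longer collects per-block metadata to the driver. It keeps only the blocks in the first block row or first block column and aggregates their shapes with treeAggregate: blocks in block column 0 contribute rows, blocks in block row 0 contribute columns, and block (0, 0) contributes both. The same fold on a local Seq, for intuition (the block shapes are made up):

    // ((blockRowIndex, blockColIndex), (numRows, numCols)) for the first row/column.
    val firstRowColumn = Seq(((0, 0), (2, 3)), ((0, 1), (2, 4)), ((1, 0), (5, 3)))
    val (totalRows, totalCols) = firstRowColumn.foldLeft((0L, 0L)) {
      case ((xDim, yDim), ((indX, indY), (nRow, nCol))) =>
        if (indX == 0 && indY == 0) (xDim + nRow, yDim + nCol)
        else if (indX == 0) (xDim, yDim + nCol)
        else (xDim + nRow, yDim)
    }
    // totalRows == 7 (2 + 5), totalCols == 7 (3 + 4)
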
@@ -309,23 +271,26 @@ class BlockMatrix(
    val nRows = numRows().toInt
    val nCols = numCols().toInt
    val values = new Array[Double](nRows * nCols)
-    val blockInfos = getBlockInfo
+    var rowStart = 0
+    var colStart = 0
    parts.foreach { part =>
-      val blockInfo = blockInfos((part._1._1, part._1._2))
      // Figure out where this part should be put
+      if (part._1._1 == 0) rowStart = 0
+      val block = part._2
      var j = 0
-      while (j < blockInfo.numCols) {
+      while (j < block.numCols) {
        var i = 0
-        val indStart = (j + blockInfo.startCol.toInt) * nRows + blockInfo.startRow.toInt
-        val indEnd = blockInfo.numRows
-        val matStart = j * blockInfo.numRows
-        val mat = part._2.values
+        val indStart = (j + colStart) * nRows + rowStart
+        val indEnd = block.numRows
+        val matStart = j * block.numRows
+        val mat = block.values
        while (i < indEnd) {
          values(indStart + i) = mat(matStart + i)
          i += 1
        }
        j += 1
      }
+      rowStart += block.numRows
+      if (part._1._1 == numRowBlocks - 1) colStart += block.numCols
    }
    new DenseMatrix(nRows, nCols, values)
  }
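
With SubMatrixInfo gone, collect() now derives each block's offset on the fly, tracking rowStart/colStart cursors as it walks the blocks (in block-column-major order, as the cursor resets suggest), instead of looking up precomputed startRow/startCol values. Each column j of a block lands at (j + colStart) * nRows + rowStart in the column-major output array. A standalone sketch of that index arithmetic (the concrete sizes are made up):

    val nRows = 5                            // rows in the full matrix
    val (rowStart, colStart) = (2, 2)        // this block's offset in the full matrix
    val j = 1                                // column within the block
    val indStart = (j + colStart) * nRows + rowStart
    assert(indStart == 17)                   // flat index of element (2, 3), column-major
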
BlockMatrixSuite.scala
@@ -48,15 +48,13 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {

    val colPart = new ColumnBasedPartitioner(numColBlocks, rowPerPart, colPerPart)
    val rowPart = new RowBasedPartitioner(numRowBlocks, rowPerPart, colPerPart)
-    val gridPart = new GridPartitioner(numRowBlocks, numColBlocks, rowPerPart, colPerPart)

    colBasedMat =
      new BlockMatrix(numRowBlocks, numColBlocks, sc.parallelize(entries, numColBlocks), colPart)
    rowBasedMat =
      new BlockMatrix(numRowBlocks, numColBlocks, sc.parallelize(entries, numRowBlocks), rowPart)
-    gridBasedMat =
-      new BlockMatrix(numRowBlocks, numColBlocks,
-        sc.parallelize(entries, numRowBlocks * numColBlocks), gridPart)
+    gridBasedMat = new BlockMatrix(numRowBlocks, numColBlocks,
+      sc.parallelize(entries, numRowBlocks * numColBlocks))
  }

  test("size") {
@@ -84,37 +82,4 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
    assert(rowBasedMat.collect() === dense)
    assert(gridBasedMat.collect() === dense)
  }
-
-  test("blockInfo") {
-    val colMatInfo = colBasedMat.getBlockInfo
-    val rowMatInfo = rowBasedMat.getBlockInfo
-    val gridMatInfo = gridBasedMat.getBlockInfo
-
-    assert(colMatInfo((0, 1)).numRows === 2)
-    assert(colMatInfo((0, 1)).numCols === 2)
-    assert(colMatInfo((0, 1)).startRow === 0)
-    assert(colMatInfo((0, 1)).startCol === 2)
-    assert(colMatInfo((2, 0)).numRows === 1)
-    assert(colMatInfo((2, 0)).numCols === 2)
-    assert(colMatInfo((2, 0)).startRow === 4)
-    assert(colMatInfo((2, 0)).startCol === 0)
-
-    assert(rowMatInfo((0, 1)).numRows === 2)
-    assert(rowMatInfo((0, 1)).numCols === 2)
-    assert(rowMatInfo((0, 1)).startRow === 0)
-    assert(rowMatInfo((0, 1)).startCol === 2)
-    assert(rowMatInfo((2, 0)).numRows === 1)
-    assert(rowMatInfo((2, 0)).numCols === 2)
-    assert(rowMatInfo((2, 0)).startRow === 4)
-    assert(rowMatInfo((2, 0)).startCol === 0)
-
-    assert(gridMatInfo((0, 1)).numRows === 2)
-    assert(gridMatInfo((0, 1)).numCols === 2)
-    assert(gridMatInfo((0, 1)).startRow === 0)
-    assert(gridMatInfo((0, 1)).startCol === 2)
-    assert(gridMatInfo((2, 0)).numRows === 1)
-    assert(gridMatInfo((2, 0)).numCols === 2)
-    assert(gridMatInfo((2, 0)).startRow === 4)
-    assert(gridMatInfo((2, 0)).startCol === 0)
-  }
}
