Skip to content

Commit

Permalink
[SPARK-3974] Block Matrix Abstractions ready
Browse files Browse the repository at this point in the history
  • Loading branch information
Burak Yavuz committed Nov 11, 2014
1 parent b693209 commit f378e16
Showing 1 changed file with 85 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,32 @@ package org.apache.spark.mllib.linalg.distributed
import breeze.linalg.{DenseMatrix => BDM}

import org.apache.spark._
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices }
import org.apache.spark.mllib.linalg.DenseMatrix
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

case class BlockPartition(
blockIdRow: Int,
blockIdCol: Int,
mat: DenseMatrix) extends Serializable
/**
* Represents a local matrix that makes up one block of a distributed BlockMatrix
*
* @param blockIdRow The row index of this block
* @param blockIdCol The column index of this block
* @param mat The underlying local matrix
*/
case class BlockPartition(blockIdRow: Int, blockIdCol: Int, mat: DenseMatrix) extends Serializable

// Information about BlockMatrix maintained on the driver
/**
* Information about the BlockMatrix maintained on the driver
*
* @param partitionId The id of the partition the block is found in
* @param blockIdRow The row index of this block
* @param blockIdCol The column index of this block
* @param startRow The starting row index with respect to the distributed BlockMatrix
* @param numRows The number of rows in this block
* @param startCol The starting column index with respect to the distributed BlockMatrix
* @param numCols The number of columns in this block
*/
case class BlockPartitionInfo(
partitionId: Int,
blockIdRow: Int,
Expand All @@ -41,6 +55,13 @@ case class BlockPartitionInfo(
startCol: Long,
numCols: Int) extends Serializable

/**
* A partitioner that decides how the matrix is distributed in the cluster
*
* @param numPartitions Number of partitions
* @param rowPerBlock Number of rows that make up each block.
* @param colPerBlock Number of columns that make up each block.
*/
abstract class BlockMatrixPartitioner(
override val numPartitions: Int,
val rowPerBlock: Int,
Expand All @@ -52,6 +73,14 @@ abstract class BlockMatrixPartitioner(
}
}

/**
* A grid partitioner, which stores every block in a separate partition.
*
* @param numRowBlocks Number of blocks that form the rows of the matrix.
* @param numColBlocks Number of blocks that form the columns of the matrix.
* @param rowPerBlock Number of rows that make up each block.
* @param colPerBlock Number of columns that make up each block.
*/
class GridPartitioner(
val numRowBlocks: Int,
val numColBlocks: Int,
Expand All @@ -74,6 +103,14 @@ class GridPartitioner(
}
}

/**
* A specialized partitioner that stores all blocks in the same row in just one partition.
*
* @param numPartitions Number of partitions. Should be set as the number of blocks that form
* the rows of the matrix.
* @param rowPerBlock Number of rows that make up each block.
* @param colPerBlock Number of columns that make up each block.
*/
class RowBasedPartitioner(
override val numPartitions: Int,
override val rowPerBlock: Int,
Expand All @@ -93,6 +130,14 @@ class RowBasedPartitioner(
}
}

/**
* A specialized partitioner that stores all blocks in the same column in just one partition.
*
* @param numPartitions Number of partitions. Should be set as the number of blocks that form
* the columns of the matrix.
* @param rowPerBlock Number of rows that make up each block.
* @param colPerBlock Number of columns that make up each block.
*/
class ColumnBasedPartitioner(
override val numPartitions: Int,
override val rowPerBlock: Int,
Expand All @@ -114,39 +159,44 @@ class ColumnBasedPartitioner(
}
}

/**
* Represents a distributed matrix in blocks of local matrices.
*
* @param numRowBlocks Number of blocks that form the rows of this matrix
* @param numColBlocks Number of blocks that form the columns of this matrix
* @param rdd The RDD of BlockPartitions (local matrices) that form this matrix
* @param partitioner A partitioner that specifies how BlockPartitions are stored in the cluster
*/
class BlockMatrix(
val numRowBlocks: Int,
val numColBlocks: Int,
val rdd: RDD[BlockPartition],
val partitioner: BlockMatrixPartitioner) extends DistributedMatrix with Logging {

// We need a key-value pair RDD to partition properly
private var matrixRDD = rdd.map { block =>
partitioner match {
case r: RowBasedPartitioner => (block.blockIdRow, block)
case c: ColumnBasedPartitioner => (block.blockIdCol, block)
case g: GridPartitioner => (block.blockIdRow + numRowBlocks * block.blockIdCol, block)
case _ => throw new IllegalArgumentException("Unrecognized partitioner")
}
}
// A key-value pair RDD is required to partition properly
private var matrixRDD: RDD[(Int, BlockPartition)] = keyBy()

@transient var blockInfo_ : Map[(Int, Int), BlockPartitionInfo] = null

lazy val dims: (Long, Long) = getDim
private lazy val dims: (Long, Long) = getDim

override def numRows(): Long = dims._1
override def numCols(): Long = dims._2

if (partitioner.name.equals("column")) {
require(numColBlocks == partitioner.numPartitions)
require(numColBlocks == partitioner.numPartitions, "The number of column blocks should match" +
" the number of partitions of the column partitioner.")
} else if (partitioner.name.equals("row")) {
require(numRowBlocks == partitioner.numPartitions)
require(numRowBlocks == partitioner.numPartitions, "The number of row blocks should match" +
" the number of partitions of the row partitioner.")
} else if (partitioner.name.equals("grid")) {
require(numRowBlocks * numColBlocks == partitioner.numPartitions)
require(numRowBlocks * numColBlocks == partitioner.numPartitions, "The number of blocks " +
"should match the number of partitions of the grid partitioner.")
} else {
throw new IllegalArgumentException("Unrecognized partitioner.")
}

/* Returns the dimensions of the matrix. */
def getDim: (Long, Long) = {
val bi = getBlockInfo
val xDim = bi.map { x =>
Expand Down Expand Up @@ -194,18 +244,20 @@ class BlockMatrix(
}.toMap

blockInfo_ = blockStartRowCols.map{ case ((rowId, colId), (partId, numRow, numCol)) =>
((rowId, colId), new BlockPartitionInfo(partId, rowId, colId, cumulativeRowSum(rowId), numRow,
cumulativeColSum(colId), numCol))
((rowId, colId), new BlockPartitionInfo(partId, rowId, colId, cumulativeRowSum(rowId),
numRow, cumulativeColSum(colId), numCol))
}.toMap
}

/* Returns a map of the information of the blocks that form the distributed matrix. */
def getBlockInfo: Map[(Int, Int), BlockPartitionInfo] = {
if (blockInfo_ == null) {
calculateBlockInfo()
}
blockInfo_
}

/* Returns the Frobenius Norm of the matrix */
def normFro(): Double = {
math.sqrt(rdd.map(lm => lm.mat.values.map(x => math.pow(x, 2)).sum).reduce(_ + _))
}
Expand All @@ -222,8 +274,19 @@ class BlockMatrix(
this
}

private def keyBy(part: BlockMatrixPartitioner = partitioner): RDD[(Int, BlockPartition)] = {
rdd.map { block =>
part match {
case r: RowBasedPartitioner => (block.blockIdRow, block)
case c: ColumnBasedPartitioner => (block.blockIdCol, block)
case g: GridPartitioner => (block.blockIdRow + numRowBlocks * block.blockIdCol, block)
case _ => throw new IllegalArgumentException("Unrecognized partitioner")
}
}
}

def repartition(part: BlockMatrixPartitioner = partitioner): DistributedMatrix = {
matrixRDD = matrixRDD.partitionBy(part)
matrixRDD = keyBy(part)
this
}

Expand Down Expand Up @@ -259,80 +322,4 @@ class BlockMatrix(
val localMat = collect()
new BDM[Double](localMat.numRows, localMat.numCols, localMat.values)
}

def add(other: DistributedMatrix): DistributedMatrix = {
other match {
// We really need a function to check if two matrices are partitioned similarly
case otherBlocked: BlockMatrix =>
if (checkPartitioning(otherBlocked, OperationNames.add)){
val addedBlocks = rdd.zip(otherBlocked.rdd).map{ case (a, b) =>
val result = a.mat.toBreeze + b.mat.toBreeze
new BlockPartition(a.blockIdRow, a.blockIdCol,
Matrices.fromBreeze(result).asInstanceOf[DenseMatrix])
}
new BlockMatrix(numRowBlocks, numColBlocks, addedBlocks, partitioner)
} else {
throw new SparkException(
"Cannot add matrices with non-matching partitioners")
}
case _ =>
throw new IllegalArgumentException("Cannot add matrices of different types")
}
}

def multiply(other: DistributedMatrix): BlockMatrix = {
other match {
case otherBlocked: BlockMatrix =>
if (checkPartitioning(otherBlocked, OperationNames.multiply)){

val resultPartitioner = new GridPartitioner(numRowBlocks, otherBlocked.numColBlocks,
partitioner.rowPerBlock, otherBlocked.partitioner.colPerBlock)

val multiplyBlocks = matrixRDD.join(otherBlocked.matrixRDD, partitioner).
map { case (key, (mat1, mat2)) =>
val C = mat1.mat multiply mat2.mat
(mat1.blockIdRow + numRowBlocks * mat2.blockIdCol, C.toBreeze)
}.reduceByKey(resultPartitioner, (a, b) => a + b)

val newBlocks = multiplyBlocks.map{ case (index, mat) =>
val colId = index / numRowBlocks
val rowId = index - colId * numRowBlocks
new BlockPartition(rowId, colId, Matrices.fromBreeze(mat).asInstanceOf[DenseMatrix])
}
new BlockMatrix(numRowBlocks, otherBlocked.numColBlocks, newBlocks, resultPartitioner)
} else {
throw new SparkException(
"Cannot multiply matrices with non-matching partitioners")
}
case _ =>
throw new IllegalArgumentException("Cannot add matrices of different types")
}
}

private def checkPartitioning(other: BlockMatrix, operation: Int): Boolean = {
val otherPartitioner = other.partitioner
operation match {
case OperationNames.add =>
partitioner.equals(otherPartitioner)
case OperationNames.multiply =>
partitioner.name == "column" && otherPartitioner.name == "row" &&
partitioner.numPartitions == otherPartitioner.numPartitions &&
partitioner.colPerBlock == otherPartitioner.rowPerBlock &&
numColBlocks == other.numRowBlocks
case _ =>
throw new IllegalArgumentException("Unsupported operation")
}
}
}

/**
* Maintains supported and default block matrix operation names.
*
* Currently supported operations: `add`, `multiply`.
*/
private object OperationNames {

val add: Int = 1
val multiply: Int = 2

}

0 comments on commit f378e16

Please sign in to comment.