Skip to content

Commit

Permalink
[SPARK-3977] Conversion methods for BlockMatrix to other Distributed …
Browse files Browse the repository at this point in the history
…Matrices
  • Loading branch information
brkyvz committed Jan 28, 2015
1 parent e023112 commit cdb9895
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ package org.apache.spark.mllib.linalg.distributed
import breeze.linalg.{DenseMatrix => BDM}

import org.apache.spark.{Logging, Partitioner}
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
import org.apache.spark.mllib.linalg.{SparseMatrix, DenseMatrix, Matrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable.ArrayBuffer

/**
* A grid partitioner, which uses a regular grid to partition coordinates.
*
Expand Down Expand Up @@ -182,6 +184,34 @@ class BlockMatrix(
this
}

/**
 * Converts to CoordinateMatrix.
 *
 * Each block is flattened into `MatrixEntry` values whose row and column indices are shifted
 * by the block's offset (`blockRowIndex * rowsPerBlock`, `blockColIndex * colsPerBlock`).
 * Zero values visited in dense blocks are skipped; every entry visited by `foreachActive` on
 * a sparse block is emitted.
 *
 * @return a CoordinateMatrix over the generated entries, constructed with this matrix's
 *         `numRows()` and `numCols()`
 */
def toCoordinateMatrix(): CoordinateMatrix = {
  val entryRDD = blocks.flatMap { case ((blockRowIndex, blockColIndex), mat) =>
    // Widen to Long before multiplying so the global offsets are computed in Long arithmetic.
    val rowStart = blockRowIndex.toLong * rowsPerBlock
    val colStart = blockColIndex.toLong * colsPerBlock
    // Let the buffer grow on demand. Pre-sizing it to numRows * numCols reserves the dense
    // capacity even for sparse blocks, and that Int product can overflow for large blocks.
    val entryValues = new ArrayBuffer[MatrixEntry]()
    mat match {
      case dn: DenseMatrix =>
        dn.foreachActive { (i, j, v) =>
          if (v != 0.0) entryValues += new MatrixEntry(rowStart + i, colStart + j, v)
        }
      case sp: SparseMatrix =>
        sp.foreachActive { (i, j, v) =>
          entryValues += new MatrixEntry(rowStart + i, colStart + j, v)
        }
    }
    entryValues
  }
  new CoordinateMatrix(entryRDD, numRows(), numCols())
}

/**
 * Converts to IndexedRowMatrix by first converting to a CoordinateMatrix and then calling its
 * `toIndexedRowMatrix()`. The number of columns must be within the integer range.
 *
 * @throws IllegalArgumentException if `numCols()` is not below `Int.MaxValue`
 */
def toIndexedRowMatrix(): IndexedRowMatrix = {
  val cols = numCols()
  require(
    cols < Int.MaxValue,
    "The number of columns must be within the integer range. " + s"numCols: ${cols}")
  val coordinateForm = toCoordinateMatrix()
  coordinateForm.toIndexedRowMatrix()
}

/** Collect the distributed matrix on the driver as a `DenseMatrix`. */
def toLocalMatrix(): Matrix = {
require(numRows() < Int.MaxValue, "The number of rows of this matrix should be less than " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import breeze.linalg.{DenseMatrix => BDM}

import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.{Matrix, SparseMatrix, Vectors}

/**
* :: Experimental ::
Expand Down Expand Up @@ -98,6 +97,43 @@ class CoordinateMatrix(
toIndexedRowMatrix().toRowMatrix()
}

/**
 * Converts to BlockMatrix. Entries are keyed by the block they fall into, grouped per block
 * with a `GridPartitioner`, and each group is assembled into a `SparseMatrix` via
 * `SparseMatrix.fromCOO`.
 *
 * @param rowsPerBlock The number of rows of each block. The blocks in the last block-row may
 *                     have fewer rows when `numRows()` is not a multiple of it. Must be
 *                     positive.
 * @param colsPerBlock The number of columns of each block. The blocks in the last
 *                     block-column may have fewer columns when `numCols()` is not a multiple
 *                     of it. Must be positive.
 * @return a BlockMatrix built with the same `numRows()` and `numCols()` as this matrix
 * @throws IllegalArgumentException if `rowsPerBlock` or `colsPerBlock` is not positive
 */
def toBlockMatrix(rowsPerBlock: Int, colsPerBlock: Int): BlockMatrix = {
  // Fail fast on the driver: a non-positive block size would otherwise produce a nonsensical
  // block count below and a division by zero inside the map task.
  require(rowsPerBlock > 0,
    s"rowsPerBlock needs to be greater than 0. rowsPerBlock: $rowsPerBlock")
  require(colsPerBlock > 0,
    s"colsPerBlock needs to be greater than 0. colsPerBlock: $colsPerBlock")
  val m = numRows()
  val n = numCols()
  val numRowBlocks = math.ceil(m.toDouble / rowsPerBlock).toInt
  val numColBlocks = math.ceil(n.toDouble / colsPerBlock).toInt
  val partitioner = GridPartitioner(numRowBlocks, numColBlocks, entries.partitions.length)

  val blocks: RDD[((Int, Int), Matrix)] = entries.map { entry =>
    // Block coordinates of the entry within the grid.
    val blockRowIndex = (entry.i / rowsPerBlock).toInt
    val blockColIndex = (entry.j / colsPerBlock).toInt

    // Position of the entry inside its block.
    val rowId = entry.i % rowsPerBlock
    val colId = entry.j % colsPerBlock

    ((blockRowIndex, blockColIndex), (rowId.toInt, colId.toInt, entry.value))
  }.groupByKey(partitioner).map { case ((blockRowIndex, blockColIndex), blockEntries) =>
    // Only the last block along an axis can be smaller than the nominal block size, and only
    // when the matrix dimension is not an exact multiple of that size.
    val effRows =
      if (blockRowIndex == numRowBlocks - 1) {
        val modulo = m % rowsPerBlock
        if (modulo != 0) modulo else rowsPerBlock
      } else {
        rowsPerBlock
      }
    val effCols =
      if (blockColIndex == numColBlocks - 1) {
        val modulo = n % colsPerBlock
        if (modulo != 0) modulo else colsPerBlock
      } else {
        colsPerBlock
      }
    ((blockRowIndex, blockColIndex),
      SparseMatrix.fromCOO(effRows.toInt, effCols.toInt, blockEntries))
  }
  new BlockMatrix(blocks, rowsPerBlock, colsPerBlock, m, n)
}

/** Determines the size by computing the max row/column index. */
private def computeSize() {
// Reduce will throw an exception if `entries` is empty.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.SingularValueDecomposition

import java.util.Arrays

/**
* :: Experimental ::
* Represents a row of [[org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix]].
Expand Down Expand Up @@ -75,6 +77,11 @@ class IndexedRowMatrix(
new RowMatrix(rows.map(_.vector), 0L, nCols)
}

/**
 * Converts to BlockMatrix by first converting to a CoordinateMatrix and delegating to its
 * `toBlockMatrix`.
 *
 * @param rowsPerBlock passed through unchanged to `CoordinateMatrix.toBlockMatrix`
 * @param colsPerBlock passed through unchanged to `CoordinateMatrix.toBlockMatrix`
 */
def toBlockMatrix(rowsPerBlock: Int, colsPerBlock: Int): BlockMatrix = {
  val coordinateForm = toCoordinateMatrix()
  coordinateForm.toBlockMatrix(rowsPerBlock, colsPerBlock)
}

/**
* Converts this matrix to a
* [[org.apache.spark.mllib.linalg.distributed.CoordinateMatrix]].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,20 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
}
}

test("toCoordinateMatrix") {
  // The converted matrix must keep the source dimensions and the same Breeze representation.
  val converted = gridBasedMat.toCoordinateMatrix()
  assert(converted.numRows() === m)
  assert(converted.numCols() === n)
  val actual = converted.toBreeze()
  val expected = gridBasedMat.toBreeze()
  assert(actual === expected)
}

test("toIndexedRowMatrix") {
  // The converted matrix must keep the source dimensions and the same Breeze representation.
  val converted = gridBasedMat.toIndexedRowMatrix()
  assert(converted.numRows() === m)
  assert(converted.numCols() === n)
  val actual = converted.toBreeze()
  val expected = gridBasedMat.toBreeze()
  assert(actual === expected)
}

test("toBreeze and toLocalMatrix") {
val expected = BDM(
(1.0, 0.0, 0.0, 0.0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,11 @@ class CoordinateMatrixSuite extends FunSuite with MLlibTestSparkContext {
Vectors.dense(0.0, 9.0, 0.0, 0.0))
assert(rows === expected)
}

test("toBlockMatrix") {
  // Convert with 2 x 2 blocks and compare dimensions and contents against the source matrix.
  val converted = mat.toBlockMatrix(2, 2)
  assert(converted.numRows() === m)
  assert(converted.numCols() === n)
  val actual = converted.toBreeze()
  val expected = mat.toBreeze()
  assert(actual === expected)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ class IndexedRowMatrixSuite extends FunSuite with MLlibTestSparkContext {
assert(coordMat.toBreeze() === idxRowMat.toBreeze())
}

test("toBlockMatrix") {
  // Convert with 2 x 2 blocks and compare dimensions and contents against the source matrix.
  val source = new IndexedRowMatrix(indexedRows)
  val converted = source.toBlockMatrix(2, 2)
  assert(converted.numRows() === m)
  assert(converted.numCols() === n)
  val actual = converted.toBreeze()
  val expected = source.toBreeze()
  assert(actual === expected)
}

test("multiply a local matrix") {
val A = new IndexedRowMatrix(indexedRows)
val B = Matrices.dense(3, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0))
Expand Down

0 comments on commit cdb9895

Please sign in to comment.