From aa8f086ad7ac311660a2b0efee18827bb7d834f6 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 11 Nov 2014 11:36:22 -0800 Subject: [PATCH] [SPARK-3974] Additional comments added --- .../spark/mllib/linalg/distributed/BlockMatrix.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala index 0a4837c29de3e..3c85af9ae6536 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala @@ -22,7 +22,6 @@ import breeze.linalg.{DenseMatrix => BDM} import org.apache.spark._ import org.apache.spark.mllib.linalg.DenseMatrix import org.apache.spark.rdd.RDD -import org.apache.spark.SparkContext._ import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils @@ -218,8 +217,8 @@ class BlockMatrix( (xDim, yDim) } + /* Calculates the information for each block and collects it on the driver */ private def calculateBlockInfo(): Unit = { - // collect may cause akka frameSize errors val blockStartRowColsParts = matrixRDD.mapPartitionsWithIndex { case (partId, iter) => iter.map { case (id, block) => @@ -274,6 +273,7 @@ class BlockMatrix( this } + /* Add a key to the underlying rdd for partitioning and joins. */ private def keyBy(part: BlockMatrixPartitioner = partitioner): RDD[(Int, BlockPartition)] = { rdd.map { block => part match { @@ -285,6 +285,12 @@ class BlockMatrix( } } + /** + * Repartition the BlockMatrix using a different partitioner. + * + * @param part The partitioner to partition by + * @return The repartitioned BlockMatrix + */ def repartition(part: BlockMatrixPartitioner = partitioner): DistributedMatrix = { matrixRDD = keyBy(part) this