persist factors in implicit ALS
mengxr committed Mar 18, 2014
1 parent 6983732 commit 63862d6
Showing 1 changed file with 70 additions and 39 deletions.
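The heart of the change: in the implicit-preference branch, each half-iteration reads the current factor RDD twice, once inside computeYtY and once inside updateFeatures, so run() now persists users and products before the first pass and unpersists the previous iteration's factors afterwards. A minimal standalone sketch of that persist-then-reuse pattern (not code from this commit; the object and variable names are made up, and the two passes below only stand in for computeYtY and updateFeatures):

import org.apache.spark.{SparkConf, SparkContext}

object PersistTwicePerIteration {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("persist-sketch").setMaster("local[2]"))
    // Stand-in for a blocked factor RDD: (blockId, factor vector).
    var factors = sc.parallelize(0 until 1000).map(i => (i % 10, Array.fill(8)(i.toDouble)))
    for (iter <- 1 to 3) {
      // Cache before the first of the two passes over the same RDD.
      factors.persist()
      // Pass 1: a YtY-like reduction over all factor vectors.
      val yty = factors.map { case (_, v) => v.map(x => x * x).sum }.reduce(_ + _)
      // Pass 2: compute the next set of factors from the same cached RDD.
      val updated = factors.map { case (id, v) => (id, v.map(_ * 0.9)) }
      updated.count()       // materialize the new factors
      factors.unpersist()   // release the previous iteration's cache
      factors = updated
      println(s"iteration $iter: yty = $yty")
    }
    sc.stop()
  }
}

Without the persist() call, each of the two jobs would rebuild the factor RDD from its full lineage, which in ALS grows with every iteration.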
mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala (109 changes: 70 additions and 39 deletions)
@@ -148,8 +148,10 @@ class ALS private (
    * Returns a MatrixFactorizationModel with feature vectors for each user and product.
    */
   def run(ratings: RDD[Rating]): MatrixFactorizationModel = {
+    val sc = ratings.context
+
     val numBlocks = if (this.numBlocks == -1) {
-      math.max(ratings.context.defaultParallelism, ratings.partitions.size / 2)
+      math.max(sc.defaultParallelism, ratings.partitions.size / 2)
     } else {
       this.numBlocks
     }
@@ -187,53 +189,79 @@
       }
     }

-    for (iter <- 1 to iterations) {
-      // perform ALS update
-      logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
-      // YtY / XtX is an Option[DoubleMatrix] and is only required for the implicit feedback model
-      val YtY = computeYtY(users)
-      val YtYb = ratings.context.broadcast(YtY)
-      products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
-        alpha, YtYb)
-      logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
-      val XtX = computeYtY(products)
-      val XtXb = ratings.context.broadcast(XtX)
-      users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
-        alpha, XtXb)
+    if (implicitPrefs) {
+      for (iter <- 1 to iterations) {
+        // perform ALS update
+        logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
+        // Persist users because it will be called twice.
+        users.persist()
+        val YtY = Some(sc.broadcast(computeYtY(users)))
+        val previousProducts = products
+        products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
+          alpha, YtY)
+        previousProducts.unpersist()
+        logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
+        products.persist()
+        val XtX = Some(sc.broadcast(computeYtY(products)))
+        val previousUsers = users
+        users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
+          alpha, XtX)
+        previousUsers.unpersist()
+      }
+    } else {
+      for (iter <- 1 to iterations) {
+        // perform ALS update
+        logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
+        products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
+          alpha, YtY = None)
+        logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
+        users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
+          alpha, YtY = None)
+      }
     }

+    products.persist()
+
     // Flatten and cache the two final RDDs to un-block them
     val usersOut = unblockFactors(users, userOutLinks)
     val productsOut = unblockFactors(products, productOutLinks)
+
     usersOut.persist()
     productsOut.persist()

+    // Materialize usersOut and productsOut.
+    usersOut.count()
+    productsOut.count()
+
+    products.unpersist()
+
+    // Clean up.
+    userInLinks.unpersist()
+    userOutLinks.unpersist()
+    productInLinks.unpersist()
+    productOutLinks.unpersist()
+
     new MatrixFactorizationModel(rank, usersOut, productsOut)
   }

   /**
    * Computes the (`rank x rank`) matrix `YtY`, where `Y` is the (`nui x rank`) matrix of factors
-   * for each user (or product), in a distributed fashion. Here `reduceByKeyLocally` is used as
-   * the driver program requires `YtY` to broadcast it to the slaves
+   * for each user (or product), in a distributed fashion.
    *
    * @param factors the (block-distributed) user or product factor vectors
-   * @return Option[YtY] - whose value is only used in the implicit preference model
+   * @return YtY - whose value is only used in the implicit preference model
    */
-  def computeYtY(factors: RDD[(Int, Array[Array[Double]])]) = {
-    if (implicitPrefs) {
-      val n = rank * (rank + 1) / 2
-      val LYtY = factors.values.aggregate(new DoubleMatrix(n))( seqOp = (L, Y) => {
-        Y.foreach(y => dspr(1.0, new DoubleMatrix(y), L))
-        L
-      }, combOp = (L1, L2) => {
-        L1.addi(L2)
-      })
-      val YtY = new DoubleMatrix(rank, rank)
-      fillFullMatrix(LYtY, YtY)
-      Option(YtY)
-    } else {
-      None
-    }
+  private def computeYtY(factors: RDD[(Int, Array[Array[Double]])]) = {
+    val n = rank * (rank + 1) / 2
+    val LYtY = factors.values.aggregate(new DoubleMatrix(n))( seqOp = (L, Y) => {
+      Y.foreach(y => dspr(1.0, new DoubleMatrix(y), L))
+      L
+    }, combOp = (L1, L2) => {
+      L1.addi(L2)
+    })
+    val YtY = new DoubleMatrix(rank, rank)
+    fillFullMatrix(LYtY, YtY)
+    YtY
   }

   /**
@@ -264,7 +292,7 @@ class ALS private (
   /**
    * Flatten out blocked user or product factors into an RDD of (id, factor vector) pairs
    */
-  def unblockFactors(blockedFactors: RDD[(Int, Array[Array[Double]])],
+  private def unblockFactors(blockedFactors: RDD[(Int, Array[Array[Double]])],
       outLinks: RDD[(Int, OutLinkBlock)]) = {
     blockedFactors.join(outLinks).flatMap{ case (b, (factors, outLinkBlock)) =>
       for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i))
@@ -332,8 +360,11 @@ class ALS private (
       val outLinkBlock = makeOutLinkBlock(numBlocks, ratings)
       Iterator.single((blockId, (inLinkBlock, outLinkBlock)))
     }, true)
-    links.persist(StorageLevel.MEMORY_AND_DISK)
-    (links.mapValues(_._1), links.mapValues(_._2))
+    val inLinks = links.mapValues(_._1)
+    val outLinks = links.mapValues(_._2)
+    inLinks.persist(StorageLevel.MEMORY_AND_DISK)
+    outLinks.persist(StorageLevel.MEMORY_AND_DISK)
+    (inLinks, outLinks)
   }

   /**
@@ -365,7 +396,7 @@
       rank: Int,
       lambda: Double,
       alpha: Double,
-      YtY: Broadcast[Option[DoubleMatrix]])
+      YtY: Option[Broadcast[DoubleMatrix]])
     : RDD[(Int, Array[Array[Double]])] =
   {
     val numBlocks = products.partitions.size
@@ -388,8 +419,8 @@ class ALS private (
    * Compute the new feature vectors for a block of the users matrix given the list of factors
    * it received from each product and its InLinkBlock.
    */
-  def updateBlock(messages: Seq[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock,
-    rank: Int, lambda: Double, alpha: Double, YtY: Broadcast[Option[DoubleMatrix]])
+  private def updateBlock(messages: Seq[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock,
+    rank: Int, lambda: Double, alpha: Double, YtY: Option[Broadcast[DoubleMatrix]])
     : Array[Array[Double]] =
   {
     // Sort the incoming block factor messages by block ID and make them an array
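A note on the two signature changes above: the YtY argument of updateFeatures and updateBlock moves from Broadcast[Option[DoubleMatrix]] to Option[Broadcast[DoubleMatrix]], so the driver creates a broadcast variable only when implicit preferences are enabled (the explicit branch of run() simply passes YtY = None), and executors unwrap it as YtY.get.value, as the hunk below shows. With the old shape, run() broadcast the Option on every iteration even when it held None. A minimal sketch of the optional-broadcast pattern (not from the commit; names are invented, and Array[Double] stands in for jblas's DoubleMatrix to stay dependency-free):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast

object OptionalBroadcastSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("broadcast-sketch").setMaster("local[2]"))
    val implicitPrefs = args.headOption.exists(_ == "implicit")

    // Broadcast the Gram matrix only when the implicit-feedback model needs it.
    val yty: Option[Broadcast[Array[Double]]] =
      if (implicitPrefs) Some(sc.broadcast(Array.fill(4)(1.0))) else None

    val data = sc.parallelize(1 to 100).map(_.toDouble)
    val result = data.map { x =>
      yty match {
        case Some(b) => x + b.value.sum // executors read the broadcast value only here
        case None    => x
      }
    }.reduce(_ + _)
    println(result)
    sc.stop()
  }
}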
@@ -445,7 +476,7 @@ class ALS private (
       // Solve the resulting matrix, which is symmetric and positive-definite
       implicitPrefs match {
         case false => Solve.solvePositive(fullXtX, userXy(index)).data
-        case true => Solve.solvePositive(fullXtX.addi(YtY.value.get), userXy(index)).data
+        case true => Solve.solvePositive(fullXtX.addi(YtY.get.value), userXy(index)).data
       }
     }
   }
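For reference, the YtY that computeYtY above produces is the rank x rank Gram matrix of the factor matrix Y, i.e. the sum of y * y^T over all factor vectors y, accumulated as a packed triangle of n = rank * (rank + 1) / 2 entries (the dspr call) and then expanded into a full symmetric matrix (fillFullMatrix). A small local sketch of the same idea with plain arrays (not the MLlib implementation; the packing order here is illustrative and may differ from the dspr helper's):

object GramMatrixSketch {
  def main(args: Array[String]): Unit = {
    val rank = 3
    val factors = Array(Array(1.0, 2.0, 3.0), Array(0.5, 1.5, 2.5))

    // Accumulate the upper triangle of sum(y * y^T), packed column by column.
    val n = rank * (rank + 1) / 2
    val packed = new Array[Double](n)
    for (y <- factors) {
      var pos = 0
      for (j <- 0 until rank; i <- 0 to j) {
        packed(pos) += y(i) * y(j)
        pos += 1
      }
    }

    // Expand the packed triangle into the full symmetric rank x rank matrix.
    val full = Array.ofDim[Double](rank, rank)
    var pos = 0
    for (j <- 0 until rank; i <- 0 to j) {
      full(i)(j) = packed(pos)
      full(j)(i) = packed(pos)
      pos += 1
    }
    full.foreach(row => println(row.mkString(" ")))
  }
}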
