[SPARK-1266] persist factors in implicit ALS #165

Closed · wants to merge 3 commits
145 changes: 89 additions & 56 deletions mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -148,8 +148,10 @@ class ALS private (
* Returns a MatrixFactorizationModel with feature vectors for each user and product.
*/
def run(ratings: RDD[Rating]): MatrixFactorizationModel = {
val sc = ratings.context

val numBlocks = if (this.numBlocks == -1) {
math.max(ratings.context.defaultParallelism, ratings.partitions.size / 2)
math.max(sc.defaultParallelism, ratings.partitions.size / 2)
} else {
this.numBlocks
}
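// Example: with defaultParallelism = 8 and 30 rating partitions,
// numBlocks = max(8, 30 / 2) = 15.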
@@ -187,53 +189,81 @@ class ALS private (
}
}

for (iter <- 1 to iterations) {
// perform ALS update
logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
// YtY / XtX is an Option[DoubleMatrix] and is only required for the implicit feedback model
val YtY = computeYtY(users)
val YtYb = ratings.context.broadcast(YtY)
products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
alpha, YtYb)
logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
val XtX = computeYtY(products)
val XtXb = ratings.context.broadcast(XtX)
users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
alpha, XtXb)
if (implicitPrefs) {
for (iter <- 1 to iterations) {
// perform ALS update
logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
// Persist users because it will be used twice.
users.persist()
val YtY = Some(sc.broadcast(computeYtY(users)))
val previousProducts = products
products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
alpha, YtY)
previousProducts.unpersist()
logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
products.persist()
val XtX = Some(sc.broadcast(computeYtY(products)))
val previousUsers = users
users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
alpha, XtX)
previousUsers.unpersist()
}
} else {
for (iter <- 1 to iterations) {
// perform ALS update
logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
alpha, YtY = None)
logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
alpha, YtY = None)
}
}

// The last `products` will be used twice: once to generate the last `users` and once to
// generate `productsOut`. So we cache it for better performance.
products.persist()
Contributor:
Why is this here?

Contributor (Author):
When the final factors get materialized, both `users` and `products` are used:

val usersOut = unblockFactors(users, userOutLinks)
val productsOut = unblockFactors(products, productOutLinks)

However, the last `users` depends on the last `products`, so the last `products` will be used twice.
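
A minimal standalone sketch of the pattern (with a hypothetical stand-in for the expensive update, not the patch itself): an RDD consumed by two actions is recomputed for each of them unless it is persisted first.

    import org.apache.spark.{SparkConf, SparkContext}

    object PersistTwiceSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("persist-sketch").setMaster("local"))
        val products = sc.parallelize(1 to 1000).map(_ * 2) // stand-in for updateFeatures
        products.persist()              // cache before the first action
        val users = products.map(_ + 1) // like the final users, which depends on products
        users.count()                   // first use: materializes and caches products
        products.count()                // second use (like productsOut): reads from the cache
        products.unpersist()
        sc.stop()
      }
    }

Without the persist(), the second count() would re-run the map that produced `products`.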

Contributor (Author):
Added a comment.


// Flatten and cache the two final RDDs to un-block them
val usersOut = unblockFactors(users, userOutLinks)
val productsOut = unblockFactors(products, productOutLinks)

usersOut.persist()
productsOut.persist()

// Materialize usersOut and productsOut.
usersOut.count()
productsOut.count()

products.unpersist()

// Clean up.
userInLinks.unpersist()
userOutLinks.unpersist()
productInLinks.unpersist()
productOutLinks.unpersist()

new MatrixFactorizationModel(rank, usersOut, productsOut)
}

/**
* Computes the (`rank x rank`) matrix `YtY`, where `Y` is the (`nui x rank`) matrix of factors
* for each user (or product), in a distributed fashion. Here `reduceByKeyLocally` is used as
* the driver program requires `YtY` to broadcast it to the slaves
* for each user (or product), in a distributed fashion.
*
* @param factors the (block-distributed) user or product factor vectors
* @return Option[YtY] - whose value is only used in the implicit preference model
* @return YtY - whose value is only used in the implicit preference model
*/
def computeYtY(factors: RDD[(Int, Array[Array[Double]])]) = {
if (implicitPrefs) {
val n = rank * (rank + 1) / 2
val LYtY = factors.values.aggregate(new DoubleMatrix(n))( seqOp = (L, Y) => {
Y.foreach(y => dspr(1.0, new DoubleMatrix(y), L))
L
}, combOp = (L1, L2) => {
L1.addi(L2)
})
val YtY = new DoubleMatrix(rank, rank)
fillFullMatrix(LYtY, YtY)
Option(YtY)
} else {
None
}
private def computeYtY(factors: RDD[(Int, Array[Array[Double]])]) = {
val n = rank * (rank + 1) / 2
val LYtY = factors.values.aggregate(new DoubleMatrix(n))( seqOp = (L, Y) => {
Y.foreach(y => dspr(1.0, new DoubleMatrix(y), L))
L
}, combOp = (L1, L2) => {
L1.addi(L2)
})
val YtY = new DoubleMatrix(rank, rank)
fillFullMatrix(LYtY, YtY)
YtY
}
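
// For reference: the aggregate above accumulates the packed upper triangle of
// YtY = sum(y * y^T) over all factor vectors via dspr, and fillFullMatrix expands it into
// a dense rank x rank matrix. A naive dense sketch of the same Gram matrix in jblas
// (hypothetical helper, for illustration only; assumes org.jblas.DoubleMatrix as above):
def naiveYtY(factors: Seq[Array[Double]], rank: Int): DoubleMatrix = {
  val yty = DoubleMatrix.zeros(rank, rank)
  for (y <- factors) {
    val v = new DoubleMatrix(y)      // treated as a rank x 1 column vector
    yty.addi(v.mmul(v.transpose()))  // dense rank-1 update; dspr does this on the packed form
  }
  yty
}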

/**
Expand Down Expand Up @@ -264,7 +294,7 @@ class ALS private (
/**
* Flatten out blocked user or product factors into an RDD of (id, factor vector) pairs
*/
def unblockFactors(blockedFactors: RDD[(Int, Array[Array[Double]])],
private def unblockFactors(blockedFactors: RDD[(Int, Array[Array[Double]])],
outLinks: RDD[(Int, OutLinkBlock)]) = {
blockedFactors.join(outLinks).flatMap{ case (b, (factors, outLinkBlock)) =>
for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i))
Expand Down Expand Up @@ -332,8 +362,11 @@ class ALS private (
val outLinkBlock = makeOutLinkBlock(numBlocks, ratings)
Iterator.single((blockId, (inLinkBlock, outLinkBlock)))
}, true)
links.persist(StorageLevel.MEMORY_AND_DISK)
(links.mapValues(_._1), links.mapValues(_._2))
val inLinks = links.mapValues(_._1)
val outLinks = links.mapValues(_._2)
inLinks.persist(StorageLevel.MEMORY_AND_DISK)
outLinks.persist(StorageLevel.MEMORY_AND_DISK)
(inLinks, outLinks)
}

/**
Expand Down Expand Up @@ -365,7 +398,7 @@ class ALS private (
rank: Int,
lambda: Double,
alpha: Double,
YtY: Broadcast[Option[DoubleMatrix]])
YtY: Option[Broadcast[DoubleMatrix]])
: RDD[(Int, Array[Array[Double]])] =
{
val numBlocks = products.partitions.size
@@ -388,8 +421,8 @@ class ALS private (
* Compute the new feature vectors for a block of the users matrix given the list of factors
* it received from each product and its InLinkBlock.
*/
def updateBlock(messages: Seq[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock,
rank: Int, lambda: Double, alpha: Double, YtY: Broadcast[Option[DoubleMatrix]])
private def updateBlock(messages: Seq[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock,
rank: Int, lambda: Double, alpha: Double, YtY: Option[Broadcast[DoubleMatrix]])
: Array[Array[Double]] =
{
// Sort the incoming block factor messages by block ID and make them an array
@@ -416,21 +449,20 @@ class ALS private (
dspr(1.0, x, tempXtX)
val (us, rs) = inLinkBlock.ratingsForBlock(productBlock)(p)
for (i <- 0 until us.length) {
implicitPrefs match {
case false =>
userXtX(us(i)).addi(tempXtX)
SimpleBlas.axpy(rs(i), x, userXy(us(i)))
case true =>
// Extension to the original paper to handle rs(i) < 0. confidence is a function
// of |rs(i)| instead so that it is never negative:
val confidence = 1 + alpha * abs(rs(i))
SimpleBlas.axpy(confidence - 1.0, tempXtX, userXtX(us(i)))
// For rs(i) < 0, the corresponding entry in P is 0 now, not 1 -- negative rs(i)
// means we try to reconstruct 0. We add terms only where P = 1, so, term below
// is now only added for rs(i) > 0:
if (rs(i) > 0) {
SimpleBlas.axpy(confidence, x, userXy(us(i)))
}
if (implicitPrefs) {
// Extension to the original paper to handle rs(i) < 0. confidence is a function
// of |rs(i)| instead so that it is never negative:
val confidence = 1 + alpha * abs(rs(i))
SimpleBlas.axpy(confidence - 1.0, tempXtX, userXtX(us(i)))
// For rs(i) < 0, the corresponding entry in P is 0 now, not 1 -- negative rs(i)
// means we try to reconstruct 0. We add terms only where P = 1, so, term below
// is now only added for rs(i) > 0:
if (rs(i) > 0) {
SimpleBlas.axpy(confidence, x, userXy(us(i)))
}
} else {
userXtX(us(i)).addi(tempXtX)
SimpleBlas.axpy(rs(i), x, userXy(us(i)))
}
}
}
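
// To make the implicit weighting concrete: for a rating r, confidence c = 1 + alpha * |r|
// and target preference p = 1 if r > 0, else 0. Each rating thus adds (c - 1) * x * x^T to
// the user's left-hand side, and c * x to the right-hand side only where p = 1. A tiny
// sketch of the per-rating terms (hypothetical helper, for illustration only):
def implicitTerms(r: Double, alpha: Double): (Double, Double) = {
  val confidence = 1.0 + alpha * math.abs(r) // never negative, even when r < 0
  val preference = if (r > 0) 1.0 else 0.0   // negative r means "reconstruct 0"
  (confidence, preference)
}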
@@ -443,9 +475,10 @@ class ALS private (
// Add regularization
(0 until rank).foreach(i => fullXtX.data(i*rank + i) += lambda)
// Solve the resulting matrix, which is symmetric and positive-definite
implicitPrefs match {
case false => Solve.solvePositive(fullXtX, userXy(index)).data
case true => Solve.solvePositive(fullXtX.addi(YtY.value.get), userXy(index)).data
if (implicitPrefs) {
Solve.solvePositive(fullXtX.addi(YtY.get.value), userXy(index)).data
} else {
Solve.solvePositive(fullXtX, userXy(index)).data
}
}
}
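
// Read as equations (one reading of the solve above), per user: the explicit branch solves
//   (X^T X + lambda * I) w_u = X^T r_u,
// while the implicit branch solves
//   (Y^T Y + Y^T (C_u - I) Y + lambda * I) x_u = Y^T C_u p_u,
// where userXtX accumulates Y^T (C_u - I) Y, the loop above adds lambda to the diagonal,
// and the broadcast YtY supplies the shared Y^T Y term.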