[SPARK-3001][MLLIB] Improve Spearman's correlation #1917

Closed
wants to merge 5 commits into from
@@ -19,10 +19,10 @@ package org.apache.spark.mllib.stat.correlation

import scala.collection.mutable.ArrayBuffer

- import org.apache.spark.{Logging, HashPartitioner}
+ import org.apache.spark.Logging
import org.apache.spark.SparkContext._
- import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
- import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+ import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors}
+ import org.apache.spark.rdd.RDD

/**
* Compute Spearman's correlation for two RDDs of the type RDD[Double] or the correlation matrix
@@ -43,87 +43,51 @@ private[stat] object SpearmanCorrelation extends Correlation with Logging {
/**
* Compute Spearman's correlation matrix S, for the input matrix, where S(i, j) is the
* correlation between column i and j.
- *
- * Input RDD[Vector] should be cached or checkpointed if possible since it would be split into
- * numCol RDD[Double]s, each of which sorted, and the joined back into a single RDD[Vector].
*/
override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
- val indexed = X.zipWithUniqueId()
-
- val numCols = X.first.size
- if (numCols > 50) {
- logWarning("Computing the Spearman correlation matrix can be slow for large RDDs with more"
- + " than 50 columns.")
- }
- val ranks = new Array[RDD[(Long, Double)]](numCols)
-
- // Note: we use a for loop here instead of a while loop with a single index variable
- // to avoid race condition caused by closure serialization
- for (k <- 0 until numCols) {
- val column = indexed.map { case (vector, index) => (vector(k), index) }
- ranks(k) = getRanks(column)
+ // ((columnIndex, value), rowUid)
+ val colBased = X.zipWithUniqueId().flatMap { case (vec, uid) =>
+ vec.toArray.view.zipWithIndex.map { case (v, j) =>
+ ((j, v), uid)
+ }
}

- val ranksMat: RDD[Vector] = makeRankMatrix(ranks, X)
- PearsonCorrelation.computeCorrelationMatrix(ranksMat)
- }

- /**
- * Compute the ranks for elements in the input RDD, using the average method for ties.
- *
- * With the average method, elements with the same value receive the same rank that's computed
- * by taking the average of their positions in the sorted list.
- * e.g. ranks([2, 1, 0, 2]) = [2.5, 1.0, 0.0, 2.5]
- * Note that positions here are 0-indexed, instead of the 1-indexed as in the definition for
- * ranks in the standard definition for Spearman's correlation. This does not affect the final
- * results and is slightly more performant.
- *
- * @param indexed RDD[(Double, Long)] containing pairs of the format (originalValue, uniqueId)
- * @return RDD[(Long, Double)] containing pairs of the format (uniqueId, rank), where uniqueId is
- * copied from the input RDD.
- */
- private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] = {
- // Get elements' positions in the sorted list for computing average rank for duplicate values
- val sorted = indexed.sortByKey().zipWithIndex()
-
- val ranks: RDD[(Long, Double)] = sorted.mapPartitions { iter =>
- // add an extra element to signify the end of the list so that flatMap can flush the last
- // batch of duplicates
- val end = -1L
- val padded = iter ++ Iterator[((Double, Long), Long)](((Double.NaN, end), end))
- val firstEntry = padded.next()
- var lastVal = firstEntry._1._1
- var firstRank = firstEntry._2.toDouble
- val idBuffer = ArrayBuffer(firstEntry._1._2)
- padded.flatMap { case ((v, id), rank) =>
- if (v == lastVal && id != end) {
- idBuffer += id
- Iterator.empty
- } else {
- val entries = if (idBuffer.size == 1) {
- Iterator((idBuffer(0), firstRank))
- } else {
- val averageRank = firstRank + (idBuffer.size - 1.0) / 2.0
- idBuffer.map(id => (id, averageRank))
- }
- lastVal = v
- firstRank = rank
- idBuffer.clear()
- idBuffer += id
- entries
+ // global sort by (columnIndex, value)
+ val sorted = colBased.sortByKey()
+ // assign global ranks (using average ranks for tied values)
+ val globalRanks = sorted.zipWithIndex().mapPartitions { iter =>
+ var preCol = -1
+ var preVal = Double.NaN
+ var startRank = -1.0
+ var cachedUids = ArrayBuffer.empty[Long]
+ val flush: () => Iterable[(Long, (Int, Double))] = () => {
+ val averageRank = startRank + (cachedUids.size - 1) / 2.0
+ val output = cachedUids.map { uid =>
+ (uid, (preCol, averageRank))
+ }
+ cachedUids.clear()
+ output
+ }
+ iter.flatMap { case (((j, v), uid), rank) =>
+ // If we see a new value or cachedUids is too big, we flush ids with their average rank.
+ if (j != preCol || v != preVal || cachedUids.size >= 10000000) {
+ val output = flush()
+ preCol = j
+ preVal = v
+ startRank = rank
+ cachedUids += uid
+ output
+ } else {
+ cachedUids += uid
+ Iterator.empty
}
+ } ++ flush()
}
- ranks
- }
-
- private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]], input: RDD[Vector]): RDD[Vector] = {
- val partitioner = new HashPartitioner(input.partitions.size)
- val cogrouped = new CoGroupedRDD[Long](ranks, partitioner)
- cogrouped.map {
- case (_, values: Array[Iterable[_]]) =>
- val doubles = values.asInstanceOf[Array[Iterable[Double]]]
- new DenseVector(doubles.flatten.toArray)
+ // Replace values in the input matrix by their ranks compared with values in the same column.
+ // Note that shifting all ranks in a column by a constant value doesn't affect result.
+ val groupedRanks = globalRanks.groupByKey().map { case (uid, iter) =>
+ // sort by column index and then convert values to a vector
+ Vectors.dense(iter.toSeq.sortBy(_._1).map(_._2).toArray)
}
+ PearsonCorrelation.computeCorrelationMatrix(groupedRanks)
}
}
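
To make the new ranking scheme concrete, here is a minimal sketch in plain Scala (no Spark) of the transformation the patched computeCorrelationMatrix performs: flatten the matrix into ((columnIndex, value), rowId) tuples, sort globally by (columnIndex, value), replace each run of tied values by the average of its sorted positions, and regroup the ranks by row before correlating the rank vectors. The object and variable names below are illustrative only and are not part of MLlib; the patch does the same work on RDDs with sortByKey, zipWithIndex, and groupByKey, flushing tied ids incrementally instead of grouping them in memory.

object SpearmanRankSketch {
  def main(args: Array[String]): Unit = {
    // 4 rows x 2 columns; column 0 is [2, 1, 0, 2] (contains a tie), column 1 has no ties
    val rows = Seq(
      Array(2.0, 10.0),
      Array(1.0, 30.0),
      Array(0.0, 20.0),
      Array(2.0, 40.0))

    // ((columnIndex, value), rowId), mirroring the colBased RDD in the patch
    val colBased = rows.zipWithIndex.flatMap { case (row, rowId) =>
      row.zipWithIndex.map { case (v, j) => ((j, v), rowId.toLong) }
    }

    // Global sort by (columnIndex, value); the patch does this with sortByKey()
    val sorted = colBased.sortBy { case ((j, v), _) => (j, v) }

    // Attach 0-based positions, then replace each run of tied (columnIndex, value) pairs by the
    // average of its positions; the flush() closure in the patch does this incrementally
    val ranks: Seq[(Long, (Int, Double))] = sorted.zipWithIndex
      .groupBy { case (((j, v), _), _) => (j, v) }
      .toSeq
      .flatMap { case (_, group) =>
        val averageRank = group.map(_._2).sum.toDouble / group.size
        group.map { case (((j, _), rowId), _) => (rowId, (j, averageRank)) }
      }

    // Regroup by row and sort by column index to rebuild one rank vector per original row,
    // as the groupByKey step in the patch does
    val rankRows = ranks.groupBy(_._1).toSeq.sortBy(_._1).map { case (rowId, cols) =>
      rowId -> cols.map(_._2).sortBy(_._1).map(_._2)
    }

    rankRows.foreach { case (rowId, rs) => println(s"row $rowId ranks: ${rs.mkString(", ")}") }
  }
}

Reading column 0 down the rows gives ranks 2.5, 1.0, 0.0, 2.5 for the values [2, 1, 0, 2], matching the tie-averaging example in the removed getRanks scaladoc, while column 1's ranks (4.0 through 7.0) are the per-column ranks shifted by a constant, which is why the patch can feed global rather than per-column ranks into PearsonCorrelation. On an actual RDD[Vector], this code path is exercised through Statistics.corr(X, "spearman") in org.apache.spark.mllib.stat.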