Skip to content

Commit

Permalink
update paraPAVA
Browse files Browse the repository at this point in the history
  • Loading branch information
mengxr committed Jan 30, 2015
1 parent 077606b commit 35d044e
Showing 1 changed file with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali

// Pools sub array within given bounds assigning weighted average value to all elements.
def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
val poolSubArray = input.slice(start, end + 1)
val poolSubArray = input.view.slice(start, end + 1)

val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum
val weight = poolSubArray.map(_._3).sum
Expand Down Expand Up @@ -259,11 +259,12 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
*/
private def parallelPoolAdjacentViolators(
input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {

val parallelStepResult = input
.sortBy(x => (x._2, x._1))
.mapPartitions(it => poolAdjacentViolators(it.toArray).toIterator)

poolAdjacentViolators(parallelStepResult.collect())
.glom()
.flatMap(poolAdjacentViolators)
.collect()
.sortBy(x => (x._2, x._1)) // Sort again because collect() doesn't promise ordering.
poolAdjacentViolators(parallelStepResult)
}
}

0 comments on commit 35d044e

Please sign in to comment.