added max and min to StatCounter output, updated doc
dwmclary committed Mar 15, 2014
1 parent a5c13b0 commit 1a97558
Showing 5 changed files with 29 additions and 9 deletions.
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -477,11 +477,23 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
new java.util.ArrayList(arr)
}

/**
* Returns the maximum element from this RDD as defined by the specified
* Comparator[T].
* @param comp the comparator that defines ordering
* @return the maximum of the RDD
*/
def max(comp: Comparator[T]): T = {
import scala.collection.JavaConversions._
rdd.max()(Ordering.comparatorToOrdering(comp))
}

/**
* Returns the minimum element from this RDD as defined by the specified
* Comparator[T].
* @param comp the comparator that defines ordering
* @return the minimum of the RDD
*/
def min(comp: Comparator[T]): T = {
import scala.collection.JavaConversions._
rdd.min()(Ordering.comparatorToOrdering(comp))
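
As a quick orientation, a minimal sketch of calling the new Comparator-based methods (exercised from Scala for brevity; assumes a JavaSparkContext named jsc, and uses the fact that scala.math.Ordering extends java.util.Comparator):

    val jrdd = jsc.parallelize(java.util.Arrays.asList(1, 5, 43, 10))
    // Ordering.Int is also a java.util.Comparator[Int], so it can be passed directly.
    val biggest = jrdd.max(Ordering.Int)   // 43
    val smallest = jrdd.min(Ordering.Int)  // 1
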
12 changes: 10 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -958,9 +958,17 @@ abstract class RDD[T: ClassTag](
*/
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)

def max()(implicit ord: Ordering[T]):T = this.reduce{(x,y) => ord.max(x,y)}
/**
* Returns the max of this RDD as defined by the implicit Ordering[T].
* @return the maximum element of the RDD
*/
def max()(implicit ord: Ordering[T]): T = this.reduce(ord.max)

def min()(implicit ord: Ordering[T]):T = this.reduce{(x,y) => ord.min(x,y)}
/**
* Returns the min of this RDD as defined by the implicit Ordering[T].
* @return the minimum element of the RDD
*/
def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min)

/**
* Save this RDD as a text file, using string representations of elements.
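
A minimal usage sketch for the new Scala-side methods (assumes a live SparkContext named sc; values are illustrative):

    val rdd = sc.parallelize(Seq(1.0, 5.0, 43.0, 10.0))
    rdd.max()  // 43.0, via the implicit Ordering[Double]
    rdd.min()  // 1.0
    // An alternative ordering can be supplied explicitly:
    rdd.max()(Ordering.by[Double, Double](x => -x))  // 1.0, the max under the reversed order
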
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -29,8 +29,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
private var n: Long = 0 // Running count of our values
private var mu: Double = 0 // Running mean of our values
private var m2: Double = 0 // Running variance numerator (sum of (x - mean)^2)
private var max_v: Double = 0 // Running max of our values
private var min_v: Double = 0 // Running min of our values
private var max_v: Double = Double.NegativeInfinity // Running max of our values
private var min_v: Double = Double.PositiveInfinity // Running min of our values

merge(values)

@@ -135,7 +135,7 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
def sampleStdev: Double = math.sqrt(sampleVariance)

override def toString: String = {
"(count: %d, mean: %f, stdev: %f)".format(count, mean, stdev)
"(count: %d, mean: %f, stdev: %f, max: %f, min: %f)".format(count, mean, stdev, max, min)
}
}

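
Seeding the running extremes at negative and positive infinity means the first merged value always replaces them. A self-contained sketch of that update rule (a simplified stand-in, not Spark's StatCounter):

    class MiniStat extends Serializable {
      private var maxV = Double.NegativeInfinity  // any finite value exceeds this
      private var minV = Double.PositiveInfinity  // any finite value undercuts this
      def merge(x: Double): this.type = {
        maxV = math.max(maxV, x)
        minV = math.min(minV, x)
        this
      }
      override def toString = "(max: %f, min: %f)".format(maxV, minV)
    }
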
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -171,8 +171,8 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMethodTester {
assert(abs(6.0/2 - rdd.mean) < 0.01)
assert(abs(1.0 - rdd.variance) < 0.01)
assert(abs(1.0 - rdd.stdev) < 0.01)
assert(abs(4.0 - stats.max) === 0)
assert(abs(-1.0 - stats.max) === 0)
assert(stats.max === 4.0)
assert(stats.min === -1.0)

// Add other tests here for classes that should be able to handle empty partitions correctly
}
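
For reference, the end-to-end shape of such a check (inside a ScalaTest FunSuite with a live SparkContext sc; the sample values here are made up):

    val stats = sc.parallelize(Seq(2.0, 3.0, 4.0, -1.0)).stats()
    assert(stats.max === 4.0)
    assert(stats.min === -1.0)
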
4 changes: 2 additions & 2 deletions python/pyspark/rdd.py
@@ -543,7 +543,7 @@ def max(self):
>>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
43.0
"""
return self.stats().max()
return self.reduce(max)

def min(self):
"""
@@ -552,7 +552,7 @@ def min(self):
>>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
1.0
"""
return self.stats().min()
return self.reduce(min)

def sum(self):
"""
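
A likely motivation for the reduce-based form: it needs only pairwise comparison, so max and min work on any comparable element type, while stats() is numeric-only. The Scala implementation makes the same move; a one-line illustration (assumes a SparkContext named sc):

    sc.parallelize(Seq("pear", "apple", "zucchini")).max()  // "zucchini", lexicographic order
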
