diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index a5abc6f3186ff..44a797d16c996 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -477,11 +477,23 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     new java.util.ArrayList(arr)
   }
 
+  /**
+   * Returns the maximum element from this RDD as defined by the specified
+   * Comparator[T].
+   * @param comp the comparator that defines ordering
+   * @return the maximum element of the RDD
+   */
   def max(comp: Comparator[T]): T = {
     import scala.collection.JavaConversions._
     rdd.max()(Ordering.comparatorToOrdering(comp))
   }
 
+  /**
+   * Returns the minimum element from this RDD as defined by the specified
+   * Comparator[T].
+   * @param comp the comparator that defines ordering
+   * @return the minimum element of the RDD
+   */
   def min(comp: Comparator[T]): T = {
     import scala.collection.JavaConversions._
     rdd.min()(Ordering.comparatorToOrdering(comp))
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index bf2851839017c..f7baab83cc170 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -958,9 +958,17 @@ abstract class RDD[T: ClassTag](
    */
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)
 
-  def max()(implicit ord: Ordering[T]):T = this.reduce{(x,y) => ord.max(x,y)}
+  /**
+   * Returns the max of this RDD as defined by the implicit Ordering[T].
+   * @return the maximum element of the RDD
+   */
+  def max()(implicit ord: Ordering[T]): T = this.reduce(ord.max)
 
-  def min()(implicit ord: Ordering[T]):T = this.reduce{(x,y) => ord.min(x,y)}
+  /**
+   * Returns the min of this RDD as defined by the implicit Ordering[T].
+   * @return the minimum element of the RDD
+   */
+  def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min)
 
   /**
    * Save this RDD as a text file, using string representations of elements.
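For reviewers, a minimal spark-shell sketch (not part of the patch) of the new max()/min() API added above; it assumes a SparkContext named sc:

// Hypothetical usage of the new RDD.max/min, assuming a SparkContext `sc`.
val rdd = sc.parallelize(Seq(1.0, 5.0, 43.0, 10.0))
rdd.max()  // 43.0, resolved via the implicit Ordering[Double]
rdd.min()  // 1.0

// An explicit Ordering can be supplied in the second parameter list,
// mirroring the Comparator-based overloads on the Java API side:
val words = sc.parallelize(Seq("a", "abc", "ab"))
words.max()(Ordering.by((s: String) => s.length))  // "abc"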
diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
index ed9dd4334e6fe..752e96c2434f7 100644
--- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala
+++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -29,8 +29,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
   private var n: Long = 0     // Running count of our values
   private var mu: Double = 0  // Running mean of our values
   private var m2: Double = 0  // Running variance numerator (sum of (x - mean)^2)
-  private var max_v: Double = 0 // Running max of our values
-  private var min_v: Double = 0 // Running min of our values
+  private var max_v: Double = Double.NegativeInfinity // Running max of our values
+  private var min_v: Double = Double.PositiveInfinity // Running min of our values
 
   merge(values)
 
@@ -135,7 +135,7 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
   def sampleStdev: Double = math.sqrt(sampleVariance)
 
   override def toString: String = {
-    "(count: %d, mean: %f, stdev: %f)".format(count, mean, stdev)
+    "(count: %d, mean: %f, stdev: %f, max: %f, min: %f)".format(count, mean, stdev, max, min)
   }
 }
 
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 567d6be108749..0ef07a180a468 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -171,8 +171,8 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
     assert(abs(6.0/2 - rdd.mean) < 0.01)
     assert(abs(1.0 - rdd.variance) < 0.01)
     assert(abs(1.0 - rdd.stdev) < 0.01)
-    assert(abs(4.0 - stats.max) === 0)
-    assert(abs(-1.0 - stats.max) === 0)
+    assert(stats.max === 4.0)
+    assert(stats.min === -1.0)
 
     // Add other tests here for classes that should be able to handle empty partitions correctly
   }
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 5f4b87abc6d25..ec547c6d29c4c 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -543,7 +543,7 @@ def max(self):
         >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
         43.0
         """
-        return self.stats().max()
+        return self.reduce(max)
 
     def min(self):
         """
@@ -552,7 +552,7 @@ def min(self):
         >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
         1.0
         """
-        return self.stats().min()
+        return self.reduce(min)
 
     def sum(self):
         """
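As a sanity check on the infinity seeds in the StatCounter hunk above, a small sketch (not part of the patch; it assumes spark-core on the classpath and the max/min accessors this patch adds to StatCounter). With the old zero seeds, an all-negative dataset would report max = 0.0 and an all-positive one min = 0.0; seeding with -/+ infinity makes the running comparisons correct from the first merged value.

import org.apache.spark.util.StatCounter

// All-negative input: the first merged value must replace the seed outright.
val stats = new StatCounter(Seq(-5.0, -1.0, -3.0))
stats.max       // -1.0 (would incorrectly stay 0.0 with a zero-initialized max_v)
stats.min       // -5.0
println(stats)  // (count: 3, mean: -3.000000, stdev: 1.632993, max: -1.000000, min: -5.000000)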