From a9ed4f9e563a6b4ba4a351f0170da53b3a4c973f Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 16 Apr 2015 17:46:19 -0700 Subject: [PATCH] Add a few missing scopes to certain RDD methods --- .../main/scala/org/apache/spark/rdd/RDD.scala | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 8766659283962..47d3af47f4122 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -317,7 +317,9 @@ abstract class RDD[T: ClassTag]( /** * Return a new RDD containing the distinct elements in this RDD. */ - def distinct(): RDD[T] = distinct(partitions.length) + def distinct(): RDD[T] = withScope { + distinct(partitions.length) + } /** * Return a new RDD that has exactly numPartitions partitions. @@ -429,7 +431,7 @@ abstract class RDD[T: ClassTag]( def takeSample( withReplacement: Boolean, num: Int, - seed: Long = Utils.random.nextLong): Array[T] = { + seed: Long = Utils.random.nextLong): Array[T] = withScope { val numStDev = 10.0 if (num < 0) { @@ -487,7 +489,9 @@ abstract class RDD[T: ClassTag]( * Return the union of this RDD and another one. Any identical elements will appear multiple * times (use `.distinct()` to eliminate them). */ - def ++(other: RDD[T]): RDD[T] = this.union(other) + def ++(other: RDD[T]): RDD[T] = withScope { + this.union(other) + } /** * Return this RDD sorted by the given key function. @@ -567,8 +571,9 @@ abstract class RDD[T: ClassTag]( * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. */ - def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = + def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope { groupBy[K](f, defaultPartitioner(this)) + } /** * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements @@ -579,8 +584,11 @@ abstract class RDD[T: ClassTag]( * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. */ - def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = + def groupBy[K]( + f: T => K, + numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope { groupBy(f, new HashPartitioner(numPartitions)) + } /** * Return an RDD of grouped items. Each group consists of a key and a sequence of elements @@ -739,7 +747,7 @@ abstract class RDD[T: ClassTag]( mapPartitionsWithIndex { (index, iter) => val a = constructA(index) iter.map(t => {f(t, a); t}) - }.foreach(_ => {}) + } } /**