diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index cd6de28b9796e..d10b681848d99 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -44,7 +44,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor import org.apache.mesos.MesosNativeLibrary -import org.apache.spark.annotation.{DeveloperApi, Experimental, RDDScope} +import org.apache.spark.annotation.{DeveloperApi, Experimental, RDDScoped} import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} import org.apache.spark.executor.{ExecutorEndpoint, TriggerThreadDump} @@ -641,7 +641,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions. */ - @RDDScope + @RDDScoped def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = { assertNotStopped() new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]()) @@ -651,7 +651,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * * This method is identical to `parallelize`. */ - @RDDScope + @RDDScoped def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = { parallelize(seq, numSlices) } @@ -659,7 +659,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** Distribute a local Scala collection to form an RDD, with one or more * location preferences (hostnames of Spark nodes) for each object. * Create a new partition for each collection item. */ - @RDDScope + @RDDScoped def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = { assertNotStopped() val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap @@ -670,7 +670,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * Read a text file from HDFS, a local file system (available on all nodes), or any * Hadoop-supported file system URI, and return it as an RDD of Strings. */ - @RDDScope + @RDDScoped def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = { assertNotStopped() hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], @@ -704,7 +704,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * * @param minPartitions A suggestion value of the minimal splitting number for input data. */ - @RDDScope + @RDDScoped def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = { assertNotStopped() @@ -751,7 +751,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @note Small files are preferred; very large files may cause bad performance. 
*/ @Experimental - @RDDScope + @RDDScoped def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = { assertNotStopped() @@ -780,7 +780,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @return An RDD of data with values, represented as byte arrays */ @Experimental - @RDDScope + @RDDScoped def binaryRecords(path: String, recordLength: Int, conf: Configuration = hadoopConfiguration) : RDD[Array[Byte]] = { assertNotStopped() @@ -818,7 +818,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. */ - @RDDScope + @RDDScoped def hadoopRDD[K, V]( conf: JobConf, inputFormatClass: Class[_ <: InputFormat[K, V]], @@ -840,7 +840,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. */ - @RDDScope + @RDDScoped def hadoopFile[K, V]( path: String, inputFormatClass: Class[_ <: InputFormat[K, V]], @@ -876,7 +876,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. */ - @RDDScope + @RDDScoped def hadoopFile[K, V, F <: InputFormat[K, V]] (path: String, minPartitions: Int) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = { @@ -901,13 +901,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. */ - @RDDScope + @RDDScoped def hadoopFile[K, V, F <: InputFormat[K, V]](path: String) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = hadoopFile[K, V, F](path, defaultMinPartitions) /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */ - @RDDScope + @RDDScoped def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]] (path: String) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = { @@ -928,7 +928,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. */ - @RDDScope + @RDDScoped def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]( path: String, fClass: Class[F], @@ -962,7 +962,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. */ - @RDDScope + @RDDScoped def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]]( conf: Configuration = hadoopConfiguration, fClass: Class[F], @@ -983,7 +983,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. 
*/ - @RDDScope + @RDDScoped def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], @@ -1002,7 +1002,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. * */ - @RDDScope + @RDDScoped def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] = { assertNotStopped() sequenceFile(path, keyClass, valueClass, defaultMinPartitions) @@ -1030,7 +1030,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. */ - @RDDScope + @RDDScoped def sequenceFile[K, V] (path: String, minPartitions: Int = defaultMinPartitions) (implicit km: ClassTag[K], vm: ClassTag[V], @@ -1054,7 +1054,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * though the nice thing about it is that there's very little effort required to save arbitrary * objects. */ - @RDDScope + @RDDScoped def objectFile[T: ClassTag]( path: String, minPartitions: Int = defaultMinPartitions @@ -1064,7 +1064,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes, Utils.getContextOrSparkClassLoader)) } - @RDDScope + @RDDScoped protected[spark] def checkpointFile[T: ClassTag]( path: String ): RDD[T] = { @@ -1072,7 +1072,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } /** Build the union of a list of RDDs. */ - @RDDScope + @RDDScoped def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = { val partitioners = rdds.flatMap(_.partitioner).toSet if (partitioners.size == 1) { @@ -1083,7 +1083,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } /** Build the union of a list of RDDs passed as variable-length arguments. */ - @RDDScope + @RDDScoped def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] = union(Seq(first) ++ rest) diff --git a/core/src/main/scala/org/apache/spark/annotation/RDDScope.java b/core/src/main/scala/org/apache/spark/annotation/RDDScoped.java similarity index 97% rename from core/src/main/scala/org/apache/spark/annotation/RDDScope.java rename to core/src/main/scala/org/apache/spark/annotation/RDDScoped.java index 71cc0ba3b5462..e161a09cc57a2 100644 --- a/core/src/main/scala/org/apache/spark/annotation/RDDScope.java +++ b/core/src/main/scala/org/apache/spark/annotation/RDDScoped.java @@ -25,4 +25,4 @@ */ @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.METHOD}) -public @interface RDDScope {} +public @interface RDDScoped {} diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala index 67db6908bd612..de6e0b99f08af 100644 --- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala @@ -24,7 +24,7 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.reflect.ClassTag import org.apache.spark.{ComplexFutureAction, FutureAction, Logging} -import org.apache.spark.annotation.RDDScope +import org.apache.spark.annotation.RDDScoped /** * A set of asynchronous RDD actions available through an implicit conversion. 
@@ -34,7 +34,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi /** * Returns a future for counting the number of elements in the RDD. */ - @RDDScope + @RDDScoped def countAsync(): FutureAction[Long] = { val totalCount = new AtomicLong self.context.submitJob( @@ -55,7 +55,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi /** * Returns a future for retrieving all elements of this RDD. */ - @RDDScope + @RDDScoped def collectAsync(): FutureAction[Seq[T]] = { val results = new Array[Array[T]](self.partitions.length) self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.length), @@ -65,7 +65,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi /** * Returns a future for retrieving the first num elements of the RDD. */ - @RDDScope + @RDDScoped def takeAsync(num: Int): FutureAction[Seq[T]] = { val f = new ComplexFutureAction[Seq[T]] @@ -113,7 +113,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi /** * Applies a function f to all elements of this RDD. */ - @RDDScope + @RDDScoped def foreachAsync(f: T => Unit): FutureAction[Unit] = { val cleanF = self.context.clean(f) self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.length), @@ -123,7 +123,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi /** * Applies a function f to each partition of this RDD. */ - @RDDScope + @RDDScoped def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = { self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.length), (index, data) => Unit, Unit) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 75be1ee35b72e..65f0948f65b56 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -38,7 +38,7 @@ import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewO import org.apache.spark._ import org.apache.spark.Partitioner.defaultPartitioner -import org.apache.spark.annotation.{Experimental, RDDScope} +import org.apache.spark.annotation.{Experimental, RDDScoped} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.{DataWriteMethod, OutputMetrics} import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil @@ -70,7 +70,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * In addition, users can control the partitioning of the output RDD, and whether to perform * map-side aggregation (if a mapper can produce multiple items with the same key). */ - @RDDScope + @RDDScoped def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, @@ -106,7 +106,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) /** * Simplified version of combineByKey that hash-partitions the output RDD. */ - @RDDScope + @RDDScoped def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, @@ -123,7 +123,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * allocation, both of these functions are allowed to modify and return their first argument * instead of creating a new U. 
*/ - @RDDScope + @RDDScoped def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] = { // Serialize the zero value to a byte array so that we can get a new clone of it on each key @@ -146,7 +146,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * allocation, both of these functions are allowed to modify and return their first argument * instead of creating a new U. */ - @RDDScope + @RDDScoped def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] = { aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp) @@ -161,7 +161,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * allocation, both of these functions are allowed to modify and return their first argument * instead of creating a new U. */ - @RDDScope + @RDDScoped def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] = { aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp) @@ -172,7 +172,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * may be added to the result an arbitrary number of times, and must not change the result * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.). */ - @RDDScope + @RDDScoped def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = { // Serialize the zero value to a byte array so that we can get a new clone of it on each key val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue) @@ -191,7 +191,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * may be added to the result an arbitrary number of times, and must not change the result * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.). */ - @RDDScope + @RDDScoped def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = { foldByKey(zeroValue, new HashPartitioner(numPartitions))(func) } @@ -201,7 +201,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * may be added to the result an arbitrary number of times, and must not change the result * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.). */ - @RDDScope + @RDDScoped def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = { foldByKey(zeroValue, defaultPartitioner(self))(func) } @@ -219,7 +219,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * @param seed seed for the random number generator * @return RDD containing the sampled subset */ - @RDDScope + @RDDScoped def sampleByKey(withReplacement: Boolean, fractions: Map[K, Double], seed: Long = Utils.random.nextLong): RDD[(K, V)] = { @@ -251,7 +251,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * @return RDD containing the sampled subset */ @Experimental - @RDDScope + @RDDScoped def sampleByKeyExact( withReplacement: Boolean, fractions: Map[K, Double], @@ -272,7 +272,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * the merging locally on each mapper before sending results to a reducer, similarly to a * "combiner" in MapReduce. */ - @RDDScope + @RDDScoped def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = { combineByKey[V]((v: V) => v, func, func, partitioner) } @@ -282,7 +282,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * the merging locally on each mapper before sending results to a reducer, similarly to a * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions. 
*/ - @RDDScope + @RDDScoped def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = { reduceByKey(new HashPartitioner(numPartitions), func) } @@ -293,7 +293,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/ * parallelism level. */ - @RDDScope + @RDDScoped def reduceByKey(func: (V, V) => V): RDD[(K, V)] = { reduceByKey(defaultPartitioner(self), func) } @@ -303,7 +303,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * immediately to the master as a Map. This will also perform the merging locally on each mapper * before sending results to a reducer, similarly to a "combiner" in MapReduce. */ - @RDDScope + @RDDScoped def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = { if (keyClass.isArray) { @@ -332,7 +332,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) /** Alias for reduceByKeyLocally */ @deprecated("Use reduceByKeyLocally", "1.0.0") - @RDDScope + @RDDScoped def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = reduceByKeyLocally(func) /** @@ -343,7 +343,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which * returns an RDD[T, Long] instead of a map. */ - @RDDScope + @RDDScoped def countByKey(): Map[K, Long] = self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap /** @@ -352,7 +352,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * not finish within a timeout. */ @Experimental - @RDDScope + @RDDScoped def countByKeyApprox(timeout: Long, confidence: Double = 0.95) : PartialResult[Map[K, BoundedDouble]] = { self.map(_._1).countByValueApprox(timeout, confidence) @@ -378,7 +378,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * @param partitioner Partitioner to use for the resulting RDD. */ @Experimental - @RDDScope + @RDDScoped def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): RDD[(K, Long)] = { require(p >= 4, s"p ($p) must be >= 4") require(sp <= 32, s"sp ($sp) must be <= 32") @@ -411,7 +411,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * It must be greater than 0.000017. * @param partitioner partitioner of the resulting RDD */ - @RDDScope + @RDDScoped def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017") val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt @@ -430,7 +430,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * It must be greater than 0.000017. * @param numPartitions number of partitions of the resulting RDD */ - @RDDScope + @RDDScoped def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = { countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions)) } @@ -445,7 +445,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * @param relativeSD Relative accuracy. Smaller values create counters that require more space. * It must be greater than 0.000017. */ - @RDDScope + @RDDScoped def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = { countApproxDistinctByKey(relativeSD, defaultPartitioner(self)) } @@ -463,7 +463,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]]. 
*/ - @RDDScope + @RDDScoped def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = { // groupByKey shouldn't use map side combine because map side combine does not // reduce the amount of data shuffled and requires all map side data be inserted @@ -488,7 +488,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]]. */ - @RDDScope + @RDDScoped def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = { groupByKey(new HashPartitioner(numPartitions)) } @@ -496,7 +496,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) /** * Return a copy of the RDD partitioned using the specified partitioner. */ - @RDDScope + @RDDScoped def partitionBy(partitioner: Partitioner): RDD[(K, V)] = { if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) { throw new SparkException("Default partitioner cannot partition array keys.") @@ -513,7 +513,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD. */ - @RDDScope + @RDDScoped def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { this.cogroup(other, partitioner).flatMapValues( pair => for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w) @@ -526,7 +526,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to * partition the output RDD. */ - @RDDScope + @RDDScoped def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = { this.cogroup(other, partitioner).flatMapValues { pair => if (pair._2.isEmpty) { @@ -543,7 +543,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to * partition the output RDD. */ - @RDDScope + @RDDScoped def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Option[V], W))] = { this.cogroup(other, partitioner).flatMapValues { pair => @@ -563,7 +563,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements * in `this` have key k. Uses the given Partitioner to partition the output RDD. */ - @RDDScope + @RDDScoped def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Option[V], Option[W]))] = { this.cogroup(other, partitioner).flatMapValues { @@ -577,7 +577,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Simplified version of combineByKey that hash-partitions the resulting RDD using the * existing partitioner/parallelism level. */ - @RDDScope + @RDDScoped def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) : RDD[(K, C)] = { combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self)) @@ -593,7 +593,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. 
*/ - @RDDScope + @RDDScoped def groupByKey(): RDD[(K, Iterable[V])] = { groupByKey(defaultPartitioner(self)) } @@ -603,7 +603,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Performs a hash join across the cluster. */ - @RDDScope + @RDDScoped def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = { join(other, defaultPartitioner(self, other)) } @@ -613,7 +613,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Performs a hash join across the cluster. */ - @RDDScope + @RDDScoped def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = { join(other, new HashPartitioner(numPartitions)) } @@ -624,7 +624,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output * using the existing partitioner/parallelism level. */ - @RDDScope + @RDDScoped def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = { leftOuterJoin(other, defaultPartitioner(self, other)) } @@ -635,7 +635,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output * into `numPartitions` partitions. */ - @RDDScope + @RDDScoped def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = { leftOuterJoin(other, new HashPartitioner(numPartitions)) } @@ -646,7 +646,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting * RDD using the existing partitioner/parallelism level. */ - @RDDScope + @RDDScoped def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = { rightOuterJoin(other, defaultPartitioner(self, other)) } @@ -657,7 +657,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting * RDD into the given number of partitions. */ - @RDDScope + @RDDScoped def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = { rightOuterJoin(other, new HashPartitioner(numPartitions)) } @@ -671,7 +671,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/ * parallelism level. */ - @RDDScope + @RDDScoped def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = { fullOuterJoin(other, defaultPartitioner(self, other)) } @@ -684,7 +684,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions. 
*/ - @RDDScope + @RDDScoped def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = { fullOuterJoin(other, new HashPartitioner(numPartitions)) } @@ -695,7 +695,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only * one value per key is preserved in the map returned) */ - @RDDScope + @RDDScoped def collectAsMap(): Map[K, V] = { val data = self.collect() val map = new mutable.HashMap[K, V] @@ -708,7 +708,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Pass each value in the key-value pair RDD through a map function without changing the keys; * this also retains the original RDD's partitioning. */ - @RDDScope + @RDDScoped def mapValues[U](f: V => U): RDD[(K, U)] = { val cleanF = self.context.clean(f) new MapPartitionsRDD[(K, U), (K, V)](self, @@ -720,7 +720,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Pass each value in the key-value pair RDD through a flatMap function without changing the * keys; this also retains the original RDD's partitioning. */ - @RDDScope + @RDDScoped def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = { val cleanF = self.context.clean(f) new MapPartitionsRDD[(K, U), (K, V)](self, @@ -735,7 +735,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * return a resulting RDD that contains a tuple with the list of values * for that key in `this`, `other1`, `other2` and `other3`. */ - @RDDScope + @RDDScoped def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)], @@ -757,7 +757,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - @RDDScope + @RDDScoped def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Iterable[V], Iterable[W]))] = { if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) { @@ -773,7 +773,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ - @RDDScope + @RDDScoped def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner) : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = { if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) { @@ -792,7 +792,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * return a resulting RDD that contains a tuple with the list of values * for that key in `this`, `other1`, `other2` and `other3`. */ - @RDDScope + @RDDScoped def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]) : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = { cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3)) @@ -802,7 +802,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. 
*/ - @RDDScope + @RDDScoped def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = { cogroup(other, defaultPartitioner(self, other)) } @@ -811,7 +811,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ - @RDDScope + @RDDScoped def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = { cogroup(other1, other2, defaultPartitioner(self, other1, other2)) @@ -821,7 +821,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - @RDDScope + @RDDScoped def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = { cogroup(other, new HashPartitioner(numPartitions)) } @@ -830,7 +830,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ - @RDDScope + @RDDScoped def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int) : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = { cogroup(other1, other2, new HashPartitioner(numPartitions)) @@ -841,7 +841,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * return a resulting RDD that contains a tuple with the list of values * for that key in `this`, `other1`, `other2` and `other3`. */ - @RDDScope + @RDDScoped def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)], @@ -851,20 +851,20 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) } /** Alias for cogroup. */ - @RDDScope + @RDDScoped def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = { cogroup(other, defaultPartitioner(self, other)) } /** Alias for cogroup. */ - @RDDScope + @RDDScoped def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = { cogroup(other1, other2, defaultPartitioner(self, other1, other2)) } /** Alias for cogroup. */ - @RDDScope + @RDDScoped def groupWith[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]) : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = { cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3)) @@ -876,17 +876,17 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting * RDD will be <= us. */ - @RDDScope + @RDDScoped def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] = subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.length))) /** Return an RDD with the pairs from `this` whose keys are not in `other`. */ - @RDDScope + @RDDScoped def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] = subtractByKey(other, new HashPartitioner(numPartitions)) /** Return an RDD with the pairs from `this` whose keys are not in `other`. */ - @RDDScope + @RDDScoped def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] = new SubtractedRDD[K, V, W](self, other, p) @@ -894,7 +894,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Return the list of values in the RDD for key `key`. 
This operation is done efficiently if the * RDD has a known partitioner by only searching the partition that the key maps to. */ - @RDDScope + @RDDScoped def lookup(key: K): Seq[V] = { self.partitioner match { case Some(p) => @@ -917,7 +917,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class * supporting the key and value types K and V in this RDD. */ - @RDDScope + @RDDScoped def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) { saveAsHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]]) } @@ -927,7 +927,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * supporting the key and value types K and V in this RDD. Compress the result with the * supplied codec. */ - @RDDScope + @RDDScoped def saveAsHadoopFile[F <: OutputFormat[K, V]]( path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassTag[F]) { val runtimeClass = fm.runtimeClass @@ -938,7 +938,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat` * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD. */ - @RDDScope + @RDDScoped def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) { saveAsNewAPIHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]]) } @@ -947,7 +947,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat` * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD. */ - @RDDScope + @RDDScoped def saveAsNewAPIHadoopFile( path: String, keyClass: Class[_], @@ -969,7 +969,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class * supporting the key and value types K and V in this RDD. Compress with the supplied codec. */ - @RDDScope + @RDDScoped def saveAsHadoopFile( path: String, keyClass: Class[_], @@ -984,7 +984,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class * supporting the key and value types K and V in this RDD. */ - @RDDScope + @RDDScoped def saveAsHadoopFile( path: String, keyClass: Class[_], @@ -1024,7 +1024,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * output paths required (e.g. a table name to write to) in the same way as it would be * configured for a Hadoop MapReduce job. */ - @RDDScope + @RDDScoped def saveAsNewAPIHadoopDataset(conf: Configuration) { // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). val hadoopConf = conf @@ -1092,7 +1092,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop * MapReduce job. */ - @RDDScope + @RDDScoped def saveAsHadoopDataset(conf: JobConf) { // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). 
val hadoopConf = conf diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index e3369983730f5..e87757aec72ca 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -32,7 +32,7 @@ import org.apache.hadoop.mapred.TextOutputFormat import org.apache.spark._ import org.apache.spark.Partitioner._ -import org.apache.spark.annotation.{DeveloperApi, Experimental, RDDScope} +import org.apache.spark.annotation.{DeveloperApi, Experimental, RDDScoped} import org.apache.spark.api.java.JavaRDD import org.apache.spark.partial.BoundedDouble import org.apache.spark.partial.CountEvaluator @@ -283,7 +283,7 @@ abstract class RDD[T: ClassTag]( /** * Return a new RDD by applying a function to all elements of this RDD. */ - @RDDScope + @RDDScoped def map[U: ClassTag](f: T => U): RDD[U] = { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) @@ -293,7 +293,7 @@ abstract class RDD[T: ClassTag]( * Return a new RDD by first applying a function to all elements of this * RDD, and then flattening the results. */ - @RDDScope + @RDDScoped def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF)) @@ -302,7 +302,7 @@ abstract class RDD[T: ClassTag]( /** * Return a new RDD containing only the elements that satisfy a predicate. */ - @RDDScope + @RDDScoped def filter(f: T => Boolean): RDD[T] = { val cleanF = sc.clean(f) new MapPartitionsRDD[T, T]( @@ -314,14 +314,14 @@ abstract class RDD[T: ClassTag]( /** * Return a new RDD containing the distinct elements in this RDD. */ - @RDDScope + @RDDScoped def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) /** * Return a new RDD containing the distinct elements in this RDD. */ - @RDDScope + @RDDScoped def distinct(): RDD[T] = distinct(partitions.length) /** @@ -333,7 +333,7 @@ abstract class RDD[T: ClassTag]( * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, * which can avoid performing a shuffle. */ - @RDDScope + @RDDScoped def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = { coalesce(numPartitions, shuffle = true) } @@ -358,7 +358,7 @@ abstract class RDD[T: ClassTag]( * coalesce(1000, shuffle = true) will result in 1000 partitions with the * data distributed using a hash partitioner. 
*/ - @RDDScope + @RDDScoped def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null) : RDD[T] = { if (shuffle) { @@ -392,7 +392,7 @@ abstract class RDD[T: ClassTag]( * with replacement: expected number of times each element is chosen; fraction must be >= 0 * @param seed seed for the random number generator */ - @RDDScope + @RDDScoped def sample( withReplacement: Boolean, fraction: Double, @@ -413,7 +413,7 @@ abstract class RDD[T: ClassTag]( * * @return split RDDs in an array */ - @RDDScope + @RDDScoped def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]] = { val sum = weights.sum val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _) @@ -431,7 +431,7 @@ abstract class RDD[T: ClassTag]( * @param seed seed for the random number generator * @return sample of specified size in an array */ - @RDDScope + @RDDScoped def takeSample( withReplacement: Boolean, num: Int, @@ -481,7 +481,7 @@ abstract class RDD[T: ClassTag]( * Return the union of this RDD and another one. Any identical elements will appear multiple * times (use `.distinct()` to eliminate them). */ - @RDDScope + @RDDScoped def union(other: RDD[T]): RDD[T] = { if (partitioner.isDefined && other.partitioner == partitioner) { new PartitionerAwareUnionRDD(sc, Array(this, other)) @@ -494,13 +494,13 @@ abstract class RDD[T: ClassTag]( * Return the union of this RDD and another one. Any identical elements will appear multiple * times (use `.distinct()` to eliminate them). */ - @RDDScope + @RDDScoped def ++(other: RDD[T]): RDD[T] = this.union(other) /** * Return this RDD sorted by the given key function. */ - @RDDScope + @RDDScoped def sortBy[K]( f: (T) => K, ascending: Boolean = true, @@ -516,7 +516,7 @@ abstract class RDD[T: ClassTag]( * * Note that this method performs a shuffle internally. */ - @RDDScope + @RDDScoped def intersection(other: RDD[T]): RDD[T] = { this.map(v => (v, null)).cogroup(other.map(v => (v, null))) .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty } @@ -531,7 +531,7 @@ abstract class RDD[T: ClassTag]( * * @param partitioner Partitioner to use for the resulting RDD */ - @RDDScope + @RDDScoped def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null) : RDD[T] = { this.map(v => (v, null)).cogroup(other.map(v => (v, null)), partitioner) @@ -547,7 +547,7 @@ abstract class RDD[T: ClassTag]( * * @param numPartitions How many partitions to use in the resulting RDD */ - @RDDScope + @RDDScoped def intersection(other: RDD[T], numPartitions: Int): RDD[T] = { intersection(other, new HashPartitioner(numPartitions)) } @@ -555,7 +555,7 @@ abstract class RDD[T: ClassTag]( /** * Return an RDD created by coalescing all elements within each partition into an array. */ - @RDDScope + @RDDScoped def glom(): RDD[Array[T]] = { new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray)) } @@ -564,7 +564,7 @@ abstract class RDD[T: ClassTag]( * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of * elements (a, b) where a is in `this` and b is in `other`. */ - @RDDScope + @RDDScoped def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other) /** @@ -576,7 +576,7 @@ abstract class RDD[T: ClassTag]( * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. 
*/ - @RDDScope + @RDDScoped def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = groupBy[K](f, defaultPartitioner(this)) @@ -589,7 +589,7 @@ abstract class RDD[T: ClassTag]( * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. */ - @RDDScope + @RDDScoped def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = groupBy(f, new HashPartitioner(numPartitions)) @@ -602,7 +602,7 @@ abstract class RDD[T: ClassTag]( * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. */ - @RDDScope + @RDDScoped def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null) : RDD[(K, Iterable[T])] = { val cleanF = sc.clean(f) @@ -612,13 +612,13 @@ abstract class RDD[T: ClassTag]( /** * Return an RDD created by piping elements to a forked external process. */ - @RDDScope + @RDDScoped def pipe(command: String): RDD[String] = new PipedRDD(this, command) /** * Return an RDD created by piping elements to a forked external process. */ - @RDDScope + @RDDScoped def pipe(command: String, env: Map[String, String]): RDD[String] = new PipedRDD(this, command, env) @@ -641,7 +641,7 @@ abstract class RDD[T: ClassTag]( * @param separateWorkingDir Use separate working directories for each task. * @return the result RDD */ - @RDDScope + @RDDScoped def pipe( command: Seq[String], env: Map[String, String] = Map(), @@ -660,7 +660,7 @@ abstract class RDD[T: ClassTag]( * `preservesPartitioning` indicates whether the input function preserves the partitioner, which * should be `false` unless this is a pair RDD and the input function doesn't modify the keys. */ - @RDDScope + @RDDScoped def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = { val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter) @@ -674,7 +674,7 @@ abstract class RDD[T: ClassTag]( * `preservesPartitioning` indicates whether the input function preserves the partitioner, which * should be `false` unless this is a pair RDD and the input function doesn't modify the keys. */ - @RDDScope + @RDDScoped def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = { val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter) @@ -690,7 +690,7 @@ abstract class RDD[T: ClassTag]( * should be `false` unless this is a pair RDD and the input function doesn't modify the keys. */ @DeveloperApi - @RDDScope + @RDDScoped @deprecated("use TaskContext.get", "1.2.0") def mapPartitionsWithContext[U: ClassTag]( f: (TaskContext, Iterator[T]) => Iterator[U], @@ -704,7 +704,7 @@ abstract class RDD[T: ClassTag]( * of the original partition. */ @deprecated("use mapPartitionsWithIndex", "0.7.0") - @RDDScope + @RDDScoped def mapPartitionsWithSplit[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = { mapPartitionsWithIndex(f, preservesPartitioning) @@ -716,7 +716,7 @@ abstract class RDD[T: ClassTag]( * partition with the index of that partition. 
*/ @deprecated("use mapPartitionsWithIndex", "1.0.0") - @RDDScope + @RDDScoped def mapWith[A, U: ClassTag] (constructA: Int => A, preservesPartitioning: Boolean = false) (f: (T, A) => U): RDD[U] = { @@ -732,7 +732,7 @@ abstract class RDD[T: ClassTag]( * partition with the index of that partition. */ @deprecated("use mapPartitionsWithIndex and flatMap", "1.0.0") - @RDDScope + @RDDScoped def flatMapWith[A, U: ClassTag] (constructA: Int => A, preservesPartitioning: Boolean = false) (f: (T, A) => Seq[U]): RDD[U] = { @@ -748,7 +748,7 @@ abstract class RDD[T: ClassTag]( * partition with the index of that partition. */ @deprecated("use mapPartitionsWithIndex and foreach", "1.0.0") - @RDDScope + @RDDScoped def foreachWith[A](constructA: Int => A)(f: (T, A) => Unit): Unit = { mapPartitionsWithIndex { (index, iter) => val a = constructA(index) @@ -762,7 +762,7 @@ abstract class RDD[T: ClassTag]( * partition with the index of that partition. */ @deprecated("use mapPartitionsWithIndex and filter", "1.0.0") - @RDDScope + @RDDScoped def filterWith[A](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = { mapPartitionsWithIndex((index, iter) => { val a = constructA(index) @@ -776,7 +776,7 @@ abstract class RDD[T: ClassTag]( * partitions* and the *same number of elements in each partition* (e.g. one was made through * a map on the other). */ - @RDDScope + @RDDScoped def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = { zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) => new Iterator[(T, U)] { @@ -797,37 +797,37 @@ abstract class RDD[T: ClassTag]( * *same number of partitions*, but does *not* require them to have the same number * of elements in each partition. */ - @RDDScope + @RDDScoped def zipPartitions[B: ClassTag, V: ClassTag] (rdd2: RDD[B], preservesPartitioning: Boolean) (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning) - @RDDScope + @RDDScoped def zipPartitions[B: ClassTag, V: ClassTag] (rdd2: RDD[B]) (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = zipPartitions(rdd2, preservesPartitioning = false)(f) - @RDDScope + @RDDScoped def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag] (rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean) (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, preservesPartitioning) - @RDDScope + @RDDScoped def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag] (rdd2: RDD[B], rdd3: RDD[C]) (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = zipPartitions(rdd2, rdd3, preservesPartitioning = false)(f) - @RDDScope + @RDDScoped def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag] (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean) (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, preservesPartitioning) - @RDDScope + @RDDScoped def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag] (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D]) (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = @@ -839,7 +839,7 @@ abstract class RDD[T: ClassTag]( /** * Applies a function f to all elements of this RDD. 
*/ - @RDDScope + @RDDScoped def foreach(f: T => Unit): Unit = { val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) @@ -848,7 +848,7 @@ abstract class RDD[T: ClassTag]( /** * Applies a function f to each partition of this RDD. */ - @RDDScope + @RDDScoped def foreachPartition(f: Iterator[T] => Unit): Unit = { val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => cleanF(iter)) @@ -857,7 +857,7 @@ abstract class RDD[T: ClassTag]( /** * Return an array that contains all of the elements in this RDD. */ - @RDDScope + @RDDScoped def collect(): Array[T] = { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*) @@ -868,7 +868,7 @@ abstract class RDD[T: ClassTag]( * * The iterator will consume as much memory as the largest partition in this RDD. */ - @RDDScope + @RDDScoped def toLocalIterator: Iterator[T] = { def collectPartition(p: Int): Array[T] = { sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head @@ -880,13 +880,13 @@ abstract class RDD[T: ClassTag]( * Return an array that contains all of the elements in this RDD. */ @deprecated("use collect", "1.0.0") - @RDDScope + @RDDScoped def toArray(): Array[T] = collect() /** * Return an RDD that contains all matching values by applying `f`. */ - @RDDScope + @RDDScoped def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = { filter(f.isDefinedAt).map(f) } @@ -897,21 +897,21 @@ abstract class RDD[T: ClassTag]( * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting * RDD will be <= us. */ - @RDDScope + @RDDScoped def subtract(other: RDD[T]): RDD[T] = subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length))) /** * Return an RDD with the elements from `this` that are not in `other`. */ - @RDDScope + @RDDScoped def subtract(other: RDD[T], numPartitions: Int): RDD[T] = subtract(other, new HashPartitioner(numPartitions)) /** * Return an RDD with the elements from `this` that are not in `other`. */ - @RDDScope + @RDDScoped def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = { if (partitioner == Some(p)) { // Our partitioner knows how to handle T (which, since we have a partitioner, is @@ -934,7 +934,7 @@ abstract class RDD[T: ClassTag]( * Reduces the elements of this RDD using the specified commutative and * associative binary operator. */ - @RDDScope + @RDDScoped def reduce(f: (T, T) => T): T = { val cleanF = sc.clean(f) val reducePartition: Iterator[T] => Option[T] = iter => { @@ -964,7 +964,7 @@ abstract class RDD[T: ClassTag]( * @param depth suggested depth of the tree (default: 2) * @see [[org.apache.spark.rdd.RDD#reduce]] */ - @RDDScope + @RDDScoped def treeReduce(f: (T, T) => T, depth: Int = 2): T = { require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.") val cleanF = context.clean(f) @@ -997,7 +997,7 @@ abstract class RDD[T: ClassTag]( * modify t1 and return it as its result value to avoid object allocation; however, it should not * modify t2. */ - @RDDScope + @RDDScoped def fold(zeroValue: T)(op: (T, T) => T): T = { // Clone the zero value since we will also be serializing it as part of tasks var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance()) @@ -1016,7 +1016,7 @@ abstract class RDD[T: ClassTag]( * allowed to modify and return their first argument instead of creating a new U to avoid memory * allocation. 
*/ - @RDDScope + @RDDScoped def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = { // Clone the zero value since we will also be serializing it as part of tasks var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance()) @@ -1034,7 +1034,7 @@ abstract class RDD[T: ClassTag]( * @param depth suggested depth of the tree (default: 2) * @see [[org.apache.spark.rdd.RDD#aggregate]] */ - @RDDScope + @RDDScoped def treeAggregate[U: ClassTag](zeroValue: U)( seqOp: (U, T) => U, combOp: (U, U) => U, @@ -1071,7 +1071,7 @@ abstract class RDD[T: ClassTag]( * within a timeout, even if not all tasks have finished. */ @Experimental - @RDDScope + @RDDScoped def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = { val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) => var result = 0L @@ -1093,7 +1093,7 @@ abstract class RDD[T: ClassTag]( * To handle very large results, consider using rdd.map(x => (x, 1L)).reduceByKey(_ + _), which * returns an RDD[T, Long] instead of a map. */ - @RDDScope + @RDDScoped def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = { map(value => (value, null)).countByKey() } @@ -1103,7 +1103,7 @@ abstract class RDD[T: ClassTag]( * Approximate version of countByValue(). */ @Experimental - @RDDScope + @RDDScoped def countByValueApprox(timeout: Long, confidence: Double = 0.95) (implicit ord: Ordering[T] = null) : PartialResult[Map[T, BoundedDouble]] = @@ -1140,7 +1140,7 @@ abstract class RDD[T: ClassTag]( * If `sp` equals 0, the sparse representation is skipped. */ @Experimental - @RDDScope + @RDDScoped def countApproxDistinct(p: Int, sp: Int): Long = { require(p >= 4, s"p ($p) must be at least 4") require(sp <= 32, s"sp ($sp) cannot be greater than 32") @@ -1167,7 +1167,7 @@ abstract class RDD[T: ClassTag]( * @param relativeSD Relative accuracy. Smaller values create counters that require more space. * It must be greater than 0.000017. */ - @RDDScope + @RDDScoped def countApproxDistinct(relativeSD: Double = 0.05): Long = { val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt countApproxDistinct(p, 0) @@ -1186,7 +1186,7 @@ abstract class RDD[T: ClassTag]( * and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee * the same index assignments, you should sort the RDD with sortByKey() or save it to a file. */ - @RDDScope + @RDDScoped def zipWithIndex(): RDD[(T, Long)] = new ZippedWithIndexRDD(this) /** @@ -1199,7 +1199,7 @@ abstract class RDD[T: ClassTag]( * and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee * the same index assignments, you should sort the RDD with sortByKey() or save it to a file. */ - @RDDScope + @RDDScoped def zipWithUniqueId(): RDD[(T, Long)] = { val n = this.partitions.length.toLong this.mapPartitionsWithIndex { case (k, iter) => @@ -1217,7 +1217,7 @@ abstract class RDD[T: ClassTag]( * @note due to complications in the internal implementation, this method will raise * an exception if called on an RDD of `Nothing` or `Null`. */ - @RDDScope + @RDDScoped def take(num: Int): Array[T] = { if (num == 0) { return new Array[T](0) @@ -1257,7 +1257,7 @@ abstract class RDD[T: ClassTag]( /** * Return the first element in this RDD. 
*/ - @RDDScope + @RDDScoped def first(): T = take(1) match { case Array(t) => t case _ => throw new UnsupportedOperationException("empty collection") } @@ -1278,7 +1278,7 @@ abstract class RDD[T: ClassTag]( * @param ord the implicit ordering for T * @return an array of top elements */ - @RDDScope + @RDDScoped def top(num: Int)(implicit ord: Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse) /** @@ -1297,7 +1297,7 @@ abstract class RDD[T: ClassTag]( * @param ord the implicit ordering for T * @return an array of top elements */ - @RDDScope + @RDDScoped def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = { if (num == 0) { Array.empty @@ -1323,14 +1323,14 @@ abstract class RDD[T: ClassTag]( * Returns the max of this RDD as defined by the implicit Ordering[T]. * @return the maximum element of the RDD * */ - @RDDScope + @RDDScoped def max()(implicit ord: Ordering[T]): T = this.reduce(ord.max) /** * Returns the min of this RDD as defined by the implicit Ordering[T]. * @return the minimum element of the RDD * */ - @RDDScope + @RDDScoped def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min) /** @@ -1341,13 +1341,13 @@ abstract class RDD[T: ClassTag]( * @return true if and only if the RDD contains no elements at all. Note that an RDD * may be empty even when it has at least 1 partition. */ - @RDDScope + @RDDScoped def isEmpty(): Boolean = partitions.length == 0 || take(1).length == 0 /** * Save this RDD as a text file, using string representations of elements. */ - @RDDScope + @RDDScoped def saveAsTextFile(path: String): Unit = { // https://issues.apache.org/jira/browse/SPARK-2075 // @@ -1375,7 +1375,7 @@ abstract class RDD[T: ClassTag]( /** * Save this RDD as a compressed text file, using string representations of elements. */ - @RDDScope + @RDDScoped def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = { // https://issues.apache.org/jira/browse/SPARK-2075 val nullWritableClassTag = implicitly[ClassTag[NullWritable]] @@ -1394,7 +1394,7 @@ abstract class RDD[T: ClassTag]( /** * Save this RDD as a SequenceFile of serialized objects. */ - @RDDScope + @RDDScoped def saveAsObjectFile(path: String): Unit = { this.mapPartitions(iter => iter.grouped(10).map(_.toArray)) .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x)))) @@ -1404,13 +1404,13 @@ abstract class RDD[T: ClassTag]( /** * Creates tuples of the elements in this RDD by applying `f`. */ - @RDDScope + @RDDScoped def keyBy[K](f: T => K): RDD[(K, T)] = { map(x => (f(x), x)) } /** A private method for tests, to look at the contents of each partition */ - @RDDScope + @RDDScoped private[spark] def collectPartitions(): Array[Array[T]] = { sc.runJob(this, (iter: Iterator[T]) => iter.toArray) } @@ -1451,7 +1451,7 @@ abstract class RDD[T: ClassTag]( @transient private[spark] val creationSite = sc.getCallSite() /** The scope in which this RDD was created. This is null if the scope is not defined. TODO: Make this private[spark].
-  @transient private[spark] val scope = RDD.getScope.orNull
+  @transient private[spark] val scope = RDDScope.getScope.orNull

   private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("")
@@ -1622,81 +1622,6 @@ abstract class RDD[T: ClassTag](
  */
 object RDD {

-  /**
-   *
-   */
-  private[spark] val SCOPE_NESTING_DELIMITER = ";"
-  private[spark] val SCOPE_NAME_DELIMITER = "_"
-
-  /**
-   *
-   */
-  private val classesWithScopeMethods = Set(
-    "org.apache.spark.SparkContext",
-    "org.apache.spark.rdd.RDD",
-    "org.apache.spark.rdd.PairRDDFunctions",
-    "org.apache.spark.rdd.AsyncRDDActions"
-  )
-
-  /**
-   *
-   */
-  private val scopeIdCounter = new AtomicInteger(0)
-
-
-  /**
-   *
-   */
-  private def makeScopeId(name: String): String = {
-    name.replace(SCOPE_NESTING_DELIMITER, "-").replace(SCOPE_NAME_DELIMITER, "-") +
-      SCOPE_NAME_DELIMITER + scopeIdCounter.getAndIncrement
-  }
-
-  /**
-   *
-   */
-  private[spark] def getScope: Option[String] = {
-    val rddScopeNames = Thread.currentThread.getStackTrace
-      // Avoid reflecting on all classes in the stack trace
-      .filter { ste => classesWithScopeMethods.contains(ste.getClassName) }
-      // Return the corresponding method if it has the @RDDScope annotation
-      .flatMap { ste =>
-        // Note that this is an approximation since we match the method only by name
-        // Unfortunate we cannot be more precise because the stack trace does not
-        // include parameter information
-        Class.forName(ste.getClassName).getDeclaredMethods.find { m =>
-          m.getName == ste.getMethodName &&
-          m.getDeclaredAnnotations.exists { a =>
-            a.annotationType() == classOf[RDDScope]
-          }
-        }
-      }
-      // Use the method name as the scope name for now
-      .map { m => m.getName }
-
-    // It is common for such methods to internally invoke other methods with the same name
-    // (e.g. union, reduceByKey). Here we remove adjacent duplicates such that the scope
-    // chain does not capture this (e.g. a, a, b, c, b, c, c => a, b, c, b, c). This is
-    // surprisingly difficult to express even in Scala.
-    var prev: String = null
-    val dedupedRddScopeNames = rddScopeNames.flatMap { n =>
-      if (n != prev) {
-        prev = n
-        Some(n)
-      } else {
-        None
-      }
-    }
-
-    // Chain scope IDs to denote hierarchy, with outermost scope first
-    val rddScopeIds = dedupedRddScopeNames.map(makeScopeId)
-    if (rddScopeIds.nonEmpty) {
-      Some(rddScopeIds.reverse.mkString(SCOPE_NESTING_DELIMITER))
-    } else {
-      None
-    }
-  }
-
   // The following implicit functions were in SparkContext before 1.3 and users had to
   // `import SparkContext._` to enable them. Now we move them here to make the compiler find
   // them automatically. However, we still keep the old functions in SparkContext for backward
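The string captured into `RDD.scope` above is the only thing the rest of the system ever sees. A rough sketch of its shape follows; it is not part of this patch. The `textFile`/`hadoopFile` pair is a real annotated call chain, but `ScopeStringSketch` and the counter values are made up for illustration, since the real counter is a shared AtomicInteger whose values depend on how many scoped RDDs were created earlier.

    // Sketch only: the shape of the scope string stored in RDD.scope.
    // Scope ids are joined outermost-first with the ";" nesting delimiter,
    // and each id is the method name plus "_" plus a unique counter value.
    object ScopeStringSketch {
      def main(args: Array[String]): Unit = {
        // Innermost-first chain of scoped methods, as recovered from the stack trace
        val callChain = Seq("hadoopFile", "textFile")
        var nextId = 0                                   // stands in for the global scopeIdCounter
        def makeScopeId(name: String): String = {
          val id = name.replace(";", "-").replace("_", "-") + "_" + nextId
          nextId += 1
          id
        }
        val scope = callChain.map(makeScopeId).reverse.mkString(";")
        println(scope)                                   // prints "textFile_1;hadoopFile_0"
      }
    }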
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDScope.scala b/core/src/main/scala/org/apache/spark/rdd/RDDScope.scala
new file mode 100644
index 0000000000000..f466970b1b883
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDScope.scala
@@ -0,0 +1,86 @@
+package org.apache.spark.rdd
+
+import java.util.concurrent.atomic.AtomicInteger
+import org.apache.spark.annotation.RDDScoped
+
+/**
+ * Extracts the scope in which an RDD is created, by searching the caller's stack trace for methods annotated with @RDDScoped.
+ */
+private[spark] object RDDScope {
+
+  /**
+   * Delimiters for scope IDs: ";" separates nested scopes, and "_" separates a scope name from its unique ID.
+   */
+  val SCOPE_NESTING_DELIMITER = ";"
+  val SCOPE_NAME_DELIMITER = "_"
+
+  /**
+   * Classes whose methods may carry the @RDDScoped annotation; only these classes are reflected on.
+   */
+  private val classesWithScopeMethods = Set(
+    "org.apache.spark.SparkContext",
+    "org.apache.spark.rdd.RDD",
+    "org.apache.spark.rdd.PairRDDFunctions",
+    "org.apache.spark.rdd.AsyncRDDActions"
+  )
+
+  /**
+   * Counter used to generate a unique ID for each scope.
+   */
+  private val scopeIdCounter = new AtomicInteger(0)
+
+
+  /**
+   * Make a scope ID by sanitizing the delimiters out of the name and appending a unique counter value.
+   */
+  private def makeScopeId(name: String): String = {
+    name.replace(SCOPE_NESTING_DELIMITER, "-").replace(SCOPE_NAME_DELIMITER, "-") +
+      SCOPE_NAME_DELIMITER + scopeIdCounter.getAndIncrement
+  }
+
+  /**
+   * Return the scope of the RDD being created, if any, as a chain of scope IDs with the outermost scope first.
+   */
+  private[spark] def getScope: Option[String] = {
+    val rddScopeNames = Thread.currentThread.getStackTrace
+      // Avoid reflecting on all classes in the stack trace
+      .filter { ste => classesWithScopeMethods.contains(ste.getClassName) }
+      // Return the corresponding method if it has the @RDDScoped annotation
+      .flatMap { ste =>
+        // Note that this is an approximation since we match the method only by name
+        // Unfortunately, we cannot be more precise because the stack trace does not
+        // include parameter information
+        Class.forName(ste.getClassName).getDeclaredMethods.find { m =>
+          m.getName == ste.getMethodName &&
+          m.getDeclaredAnnotations.exists { a =>
+            a.annotationType() == classOf[RDDScoped]
+          }
+        }
+      }
+      // Use the method name as the scope name for now
+      .map { m => m.getName }
+
+    // It is common for such methods to internally invoke other methods with the same name
+    // (e.g. union, reduceByKey). Here we remove adjacent duplicates such that the scope
+    // chain does not capture this (e.g. a, a, b, c, b, c, c => a, b, c, b, c). This is
+    // surprisingly difficult to express even in Scala.
+    var prev: String = null
+    val dedupedRddScopeNames = rddScopeNames.flatMap { n =>
+      if (n != prev) {
+        prev = n
+        Some(n)
+      } else {
+        None
+      }
+    }
+
+    // Chain scope IDs to denote hierarchy, with outermost scope first
+    val rddScopeIds = dedupedRddScopeNames.map(makeScopeId)
+    if (rddScopeIds.nonEmpty) {
+      Some(rddScopeIds.reverse.mkString(SCOPE_NESTING_DELIMITER))
+    } else {
+      None
+    }
+  }
+
+}
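The adjacent-duplicate removal in getScope relies on a `var`, and its own comment notes this is awkward to express. A self-contained alternative sketch is shown below; `DedupSketch` and `dedupAdjacent` are hypothetical names and the snippet is not part of this patch, it only reproduces the behaviour described by the "a, a, b, c, b, c, c => a, b, c, b, c" example in the code comment.

    // Sketch: collapse adjacent duplicates with a fold instead of a var.
    object DedupSketch {
      def dedupAdjacent(names: Seq[String]): Seq[String] =
        names.foldLeft(List.empty[String]) {
          case (acc, n) if acc.headOption.contains(n) => acc  // skip a repeat of the previous name
          case (acc, n) => n :: acc                           // keep the first occurrence in a run
        }.reverse

      def main(args: Array[String]): Unit = {
        println(dedupAdjacent(Seq("a", "a", "b", "c", "b", "c", "c")))
        // List(a, b, c, b, c)
      }
    }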
diff --git a/core/src/main/scala/org/apache/spark/ui/viz/VizGraph.scala b/core/src/main/scala/org/apache/spark/ui/viz/VizGraph.scala
index 71831891677e8..4ff3b918aa15d 100644
--- a/core/src/main/scala/org/apache/spark/ui/viz/VizGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/viz/VizGraph.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer

 import org.apache.spark.storage.RDDInfo
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.RDDScope

 /** */
 private[ui] case class VizNode(id: Int, name: String, isCached: Boolean = false)
@@ -35,7 +35,7 @@ private[ui] case class VizEdge(fromId: Int, toId: Int)
 private[ui] class VizScope(val id: String) {
   private val _childrenNodes = new ArrayBuffer[VizNode]
   private val _childrenScopes = new ArrayBuffer[VizScope]
-  val name: String = id.split(RDD.SCOPE_NAME_DELIMITER).head
+  val name: String = id.split(RDDScope.SCOPE_NAME_DELIMITER).head

   def childrenNodes: Seq[VizNode] = _childrenNodes.iterator.toSeq
   def childrenScopes: Seq[VizScope] = _childrenScopes.iterator.toSeq
@@ -77,7 +77,7 @@ private[ui] object VizGraph {
     } else {
       // Attach children scopes and nodes to each scope
       var previousScope: VizScope = null
-      val scopeIt = rdd.scope.split(RDD.SCOPE_NESTING_DELIMITER).iterator
+      val scopeIt = rdd.scope.split(RDDScope.SCOPE_NESTING_DELIMITER).iterator
       while (scopeIt.hasNext) {
         val scopeId = scopeIt.next()
         val scope = scopes.getOrElseUpdate(scopeId, new VizScope(scopeId))
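On the UI side, the round trip is plain string splitting, as the two hunks above show. A simplified sketch follows; `ScopeParseSketch` and the encoded value are hypothetical, and the real VizScope/VizGraph classes additionally track child scopes, nodes, edges, and caching.

    // Sketch: recover the scope hierarchy and display names from an encoded scope string.
    object ScopeParseSketch {
      def main(args: Array[String]): Unit = {
        val encoded = "textFile_1;hadoopFile_0"          // hypothetical value of rdd.scope
        val scopeIds = encoded.split(";")                // outermost scope first
        val displayNames = scopeIds.map(_.split("_").head)
        println(displayNames.mkString(" > "))            // textFile > hadoopFile
      }
    }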