Re-implement scopes through annotations instead
The previous "working" implementation frequently ran into
NotSerializableExceptions. Why? ClosureCleaner does not handle
closures that are wrapped in other closures, so those inner closures
are simply never cleaned (details are intentionally omitted here).

This commit reimplements scoping through annotations. Every method
that should be scoped is now annotated with @RDDScope. Then, on
creation, each RDD derives its scope from the stack trace, similar
to how it derives its call site. This is the cleanest approach that
bypasses NotSerializableExceptions with the fewest significant
limitations.
Andrew Or committed Apr 22, 2015
1 parent f22f337 commit 9fac6f3
Showing 9 changed files with 502 additions and 277 deletions.
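The commit message above says each RDD derives its scope from the stack trace, much as it derives its call site. As a rough sketch of that idea (this is not the code in this commit; the helper RDDScopeUtils.findScope is hypothetical), the derivation could walk the stack and take the nearest method carrying @RDDScope:

import org.apache.spark.annotation.RDDScope

// Hypothetical helper, not part of this commit: walk the current stack trace
// (innermost frame first) and return the first method annotated with
// @RDDScope as the name of the enclosing scope.
private[spark] object RDDScopeUtils {
  def findScope(): Option[String] = {
    Thread.currentThread.getStackTrace.toSeq.flatMap { frame =>
      // Resolve the frame's class, then look for a public method with the
      // frame's method name that carries the annotation.
      scala.util.Try(Class.forName(frame.getClassName)).toOption.toSeq.flatMap { cls =>
        cls.getMethods
          .filter(_.getName == frame.getMethodName)
          .filter(_.isAnnotationPresent(classOf[RDDScope]))
          .map(m => s"${cls.getSimpleName}.${m.getName}")
      }
    }.headOption
  }
}

An RDD constructor could then capture findScope() once at creation time, exactly as it captures its call site.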
2 changes: 1 addition & 1 deletion core/src/main/resources/org/apache/spark/ui/static/viz.js

This generated file is not rendered by default.

27 changes: 24 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -44,7 +44,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor

import org.apache.mesos.MesosNativeLibrary

import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.annotation.{DeveloperApi, Experimental, RDDScope}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.executor.{ExecutorEndpoint, TriggerThreadDump}
@@ -641,6 +641,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
* RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
*/
@RDDScope
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
@@ -650,13 +651,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* This method is identical to `parallelize`.
*/
@RDDScope
def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
parallelize(seq, numSlices)
}

/** Distribute a local Scala collection to form an RDD, with one or more
* location preferences (hostnames of Spark nodes) for each object.
* Create a new partition for each collection item. */
@RDDScope
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = {
assertNotStopped()
val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
@@ -667,10 +670,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
@RDDScope
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
minPartitions).map(pair => pair._2.toString)
}

/**
@@ -700,6 +704,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* @param minPartitions A suggestion value of the minimal splitting number for input data.
*/
@RDDScope
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions):
RDD[(String, String)] = {
assertNotStopped()
@@ -746,6 +751,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @note Small files are preferred; very large files may cause bad performance.
*/
@Experimental
@RDDScope
def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
RDD[(String, PortableDataStream)] = {
assertNotStopped()
@@ -774,6 +780,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @return An RDD of data with values, represented as byte arrays
*/
@Experimental
@RDDScope
def binaryRecords(path: String, recordLength: Int, conf: Configuration = hadoopConfiguration)
: RDD[Array[Byte]] = {
assertNotStopped()
@@ -811,6 +818,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
@RDDScope
def hadoopRDD[K, V](
conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -832,6 +840,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
@RDDScope
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -850,7 +859,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
minPartitions).setName(s"HadoopRDD[$path]")
}

/**
@@ -867,6 +876,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
@RDDScope
def hadoopFile[K, V, F <: InputFormat[K, V]]
(path: String, minPartitions: Int)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
@@ -891,11 +901,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
@RDDScope
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
hadoopFile[K, V, F](path, defaultMinPartitions)

/** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
@RDDScope
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
(path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
@@ -916,6 +928,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
@RDDScope
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
path: String,
fClass: Class[F],
@@ -949,6 +962,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
@RDDScope
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
conf: Configuration = hadoopConfiguration,
fClass: Class[F],
@@ -969,6 +983,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
@RDDScope
def sequenceFile[K, V](path: String,
keyClass: Class[K],
valueClass: Class[V],
@@ -987,6 +1002,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
* */
@RDDScope
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] = {
assertNotStopped()
sequenceFile(path, keyClass, valueClass, defaultMinPartitions)
@@ -1014,6 +1030,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
@RDDScope
def sequenceFile[K, V]
(path: String, minPartitions: Int = defaultMinPartitions)
(implicit km: ClassTag[K], vm: ClassTag[V],
@@ -1037,6 +1054,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* though the nice thing about it is that there's very little effort required to save arbitrary
* objects.
*/
@RDDScope
def objectFile[T: ClassTag](
path: String,
minPartitions: Int = defaultMinPartitions
@@ -1046,13 +1064,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
.flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes, Utils.getContextOrSparkClassLoader))
}

@RDDScope
protected[spark] def checkpointFile[T: ClassTag](
path: String
): RDD[T] = {
new CheckpointRDD[T](this, path)
}

/** Build the union of a list of RDDs. */
@RDDScope
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = {
val partitioners = rdds.flatMap(_.partitioner).toSet
if (partitioners.size == 1) {
@@ -1063,6 +1083,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}

/** Build the union of a list of RDDs passed as variable-length arguments. */
@RDDScope
def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] =
union(Seq(first) ++ rest)

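The commit message says each RDD picks up the scope of the nearest annotated method on the stack, so caller code does not change. A hedged sketch of what that implies for the methods above (behavior inferred from the commit message, not verified against this diff; the file path and master URL are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("rdd-scope-demo").setMaster("local[*]")
val sc = new SparkContext(conf)

// textFile is annotated with @RDDScope above, so the RDDs it creates
// internally (e.g. the underlying HadoopRDD) should be attributed to a
// "textFile" scope derived from the stack trace.
val lines = sc.textFile("/tmp/input.txt")

// Whether this map is scoped depends on whether RDD.map itself carries
// @RDDScope, which is not shown in this excerpt.
val lengths = lines.map(_.length)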
28 changes: 28 additions & 0 deletions core/src/main/scala/org/apache/spark/annotation/RDDScope.java
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.annotation;

import java.lang.annotation.*;

/**
* Marks a method in which RDD creation should be scoped: RDDs created inside
* an annotated method derive their scope from it via the stack trace.
* This should really be private and not displayed in the docs.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface RDDScope {}
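Because the annotation has runtime retention and targets methods, it remains visible to reflection, which is what stack-trace-based scope detection relies on. A small hedged check (the example class and method below are made up):

import org.apache.spark.SparkContext
import org.apache.spark.annotation.RDDScope

class ScopedExample {
  @RDDScope
  def makeNumbers(sc: SparkContext) = sc.parallelize(1 to 10)
}

object ScopedExampleCheck {
  def main(args: Array[String]): Unit = {
    val m = classOf[ScopedExample].getMethod("makeNumbers", classOf[SparkContext])
    // Prints true: RetentionPolicy.RUNTIME keeps the annotation in the class
    // file and exposes it to reflection at run time.
    println(m.isAnnotationPresent(classOf[RDDScope]))
  }
}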
core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -24,6 +24,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.reflect.ClassTag

import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
import org.apache.spark.annotation.RDDScope

/**
* A set of asynchronous RDD actions available through an implicit conversion.
@@ -33,6 +34,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
/**
* Returns a future for counting the number of elements in the RDD.
*/
@RDDScope
def countAsync(): FutureAction[Long] = {
val totalCount = new AtomicLong
self.context.submitJob(
@@ -53,6 +55,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
/**
* Returns a future for retrieving all elements of this RDD.
*/
@RDDScope
def collectAsync(): FutureAction[Seq[T]] = {
val results = new Array[Array[T]](self.partitions.length)
self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.length),
@@ -62,6 +65,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
/**
* Returns a future for retrieving the first num elements of the RDD.
*/
@RDDScope
def takeAsync(num: Int): FutureAction[Seq[T]] = {
val f = new ComplexFutureAction[Seq[T]]

@@ -109,6 +113,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
/**
* Applies a function f to all elements of this RDD.
*/
@RDDScope
def foreachAsync(f: T => Unit): FutureAction[Unit] = {
val cleanF = self.context.clean(f)
self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.length),
@@ -118,6 +123,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
/**
* Applies a function f to each partition of this RDD.
*/
@RDDScope
def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = {
self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.length),
(index, data) => Unit, Unit)
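The async actions above all return a FutureAction, which is a scala.concurrent.Future. A hedged usage sketch, assuming an existing SparkContext named sc (not part of this commit):

import scala.concurrent.Await
import scala.concurrent.duration._

// countAsync submits the job right away and returns a FutureAction[Long];
// blocking on it here is only to keep the example short.
val rdd = sc.parallelize(1 to 1000, numSlices = 4)
val futureCount = rdd.countAsync()
val count = Await.result(futureCount, 30.seconds)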
