Move RDD scope util methods and logic to its own file
Just a small code re-organization.
Andrew Or committed Apr 22, 2015
1 parent 494d5c2 commit 6a7cdca
Showing 7 changed files with 258 additions and 247 deletions.
44 changes: 22 additions & 22 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -44,7 +44,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor

import org.apache.mesos.MesosNativeLibrary

- import org.apache.spark.annotation.{DeveloperApi, Experimental, RDDScope}
+ import org.apache.spark.annotation.{DeveloperApi, Experimental, RDDScoped}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.executor.{ExecutorEndpoint, TriggerThreadDump}
@@ -641,7 +641,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
* RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
*/
- @RDDScope
+ @RDDScoped
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
@@ -651,15 +651,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* This method is identical to `parallelize`.
*/
- @RDDScope
+ @RDDScoped
def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
parallelize(seq, numSlices)
}

/** Distribute a local Scala collection to form an RDD, with one or more
* location preferences (hostnames of Spark nodes) for each object.
* Create a new partition for each collection item. */
- @RDDScope
+ @RDDScoped
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = {
assertNotStopped()
val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
@@ -670,7 +670,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
- @RDDScope
+ @RDDScoped
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
@@ -704,7 +704,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* @param minPartitions A suggestion value of the minimal splitting number for input data.
*/
- @RDDScope
+ @RDDScoped
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions):
RDD[(String, String)] = {
assertNotStopped()
@@ -751,7 +751,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @note Small files are preferred; very large files may cause bad performance.
*/
@Experimental
- @RDDScope
+ @RDDScoped
def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
RDD[(String, PortableDataStream)] = {
assertNotStopped()
@@ -780,7 +780,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @return An RDD of data with values, represented as byte arrays
*/
@Experimental
- @RDDScope
+ @RDDScoped
def binaryRecords(path: String, recordLength: Int, conf: Configuration = hadoopConfiguration)
: RDD[Array[Byte]] = {
assertNotStopped()
@@ -818,7 +818,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
- @RDDScope
+ @RDDScoped
def hadoopRDD[K, V](
conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -840,7 +840,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
- @RDDScope
+ @RDDScoped
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -876,7 +876,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
- @RDDScope
+ @RDDScoped
def hadoopFile[K, V, F <: InputFormat[K, V]]
(path: String, minPartitions: Int)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
@@ -901,13 +901,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
- @RDDScope
+ @RDDScoped
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
hadoopFile[K, V, F](path, defaultMinPartitions)

/** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
- @RDDScope
+ @RDDScoped
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
(path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
@@ -928,7 +928,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
- @RDDScope
+ @RDDScoped
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
path: String,
fClass: Class[F],
@@ -962,7 +962,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
- @RDDScope
+ @RDDScoped
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
conf: Configuration = hadoopConfiguration,
fClass: Class[F],
@@ -983,7 +983,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
- @RDDScope
+ @RDDScoped
def sequenceFile[K, V](path: String,
keyClass: Class[K],
valueClass: Class[V],
@@ -1002,7 +1002,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
* */
- @RDDScope
+ @RDDScoped
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] = {
assertNotStopped()
sequenceFile(path, keyClass, valueClass, defaultMinPartitions)
@@ -1030,7 +1030,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
- @RDDScope
+ @RDDScoped
def sequenceFile[K, V]
(path: String, minPartitions: Int = defaultMinPartitions)
(implicit km: ClassTag[K], vm: ClassTag[V],
@@ -1054,7 +1054,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* though the nice thing about it is that there's very little effort required to save arbitrary
* objects.
*/
- @RDDScope
+ @RDDScoped
def objectFile[T: ClassTag](
path: String,
minPartitions: Int = defaultMinPartitions
@@ -1064,15 +1064,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
.flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes, Utils.getContextOrSparkClassLoader))
}

- @RDDScope
+ @RDDScoped
protected[spark] def checkpointFile[T: ClassTag](
path: String
): RDD[T] = {
new CheckpointRDD[T](this, path)
}

/** Build the union of a list of RDDs. */
- @RDDScope
+ @RDDScoped
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = {
val partitioners = rdds.flatMap(_.partitioner).toSet
if (partitioners.size == 1) {
@@ -1083,7 +1083,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}

/** Build the union of a list of RDDs passed as variable-length arguments. */
- @RDDScope
+ @RDDScoped
def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] =
union(Seq(first) ++ rest)

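For context, the methods tagged in the hunks above are SparkContext's RDD-creating entry points. A minimal usage sketch, not part of this diff, assuming a local master and a placeholder input path:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver snippet; the app name, master, and path are placeholders.
val conf = new SparkConf().setAppName("rdd-scope-demo").setMaster("local[2]")
val sc = new SparkContext(conf)

val nums  = sc.parallelize(1 to 100, numSlices = 4)    // ParallelCollectionRDD
val lines = sc.textFile("hdfs:///tmp/input.txt")       // RDD[String] backed by hadoopFile
val all   = sc.union(Seq(nums.map(_.toString), lines)) // UnionRDD or PartitionerAwareUnionRDD

println(all.count())
sc.stop()
```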
@@ -25,4 +25,4 @@
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
- public @interface RDDScope {}
+ public @interface RDDScoped {}
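Because the annotation is declared with runtime retention and a method target, the set of scoped methods can be recovered reflectively. A rough sketch of that idea, not part of the commit, assuming the renamed `RDDScoped` class (package `org.apache.spark.annotation`, per the imports above) is on the classpath:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.annotation.RDDScoped

// List every SparkContext method carrying the @RDDScoped marker.
// This only works because of @Retention(RetentionPolicy.RUNTIME) above.
val scopedMethods = classOf[SparkContext]
  .getDeclaredMethods
  .filter(_.isAnnotationPresent(classOf[RDDScoped]))
  .map(_.getName)
  .distinct
  .sorted

scopedMethods.foreach(println) // parallelize, textFile, hadoopFile, union, ...
```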
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -24,7 +24,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.reflect.ClassTag

import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
- import org.apache.spark.annotation.RDDScope
+ import org.apache.spark.annotation.RDDScoped

/**
* A set of asynchronous RDD actions available through an implicit conversion.
@@ -34,7 +34,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
/**
* Returns a future for counting the number of elements in the RDD.
*/
- @RDDScope
+ @RDDScoped
def countAsync(): FutureAction[Long] = {
val totalCount = new AtomicLong
self.context.submitJob(
@@ -55,7 +55,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
/**
* Returns a future for retrieving all elements of this RDD.
*/
- @RDDScope
+ @RDDScoped
def collectAsync(): FutureAction[Seq[T]] = {
val results = new Array[Array[T]](self.partitions.length)
self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.length),
@@ -65,7 +65,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
/**
* Returns a future for retrieving the first num elements of the RDD.
*/
- @RDDScope
+ @RDDScoped
def takeAsync(num: Int): FutureAction[Seq[T]] = {
val f = new ComplexFutureAction[Seq[T]]

@@ -113,7 +113,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
/**
* Applies a function f to all elements of this RDD.
*/
- @RDDScope
+ @RDDScoped
def foreachAsync(f: T => Unit): FutureAction[Unit] = {
val cleanF = self.context.clean(f)
self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.length),
@@ -123,7 +123,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
/**
* Applies a function f to each partition of this RDD.
*/
- @RDDScope
+ @RDDScoped
def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = {
self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.length),
(index, data) => Unit, Unit)
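As the class doc above notes, these asynchronous actions are reached through an implicit conversion on RDD, and each returns a FutureAction. A small usage sketch, not part of this diff, assuming an already-constructed SparkContext `sc`:

```scala
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical snippet; `sc` is an existing SparkContext.
val rdd = sc.parallelize(1 to 10000, numSlices = 8)

val countF = rdd.countAsync() // FutureAction[Long]
val firstF = rdd.takeAsync(5) // FutureAction[Seq[Int]]

// A FutureAction is a scala.concurrent.Future, so it can be awaited
// or composed with the usual Future combinators.
val total = Await.result(countF, 1.minute)
val first = Await.result(firstF, 1.minute)
println(s"count=$total, first five=$first")
```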
(Diffs for the remaining 4 changed files not shown.)
