Commit

Merge github.com:apache/spark
andrewor14 committed Apr 9, 2014
2 parents 6edf052 + bde9cc1 commit 567474a
Showing 126 changed files with 969 additions and 252 deletions.
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -17,15 +17,18 @@

package org.apache.spark

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}

/**
* :: DeveloperApi ::
* A set of functions used to aggregate data.
*
* @param createCombiner function to create the initial value of the aggregation.
* @param mergeValue function to merge a new value into the aggregation result.
* @param mergeCombiners function to merge outputs from multiple mergeValue functions.
*/
@DeveloperApi
case class Aggregator[K, V, C] (
createCombiner: V => C,
mergeValue: (C, V) => C,
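A hedged usage sketch, not part of this commit: the three functions an Aggregator bundles are the same ones PairRDDFunctions.combineByKey accepts (which builds an Aggregator internally); here they compute a per-key (sum, count) so an average can be derived. The object, app, and variable names are illustrative.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // implicit conversion to PairRDDFunctions

object AggregatorSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("aggregator-sketch"))
    val pairs = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 2.0)))

    // createCombiner: build the initial combiner from the first value seen for a key
    val createCombiner = (v: Double) => (v, 1)
    // mergeValue: fold another value into an existing (sum, count) combiner
    val mergeValue = (c: (Double, Int), v: Double) => (c._1 + v, c._2 + 1)
    // mergeCombiners: merge partial (sum, count) combiners from different partitions
    val mergeCombiners = (c1: (Double, Int), c2: (Double, Int)) => (c1._1 + c2._1, c1._2 + c2._2)

    val averages = pairs
      .combineByKey(createCombiner, mergeValue, mergeCombiners)
      .mapValues { case (sum, count) => sum / count }
    averages.collect().foreach(println)
    sc.stop()
  }
}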
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/Dependency.scala
@@ -17,19 +17,24 @@

package org.apache.spark

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer

/**
* :: DeveloperApi ::
* Base class for dependencies.
*/
@DeveloperApi
abstract class Dependency[T](val rdd: RDD[T]) extends Serializable


/**
* :: DeveloperApi ::
* Base class for dependencies where each partition of the parent RDD is used by at most one
* partition of the child RDD. Narrow dependencies allow for pipelined execution.
*/
@DeveloperApi
abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
/**
* Get the parent partitions for a child partition.
@@ -41,13 +46,15 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {


/**
* :: DeveloperApi ::
* Represents a dependency on the output of a shuffle stage.
* @param rdd the parent RDD
* @param partitioner partitioner used to partition the shuffle output
* @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to null,
* the default serializer, as specified by `spark.serializer` config option, will
* be used.
*/
@DeveloperApi
class ShuffleDependency[K, V](
@transient rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
@@ -61,20 +68,24 @@ class ShuffleDependency[K, V](


/**
* :: DeveloperApi ::
* Represents a one-to-one dependency between partitions of the parent and child RDDs.
*/
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int) = List(partitionId)
}


/**
* :: DeveloperApi ::
* Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
* @param rdd the parent RDD
* @param inStart the start of the range in the parent RDD
* @param outStart the start of the range in the child RDD
* @param length the length of the range
*/
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) {

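A hedged sketch, not part of this commit: with NarrowDependency now exposed as a DeveloperApi, a custom RDD could declare its own narrow dependency. ShiftedDependency is an illustrative name; it generalizes the OneToOneDependency shown above by mapping each child partition to exactly one parent partition, offset by a fixed amount.

import org.apache.spark.NarrowDependency
import org.apache.spark.rdd.RDD

// Each child partition depends on the single parent partition `shift` places ahead of it
// (wrapping around), so the stages can still be pipelined like any narrow dependency.
class ShiftedDependency[T](rdd: RDD[T], shift: Int) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): Seq[Int] =
    List((partitionId + shift) % rdd.partitions.length)
}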
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -21,13 +21,16 @@ import scala.concurrent._
import scala.concurrent.duration.Duration
import scala.util.Try

import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter}

/**
* :: Experimental ::
* A future for the result of an action. This is an extension of the Scala Future interface
* that additionally supports cancellation.
*/
@Experimental
trait FutureAction[T] extends Future[T] {
// Note that we redefine methods of the Future trait here explicitly so we can specify a different
// documentation (with reference to the word "action").
@@ -84,9 +87,11 @@ trait FutureAction[T] extends Future[T] {


/**
* :: Experimental ::
* A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
* count, collect, reduce.
*/
@Experimental
class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
extends FutureAction[T] {

@@ -148,10 +153,12 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:


/**
* :: Experimental ::
* A [[FutureAction]] for actions that could trigger multiple Spark jobs. Examples include take,
* takeSample. Cancellation works by setting the cancelled flag to true and interrupting the
* action thread if it is being blocked by a job.
*/
@Experimental
class ComplexFutureAction[T] extends FutureAction[T] {

// Pointer to the thread that is executing the action. It is set when the action is run.
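A hedged sketch, not from this commit, of one path that hands back a FutureAction: SparkContext.submitJob (marked @Experimental below) returns a SimpleFutureAction that can be awaited or cancelled. The partition function, handler, and all names are illustrative.

import scala.concurrent.Await
import scala.concurrent.duration.Duration

import org.apache.spark.{SparkConf, SparkContext}

object FutureActionSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("future-action-sketch"))
    val rdd = sc.parallelize(1 to 1000, 4)

    // Count the elements of each partition without blocking the calling thread.
    val future = sc.submitJob[Int, Int, Unit](
      rdd,
      (iter: Iterator[Int]) => iter.size,          // runs on each partition
      Seq(0, 1, 2, 3),                             // partitions to compute
      (index: Int, count: Int) => println("partition " + index + " has " + count + " elements"),
      ())                                          // result produced once every partition is done

    // FutureAction extends scala.concurrent.Future, so the usual combinators apply;
    // future.cancel() would kill the underlying job instead of waiting for it.
    Await.result(future, Duration.Inf)
    sc.stop()
  }
}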
core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
@@ -21,7 +21,7 @@ package org.apache.spark
* An iterator that wraps around an existing iterator to provide task killing functionality.
* It works by checking the interrupted flag in [[TaskContext]].
*/
class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
private[spark] class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
extends Iterator[T] {

def hasNext: Boolean = !context.interrupted && delegate.hasNext
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/Logging.scala
@@ -21,11 +21,18 @@ import org.apache.log4j.{LogManager, PropertyConfigurator}
import org.slf4j.{Logger, LoggerFactory}
import org.slf4j.impl.StaticLoggerBinder

import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
* Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
* logging messages at different levels using methods that only evaluate parameters lazily if the
* log level is enabled.
*
* NOTE: DO NOT USE this class outside of Spark. It is intended as an internal utility.
* This will likely be changed or removed in future releases.
*/
@DeveloperApi
trait Logging {
// Make the log field transient so that objects with Logging can
// be serialized and used on another machine
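A hedged sketch of the pattern the trait enables, with an illustrative class name; per the note above it is meant for Spark's own classes, not external code. The log methods take their message by name, so the string is only built when that level is enabled.

import org.apache.spark.Logging

// Illustrative only: the note above says Logging is intended as a Spark-internal utility.
class BlockPrefetcher extends Logging {
  def prefetch(blockCount: Int) {
    // The concatenation below is evaluated only if INFO is enabled for this logger.
    logInfo("Prefetching " + blockCount + " blocks")
    if (blockCount == 0) {
      logWarning("Nothing to prefetch")
    }
  }
}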
core/src/main/scala/org/apache/spark/SerializableWritable.scala
@@ -23,6 +23,9 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.ObjectWritable
import org.apache.hadoop.io.Writable

import org.apache.spark.annotation.DeveloperApi

@DeveloperApi
class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
def value = t
override def toString = t.toString
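A hedged sketch, not part of this commit: Hadoop Writables are not java.io.Serializable, so wrapping one in SerializableWritable lets it ride along in a task closure, and .value yields the Writable back on the executor. The object and variable names are illustrative.

import org.apache.hadoop.io.Text
import org.apache.spark.{SerializableWritable, SparkConf, SparkContext}

object SerializableWritableSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("writable-sketch"))

    // A Text (a Writable) wrapped so the closure below stays serializable.
    val header = new SerializableWritable(new Text("shared header"))

    val tagged = sc.parallelize(Seq("a", "b")).map { s =>
      header.value.toString + ":" + s   // .value recovers the wrapped Writable
    }
    tagged.collect().foreach(println)
    sc.stop()
  }
}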
86 changes: 73 additions & 13 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHad
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.mesos.MesosNativeLibrary

import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.input.WholeTextFileInputFormat
@@ -48,22 +49,35 @@ import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}

/**
* :: DeveloperApi ::
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
*
* @param config a Spark Config object describing the application configuration. Any settings in
* this config override the default configs as well as system properties.
* @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. Can
* be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
* from a list of input files or InputFormats for the application.
*/
class SparkContext(
config: SparkConf,
// This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
// etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
// contains a map from hostname to a list of input format splits on the host.
val preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map())
extends Logging {

@DeveloperApi
class SparkContext(config: SparkConf) extends Logging {

// This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
// etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
// contains a map from hostname to a list of input format splits on the host.
private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()

/**
* :: DeveloperApi ::
* Alternative constructor for setting preferred locations where Spark will create executors.
*
* @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. Can
* be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
* from a list of input files or InputFormats for the application.
*/
@DeveloperApi
def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = {
this(config)
this.preferredNodeLocationData = preferredNodeLocationData
}

/**
* Alternative constructor that allows setting common Spark properties directly
@@ -93,10 +107,45 @@ class SparkContext(
environment: Map[String, String] = Map(),
preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =
{
this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment),
preferredNodeLocationData)
this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
this.preferredNodeLocationData = preferredNodeLocationData
}

// NOTE: The below constructors could be consolidated using default arguments. Due to
// Scala bug SI-8479, however, this causes the compile step to fail when generating docs.
// Until we have a good workaround for that bug the constructors remain broken out.

/**
* Alternative constructor that allows setting common Spark properties directly
*
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param appName A name for your application, to display on the cluster web UI.
*/
private[spark] def this(master: String, appName: String) =
this(master, appName, null, Nil, Map(), Map())

/**
* Alternative constructor that allows setting common Spark properties directly
*
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param appName A name for your application, to display on the cluster web UI.
* @param sparkHome Location where Spark is installed on cluster nodes.
*/
private[spark] def this(master: String, appName: String, sparkHome: String) =
this(master, appName, sparkHome, Nil, Map(), Map())

/**
* Alternative constructor that allows setting common Spark properties directly
*
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param appName A name for your application, to display on the cluster web UI.
* @param sparkHome Location where Spark is installed on cluster nodes.
* @param jars Collection of JARs to send to the cluster. These can be paths on the local file
* system or HDFS, HTTP, HTTPS, or FTP URLs.
*/
private[spark] def this(master: String, appName: String, sparkHome: String, jars: Seq[String]) =
this(master, appName, sparkHome, jars, Map(), Map())

private[spark] val conf = config.clone()

/**
@@ -186,7 +235,7 @@
jars.foreach(addJar)
}

def warnSparkMem(value: String): String = {
private def warnSparkMem(value: String): String = {
logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
"deprecated, please use spark.executor.memory instead.")
value
@@ -651,6 +700,9 @@
def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] =
new UnionRDD(this, Seq(first) ++ rest)

/** Get an RDD that has no partitions or elements. */
def emptyRDD[T: ClassTag] = new EmptyRDD[T](this)

// Methods for creating shared variables

/**
@@ -714,6 +766,11 @@
postEnvironmentUpdate()
}

/**
* :: DeveloperApi ::
* Register a listener to receive up-calls from events that happen during execution.
*/
@DeveloperApi
def addSparkListener(listener: SparkListener) {
listenerBus.addListener(listener)
}
@@ -1020,8 +1077,10 @@
}

/**
* :: DeveloperApi ::
* Run a job that can return approximate results.
*/
@DeveloperApi
def runApproximateJob[T, U, R](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
@@ -1039,6 +1098,7 @@
/**
* Submit a job for execution and return a FutureAction holding the result.
*/
@Experimental
def submitJob[T, U, R](
rdd: RDD[T],
processPartition: Iterator[T] => U,
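A hedged sketch, not part of this commit, touching a few of the members changed above: the SparkConf-only constructor, the new emptyRDD, and addSparkListener (now a DeveloperApi). The listener body and all names are illustrative; the SparkListener trait is assumed to provide no-op default implementations, as in this line of releases.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

object SparkContextSketch {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("context-sketch")
    val sc = new SparkContext(conf)

    // DeveloperApi: receive up-calls for scheduler events.
    sc.addSparkListener(new SparkListener {
      override def onJobEnd(jobEnd: SparkListenerJobEnd) {
        println("a job just finished")
      }
    })

    // An RDD with no partitions or elements, handy as a neutral element for union.
    val empty = sc.emptyRDD[Int]
    val combined = sc.parallelize(1 to 3).union(empty)
    println(combined.count())   // runs a job, which posts a job-end event to the listener

    sc.stop()
  }
}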
8 changes: 7 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -25,6 +25,7 @@ import scala.util.Properties
import akka.actor._
import com.google.common.collect.MapMaker

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.metrics.MetricsSystem
@@ -35,13 +36,18 @@ import org.apache.spark.storage._
import org.apache.spark.util.{AkkaUtils, Utils}

/**
* :: DeveloperApi ::
* Holds all the runtime environment objects for a running Spark instance (either master or worker),
* including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
* Spark code finds the SparkEnv through a thread-local variable, so each thread that accesses these
* objects needs to have the right SparkEnv set. You can get the current environment with
* SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
*
* NOTE: This is not intended for external use. This is exposed for Shark and may be made private
* in a future release.
*/
class SparkEnv private[spark] (
@DeveloperApi
class SparkEnv (
val executorId: String,
val actorSystem: ActorSystem,
val serializer: Serializer,
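A hedged sketch, not part of this commit: SparkEnv.get reads the thread-local environment described above, which is how code running inside a task can look up pieces of the runtime such as the executor id. Names are illustrative.

import org.apache.spark.{SparkConf, SparkContext, SparkEnv}

object SparkEnvSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("env-sketch"))

    // Inside a task, SparkEnv.get returns the environment bound to the current thread.
    val executorIds = sc.parallelize(1 to 4, 4).mapPartitions { iter =>
      Iterator(SparkEnv.get.executorId)
    }.collect().distinct

    executorIds.foreach(println)
    sc.stop()
  }
}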
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -19,8 +19,14 @@ package org.apache.spark

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics

/**
* :: DeveloperApi ::
* Contextual information about a task which can be read or mutated during execution.
*/
@DeveloperApi
class TaskContext(
val stageId: Int,
val partitionId: Int,
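A hedged sketch, not part of this commit: RDD.mapPartitionsWithContext (assumed available in this version of the API) hands the TaskContext to user code, where fields such as stageId and partitionId can be read. Names are illustrative.

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object TaskContextSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("task-context-sketch"))

    val tagged = sc.parallelize(1 to 8, 2).mapPartitionsWithContext {
      (context: TaskContext, iter: Iterator[Int]) =>
        // stageId and partitionId identify where each element is being computed.
        iter.map(x => (context.stageId, context.partitionId, x))
    }
    tagged.collect().foreach(println)
    sc.stop()
  }
}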