Commit fa69dcf: Merge branch 'master' into SPARK-1413
witgo committed Apr 9, 2014
2 parents: 3c98dc4 + bde9cc1
Showing 189 changed files with 3,141 additions and 1,641 deletions.
2 changes: 1 addition & 1 deletion .rat-excludes
@@ -39,4 +39,4 @@ work
.*\.q
golden
test.out/*
-.*iml
+.*iml
20 changes: 12 additions & 8 deletions bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -220,27 +220,31 @@ object Bagel extends Logging {
   */
  private def comp[K: Manifest, V <: Vertex, M <: Message[K], C](
      sc: SparkContext,
-     grouped: RDD[(K, (Seq[C], Seq[V]))],
+     grouped: RDD[(K, (Iterable[C], Iterable[V]))],
      compute: (V, Option[C]) => (V, Array[M]),
      storageLevel: StorageLevel
    ): (RDD[(K, (V, Array[M]))], Int, Int) = {
    var numMsgs = sc.accumulator(0)
    var numActiveVerts = sc.accumulator(0)
-   val processed = grouped.flatMapValues {
-     case (_, vs) if vs.size == 0 => None
-     case (c, vs) =>
+   val processed = grouped.mapValues(x => (x._1.iterator, x._2.iterator))
+     .flatMapValues {
+     case (_, vs) if !vs.hasNext => None
+     case (c, vs) => {
        val (newVert, newMsgs) =
-         compute(vs(0), c match {
-           case Seq(comb) => Some(comb)
-           case Seq() => None
-         })
+         compute(vs.next,
+           c.hasNext match {
+             case true => Some(c.next)
+             case false => None
+           }
+         )

        numMsgs += newMsgs.size
        if (newVert.active) {
          numActiveVerts += 1
        }

        Some((newVert, newMsgs))
+     }
    }.persist(storageLevel)

// Force evaluation of processed RDD for accurate performance measurements
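The hunk above swaps Seq-based access (vs.size == 0, vs(0), matching on Seq) for single-pass iterator access, since the grouped values now arrive as Iterable. A standalone sketch of that access pattern using plain Scala collections; the object and value names below are illustrative, not Spark code:

// Illustrative only: mirrors the hasNext/next pattern the new Bagel code uses
// once grouped values are Iterable rather than Seq.
object IteratorAccessSketch {
  // With a single-pass iterator, the old indexed access (vs(0)) and the
  // Seq(comb) / Seq() match become hasNext checks and next() calls.
  def firstOption[A](it: Iterator[A]): Option[A] =
    if (it.hasNext) Some(it.next()) else None

  def main(args: Array[String]): Unit = {
    val verts = Iterable("v1", "v2")     // stands in for the Iterable[V] of vertices
    val combined = Iterable.empty[Int]   // stands in for the Iterable[C] of combined messages

    val vs = verts.iterator
    val c = combined.iterator

    if (!vs.hasNext) {
      println("no vertex for this key")  // the "case (_, vs) if !vs.hasNext => None" branch
    } else {
      val vertex = vs.next()             // vs.next in the diff
      val message = firstOption(c)       // the c.hasNext match { ... } expression
      println(s"compute($vertex, $message)")
    }
  }
}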
2 changes: 1 addition & 1 deletion bin/compute-classpath.sh
@@ -63,7 +63,7 @@ fi
# built with Hive, so first check if the datanucleus jars exist, and then ensure the current Spark
# assembly is built for Hive, before actually populating the CLASSPATH with the jars.
# Note that this check order is faster (by up to half a second) in the case where Hive is not used.
-num_datanucleus_jars=$(ls "$FWDIR"/lib_managed/jars/ | grep "datanucleus-.*\\.jar" | wc -l)
+num_datanucleus_jars=$(ls "$FWDIR"/lib_managed/jars/ 2>/dev/null | grep "datanucleus-.*\\.jar" | wc -l)
if [ $num_datanucleus_jars -gt 0 ]; then
  AN_ASSEMBLY_JAR=${ASSEMBLY_JAR:-$DEPS_ASSEMBLY_JAR}
  num_hive_files=$(jar tvf "$AN_ASSEMBLY_JAR" org/apache/hadoop/hive/ql/exec 2>/dev/null | wc -l)
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -17,15 +17,18 @@

package org.apache.spark

+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}

/**
+ * :: DeveloperApi ::
 * A set of functions used to aggregate data.
 *
 * @param createCombiner function to create the initial value of the aggregation.
 * @param mergeValue function to merge a new value into the aggregation result.
 * @param mergeCombiners function to merge outputs from multiple mergeValue function.
 */
+@DeveloperApi
case class Aggregator[K, V, C] (
    createCombiner: V => C,
    mergeValue: (C, V) => C,
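For context on the class being annotated, a small standalone sketch of how the three Aggregator functions compose. It uses a plain fold over a local collection as a stand-in for Spark's shuffle-side combining, with illustrative types (K = String, V = Int, C = List[Int]); it is not Spark's actual code path:

// Standalone sketch: createCombiner starts a combiner for a key's first value,
// mergeValue folds further values in, and mergeCombiners merges partial results.
object AggregatorSketch {
  def main(args: Array[String]): Unit = {
    val createCombiner: Int => List[Int] = v => List(v)
    val mergeValue: (List[Int], Int) => List[Int] = (c, v) => v :: c
    val mergeCombiners: (List[Int], List[Int]) => List[Int] = (c1, c2) => c1 ::: c2

    val records = Seq("a" -> 1, "b" -> 2, "a" -> 3)
    val partial = records.foldLeft(Map.empty[String, List[Int]]) {
      case (acc, (k, v)) =>
        acc.updated(k, acc.get(k).map(mergeValue(_, v)).getOrElse(createCombiner(v)))
    }
    println(partial)                                 // Map(a -> List(3, 1), b -> List(2))

    // mergeCombiners is what would merge per-partition results for the same key.
    println(mergeCombiners(partial("a"), List(5)))   // List(3, 1, 5)
  }
}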
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/Dependency.scala
@@ -17,19 +17,24 @@

package org.apache.spark

+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer

/**
+ * :: DeveloperApi ::
 * Base class for dependencies.
 */
+@DeveloperApi
abstract class Dependency[T](val rdd: RDD[T]) extends Serializable


/**
+ * :: DeveloperApi ::
 * Base class for dependencies where each partition of the parent RDD is used by at most one
 * partition of the child RDD. Narrow dependencies allow for pipelined execution.
 */
+@DeveloperApi
abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
  /**
   * Get the parent partitions for a child partition.
@@ -41,13 +46,15 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {


/**
+ * :: DeveloperApi ::
 * Represents a dependency on the output of a shuffle stage.
 * @param rdd the parent RDD
 * @param partitioner partitioner used to partition the shuffle output
 * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to null,
 *   the default serializer, as specified by `spark.serializer` config option, will
 *   be used.
 */
+@DeveloperApi
class ShuffleDependency[K, V](
    @transient rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
@@ -61,20 +68,24 @@ class ShuffleDependency[K, V](


/**
+ * :: DeveloperApi ::
 * Represents a one-to-one dependency between partitions of the parent and child RDDs.
 */
+@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int) = List(partitionId)
}


/**
+ * :: DeveloperApi ::
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 */
+@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

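A standalone sketch of the partition mapping that the two narrow dependencies above implement; the helpers below are illustrative stand-ins for OneToOneDependency.getParents and RangeDependency.getParents, not the Spark classes themselves:

// One-to-one: child partition i depends only on parent partition i.
// Range: child partitions [outStart, outStart + length) map to parent
// partitions [inStart, inStart + length).
object DependencySketch {
  def oneToOneParents(partitionId: Int): List[Int] = List(partitionId)

  def rangeParents(partitionId: Int, inStart: Int, outStart: Int, length: Int): List[Int] =
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }

  def main(args: Array[String]): Unit = {
    println(oneToOneParents(3))                                       // List(3)
    println(rangeParents(5, inStart = 0, outStart = 4, length = 3))   // List(1)
    println(rangeParents(9, inStart = 0, outStart = 4, length = 3))   // List(): outside the range
  }
}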
9 changes: 8 additions & 1 deletion core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -21,13 +21,16 @@ import scala.concurrent._
import scala.concurrent.duration.Duration
import scala.util.Try

+import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter}

/**
+ * :: Experimental ::
 * A future for the result of an action to support cancellation. This is an extension of the
 * Scala Future interface to support cancellation.
 */
+@Experimental
trait FutureAction[T] extends Future[T] {
  // Note that we redefine methods of the Future trait here explicitly so we can specify a different
  // documentation (with reference to the word "action").
@@ -84,9 +87,11 @@ trait FutureAction[T] extends Future[T] {


/**
+ * :: Experimental ::
 * A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
 * count, collect, reduce.
 */
+@Experimental
class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
  extends FutureAction[T] {

@@ -141,17 +146,19 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
  private def awaitResult(): Try[T] = {
    jobWaiter.awaitResult() match {
      case JobSucceeded => scala.util.Success(resultFunc)
-     case JobFailed(e: Exception, _) => scala.util.Failure(e)
+     case JobFailed(e: Exception) => scala.util.Failure(e)
    }
  }
}


/**
+ * :: Experimental ::
 * A [[FutureAction]] for actions that could trigger multiple Spark jobs. Examples include take,
 * takeSample. Cancellation works by setting the cancelled flag to true and interrupting the
 * action thread if it is being blocked by a job.
 */
+@Experimental
class ComplexFutureAction[T] extends FutureAction[T] {

  // Pointer to the thread that is executing the action. It is set when the action is run.
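A hedged usage sketch of the now-@Experimental FutureAction API: submitJob (annotated later in this commit, in SparkContext.scala) returns a SimpleFutureAction that can be cancelled while the job runs. It assumes a local build of this branch; the app name, sleep, partition list, and handler are placeholder values:

import org.apache.spark.{SparkConf, SparkContext}

object FutureActionUsageSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("future-action-sketch"))
    val rdd = sc.parallelize(1 to 1000, 4)

    // Kick off an asynchronous job over all four partitions.
    val action = sc.submitJob[Int, Int, Int](
      rdd,
      (it: Iterator[Int]) => { Thread.sleep(1000); it.sum },  // deliberately slow partition work
      Seq(0, 1, 2, 3),                                        // partitions to run
      (index, partitionSum) => (),                            // per-partition result handler (ignored here)
      0)                                                      // overall result value (placeholder)

    action.cancel()   // FutureAction supports cancellation while the job is in flight
    sc.stop()
  }
}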
core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
@@ -21,7 +21,7 @@ package org.apache.spark
 * An iterator that wraps around an existing iterator to provide task killing functionality.
 * It works by checking the interrupted flag in [[TaskContext]].
 */
-class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
+private[spark] class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
  extends Iterator[T] {

  def hasNext: Boolean = !context.interrupted && delegate.hasNext
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/Logging.scala
@@ -22,11 +22,18 @@ import org.slf4j.{Logger, LoggerFactory}
import org.slf4j.impl.StaticLoggerBinder
import org.slf4j.bridge.SLF4JBridgeHandler

+import org.apache.spark.annotation.DeveloperApi

/**
+ * :: DeveloperApi ::
 * Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
 * logging messages at different levels using methods that only evaluate parameters lazily if the
 * log level is enabled.
+ *
+ * NOTE: DO NOT USE this class outside of Spark. It is intended as an internal utility.
+ *       This will likely be changed or removed in future releases.
 */
+@DeveloperApi
trait Logging {
  // Make the log field transient so that objects with Logging can
  // be serialized and used on another machine
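A standalone sketch of the "lazily evaluated parameters" behavior the scaladoc above describes: with a by-name message parameter, the (possibly expensive) string is only built when the level is enabled. This is a toy, not Spark's Logging trait, which the new NOTE says not to use outside Spark:

// Toy logger: msg is a by-name parameter, so it is never evaluated
// unless the corresponding level is turned on.
object LazyLoggingSketch {
  var infoEnabled = false

  def logInfo(msg: => String): Unit =
    if (infoEnabled) println(s"INFO $msg")

  def main(args: Array[String]): Unit = {
    logInfo { println("building message"); "expensive" }  // prints nothing: message never built
    infoEnabled = true
    logInfo("now enabled")                                 // INFO now enabled
  }
}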
core/src/main/scala/org/apache/spark/SerializableWritable.scala
@@ -23,6 +23,9 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.ObjectWritable
import org.apache.hadoop.io.Writable

+import org.apache.spark.annotation.DeveloperApi

+@DeveloperApi
class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
  def value = t
  override def toString = t.toString
86 changes: 73 additions & 13 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHad
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.mesos.MesosNativeLibrary

+import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.input.WholeTextFileInputFormat
@@ -48,22 +49,35 @@ import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}

/**
+ * :: DeveloperApi ::
 * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
 * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
 *
 * @param config a Spark Config object describing the application configuration. Any settings in
 *   this config overrides the default configs as well as system properties.
- * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. Can
- *   be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
- *   from a list of input files or InputFormats for the application.
 */
-class SparkContext(
-    config: SparkConf,
-    // This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
-    // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
-    // contains a map from hostname to a list of input format splits on the host.
-    val preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map())
-  extends Logging {

+@DeveloperApi
+class SparkContext(config: SparkConf) extends Logging {

+  // This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
+  // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
+  // contains a map from hostname to a list of input format splits on the host.
+  private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()

+  /**
+   * :: DeveloperApi ::
+   * Alternative constructor for setting preferred locations where Spark will create executors.
+   *
+   * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. Can
+   *   be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
+   *   from a list of input files or InputFormats for the application.
+   */
+  @DeveloperApi
+  def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = {
+    this(config)
+    this.preferredNodeLocationData = preferredNodeLocationData
+  }

  /**
   * Alternative constructor that allows setting common Spark properties directly
@@ -93,10 +107,45 @@ class SparkContext(
      environment: Map[String, String] = Map(),
      preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =
  {
-    this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment),
-      preferredNodeLocationData)
+    this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
+    this.preferredNodeLocationData = preferredNodeLocationData
  }

+  // NOTE: The below constructors could be consolidated using default arguments. Due to
+  // Scala bug SI-8479, however, this causes the compile step to fail when generating docs.
+  // Until we have a good workaround for that bug the constructors remain broken out.

+  /**
+   * Alternative constructor that allows setting common Spark properties directly
+   *
+   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+   * @param appName A name for your application, to display on the cluster web UI.
+   */
+  private[spark] def this(master: String, appName: String) =
+    this(master, appName, null, Nil, Map(), Map())

+  /**
+   * Alternative constructor that allows setting common Spark properties directly
+   *
+   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+   * @param appName A name for your application, to display on the cluster web UI.
+   * @param sparkHome Location where Spark is installed on cluster nodes.
+   */
+  private[spark] def this(master: String, appName: String, sparkHome: String) =
+    this(master, appName, sparkHome, Nil, Map(), Map())

+  /**
+   * Alternative constructor that allows setting common Spark properties directly
+   *
+   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+   * @param appName A name for your application, to display on the cluster web UI.
+   * @param sparkHome Location where Spark is installed on cluster nodes.
+   * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+   *   system or HDFS, HTTP, HTTPS, or FTP URLs.
+   */
+  private[spark] def this(master: String, appName: String, sparkHome: String, jars: Seq[String]) =
+    this(master, appName, sparkHome, jars, Map(), Map())

private[spark] val conf = config.clone()

/**
@@ -189,7 +238,7 @@ class SparkContext(
    jars.foreach(addJar)
  }

-  def warnSparkMem(value: String): String = {
+  private def warnSparkMem(value: String): String = {
    logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
      "deprecated, please use spark.executor.memory instead.")
    value
@@ -653,6 +702,9 @@ class SparkContext(
  def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] =
    new UnionRDD(this, Seq(first) ++ rest)

+  /** Get an RDD that has no partitions or elements. */
+  def emptyRDD[T: ClassTag] = new EmptyRDD[T](this)

// Methods for creating shared variables

/**
@@ -716,6 +768,11 @@ class SparkContext(
    postEnvironmentUpdate()
  }

+  /**
+   * :: DeveloperApi ::
+   * Register a listener to receive up-calls from events that happen during execution.
+   */
+  @DeveloperApi
def addSparkListener(listener: SparkListener) {
listenerBus.addListener(listener)
}
@@ -1021,8 +1078,10 @@ class SparkContext(
  }

  /**
+   * :: DeveloperApi ::
   * Run a job that can return approximate results.
   */
+  @DeveloperApi
def runApproximateJob[T, U, R](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
@@ -1040,6 +1099,7 @@ class SparkContext(
  /**
   * Submit a job for execution and return a FutureJob holding the result.
   */
+  @Experimental
def submitJob[T, U, R](
rdd: RDD[T],
processPartition: Iterator[T] => U,
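A hedged usage sketch of the SparkContext additions above: the config-based primary constructor plus the new emptyRDD helper. It assumes a local build of this branch; the master URL and app name are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

object SparkContextAdditionsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("sparkcontext-sketch")
    val sc = new SparkContext(conf)   // the primary config-based constructor

    val empty = sc.emptyRDD[String]   // new helper: an RDD with no partitions or elements
    println(empty.partitions.length)  // 0
    println(empty.count())            // 0

    sc.stop()
  }
}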