
Commit d0f7195

Merge branch 'master' of github.com:apache/spark into reference-blocking
andrewor14 committed Aug 16, 2014
2 parents ce9daf5 + cc36487 commit d0f7195
Showing 117 changed files with 1,870 additions and 509 deletions.
1 change: 1 addition & 0 deletions .rat-excludes
@@ -25,6 +25,7 @@ log4j-defaults.properties
bootstrap-tooltip.js
jquery-1.11.1.min.js
sorttable.js
.*avsc
.*txt
.*json
.*data
2 changes: 1 addition & 1 deletion bin/spark-shell.cmd
@@ -19,4 +19,4 @@ rem

set SPARK_HOME=%~dp0..

cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell --class org.apache.spark.repl.Main %*
cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd --class org.apache.spark.repl.Main %* spark-shell
18 changes: 9 additions & 9 deletions bin/spark-sql
@@ -65,30 +65,30 @@ while (($#)); do
case $1 in
-d | --define | --database | -f | -h | --hiveconf | --hivevar | -i | -p)
ensure_arg_number $# 2
CLI_ARGS+=($1); shift
CLI_ARGS+=($1); shift
CLI_ARGS+=("$1"); shift
CLI_ARGS+=("$1"); shift
;;

-e)
ensure_arg_number $# 2
CLI_ARGS+=($1); shift
CLI_ARGS+=(\"$1\"); shift
CLI_ARGS+=("$1"); shift
CLI_ARGS+=("$1"); shift
;;

-s | --silent)
CLI_ARGS+=($1); shift
CLI_ARGS+=("$1"); shift
;;

-v | --verbose)
# Both SparkSubmit and SparkSQLCLIDriver recognize -v | --verbose
CLI_ARGS+=($1)
SUBMISSION_ARGS+=($1); shift
CLI_ARGS+=("$1")
SUBMISSION_ARGS+=("$1"); shift
;;

*)
SUBMISSION_ARGS+=($1); shift
SUBMISSION_ARGS+=("$1"); shift
;;
esac
done

eval exec "$FWDIR"/bin/spark-submit --class $CLASS ${SUBMISSION_ARGS[*]} spark-internal ${CLI_ARGS[*]}
exec "$FWDIR"/bin/spark-submit --class $CLASS "${SUBMISSION_ARGS[@]}" spark-internal "${CLI_ARGS[@]}"
@@ -33,7 +33,7 @@ class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator
// is allowed. The assumption is that Thread.interrupted does not have a memory fence in read
// (just a volatile field in C), while context.interrupted is a volatile in the JVM, which
// introduces an expensive read fence.
if (context.interrupted) {
if (context.isInterrupted) {
throw new TaskKilledException
} else {
delegate.hasNext
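The change above moves the kill flag behind the read-only `isInterrupted` accessor. As a rough illustration of the same pattern (using a hypothetical `SimpleTaskState` rather than the real `TaskContext`), an iterator wrapper can consult such a flag on every `hasNext` call and abort early:

```scala
import java.util.concurrent.atomic.AtomicBoolean

object InterruptibleSketch {
  // Hypothetical stand-in for TaskContext: the flag itself stays private,
  // callers only see the read-only isInterrupted accessor.
  class SimpleTaskState {
    private val interrupted = new AtomicBoolean(false)
    def isInterrupted: Boolean = interrupted.get()
    def markInterrupted(): Unit = interrupted.set(true)
  }

  // Wrapper that stops delegating once the flag is set, mirroring how
  // InterruptibleIterator throws TaskKilledException in the hunk above.
  class CancellableIterator[T](state: SimpleTaskState, delegate: Iterator[T])
    extends Iterator[T] {
    override def hasNext: Boolean =
      if (state.isInterrupted) throw new InterruptedException("task killed")
      else delegate.hasNext
    override def next(): T = delegate.next()
  }
}
```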
63 changes: 56 additions & 7 deletions core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -21,10 +21,18 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.util.TaskCompletionListener


/**
* :: DeveloperApi ::
* Contextual information about a task which can be read or mutated during execution.
*
* @param stageId stage id
* @param partitionId index of the partition
* @param attemptId the number of attempts to execute this task
* @param runningLocally whether the task is running locally in the driver JVM
* @param taskMetrics performance metrics of the task
*/
@DeveloperApi
class TaskContext(
@@ -39,27 +47,68 @@ class TaskContext(
def splitId = partitionId

// List of callback functions to execute when the task completes.
@transient private val onCompleteCallbacks = new ArrayBuffer[() => Unit]
@transient private val onCompleteCallbacks = new ArrayBuffer[TaskCompletionListener]

// Whether the corresponding task has been killed.
@volatile var interrupted: Boolean = false
@volatile private var interrupted: Boolean = false

// Whether the task has completed.
@volatile private var completed: Boolean = false

/** Checks whether the task has completed. */
def isCompleted: Boolean = completed

// Whether the task has completed, before the onCompleteCallbacks are executed.
@volatile var completed: Boolean = false
/** Checks whether the task has been killed. */
def isInterrupted: Boolean = interrupted

// TODO: Also track whether the task has completed successfully or with exception.

/**
* Add a (Java friendly) listener to be executed on task completion.
* This will be called in all situations - success, failure, or cancellation.
*
* An example use is for HadoopRDD to register a callback to close the input stream.
*/
def addTaskCompletionListener(listener: TaskCompletionListener): this.type = {
onCompleteCallbacks += listener
this
}

/**
* Add a listener in the form of a Scala closure to be executed on task completion.
* This will be called in all situations - success, failure, or cancellation.
*
* An example use is for HadoopRDD to register a callback to close the input stream.
*/
def addTaskCompletionListener(f: TaskContext => Unit): this.type = {
onCompleteCallbacks += new TaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = f(context)
}
this
}

/**
* Add a callback function to be executed on task completion. An example use
* is for HadoopRDD to register a callback to close the input stream.
* Will be called in any situation - success, failure, or cancellation.
* @param f Callback function.
*/
@deprecated("use addTaskCompletionListener", "1.1.0")
def addOnCompleteCallback(f: () => Unit) {
onCompleteCallbacks += f
onCompleteCallbacks += new TaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = f()
}
}

def executeOnCompleteCallbacks() {
/** Marks the task as completed and triggers the listeners. */
private[spark] def markTaskCompleted(): Unit = {
completed = true
// Process complete callbacks in the reverse order of registration
onCompleteCallbacks.reverse.foreach { _() }
onCompleteCallbacks.reverse.foreach { _.onTaskCompletion(this) }
}

/** Marks the task for interruption, i.e. cancellation. */
private[spark] def markInterrupted(): Unit = {
interrupted = true
}
}
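The new listener API above accepts either a `TaskCompletionListener` object (Java-friendly) or a plain Scala closure, and `markTaskCompleted` runs the listeners in reverse registration order. A self-contained sketch of how those pieces fit together, using simplified stand-ins (`DemoListener`, `DemoContext`) rather than the real Spark classes:

```scala
import scala.collection.mutable.ArrayBuffer

object TaskListenerSketch {
  // Simplified listener interface, shaped like the TaskCompletionListener the diff imports.
  trait DemoListener {
    def onTaskCompletion(context: DemoContext): Unit
  }

  // Simplified stand-in for TaskContext, showing only the listener bookkeeping.
  class DemoContext {
    @volatile private var completed = false
    private val listeners = new ArrayBuffer[DemoListener]

    def isCompleted: Boolean = completed

    // Java-friendly form: register a listener object.
    def addTaskCompletionListener(listener: DemoListener): this.type = {
      listeners += listener
      this
    }

    // Scala-friendly form: register a closure, wrapped into a listener.
    def addTaskCompletionListener(f: DemoContext => Unit): this.type =
      addTaskCompletionListener(new DemoListener {
        override def onTaskCompletion(context: DemoContext): Unit = f(context)
      })

    // Listeners run in reverse registration order, as in markTaskCompleted above.
    def markTaskCompleted(): Unit = {
      completed = true
      listeners.reverse.foreach(_.onTaskCompletion(this))
    }
  }

  def main(args: Array[String]): Unit = {
    val ctx = new DemoContext
    val closeStream: DemoContext => Unit = _ => println("closing input stream")
    ctx.addTaskCompletionListener(closeStream)
    ctx.addTaskCompletionListener(new DemoListener {
      override def onTaskCompletion(context: DemoContext): Unit = println("releasing buffers")
    })
    ctx.markTaskCompleted() // prints "releasing buffers", then "closing input stream"
  }
}
```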
@@ -19,6 +19,7 @@ package org.apache.spark.api.python

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SerializableWritable, SparkException}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
@@ -42,7 +43,7 @@ private[python] object Converter extends Logging {
defaultConverter: Converter[Any, Any]): Converter[Any, Any] = {
converterClass.map { cc =>
Try {
val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
val c = Utils.classForName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
logInfo(s"Loaded converter: $cc")
c
} match {
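This hunk, like several below in PythonRDD, swaps `Class.forName` for `Utils.classForName`. The helper's body is not part of this diff; a plausible reading (an assumption, not the actual Spark implementation) is that it resolves classes through the thread's context class loader, so classes shipped in user jars stay visible:

```scala
object ClassForNameSketch {
  // Hypothetical classForName-style helper; the real
  // org.apache.spark.util.Utils.classForName may behave differently.
  def classForName(className: String): Class[_] = {
    val loader = Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    // `initialize = true` matches the single-argument Class.forName semantics.
    Class.forName(className, true, loader)
  }

  // Usage mirroring the converted call sites (class name is illustrative):
  // val kc = classForName("org.apache.hadoop.io.Text")
}
```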
36 changes: 18 additions & 18 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -68,7 +68,7 @@ private[spark] class PythonRDD(
// Start a thread to feed the process input from our parent's iterator
val writerThread = new WriterThread(env, worker, split, context)

context.addOnCompleteCallback { () =>
context.addTaskCompletionListener { context =>
writerThread.shutdownOnTaskCompletion()

// Cleanup the worker socket. This will also cause the Python worker to exit.
@@ -137,7 +137,7 @@ private[spark] class PythonRDD(
}
} catch {

case e: Exception if context.interrupted =>
case e: Exception if context.isInterrupted =>
logDebug("Exception thrown after task interruption", e)
throw new TaskKilledException

@@ -176,7 +176,7 @@ private[spark] class PythonRDD(

/** Terminates the writer thread, ignoring any exceptions that may occur due to cleanup. */
def shutdownOnTaskCompletion() {
assert(context.completed)
assert(context.isCompleted)
this.interrupt()
}

@@ -209,7 +209,7 @@ private[spark] class PythonRDD(
PythonRDD.writeIteratorToStream(parent.iterator(split, context), dataOut)
dataOut.flush()
} catch {
case e: Exception if context.completed || context.interrupted =>
case e: Exception if context.isCompleted || context.isInterrupted =>
logDebug("Exception thrown after task completion (likely due to cleanup)", e)

case e: Exception =>
@@ -235,10 +235,10 @@ private[spark] class PythonRDD(
override def run() {
// Kill the worker if it is interrupted, checking until task completion.
// TODO: This has a race condition if interruption occurs, as completed may still become true.
while (!context.interrupted && !context.completed) {
while (!context.isInterrupted && !context.isCompleted) {
Thread.sleep(2000)
}
if (!context.completed) {
if (!context.isCompleted) {
try {
logWarning("Incomplete task interrupted: Attempting to kill Python Worker")
env.destroyPythonWorker(pythonExec, envVars.toMap, worker)
@@ -372,8 +372,8 @@ private[spark] object PythonRDD extends Logging {
batchSize: Int) = {
val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
@@ -440,9 +440,9 @@ private[spark] object PythonRDD extends Logging {
keyClass: String,
valueClass: String,
conf: Configuration) = {
val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
if (path.isDefined) {
sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
} else {
@@ -509,9 +509,9 @@ private[spark] object PythonRDD extends Logging {
keyClass: String,
valueClass: String,
conf: Configuration) = {
val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
if (path.isDefined) {
sc.sc.hadoopFile(path.get, fc, kc, vc)
} else {
@@ -558,7 +558,7 @@ private[spark] object PythonRDD extends Logging {
for {
k <- Option(keyClass)
v <- Option(valueClass)
} yield (Class.forName(k), Class.forName(v))
} yield (Utils.classForName(k), Utils.classForName(v))
}

private def getKeyValueConverters(keyConverterClass: String, valueConverterClass: String,
@@ -621,10 +621,10 @@ private[spark] object PythonRDD extends Logging {
val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
val codec = Option(compressionCodecClass).map(Class.forName(_).asInstanceOf[Class[C]])
val codec = Option(compressionCodecClass).map(Utils.classForName(_).asInstanceOf[Class[C]])
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new JavaToWritableConverter)
val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec=codec)
}

@@ -653,7 +653,7 @@ private[spark] object PythonRDD extends Logging {
val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new JavaToWritableConverter)
val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
converted.saveAsNewAPIHadoopFile(path, kc, vc, fc, mergedConf)
}

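The monitor thread in this file now polls the renamed `isInterrupted`/`isCompleted` accessors instead of the old public fields. A stripped-down sketch of that polling loop against a hypothetical flag holder (not the real `TaskContext`), with the worker-killing side effect passed in as a function:

```scala
object MonitorSketch {
  // Hypothetical holder for the two flags the monitor loop inspects.
  class SimpleTaskState {
    @volatile var completed = false
    @volatile var interrupted = false
    def isCompleted: Boolean = completed
    def isInterrupted: Boolean = interrupted
  }

  // Poll until the task finishes or is marked interrupted; if it was
  // interrupted before completing, run the cleanup action (in PythonRDD's
  // case, destroying the Python worker process).
  def monitorTask(state: SimpleTaskState, killWorker: () => Unit): Unit = {
    while (!state.isInterrupted && !state.isCompleted) {
      Thread.sleep(2000)
    }
    if (!state.isCompleted) {
      killWorker()
    }
  }
}
```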
@@ -46,6 +46,11 @@ private[spark] class ApplicationInfo(

init()

private def readObject(in: java.io.ObjectInputStream): Unit = {
in.defaultReadObject()
init()
}

private def init() {
state = ApplicationState.WAITING
executors = new mutable.HashMap[Int, ExecutorInfo]
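The new `readObject` hook re-runs `init()` after Java deserialization so transient state is rebuilt on the receiving side. A self-contained illustration of the idiom (the class and fields here are made up for the example, not ApplicationInfo's):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable

// Transient members are skipped by Java serialization, so they come back
// uninitialized unless readObject rebuilds them.
class CachedThing(val name: String) extends Serializable {
  @transient private var cache: mutable.Map[String, Int] = _

  init()

  private def init(): Unit = {
    cache = mutable.Map.empty[String, Int]
  }

  // Invoked by the JVM during deserialization: restore the regular fields,
  // then rebuild the transient ones, the same shape as ApplicationInfo above.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    init()
  }

  def put(k: String, v: Int): Unit = cache(k) = v
  def size: Int = cache.size
}

object ReadObjectDemo {
  def main(args: Array[String]): Unit = {
    val original = new CachedThing("app")
    original.put("x", 1)

    // Round-trip through Java serialization.
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(original)
    out.flush()
    val copy = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
      .readObject().asInstanceOf[CachedThing]

    assert(copy.name == "app") // regular field restored by defaultReadObject
    assert(copy.size == 0)     // transient cache rebuilt (empty) by init()
  }
}
```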
@@ -22,8 +22,8 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.metrics.source.Source

class ApplicationSource(val application: ApplicationInfo) extends Source {
val metricRegistry = new MetricRegistry()
val sourceName = "%s.%s.%s".format("application", application.desc.name,
override val metricRegistry = new MetricRegistry()
override val sourceName = "%s.%s.%s".format("application", application.desc.name,
System.currentTimeMillis())

metricRegistry.register(MetricRegistry.name("status"), new Gauge[String] {
@@ -22,8 +22,8 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.metrics.source.Source

private[spark] class MasterSource(val master: Master) extends Source {
val metricRegistry = new MetricRegistry()
val sourceName = "master"
override val metricRegistry = new MetricRegistry()
override val sourceName = "master"

// Gauge for worker numbers in cluster
metricRegistry.register(MetricRegistry.name("workers"), new Gauge[Int] {
@@ -22,8 +22,8 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.metrics.source.Source

private[spark] class WorkerSource(val worker: Worker) extends Source {
val sourceName = "worker"
val metricRegistry = new MetricRegistry()
override val sourceName = "worker"
override val metricRegistry = new MetricRegistry()

metricRegistry.register(MetricRegistry.name("executors"), new Gauge[Int] {
override def getValue: Int = worker.executors.size
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -99,6 +99,9 @@ private[spark] class Executor(
private val urlClassLoader = createClassLoader()
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)

// Set the classloader for serializer
env.serializer.setDefaultClassLoader(urlClassLoader)

// Akka's message frame size. If task result is bigger than this, we use the block manager
// to send the result back.
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
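The added line hands the executor's URL class loader (the one that knows about user-added jars) to the serializer as its default loader. As a rough, generic sketch of why a deserializer needs such a loader (this is not Spark's `Serializer` API), class resolution can be routed through it:

```scala
import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectStreamClass}

// Hypothetical deserializer that resolves classes through a configurable
// default loader, so classes that only exist in dynamically added jars
// can still be found when task payloads are deserialized.
class LoaderAwareDeserializer(defaultLoader: ClassLoader) {
  def deserialize[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
      override def resolveClass(desc: ObjectStreamClass): Class[_] =
        try Class.forName(desc.getName, false, defaultLoader)
        catch { case _: ClassNotFoundException => super.resolveClass(desc) }
    }
    in.readObject().asInstanceOf[T]
  }
}
```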
@@ -35,9 +35,10 @@ private[spark] class ExecutorSource(val executor: Executor, executorId: String)
})
}

val metricRegistry = new MetricRegistry()
override val metricRegistry = new MetricRegistry()

// TODO: It would be nice to pass the application name here
val sourceName = "executor.%s".format(executorId)
override val sourceName = "executor.%s".format(executorId)

// Gauge for executor thread pool's actively executing task counts
metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
@@ -21,12 +21,9 @@ import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.jvm.{GarbageCollectorMetricSet, MemoryUsageGaugeSet}

private[spark] class JvmSource extends Source {
val sourceName = "jvm"
val metricRegistry = new MetricRegistry()
override val sourceName = "jvm"
override val metricRegistry = new MetricRegistry()

val gcMetricSet = new GarbageCollectorMetricSet
val memGaugeSet = new MemoryUsageGaugeSet

metricRegistry.registerAll(gcMetricSet)
metricRegistry.registerAll(memGaugeSet)
metricRegistry.registerAll(new GarbageCollectorMetricSet)
metricRegistry.registerAll(new MemoryUsageGaugeSet)
}
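The repeated `override` additions in ApplicationSource, MasterSource, WorkerSource, ExecutorSource, and JvmSource suggest the `Source` trait now declares `sourceName` and `metricRegistry` abstractly. A hedged sketch of that pattern against the Codahale `MetricRegistry` (the trait shape here is inferred from the diff, not copied from Spark):

```scala
import com.codahale.metrics.{Gauge, MetricRegistry}

object SourceSketch {
  // Assumed shape of the Source trait, inferred from the overrides above.
  trait Source {
    def sourceName: String
    def metricRegistry: MetricRegistry
  }

  // A custom source following the same pattern as MasterSource/WorkerSource.
  class QueueSource(queue: java.util.Queue[String]) extends Source {
    override val sourceName = "queue"
    override val metricRegistry = new MetricRegistry()

    metricRegistry.register(MetricRegistry.name("size"), new Gauge[Int] {
      override def getValue: Int = queue.size()
    })
  }
}
```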
@@ -141,7 +141,7 @@ private[spark] object CheckpointRDD extends Logging {
val deserializeStream = serializer.deserializeStream(fileInputStream)

// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback(() => deserializeStream.close())
context.addTaskCompletionListener(context => deserializeStream.close())

deserializeStream.asIterator.asInstanceOf[Iterator[T]]
}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -197,7 +197,7 @@ class HadoopRDD[K, V](
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback{ () => closeIfNeeded() }
context.addTaskCompletionListener{ context => closeIfNeeded() }
val key: K = reader.createKey()
val value: V = reader.createValue()

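Both this hunk and the CheckpointRDD one above migrate the close-the-stream callback from the deprecated `addOnCompleteCallback` to the listener form. A compact sketch of that open-for-the-task, close-on-completion pattern, again using a simplified stand-in rather than the real `TaskContext`:

```scala
import java.io.StringReader
import scala.collection.mutable.ArrayBuffer

object CloseOnCompletionSketch {
  // Minimal stand-in for TaskContext with only the closure-style listener.
  class MiniContext {
    private val callbacks = new ArrayBuffer[MiniContext => Unit]
    def addTaskCompletionListener(f: MiniContext => Unit): this.type = {
      callbacks += f
      this
    }
    def markTaskCompleted(): Unit = callbacks.reverse.foreach(_(this))
  }

  // The HadoopRDD/CheckpointRDD pattern: open a resource for the duration
  // of the task and let the completion listener close it, whether the task
  // succeeds, fails, or is cancelled.
  def readWithCleanup(context: MiniContext): Iterator[Int] = {
    val reader = new StringReader("abc")
    context.addTaskCompletionListener(_ => reader.close())
    // Lazily pull characters from the reader until EOF (-1).
    Iterator.continually(reader.read()).takeWhile(_ != -1)
  }

  def main(args: Array[String]): Unit = {
    val ctx = new MiniContext
    val chars = readWithCleanup(ctx).map(_.toChar).mkString
    ctx.markTaskCompleted() // the reader is closed here, after the task body ran
    println(chars)          // "abc"
  }
}
```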