Merge remote-tracking branch 'apache/master' into job-page
Conflicts:
	core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
JoshRosen committed Nov 20, 2014
2 parents d69c775 + 04d462f commit 7d10b97
Showing 115 changed files with 2,321 additions and 1,359 deletions.
2 changes: 1 addition & 1 deletion assembly/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
-    <version>1.2.0-SNAPSHOT</version>
+    <version>1.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion bagel/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
-    <version>1.2.0-SNAPSHOT</version>
+    <version>1.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

3 changes: 3 additions & 0 deletions bin/spark-submit
@@ -22,6 +22,9 @@
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
ORIG_ARGS=("$@")

+# Set COLUMNS for progress bar
+export COLUMNS=`tput cols`
+
while (($#)); do
if [ "$1" = "--deploy-mode" ]; then
SPARK_SUBMIT_DEPLOY_MODE=$2
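The exported COLUMNS value is how the JVM-side progress bar learns the terminal width, which is otherwise invisible to a child process of the shell. A minimal Scala sketch of consuming it (the helper name and the 80-column fallback are assumptions for illustration, not necessarily what ConsoleProgressBar does):

    // Hypothetical helper: read the terminal width that bin/spark-submit exported.
    object TerminalWidth {
      // Fall back to 80 columns when COLUMNS is unset or malformed (assumed default).
      def get: Int =
        sys.env.get("COLUMNS")
          .flatMap(s => scala.util.Try(s.toInt).toOption)
          .filter(_ > 0)
          .getOrElse(80)
    }
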
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
-    <version>1.2.0-SNAPSHOT</version>
+    <version>1.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

1 change: 1 addition & 0 deletions core/src/main/java/org/apache/spark/SparkStageInfo.java
@@ -26,6 +26,7 @@
public interface SparkStageInfo {
int stageId();
int currentAttemptId();
+  long submissionTime();
String name();
int numTasks();
int numActiveTasks();
core/src/main/scala/org/apache/spark/api/java/function/package.scala
@@ -22,4 +22,4 @@ package org.apache.spark.api.java
* these interfaces to pass functions to various Java API methods for Spark. Please visit Spark's
* Java programming guide for more details.
*/
-package object function
+package object function
core/src/main/resources/org/apache/spark/ui/static/additional-metrics.js
@@ -26,13 +26,6 @@ $(function() {
// Switch the class of the arrow from open to closed.
$(this).find('.expand-additional-metrics-arrow').toggleClass('arrow-open');
$(this).find('.expand-additional-metrics-arrow').toggleClass('arrow-closed');

-        // If clicking caused the metrics to expand, automatically check all options for additional
-        // metrics (don't trigger a click when collapsing metrics, because it leads to weird
-        // toggling behavior).
-        if (!$(additionalMetricsDiv).hasClass('collapsed')) {
-            $(this).parent().find('input:checkbox:not(:checked)').trigger('click');
-        }
});

$("input:checkbox:not(:checked)").each(function() {
@@ -48,6 +41,16 @@ $(function() {
stripeTables();
});

$("#select-all-metrics").click(function() {
if (this.checked) {
// Toggle all un-checked options.
$('input:checkbox:not(:checked)').trigger('click');
} else {
// Toggle all checked options.
$('input:checkbox:checked').trigger('click');
}
});

// Trigger a click on the checkbox if a user clicks the label next to it.
$("span.additional-metric-title").click(function() {
$(this).parent().find('input:checkbox').trigger('click');
180 changes: 157 additions & 23 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -50,20 +50,34 @@ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkD
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage._
-import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.{SparkUI, ConsoleProgressBar}
import org.apache.spark.ui.jobs.JobProgressListener
import org.apache.spark.util._

/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
+ *
+ * Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before
+ * creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.
*
* @param config a Spark Config object describing the application configuration. Any settings in
* this config overrides the default configs as well as system properties.
*/

class SparkContext(config: SparkConf) extends Logging {

+  // The call site where this SparkContext was constructed.
+  private val creationSite: CallSite = Utils.getCallSite()
+
+  // If true, log warnings instead of throwing exceptions when multiple SparkContexts are active
+  private val allowMultipleContexts: Boolean =
+    config.getBoolean("spark.driver.allowMultipleContexts", false)
+
+  // In order to prevent multiple SparkContexts from being active at the same time, mark this
+  // context as having started construction.
+  // NOTE: this must be placed at the beginning of the SparkContext constructor.
+  SparkContext.markPartiallyConstructed(this, allowMultipleContexts)

// This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
// etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
// contains a map from hostname to a list of input format splits on the host.
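The contract stated in the new doc comment is easiest to see in code. A hedged usage sketch (master and app names are placeholders; the conf key comes from the allowMultipleContexts flag above):

    // Sketch: one active SparkContext per JVM, as documented above.
    import org.apache.spark.{SparkConf, SparkContext}

    val sc1 = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("first"))
    sc1.stop()  // the active context must be stopped before another is constructed
    val sc2 = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("second"))
    // Constructing sc2 while sc1 was still active would throw a SparkException, or
    // only log a warning if spark.driver.allowMultipleContexts=true is set.
    sc2.stop()
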
@@ -233,6 +247,13 @@ class SparkContext(config: SparkConf) extends Logging {

val statusTracker = new SparkStatusTracker(this)

+  private[spark] val progressBar: Option[ConsoleProgressBar] =
+    if (conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
+      Some(new ConsoleProgressBar(this))
+    } else {
+      None
+    }
+
// Initialize the Spark UI
private[spark] val ui: Option[SparkUI] =
if (conf.getBoolean("spark.ui.enabled", true)) {
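Note the gating condition on the progressBar added above: the bar is built only when spark.ui.showConsoleProgress is left enabled and the driver logs at a level quieter than INFO, so redrawn progress lines do not interleave with log output. A small sketch of opting out explicitly (the app name is a placeholder):

    import org.apache.spark.SparkConf

    // Sketch: disable the console progress bar for one application.
    val quietConf = new SparkConf()
      .setAppName("no-progress-bar")
      .set("spark.ui.showConsoleProgress", "false")
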
@@ -1166,27 +1187,30 @@

/** Shut down the SparkContext. */
def stop() {
-    postApplicationEnd()
-    ui.foreach(_.stop())
-    // Do this only if not stopped already - best case effort.
-    // prevent NPE if stopped more than once.
-    val dagSchedulerCopy = dagScheduler
-    dagScheduler = null
-    if (dagSchedulerCopy != null) {
-      env.metricsSystem.report()
-      metadataCleaner.cancel()
-      env.actorSystem.stop(heartbeatReceiver)
-      cleaner.foreach(_.stop())
-      dagSchedulerCopy.stop()
-      taskScheduler = null
-      // TODO: Cache.stop()?
-      env.stop()
-      SparkEnv.set(null)
-      listenerBus.stop()
-      eventLogger.foreach(_.stop())
-      logInfo("Successfully stopped SparkContext")
-    } else {
-      logInfo("SparkContext already stopped")
+    SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
+      postApplicationEnd()
+      ui.foreach(_.stop())
+      // Do this only if not stopped already - best case effort.
+      // prevent NPE if stopped more than once.
+      val dagSchedulerCopy = dagScheduler
+      dagScheduler = null
+      if (dagSchedulerCopy != null) {
+        env.metricsSystem.report()
+        metadataCleaner.cancel()
+        env.actorSystem.stop(heartbeatReceiver)
+        cleaner.foreach(_.stop())
+        dagSchedulerCopy.stop()
+        taskScheduler = null
+        // TODO: Cache.stop()?
+        env.stop()
+        SparkEnv.set(null)
+        listenerBus.stop()
+        eventLogger.foreach(_.stop())
+        logInfo("Successfully stopped SparkContext")
+        SparkContext.clearActiveContext()
+      } else {
+        logInfo("SparkContext already stopped")
+      }
    }
  }

@@ -1257,6 +1281,7 @@ class SparkContext(config: SparkConf) extends Logging {
logInfo("Starting job: " + callSite.shortForm)
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
resultHandler, localProperties.get)
+    progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}

Expand Down Expand Up @@ -1475,6 +1500,11 @@ class SparkContext(config: SparkConf) extends Logging {
private[spark] def cleanup(cleanupTime: Long) {
persistentRdds.clearOldValues(cleanupTime)
}

+  // In order to prevent multiple SparkContexts from being active at the same time, mark this
+  // context as having finished construction.
+  // NOTE: this must be placed at the end of the SparkContext constructor.
+  SparkContext.setActiveContext(this, allowMultipleContexts)
}

/**
@@ -1483,6 +1513,107 @@ class SparkContext(config: SparkConf) extends Logging {
*/
object SparkContext extends Logging {

+  /**
+   * Lock that guards access to global variables that track SparkContext construction.
+   */
+  private val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object()
+
+  /**
+   * The active, fully-constructed SparkContext. If no SparkContext is active, then this is `None`.
+   *
+   * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
+   */
+  private var activeContext: Option[SparkContext] = None
+
+  /**
+   * Points to a partially-constructed SparkContext if some thread is in the SparkContext
+   * constructor, or `None` if no SparkContext is being constructed.
+   *
+   * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
+   */
+  private var contextBeingConstructed: Option[SparkContext] = None
+
+  /**
+   * Called to ensure that no other SparkContext is running in this JVM.
+   *
+   * Throws an exception if a running context is detected and logs a warning if another thread is
+   * constructing a SparkContext. This warning is necessary because the current locking scheme
+   * prevents us from reliably distinguishing between cases where another context is being
+   * constructed and cases where another constructor threw an exception.
+   */
+  private def assertNoOtherContextIsRunning(
+      sc: SparkContext,
+      allowMultipleContexts: Boolean): Unit = {
+    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
+      contextBeingConstructed.foreach { otherContext =>
+        if (otherContext ne sc) {  // checks for reference equality
+          // Since otherContext might point to a partially-constructed context, guard against
+          // its creationSite field being null:
+          val otherContextCreationSite =
+            Option(otherContext.creationSite).map(_.longForm).getOrElse("unknown location")
+          val warnMsg = "Another SparkContext is being constructed (or threw an exception in its" +
+            " constructor). This may indicate an error, since only one SparkContext may be" +
+            " running in this JVM (see SPARK-2243)." +
+            s" The other SparkContext was created at:\n$otherContextCreationSite"
+          logWarning(warnMsg)
+        }
+
+        activeContext.foreach { ctx =>
+          val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." +
+            " To ignore this error, set spark.driver.allowMultipleContexts = true. " +
+            s"The currently running SparkContext was created at:\n${ctx.creationSite.longForm}"
+          val exception = new SparkException(errMsg)
+          if (allowMultipleContexts) {
+            logWarning("Multiple running SparkContexts detected in the same JVM!", exception)
+          } else {
+            throw exception
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Called at the beginning of the SparkContext constructor to ensure that no SparkContext is
+   * running. Throws an exception if a running context is detected and logs a warning if another
+   * thread is constructing a SparkContext. This warning is necessary because the current locking
+   * scheme prevents us from reliably distinguishing between cases where another context is being
+   * constructed and cases where another constructor threw an exception.
+   */
+  private[spark] def markPartiallyConstructed(
+      sc: SparkContext,
+      allowMultipleContexts: Boolean): Unit = {
+    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
+      assertNoOtherContextIsRunning(sc, allowMultipleContexts)
+      contextBeingConstructed = Some(sc)
+    }
+  }
+
+  /**
+   * Called at the end of the SparkContext constructor to ensure that no other SparkContext has
+   * raced with this constructor and started.
+   */
+  private[spark] def setActiveContext(
+      sc: SparkContext,
+      allowMultipleContexts: Boolean): Unit = {
+    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
+      assertNoOtherContextIsRunning(sc, allowMultipleContexts)
+      contextBeingConstructed = None
+      activeContext = Some(sc)
+    }
+  }
+
+  /**
+   * Clears the active SparkContext metadata. This is called by `SparkContext#stop()`. It's
+   * also called in unit tests to prevent a flood of warnings from test suites that don't / can't
+   * properly clean up their SparkContexts.
+   */
+  private[spark] def clearActiveContext(): Unit = {
+    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
+      activeContext = None
+    }
+  }
+
private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"

private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
@@ -1682,6 +1813,9 @@ object SparkContext extends Logging {
def localCpuCount = Runtime.getRuntime.availableProcessors()
// local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
val threadCount = if (threads == "*") localCpuCount else threads.toInt
+        if (threadCount <= 0) {
+          throw new SparkException(s"Asked to run locally with $threadCount threads")
+        }
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalBackend(scheduler, threadCount)
scheduler.initialize(backend)
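The companion-object machinery above is a guarded-singleton handshake: one lock protects two slots, a context mid-construction and the fully built context, and the constructor checks both at its first and last statements. A self-contained sketch of the same pattern with hypothetical names (Service, setActive, and so on are illustrative, not Spark's API):

    // Sketch of the construction-guard pattern from the diff above.
    class Service {
      Service.markPartiallyConstructed(this)  // first statement of the constructor
      // ... failure-prone initialization would go here ...
      Service.setActive(this)                 // last statement of the constructor

      def stop(): Unit = Service.clearActive()
    }

    object Service {
      private val lock = new Object()
      private var active: Option[Service] = None
      private var underConstruction: Option[Service] = None

      // Warn about a concurrent constructor; fail hard if a finished instance exists.
      private def assertNoneRunning(s: Service): Unit = lock.synchronized {
        underConstruction.foreach { other =>
          if (other ne s) println("warning: another Service is mid-construction")
        }
        if (active.nonEmpty) throw new IllegalStateException("a Service is already active")
      }

      def markPartiallyConstructed(s: Service): Unit = lock.synchronized {
        assertNoneRunning(s)
        underConstruction = Some(s)
      }

      def setActive(s: Service): Unit = lock.synchronized {
        assertNoneRunning(s)
        underConstruction = None
        active = Some(s)
      }

      def clearActive(): Unit = lock.synchronized { active = None }
    }
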
core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -96,6 +96,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
new SparkStageInfoImpl(
stageId,
info.attemptId,
+        info.submissionTime.getOrElse(0),
info.name,
info.numTasks,
data.numActiveTasks,
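With submissionTime threaded through the status API, monitoring code can compute how long a stage has been running, which is what a console progress bar needs. A hedged polling sketch (it assumes the statusTracker methods getActiveStageIds and getStageInfo, plus an in-scope SparkContext sc):

    // Sketch: report elapsed time for each active stage.
    for (stageId <- sc.statusTracker.getActiveStageIds();
         info <- sc.statusTracker.getStageInfo(stageId)) {
      // submissionTime is 0 when unknown (see the getOrElse(0) above).
      val elapsedMs =
        if (info.submissionTime() > 0) System.currentTimeMillis() - info.submissionTime() else 0L
      println(s"stage ${info.stageId()} '${info.name()}': running for $elapsedMs ms")
    }
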
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
@@ -26,6 +26,7 @@ private class SparkJobInfoImpl (
private class SparkStageInfoImpl(
val stageId: Int,
val currentAttemptId: Int,
+    val submissionTime: Long,
val name: String,
val numTasks: Int,
val numActiveTasks: Int,
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -42,6 +42,9 @@ import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}
/**
* A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
* [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones.
+ *
+ * Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before
+ * creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.
*/
class JavaSparkContext(val sc: SparkContext)
extends JavaSparkContextVarargsWorkaround with Closeable {
24 changes: 16 additions & 8 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -47,7 +47,7 @@ private[spark] class PythonRDD(
pythonIncludes: JList[String],
preservePartitoning: Boolean,
pythonExec: String,
-    broadcastVars: JList[Broadcast[Array[Byte]]],
+    broadcastVars: JList[Broadcast[Array[Array[Byte]]]],
accumulator: Accumulator[JList[Array[Byte]]])
extends RDD[Array[Byte]](parent) {

@@ -230,8 +230,8 @@ private[spark] class PythonRDD(
if (!oldBids.contains(broadcast.id)) {
// send new broadcast
dataOut.writeLong(broadcast.id)
-          dataOut.writeInt(broadcast.value.length)
-          dataOut.write(broadcast.value)
+          dataOut.writeLong(broadcast.value.map(_.length.toLong).sum)
+          broadcast.value.foreach(dataOut.write)
oldBids.add(broadcast.id)
}
}
@@ -368,16 +368,24 @@ private[spark] object PythonRDD extends Logging {
}
}

-  def readBroadcastFromFile(sc: JavaSparkContext, filename: String): Broadcast[Array[Byte]] = {
+  def readBroadcastFromFile(
+      sc: JavaSparkContext,
+      filename: String): Broadcast[Array[Array[Byte]]] = {
+    val size = new File(filename).length()
val file = new DataInputStream(new FileInputStream(filename))
+    val blockSize = 1 << 20
+    val n = ((size + blockSize - 1) / blockSize).toInt
+    val obj = new Array[Array[Byte]](n)
try {
-      val length = file.readInt()
-      val obj = new Array[Byte](length)
-      file.readFully(obj)
-      sc.broadcast(obj)
+      for (i <- 0 until n) {
+        val length = if (i < (n - 1)) blockSize else (size % blockSize).toInt
+        obj(i) = new Array[Byte](length)
+        file.readFully(obj(i))
+      }
} finally {
file.close()
}
+    sc.broadcast(obj)
}

def writeIteratorToStream[T](iter: Iterator[T], dataOut: DataOutputStream) {
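The chunking arithmetic above is worth spelling out: a file of S bytes becomes n = ceil(S / 2^20) one-mebibyte blocks, and the last block carries the S mod 2^20 remainder (the diff's length expression relies on S not being an exact multiple of the block size), presumably so a Python broadcast is no longer capped by the 2 GB limit of a single JVM byte array. A self-contained sketch of the same bookkeeping:

    // Sketch: 1 MiB chunk sizing as in readBroadcastFromFile above.
    object ChunkMath {
      val blockSize: Long = 1L << 20  // 2^20 = 1,048,576 bytes

      def numBlocks(size: Long): Int = ((size + blockSize - 1) / blockSize).toInt

      def blockLength(size: Long, i: Int): Int =
        if (i < numBlocks(size) - 1) blockSize.toInt
        else (size % blockSize).toInt  // remainder goes in the final block

      // e.g. a 2.5 MiB broadcast: blocks of 1 MiB, 1 MiB, and 512 KiB
      def demo(): Unit = {
        val size = (2.5 * (1 << 20)).toLong
        assert(numBlocks(size) == 3)
        assert(blockLength(size, 2) == 512 * 1024)
      }
    }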