diff --git a/assembly/pom.xml b/assembly/pom.xml index c65192bde64c6..4e2b773e7d2f3 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.2.0-SNAPSHOT + 1.3.0-SNAPSHOT ../pom.xml diff --git a/bagel/pom.xml b/bagel/pom.xml index 93db0d5efda5f..0327ffa402671 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.2.0-SNAPSHOT + 1.3.0-SNAPSHOT ../pom.xml diff --git a/bin/spark-submit b/bin/spark-submit index c557311b4b20e..f92d90c3a66b0 100755 --- a/bin/spark-submit +++ b/bin/spark-submit @@ -22,6 +22,9 @@ export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)" ORIG_ARGS=("$@") +# Set COLUMNS for progress bar +export COLUMNS=`tput cols` + while (($#)); do if [ "$1" = "--deploy-mode" ]; then SPARK_SUBMIT_DEPLOY_MODE=$2 diff --git a/core/pom.xml b/core/pom.xml index 492eddda744c2..1feb00b3a7fb8 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.2.0-SNAPSHOT + 1.3.0-SNAPSHOT ../pom.xml diff --git a/core/src/main/java/org/apache/spark/SparkStageInfo.java b/core/src/main/java/org/apache/spark/SparkStageInfo.java index 04e2247210ecc..fd74321093658 100644 --- a/core/src/main/java/org/apache/spark/SparkStageInfo.java +++ b/core/src/main/java/org/apache/spark/SparkStageInfo.java @@ -26,6 +26,7 @@ public interface SparkStageInfo { int stageId(); int currentAttemptId(); + long submissionTime(); String name(); int numTasks(); int numActiveTasks(); diff --git a/core/src/main/java/org/apache/spark/api/java/function/package.scala b/core/src/main/java/org/apache/spark/api/java/function/package.scala index 7f91de653a64a..0f9bac7164162 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/package.scala +++ b/core/src/main/java/org/apache/spark/api/java/function/package.scala @@ -22,4 +22,4 @@ package org.apache.spark.api.java * these interfaces to pass functions to various Java API methods for Spark. Please visit Spark's * Java programming guide for more details. */ -package object function \ No newline at end of file +package object function diff --git a/core/src/main/resources/org/apache/spark/ui/static/additional-metrics.js b/core/src/main/resources/org/apache/spark/ui/static/additional-metrics.js index badd85ed48c82..d33c5c769d683 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/additional-metrics.js +++ b/core/src/main/resources/org/apache/spark/ui/static/additional-metrics.js @@ -26,13 +26,6 @@ $(function() { // Switch the class of the arrow from open to closed. $(this).find('.expand-additional-metrics-arrow').toggleClass('arrow-open'); $(this).find('.expand-additional-metrics-arrow').toggleClass('arrow-closed'); - - // If clicking caused the metrics to expand, automatically check all options for additional - // metrics (don't trigger a click when collapsing metrics, because it leads to weird - // toggling behavior). - if (!$(additionalMetricsDiv).hasClass('collapsed')) { - $(this).parent().find('input:checkbox:not(:checked)').trigger('click'); - } }); $("input:checkbox:not(:checked)").each(function() { @@ -48,6 +41,16 @@ $(function() { stripeTables(); }); + $("#select-all-metrics").click(function() { + if (this.checked) { + // Toggle all un-checked options. + $('input:checkbox:not(:checked)').trigger('click'); + } else { + // Toggle all checked options. + $('input:checkbox:checked').trigger('click'); + } + }); + // Trigger a click on the checkbox if a user clicks the label next to it. 
$("span.additional-metric-title").click(function() { $(this).parent().find('input:checkbox').trigger('click'); diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 6b8c4eb9afe32..e95819d75a4c5 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -50,7 +50,7 @@ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkD import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} import org.apache.spark.scheduler.local.LocalBackend import org.apache.spark.storage._ -import org.apache.spark.ui.SparkUI +import org.apache.spark.ui.{SparkUI, ConsoleProgressBar} import org.apache.spark.ui.jobs.JobProgressListener import org.apache.spark.util._ @@ -58,12 +58,26 @@ import org.apache.spark.util._ * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. * + * Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before + * creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details. + * * @param config a Spark Config object describing the application configuration. Any settings in * this config overrides the default configs as well as system properties. */ - class SparkContext(config: SparkConf) extends Logging { + // The call site where this SparkContext was constructed. + private val creationSite: CallSite = Utils.getCallSite() + + // If true, log warnings instead of throwing exceptions when multiple SparkContexts are active + private val allowMultipleContexts: Boolean = + config.getBoolean("spark.driver.allowMultipleContexts", false) + + // In order to prevent multiple SparkContexts from being active at the same time, mark this + // context as having started construction. + // NOTE: this must be placed at the beginning of the SparkContext constructor. + SparkContext.markPartiallyConstructed(this, allowMultipleContexts) + // This is used only by YARN for now, but should be relevant to other cluster types (Mesos, // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It // contains a map from hostname to a list of input format splits on the host. @@ -233,6 +247,13 @@ class SparkContext(config: SparkConf) extends Logging { val statusTracker = new SparkStatusTracker(this) + private[spark] val progressBar: Option[ConsoleProgressBar] = + if (conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) { + Some(new ConsoleProgressBar(this)) + } else { + None + } + // Initialize the Spark UI private[spark] val ui: Option[SparkUI] = if (conf.getBoolean("spark.ui.enabled", true)) { @@ -1166,27 +1187,30 @@ class SparkContext(config: SparkConf) extends Logging { /** Shut down the SparkContext. */ def stop() { - postApplicationEnd() - ui.foreach(_.stop()) - // Do this only if not stopped already - best case effort. - // prevent NPE if stopped more than once. - val dagSchedulerCopy = dagScheduler - dagScheduler = null - if (dagSchedulerCopy != null) { - env.metricsSystem.report() - metadataCleaner.cancel() - env.actorSystem.stop(heartbeatReceiver) - cleaner.foreach(_.stop()) - dagSchedulerCopy.stop() - taskScheduler = null - // TODO: Cache.stop()? 
- env.stop() - SparkEnv.set(null) - listenerBus.stop() - eventLogger.foreach(_.stop()) - logInfo("Successfully stopped SparkContext") - } else { - logInfo("SparkContext already stopped") + SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { + postApplicationEnd() + ui.foreach(_.stop()) + // Do this only if not stopped already - best case effort. + // prevent NPE if stopped more than once. + val dagSchedulerCopy = dagScheduler + dagScheduler = null + if (dagSchedulerCopy != null) { + env.metricsSystem.report() + metadataCleaner.cancel() + env.actorSystem.stop(heartbeatReceiver) + cleaner.foreach(_.stop()) + dagSchedulerCopy.stop() + taskScheduler = null + // TODO: Cache.stop()? + env.stop() + SparkEnv.set(null) + listenerBus.stop() + eventLogger.foreach(_.stop()) + logInfo("Successfully stopped SparkContext") + SparkContext.clearActiveContext() + } else { + logInfo("SparkContext already stopped") + } } } @@ -1257,6 +1281,7 @@ class SparkContext(config: SparkConf) extends Logging { logInfo("Starting job: " + callSite.shortForm) dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal, resultHandler, localProperties.get) + progressBar.foreach(_.finishAll()) rdd.doCheckpoint() } @@ -1475,6 +1500,11 @@ class SparkContext(config: SparkConf) extends Logging { private[spark] def cleanup(cleanupTime: Long) { persistentRdds.clearOldValues(cleanupTime) } + + // In order to prevent multiple SparkContexts from being active at the same time, mark this + // context as having finished construction. + // NOTE: this must be placed at the end of the SparkContext constructor. + SparkContext.setActiveContext(this, allowMultipleContexts) } /** @@ -1483,6 +1513,107 @@ class SparkContext(config: SparkConf) extends Logging { */ object SparkContext extends Logging { + /** + * Lock that guards access to global variables that track SparkContext construction. + */ + private val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object() + + /** + * The active, fully-constructed SparkContext. If no SparkContext is active, then this is `None`. + * + * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK + */ + private var activeContext: Option[SparkContext] = None + + /** + * Points to a partially-constructed SparkContext if some thread is in the SparkContext + * constructor, or `None` if no SparkContext is being constructed. + * + * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK + */ + private var contextBeingConstructed: Option[SparkContext] = None + + /** + * Called to ensure that no other SparkContext is running in this JVM. + * + * Throws an exception if a running context is detected and logs a warning if another thread is + * constructing a SparkContext. This warning is necessary because the current locking scheme + * prevents us from reliably distinguishing between cases where another context is being + * constructed and cases where another constructor threw an exception. 
+ */ + private def assertNoOtherContextIsRunning( + sc: SparkContext, + allowMultipleContexts: Boolean): Unit = { + SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { + contextBeingConstructed.foreach { otherContext => + if (otherContext ne sc) { // checks for reference equality + // Since otherContext might point to a partially-constructed context, guard against + // its creationSite field being null: + val otherContextCreationSite = + Option(otherContext.creationSite).map(_.longForm).getOrElse("unknown location") + val warnMsg = "Another SparkContext is being constructed (or threw an exception in its" + + " constructor). This may indicate an error, since only one SparkContext may be" + + " running in this JVM (see SPARK-2243)." + + s" The other SparkContext was created at:\n$otherContextCreationSite" + logWarning(warnMsg) + } + + activeContext.foreach { ctx => + val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." + + " To ignore this error, set spark.driver.allowMultipleContexts = true. " + + s"The currently running SparkContext was created at:\n${ctx.creationSite.longForm}" + val exception = new SparkException(errMsg) + if (allowMultipleContexts) { + logWarning("Multiple running SparkContexts detected in the same JVM!", exception) + } else { + throw exception + } + } + } + } + } + + /** + * Called at the beginning of the SparkContext constructor to ensure that no SparkContext is + * running. Throws an exception if a running context is detected and logs a warning if another + * thread is constructing a SparkContext. This warning is necessary because the current locking + * scheme prevents us from reliably distinguishing between cases where another context is being + * constructed and cases where another constructor threw an exception. + */ + private[spark] def markPartiallyConstructed( + sc: SparkContext, + allowMultipleContexts: Boolean): Unit = { + SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { + assertNoOtherContextIsRunning(sc, allowMultipleContexts) + contextBeingConstructed = Some(sc) + } + } + + /** + * Called at the end of the SparkContext constructor to ensure that no other SparkContext has + * raced with this constructor and started. + */ + private[spark] def setActiveContext( + sc: SparkContext, + allowMultipleContexts: Boolean): Unit = { + SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { + assertNoOtherContextIsRunning(sc, allowMultipleContexts) + contextBeingConstructed = None + activeContext = Some(sc) + } + } + + /** + * Clears the active SparkContext metadata. This is called by `SparkContext#stop()`. It's + * also called in unit tests to prevent a flood of warnings from test suites that don't / can't + * properly clean up their SparkContexts. + */ + private[spark] def clearActiveContext(): Unit = { + SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { + activeContext = None + } + } + private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description" private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id" @@ -1682,6 +1813,9 @@ object SparkContext extends Logging { def localCpuCount = Runtime.getRuntime.availableProcessors() // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads. 
val threadCount = if (threads == "*") localCpuCount else threads.toInt + if (threadCount <= 0) { + throw new SparkException(s"Asked to run locally with $threadCount threads") + } val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true) val backend = new LocalBackend(scheduler, threadCount) scheduler.initialize(backend) diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala index c18d763d7ff4d..edbdda8a0bcb6 100644 --- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala +++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala @@ -96,6 +96,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) { new SparkStageInfoImpl( stageId, info.attemptId, + info.submissionTime.getOrElse(0), info.name, info.numTasks, data.numActiveTasks, diff --git a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala index 90b47c847fbca..e5c7c8d0db578 100644 --- a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala +++ b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala @@ -26,6 +26,7 @@ private class SparkJobInfoImpl ( private class SparkStageInfoImpl( val stageId: Int, val currentAttemptId: Int, + val submissionTime: Long, val name: String, val numTasks: Int, val numActiveTasks: Int, diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index d50ed32ca085c..6a6d9bf6857d3 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -42,6 +42,9 @@ import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD} /** * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns * [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones. + * + * Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before + * creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details. 
*/ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround with Closeable { diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 45beb8fc8c925..b80c771d58a8f 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -47,7 +47,7 @@ private[spark] class PythonRDD( pythonIncludes: JList[String], preservePartitoning: Boolean, pythonExec: String, - broadcastVars: JList[Broadcast[Array[Byte]]], + broadcastVars: JList[Broadcast[Array[Array[Byte]]]], accumulator: Accumulator[JList[Array[Byte]]]) extends RDD[Array[Byte]](parent) { @@ -230,8 +230,8 @@ private[spark] class PythonRDD( if (!oldBids.contains(broadcast.id)) { // send new broadcast dataOut.writeLong(broadcast.id) - dataOut.writeInt(broadcast.value.length) - dataOut.write(broadcast.value) + dataOut.writeLong(broadcast.value.map(_.length.toLong).sum) + broadcast.value.foreach(dataOut.write) oldBids.add(broadcast.id) } } @@ -368,16 +368,24 @@ private[spark] object PythonRDD extends Logging { } } - def readBroadcastFromFile(sc: JavaSparkContext, filename: String): Broadcast[Array[Byte]] = { + def readBroadcastFromFile( + sc: JavaSparkContext, + filename: String): Broadcast[Array[Array[Byte]]] = { + val size = new File(filename).length() val file = new DataInputStream(new FileInputStream(filename)) + val blockSize = 1 << 20 + val n = ((size + blockSize - 1) / blockSize).toInt + val obj = new Array[Array[Byte]](n) try { - val length = file.readInt() - val obj = new Array[Byte](length) - file.readFully(obj) - sc.broadcast(obj) + for (i <- 0 until n) { + val length = if (i < (n - 1)) blockSize else (size % blockSize).toInt + obj(i) = new Array[Byte](length) + file.readFully(obj(i)) + } } finally { file.close() } + sc.broadcast(obj) } def writeIteratorToStream[T](iter: Iterator[T], dataOut: DataOutputStream) { diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala index 39150deab863c..2e1e52906ceeb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala @@ -17,6 +17,8 @@ package org.apache.spark.deploy +import java.net.{URI, URISyntaxException} + import scala.collection.mutable.ListBuffer import org.apache.log4j.Level @@ -73,7 +75,8 @@ private[spark] class ClientArguments(args: Array[String]) { if (!ClientArguments.isValidJarUrl(_jarUrl)) { println(s"Jar url '${_jarUrl}' is not in valid format.") - println(s"Must be a jar file path in URL format (e.g. hdfs://XX.jar, file://XX.jar)") + println(s"Must be a jar file path in URL format " + + "(e.g. 
hdfs://host:port/XX.jar, file:///XX.jar)") printUsageAndExit(-1) } @@ -114,5 +117,12 @@ private[spark] class ClientArguments(args: Array[String]) { } object ClientArguments { - def isValidJarUrl(s: String): Boolean = s.matches("(.+):(.+)jar") + def isValidJarUrl(s: String): Boolean = { + try { + val uri = new URI(s) + uri.getScheme != null && uri.getPath != null && uri.getPath.endsWith(".jar") + } catch { + case _: URISyntaxException => false + } + } } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala index aa3743ca7df63..d2687faad62b1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala @@ -134,7 +134,7 @@ private[spark] object SparkSubmitDriverBootstrapper { override def run() = { if (process != null) { process.destroy() - sys.exit(process.waitFor()) + process.waitFor() } } }) diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala index e2fc9c649925e..436dbed1730bc 100644 --- a/core/src/main/scala/org/apache/spark/package.scala +++ b/core/src/main/scala/org/apache/spark/package.scala @@ -44,5 +44,5 @@ package org.apache package object spark { // For package docs only - val SPARK_VERSION = "1.2.0-SNAPSHOT" + val SPARK_VERSION = "1.3.0-SNAPSHOT" } diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala index e2c301603b4a5..8c43a559409f2 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala @@ -39,21 +39,24 @@ class ZippedWithIndexRDDPartition(val prev: Partition, val startIndex: Long) private[spark] class ZippedWithIndexRDD[T: ClassTag](@transient prev: RDD[T]) extends RDD[(T, Long)](prev) { - override def getPartitions: Array[Partition] = { + /** The start index of each partition. 
 */
+  @transient private val startIndices: Array[Long] = {
     val n = prev.partitions.size
-    val startIndices: Array[Long] =
-      if (n == 0) {
-        Array[Long]()
-      } else if (n == 1) {
-        Array(0L)
-      } else {
-        prev.context.runJob(
-          prev,
-          Utils.getIteratorSize _,
-          0 until n - 1, // do not need to count the last partition
-          false
-        ).scanLeft(0L)(_ + _)
-      }
+    if (n == 0) {
+      Array[Long]()
+    } else if (n == 1) {
+      Array(0L)
+    } else {
+      prev.context.runJob(
+        prev,
+        Utils.getIteratorSize _,
+        0 until n - 1, // do not need to count the last partition
+        allowLocal = false
+      ).scanLeft(0L)(_ + _)
+    }
+  }
+
+  override def getPartitions: Array[Partition] = {
     firstParent[T].partitions.map(x => new ZippedWithIndexRDDPartition(x, startIndices(x.index)))
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
index 6908a59a79e60..af873034215a9 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -148,6 +148,7 @@ private[spark] class TachyonBlockManager(
             logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
           }
         }
+        client.close()
       }
     })
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
index 6dbad5ff0518e..233d1e2b7c616 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
@@ -116,6 +116,8 @@ private[spark] class TachyonStore(
       case ioe: IOException =>
         logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe)
         None
+    } finally {
+      is.close()
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
new file mode 100644
index 0000000000000..27ba9e18237b5
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import java.util.{Timer, TimerTask}
+
+import org.apache.spark._
+
+/**
+ * ConsoleProgressBar shows the progress of stages in the next line of the console. It polls the
+ * status of active stages from `sc.statusTracker` periodically; the progress bar is shown once a
+ * stage has been running for at least 500ms. If multiple stages run at the same time, their
+ * statuses are combined and shown on one line.
+ */
+private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
+
+  // Carriage return
+  val CR = '\r'
+  // Update period of progress bar, in milliseconds
+  val UPDATE_PERIOD = 200L
+  // Delay before showing the progress bar, in milliseconds
+  val FIRST_DELAY = 500L
+
+  // The width of the terminal
+  val TerminalWidth = if (!sys.env.getOrElse("COLUMNS", "").isEmpty) {
+    sys.env.get("COLUMNS").get.toInt
+  } else {
+    80
+  }
+
+  var lastFinishTime = 0L
+  var lastUpdateTime = 0L
+  var lastProgressBar = ""
+
+  // Schedule a refresh thread to run periodically
+  private val timer = new Timer("refresh progress", true)
+  timer.schedule(new TimerTask{
+    override def run() {
+      refresh()
+    }
+  }, FIRST_DELAY, UPDATE_PERIOD)
+
+  /**
+   * Try to refresh the progress bar on every cycle.
+   */
+  private def refresh(): Unit = synchronized {
+    val now = System.currentTimeMillis()
+    if (now - lastFinishTime < FIRST_DELAY) {
+      return
+    }
+    val stageIds = sc.statusTracker.getActiveStageIds()
+    val stages = stageIds.map(sc.statusTracker.getStageInfo).flatten.filter(_.numTasks() > 1)
+      .filter(now - _.submissionTime() > FIRST_DELAY).sortBy(_.stageId())
+    if (stages.size > 0) {
+      show(now, stages.take(3)) // display at most 3 stages at the same time
+    }
+  }
+
+  /**
+   * Show the progress bar in the console. The progress bar is displayed on the line after your
+   * last output and keeps overwriting itself to stay on one line. Log output follows the
+   * progress bar, which is then redrawn on the next line so that it does not overwrite the logs.
+   */
+  private def show(now: Long, stages: Seq[SparkStageInfo]) {
+    val width = TerminalWidth / stages.size
+    val bar = stages.map { s =>
+      val total = s.numTasks()
+      val header = s"[Stage ${s.stageId()}:"
+      val tailer = s"(${s.numCompletedTasks()} + ${s.numActiveTasks()}) / $total]"
+      val w = width - header.size - tailer.size
+      val bar = if (w > 0) {
+        val percent = w * s.numCompletedTasks() / total
+        (0 until w).map { i =>
+          if (i < percent) "=" else if (i == percent) ">" else " "
+        }.mkString("")
+      } else {
+        ""
+      }
+      header + bar + tailer
+    }.mkString("")
+
+    // Only refresh if the bar has changed, or after 1 minute (otherwise an idle ssh connection
+    // may be closed)
+    if (bar != lastProgressBar || now - lastUpdateTime > 60 * 1000L) {
+      System.err.print(CR + bar)
+      lastUpdateTime = now
+    }
+    lastProgressBar = bar
+  }
+
+  /**
+   * Clear the progress bar if it is currently shown.
+   */
+  private def clear() {
+    if (!lastProgressBar.isEmpty) {
+      System.err.printf(CR + " " * TerminalWidth + CR)
+      lastProgressBar = ""
+    }
+  }
+
+  /**
+   * Mark all the stages as finished and clear the progress bar if it is shown, so that the
+   * progress bar does not interleave with the output of jobs.
+   */
+  def finishAll(): Unit = synchronized {
+    clear()
+    lastFinishTime = System.currentTimeMillis()
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index be81ca6ab9be7..6ba80dbc3d95a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -40,42 +40,110 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
 
   import JobProgressListener._
 
+  // Define a handful of type aliases so that data structures' types can serve as documentation.
+ // These type aliases are public because they're used in the types of public fields: + type JobId = Int type StageId = Int type StageAttemptId = Int + type PoolName = String + type ExecutorId = String - // How many stages to remember - val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES) - // How many jobs to remember - val retailedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS) + // Define all of our state: + // Jobs: val activeJobs = new HashMap[JobId, JobUIData] val completedJobs = ListBuffer[JobUIData]() val failedJobs = ListBuffer[JobUIData]() val jobIdToData = new HashMap[JobId, JobUIData] + // Stages: val activeStages = new HashMap[StageId, StageInfo] val completedStages = ListBuffer[StageInfo]() val failedStages = ListBuffer[StageInfo]() val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData] val stageIdToInfo = new HashMap[StageId, StageInfo] val stageIdToActiveJobIds = new HashMap[StageId, HashSet[JobId]] - - // Number of completed and failed stages, may not actually equal to completedStages.size and - // failedStages.size respectively due to completedStage and failedStages only maintain the latest - // part of the stages, the earlier ones will be removed when there are too many stages for - // memory sake. + val poolToActiveStages = HashMap[PoolName, HashMap[StageId, StageInfo]]() + // Total of completed and failed stages that have ever been run. These may be greater than + // `completedStages.size` and `failedStages.size` if we have run more stages or jobs than + // JobProgressListener's retention limits. var numCompletedStages = 0 var numFailedStages = 0 - // Map from pool name to a hash map (map from stage id to StageInfo). - val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]() - - val executorIdToBlockManagerId = HashMap[String, BlockManagerId]() + // Misc: + val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]() + def blockManagerIds = executorIdToBlockManagerId.values.toSeq var schedulingMode: Option[SchedulingMode] = None - def blockManagerIds = executorIdToBlockManagerId.values.toSeq + // To limit the total memory usage of JobProgressListener, we only track information for a fixed + // number of non-active jobs and stages (there is no limit for active jobs and stages): + + val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES) + val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS) + + // We can test for memory leaks by ensuring that collections that track non-active jobs and + // stages do not grow without bound and that collections for active jobs/stages eventually become + // empty once Spark is idle. Let's partition our collections into ones that should be empty + // once Spark is idle and ones that should have a hard- or soft-limited sizes. + // These methods are used by unit tests, but they're defined here so that people don't forget to + // update the tests when adding new collections. 
Some collections have multiple levels of + // nesting, etc, so this lets us customize our notion of "size" for each structure: + + // These collections should all be empty once Spark is idle (no active stages / jobs): + private[spark] def getSizesOfActiveStateTrackingCollections: Map[String, Int] = { + Map( + "activeStages" -> activeStages.size, + "activeJobs" -> activeJobs.size, + "poolToActiveStages" -> poolToActiveStages.values.map(_.size).sum, + "stageIdToActiveJobIds" -> stageIdToActiveJobIds.values.map(_.size).sum + ) + } + + // These collections should stop growing once we have run at least `spark.ui.retainedStages` + // stages and `spark.ui.retainedJobs` jobs: + private[spark] def getSizesOfHardSizeLimitedCollections: Map[String, Int] = { + Map( + "completedJobs" -> completedJobs.size, + "failedJobs" -> failedJobs.size, + "completedStages" -> completedStages.size, + "failedStages" -> failedStages.size + ) + } + + // These collections may grow arbitrarily, but once Spark becomes idle they should shrink back to + // some bound based on the `spark.ui.retainedStages` and `spark.ui.retainedJobs` settings: + private[spark] def getSizesOfSoftSizeLimitedCollections: Map[String, Int] = { + Map( + "jobIdToData" -> jobIdToData.size, + "stageIdToData" -> stageIdToData.size, + "stageIdToStageInfo" -> stageIdToInfo.size + ) + } + + /** If stages is too large, remove and garbage collect old stages */ + private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = synchronized { + if (stages.size > retainedStages) { + val toRemove = math.max(retainedStages / 10, 1) + stages.take(toRemove).foreach { s => + stageIdToData.remove((s.stageId, s.attemptId)) + stageIdToInfo.remove(s.stageId) + } + stages.trimStart(toRemove) + } + } + + /** If jobs is too large, remove and garbage collect old jobs */ + private def trimJobsIfNecessary(jobs: ListBuffer[JobUIData]) = synchronized { + if (jobs.size > retainedJobs) { + val toRemove = math.max(retainedJobs / 10, 1) + jobs.take(toRemove).foreach { job => + jobIdToData.remove(job.jobId) + } + jobs.trimStart(toRemove) + } + } override def onJobStart(jobStart: SparkListenerJobStart) = synchronized { val jobGroup = for ( @@ -113,9 +181,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { jobEnd.jobResult match { case JobSucceeded => completedJobs += jobData + trimJobsIfNecessary(completedJobs) jobData.status = JobExecutionStatus.SUCCEEDED case JobFailed(exception) => failedJobs += jobData + trimJobsIfNecessary(failedJobs) jobData.status = JobExecutionStatus.FAILED } for (stageId <- jobData.stageIds) { @@ -142,11 +212,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { if (stage.failureReason.isEmpty) { completedStages += stage numCompletedStages += 1 - trimIfNecessary(completedStages) + trimStagesIfNecessary(completedStages) } else { failedStages += stage numFailedStages += 1 - trimIfNecessary(failedStages) + trimStagesIfNecessary(failedStages) } for ( @@ -163,19 +233,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { } } - /** If stages is too large, remove and garbage collect old stages */ - private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized { - if (stages.size > retainedStages) { - val toRemove = math.max(retainedStages / 10, 1) - stages.take(toRemove).foreach { s => - stageIdToData.remove((s.stageId, s.attemptId)) - stageIdToInfo.remove(s.stageId) - stageIdToActiveJobIds.remove(s.stageId) - } - stages.trimStart(toRemove) - } - } - /** 
For FIFO, all stages are contained by "default" pool but "default" pool here is meaningless */ override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = synchronized { val stage = stageSubmitted.stageInfo diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index cd90b2358812e..40e05f86b661d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -114,6 +114,10 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
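To illustrate the SparkContext changes above: the constructor now registers itself under SPARK_CONTEXT_CONSTRUCTOR_LOCK and fails if another context is already active, unless spark.driver.allowMultipleContexts is set. A minimal driver-side sketch of the new lifecycle rules; the app name and master are placeholders, not part of the patch:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf().setAppName("context-lifecycle").setMaster("local[2]")

    val sc1 = new SparkContext(conf)
    // A second `new SparkContext(conf)` here now throws a SparkException that points at the
    // call site where sc1 was created (see SPARK-2243). Stop the active context first:
    sc1.stop()
    val sc2 = new SparkContext(conf)
    sc2.stop()

    // To downgrade the error to a logged warning (the old, unsafe behaviour):
    //   conf.set("spark.driver.allowMultipleContexts", "true")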
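The console progress bar is created only when spark.ui.showConsoleProgress is true (the default) and the driver's log level is above INFO, and spark-submit now exports COLUMNS so the bar can size itself to the terminal. A sketch of opting out from driver code; the app name and master are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}

    // Disable the console progress bar, e.g. when stderr is redirected to a log file.
    val conf = new SparkConf()
      .setAppName("quiet-driver")   // placeholder
      .setMaster("local[*]")        // placeholder
      .set("spark.ui.showConsoleProgress", "false")

    val sc = new SparkContext(conf)
    // Jobs now run without a "[Stage 0:=====>    (42 + 4) / 100]" line being redrawn on stderr.
    sc.stop()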
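SparkStageInfo gains a submissionTime() accessor (0 when the stage has not been submitted yet, per the getOrElse(0) in SparkStatusTracker), which is what lets the progress bar ignore stages younger than FIRST_DELAY. A sketch of polling the same public API from user code; activeStageSummaries is an illustrative helper, not something added by the patch:

    import org.apache.spark.SparkContext

    // Summarize active stages the same way ConsoleProgressBar does; `sc` is an active SparkContext.
    def activeStageSummaries(sc: SparkContext): Seq[String] = {
      sc.statusTracker.getActiveStageIds().toSeq
        .flatMap(id => sc.statusTracker.getStageInfo(id))   // Option[SparkStageInfo]
        .map { s =>
          s"Stage ${s.stageId()} (submitted at ${s.submissionTime()}): " +
            s"${s.numCompletedTasks()} + ${s.numActiveTasks()} of ${s.numTasks()} tasks"
        }
    }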
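readBroadcastFromFile now returns a Broadcast[Array[Array[Byte]]] built from 1 MiB blocks, and PythonRDD writes the summed block length followed by each block. A small worked sketch of the block arithmetic used above, assuming a file whose size is not an exact multiple of the block size:

    // Block arithmetic from readBroadcastFromFile: split `size` bytes into 1 MiB blocks.
    val blockSize = 1 << 20                              // 1 MiB
    val size = (2.5 * blockSize).toLong                  // e.g. a 2.5 MiB broadcast file
    val n = ((size + blockSize - 1) / blockSize).toInt   // number of blocks, rounded up
    val lastBlockLength = (size % blockSize).toInt       // length of the final, partial block

    assert(n == 3)
    assert(lastBlockLength == blockSize / 2)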
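ClientArguments.isValidJarUrl now parses its argument as a java.net.URI and requires both a scheme and a path ending in ".jar", replacing the old (.+):(.+)jar regex. The same check, reproduced standalone with a few illustrative inputs:

    import java.net.{URI, URISyntaxException}

    def isValidJarUrl(s: String): Boolean = {
      try {
        val uri = new URI(s)
        uri.getScheme != null && uri.getPath != null && uri.getPath.endsWith(".jar")
      } catch {
        case _: URISyntaxException => false
      }
    }

    // Accepted: a scheme plus a path that ends in ".jar".
    assert(isValidJarUrl("hdfs://host:9000/apps/app.jar"))
    assert(isValidJarUrl("file:///tmp/app.jar"))
    // Rejected: no scheme, or a path that is not a jar.
    assert(!isValidJarUrl("/tmp/app.jar"))
    assert(!isValidJarUrl("hdfs://host:9000/apps/app.zip"))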
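The JobProgressListener changes cap UI memory by keeping at most spark.ui.retainedStages completed/failed stages and spark.ui.retainedJobs jobs, dropping roughly the oldest 10% (at least one entry) whenever a limit is exceeded. A hedged sketch of just that trimming rule, detached from the listener and its per-stage bookkeeping:

    import scala.collection.mutable.ListBuffer

    // Illustrative stand-in for spark.ui.retainedStages.
    val retainedStages = 1000

    // Drop the oldest ~10% of entries (at least one) once the buffer exceeds the limit.
    def trimIfNecessary[T](buffer: ListBuffer[T]): Unit = {
      if (buffer.size > retainedStages) {
        val toRemove = math.max(retainedStages / 10, 1)
        buffer.trimStart(toRemove)
      }
    }

    val completedStages = ListBuffer.tabulate(1001)(identity)
    trimIfNecessary(completedStages)
    assert(completedStages.size == 901)   // 1001 - 100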