
Merge remote-tracking branch 'apache/master' into streaming-web-ui
Conflicts:
	core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
	core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
	core/src/main/scala/org/apache/spark/ui/SparkUI.scala
	core/src/main/scala/org/apache/spark/ui/WebUI.scala
	core/src/main/scala/org/apache/spark/ui/env/IndexPage.scala
	core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala
	core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
	core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
	core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
	core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
	core/src/main/scala/org/apache/spark/ui/storage/BlockManagerTab.scala
	core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
	core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
tdas committed Apr 10, 2014
2 parents ee6543f + 3bd3129 commit 6de06b0
Showing 101 changed files with 1,150 additions and 310 deletions.
8 changes: 6 additions & 2 deletions bin/spark-class
@@ -47,9 +47,9 @@ DEFAULT_MEM=${SPARK_MEM:-512m}

SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"

-# Add java opts and memory settings for master, worker, executors, and repl.
+# Add java opts and memory settings for master, worker, history server, executors, and repl.
case "$1" in
-# Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
+# Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
'org.apache.spark.deploy.master.Master')
OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_MASTER_OPTS"
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
@@ -58,6 +58,10 @@ case "$1" in
OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_WORKER_OPTS"
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
;;
+'org.apache.spark.deploy.history.HistoryServer')
+OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_HISTORY_OPTS"
+OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
+;;

# Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
'org.apache.spark.executor.CoarseGrainedExecutorBackend')
7 changes: 5 additions & 2 deletions bin/spark-class2.cmd
@@ -45,14 +45,17 @@ if "x%OUR_JAVA_MEM%"=="x" set OUR_JAVA_MEM=512m

set SPARK_DAEMON_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -Dspark.akka.logLifecycleEvents=true

-rem Add java opts and memory settings for master, worker, executors, and repl.
-rem Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
+rem Add java opts and memory settings for master, worker, history server, executors, and repl.
+rem Master, Worker and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
if "%1"=="org.apache.spark.deploy.master.Master" (
set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_MASTER_OPTS%
if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
) else if "%1"=="org.apache.spark.deploy.worker.Worker" (
set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_WORKER_OPTS%
if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
+) else if "%1"=="org.apache.spark.deploy.history.HistoryServer" (
+set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_HISTORY_OPTS%
+if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%

rem Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
) else if "%1"=="org.apache.spark.executor.CoarseGrainedExecutorBackend" (
20 changes: 16 additions & 4 deletions core/src/main/scala/org/apache/spark/Logging.scala
@@ -28,7 +28,7 @@ import org.apache.spark.annotation.DeveloperApi
* Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
* logging messages at different levels using methods that only evaluate parameters lazily if the
* log level is enabled.
- *
+ *
* NOTE: DO NOT USE this class outside of Spark. It is intended as an internal utility.
* This will likely be changed or removed in future releases.
*/
@@ -60,7 +60,7 @@ trait Logging {
protected def logDebug(msg: => String) {
if (log.isDebugEnabled) log.debug(msg)
}

protected def logTrace(msg: => String) {
if (log.isTraceEnabled) log.trace(msg)
}
@@ -117,10 +117,10 @@ trait Logging {
val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
val classLoader = this.getClass.getClassLoader
Option(classLoader.getResource(defaultLogProps)) match {
-case Some(url) =>
+case Some(url) =>
PropertyConfigurator.configure(url)
log.info(s"Using Spark's default log4j profile: $defaultLogProps")
-case None =>
+case None =>
System.err.println(s"Spark was unable to load $defaultLogProps")
}
}
@@ -135,4 +135,16 @@ trait Logging {
private object Logging {
@volatile private var initialized = false
val initLock = new Object()
+try {
+// We use reflection here to handle the case where users remove the
+// slf4j-to-jul bridge in order to route their logs to JUL.
+val bridgeClass = Class.forName("org.slf4j.bridge.SLF4JBridgeHandler")
+bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null)
+val installed = bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean]
+if (!installed) {
+bridgeClass.getMethod("install").invoke(null)
+}
+} catch {
+case e: ClassNotFoundException => // can't log anything yet so just fail silently
+}
}
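
For readers unfamiliar with the reflection dance above: it exists only so that Spark compiles and runs when the optional jul-to-slf4j bridge jar is absent, since Class.forName plus the catch on ClassNotFoundException turns a missing jar into a no-op. A minimal sketch of the direct, non-reflective equivalent, assuming org.slf4j:jul-to-slf4j is on the classpath (the object name is illustrative):

import org.slf4j.bridge.SLF4JBridgeHandler

// Direct equivalent of the reflective block added above. This only compiles
// when the jul-to-slf4j artifact is a hard dependency, which is exactly what
// the reflective version avoids.
object JulBridge {
  def install(): Unit = {
    SLF4JBridgeHandler.removeHandlersForRootLogger()  // drop JUL's default handlers
    if (!SLF4JBridgeHandler.isInstalled) {
      SLF4JBridgeHandler.install()                    // route JUL records to SLF4J
    }
  }
}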
30 changes: 24 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -219,15 +219,12 @@ class SparkContext(config: SparkConf) extends Logging {
private[spark] val eventLogger: Option[EventLoggingListener] = {
if (conf.getBoolean("spark.eventLog.enabled", false)) {
val logger = new EventLoggingListener(appName, conf)
+logger.start()
listenerBus.addListener(logger)
Some(logger)
} else None
}

-// Information needed to replay logged events, if any
-private[spark] val eventLoggingInfo: Option[EventLoggingInfo] =
-eventLogger.map { logger => Some(logger.info) }.getOrElse(None)

// At this point, all relevant SparkListeners have been registered, so begin releasing events
listenerBus.start()

@@ -292,6 +289,7 @@ class SparkContext(config: SparkConf) extends Logging {
cleaner.foreach(_.start())

postEnvironmentUpdate()
+postApplicationStart()

/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration: Configuration = {
@@ -777,6 +775,9 @@ class SparkContext(config: SparkConf) extends Logging {
listenerBus.addListener(listener)
}

+/** The version of Spark on which this application is running. */
+def version = SparkContext.SPARK_VERSION

/**
* Return a map from the slave to the max memory available for caching and the remaining
* memory available for caching.
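
The new version accessor added above gives applications a programmatic way to check which Spark release they are running on. A minimal, hypothetical usage sketch (the app name and local master are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

object VersionCheck {
  def main(args: Array[String]): Unit = {
    // Local-mode context purely for illustration.
    val conf = new SparkConf().setAppName("version-check").setMaster("local")
    val sc = new SparkContext(conf)
    println(s"Running on Spark ${sc.version}")  // "Running on Spark 1.0.0" here
    sc.stop()
  }
}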
@@ -930,8 +931,8 @@ class SparkContext(config: SparkConf) extends Logging {

/** Shut down the SparkContext. */
def stop() {
+postApplicationEnd()
ui.stop()
-eventLogger.foreach(_.stop())
// Do this only if not stopped already - best case effort.
// prevent NPE if stopped more than once.
val dagSchedulerCopy = dagScheduler
@@ -940,13 +941,14 @@ class SparkContext(config: SparkConf) extends Logging {
metadataCleaner.cancel()
cleaner.foreach(_.stop())
dagSchedulerCopy.stop()
-listenerBus.stop()
taskScheduler = null
// TODO: Cache.stop()?
env.stop()
SparkEnv.set(null)
ShuffleMapTask.clearCache()
ResultTask.clearCache()
+listenerBus.stop()
+eventLogger.foreach(_.stop())
logInfo("Successfully stopped SparkContext")
} else {
logInfo("SparkContext already stopped")
@@ -1175,6 +1177,20 @@ class SparkContext(config: SparkConf) extends Logging {
/** Register a new RDD, returning its RDD ID */
private[spark] def newRddId(): Int = nextRddId.getAndIncrement()

+/** Post the application start event */
+private def postApplicationStart() {
+listenerBus.post(SparkListenerApplicationStart(appName, startTime, sparkUser))
+}
+
+/**
+* Post the application end event to all listeners immediately, rather than adding it
+* to the event queue for it to be asynchronously processed eventually. Otherwise, a race
+* condition exists in which the listeners may stop before this event has been propagated.
+*/
+private def postApplicationEnd() {
+listenerBus.post(SparkListenerApplicationEnd(System.currentTimeMillis))
+}
+
/** Post the environment update event once the task scheduler is ready */
private def postEnvironmentUpdate() {
if (taskScheduler != null) {
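
The scaladoc on postApplicationEnd above alludes to a shutdown race: with an asynchronous bus, an event that is merely queued can be dropped if the listeners or the bus stop before the queue drains. A toy sketch of such a bus (illustration only, not Spark's actual LiveListenerBus) makes the hazard concrete:

import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Toy asynchronous listener bus. Events are handled on a background thread;
// stop() drains whatever is already queued, then returns. Any event posted
// only *after* stop() begins may never be delivered, which is why
// SparkContext.stop() above posts the application-end event before anything
// else is torn down.
class ToyListenerBus(handle: String => Unit) {
  private val queue = new LinkedBlockingQueue[String]()
  @volatile private var stopped = false

  private val worker = new Thread("toy-bus") {
    override def run(): Unit = {
      while (!stopped || !queue.isEmpty) {
        val event = queue.poll(10, TimeUnit.MILLISECONDS)
        if (event != null) handle(event)
      }
    }
  }
  worker.setDaemon(true)
  worker.start()

  def post(event: String): Unit = queue.put(event)

  def stop(): Unit = { stopped = true; worker.join() }  // drain, then return
}

The same reasoning explains the reordering in stop() above: the bus is stopped (and drained) before the event logger, so events flushed during the drain still find a live logger.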
@@ -1200,6 +1216,8 @@
*/
object SparkContext extends Logging {

+private[spark] val SPARK_VERSION = "1.0.0"

private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"

private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
4 changes: 1 addition & 3 deletions core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -17,16 +17,14 @@

package org.apache.spark.deploy

-import org.apache.spark.scheduler.EventLoggingInfo
-
private[spark] class ApplicationDescription(
val name: String,
val maxCores: Option[Int],
val memoryPerSlave: Int,
val command: Command,
val sparkHome: Option[String],
var appUiUrl: String,
-val eventLogInfo: Option[EventLoggingInfo] = None)
+val eventLogDir: Option[String] = None)
extends Serializable {

val user = System.getProperty("user.name", "<unknown>")
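
To see what the signature change means for callers, here is a sketch of constructing the revised descriptor. The Command stub and every value below are hypothetical; only the ApplicationDescription parameter list comes from the diff above:

// Stub standing in for org.apache.spark.deploy.Command so the sketch is
// self-contained; the real class carries the launch command for executors.
case class Command(mainClass: String, arguments: Seq[String], environment: Map[String, String])

val desc = new ApplicationDescription(
  name = "example-app",
  maxCores = Some(4),
  memoryPerSlave = 1024,  // MB per worker slot
  command = Command("org.example.Main", Seq.empty, Map.empty),
  sparkHome = None,
  appUiUrl = "http://driver-host:4040",
  // The history server now receives an event log *directory* to scan,
  // rather than the removed EventLoggingInfo object.
  eventLogDir = Some("/tmp/spark-events")
)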
