Scraps and pieces (no functionality change)
The biggest changes would probably include
  (1) The refactoring of helper methods in StorageUtils
  (2) The renaming of SparkListenerBlockManagerLost to
      SparkListenerBlockManagerRemoved.
  (3) Rendering an empty default page if the user requests an RDD
      page for an RDD that doesn't exist (otherwise it just crashes)

The rest are mostly formatting and comments.
andrewor14 committed Mar 19, 2014
1 parent 222adcd commit 83af656
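
For context on item (3) above, a minimal sketch of the kind of guard this implies. The names here (RddUiInfo, RDDPageSketch, the rddInfos map) are stand-ins for illustration, not the code introduced by this commit:

import scala.xml.Node

// Hypothetical stand-in for the per-RDD information tracked by the storage UI listener.
case class RddUiInfo(id: Int, name: String, numPartitions: Int)

class RDDPageSketch(rddInfos: Map[Int, RddUiInfo]) {
  /** Render the page for `rddId`, falling back to a default page if the RDD is unknown. */
  def render(rddId: Int): Seq[Node] = rddInfos.get(rddId) match {
    case Some(info) =>
      <div>
        <h4>RDD {info.name} (id {info.id})</h4>
        <p>{info.numPartitions} partitions</p>
      </div>
    case None =>
      // Before this commit, requesting a nonexistent RDD meant an unchecked lookup and a crash;
      // rendering a stub page keeps the storage UI alive instead.
      <div><h4>No information found for RDD {rddId}</h4></div>
  }
}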
Showing 31 changed files with 346 additions and 345 deletions. (Only the first eight files' diffs are reproduced below.)
45 changes: 23 additions & 22 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -155,6 +155,27 @@ class SparkContext(
private[spark] val metadataCleaner =
new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)

// Initialize the Spark UI, registering all associated listeners
private[spark] val ui = new SparkUI(this)
ui.bind()
ui.start()

// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
if (conf.getBoolean("spark.eventLog.enabled", false)) {
val logger = new EventLoggingListener(appName, conf)
listenerBus.addListener(logger)
Some(logger)
} else None
}

// Information needed to replay logged events, if any
private[spark] val eventLoggingInfo: Option[EventLoggingInfo] =
eventLogger.map { logger => Some(logger.info) }.getOrElse(None)

// At this point, all relevant SparkListeners have been registered, so begin releasing events
listenerBus.start()

val startTime = System.currentTimeMillis()

// Add each JAR given through the constructor
@@ -199,27 +220,6 @@ class SparkContext(
}
executorEnvs("SPARK_USER") = sparkUser

// Start the UI before posting events to listener bus, because the UI listens for Spark events
private[spark] val ui = new SparkUI(this)
ui.bind()
ui.start()

// Optionally log SparkListenerEvents
private[spark] val eventLogger: Option[EventLoggingListener] = {
if (conf.getBoolean("spark.eventLog.enabled", false)) {
val logger = new EventLoggingListener(appName, conf)
listenerBus.addListener(logger)
Some(logger)
} else None
}

// Information needed to replay logged events, if any
private[spark] val eventLoggingInfo: Option[EventLoggingInfo] =
eventLogger.map { logger => Some(logger.info) }.getOrElse(None)

// At this point, all relevant SparkListeners have been registered, so begin releasing events
listenerBus.start()

// Create and start the scheduler
private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
taskScheduler.start()
@@ -835,6 +835,7 @@ class SparkContext(
if (dagSchedulerCopy != null) {
metadataCleaner.cancel()
dagSchedulerCopy.stop()
listenerBus.stop()
taskScheduler = null
// TODO: Cache.stop()?
env.stop()
@@ -1065,7 +1066,7 @@ class SparkContext(
/** Register a new RDD, returning its RDD ID */
private[spark] def newRddId(): Int = nextRddId.getAndIncrement()

/** Post the environment update event once the task scheduler is ready. */
/** Post the environment update event once the task scheduler is ready */
private def postEnvironmentUpdate() {
if (taskScheduler != null) {
val schedulingMode = getSchedulingMode.toString
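A usage sketch (not part of the diff) for the event logging block moved above: when spark.eventLog.enabled is set, SparkContext registers an EventLoggingListener before starting the listener bus, and stop() now shuts the bus down as well. This assumes a local master and a writable default event log directory:

import org.apache.spark.{SparkConf, SparkContext}

object EventLogExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("event-log-example")
      .set("spark.eventLog.enabled", "true")  // defaults to false, per the code above
    val sc = new SparkContext(conf)
    sc.parallelize(1 to 100).count()
    sc.stop()  // also stops the listener bus, per the change to SparkContext.stop() above
  }
}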
28 changes: 15 additions & 13 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -180,10 +180,9 @@ object SparkEnv extends Logging {
}
}

val blockManagerMaster = new BlockManagerMaster(
registerOrLookup("BlockManagerMaster",
new BlockManagerMasterActor(isLocal, conf, listenerBus)),
conf)
val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
"BlockManagerMaster",
new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)

val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
serializer, conf, securityManager)
@@ -271,23 +270,26 @@ object SparkEnv extends Logging {
("Scala Home", Properties.scalaHome)
).sorted

// Spark properties, including scheduling mode whether or not it is configured
var additionalFields = Seq[(String, String)]()
conf.getOption("spark.scheduler.mode").getOrElse {
additionalFields ++= Seq(("spark.scheduler.mode", schedulingMode))
}
val sparkProperties = conf.getAll.sorted ++ additionalFields
// Spark properties
// This includes the scheduling mode whether or not it is configured (used by SparkUI)
val schedulerMode =
if (!conf.contains("spark.scheduler.mode")) {
Seq(("spark.scheduler.mode", schedulingMode))
} else {
Seq[(String, String)]()
}
val sparkProperties = (conf.getAll ++ schedulerMode).sorted

// System properties that are not java classpaths
val systemProperties = System.getProperties.iterator.toSeq
val classPathProperty = systemProperties.find { case (k, v) =>
k == "java.class.path"
}.getOrElse(("", ""))
val otherProperties = systemProperties.filter { case (k, v) =>
k != "java.class.path" && !k.startsWith("spark.")
}.sorted

// Class paths including all added jars and files
val classPathProperty = systemProperties.find { case (k, v) =>
k == "java.class.path"
}.getOrElse(("", ""))
val classPathEntries = classPathProperty._2
.split(conf.get("path.separator", ":"))
.filterNot(e => e.isEmpty)
core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -18,6 +18,7 @@
package org.apache.spark.deploy.master.ui

import javax.servlet.http.HttpServletRequest

import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.Logging
@@ -33,12 +34,12 @@ private[spark]
class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
val masterActorRef = master.self
val timeout = AkkaUtils.askTimeout(master.conf)
var serverInfo: Option[ServerInfo] = None

private val host = Utils.localHostName()
private val port = requestedPort
private val applicationPage = new ApplicationPage(this)
private val indexPage = new IndexPage(this)
private var serverInfo: Option[ServerInfo] = None

private val handlers: Seq[ServletContextHandler] = {
master.masterMetricsSystem.getServletHandlers ++
@@ -71,7 +72,7 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {

/** Attach a reconstructed UI to this Master UI. Only valid after bind(). */
def attachUI(ui: SparkUI) {
assert(serverInfo.isDefined, "Master UI must be initialized before attaching SparkUIs")
assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs")
val rootHandler = serverInfo.get.rootHandler
for (handler <- ui.handlers) {
rootHandler.addHandler(handler)
@@ -83,7 +84,7 @@

/** Detach a reconstructed UI from this Master UI. Only valid after bind(). */
def detachUI(ui: SparkUI) {
assert(serverInfo.isDefined, "Master UI must be initialized before detaching SparkUIs")
assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs")
val rootHandler = serverInfo.get.rootHandler
for (handler <- ui.handlers) {
if (handler.isStarted) {
@@ -94,7 +95,7 @@
}

def stop() {
assert(serverInfo.isDefined, "Attempted to stop a Master UI that was not initialized!")
assert(serverInfo.isDefined, "Attempted to stop a Master UI that was not bound to a server!")
serverInfo.get.server.stop()
}
}
13 changes: 9 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -339,10 +339,15 @@ private[spark] object Worker {
actorSystem.awaitTermination()
}

def startSystemAndActor(host: String, port: Int, webUiPort: Int, cores: Int, memory: Int,
masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None)
: (ActorSystem, Int) =
{
def startSystemAndActor(
host: String,
port: Int,
webUiPort: Int,
cores: Int,
memory: Int,
masterUrls: Array[String],
workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = {

// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
val conf = new SparkConf
val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.worker.ui

import java.io.File
import javax.servlet.http.HttpServletRequest

import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.Logging
@@ -32,29 +33,30 @@ import org.apache.spark.util.{AkkaUtils, Utils}
*/
private[spark]
class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
extends Logging {
val timeout = AkkaUtils.askTimeout(worker.conf)
val host = Utils.localHostName()
val port = requestedPort.getOrElse(
worker.conf.get("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)

var serverInfo: Option[ServerInfo] = None
extends Logging {

val indexPage = new IndexPage(this)

val metricsHandlers = worker.metricsSystem.getServletHandlers
val timeout = AkkaUtils.askTimeout(worker.conf)

val handlers = metricsHandlers ++ Seq[ServletContextHandler](
createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static/*"),
createServletHandler("/log",
(request: HttpServletRequest) => log(request), worker.securityMgr),
createServletHandler("/logPage",
(request: HttpServletRequest) => logPage(request), worker.securityMgr),
createServletHandler("/json",
(request: HttpServletRequest) => indexPage.renderJson(request), worker.securityMgr),
createServletHandler("/",
(request: HttpServletRequest) => indexPage.render(request), worker.securityMgr)
)
private val host = Utils.localHostName()
private val port = requestedPort.getOrElse(
worker.conf.get("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
private val indexPage = new IndexPage(this)
private var serverInfo: Option[ServerInfo] = None

private val handlers: Seq[ServletContextHandler] = {
worker.metricsSystem.getServletHandlers ++
Seq[ServletContextHandler](
createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static/*"),
createServletHandler("/log",
(request: HttpServletRequest) => log(request), worker.securityMgr),
createServletHandler("/logPage",
(request: HttpServletRequest) => logPage(request), worker.securityMgr),
createServletHandler("/json",
(request: HttpServletRequest) => indexPage.renderJson(request), worker.securityMgr),
createServletHandler("/",
(request: HttpServletRequest) => indexPage.render(request), worker.securityMgr)
)
}

def bind() {
try {
@@ -69,7 +71,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I

def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)

def log(request: HttpServletRequest): String = {
private def log(request: HttpServletRequest): String = {
val defaultBytes = 100 * 1024

val appId = Option(request.getParameter("appId"))
@@ -96,7 +98,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
pre + Utils.offsetBytes(path, startByte, endByte)
}

def logPage(request: HttpServletRequest): Seq[scala.xml.Node] = {
private def logPage(request: HttpServletRequest): Seq[scala.xml.Node] = {
val defaultBytes = 100 * 1024
val appId = Option(request.getParameter("appId"))
val executorId = Option(request.getParameter("executorId"))
@@ -117,17 +119,14 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
val (startByte, endByte) = getByteRange(path, offset, byteLength)
val file = new File(path)
val logLength = file.length

val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node>

val linkToMaster = <p><a href={worker.activeMasterWebUiUrl}>Back to Master</a></p>

val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span>

val backButton =
if (startByte > 0) {
<a href={"?%s&logType=%s&offset=%s&byteLength=%s"
.format(params, logType, math.max(startByte-byteLength, 0), byteLength)}>
.format(params, logType, math.max(startByte - byteLength, 0), byteLength)}>
<button type="button" class="btn btn-default">
Previous {Utils.bytesToString(math.min(byteLength, startByte))}
</button>
Expand All @@ -144,7 +143,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
<a href={"?%s&logType=%s&offset=%s&byteLength=%s".
format(params, logType, endByte, byteLength)}>
<button type="button" class="btn btn-default">
Next {Utils.bytesToString(math.min(byteLength, logLength-endByte))}
Next {Utils.bytesToString(math.min(byteLength, logLength - endByte))}
</button>
</a>
}
@@ -173,29 +172,23 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
}

/** Determine the byte range for a log or log page. */
def getByteRange(path: String, offset: Option[Long], byteLength: Int)
: (Long, Long) = {
private def getByteRange(path: String, offset: Option[Long], byteLength: Int): (Long, Long) = {
val defaultBytes = 100 * 1024
val maxBytes = 1024 * 1024

val file = new File(path)
val logLength = file.length()
val getOffset = offset.getOrElse(logLength-defaultBytes)

val getOffset = offset.getOrElse(logLength - defaultBytes)
val startByte =
if (getOffset < 0) 0L
else if (getOffset > logLength) logLength
else getOffset

val logPageLength = math.min(byteLength, maxBytes)

val endByte = math.min(startByte + logPageLength, logLength)

(startByte, endByte)
}

def stop() {
assert(serverInfo.isDefined, "Attempted to stop a Worker UI that was not initialized!")
assert(serverInfo.isDefined, "Attempted to stop a Worker UI that was not bound to a server!")
serverInfo.get.server.stop()
}
}
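
As a worked illustration of the byte-range clamping in getByteRange above, a self-contained sketch using the constants from the diff (100 KB default window, 1 MB cap); the log length is passed in directly here instead of being read from a File:

object ByteRangeSketch {
  def getByteRange(logLength: Long, offset: Option[Long], byteLength: Int): (Long, Long) = {
    val defaultBytes = 100 * 1024
    val maxBytes = 1024 * 1024
    val requestedOffset = offset.getOrElse(logLength - defaultBytes)
    val startByte =
      if (requestedOffset < 0) 0L
      else if (requestedOffset > logLength) logLength
      else requestedOffset
    val logPageLength = math.min(byteLength, maxBytes)
    val endByte = math.min(startByte + logPageLength, logLength)
    (startByte, endByte)
  }

  def main(args: Array[String]): Unit = {
    // A 1 MB log with no offset requested: default to the last 100 KB.
    println(getByteRange(logLength = 1024 * 1024, offset = None, byteLength = 100 * 1024))
    // An offset past the end of the log clamps to an empty range at logLength.
    println(getByteRange(logLength = 1024 * 1024, offset = Some(2L * 1024 * 1024), byteLength = 100 * 1024))
  }
}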
core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -72,7 +72,7 @@ class TaskMetrics extends Serializable {
var shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None

/**
* Statuses of any blocks that have been updated as a result of this task.
* Storage statuses of any blocks that have been updated as a result of this task.
*/
var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None
}
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1093,7 +1093,7 @@ class DAGScheduler(
"stageToInfos" -> stageToInfos,
"jobIdToStageIds" -> jobIdToStageIds,
"stageIdToJobIds" -> stageIdToJobIds).
foreach { case(s, t) =>
foreach { case (s, t) =>
val sizeBefore = t.size
t.clearOldValues(cleanupTime)
logInfo("%s %d --> %d".format(s, sizeBefore, t.size))
@@ -1106,7 +1106,6 @@
}
metadataCleaner.cancel()
taskScheduler.stop()
listenerBus.stop()
}
}

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -66,11 +66,16 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
}

// Events that do not trigger a flush
override def onStageSubmitted(event: SparkListenerStageSubmitted) = logEvent(event)
override def onTaskStart(event: SparkListenerTaskStart) = logEvent(event)
override def onTaskGettingResult(event: SparkListenerTaskGettingResult) = logEvent(event)
override def onTaskEnd(event: SparkListenerTaskEnd) = logEvent(event)
override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate) = logEvent(event)
override def onStageSubmitted(event: SparkListenerStageSubmitted) =
logEvent(event)
override def onTaskStart(event: SparkListenerTaskStart) =
logEvent(event)
override def onTaskGettingResult(event: SparkListenerTaskGettingResult) =
logEvent(event)
override def onTaskEnd(event: SparkListenerTaskEnd) =
logEvent(event)
override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate) =
logEvent(event)

// Events that trigger a flush
override def onStageCompleted(event: SparkListenerStageCompleted) =
@@ -81,7 +86,7 @@
logEvent(event, flushLogger = true)
override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded) =
logEvent(event, flushLogger = true)
override def onBlockManagerLost(event: SparkListenerBlockManagerLost) =
override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved) =
logEvent(event, flushLogger = true)
override def onUnpersistRDD(event: SparkListenerUnpersistRDD) =
logEvent(event, flushLogger = true)
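To illustrate the rename in item (2) of the commit message, a sketch of a user-defined SparkListener reacting to the renamed event; the blockManagerId field is an assumption based on the corresponding Added event, which this diff does not show:

import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockManagerRemoved}

class BlockManagerTracker extends SparkListener {
  override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Unit = {
    // Field name assumed; adjust to the actual event definition.
    println("Block manager removed: " + event.blockManagerId)
  }
}

// Hypothetical registration on an existing SparkContext:
//   sc.addSparkListener(new BlockManagerTracker)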
