First complete implementation of HistoryServer (only for finished apps)
This involves a change in Spark's event log format. All event logs are
now prefixed with EVENT_LOG_. If compression is used, the logger creates
a special empty file prefixed with COMPRESSION_CODEC_ that indicates which
codec is used. After the application finishes, the logger creates a special
empty file named APPLICATION_COMPLETE.
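
For illustration, the writer side of the new format reduces to a few
file-creation calls; the sketch below is condensed from the
EventLoggingListener diff in this commit (the directory and codec names
are hypothetical examples):

    // One application's log directory under the new format might contain:
    //   /tmp/spark-events/app-20140322131415-0001/
    //     EVENT_LOG_1                                                 (the JSON event log)
    //     COMPRESSION_CODEC_org.apache.spark.io.LZFCompressionCodec   (empty marker)
    //     APPLICATION_COMPLETE                                        (empty marker)
    def start() {
      if (shouldCompress) {
        logger.newFile(COMPRESSION_CODEC_PREFIX + codec)   // marker only, file stays empty
      }
      logger.newFile(LOG_PREFIX + logger.fileIndex)        // the actual event log
    }

    def stop() = {
      logger.newFile(APPLICATION_COMPLETE)                 // completion marker
      logger.stop()
    }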

The ReplayListenerBus is now responsible for parsing all of the above
file formats. This commit establishes a one-to-one mapping between a
ReplayListenerBus and an event-logged application. The semantics of the
ReplayListenerBus are further clarified (e.g. replay is not allowed
before the bus is started, and can be called only once).
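
The clarified life cycle can be pictured as two boolean guards; the
following is a minimal sketch of the intended semantics (the class, flag,
and method names here are illustrative, not necessarily those of the
actual ReplayListenerBus):

    class OneShotReplayBus {
      private var started = false
      private var replayed = false

      def start() { started = true }   // must be called before replay()

      def replay(): Boolean = {
        assert(started, "ReplayListenerBus must be started before replaying!")
        assert(!replayed, "ReplayListenerBus can only replay events once!")
        replayed = true
        // ... parse the EVENT_LOG_* files and post each event to the listeners ...
        true
      }
    }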

This commit also adds a control mechanism for the frequency at which
HistoryServer accesses the disk to check for log updates. This enforces
a minimum interval of N seconds between two consecutive checks, where N
is arbitrarily chosen to be 5.
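
Concretely, the throttle is a single timestamp comparison; condensed
from the HistoryServer diff in this commit:

    val UPDATE_INTERVAL_SECONDS = 5   // minimum interval between disk accesses
    var lastLogCheck = -1L

    def logCheckReady: Boolean =
      System.currentTimeMillis - lastLogCheck > UPDATE_INTERVAL_SECONDS * 1000

    def checkForLogs() {
      if (logCheckReady) {
        lastLogCheck = System.currentTimeMillis
        // ... list the log directory and render UIs for new completed apps ...
      }
    }
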
andrewor14 committed Mar 22, 2014
1 parent 7584418 commit 60bc6d5
Showing 12 changed files with 237 additions and 151 deletions.
5 changes: 1 addition & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -164,15 +164,12 @@ class SparkContext(
   private[spark] val eventLogger: Option[EventLoggingListener] = {
     if (conf.getBoolean("spark.eventLog.enabled", false)) {
       val logger = new EventLoggingListener(appName, conf)
+      logger.start()
       listenerBus.addListener(logger)
       Some(logger)
     } else None
   }
 
-  // Information needed to replay logged events, if any
-  private[spark] val eventLoggingInfo: Option[EventLoggingInfo] =
-    eventLogger.map { logger => Some(logger.info) }.getOrElse(None)
-
   // At this point, all relevant SparkListeners have been registered, so begin releasing events
   listenerBus.start()
 
ApplicationDescription.scala
@@ -17,16 +17,14 @@
 
 package org.apache.spark.deploy
 
-import org.apache.spark.scheduler.EventLoggingInfo
-
 private[spark] class ApplicationDescription(
     val name: String,
     val maxCores: Option[Int],
     val memoryPerSlave: Int,
     val command: Command,
     val sparkHome: Option[String],
     var appUiUrl: String,
-    val eventLogInfo: Option[EventLoggingInfo] = None)
+    val eventLogDir: Option[String] = None)
   extends Serializable {
 
   val user = System.getProperty("user.name", "<unknown>")
HistoryServer.scala
@@ -53,6 +53,9 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int, conf: SparkConf)
   private val fileSystem = Utils.getHadoopFileSystem(new URI(baseLogDir))
   private val securityManager = new SecurityManager(conf)
 
+  // A timestamp of when the disk was last accessed to check for log updates
+  private var lastLogCheck = -1L
+
   private val handlers = Seq[ServletContextHandler](
     createStaticHandler(HistoryServer.STATIC_RESOURCE_DIR, "/static"),
     createServletHandler("/",
@@ -74,81 +77,106 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int, conf: SparkConf)
     checkForLogs()
   }
 
-  /** Parse app ID from the given log path. */
-  def getAppId(logPath: String): String = logPath.split("/").last
-
-  /** Return the address of this server. */
-  def getAddress = "http://" + host + ":" + boundPort
-
   /**
-   * Check for any updated event logs.
+   * Check for any updates to event logs in the base directory.
    *
-   * If a new application is found, render the associated SparkUI and remember it.
-   * If an existing application is updated, re-render the associated SparkUI.
-   * If an existing application is removed, remove the associated SparkUI.
+   * If a new finished application is found, the server renders the associated SparkUI
+   * from the application's event logs, attaches this UI to itself, and stores metadata
+   * information for this application.
+   *
+   * If the logs for an existing finished application are no longer found, remove all
+   * associated information and detach the SparkUI.
    */
   def checkForLogs() {
-    val logStatus = fileSystem.listStatus(new Path(baseLogDir))
-    val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
+    if (logCheckReady) {
+      lastLogCheck = System.currentTimeMillis
+      val logStatus = fileSystem.listStatus(new Path(baseLogDir))
+      val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
 
-    // Render any missing or outdated SparkUI
-    logDirs.foreach { dir =>
-      val path = dir.getPath.toString
-      val appId = getAppId(path)
-      val lastUpdated = {
-        val logFiles = fileSystem.listStatus(dir.getPath)
-        if (logFiles != null) logFiles.map(_.getModificationTime).max else dir.getModificationTime
-      }
-      if (!appIdToInfo.contains(appId) || appIdToInfo(appId).lastUpdated < lastUpdated) {
-        maybeRenderUI(appId, path, lastUpdated)
+      // Render SparkUI for any new completed applications
+      logDirs.foreach { dir =>
+        val path = dir.getPath.toString
+        val appId = getAppId(path)
+        val lastUpdated = getModificationTime(dir)
+        if (!appIdToInfo.contains(appId)) {
+          maybeRenderUI(appId, path, lastUpdated)
+        }
       }
-    }
 
-    // Remove any outdated SparkUIs
-    val appIds = logDirs.map { dir => getAppId(dir.getPath.toString) }
-    appIdToInfo.foreach { case (appId, info) =>
-      if (!appIds.contains(appId)) {
-        detachUI(info.ui)
-        appIdToInfo.remove(appId)
+      // Remove any outdated SparkUIs
+      val appIds = logDirs.map { dir => getAppId(dir.getPath.toString) }
+      appIdToInfo.foreach { case (appId, info) =>
+        if (!appIds.contains(appId)) {
+          detachUI(info.ui)
+          appIdToInfo.remove(appId)
+        }
       }
     }
   }
 
-  /** Attempt to render a new SparkUI from event logs residing in the given log directory. */
+  /**
+   * Render a new SparkUI from the event logs if the associated application is finished.
+   *
+   * HistoryServer looks for a special file that indicates application completion in the given
+   * directory. If this file exists, the associated application is regarded to be complete, in
+   * which case the server proceeds to render the SparkUI. Otherwise, the server does nothing.
+   */
   private def maybeRenderUI(appId: String, logPath: String, lastUpdated: Long) {
-    val replayBus = new ReplayListenerBus(conf)
-    val appListener = new ApplicationListener
-    replayBus.addListener(appListener)
-    val ui = new SparkUI(conf, replayBus, appId, "/history/%s".format(appId))
-
-    // Do not call ui.bind() to avoid creating a new server for each application
-    ui.start()
-    val success = replayBus.replay(logPath)
-    if (success) {
-      attachUI(ui)
-      if (!appListener.started) {
-        logWarning("Application has event logs but has not started: %s".format(appId))
-      }
-      val appName = appListener.appName
-      val startTime = appListener.startTime
-      val endTime = appListener.endTime
-      val info = ApplicationHistoryInfo(appName, startTime, endTime, lastUpdated, logPath, ui)
-
-      // If the UI already exists, terminate it and replace it
-      appIdToInfo.remove(appId).foreach { info => detachUI(info.ui) }
-      appIdToInfo(appId) = info
-
-      // Use mnemonic original app name rather than app ID
-      val originalAppName = "%s (history)".format(appName)
-      ui.setAppName(originalAppName)
+    val replayBus = new ReplayListenerBus(logPath)
+    replayBus.start()
+
+    // If the application completion file is found
+    if (replayBus.isApplicationComplete) {
+      val ui = new SparkUI(replayBus, appId, "/history/%s".format(appId))
+      val appListener = new ApplicationListener
+      replayBus.addListener(appListener)
+
+      // Do not call ui.bind() to avoid creating a new server for each application
+      ui.start()
+      val success = replayBus.replay()
+      if (success) {
+        attachUI(ui)
+        val appName = if (appListener.applicationStarted) appListener.appName else appId
+        ui.setAppName("%s (history)".format(appName))
+        val startTime = appListener.startTime
+        val endTime = appListener.endTime
+        val info = ApplicationHistoryInfo(appName, startTime, endTime, lastUpdated, logPath, ui)
+        appIdToInfo(appId) = info
+      }
+    } else {
+      logWarning("Skipping incomplete application: %s".format(logPath))
     }
+    replayBus.stop()
   }
 
+  /** Parse app ID from the given log path. */
+  def getAppId(logPath: String): String = logPath.split("/").last
+
+  /** Return the address of this server. */
+  def getAddress = "http://" + host + ":" + boundPort
+
+  /** Return when this directory is last modified. */
+  private def getModificationTime(dir: FileStatus): Long = {
+    val logFiles = fileSystem.listStatus(dir.getPath)
+    if (logFiles != null) {
+      logFiles.map(_.getModificationTime).max
+    } else {
+      dir.getModificationTime
+    }
+  }
+
+  /** Return whether the last log check has happened sufficiently long ago. */
+  private def logCheckReady: Boolean = {
+    System.currentTimeMillis - lastLogCheck > HistoryServer.UPDATE_INTERVAL_SECONDS * 1000
+  }
 }
 
 object HistoryServer {
   val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
 
+  // Minimum interval between each check for logs, which requires a disk access
+  val UPDATE_INTERVAL_SECONDS = 5
+
   def main(argStrings: Array[String]) {
     val conf = new SparkConf
     val args = new HistoryServerArguments(argStrings, conf)
 
IndexPage.scala
@@ -30,19 +30,18 @@ private[spark] class IndexPage(parent: HistoryServer) {
   private val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
 
   def render(request: HttpServletRequest): Seq[Node] = {
-    // Check if logs have been updated
     parent.checkForLogs()
-
     // Populate app table, with most recently modified app first
     val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated }
     val appTable = UIUtils.listingTable(appHeader, appRow, appRows)
 
     val content =
       <div class="row-fluid">
         <div class="span12">
           <ul class="unstyled">
             <li><strong>Event Log Location: </strong> {parent.baseLogDir}</li>
-            <h4>Applications</h4> {appTable}
+            <br></br>
+            <h4>Finished Applications</h4> {appTable}
           </ul>
         </div>
       </div>
@@ -67,7 +66,6 @@ private[spark] class IndexPage(parent: HistoryServer) {
     val duration = if (difference > 0) DeployWebUI.formatDuration(difference) else "---"
     val logDirectory = parent.getAppId(info.logPath)
     val lastUpdated = dateFmt.format(new Date(info.lastUpdated))
-
     <tr>
       <td><a href={uiAddress}>{appName}</a></td>
       <td>{startTime}</td>
 
30 changes: 6 additions & 24 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -149,7 +149,6 @@ private[spark] class Master(
 
   override def postStop() {
     webUi.stop()
-    appIdToUI.values.foreach(_.stop())
     masterMetricsSystem.stop()
     applicationMetricsSystem.stop()
     persistenceEngine.close()
@@ -622,10 +621,7 @@ private[spark] class Master(
     if (completedApps.size >= RETAINED_APPLICATIONS) {
       val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
       completedApps.take(toRemove).foreach( a => {
-        appIdToUI.remove(a.id).foreach { ui =>
-          ui.stop()
-          webUi.detachUI(ui)
-        }
+        appIdToUI.remove(a.id).foreach { ui => webUi.detachUI(ui) }
         applicationMetricsSystem.removeSource(a.appSource)
       })
       completedApps.trimStart(toRemove)
@@ -663,28 +659,14 @@ private[spark] class Master(
    */
   def startPersistedSparkUI(app: ApplicationInfo): Option[SparkUI] = {
     val appName = app.desc.name
-    val eventLogInfo = app.desc.eventLogInfo.getOrElse { return None }
-    val eventLogDir = eventLogInfo.logDir
-    val eventCompressionCodec = eventLogInfo.compressionCodec
-    val appConf = new SparkConf
-    eventCompressionCodec.foreach { codec =>
-      appConf.set("spark.eventLog.compress", "true")
-      appConf.set("spark.io.compression.codec", codec)
-    }
-    val replayBus = new ReplayListenerBus(appConf)
-    val ui = new SparkUI(
-      appConf,
-      replayBus,
-      "%s (finished)".format(appName),
-      "/history/%s".format(app.id))
+    val eventLogDir = app.desc.eventLogDir.getOrElse { return None }
+    val replayBus = new ReplayListenerBus(eventLogDir)
+    val ui = new SparkUI(replayBus, "%s (finished)".format(appName), "/history/%s".format(app.id))
 
     // Do not call ui.bind() to avoid creating a new server for each application
     ui.start()
-    val success = replayBus.replay(eventLogDir)
-    if (!success) {
-      ui.stop()
-      None
-    } else Some(ui)
+    val success = replayBus.replay()
+    if (success) Some(ui) else None
   }
 
   /** Generate a new app ID given a app's submission date */
 
ApplicationListener.scala
@@ -29,13 +29,13 @@ private[spark] class ApplicationListener extends SparkListener {
   var startTime = -1L
   var endTime = -1L
 
-  def started = startTime != -1
+  def applicationStarted = startTime != -1
 
-  def finished = endTime != -1
+  def applicationFinished = endTime != -1
 
-  def duration: Long = {
+  def applicationDuration: Long = {
     val difference = endTime - startTime
-    if (started && finished && difference > 0) difference else -1L
+    if (applicationStarted && applicationFinished && difference > 0) difference else -1L
   }
 
   override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
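
For context on the renames above: after a replay, the HistoryServer reads
these accessors to decide what name to display, falling back to the app ID
when no application start event was logged. A minimal sketch based on the
HistoryServer diff in this commit (displayName is a hypothetical helper):

    def displayName(appListener: ApplicationListener, appId: String): String = {
      // Fall back to the app ID if no SparkListenerApplicationStart was replayed
      if (appListener.applicationStarted) appListener.appName else appId
    }
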
EventLoggingListener.scala
@@ -36,6 +36,8 @@ import org.apache.spark.util.{JsonProtocol, FileLogger}
 private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
   extends SparkListener with Logging {
 
+  import EventLoggingListener._
+
   private val shouldCompress = conf.getBoolean("spark.eventLog.compress", false)
   private val shouldOverwrite = conf.getBoolean("spark.eventLog.overwrite", false)
   private val outputBufferSize = conf.getInt("spark.eventLog.buffer.kb", 100) * 1024
@@ -46,16 +48,19 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
   private val logger =
     new FileLogger(logDir, conf, outputBufferSize, shouldCompress, shouldOverwrite)
 
-  // Information needed to replay the events logged by this listener later
-  val info = {
-    val compressionCodec = if (shouldCompress) {
-      Some(conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC))
-    } else None
-    EventLoggingInfo(logDir, compressionCodec)
+  /**
+   * Begin logging events. If compression is used, log a file that indicates which compression
+   * library is used.
+   */
+  def start() {
+    logInfo("Logging events to %s".format(logDir))
+    if (shouldCompress) {
+      val codec = conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
+      logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
+    }
+    logger.newFile(LOG_PREFIX + logger.fileIndex)
   }
 
-  logInfo("Logging events to %s".format(logDir))
-
   /** Log the event as JSON */
   private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
     val eventJson = compact(render(JsonProtocol.sparkEventToJson(event)))
@@ -95,8 +100,36 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
   override def onApplicationEnd(event: SparkListenerApplicationEnd) =
     logEvent(event, flushLogger = true)
 
-  def stop() = logger.stop()
+  /**
+   * Stop logging events. In addition, create an empty special file to indicate application
+   * completion.
+   */
+  def stop() = {
+    logger.newFile(APPLICATION_COMPLETE)
+    logger.stop()
+  }
 }
 
-// If compression is not enabled, compressionCodec is None
-private[spark] case class EventLoggingInfo(logDir: String, compressionCodec: Option[String])
+private[spark] object EventLoggingListener {
+  val LOG_PREFIX = "EVENT_LOG_"
+  val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
+  val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
+
+  def isEventLogFile(fileName: String): Boolean = {
+    fileName.contains(LOG_PREFIX)
+  }
+
+  def isCompressionCodecFile(fileName: String): Boolean = {
+    fileName.contains(COMPRESSION_CODEC_PREFIX)
+  }
+
+  def isApplicationCompleteFile(fileName: String): Boolean = {
+    fileName == APPLICATION_COMPLETE
+  }
+
+  def parseCompressionCodec(fileName: String): String = {
+    if (isCompressionCodecFile(fileName)) {
+      fileName.replaceAll(COMPRESSION_CODEC_PREFIX, "")
+    } else ""
+  }
+}
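
A short usage sketch of these helpers from the replay side, scanning the
file names of a single application's log directory (the file names are
hypothetical examples; this assumes the EventLoggingListener companion
object above is in scope):

    val files = Seq(
      "EVENT_LOG_1",
      "COMPRESSION_CODEC_org.apache.spark.io.LZFCompressionCodec",
      "APPLICATION_COMPLETE")

    val eventLogs = files.filter(EventLoggingListener.isEventLogFile)
    val codec = files.find(EventLoggingListener.isCompressionCodecFile)
      .map(EventLoggingListener.parseCompressionCodec)
    val complete = files.exists(EventLoggingListener.isApplicationCompleteFile)
    // eventLogs == Seq("EVENT_LOG_1")
    // codec == Some("org.apache.spark.io.LZFCompressionCodec")
    // complete == true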