Skip to content

Commit

Permalink
Make app attempts part of the history server model.
Browse files Browse the repository at this point in the history
This change explicitly models app attempts in the history server.
An app is now represented as a collection of app attempts, instead of
leaving the logic that matches apps to their attempts implicit in the
rendering code.

This makes the rendering code a lot simpler, since it doesn't need
to do any fancy processing of the app list to figure out what to show.
  • Loading branch information
Marcelo Vanzin committed Apr 8, 2015
1 parent 5fd5c6f commit 3245aa2
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 211 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@ package org.apache.spark.deploy.history

import org.apache.spark.ui.SparkUI

/**
 * Information about a single attempt of an application, as exposed by a history provider.
 *
 * @param attemptId ID of this attempt; an empty string for apps that have only a
 *                  single attempt (the provider falls back to "" when no attempt ID
 *                  was recorded in the event log).
 * @param name application name recorded in the event log.
 * @param startTime attempt start time, in milliseconds (-1 if not recorded).
 * @param endTime attempt end time, in milliseconds (-1 if still running / not recorded).
 * @param lastUpdated last modification time of the backing event log.
 * @param sparkUser user that ran the attempt.
 * @param completed whether the attempt has finished.
 */
private[history] case class ApplicationAttemptInfo(
    attemptId: String,
    name: String,
    startTime: Long,
    endTime: Long,
    lastUpdated: Long,
    sparkUser: String,
    completed: Boolean = false)

/**
 * A history server application entry: an application ID together with all of its
 * known attempts.
 *
 * NOTE(review): the ordering of `attempts` appears to be significant — callers read
 * `attempts.head` directly (e.g. for lastUpdated checks and listing comparisons), and
 * the provider sorts the list with `compareAttemptInfo`, so the list is presumably
 * kept most-recent-first; confirm against the provider that builds it.
 */
private[history] case class ApplicationHistoryInfo(
id: String,
attempts: List[ApplicationAttemptInfo])

private[history] abstract class ApplicationHistoryProvider {

Expand All @@ -42,9 +45,11 @@ private[history] abstract class ApplicationHistoryProvider {
* Returns the Spark UI for a specific application.
*
* @param appId The application ID.
* @param attemptId The application attempt ID for apps with multiple attempts (or an empty
* string for apps with a single attempt).
* @return The application's UI, or None if application is not found.
*/
def getAppUI(appId: String): Option[SparkUI]
def getAppUI(appId: String, attemptId: String): Option[SparkUI]

/**
* Called when the server is shutting down.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,31 +143,34 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis

override def getListing(): Iterable[FsApplicationHistoryInfo] = applications.values

override def getAppUI(appId: String): Option[SparkUI] = {
override def getAppUI(appId: String, attemptId: String): Option[SparkUI] = {
try {
applications.get(appId).map { info =>
val replayBus = new ReplayListenerBus()
val ui = {
val conf = this.conf.clone()
val appSecManager = new SecurityManager(conf)
SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId,
s"${HistoryServer.UI_PATH_PREFIX}/$appId")
// Do not call ui.bind() to avoid creating a new server for each application
}
applications.get(appId).flatMap { info =>
val attempts = info.attempts.filter(_.attemptId == attemptId)
attempts.headOption.map { attempt =>
val replayBus = new ReplayListenerBus()
val ui = {
val conf = this.conf.clone()
val appSecManager = new SecurityManager(conf)
SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId,
s"${HistoryServer.UI_PATH_PREFIX}/$appId")
// Do not call ui.bind() to avoid creating a new server for each application
}

val appListener = new ApplicationEventListener()
replayBus.addListener(appListener)
val appInfo = replay(fs.getFileStatus(new Path(logDir, info.logPath)), replayBus)
val appListener = new ApplicationEventListener()
replayBus.addListener(appListener)
val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)

ui.setAppName(s"${appInfo.name} ($appId)")
ui.setAppName(s"${attempt.name} ($appId)")

val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
ui.getSecurityManager.setAcls(uiAclsEnabled)
// make sure to set admin acls before view acls so they are properly picked up
ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
ui.getSecurityManager.setViewAcls(appInfo.sparkUser,
appListener.viewAcls.getOrElse(""))
ui
val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
ui.getSecurityManager.setAcls(uiAclsEnabled)
// make sure to set admin acls before view acls so they are properly picked up
ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
ui.getSecurityManager.setViewAcls(attempt.sparkUser,
appListener.viewAcls.getOrElse(""))
ui
}
}
} catch {
case e: FileNotFoundException => None
Expand Down Expand Up @@ -225,7 +228,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
*/
private def mergeApplicationListing(logs: Seq[FileStatus]): Unit = {
val bus = new ReplayListenerBus()
val newApps = logs.flatMap { fileStatus =>
val newAttempts = logs.flatMap { fileStatus =>
try {
val res = replay(fileStatus, bus)
logInfo(s"Application log ${res.logPath} loaded successfully.")
Expand All @@ -237,41 +240,52 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
e)
None
}
}.toSeq.sortWith(compareAppInfo)

// When there are new logs, merge the new list with the existing one, maintaining
// the expected ordering (descending end time). Maintaining the order is important
// to avoid having to sort the list every time there is a request for the log list.
if (newApps.nonEmpty) {
val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
if (!mergedApps.contains(info.id) ||
mergedApps(info.id).logPath.endsWith(EventLoggingListener.IN_PROGRESS) &&
!info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
val key =
if (info.appAttemptId.equals("")) {
info.id
} else {
info.id + "_" + info.appAttemptId
}
mergedApps += (key -> info)
}
}
}

val newIterator = newApps.iterator.buffered
val oldIterator = applications.values.iterator.buffered
while (newIterator.hasNext && oldIterator.hasNext) {
if (compareAppInfo(newIterator.head, oldIterator.head)) {
addIfAbsent(newIterator.next())
} else {
addIfAbsent(oldIterator.next())
if (newAttempts.isEmpty) {
return
}

// Build a map containing all apps that contain new attempts. The app information in this map
// contains both the new app attempt, and those that were already loaded in the existing apps
// map. If an attempt has been updated, it replaces the old attempt in the list.
val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]()
newAttempts.foreach { attempt =>
val appInfo = applications.get(attempt.appId)
.orElse(newAppMap.get(attempt.appId))
.map { app =>
val attempts =
app.attempts.filter(_.attemptId != attempt.attemptId).toList ++ List(attempt)
new FsApplicationHistoryInfo(attempt.appId, attempts.sortWith(compareAttemptInfo))
}
.getOrElse(new FsApplicationHistoryInfo(attempt.appId, List(attempt)))
newAppMap(attempt.appId) = appInfo
}

// Merge the new app list with the existing one, maintaining the expected ordering (descending
// end time). Maintaining the order is important to avoid having to sort the list every time
// there is a request for the log list.
val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo)
val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
if (!mergedApps.contains(info.id)) {
mergedApps += (info.id -> info)
}
newIterator.foreach(addIfAbsent)
oldIterator.foreach(addIfAbsent)
}

applications = mergedApps
val newIterator = newApps.iterator.buffered
val oldIterator = applications.values.iterator.buffered
while (newIterator.hasNext && oldIterator.hasNext) {
if (compareAppInfo(newIterator.head, oldIterator.head)) {
addIfAbsent(newIterator.next())
} else {
addIfAbsent(oldIterator.next())
}
}
newIterator.foreach(addIfAbsent)
oldIterator.foreach(addIfAbsent)

applications = mergedApps
}

/**
Expand All @@ -288,7 +302,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()

applications.values.foreach { info =>
if (now - info.lastUpdated <= maxAge) {
if (now - info.attempts.head.lastUpdated <= maxAge) {
appsToRetain += (info.id -> info)
}
}
Expand Down Expand Up @@ -321,14 +335,20 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
/**
 * Comparator used to order the application listing. Delegates to
 * `compareAttemptInfo` on each application's first attempt, which yields a
 * descending (end time, then start time) ordering of apps.
 *
 * NOTE(review): assumes `attempts` is non-empty — `attempts.head` will throw on an
 * app with an empty attempt list; confirm the provider never builds such an entry.
 */
private def compareAppInfo(
i1: FsApplicationHistoryInfo,
i2: FsApplicationHistoryInfo): Boolean = {
compareAttemptInfo(i1.attempts.head, i2.attempts.head)
}

/**
 * Orders two application attempts so that the most recent one sorts first:
 * primarily by descending end time, falling back to descending start time
 * when the end times are equal.
 */
private def compareAttemptInfo(
    i1: FsApplicationAttemptInfo,
    i2: FsApplicationAttemptInfo): Boolean = {
  if (i1.endTime == i2.endTime) {
    // Tie on end time: the later-starting attempt sorts first.
    i1.startTime >= i2.startTime
  } else {
    i1.endTime >= i2.endTime
  }
}

/**
* Replays the events in the specified log file and returns information about the associated
* application.
*/
private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationHistoryInfo = {
private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
val logPath = eventLog.getPath()
logInfo(s"Replaying log path: $logPath")
val logInput =
Expand All @@ -341,16 +361,16 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val appListener = new ApplicationEventListener
bus.addListener(appListener)
bus.replay(logInput, logPath.toString)
new FsApplicationHistoryInfo(
new FsApplicationAttemptInfo(
logPath.getName(),
appListener.appId.getOrElse(logPath.getName()),
appListener.appAttemptId.getOrElse(""),
appListener.appName.getOrElse(NOT_STARTED),
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog).get,
appListener.sparkUser.getOrElse(NOT_STARTED),
isApplicationCompleted(eventLog),
appListener.appAttemptId.getOrElse(""))
isApplicationCompleted(eventLog))
} finally {
logInput.close()
}
Expand Down Expand Up @@ -437,15 +457,20 @@ private object FsHistoryProvider {
val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds
}

/**
 * Application attempt information as tracked by the filesystem-backed history
 * provider: the generic attempt info plus the event log that backs the attempt.
 *
 * @param logPath name of the event log file for this attempt (resolved by the
 *                provider relative to its configured log directory).
 * @param appId ID of the application this attempt belongs to.
 * @param attemptId attempt ID, or an empty string for single-attempt apps.
 * @param name application name recorded in the event log.
 * @param startTime attempt start time, in milliseconds (-1 if not recorded).
 * @param endTime attempt end time, in milliseconds (-1 if not recorded).
 * @param lastUpdated last modification time of the backing event log.
 * @param sparkUser user that ran the attempt.
 * @param completed whether the attempt has finished.
 */
private class FsApplicationAttemptInfo(
    val logPath: String,
    val appId: String,
    attemptId: String,
    name: String,
    startTime: Long,
    endTime: Long,
    lastUpdated: Long,
    sparkUser: String,
    completed: Boolean = true)
  extends ApplicationAttemptInfo(
    attemptId, name, startTime, endTime, lastUpdated, sparkUser, completed)

/**
 * Application entry for the filesystem-backed history provider: narrows the
 * attempt list element type to `FsApplicationAttemptInfo` so provider code can
 * access `logPath`/`appId` on each attempt without casting.
 */
private class FsApplicationHistoryInfo(
id: String,
override val attempts: List[FsApplicationAttemptInfo])
extends ApplicationHistoryInfo(id, attempts)
Loading

0 comments on commit 3245aa2

Please sign in to comment.