Skip to content

Commit

Permalink
Decouple page rendering from loading files from disk
Browse files Browse the repository at this point in the history
... by making disk accesses asynchronous. This is a performance and
UX win in case we're loading a huge file.
  • Loading branch information
andrewor14 committed Mar 31, 2014
1 parent 1b2f391 commit 0670743
Showing 1 changed file with 26 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import java.net.URI
import javax.servlet.http.HttpServletRequest

import scala.collection.mutable
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

import org.apache.hadoop.fs.{FileStatus, Path}
import org.eclipse.jetty.servlet.ServletContextHandler
Expand Down Expand Up @@ -81,7 +84,7 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int)
}

/**
* Check for any updates to event logs in the base directory.
* Asynchronously check for any updates to event logs in the base directory.
*
* If a new finished application is found, the server renders the associated SparkUI
* from the application's event logs, attaches this UI to itself, and stores metadata
Expand All @@ -93,27 +96,32 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int)
def checkForLogs() {
if (logCheckReady) {
lastLogCheck = System.currentTimeMillis
val logStatus = fileSystem.listStatus(new Path(baseLogDir))
val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()

// Render SparkUI for any new completed applications
logDirs.foreach { dir =>
val path = dir.getPath.toString
val appId = getAppId(path)
val lastUpdated = getModificationTime(dir)
if (!appIdToInfo.contains(appId)) {
maybeRenderUI(appId, path, lastUpdated)
val asyncCheck = future {
val logStatus = fileSystem.listStatus(new Path(baseLogDir))
val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()

// Render SparkUI for any new completed applications
logDirs.foreach { dir =>
val path = dir.getPath.toString
val appId = getAppId(path)
val lastUpdated = getModificationTime(dir)
if (!appIdToInfo.contains(appId)) {
maybeRenderUI(appId, path, lastUpdated)
}
}
}

// Remove any outdated SparkUIs
val appIds = logDirs.map { dir => getAppId(dir.getPath.toString) }
appIdToInfo.foreach { case (appId, info) =>
if (!appIds.contains(appId)) {
detachUI(info.ui)
appIdToInfo.remove(appId)
// Remove any outdated SparkUIs
val appIds = logDirs.map { dir => getAppId(dir.getPath.toString) }
appIdToInfo.foreach { case (appId, info) =>
if (!appIds.contains(appId)) {
detachUI(info.ui)
appIdToInfo.remove(appId)
}
}
}
asyncCheck.onFailure { case t =>
logError("Unable to synchronize HistoryServer with files on disk: ", t)
}
}
}

Expand Down

0 comments on commit 0670743

Please sign in to comment.