From bc885b7295ade58f1e0c3159d4e5a2cd7658ef3b Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 28 Apr 2015 10:10:55 -0700 Subject: [PATCH] Review feedback. --- .../deploy/history/ApplicationHistoryProvider.scala | 7 +++---- .../spark/deploy/history/FsHistoryProvider.scala | 13 ++++++------- .../apache/spark/deploy/history/HistoryPage.scala | 12 ++++++------ .../apache/spark/deploy/history/HistoryServer.scala | 6 +++--- .../deploy/history/FsHistoryProviderSuite.scala | 13 +++++++------ .../spark/deploy/history/HistoryServerSuite.scala | 2 +- 6 files changed, 26 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index 9291d66e14cd4..6a5011af17458 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -20,7 +20,7 @@ package org.apache.spark.deploy.history import org.apache.spark.ui.SparkUI private[history] case class ApplicationAttemptInfo( - attemptId: String, + attemptId: Option[String], startTime: Long, endTime: Long, lastUpdated: Long, @@ -45,11 +45,10 @@ private[history] abstract class ApplicationHistoryProvider { * Returns the Spark UI for a specific application. * * @param appId The application ID. - * @param attemptId The application attempt ID for apps with multiple attempts (or an empty - * string for apps with a single attempt). + * @param attemptId The application attempt ID (or None if there is no attempt ID). * @return The application's UI, or None if application is not found. */ - def getAppUI(appId: String, attemptId: String): Option[SparkUI] + def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] /** * Called when the server is shutting down. diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index a4d3308efae01..60918ec44360f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -142,7 +142,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) override def getListing(): Iterable[FsApplicationHistoryInfo] = applications.values - override def getAppUI(appId: String, attemptId: String): Option[SparkUI] = { + override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = { try { applications.get(appId).flatMap { appInfo => val attempts = appInfo.attempts.filter(_.attemptId == attemptId) @@ -307,15 +307,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // Scan all logs from the log directory. // Only completed applications older than the specified max age will be deleted. applications.values.foreach { app => - val toClean = app.attempts.filter(shouldClean) + val (toClean, toRetain) = app.attempts.partition(shouldClean) attemptsToClean ++= toClean if (toClean.isEmpty) { appsToRetain += (app.id -> app) - } else if (toClean.size < app.attempts.size) { + } else if (toRetain.nonEmpty) { appsToRetain += (app.id -> - new FsApplicationHistoryInfo(app.id, app.name, - app.attempts.filter(!shouldClean(_)).toList)) + new FsApplicationHistoryInfo(app.id, app.name, toRetain.toList)) } } @@ -397,7 +396,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) logPath.getName(), appListener.appName.getOrElse(NOT_STARTED), appListener.appId.getOrElse(logPath.getName()), - appListener.appAttemptId.getOrElse(""), + appListener.appAttemptId, appListener.startTime.getOrElse(-1L), appListener.endTime.getOrElse(-1L), getModificationTime(eventLog).get, @@ -487,7 +486,7 @@ private class FsApplicationAttemptInfo( val logPath: String, val name: String, val appId: String, - attemptId: String, + attemptId: Option[String], startTime: Long, endTime: Long, lastUpdated: Long, diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index 9595c89529280..1af233f09f385 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -42,7 +42,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") val allAppsSize = allApps.size val actualFirst = if (requestedFirst < allAppsSize) requestedFirst else 0 - val appsToShow = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allAppsSize)) + val appsToShow = allApps.slice(actualFirst, actualFirst + pageSize) val actualPage = (actualFirst / pageSize) + 1 val last = Math.min(actualFirst + pageSize, allAppsSize) - 1 @@ -167,9 +167,9 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") val duration = if (attempt.endTime > 0) { UIUtils.formatDuration(attempt.endTime - attempt.startTime) - } else { - "-" - } + } else { + "-" + } val lastUpdated = UIUtils.formatDate(attempt.lastUpdated) { @@ -189,9 +189,9 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") } { if (renderAttemptIdColumn) { - if (info.attempts.size > 1 && !attempt.attemptId.isEmpty) { + if (info.attempts.size > 1 && attempt.attemptId.isDefined) { - {attempt.attemptId} + {attempt.attemptId.get} } else {   } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 1d97c6db4d007..f2883d4e2671b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -55,7 +55,7 @@ class HistoryServer( val parts = key.split("/") require(parts.length == 1 || parts.length == 2, s"Invalid app key $key") val ui = provider - .getAppUI(parts(0), if (parts.length > 1) parts(1) else "") + .getAppUI(parts(0), if (parts.length > 1) Some(parts(1)) else None) .getOrElse(throw new NoSuchElementException()) attachSparkUI(ui) ui @@ -222,8 +222,8 @@ object HistoryServer extends Logging { } } - private[history] def getAttemptURI(appId: String, attemptId: String): String = { - val attemptSuffix = if (!attemptId.isEmpty) s"/${attemptId}" else "" + private[history] def getAttemptURI(appId: String, attemptId: Option[String]): String = { + val attemptSuffix = attemptId.map { id => s"/$id" }.getOrElse("") s"${HistoryServer.UI_PATH_PREFIX}/${appId}${attemptSuffix}" } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 32a0e891f7026..443b52380092e 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -111,7 +111,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers def makeAppInfo(id: String, name: String, start: Long, end: Long, lastMod: Long, user: String, completed: Boolean): ApplicationHistoryInfo = { ApplicationHistoryInfo(id, name, - List(ApplicationAttemptInfo("", start, end, lastMod, user, completed))) + List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed))) } list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L, @@ -128,8 +128,9 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers // Make sure the UI can be rendered. list.foreach { case info => - val appUi = provider.getAppUI(info.id, "") + val appUi = provider.getAppUI(info.id, None) appUi should not be null + appUi should not be None } } } @@ -245,7 +246,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers updateAndCheck(provider) { list => list.size should be (1) list.head.attempts.size should be (2) - list.head.attempts.head.attemptId should be ("attempt2") + list.head.attempts.head.attemptId should be (Some("attempt2")) } val completedAttempt2 = newLogFile("app1", Some("attempt2"), inProgress = false) @@ -259,7 +260,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers list should not be (null) list.size should be (1) list.head.attempts.size should be (2) - list.head.attempts.head.attemptId should be ("attempt2") + list.head.attempts.head.attemptId should be (Some("attempt2")) } val app2Attempt1 = newLogFile("app2", Some("attempt1"), inProgress = false) @@ -272,7 +273,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers list.size should be (2) list.head.attempts.size should be (1) list.last.attempts.size should be (2) - list.head.attempts.head.attemptId should be ("attempt1") + list.head.attempts.head.attemptId should be (Some("attempt1")) list.foreach { case app => app.attempts.foreach { attempt => @@ -315,7 +316,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers updateAndCheck(provider) { list => list.size should be (1) list.head.attempts.size should be (1) - list.head.attempts.head.attemptId should be ("attempt2") + list.head.attempts.head.attemptId should be (Some("attempt2")) } assert(!log1.exists()) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 420e382a29cbf..71ba9c18257b8 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -37,7 +37,7 @@ class HistoryServerSuite extends FunSuite with Matchers with MockitoSugar { val ui = mock[SparkUI] val link = "/history/app1" val info = new ApplicationHistoryInfo("app1", "app1", - List(ApplicationAttemptInfo("", 0, 2, 1, "xxx", true))) + List(ApplicationAttemptInfo(None, 0, 2, 1, "xxx", true))) when(historyServer.getApplicationList()).thenReturn(Seq(info)) when(ui.basePath).thenReturn(link) when(historyServer.getProviderConfig()).thenReturn(Map[String, String]())