diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index 9291d66e14cd4..6a5011af17458 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -20,7 +20,7 @@ package org.apache.spark.deploy.history
import org.apache.spark.ui.SparkUI
private[history] case class ApplicationAttemptInfo(
- attemptId: String,
+ attemptId: Option[String],
startTime: Long,
endTime: Long,
lastUpdated: Long,
@@ -45,11 +45,10 @@ private[history] abstract class ApplicationHistoryProvider {
* Returns the Spark UI for a specific application.
*
* @param appId The application ID.
- * @param attemptId The application attempt ID for apps with multiple attempts (or an empty
- * string for apps with a single attempt).
+ * @param attemptId The application attempt ID (or None if there is no attempt ID).
* @return The application's UI, or None if application is not found.
*/
- def getAppUI(appId: String, attemptId: String): Option[SparkUI]
+ def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI]
/**
* Called when the server is shutting down.
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index a4d3308efae01..60918ec44360f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -142,7 +142,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
override def getListing(): Iterable[FsApplicationHistoryInfo] = applications.values
- override def getAppUI(appId: String, attemptId: String): Option[SparkUI] = {
+ override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = {
try {
applications.get(appId).flatMap { appInfo =>
val attempts = appInfo.attempts.filter(_.attemptId == attemptId)
@@ -307,15 +307,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// Scan all logs from the log directory.
// Only completed applications older than the specified max age will be deleted.
applications.values.foreach { app =>
- val toClean = app.attempts.filter(shouldClean)
+ val (toClean, toRetain) = app.attempts.partition(shouldClean)
attemptsToClean ++= toClean
if (toClean.isEmpty) {
appsToRetain += (app.id -> app)
- } else if (toClean.size < app.attempts.size) {
+ } else if (toRetain.nonEmpty) {
appsToRetain += (app.id ->
- new FsApplicationHistoryInfo(app.id, app.name,
- app.attempts.filter(!shouldClean(_)).toList))
+ new FsApplicationHistoryInfo(app.id, app.name, toRetain.toList))
}
}
@@ -397,7 +396,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
logPath.getName(),
appListener.appName.getOrElse(NOT_STARTED),
appListener.appId.getOrElse(logPath.getName()),
- appListener.appAttemptId.getOrElse(""),
+ appListener.appAttemptId,
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog).get,
@@ -487,7 +486,7 @@ private class FsApplicationAttemptInfo(
val logPath: String,
val name: String,
val appId: String,
- attemptId: String,
+ attemptId: Option[String],
startTime: Long,
endTime: Long,
lastUpdated: Long,
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index 9595c89529280..1af233f09f385 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -42,7 +42,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
val allAppsSize = allApps.size
val actualFirst = if (requestedFirst < allAppsSize) requestedFirst else 0
- val appsToShow = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allAppsSize))
+ val appsToShow = allApps.slice(actualFirst, actualFirst + pageSize)
val actualPage = (actualFirst / pageSize) + 1
val last = Math.min(actualFirst + pageSize, allAppsSize) - 1
@@ -167,9 +167,9 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
val duration =
if (attempt.endTime > 0) {
UIUtils.formatDuration(attempt.endTime - attempt.startTime)
- } else {
- "-"
- }
+ } else {
+ "-"
+ }
val lastUpdated = UIUtils.formatDate(attempt.lastUpdated)
{
@@ -189,9 +189,9 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
}
{
if (renderAttemptIdColumn) {
-          if (info.attempts.size > 1 && !attempt.attemptId.isEmpty) {
+          if (info.attempts.size > 1 && attempt.attemptId.isDefined) {
-            <td>{attempt.attemptId}</td>
+            <td>{attempt.attemptId.get}</td>
           } else {
             <td>&nbsp;</td>
           }
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 1d97c6db4d007..f2883d4e2671b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -55,7 +55,7 @@ class HistoryServer(
val parts = key.split("/")
require(parts.length == 1 || parts.length == 2, s"Invalid app key $key")
val ui = provider
- .getAppUI(parts(0), if (parts.length > 1) parts(1) else "")
+ .getAppUI(parts(0), if (parts.length > 1) Some(parts(1)) else None)
.getOrElse(throw new NoSuchElementException())
attachSparkUI(ui)
ui
@@ -222,8 +222,8 @@ object HistoryServer extends Logging {
}
}
- private[history] def getAttemptURI(appId: String, attemptId: String): String = {
- val attemptSuffix = if (!attemptId.isEmpty) s"/${attemptId}" else ""
+ private[history] def getAttemptURI(appId: String, attemptId: Option[String]): String = {
+ val attemptSuffix = attemptId.map { id => s"/$id" }.getOrElse("")
s"${HistoryServer.UI_PATH_PREFIX}/${appId}${attemptSuffix}"
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 32a0e891f7026..443b52380092e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -111,7 +111,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
def makeAppInfo(id: String, name: String, start: Long, end: Long, lastMod: Long,
user: String, completed: Boolean): ApplicationHistoryInfo = {
ApplicationHistoryInfo(id, name,
- List(ApplicationAttemptInfo("", start, end, lastMod, user, completed)))
+ List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
}
list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
@@ -128,8 +128,9 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
// Make sure the UI can be rendered.
list.foreach { case info =>
- val appUi = provider.getAppUI(info.id, "")
+ val appUi = provider.getAppUI(info.id, None)
appUi should not be null
+ appUi should not be None
}
}
}
@@ -245,7 +246,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
updateAndCheck(provider) { list =>
list.size should be (1)
list.head.attempts.size should be (2)
- list.head.attempts.head.attemptId should be ("attempt2")
+ list.head.attempts.head.attemptId should be (Some("attempt2"))
}
val completedAttempt2 = newLogFile("app1", Some("attempt2"), inProgress = false)
@@ -259,7 +260,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
list should not be (null)
list.size should be (1)
list.head.attempts.size should be (2)
- list.head.attempts.head.attemptId should be ("attempt2")
+ list.head.attempts.head.attemptId should be (Some("attempt2"))
}
val app2Attempt1 = newLogFile("app2", Some("attempt1"), inProgress = false)
@@ -272,7 +273,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
list.size should be (2)
list.head.attempts.size should be (1)
list.last.attempts.size should be (2)
- list.head.attempts.head.attemptId should be ("attempt1")
+ list.head.attempts.head.attemptId should be (Some("attempt1"))
list.foreach { case app =>
app.attempts.foreach { attempt =>
@@ -315,7 +316,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
updateAndCheck(provider) { list =>
list.size should be (1)
list.head.attempts.size should be (1)
- list.head.attempts.head.attemptId should be ("attempt2")
+ list.head.attempts.head.attemptId should be (Some("attempt2"))
}
assert(!log1.exists())
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 420e382a29cbf..71ba9c18257b8 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -37,7 +37,7 @@ class HistoryServerSuite extends FunSuite with Matchers with MockitoSugar {
val ui = mock[SparkUI]
val link = "/history/app1"
val info = new ApplicationHistoryInfo("app1", "app1",
- List(ApplicationAttemptInfo("", 0, 2, 1, "xxx", true)))
+ List(ApplicationAttemptInfo(None, 0, 2, 1, "xxx", true)))
when(historyServer.getApplicationList()).thenReturn(Seq(info))
when(ui.basePath).thenReturn(link)
when(historyServer.getProviderConfig()).thenReturn(Map[String, String]())