Skip to content

Commit

Permalink
SPARK-4705: 1) moved from directory structure to single file, as per …
Browse files Browse the repository at this point in the history
…the master branch. 2) Added the attempt id inside the SparkListenerApplicationStart, to make the info available independent of directory structure. 3) Changes in History Server to render the UI as per the snaphot II
  • Loading branch information
twinkle-g authored and Marcelo Vanzin committed Apr 7, 2015
1 parent 6b2e521 commit 318525a
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 25 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1757,7 +1757,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Note: this code assumes that the task scheduler has been initialized and has contacted
// the cluster manager to get an application ID (in case the cluster manager provides one).
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
startTime, sparkUser))
startTime, sparkUser, applicationAttemptId))
}

/** Post the application end event */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ private[history] case class ApplicationHistoryInfo(
endTime: Long,
lastUpdated: Long,
sparkUser: String,
completed: Boolean = false)
completed: Boolean = false,
appAttemptId: String = "")

private[history] abstract class ApplicationHistoryProvider {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,13 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
if (!mergedApps.contains(info.id) ||
mergedApps(info.id).logPath.endsWith(EventLoggingListener.IN_PROGRESS) &&
!info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
mergedApps += (info.id -> info)
val key =
if (info.appAttemptId.equals("")) {
info.id
} else {
info.id + "_" + info.appAttemptId
}
mergedApps += (key -> info)
}
}

Expand Down Expand Up @@ -343,7 +349,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog).get,
appListener.sparkUser.getOrElse(NOT_STARTED),
isApplicationCompleted(eventLog))
isApplicationCompleted(eventLog),
appListener.appAttemptId.getOrElse(""))
} finally {
logInput.close()
}
Expand Down Expand Up @@ -438,5 +445,7 @@ private class FsApplicationHistoryInfo(
endTime: Long,
lastUpdated: Long,
sparkUser: String,
completed: Boolean = true)
extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser, completed)
completed: Boolean = true,
appAttemptId: String ="")
extends ApplicationHistoryInfo(
id, name, startTime, endTime, lastUpdated, sparkUser, completed, appAttemptId)
137 changes: 128 additions & 9 deletions core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node

import org.apache.spark.ui.{WebUIPage, UIUtils}
import scala.collection.immutable.ListMap
import scala.collection.mutable.HashMap
import scala.collection.mutable.ArrayBuffer

private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {

Expand All @@ -34,18 +37,31 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
val requestedIncomplete =
Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean

val allApps = parent.getApplicationList().filter(_.completed != requestedIncomplete)
val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0
val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size))

val allCompletedAppsNAttempts =
parent.getApplicationList().filter(_.completed != requestedIncomplete)
val (hasAttemptInfo, appToAttemptMap) = getApplicationLevelList(allCompletedAppsNAttempts)

val allAppsSize = allCompletedAppsNAttempts.size

val actualFirst = if (requestedFirst < allAppsSize) requestedFirst else 0
val apps =
allCompletedAppsNAttempts.slice(actualFirst, Math.min(actualFirst + pageSize, allAppsSize))
val appWithAttemptsDisplayList =
appToAttemptMap.slice(actualFirst, Math.min(actualFirst + pageSize, allAppsSize))

val actualPage = (actualFirst / pageSize) + 1
val last = Math.min(actualFirst + pageSize, allApps.size) - 1
val pageCount = allApps.size / pageSize + (if (allApps.size % pageSize > 0) 1 else 0)
val last = Math.min(actualFirst + pageSize, allAppsSize) - 1
val pageCount = allAppsSize / pageSize + (if (allAppsSize % pageSize > 0) 1 else 0)

val secondPageFromLeft = 2
val secondPageFromRight = pageCount - 1

val appTable = UIUtils.listingTable(appHeader, appRow, apps)
val appTable =
if (hasAttemptInfo) {
UIUtils.listingTable(appWithAttemptHeader, appWithAttemptRow, appWithAttemptsDisplayList)
} else {
UIUtils.listingTable(appHeader, appRow, apps)
}
val providerConfig = parent.getProviderConfig()
val content =
<div class="row-fluid">
Expand All @@ -59,15 +75,15 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
// to the first and last page. If the current page +/- `plusOrMinus` is greater
// than the 2nd page from the first page or less than the 2nd page from the last
// page, `...` will be displayed.
if (allApps.size > 0) {
if (allAppsSize > 0) {
val leftSideIndices =
rangeIndices(actualPage - plusOrMinus until actualPage, 1 < _, requestedIncomplete)
val rightSideIndices =
rangeIndices(actualPage + 1 to actualPage + plusOrMinus, _ < pageCount,
requestedIncomplete)

<h4>
Showing {actualFirst + 1}-{last + 1} of {allApps.size}
Showing {actualFirst + 1}-{last + 1} of {allAppsSize}
{if (requestedIncomplete) "(Incomplete applications)"}
<span style="float: right">
{
Expand Down Expand Up @@ -113,6 +129,36 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
</div>
UIUtils.basicSparkPage(content, "History Server")
}

private def getApplicationLevelList (appNattemptList: Iterable[ApplicationHistoryInfo]) ={
// Create HashMap as per the multiple attempts for one application.
// If there is no attempt specific stuff, then
// do return false, to indicate the same, so that previous UI gets displayed.
var hasAttemptInfo = false
val appToAttemptInfo = new HashMap[String, ArrayBuffer[ApplicationHistoryInfo]]
for( appAttempt <- appNattemptList) {
if(!appAttempt.appAttemptId.equals("")){
hasAttemptInfo = true
val attemptId = appAttempt.appAttemptId.toInt
if(appToAttemptInfo.contains(appAttempt.id)){
val currentAttempts = appToAttemptInfo.get(appAttempt.id).get
currentAttempts += appAttempt
appToAttemptInfo.put( appAttempt.id, currentAttempts)
} else {
val currentAttempts = new ArrayBuffer[ApplicationHistoryInfo]()
currentAttempts += appAttempt
appToAttemptInfo.put( appAttempt.id, currentAttempts )
}
}else {
val currentAttempts = new ArrayBuffer[ApplicationHistoryInfo]()
currentAttempts += appAttempt
appToAttemptInfo.put(appAttempt.id, currentAttempts)
}
}
val sortedMap = ListMap(appToAttemptInfo.toSeq.sortWith(_._1 > _._1):_*)
(hasAttemptInfo, sortedMap)
}


private val appHeader = Seq(
"App ID",
Expand All @@ -128,6 +174,16 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
range.filter(condition).map(nextPage =>
<a href={makePageLink(nextPage, showIncomplete)}> {nextPage} </a>)
}

private val appWithAttemptHeader = Seq(
"App ID",
"App Name",
"Attempt ID",
"Started",
"Completed",
"Duration",
"Spark User",
"Last Updated")

private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
Expand All @@ -146,6 +202,69 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
<td sorttable_customkey={info.lastUpdated.toString}>{lastUpdated}</td>
</tr>
}

private def getAttemptURI(attemptInfo: ApplicationHistoryInfo,
returnEmptyIfAttemptInfoNull: Boolean = true ) = {
if (attemptInfo.appAttemptId.equals("")) {
if(returnEmptyIfAttemptInfoNull) {
attemptInfo.appAttemptId
} else {
HistoryServer.UI_PATH_PREFIX + s"/${attemptInfo.id}"
}
} else {
HistoryServer.UI_PATH_PREFIX + s"/${attemptInfo.id}" + "_" + s"${attemptInfo.appAttemptId}"
}
}

private def firstAttemptRow(attemptInfo : ApplicationHistoryInfo) = {
val uiAddress =
if (attemptInfo.appAttemptId.equals("")) {
attemptInfo.appAttemptId
} else {
HistoryServer.UI_PATH_PREFIX + s"/${attemptInfo.id}" + "_" + s"${attemptInfo.appAttemptId}"
}

val startTime = UIUtils.formatDate(attemptInfo.startTime)
val endTime = UIUtils.formatDate(attemptInfo.endTime)
val duration = UIUtils.formatDuration(attemptInfo.endTime - attemptInfo.startTime)
val lastUpdated = UIUtils.formatDate(attemptInfo.lastUpdated)
val attemptId = attemptInfo.appAttemptId
<td><a href={uiAddress}>{attemptId}</a></td>
<td sorttable_customkey={attemptInfo.startTime.toString}>{startTime}</td>
<td sorttable_customkey={attemptInfo.endTime.toString}>{endTime}</td>
<td sorttable_customkey={(attemptInfo.endTime - attemptInfo.startTime).toString}>
{duration}</td>
<td>{attemptInfo.sparkUser}</td>
<td sorttable_customkey={attemptInfo.lastUpdated.toString}>{lastUpdated}</td>
}

private def attemptRow(attemptInfo: ApplicationHistoryInfo) = {
<tr>
{firstAttemptRow(attemptInfo)}
</tr>
}

private def appWithAttemptRow(
appAttemptsInfo: (String,ArrayBuffer[ApplicationHistoryInfo])): Seq[Node] = {
val applicationId = appAttemptsInfo._1
val info = appAttemptsInfo._2
val rowSpan = info.length
val rowSpanString = rowSpan.toString
val applicatioName = info(0).name
val lastAttemptURI = getAttemptURI(info(0), false)
val ttAttempts = info.slice(1, rowSpan -1)
val x = new xml.NodeBuffer
x +=
<tr>
<td rowspan={rowSpanString}><a href={lastAttemptURI}>{applicationId}</a></td>
<td rowspan={rowSpanString}>{applicatioName}</td>
{ firstAttemptRow(info(0)) }
</tr>;
for( i <- 1 until rowSpan ){
x += attemptRow(info(i))
}
x
}

private def makePageLink(linkPage: Int, showIncomplete: Boolean): String = {
"/?" + Array(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package org.apache.spark.scheduler
private[spark] class ApplicationEventListener extends SparkListener {
var appName: Option[String] = None
var appId: Option[String] = None
var appAttemptId: Option[String] = None
var sparkUser: Option[String] = None
var startTime: Option[Long] = None
var endTime: Option[Long] = None
Expand All @@ -35,6 +36,7 @@ private[spark] class ApplicationEventListener extends SparkListener {
override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
appName = Some(applicationStart.appName)
appId = applicationStart.appId
appAttemptId = Some(applicationStart.appAttemptId)
startTime = Some(applicationStart.time)
sparkUser = Some(applicationStart.sparkUser)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,12 +266,11 @@ private[spark] object EventLoggingListener extends Logging {
appAttemptId: String,
compressionCodecName: Option[String] = None): String = {
val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase

if (appAttemptId.equals("")) {
Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
} else {
Utils.resolveURI(logBaseDir) + "/" + appAttemptId + "/" + name.stripSuffix("/")
}
if (appAttemptId.equals("")) {
logBaseDir.toString.stripSuffix("/") + "/" + name.stripSuffix("/")
} else {
logBaseDir.toString.stripSuffix("/") + "/" + name.stripSuffix("/") + "_" + appAttemptId
}
}

def getLogPath(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ case class SparkListenerExecutorMetricsUpdate(
extends SparkListenerEvent

@DeveloperApi
case class SparkListenerApplicationStart(appName: String, appId: Option[String], time: Long,
sparkUser: String) extends SparkListenerEvent
case class SparkListenerApplicationStart(appName: String, appId: Option[String],
time: Long, sparkUser: String, appAttemptId: String = "") extends SparkListenerEvent

@DeveloperApi
case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ private[spark] object JsonProtocol {
("App Name" -> applicationStart.appName) ~
("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
("Timestamp" -> applicationStart.time) ~
("User" -> applicationStart.sparkUser)
("User" -> applicationStart.sparkUser) ~
("appAttemptId" -> applicationStart.appAttemptId)
}

def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = {
Expand Down Expand Up @@ -562,7 +563,8 @@ private[spark] object JsonProtocol {
val appId = Utils.jsonOption(json \ "App ID").map(_.extract[String])
val time = (json \ "Timestamp").extract[Long]
val sparkUser = (json \ "User").extract[String]
SparkListenerApplicationStart(appName, appId, time, sparkUser)
val appAttemptId = (json \ "appAttemptId").extract[String]
SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId)
}

def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {
Expand Down

0 comments on commit 318525a

Please sign in to comment.