Skip to content

Commit

Permalink
Address comments and fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jasonli-db committed Jul 28, 2023
1 parent cf3a49e commit 5280131
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.connect.ui

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import com.fasterxml.jackson.annotation.JsonIgnore
Expand Down Expand Up @@ -95,24 +96,24 @@ private[connect] class ExecutionInfo(
val sessionId: String,
val startTimestamp: Long,
val userId: String,
val operationId: String,
val sparkSessionTags: Set[String],
val finishTimestamp: Long,
val closeTimestamp: Long,
val executePlan: String,
val detail: String,
val state: ExecutionState.Value,
val jobId: ArrayBuffer[String],
val sqlExecId: ArrayBuffer[String]) {
val sqlExecId: mutable.Set[String]) {
@JsonIgnore @KVIndex("finishTime")
private def finishTimeIndex: Long = if (finishTimestamp > 0L && !isExecutionActive) {
finishTimestamp
} else -1L

@JsonIgnore @KVIndex("isExecutionActive")
def isExecutionActive: Boolean = {
!(state == ExecutionState.FAILED ||
state == ExecutionState.CANCELED ||
state == ExecutionState.TIMEDOUT ||
state == ExecutionState.CLOSED)
state == ExecutionState.STARTED ||
state == ExecutionState.COMPILED ||
state == ExecutionState.READY
}

def totalTime(endTime: Long): Long = {
Expand All @@ -125,6 +126,6 @@ private[connect] class ExecutionInfo(
}

private[connect] object ExecutionState extends Enumeration {
val STARTED, COMPILED, READY, CANCELED, TIMEDOUT, FAILED, FINISHED, CLOSED = Value
val STARTED, COMPILED, READY, CANCELED, FAILED, FINISHED, CLOSED = Value
type ExecutionState = Value
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,5 @@ class SparkConnectServerHistoryServerPlugin extends AppHistoryServerPlugin {
}
}

override def displayOrder: Int = 1
override def displayOrder: Int = 3
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ private[connect] class SparkConnectServerListener(
exec.statement,
exec.sessionId,
exec.startTimestamp,
exec.userId)
exec.userId,
exec.operationId,
exec.sparkSessionTags)
liveExec.jobId += jobStart.jobId.toString
executionIdOpt.foreach { execId => exec.sqlExecId += execId }
updateStoreWithTriggerEnabled(liveExec)
Expand Down Expand Up @@ -133,7 +135,8 @@ private[connect] class SparkConnectServerListener(
e.sessionId,
e.eventTime,
e.userId,
e.extraTags)
e.operationId,
e.sparkSessionTags)
executionData.state = ExecutionState.STARTED
executionList.put(e.jobTag, executionData)
updateLiveStore(executionData)
Expand Down Expand Up @@ -213,7 +216,7 @@ private[connect] class SparkConnectServerListener(
}

private def onSessionStarted(e: SparkListenerConnectSessionStarted) = synchronized {
val session = getOrCreateSession(e.sessionId, e.userId, e.eventTime, e.extraTags)
val session = getOrCreateSession(e.sessionId, e.userId, e.eventTime)
sessionList.put(e.sessionId, session)
updateLiveStore(session)
}
Expand Down Expand Up @@ -247,11 +250,8 @@ private[connect] class SparkConnectServerListener(
private def getOrCreateSession(
sessionId: String,
userName: String,
startTime: Long,
extraTags: Map[String, String] = Map.empty): LiveSessionData = synchronized {
sessionList.getOrElseUpdate(
sessionId,
new LiveSessionData(sessionId, startTime, userName, extraTags))
startTime: Long): LiveSessionData = synchronized {
sessionList.getOrElseUpdate(sessionId, new LiveSessionData(sessionId, startTime, userName))
}

private def getOrCreateExecution(
Expand All @@ -260,10 +260,18 @@ private[connect] class SparkConnectServerListener(
sessionId: String,
startTimestamp: Long,
userId: String,
extraTags: Map[String, String] = Map.empty): LiveExecutionData = synchronized {
operationId: String,
sparkSessionTags: Set[String]): LiveExecutionData = synchronized {
executionList.getOrElseUpdate(
jobTag,
new LiveExecutionData(jobTag, statement, sessionId, startTimestamp, userId, extraTags))
new LiveExecutionData(
jobTag,
statement,
sessionId,
startTimestamp,
userId,
operationId,
sparkSessionTags))
}

private def cleanupExecutions(count: Long): Unit = {
Expand Down Expand Up @@ -310,16 +318,16 @@ private[connect] class LiveExecutionData(
val sessionId: String,
val startTimestamp: Long,
val userId: String,
val extraTags: Map[String, String] = Map.empty)
val operationId: String,
val sparkSessionTags: Set[String])
extends LiveEntity {

var finishTimestamp: Long = 0L
var closeTimestamp: Long = 0L
var executePlan: String = ""
var detail: String = ""
var state: ExecutionState.Value = ExecutionState.STARTED
val jobId: ArrayBuffer[String] = ArrayBuffer[String]()
var sqlExecId: ArrayBuffer[String] = ArrayBuffer[String]()
var sqlExecId: mutable.Set[String] = mutable.Set[String]()

override protected def doUpdate(): Any = {
new ExecutionInfo(
Expand All @@ -328,9 +336,10 @@ private[connect] class LiveExecutionData(
sessionId,
startTimestamp,
userId,
operationId,
sparkSessionTags,
finishTimestamp,
closeTimestamp,
executePlan,
detail,
state,
jobId,
Expand All @@ -349,8 +358,7 @@ private[connect] class LiveExecutionData(
private[connect] class LiveSessionData(
val sessionId: String,
val startTimestamp: Long,
val userName: String,
val extraTags: Map[String, String] = Map.empty)
val userName: String)
extends LiveEntity {

var finishTimestamp: Long = 0L
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.spark.util.Utils
private[ui] class SparkConnectServerPage(parent: SparkConnectServerTab)
extends WebUIPage("")
with Logging {

private val store = parent.store
private val startTime = parent.startTime

Expand Down Expand Up @@ -176,6 +177,7 @@ private[ui] class SqlStatsPagedTable(
sqlStatsTableTag: String,
showSessionLink: Boolean)
extends PagedTable[SqlStatsTableRow] {

private val (sortColumn, desc, pageSize) =
getTableParameters(request, sqlStatsTableTag, "Start Time")

Expand Down Expand Up @@ -213,7 +215,6 @@ private[ui] class SqlStatsPagedTable(
if (showSessionLink) {
Seq(
("User", true, None),
("Job Tag", true, None),
("Job ID", true, None),
("SQL Query ID", true, None),
("Session ID", true, None),
Expand All @@ -224,11 +225,13 @@ private[ui] class SqlStatsPagedTable(
("Duration", true, Some(SPARK_CONNECT_SERVER_DURATION)),
("Statement", true, None),
("State", true, None),
("Operation ID", true, None),
("Job Tag", true, None),
("Spark Session Tags", true, None),
("Detail", true, None))
} else {
Seq(
("User", true, None),
("Job Tag", true, None),
("Job ID", true, None),
("SQL Query ID", true, None),
("Start Time", true, None),
Expand All @@ -238,6 +241,9 @@ private[ui] class SqlStatsPagedTable(
("Duration", true, Some(SPARK_CONNECT_SERVER_DURATION)),
("Statement", true, None),
("State", true, None),
("Operation ID", true, None),
("Job Tag", true, None),
("Spark Session Tags", true, None),
("Detail", true, None))
}

Expand Down Expand Up @@ -278,14 +284,11 @@ private[ui] class SqlStatsPagedTable(
<td>
{info.userId}
</td>
<td>
{info.jobTag}
</td>
<td>
{jobLinks(sqlStatsTableRow.jobId)}
</td>
<td>
{sqlLinks({ info.sqlExecId })}
{sqlLinks(sqlStatsTableRow.sqlExecId)}
</td>
{
if (showSessionLink) {
Expand Down Expand Up @@ -318,7 +321,16 @@ private[ui] class SqlStatsPagedTable(
<td>
{info.state}
</td>
{errorMessageCell(Option(sqlStatsTableRow.detail))}
<td>
{info.operationId}
</td>
<td>
{info.jobTag}
</td>
<td>
{sqlStatsTableRow.sparkSessionTags.mkString(", ")}
</td>
{errorMessageCell(Option(info.detail))}
</tr>
}

Expand Down Expand Up @@ -428,8 +440,8 @@ private[ui] class SqlStatsTableRow(
val sqlExecId: Seq[String],
val duration: Long,
val executionTime: Long,
val executionInfo: ExecutionInfo,
val detail: String)
val sparkSessionTags: Seq[String],
val executionInfo: ExecutionInfo)

private[ui] class SqlStatsTableDataSource(
info: Seq[ExecutionInfo],
Expand All @@ -449,20 +461,18 @@ private[ui] class SqlStatsTableDataSource(
private def sqlStatsTableRow(executionInfo: ExecutionInfo): SqlStatsTableRow = {
val duration = executionInfo.totalTime(executionInfo.closeTimestamp)
val executionTime = executionInfo.totalTime(executionInfo.finishTimestamp)
val detail = Option(executionInfo.detail)
.filter(!_.isEmpty)
.getOrElse(executionInfo.executePlan)
val jobId = executionInfo.jobId.toSeq.sorted
val sqlExecId = executionInfo.sqlExecId.toSeq.sorted
val sparkSessionTags = executionInfo.sparkSessionTags.toSeq.sorted

new SqlStatsTableRow(
executionInfo.jobTag,
jobId,
sqlExecId,
duration,
executionTime,
executionInfo,
detail)
sparkSessionTags,
executionInfo)
}

/**
Expand All @@ -471,9 +481,9 @@ private[ui] class SqlStatsTableDataSource(
private def ordering(sortColumn: String, desc: Boolean): Ordering[SqlStatsTableRow] = {
val ordering: Ordering[SqlStatsTableRow] = sortColumn match {
case "User" => Ordering.by(_.executionInfo.userId)
case "Job Tag" => Ordering.by(_.executionInfo.jobTag)
case "Job ID" => Ordering by (_.jobId.headOption)
case "SQL Query ID" => Ordering by (_.sqlExecId.headOption)
case "Operation ID" => Ordering.by(_.executionInfo.operationId)
case "Job ID" => Ordering.by(_.jobId.headOption)
case "SQL Query ID" => Ordering.by(_.sqlExecId.headOption)
case "Session ID" => Ordering.by(_.executionInfo.sessionId)
case "Start Time" => Ordering.by(_.executionInfo.startTimestamp)
case "Finish Time" => Ordering.by(_.executionInfo.finishTimestamp)
Expand All @@ -482,7 +492,9 @@ private[ui] class SqlStatsTableDataSource(
case "Duration" => Ordering.by(_.duration)
case "Statement" => Ordering.by(_.executionInfo.statement)
case "State" => Ordering.by(_.executionInfo.state)
case "Detail" => Ordering.by(_.detail)
case "Detail" => Ordering.by(_.executionInfo.detail)
case "Job Tag" => Ordering.by(_.executionInfo.jobTag)
case "Spark Session Tags" => Ordering.by(_.sparkSessionTags.headOption)
case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
}
if (desc) {
Expand Down Expand Up @@ -514,7 +526,7 @@ private[ui] class SessionStatsTableDataSource(
val ordering: Ordering[SessionInfo] = sortColumn match {
case "User" => Ordering.by(_.userId)
case "Session ID" => Ordering.by(_.sessionId)
case "Start Time" => Ordering by (_.startTimestamp)
case "Start Time" => Ordering.by(_.startTimestamp)
case "Finish Time" => Ordering.by(_.finishTimestamp)
case "Duration" => Ordering.by(_.totalTime)
case "Total Execute" => Ordering.by(_.totalExecution)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,35 @@ import org.apache.spark.util.Utils
private[ui] class SparkConnectServerSessionPage(parent: SparkConnectServerTab)
extends WebUIPage("session")
with Logging {

val store = parent.store
private val startTime = parent.startTime

/** Render the page */
def render(request: HttpServletRequest): Seq[Node] = {
val parameterId = request.getParameter("id")
require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
val sessionId = request.getParameter("id")
require(sessionId != null && sessionId.nonEmpty, "Missing id parameter")

val content = store.synchronized { // make sure all parts in this page are consistent
val sessionStat = store.getSession(parameterId).orNull
require(sessionStat != null, "Invalid sessionID[" + parameterId + "]")

generateBasicStats() ++
<br/> ++
<h4>
User {sessionStat.userId},
Session created at {formatDate(sessionStat.startTimestamp)},
Total run {sessionStat.totalExecution} Request(s)
</h4> ++
generateSQLStatsTable(request, sessionStat.sessionId)
store
.getSession(sessionId)
.map { sessionStat =>
generateBasicStats() ++
<br/> ++
<h4>
User
{sessionStat.userId}
,
Session created at
{formatDate(sessionStat.startTimestamp)}
,
Total run
{sessionStat.totalExecution}
Request(s)
</h4> ++
generateSQLStatsTable(request, sessionStat.sessionId)
}
.getOrElse(<div>No information to display for session {sessionId}</div>)
}
UIUtils.headerSparkPage(request, "Spark Connect Session", content, parent)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ private[connect] class SparkConnectServerTab(
sparkUI: SparkUI)
extends SparkUITab(sparkUI, "connect")
with Logging {

override val name = "Connect"

val parent = sparkUI
Expand Down
Loading

0 comments on commit 5280131

Please sign in to comment.