From f8786f01ad605c4549b60ba9998e35cca433665a Mon Sep 17 00:00:00 2001
From: Jason Li
Date: Sat, 29 Jul 2023 09:20:13 -0700
Subject: [PATCH] [SPARK-44394][CONNECT][WEBUI] Add a Spark UI page for Spark Connect

### What changes were proposed in this pull request?

Add a new Spark UI page to display session and execution information for Spark Connect. This builds on the work in SPARK-43923 (https://github.com/apache/spark/pull/41443), which adds the relevant SparkListenerEvents, and mirrors the ThriftServerPage used in the Spark UI for JDBC/ODBC.

(Screenshots attached to the PR: Screenshot 2023-07-27 at 11 29 22 PM, Screenshot 2023-07-27 at 11 29 15 PM)

### Why are the changes needed?

This gives users a way to access session and execution information for Spark Connect via the UI and provides the frontend interface for the related SparkListenerEvents.

### Does this PR introduce _any_ user-facing change?

Yes, it adds a new tab/page in the Spark UI.

### How was this patch tested?

Unit tests

Closes #41964 from jasonli-db/spark-connect-ui.

Authored-by: Jason Li
Signed-off-by: Gengliang Wang
---
 .../spark/sql/connect/common/ProtoUtils.scala |   2 +-
 .../sql/connect/SparkConnectPlugin.scala      |   2 +-
 .../spark/sql/connect/config/Connect.scala    |  13 +
 .../execution/ExecuteThreadRunner.scala       |  15 +-
 .../sql/connect/service/ExecuteHolder.scala   |  39 +-
 .../connect/service/SparkConnectServer.scala  |   3 +-
 .../connect/service/SparkConnectService.scala |  26 +-
 .../ui/SparkConnectServerAppStatusStore.scala | 131 +++++
 ...parkConnectServerHistoryServerPlugin.scala |  41 ++
 .../ui/SparkConnectServerListener.scala       | 377 ++++++++++++
 .../connect/ui/SparkConnectServerPage.scala   | 541 ++++++++++++++++++
 .../ui/SparkConnectServerSessionPage.scala    | 128 +++++
 .../connect/ui/SparkConnectServerTab.scala    |  57 ++
 .../spark/sql/connect/ui/ToolTips.scala       |  39 ++
 .../ui/SparkConnectServerListenerSuite.scala  | 234 ++++++++
 .../ui/SparkConnectServerPageSuite.scala      | 135 +++++
 .../scala/org/apache/spark/SparkContext.scala |   2 +-
 .../scala/org/apache/spark/ui/SparkUI.scala   |  18 +-
 .../scala/org/apache/spark/ui/UISuite.scala   |   2 +-
 .../StreamingQueryHistoryServerPlugin.scala   |   2 +-
 20 files changed, 1789 insertions(+), 18 deletions(-)
 create mode 100644 connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerAppStatusStore.scala
 create mode 100644 connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerHistoryServerPlugin.scala
 create mode 100644 connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala
 create mode 100644 connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPage.scala
 create mode 100644 connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerSessionPage.scala
 create mode 100644 connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerTab.scala
 create mode 100644 connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/ToolTips.scala
 create mode 100644 connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala
 create mode 100644 connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPageSuite.scala

diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala
index e2934b5674495..99684eef7d7dd 100644
---
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala @@ -93,7 +93,7 @@ private[connect] object ProtoUtils { def throwIfInvalidTag(tag: String): Unit = { // Same format rules apply to Spark Connect execution tags as to SparkContext job tags, // because the Spark Connect job tag is also used as part of SparkContext job tag. - // See SparkContext.throwIfInvalidTag and ExecuteHolder.tagToSparkJobTag + // See SparkContext.throwIfInvalidTag and ExecuteHolderSessionTag if (tag == null) { throw new IllegalArgumentException("Spark Connect tag cannot be null.") } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala index bb694a7679890..ca8617cbe1a74 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala @@ -45,7 +45,7 @@ class SparkConnectPlugin extends SparkPlugin { override def init( sc: SparkContext, pluginContext: PluginContext): util.Map[String, String] = { - SparkConnectService.start() + SparkConnectService.start(sc) Map.empty[String, String].asJava } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala index 31f119047e4cc..23aa42bad3044 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala @@ -121,4 +121,17 @@ object Connect { .version("3.5.0") .booleanConf .createWithDefault(false) + + val CONNECT_UI_STATEMENT_LIMIT = + ConfigBuilder("spark.sql.connect.ui.retainedStatements") + .doc("The number of statements kept in the Spark Connect UI history.") + .version("3.5.0") + .intConf + .createWithDefault(200) + + val CONNECT_UI_SESSION_LIMIT = ConfigBuilder("spark.sql.connect.ui.retainedSessions") + .doc("The number of client sessions kept in the Spark Connect UI history.") + .version("3.5.0") + .intConf + .createWithDefault(200) } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala index 6758df0d7e6d7..dad84180a0ff2 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala @@ -27,7 +27,7 @@ import org.apache.spark.connect.proto import org.apache.spark.internal.Logging import org.apache.spark.sql.connect.common.ProtoUtils import org.apache.spark.sql.connect.planner.SparkConnectPlanner -import org.apache.spark.sql.connect.service.ExecuteHolder +import org.apache.spark.sql.connect.service.{ExecuteHolder, ExecuteSessionTag} import org.apache.spark.sql.connect.utils.ErrorUtils import org.apache.spark.util.Utils @@ -95,8 +95,11 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends } finally { executeHolder.sessionHolder.session.sparkContext.removeJobTag(executeHolder.jobTag) executeHolder.sparkSessionTags.foreach { tag 
=> - executeHolder.sessionHolder.session.sparkContext - .removeJobTag(executeHolder.tagToSparkJobTag(tag)) + executeHolder.sessionHolder.session.sparkContext.removeJobTag( + ExecuteSessionTag( + executeHolder.sessionHolder.userId, + executeHolder.sessionHolder.sessionId, + tag)) } } } catch { @@ -128,7 +131,11 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends session.sparkContext.addJobTag(executeHolder.jobTag) // Also set all user defined tags as Spark Job tags. executeHolder.sparkSessionTags.foreach { tag => - session.sparkContext.addJobTag(executeHolder.tagToSparkJobTag(tag)) + session.sparkContext.addJobTag( + ExecuteSessionTag( + executeHolder.sessionHolder.userId, + executeHolder.sessionHolder.sessionId, + tag)) } session.sparkContext.setJobDescription( s"Spark Connect - ${StringUtils.abbreviate(debugString, 128)}") diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala index 36c96b2617fb7..7ad68d06f9622 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala @@ -38,11 +38,7 @@ private[connect] class ExecuteHolder( * Tag that is set for this execution on SparkContext, via SparkContext.addJobTag. Used * (internally) for cancallation of the Spark Jobs ran by this execution. */ - val jobTag = - s"SparkConnect_Execute_" + - s"User_${sessionHolder.userId}_" + - s"Session_${sessionHolder.sessionId}_" + - s"Operation_${operationId}" + val jobTag = ExecuteJobTag(sessionHolder.userId, sessionHolder.sessionId, operationId) /** * Tags set by Spark Connect client users via SparkSession.addTag. Used to identify and group @@ -122,3 +118,36 @@ private[connect] class ExecuteHolder( s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Tag_${tag}" } } + +/** Used to identify ExecuteHolder jobTag among SparkContext.SPARK_JOB_TAGS. */ +object ExecuteJobTag { + private val prefix = "SparkConnect_OperationTag" + + def apply(sessionId: String, userId: String, operationId: String): String = { + s"${prefix}_" + + s"User_${userId}_" + + s"Session_${sessionId}_" + + s"Operation_${operationId}" + } + + def unapply(jobTag: String): Option[String] = { + if (jobTag.startsWith(prefix)) Some(jobTag) else None + } +} + +/** Used to identify ExecuteHolder sessionTag among SparkContext.SPARK_JOB_TAGS. 
*/ +object ExecuteSessionTag { + private val prefix = "SparkConnect_SessionTag" + + def apply(userId: String, sessionId: String, tag: String): String = { + ProtoUtils.throwIfInvalidTag(tag) + s"${prefix}_" + + s"User_${userId}_" + + s"Session_${sessionId}_" + + s"Tag_${tag}" + } + + def unapply(sessionTag: String): Option[String] = { + if (sessionTag.startsWith(prefix)) Some(sessionTag) else None + } +} diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala index 1ed8ee2ff86a7..26c1062bf3437 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala @@ -34,7 +34,7 @@ object SparkConnectServer extends Logging { val session = SparkSession.builder.getOrCreate() try { try { - SparkConnectService.start() + SparkConnectService.start(session.sparkContext) SparkConnectService.server.getListenSockets.foreach { sa => val isa = sa.asInstanceOf[InetSocketAddress] logInfo( @@ -49,6 +49,7 @@ object SparkConnectServer extends Logging { SparkConnectService.server.awaitTermination() } finally { session.stop() + SparkConnectService.uiTab.foreach(_.detach()) } } } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala index 87e4f21732f29..206e24714fea7 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala @@ -29,13 +29,16 @@ import io.grpc.protobuf.services.ProtoReflectionService import io.grpc.stub.StreamObserver import org.apache.commons.lang3.StringUtils -import org.apache.spark.{SparkEnv, SparkSQLException} +import org.apache.spark.{SparkContext, SparkEnv, SparkSQLException} import org.apache.spark.connect.proto import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse} import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.UI.UI_ENABLED import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connect.config.Connect.{CONNECT_GRPC_BINDING_ADDRESS, CONNECT_GRPC_BINDING_PORT, CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE} +import org.apache.spark.sql.connect.ui.{SparkConnectServerAppStatusStore, SparkConnectServerListener, SparkConnectServerTab} import org.apache.spark.sql.connect.utils.ErrorUtils +import org.apache.spark.status.ElementTrackingStore /** * The SparkConnectService implementation. @@ -181,6 +184,9 @@ object SparkConnectService extends Logging { private[connect] var server: Server = _ + private[connect] var uiTab: Option[SparkConnectServerTab] = None + private[connect] var listener: SparkConnectServerListener = _ + // For testing purpose, it's package level private. 
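The `ExecuteJobTag` and `ExecuteSessionTag` objects above encode Spark Connect identity into plain `SparkContext` job tags, and their `unapply` methods match purely on the string prefix. A minimal sketch of what that looks like in practice; the user, session, and tag values here are hypothetical and not taken from the patch:

```scala
// Hypothetical values, for illustration only.
val sessionTag = ExecuteSessionTag("alice", "8f3c0b7a-session", "nightly_etl")
// => "SparkConnect_SessionTag_User_alice_Session_8f3c0b7a-session_Tag_nightly_etl"

// Because unapply matches on the prefix, the listener can pick Spark Connect
// tags out of the full set of SparkContext job tags:
val allJobTags = Set(sessionTag, "someUserDefinedTag")
val connectTags = allJobTags.collect { case t @ ExecuteSessionTag(_) => t }
// connectTags == Set(sessionTag)

// ExecuteJobTag follows the same scheme with the "SparkConnect_OperationTag" prefix.
```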
private[connect] def localPort: Int = { assert(server != null) @@ -251,6 +257,20 @@ object SparkConnectService extends Logging { SparkSession.active.newSession() } + private def createListenerAndUI(sc: SparkContext): Unit = { + val kvStore = sc.statusStore.store.asInstanceOf[ElementTrackingStore] + listener = new SparkConnectServerListener(kvStore, sc.conf) + sc.listenerBus.addToStatusQueue(listener) + uiTab = if (sc.getConf.get(UI_ENABLED)) { + Some( + new SparkConnectServerTab( + new SparkConnectServerAppStatusStore(kvStore), + SparkConnectServerTab.getSparkUI(sc))) + } else { + None + } + } + /** * Starts the GRPC Serivce. */ @@ -280,8 +300,9 @@ object SparkConnectService extends Logging { } // Starts the service - def start(): Unit = { + def start(sc: SparkContext): Unit = { startGRPCService() + createListenerAndUI(sc) } def stop(timeout: Option[Long] = None, unit: Option[TimeUnit] = None): Unit = { @@ -294,6 +315,7 @@ object SparkConnectService extends Logging { } } userSessionMapping.invalidateAll() + uiTab.foreach(_.detach()) } def extractErrorMessage(st: Throwable): String = { diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerAppStatusStore.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerAppStatusStore.scala new file mode 100644 index 0000000000000..28812e49d1fab --- /dev/null +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerAppStatusStore.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.ui + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import com.fasterxml.jackson.annotation.JsonIgnore + +import org.apache.spark.status.KVUtils +import org.apache.spark.status.KVUtils.KVIndexParam +import org.apache.spark.util.kvstore.{KVIndex, KVStore} + +class SparkConnectServerAppStatusStore(store: KVStore) { + def getSessionList: Seq[SessionInfo] = { + KVUtils.viewToSeq(store.view(classOf[SessionInfo])) + } + + def getExecutionList: Seq[ExecutionInfo] = { + KVUtils.viewToSeq(store.view(classOf[ExecutionInfo])) + } + + def getOnlineSessionNum: Int = { + KVUtils.count(store.view(classOf[SessionInfo]))(_.finishTimestamp == 0) + } + + def getSession(sessionId: String): Option[SessionInfo] = { + try { + Some(store.read(classOf[SessionInfo], sessionId)) + } catch { + case _: NoSuchElementException => None + } + } + + def getExecution(executionId: String): Option[ExecutionInfo] = { + try { + Some(store.read(classOf[ExecutionInfo], executionId)) + } catch { + case _: NoSuchElementException => None + } + } + + /** + * When an error or a cancellation occurs, we set the finishTimestamp of the statement. 
+ * Therefore, when we count the number of running statements, we need to exclude errors and + * cancellations and count all statements that have not been closed so far. + */ + def getTotalRunning: Int = { + KVUtils.count(store.view(classOf[ExecutionInfo]))(_.isExecutionActive) + } + + def getSessionCount: Long = { + store.count(classOf[SessionInfo]) + } + + def getExecutionCount: Long = { + store.count(classOf[ExecutionInfo]) + } +} + +private[connect] class SessionInfo( + @KVIndexParam val sessionId: String, + val startTimestamp: Long, + val userId: String, + val finishTimestamp: Long, + val totalExecution: Long) { + @JsonIgnore @KVIndex("finishTime") + private def finishTimeIndex: Long = if (finishTimestamp > 0L) finishTimestamp else -1L + def totalTime: Long = { + if (finishTimestamp == 0L) { + System.currentTimeMillis - startTimestamp + } else { + finishTimestamp - startTimestamp + } + } +} + +private[connect] class ExecutionInfo( + @KVIndexParam val jobTag: String, + val statement: String, + val sessionId: String, + val startTimestamp: Long, + val userId: String, + val operationId: String, + val sparkSessionTags: Set[String], + val finishTimestamp: Long, + val closeTimestamp: Long, + val detail: String, + val state: ExecutionState.Value, + val jobId: ArrayBuffer[String], + val sqlExecId: mutable.Set[String]) { + @JsonIgnore @KVIndex("finishTime") + private def finishTimeIndex: Long = if (finishTimestamp > 0L && !isExecutionActive) { + finishTimestamp + } else -1L + + @JsonIgnore @KVIndex("isExecutionActive") + def isExecutionActive: Boolean = { + state == ExecutionState.STARTED || + state == ExecutionState.COMPILED || + state == ExecutionState.READY + } + + def totalTime(endTime: Long): Long = { + if (endTime == 0L) { + System.currentTimeMillis - startTimestamp + } else { + endTime - startTimestamp + } + } +} + +private[connect] object ExecutionState extends Enumeration { + val STARTED, COMPILED, READY, CANCELED, FAILED, FINISHED, CLOSED = Value + type ExecutionState = Value +} diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerHistoryServerPlugin.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerHistoryServerPlugin.scala new file mode 100644 index 0000000000000..ba289e30a65b3 --- /dev/null +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerHistoryServerPlugin.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.connect.ui + +import org.apache.spark.SparkConf +import org.apache.spark.scheduler.SparkListener +import org.apache.spark.status.{AppHistoryServerPlugin, ElementTrackingStore} +import org.apache.spark.ui.SparkUI + +class SparkConnectServerHistoryServerPlugin extends AppHistoryServerPlugin { + + override def createListeners( + conf: SparkConf, + store: ElementTrackingStore): Seq[SparkListener] = { + Seq(new SparkConnectServerListener(store, conf)) + } + + override def setupUI(ui: SparkUI): Unit = { + val store = new SparkConnectServerAppStatusStore(ui.store.store) + if (store.getSessionCount > 0) { + new SparkConnectServerTab(store, ui) + } + } + + override def displayOrder: Int = 3 +} diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala new file mode 100644 index 0000000000000..b40e847f4049f --- /dev/null +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.ui + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkConf, SparkContext, SparkEnv} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD +import org.apache.spark.scheduler._ +import org.apache.spark.sql.connect.config.Connect.{CONNECT_UI_SESSION_LIMIT, CONNECT_UI_STATEMENT_LIMIT} +import org.apache.spark.sql.connect.service._ +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.status.{ElementTrackingStore, KVUtils, LiveEntity} + +private[connect] class SparkConnectServerListener( + kvstore: ElementTrackingStore, + sparkConf: SparkConf, + live: Boolean = true) + extends SparkListener + with Logging { + + private val sessionList = new mutable.LinkedHashMap[String, LiveSessionData] + private val executionList = new mutable.LinkedHashMap[String, LiveExecutionData] + + private val (retainedStatements: Int, retainedSessions: Int) = { + ( + SparkEnv.get.conf.get(CONNECT_UI_STATEMENT_LIMIT), + SparkEnv.get.conf.get(CONNECT_UI_SESSION_LIMIT)) + } + + // How often to update live entities. -1 means "never update" when replaying applications, + // meaning only the last write will happen. For live applications, this avoids a few + // operations that we can live without when rapidly processing incoming events. + private val liveUpdatePeriodNs = if (live) sparkConf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L + + // Returns true if this listener has no live data. Exposed for tests only. 
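The listener above caps its history using the two `spark.sql.connect.ui.*` options introduced earlier in this patch. A small sketch of how a deployment might override the defaults; the values shown are arbitrary examples:

```scala
import org.apache.spark.SparkConf

// Both options default to 200 (see Connect.scala above); these values are examples.
val conf = new SparkConf()
  .set("spark.sql.connect.ui.retainedStatements", "1000")
  .set("spark.sql.connect.ui.retainedSessions", "500")

// Equivalently, on the command line:
//   --conf spark.sql.connect.ui.retainedStatements=1000 \
//   --conf spark.sql.connect.ui.retainedSessions=500
```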
+ private[connect] def noLiveData(): Boolean = synchronized { + sessionList.isEmpty && executionList.isEmpty + } + + kvstore.addTrigger(classOf[SessionInfo], retainedSessions) { count => + cleanupSession(count) + } + + kvstore.addTrigger(classOf[ExecutionInfo], retainedStatements) { count => + cleanupExecutions(count) + } + + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + val jobTags = Option(jobStart.properties) + .flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_TAGS)) } + .map(_.split(SparkContext.SPARK_JOB_TAGS_SEP).toSet) + .getOrElse(Set()) + .toSeq + .filter(!_.isEmpty) + .sorted + val executeJobTagOpt = jobTags.find { + case ExecuteJobTag(_) => true + case _ => false + } + if (executeJobTagOpt.isEmpty) { + return + } + val executeJobTag = executeJobTagOpt.get + val exec = executionList.get(executeJobTag) + val executionIdOpt: Option[String] = Option(jobStart.properties) + .flatMap { p => Option(p.getProperty(SQLExecution.EXECUTION_ID_KEY)) } + if (exec.nonEmpty) { + exec.foreach { exec => + exec.jobId += jobStart.jobId.toString + executionIdOpt.foreach { execId => exec.sqlExecId += execId } + updateLiveStore(exec) + } + } else { + // It may possible that event reordering happens, such a way that JobStart event come after + // Execution end event (Refer SPARK-27019). To handle that situation, if occurs in + // Spark Connect Server, following code will take care. Here will come only if JobStart + // event comes after Execution End event. + val storeExecInfo = + KVUtils.viewToSeq(kvstore.view(classOf[ExecutionInfo]), Int.MaxValue)(exec => + exec.jobTag == executeJobTag) + storeExecInfo.foreach { exec => + val liveExec = getOrCreateExecution( + exec.jobTag, + exec.statement, + exec.sessionId, + exec.startTimestamp, + exec.userId, + exec.operationId, + exec.sparkSessionTags) + liveExec.jobId += jobStart.jobId.toString + executionIdOpt.foreach { execId => exec.sqlExecId += execId } + updateStoreWithTriggerEnabled(liveExec) + executionList.remove(liveExec.jobTag) + } + } + } + + override def onOtherEvent(event: SparkListenerEvent): Unit = { + event match { + case e: SparkListenerConnectOperationStarted => onOperationStarted(e) + case e: SparkListenerConnectOperationAnalyzed => onOperationAnalyzed(e) + case e: SparkListenerConnectOperationReadyForExecution => onOperationReadyForExecution(e) + case e: SparkListenerConnectOperationCanceled => onOperationCanceled(e) + case e: SparkListenerConnectOperationFailed => onOperationFailed(e) + case e: SparkListenerConnectOperationFinished => onOperationFinished(e) + case e: SparkListenerConnectOperationClosed => onOperationClosed(e) + case e: SparkListenerConnectSessionStarted => onSessionStarted(e) + case e: SparkListenerConnectSessionClosed => onSessionClosed(e) + case _ => // Ignore + } + } + + private def onOperationStarted(e: SparkListenerConnectOperationStarted) = synchronized { + val executionData = getOrCreateExecution( + e.jobTag, + e.statementText, + e.sessionId, + e.eventTime, + e.userId, + e.operationId, + e.sparkSessionTags) + executionData.state = ExecutionState.STARTED + executionList.put(e.jobTag, executionData) + updateLiveStore(executionData) + sessionList.get(e.sessionId) match { + case Some(sessionData) => + sessionData.totalExecution += 1 + updateLiveStore(sessionData) + case None => + logWarning( + s"onOperationStart called with unknown session id: ${e.sessionId}." 
+ + s"Regardless, the operation has been registered.") + } + } + + private def onOperationAnalyzed(e: SparkListenerConnectOperationAnalyzed) = synchronized { + executionList.get(e.jobTag) match { + case Some(executionData) => + executionData.state = ExecutionState.COMPILED + updateLiveStore(executionData) + case None => + logWarning(s"onOperationAnalyzed called with unknown operation id: ${e.jobTag}") + } + } + + private def onOperationReadyForExecution( + e: SparkListenerConnectOperationReadyForExecution): Unit = synchronized { + executionList.get(e.jobTag) match { + case Some(executionData) => + executionData.state = ExecutionState.READY + updateLiveStore(executionData) + case None => + logWarning(s"onOperationReadyForExectuion called with unknown operation id: ${e.jobTag}") + } + } + + private def onOperationCanceled(e: SparkListenerConnectOperationCanceled) = synchronized { + executionList.get(e.jobTag) match { + case Some(executionData) => + executionData.finishTimestamp = e.eventTime + executionData.state = ExecutionState.CANCELED + updateLiveStore(executionData) + case None => + logWarning(s"onOperationCanceled called with unknown operation id: ${e.jobTag}") + } + } + private def onOperationFailed(e: SparkListenerConnectOperationFailed) = synchronized { + executionList.get(e.jobTag) match { + case Some(executionData) => + executionData.finishTimestamp = e.eventTime + executionData.detail = e.errorMessage + executionData.state = ExecutionState.FAILED + updateLiveStore(executionData) + case None => + logWarning(s"onOperationFailed called with unknown operation id: ${e.jobTag}") + } + } + private def onOperationFinished(e: SparkListenerConnectOperationFinished) = synchronized { + executionList.get(e.jobTag) match { + case Some(executionData) => + executionData.finishTimestamp = e.eventTime + executionData.state = ExecutionState.FINISHED + updateLiveStore(executionData) + case None => + logWarning(s"onOperationFinished called with unknown operation id: ${e.jobTag}") + } + } + private def onOperationClosed(e: SparkListenerConnectOperationClosed) = synchronized { + executionList.get(e.jobTag) match { + case Some(executionData) => + executionData.closeTimestamp = e.eventTime + executionData.state = ExecutionState.CLOSED + updateStoreWithTriggerEnabled(executionData) + executionList.remove(e.jobTag) + case None => + logWarning(s"onOperationClosed called with unknown operation id: ${e.jobTag}") + } + } + + private def onSessionStarted(e: SparkListenerConnectSessionStarted) = synchronized { + val session = getOrCreateSession(e.sessionId, e.userId, e.eventTime) + sessionList.put(e.sessionId, session) + updateLiveStore(session) + } + + private def onSessionClosed(e: SparkListenerConnectSessionClosed) = synchronized { + sessionList.get(e.sessionId) match { + case Some(sessionData) => + sessionData.finishTimestamp = e.eventTime + updateStoreWithTriggerEnabled(sessionData) + sessionList.remove(e.sessionId) + + case None => logWarning(s"onSessionClosed called with unknown session id: ${e.sessionId}") + } + } + + // Update both live and history stores. Trigger is enabled by default, hence + // it will cleanup the entity which exceeds the threshold. + def updateStoreWithTriggerEnabled(entity: LiveEntity): Unit = synchronized { + entity.write(kvstore, System.nanoTime(), checkTriggers = true) + } + + // Update only live stores. If trigger is enabled, it will cleanup entity + // which exceeds the threshold. 
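For readers following the handlers above, a summary sketch of how the Spark Connect listener events map onto `ExecutionState` (my reading of the handlers, not text from the patch):

```scala
// How the handlers above move an execution through ExecutionState
// (summary only; the transitions are implemented in the listener, not here):
//
//   OperationStarted           -> STARTED    (new entry; session's totalExecution += 1)
//   OperationAnalyzed          -> COMPILED
//   OperationReadyForExecution -> READY
//   OperationCanceled          -> CANCELED   (finishTimestamp set)
//   OperationFailed            -> FAILED     (finishTimestamp and error detail set)
//   OperationFinished          -> FINISHED   (finishTimestamp set)
//   OperationClosed            -> CLOSED     (closeTimestamp set; entry written to the
//                                             KV store and dropped from the live map)
//
// STARTED, COMPILED and READY are the states counted as "running" by the UI.
```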
+ def updateLiveStore(entity: LiveEntity, trigger: Boolean = false): Unit = synchronized { + val now = System.nanoTime() + if (live && liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs) { + entity.write(kvstore, now, checkTriggers = trigger) + } + } + + private def getOrCreateSession( + sessionId: String, + userName: String, + startTime: Long): LiveSessionData = synchronized { + sessionList.getOrElseUpdate(sessionId, new LiveSessionData(sessionId, startTime, userName)) + } + + private def getOrCreateExecution( + jobTag: String, + statement: String, + sessionId: String, + startTimestamp: Long, + userId: String, + operationId: String, + sparkSessionTags: Set[String]): LiveExecutionData = synchronized { + executionList.getOrElseUpdate( + jobTag, + new LiveExecutionData( + jobTag, + statement, + sessionId, + startTimestamp, + userId, + operationId, + sparkSessionTags)) + } + + private def cleanupExecutions(count: Long): Unit = { + val countToDelete = calculateNumberToRemove(count, retainedStatements) + if (countToDelete <= 0L) { + return + } + val view = kvstore.view(classOf[ExecutionInfo]).index("finishTime").first(0L) + val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j => + j.finishTimestamp != 0 + } + toDelete.foreach { j => kvstore.delete(j.getClass, j.jobTag) } + } + + private def cleanupSession(count: Long): Unit = { + val countToDelete = calculateNumberToRemove(count, retainedSessions) + if (countToDelete <= 0L) { + return + } + val view = kvstore.view(classOf[SessionInfo]).index("finishTime").first(0L) + val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j => + j.finishTimestamp != 0L + } + + toDelete.foreach { j => kvstore.delete(j.getClass, j.sessionId) } + } + + /** + * Remove at least (retainedSize / 10) items to reduce friction. Because tracking may be done + * asynchronously, this method may return 0 in case enough items have been deleted already. 
+ */ + private def calculateNumberToRemove(dataSize: Long, retainedSize: Long): Long = { + if (dataSize > retainedSize) { + math.max(retainedSize / 10L, dataSize - retainedSize) + } else { + 0L + } + } +} + +private[connect] class LiveExecutionData( + val jobTag: String, + val statement: String, + val sessionId: String, + val startTimestamp: Long, + val userId: String, + val operationId: String, + val sparkSessionTags: Set[String]) + extends LiveEntity { + + var finishTimestamp: Long = 0L + var closeTimestamp: Long = 0L + var detail: String = "" + var state: ExecutionState.Value = ExecutionState.STARTED + val jobId: ArrayBuffer[String] = ArrayBuffer[String]() + var sqlExecId: mutable.Set[String] = mutable.Set[String]() + + override protected def doUpdate(): Any = { + new ExecutionInfo( + jobTag, + statement, + sessionId, + startTimestamp, + userId, + operationId, + sparkSessionTags, + finishTimestamp, + closeTimestamp, + detail, + state, + jobId, + sqlExecId) + } + + def totalTime(endTime: Long): Long = { + if (endTime == 0L) { + System.currentTimeMillis - startTimestamp + } else { + endTime - startTimestamp + } + } +} + +private[connect] class LiveSessionData( + val sessionId: String, + val startTimestamp: Long, + val userName: String) + extends LiveEntity { + + var finishTimestamp: Long = 0L + var totalExecution: Int = 0 + + override protected def doUpdate(): Any = { + new SessionInfo(sessionId, startTimestamp, userName, finishTimestamp, totalExecution) + } + def totalTime: Long = { + if (finishTimestamp == 0L) { + System.currentTimeMillis - startTimestamp + } else { + finishTimestamp - startTimestamp + } + } +} diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPage.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPage.scala new file mode 100644 index 0000000000000..57535cc06a26d --- /dev/null +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPage.scala @@ -0,0 +1,541 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.ui + +import java.net.URLEncoder +import java.nio.charset.StandardCharsets.UTF_8 +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node + +import org.apache.commons.text.StringEscapeUtils + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.connect.ui.ToolTips._ +import org.apache.spark.ui._ +import org.apache.spark.ui.UIUtils._ +import org.apache.spark.util.Utils + +/** Page for Spark UI that shows statistics for a Spark Connect Server. 
*/ +private[ui] class SparkConnectServerPage(parent: SparkConnectServerTab) + extends WebUIPage("") + with Logging { + + private val store = parent.store + private val startTime = parent.startTime + + /** Render the page */ + def render(request: HttpServletRequest): Seq[Node] = { + val content = store.synchronized { // make sure all parts in this page are consistent + generateBasicStats() ++ +
        <br/> ++
+        <h4>
+          {store.getOnlineSessionNum}
+          session(s) are online,
+          running
+          {store.getTotalRunning}
+          Request(s)
+        </h4> ++
+        generateSessionStatsTable(request) ++
+        generateSQLStatsTable(request)
+    }
+    UIUtils.headerSparkPage(request, "Spark Connect", content, parent)
+  }
+  /** Generate basic stats of the Spark Connect server */
+  private def generateBasicStats(): Seq[Node] = {
+    val timeSinceStart = System.currentTimeMillis() - startTime.getTime
+    <ul class="list-unstyled">
+      <li>
+        <strong>Started at: </strong> {formatDate(startTime)}
+      </li>
+      <li>
+        <strong>Time since start: </strong> {formatDurationVerbose(timeSinceStart)}
+      </li>
+    </ul>
+ } + + /** Generate stats of batch statements of the Spark Connect program */ + private def generateSQLStatsTable(request: HttpServletRequest): Seq[Node] = { + + val numStatement = store.getExecutionList.size + + val table = if (numStatement > 0) { + + val sqlTableTag = "sqlstat" + + val sqlTablePage = + Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1) + + try { + Some( + new SqlStatsPagedTable( + request, + parent, + store.getExecutionList, + "connect", + UIUtils.prependBaseUri(request, parent.basePath), + sqlTableTag, + showSessionLink = true).table(sqlTablePage)) + } catch { + case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) => + Some(
<div class="alert alert-error">
+            <p>Error while rendering job table:</p>
+            <pre>
+              {Utils.exceptionString(e)}
+            </pre>
+          </div>)
+      }
+    } else {
+      None
+    }
+    val content =
+      <span id="sqlstat" class="collapse-aggregated-sqlstat collapse-table"
+            onClick="collapseTable('collapse-aggregated-sqlstat','aggregated-sqlstat')">
+        <h4>
+          <span class="collapse-table-arrow arrow-open"></span>
+          <a>Request Statistics ({numStatement})</a>
+        </h4>
+      </span> ++
+      <div class="aggregated-sqlstat collapsible-table">
+        {table.getOrElse("No statistics have been generated yet.")}
+      </div>
+ content + } + + /** Generate stats of batch sessions of the Spark Connect server */ + private def generateSessionStatsTable(request: HttpServletRequest): Seq[Node] = { + val numSessions = store.getSessionList.size + val table = if (numSessions > 0) { + + val sessionTableTag = "sessionstat" + + val sessionTablePage = + Option(request.getParameter(s"$sessionTableTag.page")).map(_.toInt).getOrElse(1) + + try { + Some( + new SessionStatsPagedTable( + request, + parent, + store.getSessionList, + "connect", + UIUtils.prependBaseUri(request, parent.basePath), + sessionTableTag).table(sessionTablePage)) + } catch { + case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) => + Some(
<div class="alert alert-error">
+            <p>Error while rendering job table:</p>
+            <pre>
+              {Utils.exceptionString(e)}
+            </pre>
+          </div>)
+      }
+    } else {
+      None
+    }
+
+    val content =
+      <span id="sessionstat" class="collapse-aggregated-sessionstat collapse-table"
+            onClick="collapseTable('collapse-aggregated-sessionstat','aggregated-sessionstat')">
+        <h4>
+          <span class="collapse-table-arrow arrow-open"></span>
+          <a>Session Statistics ({numSessions})</a>
+        </h4>
+      </span> ++
+      <div class="aggregated-sessionstat collapsible-table">
+        {table.getOrElse("No statistics have been generated yet.")}
+      </div>
+ + content + } +} + +private[ui] class SqlStatsPagedTable( + request: HttpServletRequest, + parent: SparkConnectServerTab, + data: Seq[ExecutionInfo], + subPath: String, + basePath: String, + sqlStatsTableTag: String, + showSessionLink: Boolean) + extends PagedTable[SqlStatsTableRow] { + + private val (sortColumn, desc, pageSize) = + getTableParameters(request, sqlStatsTableTag, "Start Time") + + private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name()) + + private val parameterPath = + s"$basePath/$subPath/?${getParameterOtherTable(request, sqlStatsTableTag)}" + + override val dataSource = new SqlStatsTableDataSource(data, pageSize, sortColumn, desc) + + override def tableId: String = sqlStatsTableTag + + override def tableCssClass: String = + "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited" + + override def pageLink(page: Int): String = { + parameterPath + + s"&$pageNumberFormField=$page" + + s"&$sqlStatsTableTag.sort=$encodedSortColumn" + + s"&$sqlStatsTableTag.desc=$desc" + + s"&$pageSizeFormField=$pageSize" + + s"#$sqlStatsTableTag" + } + + override def pageSizeFormField: String = s"$sqlStatsTableTag.pageSize" + + override def pageNumberFormField: String = s"$sqlStatsTableTag.page" + + override def goButtonFormPath: String = + s"$parameterPath&$sqlStatsTableTag.sort=$encodedSortColumn" + + s"&$sqlStatsTableTag.desc=$desc#$sqlStatsTableTag" + + override def headers: Seq[Node] = { + val sqlTableHeadersAndTooltips: Seq[(String, Boolean, Option[String])] = + if (showSessionLink) { + Seq( + ("User", true, None), + ("Job ID", true, None), + ("SQL Query ID", true, None), + ("Session ID", true, None), + ("Start Time", true, None), + ("Finish Time", true, Some(SPARK_CONNECT_SERVER_FINISH_TIME)), + ("Close Time", true, Some(SPARK_CONNECT_SERVER_CLOSE_TIME)), + ("Execution Time", true, Some(SPARK_CONNECT_SERVER_EXECUTION)), + ("Duration", true, Some(SPARK_CONNECT_SERVER_DURATION)), + ("Statement", true, None), + ("State", true, None), + ("Operation ID", true, None), + ("Job Tag", true, None), + ("Spark Session Tags", true, None), + ("Detail", true, None)) + } else { + Seq( + ("User", true, None), + ("Job ID", true, None), + ("SQL Query ID", true, None), + ("Start Time", true, None), + ("Finish Time", true, Some(SPARK_CONNECT_SERVER_FINISH_TIME)), + ("Close Time", true, Some(SPARK_CONNECT_SERVER_CLOSE_TIME)), + ("Execution Time", true, Some(SPARK_CONNECT_SERVER_EXECUTION)), + ("Duration", true, Some(SPARK_CONNECT_SERVER_DURATION)), + ("Statement", true, None), + ("State", true, None), + ("Operation ID", true, None), + ("Job Tag", true, None), + ("Spark Session Tags", true, None), + ("Detail", true, None)) + } + + isSortColumnValid(sqlTableHeadersAndTooltips, sortColumn) + + headerRow( + sqlTableHeadersAndTooltips, + desc, + pageSize, + sortColumn, + parameterPath, + sqlStatsTableTag, + sqlStatsTableTag) + } + + override def row(sqlStatsTableRow: SqlStatsTableRow): Seq[Node] = { + val info = sqlStatsTableRow.executionInfo + val startTime = info.startTimestamp + val executionTime = sqlStatsTableRow.executionTime + val duration = sqlStatsTableRow.duration + + def jobLinks(jobData: Seq[String]): Seq[Node] = { + jobData.map { jobId => + [{jobId}] + } + } + def sqlLinks(sqlData: Seq[String]): Seq[Node] = { + sqlData.map { sqlExecId => + [{sqlExecId}] + } + } + val sessionLink = "%s/%s/session/?id=%s".format( + UIUtils.prependBaseUri(request, parent.basePath), + parent.prefix, + info.sessionId) + + + + {info.userId} + + + 
{jobLinks(sqlStatsTableRow.jobId)} + + + {sqlLinks(sqlStatsTableRow.sqlExecId)} + + { + if (showSessionLink) { + + {info.sessionId} + + } + } + + {UIUtils.formatDate(startTime)} + + + {if (info.finishTimestamp > 0) formatDate(info.finishTimestamp)} + + + {if (info.closeTimestamp > 0) formatDate(info.closeTimestamp)} + + + + {formatDurationVerbose(executionTime)} + + + {formatDurationVerbose(duration)} + + + + {info.statement} + + + + {if (info.isExecutionActive) "RUNNING" else info.state} + + + {info.operationId} + + + {info.jobTag} + + + {sqlStatsTableRow.sparkSessionTags.mkString(", ")} + + {errorMessageCell(Option(info.detail))} + + } + + private def errorMessageCell(errorMessageOption: Option[String]): Seq[Node] = { + val errorMessage = errorMessageOption.getOrElse("") + val isMultiline = errorMessage.indexOf('\n') >= 0 + val errorSummary = StringEscapeUtils.escapeHtml4(if (isMultiline) { + errorMessage.substring(0, errorMessage.indexOf('\n')) + } else { + errorMessage + }) + val details = detailsUINode(isMultiline, errorMessage) + + {errorSummary}{details} + + } + + private def jobURL(request: HttpServletRequest, jobId: String): String = + "%s/jobs/job/?id=%s".format(UIUtils.prependBaseUri(request, parent.basePath), jobId) + + private def sqlURL(request: HttpServletRequest, sqlExecId: String): String = + "%s/SQL/execution/?id=%s".format(UIUtils.prependBaseUri(request, parent.basePath), sqlExecId) +} + +private[ui] class SessionStatsPagedTable( + request: HttpServletRequest, + parent: SparkConnectServerTab, + data: Seq[SessionInfo], + subPath: String, + basePath: String, + sessionStatsTableTag: String) + extends PagedTable[SessionInfo] { + + private val (sortColumn, desc, pageSize) = + getTableParameters(request, sessionStatsTableTag, "Start Time") + + private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name()) + + private val parameterPath = + s"$basePath/$subPath/?${getParameterOtherTable(request, sessionStatsTableTag)}" + + override val dataSource = new SessionStatsTableDataSource(data, pageSize, sortColumn, desc) + + override def tableId: String = sessionStatsTableTag + + override def tableCssClass: String = + "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited" + + override def pageLink(page: Int): String = { + parameterPath + + s"&$pageNumberFormField=$page" + + s"&$sessionStatsTableTag.sort=$encodedSortColumn" + + s"&$sessionStatsTableTag.desc=$desc" + + s"&$pageSizeFormField=$pageSize" + + s"#$sessionStatsTableTag" + } + + override def pageSizeFormField: String = s"$sessionStatsTableTag.pageSize" + + override def pageNumberFormField: String = s"$sessionStatsTableTag.page" + + override def goButtonFormPath: String = + s"$parameterPath&$sessionStatsTableTag.sort=$encodedSortColumn" + + s"&$sessionStatsTableTag.desc=$desc#$sessionStatsTableTag" + + override def headers: Seq[Node] = { + val sessionTableHeadersAndTooltips: Seq[(String, Boolean, Option[String])] = + Seq( + ("User", true, None), + ("Session ID", true, None), + ("Start Time", true, None), + ("Finish Time", true, None), + ("Duration", true, Some(SPARK_CONNECT_SESSION_DURATION)), + ("Total Execute", true, Some(SPARK_CONNECT_SESSION_TOTAL_EXECUTE))) + + isSortColumnValid(sessionTableHeadersAndTooltips, sortColumn) + + headerRow( + sessionTableHeadersAndTooltips, + desc, + pageSize, + sortColumn, + parameterPath, + sessionStatsTableTag, + sessionStatsTableTag) + } + + override def row(session: SessionInfo): Seq[Node] = { + val sessionLink = 
"%s/%s/session/?id=%s".format( + UIUtils.prependBaseUri(request, parent.basePath), + parent.prefix, + session.sessionId) + + {session.userId} + {session.sessionId} + {formatDate(session.startTimestamp)} + {if (session.finishTimestamp > 0) formatDate(session.finishTimestamp)} + {formatDurationVerbose(session.totalTime)} + {session.totalExecution.toString} + + } +} + +private[ui] class SqlStatsTableRow( + val jobTag: String, + val jobId: Seq[String], + val sqlExecId: Seq[String], + val duration: Long, + val executionTime: Long, + val sparkSessionTags: Seq[String], + val executionInfo: ExecutionInfo) + +private[ui] class SqlStatsTableDataSource( + info: Seq[ExecutionInfo], + pageSize: Int, + sortColumn: String, + desc: Boolean) + extends PagedDataSource[SqlStatsTableRow](pageSize) { + + // Convert ExecutionInfo to SqlStatsTableRow which contains the final contents to show in + // the table so that we can avoid creating duplicate contents during sorting the data + private val data = info.map(sqlStatsTableRow).sorted(ordering(sortColumn, desc)) + + override def dataSize: Int = data.size + + override def sliceData(from: Int, to: Int): Seq[SqlStatsTableRow] = data.slice(from, to) + + private def sqlStatsTableRow(executionInfo: ExecutionInfo): SqlStatsTableRow = { + val duration = executionInfo.totalTime(executionInfo.closeTimestamp) + val executionTime = executionInfo.totalTime(executionInfo.finishTimestamp) + val jobId = executionInfo.jobId.toSeq.sorted + val sqlExecId = executionInfo.sqlExecId.toSeq.sorted + val sparkSessionTags = executionInfo.sparkSessionTags.toSeq.sorted + + new SqlStatsTableRow( + executionInfo.jobTag, + jobId, + sqlExecId, + duration, + executionTime, + sparkSessionTags, + executionInfo) + } + + /** + * Return Ordering according to sortColumn and desc. + */ + private def ordering(sortColumn: String, desc: Boolean): Ordering[SqlStatsTableRow] = { + val ordering: Ordering[SqlStatsTableRow] = sortColumn match { + case "User" => Ordering.by(_.executionInfo.userId) + case "Operation ID" => Ordering.by(_.executionInfo.operationId) + case "Job ID" => Ordering.by(_.jobId.headOption) + case "SQL Query ID" => Ordering.by(_.sqlExecId.headOption) + case "Session ID" => Ordering.by(_.executionInfo.sessionId) + case "Start Time" => Ordering.by(_.executionInfo.startTimestamp) + case "Finish Time" => Ordering.by(_.executionInfo.finishTimestamp) + case "Close Time" => Ordering.by(_.executionInfo.closeTimestamp) + case "Execution Time" => Ordering.by(_.executionTime) + case "Duration" => Ordering.by(_.duration) + case "Statement" => Ordering.by(_.executionInfo.statement) + case "State" => Ordering.by(_.executionInfo.state) + case "Detail" => Ordering.by(_.executionInfo.detail) + case "Job Tag" => Ordering.by(_.executionInfo.jobTag) + case "Spark Session Tags" => Ordering.by(_.sparkSessionTags.headOption) + case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn") + } + if (desc) { + ordering.reverse + } else { + ordering + } + } +} + +private[ui] class SessionStatsTableDataSource( + info: Seq[SessionInfo], + pageSize: Int, + sortColumn: String, + desc: Boolean) + extends PagedDataSource[SessionInfo](pageSize) { + + // Sorting SessionInfo data + private val data = info.sorted(ordering(sortColumn, desc)) + + override def dataSize: Int = data.size + + override def sliceData(from: Int, to: Int): Seq[SessionInfo] = data.slice(from, to) + + /** + * Return Ordering according to sortColumn and desc. 
+ */ + private def ordering(sortColumn: String, desc: Boolean): Ordering[SessionInfo] = { + val ordering: Ordering[SessionInfo] = sortColumn match { + case "User" => Ordering.by(_.userId) + case "Session ID" => Ordering.by(_.sessionId) + case "Start Time" => Ordering.by(_.startTimestamp) + case "Finish Time" => Ordering.by(_.finishTimestamp) + case "Duration" => Ordering.by(_.totalTime) + case "Total Execute" => Ordering.by(_.totalExecution) + case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn") + } + if (desc) { + ordering.reverse + } else { + ordering + } + } +} diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerSessionPage.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerSessionPage.scala new file mode 100644 index 0000000000000..fde6e8da8b63f --- /dev/null +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerSessionPage.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.ui + +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node + +import org.apache.spark.internal.Logging +import org.apache.spark.ui._ +import org.apache.spark.ui.UIUtils._ +import org.apache.spark.util.Utils + +/** Page for Spark UI that contains information pertaining to a single Spark Connect session */ +private[ui] class SparkConnectServerSessionPage(parent: SparkConnectServerTab) + extends WebUIPage("session") + with Logging { + + val store = parent.store + private val startTime = parent.startTime + + /** Render the page */ + def render(request: HttpServletRequest): Seq[Node] = { + val sessionId = request.getParameter("id") + require(sessionId != null && sessionId.nonEmpty, "Missing id parameter") + + val content = store.synchronized { // make sure all parts in this page are consistent + store + .getSession(sessionId) + .map { sessionStat => + generateBasicStats() ++ +
          <br/> ++
+            <h4>
+              User
+              {sessionStat.userId}
+              ,
+              Session created at
+              {formatDate(sessionStat.startTimestamp)}
+              ,
+              Total run
+              {sessionStat.totalExecution}
+              Request(s)
+            </h4> ++
+            generateSQLStatsTable(request, sessionStat.sessionId)
+        }
+        .getOrElse(<div>No information to display for session {sessionId}</div>)
+    }
+    UIUtils.headerSparkPage(request, "Spark Connect Session", content, parent)
+  }
+  /** Generate basic stats of the Spark Connect Server */
+  private def generateBasicStats(): Seq[Node] = {
+    val timeSinceStart = System.currentTimeMillis() - startTime.getTime
+    <ul class="list-unstyled">
+      <li>
+        <strong>Started at: </strong> {formatDate(startTime)}
+      </li>
+      <li>
+        <strong>Time since start: </strong> {formatDurationVerbose(timeSinceStart)}
+      </li>
+    </ul>
+ } + + /** Generate stats of batch statements of the Spark Connect server */ + private def generateSQLStatsTable(request: HttpServletRequest, sessionID: String): Seq[Node] = { + val executionList = store.getExecutionList + .filter(_.sessionId == sessionID) + val numStatement = executionList.size + val table = if (numStatement > 0) { + + val sqlTableTag = "sqlsessionstat" + + val sqlTablePage = + Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1) + + try { + Some( + new SqlStatsPagedTable( + request, + parent, + executionList, + "connect/session", + UIUtils.prependBaseUri(request, parent.basePath), + sqlTableTag, + showSessionLink = false).table(sqlTablePage)) + } catch { + case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) => + Some(
<div class="alert alert-error">
+            <p>Error while rendering job table:</p>
+            <pre>
+              {Utils.exceptionString(e)}
+            </pre>
+          </div>)
+      }
+    } else {
+      None
+    }
+    val content =
+      <span id="sqlsessionstat" class="collapse-aggregated-sqlsessionstat collapse-table"
+            onClick="collapseTable('collapse-aggregated-sqlsessionstat','aggregated-sqlsessionstat')">
+        <h4>
+          <span class="collapse-table-arrow arrow-open"></span>
+          <a>Request Statistics</a>
+        </h4>
+      </span> ++
+      <div class="aggregated-sqlsessionstat collapsible-table">
+        {table.getOrElse("No statistics have been generated yet.")}
+      </div>
+ + content + } +} diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerTab.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerTab.scala new file mode 100644 index 0000000000000..4d1820a994c1e --- /dev/null +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerTab.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.ui + +import java.util.Date + +import org.apache.spark.SparkContext +import org.apache.spark.internal.Logging +import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.ui.{SparkUI, SparkUITab} + +private[connect] class SparkConnectServerTab( + val store: SparkConnectServerAppStatusStore, + sparkUI: SparkUI) + extends SparkUITab(sparkUI, "connect") + with Logging { + + override val name = "Connect" + + val parent = sparkUI + val startTime = + try { + sparkUI.store.applicationInfo().attempts.head.startTime + } catch { + case _: NoSuchElementException => new Date(System.currentTimeMillis()) + } + + attachPage(new SparkConnectServerPage(this)) + attachPage(new SparkConnectServerSessionPage(this)) + parent.attachTab(this) + def detach(): Unit = { + parent.detachTab(this) + } +} + +private[connect] object SparkConnectServerTab { + def getSparkUI(sparkContext: SparkContext): SparkUI = { + sparkContext.ui.getOrElse { + throw QueryExecutionErrors.parentSparkUIToAttachTabNotFoundError() + } + } +} diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/ToolTips.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/ToolTips.scala new file mode 100644 index 0000000000000..9b51ace83c6c1 --- /dev/null +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/ToolTips.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.connect.ui + +private[ui] object ToolTips { + val SPARK_CONNECT_SERVER_FINISH_TIME = + "Execution finish time, before fetching the results" + + val SPARK_CONNECT_SERVER_CLOSE_TIME = + "Operation close time after fetching the results" + + val SPARK_CONNECT_SERVER_EXECUTION = + "Difference between start time and finish time" + + val SPARK_CONNECT_SERVER_DURATION = + "Difference between start time and close time" + + val SPARK_CONNECT_SESSION_TOTAL_EXECUTE = + "Number of operations submitted in this session" + + val SPARK_CONNECT_SESSION_DURATION = + "Elapsed time since session start, or until closed if the session was closed" + +} diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala new file mode 100644 index 0000000000000..9292e44f177ba --- /dev/null +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.connect.ui + +import java.util.Properties + +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{SharedSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite} +import org.apache.spark.internal.config.Status.{ASYNC_TRACKING_ENABLED, LIVE_ENTITY_UPDATE_PERIOD} +import org.apache.spark.scheduler.SparkListenerJobStart +import org.apache.spark.sql.connect.config.Connect.{CONNECT_UI_SESSION_LIMIT, CONNECT_UI_STATEMENT_LIMIT} +import org.apache.spark.sql.connect.service._ +import org.apache.spark.status.ElementTrackingStore +import org.apache.spark.util.kvstore.InMemoryStore + +class SparkConnectServerListenerSuite + extends SparkFunSuite + with BeforeAndAfter + with SharedSparkContext { + + private var kvstore: ElementTrackingStore = _ + + after { + if (kvstore != null) { + kvstore.close() + kvstore = null + } + } + + Seq(true, false).foreach { live => + test(s"listener events should store successfully (live = $live)") { + val (statusStore: SparkConnectServerAppStatusStore, listener: SparkConnectServerListener) = + createAppStatusStore(live) + + listener.onOtherEvent( + SparkListenerConnectSessionStarted("sessionId", "user", System.currentTimeMillis())) + listener.onOtherEvent( + SparkListenerConnectOperationStarted( + ExecuteJobTag("sessionId", "userId", "operationId"), + "operationId", + System.currentTimeMillis(), + "sessionId", + "userId", + "userName", + "dummy query", + None, + Set())) + listener.onOtherEvent( + SparkListenerConnectOperationAnalyzed( + ExecuteJobTag("sessionId", "userId", "operationId"), + "operationId", + System.currentTimeMillis())) + listener.onJobStart( + SparkListenerJobStart(0, System.currentTimeMillis(), Nil, createProperties)) + listener.onOtherEvent( + SparkListenerConnectOperationFinished( + ExecuteJobTag("sessionId", "userId", "operationId"), + "sessionId", + System.currentTimeMillis())) + listener.onOtherEvent( + SparkListenerConnectOperationClosed( + ExecuteJobTag("sessionId", "userId", "operationId"), + "sessionId", + System.currentTimeMillis())) + + if (live) { + assert(statusStore.getOnlineSessionNum === 1) + } + + listener.onOtherEvent( + SparkListenerConnectSessionClosed("sessionId", "userId", System.currentTimeMillis())) + + if (!live) { + // To update history store + kvstore.close(false) + } + assert(statusStore.getOnlineSessionNum === 0) + assert(statusStore.getExecutionList.size === 1) + + val storeExecData = statusStore.getExecutionList.head + + assert(storeExecData.jobTag === ExecuteJobTag("sessionId", "userId", "operationId")) + assert(storeExecData.sessionId === "sessionId") + assert(storeExecData.statement === "dummy query") + assert(storeExecData.jobId === Seq("0")) + assert(listener.noLiveData()) + } + } + + Seq(true, false).foreach { live => + test(s"cleanup session if exceeds the threshold (live = $live)") { + val (statusStore: SparkConnectServerAppStatusStore, listener: SparkConnectServerListener) = + createAppStatusStore(live) + var time = 0 + listener.onOtherEvent( + SparkListenerConnectSessionStarted("sessionId1", "user", System.currentTimeMillis())) + time += 1 + listener.onOtherEvent( + SparkListenerConnectSessionStarted("sessionId2", "user", System.currentTimeMillis())) + time += 1 + listener.onOtherEvent(SparkListenerConnectSessionClosed("sessionId1", "userId", time)) + time += 1 + listener.onOtherEvent(SparkListenerConnectSessionClosed("sessionId2", "userId", time)) + listener.onOtherEvent( + SparkListenerConnectSessionStarted("sessionId3", "user", System.currentTimeMillis())) 
+ time += 1 + listener.onOtherEvent(SparkListenerConnectSessionClosed("sessionId3", "userId", time)) + + if (!live) { + kvstore.close(false) + } + assert(statusStore.getOnlineSessionNum === 0) + assert(statusStore.getSessionCount === 1) + assert(statusStore.getSession("sessionId1") === None) + assert(listener.noLiveData()) + } + } + + test("update execution info when jobstart event come after execution end event") { + val (statusStore: SparkConnectServerAppStatusStore, listener: SparkConnectServerListener) = + createAppStatusStore(true) + + listener.onOtherEvent( + SparkListenerConnectSessionStarted("sessionId", "userId", System.currentTimeMillis())) + listener.onOtherEvent( + SparkListenerConnectOperationStarted( + ExecuteJobTag("sessionId", "userId", "operationId"), + "operationId", + System.currentTimeMillis(), + "sessionId", + "userId", + "userName", + "dummy query", + None, + Set())) + listener.onOtherEvent( + SparkListenerConnectOperationAnalyzed( + ExecuteJobTag("sessionId", "userId", "operationId"), + "operationId", + System.currentTimeMillis())) + listener.onOtherEvent( + SparkListenerConnectOperationFinished( + ExecuteJobTag("sessionId", "userId", "operationId"), + "operationId", + System.currentTimeMillis())) + listener.onOtherEvent( + SparkListenerConnectOperationClosed( + ExecuteJobTag("sessionId", "userId", "operationId"), + "operationId", + System.currentTimeMillis())) + + listener.onJobStart( + SparkListenerJobStart(0, System.currentTimeMillis(), Nil, createProperties)) + listener.onOtherEvent( + SparkListenerConnectSessionClosed("sessionId", "userId", System.currentTimeMillis())) + val exec = statusStore.getExecution(ExecuteJobTag("sessionId", "userId", "operationId")) + assert(exec.isDefined) + assert(exec.get.jobId === Seq("0")) + assert(listener.noLiveData()) + } + + test("SPARK-31387 - listener update methods should not throw exception with unknown input") { + val (statusStore: SparkConnectServerAppStatusStore, listener: SparkConnectServerListener) = + createAppStatusStore(true) + + val unknownSession = "unknown_session" + val unknownJob = "unknown_job_tag" + listener.onOtherEvent(SparkListenerConnectSessionClosed(unknownSession, "userId", 0)) + listener.onOtherEvent( + SparkListenerConnectOperationStarted( + ExecuteJobTag("sessionId", "userId", "operationId"), + "operationId", + System.currentTimeMillis(), + unknownSession, + "userId", + "userName", + "dummy query", + None, + Set())) + listener.onOtherEvent( + SparkListenerConnectOperationAnalyzed( + unknownJob, + "operationId", + System.currentTimeMillis())) + listener.onOtherEvent(SparkListenerConnectOperationCanceled(unknownJob, "userId", 0)) + listener.onOtherEvent( + SparkListenerConnectOperationFailed(unknownJob, "operationId", 0, "msg")) + listener.onOtherEvent(SparkListenerConnectOperationFinished(unknownJob, "operationId", 0)) + listener.onOtherEvent(SparkListenerConnectOperationClosed(unknownJob, "operationId", 0)) + } + + private def createProperties: Properties = { + val properties = new Properties() + properties.setProperty( + SparkContext.SPARK_JOB_TAGS, + ExecuteJobTag("sessionId", "userId", "operationId")) + properties + } + + private def createAppStatusStore(live: Boolean) = { + val sparkConf = new SparkConf() + sparkConf + .set(ASYNC_TRACKING_ENABLED, false) + .set(LIVE_ENTITY_UPDATE_PERIOD, 0L) + SparkEnv.get.conf + .set(CONNECT_UI_SESSION_LIMIT, 1) + .set(CONNECT_UI_STATEMENT_LIMIT, 10) + kvstore = new ElementTrackingStore(new InMemoryStore, sparkConf) + if (live) { + val listener = new 
SparkConnectServerListener(kvstore, sparkConf) + (new SparkConnectServerAppStatusStore(kvstore), listener) + } else { + ( + new SparkConnectServerAppStatusStore(kvstore), + new SparkConnectServerListener(kvstore, sparkConf, false)) + } + } +} diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPageSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPageSuite.scala new file mode 100644 index 0000000000000..99d0a14f1e8df --- /dev/null +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPageSuite.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.ui + +import java.util.{Calendar, Locale} +import javax.servlet.http.HttpServletRequest + +import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS} +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite} +import org.apache.spark.scheduler.SparkListenerJobStart +import org.apache.spark.sql.connect.service._ +import org.apache.spark.status.ElementTrackingStore +import org.apache.spark.util.kvstore.InMemoryStore + +class SparkConnectServerPageSuite + extends SparkFunSuite + with BeforeAndAfter + with SharedSparkContext { + + private var kvstore: ElementTrackingStore = _ + + after { + if (kvstore != null) { + kvstore.close() + kvstore = null + } + } + + /** + * Run a dummy session and return the store + */ + private def getStatusStore: SparkConnectServerAppStatusStore = { + kvstore = new ElementTrackingStore(new InMemoryStore, new SparkConf()) + // val server = mock(classOf[SparkConnectServer], RETURNS_SMART_NULLS) + val sparkConf = new SparkConf + + val listener = new SparkConnectServerListener(kvstore, sparkConf) + val statusStore = new SparkConnectServerAppStatusStore(kvstore) + + listener.onOtherEvent( + SparkListenerConnectSessionStarted("sessionId", "userId", System.currentTimeMillis())) + listener.onOtherEvent( + SparkListenerConnectOperationStarted( + "jobTag", + "operationId", + System.currentTimeMillis(), + "sessionId", + "userId", + "userName", + "dummy query", + None, + Set())) + listener.onOtherEvent( + SparkListenerConnectOperationAnalyzed("jobTag", "dummy plan", System.currentTimeMillis())) + listener.onOtherEvent(SparkListenerJobStart(0, System.currentTimeMillis(), Seq())) + listener.onOtherEvent( + SparkListenerConnectOperationFinished("jobTag", "operationId", System.currentTimeMillis())) + listener.onOtherEvent( + SparkListenerConnectOperationClosed("jobTag", "operationId", System.currentTimeMillis())) + listener.onOtherEvent( + SparkListenerConnectSessionClosed("sessionId", "userId", System.currentTimeMillis())) + + 
    statusStore
+  }
+
+  test("Spark Connect Server page should load successfully") {
+    val store = getStatusStore
+
+    val request = mock(classOf[HttpServletRequest])
+    val tab = mock(classOf[SparkConnectServerTab], RETURNS_SMART_NULLS)
+    when(tab.startTime).thenReturn(Calendar.getInstance().getTime)
+    when(tab.store).thenReturn(store)
+    when(tab.appName).thenReturn("testing")
+    when(tab.headerTabs).thenReturn(Seq.empty)
+    val page = new SparkConnectServerPage(tab)
+    val html = page.render(request).toString().toLowerCase(Locale.ROOT)
+
+    // session statistics and request statistics tables should load successfully
+    assert(html.contains("session statistics (1)"))
+    assert(html.contains("request statistics (1)"))
+    assert(html.contains("dummy query"))
+
+    // Pagination support
+    assert(html.contains("<label>1 pages</label>"))
+
+    // Hiding table support
+    assert(
+      html.contains("class=\"collapse-aggregated-sessionstat" +
+        " collapse-table\" onclick=\"collapsetable"))
+  }
+
+  test("Spark Connect Server session page should load successfully") {
+    val store = getStatusStore
+
+    val request = mock(classOf[HttpServletRequest])
+    when(request.getParameter("id")).thenReturn("sessionId")
+    val tab = mock(classOf[SparkConnectServerTab], RETURNS_SMART_NULLS)
+    when(tab.startTime).thenReturn(Calendar.getInstance().getTime)
+    when(tab.store).thenReturn(store)
+    when(tab.appName).thenReturn("testing")
+    when(tab.headerTabs).thenReturn(Seq.empty)
+    val page = new SparkConnectServerSessionPage(tab)
+    val html = page.render(request).toString().toLowerCase(Locale.ROOT)
+
+    // request statistics table for the session should load successfully
+    assert(html.contains("request statistics"))
+    assert(html.contains("userid"))
+    assert(html.contains("jobtag"))
+
+    // Pagination support
+    assert(html.contains("<label>1 pages</label>"))
+
+    // Hiding table support
+    assert(
+      html.contains("collapse-aggregated-sqlsessionstat collapse-table\"" +
+        " onclick=\"collapsetable"))
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f48cb32b31974..533ce28ab4d2a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -678,7 +678,7 @@ class SparkContext(config: SparkConf) extends Logging {
     postApplicationStart()
 
     // After application started, attach handlers to started server and start handler.
-    _ui.foreach(_.attachAllHandler())
+    _ui.foreach(_.attachAllHandlers())
 
     // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
     _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index ac154b7938565..685407c11208f 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -67,15 +67,31 @@ private[spark] class SparkUI private (
     createServletHandler("/", servlet, basePath)
   }
 
+  private var readyToAttachHandlers = false
+
   /**
    * Attach all existing handlers to ServerInfo.
    */
-  def attachAllHandler(): Unit = {
+  def attachAllHandlers(): Unit = {
+    // Attach all handlers that have been added already, but not yet attached.
    serverInfo.foreach { server =>
      server.removeHandler(initHandler)
      handlers.foreach(server.addHandler(_, securityManager))
    }
+    // Handlers attached after this can be directly started.
+    readyToAttachHandlers = true
   }
+
+  /** Attaches a handler to this UI.
+   * Note: the handler will not be attached until readyToAttachHandlers is true;
+   * handlers added before that point are attached by attachAllHandlers(). */
+  override def attachHandler(handler: ServletContextHandler): Unit = synchronized {
+    handlers += handler
+    if (readyToAttachHandlers) {
+      serverInfo.foreach(_.addHandler(handler, securityManager))
+    }
+  }
+
   /** Initialize all components of the server. */
   def initialize(): Unit = {
     val jobsTab = new JobsTab(this, store)
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 352c51baa8ca6..e7d57a6e6def9 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -442,7 +442,7 @@ class UISuite extends SparkFunSuite {
     sparkUI.bind()
     assert(TestUtils.httpResponseMessage(new URL(sparkUI.webUrl + "/jobs")) ===
       "Spark is starting up. Please wait a while until it's ready.")
-    sparkUI.attachAllHandler()
+    sparkUI.attachAllHandlers()
     assert(TestUtils.httpResponseMessage(new URL(sparkUI.webUrl + "/jobs")).contains(sc.appName))
     sparkUI.stop()
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryHistoryServerPlugin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryHistoryServerPlugin.scala
index a127fa59b7433..76f64dcb64451 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryHistoryServerPlugin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryHistoryServerPlugin.scala
@@ -39,5 +39,5 @@ class StreamingQueryHistoryServerPlugin extends AppHistoryServerPlugin {
     }
   }
 
-  override def displayOrder: Int = 1
+  override def displayOrder: Int = 2
 }
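
For readers tracing the suites above, the sketch below strings the same pieces together outside of a test: Spark Connect listener events are fed into SparkConnectServerListener and read back through SparkConnectServerAppStatusStore, which is what the new UI pages render from. This is a minimal illustration and not part of the patch; it lives in the org.apache.spark.sql.connect.ui package (like the suites) because the classes are package-private, it assumes a local SparkContext so that SparkEnv is initialized, and it reuses the event constructors exactly as the tests call them. The object name ConnectUiStoreSketch is hypothetical.

package org.apache.spark.sql.connect.ui

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.config.Status.{ASYNC_TRACKING_ENABLED, LIVE_ENTITY_UPDATE_PERIOD}
import org.apache.spark.sql.connect.service._
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.util.kvstore.InMemoryStore

object ConnectUiStoreSketch {
  def main(args: Array[String]): Unit = {
    // Synchronous status tracking, as in the suites, so reads see the events immediately.
    val conf = new SparkConf()
      .setMaster("local[1]")
      .setAppName("connect-ui-sketch")
      .set(ASYNC_TRACKING_ENABLED, false)
      .set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
    // A running SparkContext is assumed so that SparkEnv is available to the listener.
    val sc = new SparkContext(conf)

    val kvstore = new ElementTrackingStore(new InMemoryStore, conf)
    val listener = new SparkConnectServerListener(kvstore, conf)
    val store = new SparkConnectServerAppStatusStore(kvstore)

    // One session with a single operation, using the same event shapes as the tests.
    val jobTag = ExecuteJobTag("sessionId", "userId", "operationId")
    listener.onOtherEvent(
      SparkListenerConnectSessionStarted("sessionId", "userId", System.currentTimeMillis()))
    listener.onOtherEvent(
      SparkListenerConnectOperationStarted(jobTag, "operationId", System.currentTimeMillis(),
        "sessionId", "userId", "userName", "dummy query", None, Set()))
    listener.onOtherEvent(
      SparkListenerConnectOperationFinished(jobTag, "operationId", System.currentTimeMillis()))
    listener.onOtherEvent(
      SparkListenerConnectOperationClosed(jobTag, "operationId", System.currentTimeMillis()))

    // These are the same queries the Connect UI pages and the test assertions rely on.
    println(s"online sessions: ${store.getOnlineSessionNum}")
    println(s"statements: ${store.getExecutionList.map(_.statement)}")

    kvstore.close()
    sc.stop()
  }
}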
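
The SparkUI hunks above change attachHandler so that handlers registered before the server has started are only recorded, then attached in bulk when attachAllHandlers() runs (which SparkContext does after startup); handlers registered afterwards are attached immediately. The toy class below restates that buffering behaviour in isolation; DeferredHandlerRegistry, Handler, and attachToServer are hypothetical stand-ins for illustration, not Spark APIs.

import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a Jetty ServletContextHandler.
final case class Handler(path: String)

final class DeferredHandlerRegistry(attachToServer: Handler => Unit) {
  private val handlers = ArrayBuffer.empty[Handler]
  private var readyToAttachHandlers = false

  // Mirrors SparkUI.attachHandler: always remember the handler, but forward it
  // to the server only once the server side is ready.
  def attachHandler(handler: Handler): Unit = synchronized {
    handlers += handler
    if (readyToAttachHandlers) {
      attachToServer(handler)
    }
  }

  // Mirrors SparkUI.attachAllHandlers: flush everything registered so far,
  // then let later registrations go straight through.
  def attachAllHandlers(): Unit = synchronized {
    handlers.foreach(attachToServer)
    readyToAttachHandlers = true
  }
}

object DeferredHandlerRegistrySketch {
  def main(args: Array[String]): Unit = {
    val registry = new DeferredHandlerRegistry(h => println(s"attached ${h.path}"))
    registry.attachHandler(Handler("/connect"))   // buffered: server not ready yet
    registry.attachAllHandlers()                  // prints "attached /connect"
    registry.attachHandler(Handler("/metrics"))   // prints "attached /metrics"
  }
}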