From cf3a49e2ce59a3636462e4c2a03ce229d52c27bb Mon Sep 17 00:00:00 2001 From: Jason Li Date: Thu, 27 Jul 2023 10:29:30 -0700 Subject: [PATCH] Add a Spark UI page for Spark Connect --- .../spark/sql/connect/common/ProtoUtils.scala | 2 +- .../sql/connect/SparkConnectPlugin.scala | 2 +- .../spark/sql/connect/config/Connect.scala | 13 + .../execution/ExecuteThreadRunner.scala | 15 +- .../sql/connect/service/ExecuteHolder.scala | 39 +- .../connect/service/SparkConnectServer.scala | 3 +- .../connect/service/SparkConnectService.scala | 27 +- .../ui/SparkConnectServerAppStatusStore.scala | 130 +++++ ...parkConnectServerHistoryServerPlugin.scala | 41 ++ .../ui/SparkConnectServerListener.scala | 369 ++++++++++++ .../connect/ui/SparkConnectServerPage.scala | 529 ++++++++++++++++++ .../ui/SparkConnectServerSessionPage.scala | 119 ++++ .../connect/ui/SparkConnectServerTab.scala | 56 ++ .../spark/sql/connect/ui/ToolTips.scala | 39 ++ .../ui/SparkConnectServerListenerSuite.scala | 214 +++++++ .../ui/SparkConnectServerPageSuite.scala | 131 +++++ 16 files changed, 1715 insertions(+), 14 deletions(-) create mode 100644 connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerAppStatusStore.scala create mode 100644 connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerHistoryServerPlugin.scala create mode 100644 connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala create mode 100644 connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPage.scala create mode 100644 connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerSessionPage.scala create mode 100644 connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerTab.scala create mode 100644 connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/ToolTips.scala create mode 100644 connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala create mode 100644 connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPageSuite.scala diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala index e2934b5674495..99684eef7d7dd 100644 --- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala @@ -93,7 +93,7 @@ private[connect] object ProtoUtils { def throwIfInvalidTag(tag: String): Unit = { // Same format rules apply to Spark Connect execution tags as to SparkContext job tags, // because the Spark Connect job tag is also used as part of SparkContext job tag. 
- // See SparkContext.throwIfInvalidTag and ExecuteHolder.tagToSparkJobTag + // See SparkContext.throwIfInvalidTag and ExecuteHolderSessionTag if (tag == null) { throw new IllegalArgumentException("Spark Connect tag cannot be null.") } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala index bb694a7679890..ca8617cbe1a74 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala @@ -45,7 +45,7 @@ class SparkConnectPlugin extends SparkPlugin { override def init( sc: SparkContext, pluginContext: PluginContext): util.Map[String, String] = { - SparkConnectService.start() + SparkConnectService.start(sc) Map.empty[String, String].asJava } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala index 31f119047e4cc..23aa42bad3044 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala @@ -121,4 +121,17 @@ object Connect { .version("3.5.0") .booleanConf .createWithDefault(false) + + val CONNECT_UI_STATEMENT_LIMIT = + ConfigBuilder("spark.sql.connect.ui.retainedStatements") + .doc("The number of statements kept in the Spark Connect UI history.") + .version("3.5.0") + .intConf + .createWithDefault(200) + + val CONNECT_UI_SESSION_LIMIT = ConfigBuilder("spark.sql.connect.ui.retainedSessions") + .doc("The number of client sessions kept in the Spark Connect UI history.") + .version("3.5.0") + .intConf + .createWithDefault(200) } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala index 6758df0d7e6d7..dad84180a0ff2 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala @@ -27,7 +27,7 @@ import org.apache.spark.connect.proto import org.apache.spark.internal.Logging import org.apache.spark.sql.connect.common.ProtoUtils import org.apache.spark.sql.connect.planner.SparkConnectPlanner -import org.apache.spark.sql.connect.service.ExecuteHolder +import org.apache.spark.sql.connect.service.{ExecuteHolder, ExecuteSessionTag} import org.apache.spark.sql.connect.utils.ErrorUtils import org.apache.spark.util.Utils @@ -95,8 +95,11 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends } finally { executeHolder.sessionHolder.session.sparkContext.removeJobTag(executeHolder.jobTag) executeHolder.sparkSessionTags.foreach { tag => - executeHolder.sessionHolder.session.sparkContext - .removeJobTag(executeHolder.tagToSparkJobTag(tag)) + executeHolder.sessionHolder.session.sparkContext.removeJobTag( + ExecuteSessionTag( + executeHolder.sessionHolder.userId, + executeHolder.sessionHolder.sessionId, + tag)) } } } catch { @@ -128,7 +131,11 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends session.sparkContext.addJobTag(executeHolder.jobTag) // Also set all 
user defined tags as Spark Job tags. executeHolder.sparkSessionTags.foreach { tag => - session.sparkContext.addJobTag(executeHolder.tagToSparkJobTag(tag)) + session.sparkContext.addJobTag( + ExecuteSessionTag( + executeHolder.sessionHolder.userId, + executeHolder.sessionHolder.sessionId, + tag)) } session.sparkContext.setJobDescription( s"Spark Connect - ${StringUtils.abbreviate(debugString, 128)}") diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala index 36c96b2617fb7..7ad68d06f9622 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala @@ -38,11 +38,7 @@ private[connect] class ExecuteHolder( * Tag that is set for this execution on SparkContext, via SparkContext.addJobTag. Used * (internally) for cancallation of the Spark Jobs ran by this execution. */ - val jobTag = - s"SparkConnect_Execute_" + - s"User_${sessionHolder.userId}_" + - s"Session_${sessionHolder.sessionId}_" + - s"Operation_${operationId}" + val jobTag = ExecuteJobTag(sessionHolder.userId, sessionHolder.sessionId, operationId) /** * Tags set by Spark Connect client users via SparkSession.addTag. Used to identify and group @@ -122,3 +118,36 @@ private[connect] class ExecuteHolder( s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Tag_${tag}" } } + +/** Used to identify ExecuteHolder jobTag among SparkContext.SPARK_JOB_TAGS. */ +object ExecuteJobTag { + private val prefix = "SparkConnect_OperationTag" + + def apply(sessionId: String, userId: String, operationId: String): String = { + s"${prefix}_" + + s"User_${userId}_" + + s"Session_${sessionId}_" + + s"Operation_${operationId}" + } + + def unapply(jobTag: String): Option[String] = { + if (jobTag.startsWith(prefix)) Some(jobTag) else None + } +} + +/** Used to identify ExecuteHolder sessionTag among SparkContext.SPARK_JOB_TAGS. 
*/ +object ExecuteSessionTag { + private val prefix = "SparkConnect_SessionTag" + + def apply(userId: String, sessionId: String, tag: String): String = { + ProtoUtils.throwIfInvalidTag(tag) + s"${prefix}_" + + s"User_${userId}_" + + s"Session_${sessionId}_" + + s"Tag_${tag}" + } + + def unapply(sessionTag: String): Option[String] = { + if (sessionTag.startsWith(prefix)) Some(sessionTag) else None + } +} diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala index 1ed8ee2ff86a7..26c1062bf3437 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala @@ -34,7 +34,7 @@ object SparkConnectServer extends Logging { val session = SparkSession.builder.getOrCreate() try { try { - SparkConnectService.start() + SparkConnectService.start(session.sparkContext) SparkConnectService.server.getListenSockets.foreach { sa => val isa = sa.asInstanceOf[InetSocketAddress] logInfo( @@ -49,6 +49,7 @@ object SparkConnectServer extends Logging { SparkConnectService.server.awaitTermination() } finally { session.stop() + SparkConnectService.uiTab.foreach(_.detach()) } } } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala index 87e4f21732f29..b228a2514f857 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala @@ -29,13 +29,16 @@ import io.grpc.protobuf.services.ProtoReflectionService import io.grpc.stub.StreamObserver import org.apache.commons.lang3.StringUtils -import org.apache.spark.{SparkEnv, SparkSQLException} +import org.apache.spark.{SparkContext, SparkEnv, SparkSQLException} import org.apache.spark.connect.proto import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse} import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.UI.UI_ENABLED import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connect.config.Connect.{CONNECT_GRPC_BINDING_ADDRESS, CONNECT_GRPC_BINDING_PORT, CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE} +import org.apache.spark.sql.connect.ui.{SparkConnectServerAppStatusStore, SparkConnectServerListener, SparkConnectServerTab} import org.apache.spark.sql.connect.utils.ErrorUtils +import org.apache.spark.status.ElementTrackingStore /** * The SparkConnectService implementation. @@ -181,6 +184,9 @@ object SparkConnectService extends Logging { private[connect] var server: Server = _ + private[connect] var uiTab: Option[SparkConnectServerTab] = None + private[connect] var listener: SparkConnectServerListener = _ + // For testing purpose, it's package level private. 
private[connect] def localPort: Int = { assert(server != null) @@ -251,6 +257,20 @@ object SparkConnectService extends Logging { SparkSession.active.newSession() } + private def createListenerAndUI(sc: SparkContext): Unit = { + val kvStore = sc.statusStore.store.asInstanceOf[ElementTrackingStore] + listener = new SparkConnectServerListener(kvStore, sc.conf) + sc.listenerBus.addToStatusQueue(listener) + uiTab = if (sc.getConf.get(UI_ENABLED)) { + Some( + new SparkConnectServerTab( + new SparkConnectServerAppStatusStore(kvStore), + SparkConnectServerTab.getSparkUI(sc))) + } else { + None + } + } + /** * Starts the GRPC Serivce. */ @@ -275,13 +295,15 @@ object SparkConnectService extends Logging { if (debugMode) { sb.addService(ProtoReflectionService.newInstance()) } + server = sb.build server.start() } // Starts the service - def start(): Unit = { + def start(sc: SparkContext): Unit = { startGRPCService() + createListenerAndUI(sc) } def stop(timeout: Option[Long] = None, unit: Option[TimeUnit] = None): Unit = { @@ -294,6 +316,7 @@ object SparkConnectService extends Logging { } } userSessionMapping.invalidateAll() + uiTab.foreach(_.detach()) } def extractErrorMessage(st: Throwable): String = { diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerAppStatusStore.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerAppStatusStore.scala new file mode 100644 index 0000000000000..0d3060e42c3ad --- /dev/null +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerAppStatusStore.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.connect.ui + +import scala.collection.mutable.ArrayBuffer + +import com.fasterxml.jackson.annotation.JsonIgnore + +import org.apache.spark.status.KVUtils +import org.apache.spark.status.KVUtils.KVIndexParam +import org.apache.spark.util.kvstore.{KVIndex, KVStore} + +class SparkConnectServerAppStatusStore(store: KVStore) { + def getSessionList: Seq[SessionInfo] = { + KVUtils.viewToSeq(store.view(classOf[SessionInfo])) + } + + def getExecutionList: Seq[ExecutionInfo] = { + KVUtils.viewToSeq(store.view(classOf[ExecutionInfo])) + } + + def getOnlineSessionNum: Int = { + KVUtils.count(store.view(classOf[SessionInfo]))(_.finishTimestamp == 0) + } + + def getSession(sessionId: String): Option[SessionInfo] = { + try { + Some(store.read(classOf[SessionInfo], sessionId)) + } catch { + case _: NoSuchElementException => None + } + } + + def getExecution(executionId: String): Option[ExecutionInfo] = { + try { + Some(store.read(classOf[ExecutionInfo], executionId)) + } catch { + case _: NoSuchElementException => None + } + } + + /** + * When an error or a cancellation occurs, we set the finishTimestamp of the statement. + * Therefore, when we count the number of running statements, we need to exclude errors and + * cancellations and count all statements that have not been closed so far. + */ + def getTotalRunning: Int = { + KVUtils.count(store.view(classOf[ExecutionInfo]))(_.isExecutionActive) + } + + def getSessionCount: Long = { + store.count(classOf[SessionInfo]) + } + + def getExecutionCount: Long = { + store.count(classOf[ExecutionInfo]) + } +} + +private[connect] class SessionInfo( + @KVIndexParam val sessionId: String, + val startTimestamp: Long, + val userId: String, + val finishTimestamp: Long, + val totalExecution: Long) { + @JsonIgnore @KVIndex("finishTime") + private def finishTimeIndex: Long = if (finishTimestamp > 0L) finishTimestamp else -1L + def totalTime: Long = { + if (finishTimestamp == 0L) { + System.currentTimeMillis - startTimestamp + } else { + finishTimestamp - startTimestamp + } + } +} + +private[connect] class ExecutionInfo( + @KVIndexParam val jobTag: String, + val statement: String, + val sessionId: String, + val startTimestamp: Long, + val userId: String, + val finishTimestamp: Long, + val closeTimestamp: Long, + val executePlan: String, + val detail: String, + val state: ExecutionState.Value, + val jobId: ArrayBuffer[String], + val sqlExecId: ArrayBuffer[String]) { + @JsonIgnore @KVIndex("finishTime") + private def finishTimeIndex: Long = if (finishTimestamp > 0L && !isExecutionActive) { + finishTimestamp + } else -1L + + @JsonIgnore @KVIndex("isExecutionActive") + def isExecutionActive: Boolean = { + !(state == ExecutionState.FAILED || + state == ExecutionState.CANCELED || + state == ExecutionState.TIMEDOUT || + state == ExecutionState.CLOSED) + } + + def totalTime(endTime: Long): Long = { + if (endTime == 0L) { + System.currentTimeMillis - startTimestamp + } else { + endTime - startTimestamp + } + } +} + +private[connect] object ExecutionState extends Enumeration { + val STARTED, COMPILED, READY, CANCELED, TIMEDOUT, FAILED, FINISHED, CLOSED = Value + type ExecutionState = Value +} diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerHistoryServerPlugin.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerHistoryServerPlugin.scala new file mode 100644 index 0000000000000..06d470db6ec47 --- /dev/null +++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerHistoryServerPlugin.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.ui + +import org.apache.spark.SparkConf +import org.apache.spark.scheduler.SparkListener +import org.apache.spark.status.{AppHistoryServerPlugin, ElementTrackingStore} +import org.apache.spark.ui.SparkUI + +class SparkConnectServerHistoryServerPlugin extends AppHistoryServerPlugin { + + override def createListeners( + conf: SparkConf, + store: ElementTrackingStore): Seq[SparkListener] = { + Seq(new SparkConnectServerListener(store, conf)) + } + + override def setupUI(ui: SparkUI): Unit = { + val store = new SparkConnectServerAppStatusStore(ui.store.store) + if (store.getSessionCount > 0) { + new SparkConnectServerTab(store, ui) + } + } + + override def displayOrder: Int = 1 +} diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala new file mode 100644 index 0000000000000..2e8761b9f3bb7 --- /dev/null +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.connect.ui + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkConf, SparkContext, SparkEnv} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD +import org.apache.spark.scheduler._ +import org.apache.spark.sql.connect.config.Connect.{CONNECT_UI_SESSION_LIMIT, CONNECT_UI_STATEMENT_LIMIT} +import org.apache.spark.sql.connect.service._ +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.status.{ElementTrackingStore, KVUtils, LiveEntity} + +private[connect] class SparkConnectServerListener( + kvstore: ElementTrackingStore, + sparkConf: SparkConf, + live: Boolean = true) + extends SparkListener + with Logging { + + private val sessionList = new mutable.LinkedHashMap[String, LiveSessionData] + private val executionList = new mutable.LinkedHashMap[String, LiveExecutionData] + + private val (retainedStatements: Int, retainedSessions: Int) = { + ( + SparkEnv.get.conf.get(CONNECT_UI_STATEMENT_LIMIT), + SparkEnv.get.conf.get(CONNECT_UI_SESSION_LIMIT)) + } + + // How often to update live entities. -1 means "never update" when replaying applications, + // meaning only the last write will happen. For live applications, this avoids a few + // operations that we can live without when rapidly processing incoming events. + private val liveUpdatePeriodNs = if (live) sparkConf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L + + // Returns true if this listener has no live data. Exposed for tests only. + private[connect] def noLiveData(): Boolean = synchronized { + sessionList.isEmpty && executionList.isEmpty + } + + kvstore.addTrigger(classOf[SessionInfo], retainedSessions) { count => + cleanupSession(count) + } + + kvstore.addTrigger(classOf[ExecutionInfo], retainedStatements) { count => + cleanupExecutions(count) + } + + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + val jobTags = Option(jobStart.properties) + .flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_TAGS)) } + .map(_.split(SparkContext.SPARK_JOB_TAGS_SEP).toSet) + .getOrElse(Set()) + .toSeq + .filter(!_.isEmpty) + .sorted + val executeJobTagOpt = jobTags.find { + case ExecuteJobTag(_) => true + case _ => false + } + if (executeJobTagOpt.isEmpty) { + return + } + val executeJobTag = executeJobTagOpt.get + val exec = executionList.get(executeJobTag) + val executionIdOpt: Option[String] = Option(jobStart.properties) + .flatMap { p => Option(p.getProperty(SQLExecution.EXECUTION_ID_KEY)) } + if (exec.nonEmpty) { + exec.foreach { exec => + exec.jobId += jobStart.jobId.toString + executionIdOpt.foreach { execId => exec.sqlExecId += execId } + updateLiveStore(exec) + } + } else { + // It may possible that event reordering happens, such a way that JobStart event come after + // Execution end event (Refer SPARK-27019). To handle that situation, if occurs in + // Spark Connect Server, following code will take care. Here will come only if JobStart + // event comes after Execution End event. 
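+      // For example, the events may arrive as: OperationStarted, OperationFinished,
+      // OperationClosed (which removes the execution from executionList and persists it),
+      // and only then the JobStart for a job launched by that operation. In that case the
+      // execution is read back from the KVStore below so the job id can still be recorded.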
+ val storeExecInfo = + KVUtils.viewToSeq(kvstore.view(classOf[ExecutionInfo]), Int.MaxValue)(exec => + exec.jobTag == executeJobTag) + storeExecInfo.foreach { exec => + val liveExec = getOrCreateExecution( + exec.jobTag, + exec.statement, + exec.sessionId, + exec.startTimestamp, + exec.userId) + liveExec.jobId += jobStart.jobId.toString + executionIdOpt.foreach { execId => exec.sqlExecId += execId } + updateStoreWithTriggerEnabled(liveExec) + executionList.remove(liveExec.jobTag) + } + } + } + + override def onOtherEvent(event: SparkListenerEvent): Unit = { + event match { + case e: SparkListenerConnectOperationStarted => onOperationStarted(e) + case e: SparkListenerConnectOperationAnalyzed => onOperationAnalyzed(e) + case e: SparkListenerConnectOperationReadyForExecution => onOperationReadyForExecution(e) + case e: SparkListenerConnectOperationCanceled => onOperationCanceled(e) + case e: SparkListenerConnectOperationFailed => onOperationFailed(e) + case e: SparkListenerConnectOperationFinished => onOperationFinished(e) + case e: SparkListenerConnectOperationClosed => onOperationClosed(e) + case e: SparkListenerConnectSessionStarted => onSessionStarted(e) + case e: SparkListenerConnectSessionClosed => onSessionClosed(e) + case _ => // Ignore + } + } + + private def onOperationStarted(e: SparkListenerConnectOperationStarted) = synchronized { + val executionData = getOrCreateExecution( + e.jobTag, + e.statementText, + e.sessionId, + e.eventTime, + e.userId, + e.extraTags) + executionData.state = ExecutionState.STARTED + executionList.put(e.jobTag, executionData) + updateLiveStore(executionData) + sessionList.get(e.sessionId) match { + case Some(sessionData) => + sessionData.totalExecution += 1 + updateLiveStore(sessionData) + case None => + logWarning( + s"onOperationStart called with unknown session id: ${e.sessionId}." 
+ + s"Regardless, the operation has been registered.") + } + } + + private def onOperationAnalyzed(e: SparkListenerConnectOperationAnalyzed) = synchronized { + executionList.get(e.jobTag) match { + case Some(executionData) => + executionData.state = ExecutionState.COMPILED + updateLiveStore(executionData) + case None => + logWarning(s"onOperationAnalyzed called with unknown operation id: ${e.jobTag}") + } + } + + private def onOperationReadyForExecution( + e: SparkListenerConnectOperationReadyForExecution): Unit = synchronized { + executionList.get(e.jobTag) match { + case Some(executionData) => + executionData.state = ExecutionState.READY + updateLiveStore(executionData) + case None => + logWarning(s"onOperationReadyForExectuion called with unknown operation id: ${e.jobTag}") + } + } + + private def onOperationCanceled(e: SparkListenerConnectOperationCanceled) = synchronized { + executionList.get(e.jobTag) match { + case Some(executionData) => + executionData.finishTimestamp = e.eventTime + executionData.state = ExecutionState.CANCELED + updateLiveStore(executionData) + case None => + logWarning(s"onOperationCanceled called with unknown operation id: ${e.jobTag}") + } + } + private def onOperationFailed(e: SparkListenerConnectOperationFailed) = synchronized { + executionList.get(e.jobTag) match { + case Some(executionData) => + executionData.finishTimestamp = e.eventTime + executionData.detail = e.errorMessage + executionData.state = ExecutionState.FAILED + updateLiveStore(executionData) + case None => + logWarning(s"onOperationFailed called with unknown operation id: ${e.jobTag}") + } + } + private def onOperationFinished(e: SparkListenerConnectOperationFinished) = synchronized { + executionList.get(e.jobTag) match { + case Some(executionData) => + executionData.finishTimestamp = e.eventTime + executionData.state = ExecutionState.FINISHED + updateLiveStore(executionData) + case None => + logWarning(s"onOperationFinished called with unknown operation id: ${e.jobTag}") + } + } + private def onOperationClosed(e: SparkListenerConnectOperationClosed) = synchronized { + executionList.get(e.jobTag) match { + case Some(executionData) => + executionData.closeTimestamp = e.eventTime + executionData.state = ExecutionState.CLOSED + updateStoreWithTriggerEnabled(executionData) + executionList.remove(e.jobTag) + case None => + logWarning(s"onOperationClosed called with unknown operation id: ${e.jobTag}") + } + } + + private def onSessionStarted(e: SparkListenerConnectSessionStarted) = synchronized { + val session = getOrCreateSession(e.sessionId, e.userId, e.eventTime, e.extraTags) + sessionList.put(e.sessionId, session) + updateLiveStore(session) + } + + private def onSessionClosed(e: SparkListenerConnectSessionClosed) = synchronized { + sessionList.get(e.sessionId) match { + case Some(sessionData) => + sessionData.finishTimestamp = e.eventTime + updateStoreWithTriggerEnabled(sessionData) + sessionList.remove(e.sessionId) + + case None => logWarning(s"onSessionClosed called with unknown session id: ${e.sessionId}") + } + } + + // Update both live and history stores. Trigger is enabled by default, hence + // it will cleanup the entity which exceeds the threshold. + def updateStoreWithTriggerEnabled(entity: LiveEntity): Unit = synchronized { + entity.write(kvstore, System.nanoTime(), checkTriggers = true) + } + + // Update only live stores. If trigger is enabled, it will cleanup entity + // which exceeds the threshold. 
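+  // Writes are rate-limited by LIVE_ENTITY_UPDATE_PERIOD (spark.ui.liveUpdate.period);
+  // when replaying event logs (live = false) this is a no-op and only
+  // updateStoreWithTriggerEnabled persists the entity.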
+ def updateLiveStore(entity: LiveEntity, trigger: Boolean = false): Unit = synchronized { + val now = System.nanoTime() + if (live && liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs) { + entity.write(kvstore, now, checkTriggers = trigger) + } + } + + private def getOrCreateSession( + sessionId: String, + userName: String, + startTime: Long, + extraTags: Map[String, String] = Map.empty): LiveSessionData = synchronized { + sessionList.getOrElseUpdate( + sessionId, + new LiveSessionData(sessionId, startTime, userName, extraTags)) + } + + private def getOrCreateExecution( + jobTag: String, + statement: String, + sessionId: String, + startTimestamp: Long, + userId: String, + extraTags: Map[String, String] = Map.empty): LiveExecutionData = synchronized { + executionList.getOrElseUpdate( + jobTag, + new LiveExecutionData(jobTag, statement, sessionId, startTimestamp, userId, extraTags)) + } + + private def cleanupExecutions(count: Long): Unit = { + val countToDelete = calculateNumberToRemove(count, retainedStatements) + if (countToDelete <= 0L) { + return + } + val view = kvstore.view(classOf[ExecutionInfo]).index("finishTime").first(0L) + val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j => + j.finishTimestamp != 0 + } + toDelete.foreach { j => kvstore.delete(j.getClass, j.jobTag) } + } + + private def cleanupSession(count: Long): Unit = { + val countToDelete = calculateNumberToRemove(count, retainedSessions) + if (countToDelete <= 0L) { + return + } + val view = kvstore.view(classOf[SessionInfo]).index("finishTime").first(0L) + val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j => + j.finishTimestamp != 0L + } + + toDelete.foreach { j => kvstore.delete(j.getClass, j.sessionId) } + } + + /** + * Remove at least (retainedSize / 10) items to reduce friction. Because tracking may be done + * asynchronously, this method may return 0 in case enough items have been deleted already. 
+ */ + private def calculateNumberToRemove(dataSize: Long, retainedSize: Long): Long = { + if (dataSize > retainedSize) { + math.max(retainedSize / 10L, dataSize - retainedSize) + } else { + 0L + } + } +} + +private[connect] class LiveExecutionData( + val jobTag: String, + val statement: String, + val sessionId: String, + val startTimestamp: Long, + val userId: String, + val extraTags: Map[String, String] = Map.empty) + extends LiveEntity { + + var finishTimestamp: Long = 0L + var closeTimestamp: Long = 0L + var executePlan: String = "" + var detail: String = "" + var state: ExecutionState.Value = ExecutionState.STARTED + val jobId: ArrayBuffer[String] = ArrayBuffer[String]() + var sqlExecId: ArrayBuffer[String] = ArrayBuffer[String]() + + override protected def doUpdate(): Any = { + new ExecutionInfo( + jobTag, + statement, + sessionId, + startTimestamp, + userId, + finishTimestamp, + closeTimestamp, + executePlan, + detail, + state, + jobId, + sqlExecId) + } + + def totalTime(endTime: Long): Long = { + if (endTime == 0L) { + System.currentTimeMillis - startTimestamp + } else { + endTime - startTimestamp + } + } +} + +private[connect] class LiveSessionData( + val sessionId: String, + val startTimestamp: Long, + val userName: String, + val extraTags: Map[String, String] = Map.empty) + extends LiveEntity { + + var finishTimestamp: Long = 0L + var totalExecution: Int = 0 + + override protected def doUpdate(): Any = { + new SessionInfo(sessionId, startTimestamp, userName, finishTimestamp, totalExecution) + } + def totalTime: Long = { + if (finishTimestamp == 0L) { + System.currentTimeMillis - startTimestamp + } else { + finishTimestamp - startTimestamp + } + } +} diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPage.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPage.scala new file mode 100644 index 0000000000000..d67b21c9c0793 --- /dev/null +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPage.scala @@ -0,0 +1,529 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.ui + +import java.net.URLEncoder +import java.nio.charset.StandardCharsets.UTF_8 +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node + +import org.apache.commons.text.StringEscapeUtils + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.connect.ui.ToolTips._ +import org.apache.spark.ui._ +import org.apache.spark.ui.UIUtils._ +import org.apache.spark.util.Utils + +/** Page for Spark UI that shows statistics for a Spark Connect Server. 
*/ +private[ui] class SparkConnectServerPage(parent: SparkConnectServerTab) + extends WebUIPage("") + with Logging { + private val store = parent.store + private val startTime = parent.startTime + + /** Render the page */ + def render(request: HttpServletRequest): Seq[Node] = { + val content = store.synchronized { // make sure all parts in this page are consistent + generateBasicStats() ++ +
<br/> ++
+        <h4>
+          {store.getOnlineSessionNum}
+          session(s) are online,
+          running
+          {store.getTotalRunning}
+          Request(s)
+        </h4>
++ + generateSessionStatsTable(request) ++ + generateSQLStatsTable(request) + } + UIUtils.headerSparkPage(request, "Spark Connect", content, parent) + } + + /** Generate basic stats of the Spark Connect server */ + private def generateBasicStats(): Seq[Node] = { + val timeSinceStart = System.currentTimeMillis() - startTime.getTime + + } + + /** Generate stats of batch statements of the Spark Connect program */ + private def generateSQLStatsTable(request: HttpServletRequest): Seq[Node] = { + + val numStatement = store.getExecutionList.size + + val table = if (numStatement > 0) { + + val sqlTableTag = "sqlstat" + + val sqlTablePage = + Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1) + + try { + Some( + new SqlStatsPagedTable( + request, + parent, + store.getExecutionList, + "connect", + UIUtils.prependBaseUri(request, parent.basePath), + sqlTableTag, + showSessionLink = true).table(sqlTablePage)) + } catch { + case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) => + Some(
<div class="alert alert-error">
+            <p>Error while rendering job table:</p>
+            <pre>
+              {Utils.exceptionString(e)}
+            </pre>
+          </div>
) + } + } else { + None + } + val content = + +

<span id="sqlstat" class="collapse-aggregated-sqlstat collapse-table"
+          onClick="collapseTable('collapse-aggregated-sqlstat','aggregated-sqlstat')">
+        <h4>
+          <span class="collapse-table-arrow arrow-open"></span>
+          <a>Request Statistics ({numStatement})</a>
+        </h4>
+      </span> ++
+      <div class="aggregated-sqlstat collapsible-table">
+        {table.getOrElse("No statistics have been generated yet.")}
+      </div>
+ content + } + + /** Generate stats of batch sessions of the Spark Connect server */ + private def generateSessionStatsTable(request: HttpServletRequest): Seq[Node] = { + val numSessions = store.getSessionList.size + val table = if (numSessions > 0) { + + val sessionTableTag = "sessionstat" + + val sessionTablePage = + Option(request.getParameter(s"$sessionTableTag.page")).map(_.toInt).getOrElse(1) + + try { + Some( + new SessionStatsPagedTable( + request, + parent, + store.getSessionList, + "connect", + UIUtils.prependBaseUri(request, parent.basePath), + sessionTableTag).table(sessionTablePage)) + } catch { + case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) => + Some(
<div class="alert alert-error">
+            <p>Error while rendering job table:</p>
+            <pre>
+              {Utils.exceptionString(e)}
+            </pre>
+          </div>
) + } + } else { + None + } + + val content = + +

<span id="sessionstat" class="collapse-aggregated-sessionstat collapse-table"
+          onClick="collapseTable('collapse-aggregated-sessionstat','aggregated-sessionstat')">
+        <h4>
+          <span class="collapse-table-arrow arrow-open"></span>
+          <a>Session Statistics ({numSessions})</a>
+        </h4>
+      </span> ++
+      <div class="aggregated-sessionstat collapsible-table">
+        {table.getOrElse("No statistics have been generated yet.")}
+      </div>
+ + content + } +} + +private[ui] class SqlStatsPagedTable( + request: HttpServletRequest, + parent: SparkConnectServerTab, + data: Seq[ExecutionInfo], + subPath: String, + basePath: String, + sqlStatsTableTag: String, + showSessionLink: Boolean) + extends PagedTable[SqlStatsTableRow] { + private val (sortColumn, desc, pageSize) = + getTableParameters(request, sqlStatsTableTag, "Start Time") + + private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name()) + + private val parameterPath = + s"$basePath/$subPath/?${getParameterOtherTable(request, sqlStatsTableTag)}" + + override val dataSource = new SqlStatsTableDataSource(data, pageSize, sortColumn, desc) + + override def tableId: String = sqlStatsTableTag + + override def tableCssClass: String = + "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited" + + override def pageLink(page: Int): String = { + parameterPath + + s"&$pageNumberFormField=$page" + + s"&$sqlStatsTableTag.sort=$encodedSortColumn" + + s"&$sqlStatsTableTag.desc=$desc" + + s"&$pageSizeFormField=$pageSize" + + s"#$sqlStatsTableTag" + } + + override def pageSizeFormField: String = s"$sqlStatsTableTag.pageSize" + + override def pageNumberFormField: String = s"$sqlStatsTableTag.page" + + override def goButtonFormPath: String = + s"$parameterPath&$sqlStatsTableTag.sort=$encodedSortColumn" + + s"&$sqlStatsTableTag.desc=$desc#$sqlStatsTableTag" + + override def headers: Seq[Node] = { + val sqlTableHeadersAndTooltips: Seq[(String, Boolean, Option[String])] = + if (showSessionLink) { + Seq( + ("User", true, None), + ("Job Tag", true, None), + ("Job ID", true, None), + ("SQL Query ID", true, None), + ("Session ID", true, None), + ("Start Time", true, None), + ("Finish Time", true, Some(SPARK_CONNECT_SERVER_FINISH_TIME)), + ("Close Time", true, Some(SPARK_CONNECT_SERVER_CLOSE_TIME)), + ("Execution Time", true, Some(SPARK_CONNECT_SERVER_EXECUTION)), + ("Duration", true, Some(SPARK_CONNECT_SERVER_DURATION)), + ("Statement", true, None), + ("State", true, None), + ("Detail", true, None)) + } else { + Seq( + ("User", true, None), + ("Job Tag", true, None), + ("Job ID", true, None), + ("SQL Query ID", true, None), + ("Start Time", true, None), + ("Finish Time", true, Some(SPARK_CONNECT_SERVER_FINISH_TIME)), + ("Close Time", true, Some(SPARK_CONNECT_SERVER_CLOSE_TIME)), + ("Execution Time", true, Some(SPARK_CONNECT_SERVER_EXECUTION)), + ("Duration", true, Some(SPARK_CONNECT_SERVER_DURATION)), + ("Statement", true, None), + ("State", true, None), + ("Detail", true, None)) + } + + isSortColumnValid(sqlTableHeadersAndTooltips, sortColumn) + + headerRow( + sqlTableHeadersAndTooltips, + desc, + pageSize, + sortColumn, + parameterPath, + sqlStatsTableTag, + sqlStatsTableTag) + } + + override def row(sqlStatsTableRow: SqlStatsTableRow): Seq[Node] = { + val info = sqlStatsTableRow.executionInfo + val startTime = info.startTimestamp + val executionTime = sqlStatsTableRow.executionTime + val duration = sqlStatsTableRow.duration + + def jobLinks(jobData: Seq[String]): Seq[Node] = { + jobData.map { jobId => + [{jobId}] + } + } + def sqlLinks(sqlData: Seq[String]): Seq[Node] = { + sqlData.map { sqlExecId => + [{sqlExecId}] + } + } + val sessionLink = "%s/%s/session/?id=%s".format( + UIUtils.prependBaseUri(request, parent.basePath), + parent.prefix, + info.sessionId) + + + + {info.userId} + + + {info.jobTag} + + + {jobLinks(sqlStatsTableRow.jobId)} + + + {sqlLinks({ info.sqlExecId })} + + { + if (showSessionLink) { + + {info.sessionId} + + } + } + + 
{UIUtils.formatDate(startTime)} + + + {if (info.finishTimestamp > 0) formatDate(info.finishTimestamp)} + + + {if (info.closeTimestamp > 0) formatDate(info.closeTimestamp)} + + + + {formatDurationVerbose(executionTime)} + + + {formatDurationVerbose(duration)} + + + + {info.statement} + + + + {info.state} + + {errorMessageCell(Option(sqlStatsTableRow.detail))} + + } + + private def errorMessageCell(errorMessageOption: Option[String]): Seq[Node] = { + val errorMessage = errorMessageOption.getOrElse("") + val isMultiline = errorMessage.indexOf('\n') >= 0 + val errorSummary = StringEscapeUtils.escapeHtml4(if (isMultiline) { + errorMessage.substring(0, errorMessage.indexOf('\n')) + } else { + errorMessage + }) + val details = detailsUINode(isMultiline, errorMessage) + + {errorSummary}{details} + + } + + private def jobURL(request: HttpServletRequest, jobId: String): String = + "%s/jobs/job/?id=%s".format(UIUtils.prependBaseUri(request, parent.basePath), jobId) + + private def sqlURL(request: HttpServletRequest, sqlExecId: String): String = + "%s/SQL/execution/?id=%s".format(UIUtils.prependBaseUri(request, parent.basePath), sqlExecId) +} + +private[ui] class SessionStatsPagedTable( + request: HttpServletRequest, + parent: SparkConnectServerTab, + data: Seq[SessionInfo], + subPath: String, + basePath: String, + sessionStatsTableTag: String) + extends PagedTable[SessionInfo] { + + private val (sortColumn, desc, pageSize) = + getTableParameters(request, sessionStatsTableTag, "Start Time") + + private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name()) + + private val parameterPath = + s"$basePath/$subPath/?${getParameterOtherTable(request, sessionStatsTableTag)}" + + override val dataSource = new SessionStatsTableDataSource(data, pageSize, sortColumn, desc) + + override def tableId: String = sessionStatsTableTag + + override def tableCssClass: String = + "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited" + + override def pageLink(page: Int): String = { + parameterPath + + s"&$pageNumberFormField=$page" + + s"&$sessionStatsTableTag.sort=$encodedSortColumn" + + s"&$sessionStatsTableTag.desc=$desc" + + s"&$pageSizeFormField=$pageSize" + + s"#$sessionStatsTableTag" + } + + override def pageSizeFormField: String = s"$sessionStatsTableTag.pageSize" + + override def pageNumberFormField: String = s"$sessionStatsTableTag.page" + + override def goButtonFormPath: String = + s"$parameterPath&$sessionStatsTableTag.sort=$encodedSortColumn" + + s"&$sessionStatsTableTag.desc=$desc#$sessionStatsTableTag" + + override def headers: Seq[Node] = { + val sessionTableHeadersAndTooltips: Seq[(String, Boolean, Option[String])] = + Seq( + ("User", true, None), + ("Session ID", true, None), + ("Start Time", true, None), + ("Finish Time", true, None), + ("Duration", true, Some(SPARK_CONNECT_SESSION_DURATION)), + ("Total Execute", true, Some(SPARK_CONNECT_SESSION_TOTAL_EXECUTE))) + + isSortColumnValid(sessionTableHeadersAndTooltips, sortColumn) + + headerRow( + sessionTableHeadersAndTooltips, + desc, + pageSize, + sortColumn, + parameterPath, + sessionStatsTableTag, + sessionStatsTableTag) + } + + override def row(session: SessionInfo): Seq[Node] = { + val sessionLink = "%s/%s/session/?id=%s".format( + UIUtils.prependBaseUri(request, parent.basePath), + parent.prefix, + session.sessionId) + + {session.userId} + {session.sessionId} + {formatDate(session.startTimestamp)} + {if (session.finishTimestamp > 0) formatDate(session.finishTimestamp)} + 
{formatDurationVerbose(session.totalTime)} + {session.totalExecution.toString} + + } +} + +private[ui] class SqlStatsTableRow( + val jobTag: String, + val jobId: Seq[String], + val sqlExecId: Seq[String], + val duration: Long, + val executionTime: Long, + val executionInfo: ExecutionInfo, + val detail: String) + +private[ui] class SqlStatsTableDataSource( + info: Seq[ExecutionInfo], + pageSize: Int, + sortColumn: String, + desc: Boolean) + extends PagedDataSource[SqlStatsTableRow](pageSize) { + + // Convert ExecutionInfo to SqlStatsTableRow which contains the final contents to show in + // the table so that we can avoid creating duplicate contents during sorting the data + private val data = info.map(sqlStatsTableRow).sorted(ordering(sortColumn, desc)) + + override def dataSize: Int = data.size + + override def sliceData(from: Int, to: Int): Seq[SqlStatsTableRow] = data.slice(from, to) + + private def sqlStatsTableRow(executionInfo: ExecutionInfo): SqlStatsTableRow = { + val duration = executionInfo.totalTime(executionInfo.closeTimestamp) + val executionTime = executionInfo.totalTime(executionInfo.finishTimestamp) + val detail = Option(executionInfo.detail) + .filter(!_.isEmpty) + .getOrElse(executionInfo.executePlan) + val jobId = executionInfo.jobId.toSeq.sorted + val sqlExecId = executionInfo.sqlExecId.toSeq.sorted + + new SqlStatsTableRow( + executionInfo.jobTag, + jobId, + sqlExecId, + duration, + executionTime, + executionInfo, + detail) + } + + /** + * Return Ordering according to sortColumn and desc. + */ + private def ordering(sortColumn: String, desc: Boolean): Ordering[SqlStatsTableRow] = { + val ordering: Ordering[SqlStatsTableRow] = sortColumn match { + case "User" => Ordering.by(_.executionInfo.userId) + case "Job Tag" => Ordering.by(_.executionInfo.jobTag) + case "Job ID" => Ordering by (_.jobId.headOption) + case "SQL Query ID" => Ordering by (_.sqlExecId.headOption) + case "Session ID" => Ordering.by(_.executionInfo.sessionId) + case "Start Time" => Ordering.by(_.executionInfo.startTimestamp) + case "Finish Time" => Ordering.by(_.executionInfo.finishTimestamp) + case "Close Time" => Ordering.by(_.executionInfo.closeTimestamp) + case "Execution Time" => Ordering.by(_.executionTime) + case "Duration" => Ordering.by(_.duration) + case "Statement" => Ordering.by(_.executionInfo.statement) + case "State" => Ordering.by(_.executionInfo.state) + case "Detail" => Ordering.by(_.detail) + case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn") + } + if (desc) { + ordering.reverse + } else { + ordering + } + } +} + +private[ui] class SessionStatsTableDataSource( + info: Seq[SessionInfo], + pageSize: Int, + sortColumn: String, + desc: Boolean) + extends PagedDataSource[SessionInfo](pageSize) { + + // Sorting SessionInfo data + private val data = info.sorted(ordering(sortColumn, desc)) + + override def dataSize: Int = data.size + + override def sliceData(from: Int, to: Int): Seq[SessionInfo] = data.slice(from, to) + + /** + * Return Ordering according to sortColumn and desc. 
+ */ + private def ordering(sortColumn: String, desc: Boolean): Ordering[SessionInfo] = { + val ordering: Ordering[SessionInfo] = sortColumn match { + case "User" => Ordering.by(_.userId) + case "Session ID" => Ordering.by(_.sessionId) + case "Start Time" => Ordering by (_.startTimestamp) + case "Finish Time" => Ordering.by(_.finishTimestamp) + case "Duration" => Ordering.by(_.totalTime) + case "Total Execute" => Ordering.by(_.totalExecution) + case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn") + } + if (desc) { + ordering.reverse + } else { + ordering + } + } +} diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerSessionPage.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerSessionPage.scala new file mode 100644 index 0000000000000..37a2899e0fbd3 --- /dev/null +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerSessionPage.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.ui + +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node + +import org.apache.spark.internal.Logging +import org.apache.spark.ui._ +import org.apache.spark.ui.UIUtils._ +import org.apache.spark.util.Utils + +/** Page for Spark UI that contains information pertaining to a single Spark Connect session */ +private[ui] class SparkConnectServerSessionPage(parent: SparkConnectServerTab) + extends WebUIPage("session") + with Logging { + val store = parent.store + private val startTime = parent.startTime + + /** Render the page */ + def render(request: HttpServletRequest): Seq[Node] = { + val parameterId = request.getParameter("id") + require(parameterId != null && parameterId.nonEmpty, "Missing id parameter") + + val content = store.synchronized { // make sure all parts in this page are consistent + val sessionStat = store.getSession(parameterId).orNull + require(sessionStat != null, "Invalid sessionID[" + parameterId + "]") + + generateBasicStats() ++ +
<br/> ++
+        <h4>
+          User {sessionStat.userId},
+          Session created at {formatDate(sessionStat.startTimestamp)},
+          Total run {sessionStat.totalExecution} Request(s)
+        </h4>
++ + generateSQLStatsTable(request, sessionStat.sessionId) + } + UIUtils.headerSparkPage(request, "Spark Connect Session", content, parent) + } + + /** Generate basic stats of the Spark Connect Server */ + private def generateBasicStats(): Seq[Node] = { + val timeSinceStart = System.currentTimeMillis() - startTime.getTime + + } + + /** Generate stats of batch statements of the Spark Connect server */ + private def generateSQLStatsTable(request: HttpServletRequest, sessionID: String): Seq[Node] = { + val executionList = store.getExecutionList + .filter(_.sessionId == sessionID) + val numStatement = executionList.size + val table = if (numStatement > 0) { + + val sqlTableTag = "sqlsessionstat" + + val sqlTablePage = + Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1) + + try { + Some( + new SqlStatsPagedTable( + request, + parent, + executionList, + "connect/session", + UIUtils.prependBaseUri(request, parent.basePath), + sqlTableTag, + showSessionLink = false).table(sqlTablePage)) + } catch { + case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) => + Some(
<div class="alert alert-error">
+            <p>Error while rendering job table:</p>
+            <pre>
+              {Utils.exceptionString(e)}
+            </pre>
+          </div>
) + } + } else { + None + } + val content = + +

<span id="sqlsessionstat" class="collapse-aggregated-sqlsessionstat collapse-table"
+          onClick="collapseTable('collapse-aggregated-sqlsessionstat','aggregated-sqlsessionstat')">
+        <h4>
+          <span class="collapse-table-arrow arrow-open"></span>
+          <a>Request Statistics</a>
+        </h4>
+      </span> ++
+      <div class="aggregated-sqlsessionstat collapsible-table">
+        {table.getOrElse("No statistics have been generated yet.")}
+      </div>
+ + content + } +} diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerTab.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerTab.scala new file mode 100644 index 0000000000000..9830717b7c3c6 --- /dev/null +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerTab.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.ui + +import java.util.Date + +import org.apache.spark.SparkContext +import org.apache.spark.internal.Logging +import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.ui.{SparkUI, SparkUITab} + +private[connect] class SparkConnectServerTab( + val store: SparkConnectServerAppStatusStore, + sparkUI: SparkUI) + extends SparkUITab(sparkUI, "connect") + with Logging { + override val name = "Connect" + + val parent = sparkUI + val startTime = + try { + sparkUI.store.applicationInfo().attempts.head.startTime + } catch { + case _: NoSuchElementException => new Date(System.currentTimeMillis()) + } + + attachPage(new SparkConnectServerPage(this)) + attachPage(new SparkConnectServerSessionPage(this)) + parent.attachTab(this) + def detach(): Unit = { + parent.detachTab(this) + } +} + +private[connect] object SparkConnectServerTab { + def getSparkUI(sparkContext: SparkContext): SparkUI = { + sparkContext.ui.getOrElse { + throw QueryExecutionErrors.parentSparkUIToAttachTabNotFoundError() + } + } +} diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/ToolTips.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/ToolTips.scala new file mode 100644 index 0000000000000..9b51ace83c6c1 --- /dev/null +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/ToolTips.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.connect.ui + +private[ui] object ToolTips { + val SPARK_CONNECT_SERVER_FINISH_TIME = + "Execution finish time, before fetching the results" + + val SPARK_CONNECT_SERVER_CLOSE_TIME = + "Operation close time after fetching the results" + + val SPARK_CONNECT_SERVER_EXECUTION = + "Difference between start time and finish time" + + val SPARK_CONNECT_SERVER_DURATION = + "Difference between start time and close time" + + val SPARK_CONNECT_SESSION_TOTAL_EXECUTE = + "Number of operations submitted in this session" + + val SPARK_CONNECT_SESSION_DURATION = + "Elapsed time since session start, or until closed if the session was closed" + +} diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala new file mode 100644 index 0000000000000..48f4c59106eef --- /dev/null +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.connect.ui + +import java.util.Properties + +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{SharedSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite} +import org.apache.spark.internal.config.Status.{ASYNC_TRACKING_ENABLED, LIVE_ENTITY_UPDATE_PERIOD} +import org.apache.spark.scheduler.SparkListenerJobStart +import org.apache.spark.sql.connect.config.Connect.{CONNECT_UI_SESSION_LIMIT, CONNECT_UI_STATEMENT_LIMIT} +import org.apache.spark.sql.connect.service._ +import org.apache.spark.status.ElementTrackingStore +import org.apache.spark.util.kvstore.InMemoryStore + +class SparkConnectServerListenerSuite + extends SparkFunSuite + with BeforeAndAfter + with SharedSparkContext { + + private var kvstore: ElementTrackingStore = _ + + after { + if (kvstore != null) { + kvstore.close() + kvstore = null + } + } + + Seq(true, false).foreach { live => + test(s"listener events should store successfully (live = $live)") { + val (statusStore: SparkConnectServerAppStatusStore, listener: SparkConnectServerListener) = + createAppStatusStore(live) + + listener.onOtherEvent( + SparkListenerConnectSessionStarted("sessionId", "user", System.currentTimeMillis())) + listener.onOtherEvent( + SparkListenerConnectOperationStarted( + "jobTag", + "operationId", + System.currentTimeMillis(), + "sessionId", + "userId", + "userName", + "dummy query", + None)) + listener.onOtherEvent( + SparkListenerConnectOperationAnalyzed( + "jobTag", + "operationId", + System.currentTimeMillis())) + listener.onJobStart( + SparkListenerJobStart(0, System.currentTimeMillis(), Nil, createProperties)) + listener.onOtherEvent( + SparkListenerConnectOperationFinished("jobTag", "sessionId", System.currentTimeMillis())) + listener.onOtherEvent( + SparkListenerConnectOperationClosed("jobTag", "sessionId", System.currentTimeMillis())) + + if (live) { + assert(statusStore.getOnlineSessionNum === 1) + } + + listener.onOtherEvent( + SparkListenerConnectSessionClosed("sessionId", "userId", System.currentTimeMillis())) + + if (!live) { + // To update history store + kvstore.close(false) + } + assert(statusStore.getOnlineSessionNum === 0) + assert(statusStore.getExecutionList.size === 1) + + val storeExecData = statusStore.getExecutionList.head + + assert(storeExecData.jobTag === "jobTag") + assert(storeExecData.sessionId === "sessionId") + assert(storeExecData.statement === "dummy query") + assert(storeExecData.jobId === Seq("0")) + assert(listener.noLiveData()) + } + } + + Seq(true, false).foreach { live => + test(s"cleanup session if exceeds the threshold (live = $live)") { + val (statusStore: SparkConnectServerAppStatusStore, listener: SparkConnectServerListener) = + createAppStatusStore(live) + var time = 0 + listener.onOtherEvent( + SparkListenerConnectSessionStarted("sessionId1", "user", System.currentTimeMillis())) + time += 1 + listener.onOtherEvent( + SparkListenerConnectSessionStarted("sessionId2", "user", System.currentTimeMillis())) + time += 1 + listener.onOtherEvent(SparkListenerConnectSessionClosed("sessionId1", "userId", time)) + time += 1 + listener.onOtherEvent(SparkListenerConnectSessionClosed("sessionId2", "userId", time)) + listener.onOtherEvent( + SparkListenerConnectSessionStarted("sessionId3", "user", System.currentTimeMillis())) + time += 1 + listener.onOtherEvent(SparkListenerConnectSessionClosed("sessionId3", "userId", time)) + + if (!live) { + kvstore.close(false) + } + assert(statusStore.getOnlineSessionNum === 0) + assert(statusStore.getSessionCount === 1) + 
assert(statusStore.getSession("sessionId1") === None) + assert(listener.noLiveData()) + } + } + + test("update execution info when jobstart event come after execution end event") { + val (statusStore: SparkConnectServerAppStatusStore, listener: SparkConnectServerListener) = + createAppStatusStore(true) + + listener.onOtherEvent( + SparkListenerConnectSessionStarted("sessionId", "userId", System.currentTimeMillis())) + listener.onOtherEvent( + SparkListenerConnectOperationStarted( + "jobTag", + "operationId", + System.currentTimeMillis(), + "sessionId", + "userId", + "userName", + "dummy query", + None)) + listener.onOtherEvent( + SparkListenerConnectOperationAnalyzed("jobTag", "operationId", System.currentTimeMillis())) + listener.onOtherEvent( + SparkListenerConnectOperationFinished("jobTag", "operationId", System.currentTimeMillis())) + listener.onOtherEvent( + SparkListenerConnectOperationClosed("jobTag", "operationId", System.currentTimeMillis())) + + listener.onJobStart( + SparkListenerJobStart(0, System.currentTimeMillis(), Nil, createProperties)) + listener.onOtherEvent( + SparkListenerConnectSessionClosed("sessionId", "userId", System.currentTimeMillis())) + val exec = statusStore.getExecution("jobTag") + assert(exec.isDefined) + assert(exec.get.jobId === Seq("0")) + assert(listener.noLiveData()) + } + + test("SPARK-31387 - listener update methods should not throw exception with unknown input") { + val (statusStore: SparkConnectServerAppStatusStore, listener: SparkConnectServerListener) = + createAppStatusStore(true) + + val unknownSession = "unknown_session" + val unknownJob = "unknown_job_tag" + listener.onOtherEvent(SparkListenerConnectSessionClosed(unknownSession, "userId", 0)) + listener.onOtherEvent( + SparkListenerConnectOperationStarted( + "jobTag", + "operationId", + System.currentTimeMillis(), + unknownSession, + "userId", + "userName", + "dummy query", + None)) + listener.onOtherEvent( + SparkListenerConnectOperationAnalyzed( + unknownJob, + "operationId", + System.currentTimeMillis())) + listener.onOtherEvent(SparkListenerConnectOperationCanceled(unknownJob, "userId", 0)) + listener.onOtherEvent( + SparkListenerConnectOperationFailed(unknownJob, "operationId", 0, "msg")) + listener.onOtherEvent(SparkListenerConnectOperationFinished(unknownJob, "operationId", 0)) + listener.onOtherEvent(SparkListenerConnectOperationClosed(unknownJob, "operationId", 0)) + } + + private def createProperties: Properties = { + val properties = new Properties() + properties.setProperty(SparkContext.SPARK_JOB_TAGS, "jobTag") + properties + } + + private def createAppStatusStore(live: Boolean) = { + val sparkConf = new SparkConf() + sparkConf + .set(ASYNC_TRACKING_ENABLED, false) + .set(LIVE_ENTITY_UPDATE_PERIOD, 0L) + SparkEnv.get.conf + .set(CONNECT_UI_SESSION_LIMIT, 1) + .set(CONNECT_UI_STATEMENT_LIMIT, 10) + kvstore = new ElementTrackingStore(new InMemoryStore, sparkConf) + if (live) { + val listener = new SparkConnectServerListener(kvstore, sparkConf) + (new SparkConnectServerAppStatusStore(kvstore), listener) + } else { + ( + new SparkConnectServerAppStatusStore(kvstore), + new SparkConnectServerListener(kvstore, sparkConf, false)) + } + } +} diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPageSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPageSuite.scala new file mode 100644 index 0000000000000..f850e98b5ac64 --- /dev/null +++ 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPageSuite.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.ui + +import java.util.{Calendar, Locale} +import javax.servlet.http.HttpServletRequest + +import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS} +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite} +import org.apache.spark.scheduler.SparkListenerJobStart +import org.apache.spark.sql.connect.service._ +import org.apache.spark.status.ElementTrackingStore +import org.apache.spark.util.kvstore.InMemoryStore + +class SparkConnectPageSuite extends SparkFunSuite with BeforeAndAfter with SharedSparkContext { + + private var kvstore: ElementTrackingStore = _ + + after { + if (kvstore != null) { + kvstore.close() + kvstore = null + } + } + + /** + * Run a dummy session and return the store + */ + private def getStatusStore: SparkConnectServerAppStatusStore = { + kvstore = new ElementTrackingStore(new InMemoryStore, new SparkConf()) + // val server = mock(classOf[SparkConnectServer], RETURNS_SMART_NULLS) + val sparkConf = new SparkConf + + val listener = new SparkConnectServerListener(kvstore, sparkConf) + val statusStore = new SparkConnectServerAppStatusStore(kvstore) + + listener.onOtherEvent( + SparkListenerConnectSessionStarted("sessionId", "userId", System.currentTimeMillis())) + listener.onOtherEvent( + SparkListenerConnectOperationStarted( + "jobTag", + "operationId", + System.currentTimeMillis(), + "sessionId", + "userId", + "userName", + "dummy query", + None)) + listener.onOtherEvent( + SparkListenerConnectOperationAnalyzed("jobTag", "dummy plan", System.currentTimeMillis())) + listener.onOtherEvent(SparkListenerJobStart(0, System.currentTimeMillis(), Seq())) + listener.onOtherEvent( + SparkListenerConnectOperationFinished("jobTag", "operationId", System.currentTimeMillis())) + listener.onOtherEvent( + SparkListenerConnectOperationClosed("jobTag", "operationId", System.currentTimeMillis())) + listener.onOtherEvent( + SparkListenerConnectSessionClosed("sessionId", "userId", System.currentTimeMillis())) + + statusStore + } + + test("Spark Connect Server page should load successfully") { + val store = getStatusStore + + val request = mock(classOf[HttpServletRequest]) + val tab = mock(classOf[SparkConnectServerTab], RETURNS_SMART_NULLS) + when(tab.startTime).thenReturn(Calendar.getInstance().getTime) + when(tab.store).thenReturn(store) + when(tab.appName).thenReturn("testing") + when(tab.headerTabs).thenReturn(Seq.empty) + val page = new SparkConnectServerPage(tab) + val html = page.render(request).toString().toLowerCase(Locale.ROOT) + + // session statistics and sql 
statistics tables should load successfully
+    assert(html.contains("session statistics (1)"))
+    assert(html.contains("request statistics (1)"))
+    assert(html.contains("dummy query"))
+
+    // Pagination support
+    assert(html.contains("<label>1 pages</label>"))
+
+    // Hiding table support
+    assert(
+      html.contains("class=\"collapse-aggregated-sessionstat" +
+        " collapse-table\" onclick=\"collapsetable"))
+  }
+
+  test("Spark Connect Server session page should load successfully") {
+    val store = getStatusStore
+
+    val request = mock(classOf[HttpServletRequest])
+    when(request.getParameter("id")).thenReturn("sessionId")
+    val tab = mock(classOf[SparkConnectServerTab], RETURNS_SMART_NULLS)
+    when(tab.startTime).thenReturn(Calendar.getInstance().getTime)
+    when(tab.store).thenReturn(store)
+    when(tab.appName).thenReturn("testing")
+    when(tab.headerTabs).thenReturn(Seq.empty)
+    val page = new SparkConnectServerSessionPage(tab)
+    val html = page.render(request).toString().toLowerCase(Locale.ROOT)
+
+    // session sql statistics table should load successfully
+    assert(html.contains("request statistics"))
+    assert(html.contains("userid"))
+    assert(html.contains("jobtag"))
+
+    // Pagination support
+    assert(html.contains("<label>1 pages</label>"))
+
+    // Hiding table support
+    assert(
+      html.contains("collapse-aggregated-sqlsessionstat collapse-table\"" +
+        " onclick=\"collapsetable"))
+  }
+}