[SPARK-44394][CONNECT][WEBUI] Add a Spark UI page for Spark Connect
## What changes were proposed in this pull request?
Add a new Spark UI page to display session and execution information for Spark Connect. This builds on the work in SPARK-43923 (apache#41443), which adds the relevant SparkListenerEvents, and mirrors the ThriftServerPage that the Spark UI provides for the JDBC/ODBC server.

<img width="1709" alt="Screenshot 2023-07-27 at 11 29 22 PM" src="https://github.com/apache/spark/assets/65624911/934b7c69-3b44-460b-8fbb-36a9eb3f0798">

<img width="1716" alt="Screenshot 2023-07-27 at 11 29 15 PM" src="https://github.com/apache/spark/assets/65624911/33dbe6ab-44bf-49a5-ad4c-5ba4a476a1f0">

### Why are the changes needed?
This gives users a way to access session and execution information for Spark Connect via the UI and provides the frontend interface for the related SparkListenerEvents.

### Does this PR introduce _any_ user-facing change?
Yes, it adds a new tab/page in the Spark UI (a minimal usage sketch follows below).
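For illustration, here is a hedged sketch of how the new page becomes reachable. The plugin class name and UI port below are assumptions for the sketch, not part of this diff: attaching the Spark Connect plugin to a regular application calls `SparkConnectService.start(sc)`, which registers the new listener and tab on the driver UI.

```scala
import org.apache.spark.sql.SparkSession

// Hedged sketch: assumes the plugin class is org.apache.spark.sql.connect.SparkConnectPlugin
// and that the driver UI is served on its default port (4040).
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("connect-ui-demo")
  .config("spark.plugins", "org.apache.spark.sql.connect.SparkConnectPlugin")
  .getOrCreate()

// Once the plugin is loaded, the new Spark Connect tab should appear alongside
// Jobs/Stages/SQL in the driver UI, typically at http://localhost:4040.
```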

### How was this patch tested?
Unit tests

Closes apache#41964 from jasonli-db/spark-connect-ui.

Authored-by: Jason Li <jason.li@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
jasonli-db authored and gengliangwang committed Jul 29, 2023
1 parent 11b3b23 commit f8786f0
Showing 20 changed files with 1,789 additions and 18 deletions.
@@ -93,7 +93,7 @@ private[connect] object ProtoUtils {
def throwIfInvalidTag(tag: String): Unit = {
// Same format rules apply to Spark Connect execution tags as to SparkContext job tags,
// because the Spark Connect job tag is also used as part of SparkContext job tag.
// See SparkContext.throwIfInvalidTag and ExecuteHolder.tagToSparkJobTag
// See SparkContext.throwIfInvalidTag and ExecuteSessionTag
if (tag == null) {
throw new IllegalArgumentException("Spark Connect tag cannot be null.")
}
@@ -45,7 +45,7 @@ class SparkConnectPlugin extends SparkPlugin {
override def init(
sc: SparkContext,
pluginContext: PluginContext): util.Map[String, String] = {
SparkConnectService.start()
SparkConnectService.start(sc)
Map.empty[String, String].asJava
}

@@ -121,4 +121,17 @@ object Connect {
.version("3.5.0")
.booleanConf
.createWithDefault(false)

val CONNECT_UI_STATEMENT_LIMIT =
ConfigBuilder("spark.sql.connect.ui.retainedStatements")
.doc("The number of statements kept in the Spark Connect UI history.")
.version("3.5.0")
.intConf
.createWithDefault(200)

val CONNECT_UI_SESSION_LIMIT = ConfigBuilder("spark.sql.connect.ui.retainedSessions")
.doc("The number of client sessions kept in the Spark Connect UI history.")
.version("3.5.0")
.intConf
.createWithDefault(200)
}
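For reference, a small sketch of how the two new retention limits could be tuned. The config keys come straight from this diff; the values and the `SparkConf` usage are illustrative.

```scala
import org.apache.spark.SparkConf

// Illustrative values only; both limits default to 200 (see above).
val conf = new SparkConf()
  .set("spark.sql.connect.ui.retainedStatements", "500") // keep more executions in the UI history
  .set("spark.sql.connect.ui.retainedSessions", "50")    // keep fewer client sessions
```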
@@ -27,7 +27,7 @@ import org.apache.spark.connect.proto
import org.apache.spark.internal.Logging
import org.apache.spark.sql.connect.common.ProtoUtils
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.sql.connect.service.ExecuteHolder
import org.apache.spark.sql.connect.service.{ExecuteHolder, ExecuteSessionTag}
import org.apache.spark.sql.connect.utils.ErrorUtils
import org.apache.spark.util.Utils

@@ -95,8 +95,11 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends
} finally {
executeHolder.sessionHolder.session.sparkContext.removeJobTag(executeHolder.jobTag)
executeHolder.sparkSessionTags.foreach { tag =>
executeHolder.sessionHolder.session.sparkContext
.removeJobTag(executeHolder.tagToSparkJobTag(tag))
executeHolder.sessionHolder.session.sparkContext.removeJobTag(
ExecuteSessionTag(
executeHolder.sessionHolder.userId,
executeHolder.sessionHolder.sessionId,
tag))
}
}
} catch {
@@ -128,7 +131,11 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends
session.sparkContext.addJobTag(executeHolder.jobTag)
// Also set all user defined tags as Spark Job tags.
executeHolder.sparkSessionTags.foreach { tag =>
session.sparkContext.addJobTag(executeHolder.tagToSparkJobTag(tag))
session.sparkContext.addJobTag(
ExecuteSessionTag(
executeHolder.sessionHolder.userId,
executeHolder.sessionHolder.sessionId,
tag))
}
session.sparkContext.setJobDescription(
s"Spark Connect - ${StringUtils.abbreviate(debugString, 128)}")
@@ -38,11 +38,7 @@ private[connect] class ExecuteHolder(
* Tag that is set for this execution on SparkContext, via SparkContext.addJobTag. Used
* (internally) for cancellation of the Spark Jobs run by this execution.
*/
val jobTag =
s"SparkConnect_Execute_" +
s"User_${sessionHolder.userId}_" +
s"Session_${sessionHolder.sessionId}_" +
s"Operation_${operationId}"
val jobTag = ExecuteJobTag(sessionHolder.userId, sessionHolder.sessionId, operationId)

/**
* Tags set by Spark Connect client users via SparkSession.addTag. Used to identify and group
@@ -122,3 +118,36 @@ private[connect] class ExecuteHolder(
s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Tag_${tag}"
}
}

/** Used to identify ExecuteHolder jobTag among SparkContext.SPARK_JOB_TAGS. */
object ExecuteJobTag {
private val prefix = "SparkConnect_OperationTag"

def apply(sessionId: String, userId: String, operationId: String): String = {
s"${prefix}_" +
s"User_${userId}_" +
s"Session_${sessionId}_" +
s"Operation_${operationId}"
}

def unapply(jobTag: String): Option[String] = {
if (jobTag.startsWith(prefix)) Some(jobTag) else None
}
}

/** Used to identify ExecuteHolder sessionTag among SparkContext.SPARK_JOB_TAGS. */
object ExecuteSessionTag {
private val prefix = "SparkConnect_SessionTag"

def apply(userId: String, sessionId: String, tag: String): String = {
ProtoUtils.throwIfInvalidTag(tag)
s"${prefix}_" +
s"User_${userId}_" +
s"Session_${sessionId}_" +
s"Tag_${tag}"
}

def unapply(sessionTag: String): Option[String] = {
if (sessionTag.startsWith(prefix)) Some(sessionTag) else None
}
}
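To make the tag formats concrete, a small usage sketch of the two extractors defined above (illustrative values; assumes access from within the `org.apache.spark.sql.connect.service` package):

```scala
// Constructing tags with the objects defined above.
val jobTag = ExecuteJobTag("session-123", "userA", "op-456")
// => "SparkConnect_OperationTag_User_userA_Session_session-123_Operation_op-456"

val sessionTag = ExecuteSessionTag("userA", "session-123", "etl")
// => "SparkConnect_SessionTag_User_userA_Session_session-123_Tag_etl"

// The unapply methods let callers filter SparkContext job tags by origin:
val allTags = Seq(jobTag, sessionTag, "some_unrelated_tag")
val connectJobTags = allTags.collect { case ExecuteJobTag(t) => t } // Seq(jobTag)
```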
@@ -34,7 +34,7 @@ object SparkConnectServer extends Logging {
val session = SparkSession.builder.getOrCreate()
try {
try {
SparkConnectService.start()
SparkConnectService.start(session.sparkContext)
SparkConnectService.server.getListenSockets.foreach { sa =>
val isa = sa.asInstanceOf[InetSocketAddress]
logInfo(
@@ -49,6 +49,7 @@
SparkConnectService.server.awaitTermination()
} finally {
session.stop()
SparkConnectService.uiTab.foreach(_.detach())
}
}
}
@@ -29,13 +29,16 @@ import io.grpc.protobuf.services.ProtoReflectionService
import io.grpc.stub.StreamObserver
import org.apache.commons.lang3.StringUtils

import org.apache.spark.{SparkEnv, SparkSQLException}
import org.apache.spark.{SparkContext, SparkEnv, SparkSQLException}
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.UI.UI_ENABLED
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connect.config.Connect.{CONNECT_GRPC_BINDING_ADDRESS, CONNECT_GRPC_BINDING_PORT, CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE}
import org.apache.spark.sql.connect.ui.{SparkConnectServerAppStatusStore, SparkConnectServerListener, SparkConnectServerTab}
import org.apache.spark.sql.connect.utils.ErrorUtils
import org.apache.spark.status.ElementTrackingStore

/**
* The SparkConnectService implementation.
@@ -181,6 +184,9 @@ object SparkConnectService extends Logging {

private[connect] var server: Server = _

private[connect] var uiTab: Option[SparkConnectServerTab] = None
private[connect] var listener: SparkConnectServerListener = _

// For testing purpose, it's package level private.
private[connect] def localPort: Int = {
assert(server != null)
@@ -251,6 +257,20 @@
SparkSession.active.newSession()
}

private def createListenerAndUI(sc: SparkContext): Unit = {
val kvStore = sc.statusStore.store.asInstanceOf[ElementTrackingStore]
listener = new SparkConnectServerListener(kvStore, sc.conf)
sc.listenerBus.addToStatusQueue(listener)
uiTab = if (sc.getConf.get(UI_ENABLED)) {
Some(
new SparkConnectServerTab(
new SparkConnectServerAppStatusStore(kvStore),
SparkConnectServerTab.getSparkUI(sc)))
} else {
None
}
}

/**
* Starts the gRPC service.
*/
@@ -280,8 +300,9 @@ object SparkConnectService extends Logging {
}

// Starts the service
def start(): Unit = {
def start(sc: SparkContext): Unit = {
startGRPCService()
createListenerAndUI(sc)
}

def stop(timeout: Option[Long] = None, unit: Option[TimeUnit] = None): Unit = {
@@ -294,6 +315,7 @@
}
}
userSessionMapping.invalidateAll()
uiTab.foreach(_.detach())
}

def extractErrorMessage(st: Throwable): String = {
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connect.ui

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import com.fasterxml.jackson.annotation.JsonIgnore

import org.apache.spark.status.KVUtils
import org.apache.spark.status.KVUtils.KVIndexParam
import org.apache.spark.util.kvstore.{KVIndex, KVStore}

class SparkConnectServerAppStatusStore(store: KVStore) {
def getSessionList: Seq[SessionInfo] = {
KVUtils.viewToSeq(store.view(classOf[SessionInfo]))
}

def getExecutionList: Seq[ExecutionInfo] = {
KVUtils.viewToSeq(store.view(classOf[ExecutionInfo]))
}

def getOnlineSessionNum: Int = {
KVUtils.count(store.view(classOf[SessionInfo]))(_.finishTimestamp == 0)
}

def getSession(sessionId: String): Option[SessionInfo] = {
try {
Some(store.read(classOf[SessionInfo], sessionId))
} catch {
case _: NoSuchElementException => None
}
}

def getExecution(executionId: String): Option[ExecutionInfo] = {
try {
Some(store.read(classOf[ExecutionInfo], executionId))
} catch {
case _: NoSuchElementException => None
}
}

/**
* When an error or a cancellation occurs, we set the finishTimestamp of the statement.
* Therefore, when we count the number of running statements, we need to exclude errors and
* cancellations and count all statements that have not been closed so far.
*/
def getTotalRunning: Int = {
KVUtils.count(store.view(classOf[ExecutionInfo]))(_.isExecutionActive)
}

def getSessionCount: Long = {
store.count(classOf[SessionInfo])
}

def getExecutionCount: Long = {
store.count(classOf[ExecutionInfo])
}
}

private[connect] class SessionInfo(
@KVIndexParam val sessionId: String,
val startTimestamp: Long,
val userId: String,
val finishTimestamp: Long,
val totalExecution: Long) {
@JsonIgnore @KVIndex("finishTime")
private def finishTimeIndex: Long = if (finishTimestamp > 0L) finishTimestamp else -1L
def totalTime: Long = {
if (finishTimestamp == 0L) {
System.currentTimeMillis - startTimestamp
} else {
finishTimestamp - startTimestamp
}
}
}

private[connect] class ExecutionInfo(
@KVIndexParam val jobTag: String,
val statement: String,
val sessionId: String,
val startTimestamp: Long,
val userId: String,
val operationId: String,
val sparkSessionTags: Set[String],
val finishTimestamp: Long,
val closeTimestamp: Long,
val detail: String,
val state: ExecutionState.Value,
val jobId: ArrayBuffer[String],
val sqlExecId: mutable.Set[String]) {
@JsonIgnore @KVIndex("finishTime")
private def finishTimeIndex: Long = if (finishTimestamp > 0L && !isExecutionActive) {
finishTimestamp
} else -1L

@JsonIgnore @KVIndex("isExecutionActive")
def isExecutionActive: Boolean = {
state == ExecutionState.STARTED ||
state == ExecutionState.COMPILED ||
state == ExecutionState.READY
}

def totalTime(endTime: Long): Long = {
if (endTime == 0L) {
System.currentTimeMillis - startTimestamp
} else {
endTime - startTimestamp
}
}
}

private[connect] object ExecutionState extends Enumeration {
val STARTED, COMPILED, READY, CANCELED, FAILED, FINISHED, CLOSED = Value
type ExecutionState = Value
}
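A brief sketch of exercising the read-only store above, e.g. from a test, backed by an in-memory `KVStore`; the `InMemoryStore` usage is an assumption of this sketch and not part of the diff:

```scala
import org.apache.spark.util.kvstore.InMemoryStore

// Hedged sketch: query an empty in-memory store through the status-store API above.
val statusStore = new SparkConnectServerAppStatusStore(new InMemoryStore())

assert(statusStore.getSessionCount == 0L)
assert(statusStore.getOnlineSessionNum == 0)
assert(statusStore.getTotalRunning == 0)
assert(statusStore.getSession("nonexistent-session").isEmpty)
```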
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connect.ui

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.status.{AppHistoryServerPlugin, ElementTrackingStore}
import org.apache.spark.ui.SparkUI

class SparkConnectServerHistoryServerPlugin extends AppHistoryServerPlugin {

override def createListeners(
conf: SparkConf,
store: ElementTrackingStore): Seq[SparkListener] = {
Seq(new SparkConnectServerListener(store, conf))
}

override def setupUI(ui: SparkUI): Unit = {
val store = new SparkConnectServerAppStatusStore(ui.store.store)
if (store.getSessionCount > 0) {
new SparkConnectServerTab(store, ui)
}
}

override def displayOrder: Int = 3
}
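For the History Server to pick this plugin up, it presumably has to be registered for `java.util.ServiceLoader` discovery (the resource file that would do so is not visible in this excerpt). A hedged sketch of verifying that discovery:

```scala
import java.util.ServiceLoader
import org.apache.spark.status.AppHistoryServerPlugin

// Hedged sketch: assumes the plugin is registered under
// META-INF/services/org.apache.spark.status.AppHistoryServerPlugin on the classpath.
val it = ServiceLoader.load(classOf[AppHistoryServerPlugin]).iterator()
while (it.hasNext) {
  println(it.next().getClass.getName) // should include SparkConnectServerHistoryServerPlugin
}
```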