[SPARK-44801][SQL][UI] Capture analyzing failed queries in Listener and UI

### What changes were proposed in this pull request?

This PR adds a catch block to `QueryExecution.assertAnalyzed` that wraps the analysis failure in a new execution id. It reuses `SQLExecution.withNewExecutionId` to post execution events to the listener and UI.
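
With this change, a query that fails analysis produces a normal `SparkListenerSQLExecutionStart`/`SparkListenerSQLExecutionEnd` pair. A minimal sketch of what that makes observable (illustrative only, not part of this patch; assumes the event classes from `org.apache.spark.sql.execution.ui` and the `errorMessage` field on the end event):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

object AnalysisFailureEventDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("analysis failure events")
      .getOrCreate()

    // Print every SQL execution start/end event posted to the listener bus.
    spark.sparkContext.addSparkListener(new SparkListener {
      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
        case s: SparkListenerSQLExecutionStart =>
          println(s"execution ${s.executionId} started: ${s.description}")
        case e: SparkListenerSQLExecutionEnd =>
          println(s"execution ${e.executionId} ended, error: ${e.errorMessage}")
        case _ => // ignore unrelated events
      }
    })

    // Analysis of this query fails eagerly; with this PR the failure still yields
    // a start/end event pair instead of being invisible to listeners and the UI.
    try spark.sql("SELECT * FROM table_that_does_not_exist")
    catch { case _: org.apache.spark.sql.AnalysisException => () }

    // The listener bus is asynchronous, so give the events a moment to arrive.
    Thread.sleep(1000)
    spark.stop()
  }
}
```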

### Why are the changes needed?

The listener and UI are currently unable to track queries that fail during analysis.

### Does this PR introduce _any_ user-facing change?

Yes. UI improvements.

### How was this patch tested?

New tests, plus local verification. Before this change, queries that failed analysis left no trace on the SQL page.
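
The local verification can be reproduced along these lines (a sketch assuming a local `spark-shell` with the UI on its default port):

```scala
// In spark-shell: trigger an analysis failure, then open the SQL tab of the
// web UI (http://localhost:4040/SQL by default). The failed query should now
// appear in the failed-executions table; before this PR it did not appear at all.
spark.sql("SELECT * FROM no_such_table")
```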

#### List

![image](https://github.com/apache/spark/assets/8326978/aac1fbac-e339-4781-9c11-c92a35f56633)

#### Details

![image](https://github.com/apache/spark/assets/8326978/667c2038-f8e2-4b5b-9176-f214c9e3aa0a)

Closes apache#42481 from yaooqinn/SPARK-44801.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
yaooqinn committed Aug 21, 2023
1 parent 04024fd commit 1f92995
Showing 4 changed files with 69 additions and 13 deletions.
QueryExecution.scala
@@ -63,7 +63,17 @@ class QueryExecution(
  // TODO: Move the planner and optimizer into here from SessionState.
  protected def planner = sparkSession.sessionState.planner

  def assertAnalyzed(): Unit = analyzed
  def assertAnalyzed(): Unit = {
    try {
      analyzed
    } catch {
      case e: AnalysisException =>
        // Because we do eager analysis for DataFrame, there will be no execution created after
        // an AnalysisException occurs. So we need to explicitly create a new execution to post
        // start/end events to notify the listener and UI components.
        SQLExecution.withNewExecutionId(this, Some("analyze"))(throw e)
    }
  }

  def assertSupported(): Unit = {
    if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
SQLExecution.scala
@@ -20,8 +20,11 @@ package org.apache.spark.sql.execution
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future => JFuture}
import java.util.concurrent.atomic.AtomicLong

import scala.util.control.NonFatal

import org.apache.spark.{ErrorMessageFormat, SparkThrowable, SparkThrowableHelper}
import org.apache.spark.SparkContext.{SPARK_JOB_DESCRIPTION, SPARK_JOB_INTERRUPT_ON_CANCEL}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, SPARK_EXECUTOR_PREFIX}
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.sql.SparkSession
@@ -30,7 +33,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.SQL_EVENT_TRUNCATE_LENGTH
import org.apache.spark.util.Utils

object SQLExecution {
object SQLExecution extends Logging {

  val EXECUTION_ID_KEY = "spark.sql.execution.id"
  val EXECUTION_ROOT_ID_KEY = "spark.sql.execution.root.id"
@@ -116,6 +119,15 @@ object SQLExecution {
    var ex: Option[Throwable] = None
    val startTime = System.nanoTime()
    try {
      val planInfo = try {
        SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan)
      } catch {
        case NonFatal(e) =>
          logDebug("Failed to generate SparkPlanInfo", e)
          // If the queryExecution already failed before this, we are not able to generate
          // the plan info, so we use an empty graphviz node to make the UI happy.
          SparkPlanInfo.EMPTY
      }
      sc.listenerBus.post(SparkListenerSQLExecutionStart(
        executionId = executionId,
        rootExecutionId = Some(rootExecutionId),
@@ -124,7 +136,7 @@
        physicalPlanDescription = queryExecution.explainString(planDescriptionMode),
        // `queryExecution.executedPlan` triggers query planning. If it fails, the exception
        // will be caught and reported in the `SparkListenerSQLExecutionEnd`
        sparkPlanInfo = SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan),
        sparkPlanInfo = planInfo,
        time = System.currentTimeMillis(),
        modifiedConfigs = redactedConfigs,
        jobTags = sc.getJobTags()
SparkPlanInfo.scala
@@ -76,4 +76,6 @@ private[execution] object SparkPlanInfo {
      metadata,
      metrics)
  }

  // A placeholder plan info, used when generating the real one fails, so the UI still has a node to render.
  final lazy val EMPTY: SparkPlanInfo = new SparkPlanInfo("", "", Nil, Map.empty, Nil)
}
UISeleniumSuite.scala
@@ -32,10 +32,31 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser {

  private var spark: SparkSession = _

  private def createSparkSessionWithUI: SparkSession = SparkSession.builder()
    .master("local[1,1]")
    .appName("sql ui test")
    .config("spark.ui.enabled", "true")
    .config("spark.ui.port", "0")
    .getOrCreate()

  implicit val webDriver: HtmlUnitDriver = new HtmlUnitDriver {
    getWebClient.setCssErrorHandler(new SparkUICssErrorHandler)
  }

  // Opens the SQL tab of the web UI and returns the error messages listed in the failed-executions table.
  private def findErrorMessageOnSQLUI(): List[String] = {
    val webUrl = spark.sparkContext.uiWebUrl
    assert(webUrl.isDefined, "please turn on spark.ui.enabled")
    go to s"${webUrl.get}/SQL"
    findAll(cssSelector("""#failed-table td .stacktrace-details""")).map(_.text).toList
  }

  // Opens the SQL tab of the web UI and returns the execution id of the first failed entry.
  private def findExecutionIDOnSQLUI(): Int = {
    val webUrl = spark.sparkContext.uiWebUrl
    assert(webUrl.isDefined, "please turn on spark.ui.enabled")
    go to s"${webUrl.get}/SQL"
    findAll(cssSelector("""#failed-table td""")).map(_.text).toList.head.toInt
  }

  override def afterAll(): Unit = {
    try {
      webDriver.quit()
@@ -54,21 +75,32 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser {
  }

  test("SPARK-44737: Should not display json format errors on SQL page for non-SparkThrowables") {
    spark = SparkSession.builder()
      .master("local[1,1]")
      .appName("sql ui test")
      .config("spark.ui.enabled", "true")
      .config("spark.ui.port", "0")
      .getOrCreate()
    spark = createSparkSessionWithUI

    intercept[Exception](spark.sql("SET mapreduce.job.reduces = 0").isEmpty)
    eventually(timeout(10.seconds), interval(100.milliseconds)) {
      val webUrl = spark.sparkContext.uiWebUrl
      assert(webUrl.isDefined, "please turn on spark.ui.enabled")
      go to s"${webUrl.get}/SQL"
      val sd = findAll(cssSelector("""#failed-table td .stacktrace-details""")).map(_.text).toList
      val sd = findErrorMessageOnSQLUI()
      assert(sd.size === 1, "SET mapreduce.job.reduces = 0 shall fail")
      assert(sd.head.startsWith("java.lang.IllegalArgumentException:"))
    }
  }

test("SPARK-44801: Analyzer failure shall show the query in failed table") {
spark = creatSparkSessionWithUI

intercept[Exception](spark.sql("SELECT * FROM I_AM_A_INVISIBLE_TABLE").isEmpty)
eventually(timeout(10.seconds), interval(100.milliseconds)) {
val sd = findErrorMessageOnSQLUI()
assert(sd.size === 1, "Analyze fail shall show the query in failed table")
assert(sd.head.startsWith("[TABLE_OR_VIEW_NOT_FOUND]"))

val id = findExecutionIDOnSQLUI()
// check query detail page
go to s"${spark.sparkContext.uiWebUrl.get}/SQL/execution/?id=$id"
val planDot = findAll(cssSelector(""".dot-file""")).map(_.text).toList
assert(planDot.head.startsWith("digraph G {"))
val planDetails = findAll(cssSelector("""#physical-plan-details""")).map(_.text).toList
assert(planDetails.head.contains("TABLE_OR_VIEW_NOT_FOUND"))
}
}
}
