From 6be1e195f2abb005ad9fea9f5c85dc43eb7e8622 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Tue, 13 Aug 2024 22:46:48 -0700
Subject: [PATCH] [SPARK-49038][SQL][3.5] SQLMetric should report the raw
 value in the accumulator update event

### What changes were proposed in this pull request?

Backport https://github.com/apache/spark/pull/47721 to 3.5.

Some `SQLMetrics` set the initial value to `-1`, so that we can recognize no-update metrics (e.g. there is no input data and the metric is not updated at all) and filter them out later in the UI. However, there is a bug here: Spark turns accumulator updates into `AccumulableInfo` using `AccumulatorV2#value`, and to avoid exposing the internal `-1` value to end users, `SQLMetric#value` turns `-1` into `0` before returning the value (see https://github.com/apache/spark/pull/39311 for more details). As a result, the UI can no longer see the `-1` and filter it out.

This PR fixes the bug by using the raw value of `SQLMetric` to create `AccumulableInfo`, so that the UI can still see `-1` and filter it out.

### Why are the changes needed?

To avoid getting the wrong min value for certain SQL metrics when some partitions have no data.

### Does this PR introduce _any_ user-facing change?

Yes. If users write Spark listeners to watch the `SparkListenerExecutorMetricsUpdate` event, they can now see the correct value of SQL metrics.

### How was this patch tested?

Manual UI tests. We do not have an end-to-end UI test framework for SQL metrics yet.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47749 from cloud-fan/branch-3.5.

Authored-by: Wenchen Fan
Signed-off-by: Dongjoon Hyun
(cherry picked from commit bd2cbd6ac5d7cbfce332cfdb3f31be27105ed36d)
Signed-off-by: Dongjoon Hyun
---
 .../scala/org/apache/spark/executor/Executor.scala     |  2 +-
 .../org/apache/spark/scheduler/TaskSchedulerImpl.scala |  2 +-
 .../scala/org/apache/spark/util/AccumulatorV2.scala    | 10 +++++++++-
 .../scala/org/apache/spark/util/JsonProtocol.scala     |  2 +-
 .../test/scala/org/apache/spark/AccumulatorSuite.scala |  2 +-
 .../apache/spark/sql/execution/metric/SQLMetrics.scala |  7 +++++++
 .../spark/sql/execution/ui/SQLAppStatusListener.scala  |  2 +-
 .../spark/sql/execution/metric/SQLMetricsSuite.scala   |  5 +++++
 .../sql/execution/metric/SQLMetricsTestUtils.scala     |  2 +-
 9 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 4ea11d753e119..78d7231a19789 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -478,7 +478,7 @@ private[spark] class Executor(
       // Collect latest accumulator values to report back to the driver
       val accums: Seq[AccumulatorV2[_, _]] =
         Option(task).map(_.collectAccumulatorUpdates(taskFailed = true)).getOrElse(Seq.empty)
-      val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
+      val accUpdates = accums.map(acc => acc.toInfoUpdate)
 
       setTaskFinishedAndClearInterruptStatus()
       (accums, accUpdates)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 91b0c983e4a87..1970e34679329 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -898,7 +898,7 @@ private[spark] class TaskSchedulerImpl(
               executorRunTime = acc.value.asInstanceOf[Long]
             }
           }
-          acc.toInfo(Some(acc.value), None)
+          acc.toInfoUpdate
         }
         val taskProcessRate = if (efficientTaskCalcualtionEnabled) {
           getTaskProcessRate(recordsRead, executorRunTime)
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 181033c9d20c8..aadde1e20226a 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -102,16 +102,24 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
     metadata.countFailedValues
   }
 
+  private def isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
+
   /**
    * Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] with the provided
    * values.
    */
   private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
-    val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
     AccumulableInfo(id, name, internOption(update), internOption(value), isInternal,
       countFailedValues)
   }
 
+  /**
+   * Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] as an update.
+   */
+  private[spark] def toInfoUpdate: AccumulableInfo = {
+    AccumulableInfo(id, name, internOption(Some(value)), None, isInternal, countFailedValues)
+  }
+
   final private[spark] def isAtDriverSide: Boolean = atDriverSide
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index c1d52484049db..e258ccb9b2cf1 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -1365,7 +1365,7 @@ private[spark] object JsonProtocol {
         val accumUpdates = jsonOption(json.get("Accumulator Updates"))
           .map(_.extractElements.map(accumulableInfoFromJson).toArray.toSeq)
           .getOrElse(taskMetricsFromJson(json.get("Metrics")).accumulators().map(acc => {
-            acc.toInfo(Some(acc.value), None)
+            acc.toInfoUpdate
           }).toArray.toSeq)
         ExceptionFailure(className, description, stackTrace, fullStackTrace, None, accumUpdates)
       case `taskResultLost` => TaskResultLost
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index e4dfa149a7d20..109dd7b1b5da2 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -113,7 +113,7 @@ private[spark] object AccumulatorSuite {
    * Make an `AccumulableInfo` out of an `AccumulatorV2` with the intent to use the
    * info as an accumulator update.
    */
-  def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfo(Some(a.value), None)
+  def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfoUpdate
 
   /**
    * Run one or more Spark jobs and verify that in at least one job the peak execution memory
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 6d2578c3576da..e645c557c4e86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -90,6 +90,13 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
     AccumulableInfo(id, name, internOption(update), internOption(value), true, true,
       SQLMetrics.cachedSQLAccumIdentifier)
   }
+
+  // We should provide the raw value which can be -1, so that `SQLMetrics.stringValue` can correctly
+  // filter out the invalid -1 values.
+  override def toInfoUpdate: AccumulableInfo = {
+    AccumulableInfo(id, name, internOption(Some(_value)), None, true, true,
+      SQLMetrics.cachedSQLAccumIdentifier)
+  }
 }
 
 object SQLMetrics {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 7b9f877bdef5a..dd8606a2c902f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -181,7 +181,7 @@ class SQLAppStatusListener(
       event.taskMetrics.externalAccums.flatMap { a =>
         // This call may fail if the accumulator is gc'ed, so account for that.
         try {
-          Some(a.toInfo(Some(a.value), None))
+          Some(a.toInfoUpdate)
         } catch {
           case _: IllegalAccessError => None
         }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index cb09d7e116a0e..d9072f27c2196 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -899,6 +899,11 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
         }))))
     )
   }
+
+  test("SQLMetric#toInfoUpdate") {
+    assert(SQLMetrics.createSizeMetric(sparkContext, name = "m").toInfoUpdate.update === Some(-1))
+    assert(SQLMetrics.createMetric(sparkContext, name = "m").toInfoUpdate.update === Some(0))
+  }
 }
 
 case class CustomFileCommitProtocol(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 0b6dc6903245e..1a57f2f029d97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -361,7 +361,7 @@ object InputOutputMetricsHelper {
       var maxOutputRows = 0L
 
       for (accum <- taskEnd.taskMetrics.externalAccums) {
-        val info = accum.toInfo(Some(accum.value), None)
+        val info = accum.toInfoUpdate
         if (info.name.toString.contains("number of output rows")) {
           info.update match {
             case Some(n: Number) =>
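
For illustration only, not part of the patch: a minimal sketch of the user-facing effect described in the commit message, using a made-up listener name (`RawSqlMetricListener`). After this change, the `SparkListenerExecutorMetricsUpdate` event carries the raw `SQLMetric` value, so a listener can tell the `-1` "never updated" placeholder apart from a genuine `0` and skip it, much like `SQLMetrics.stringValue` does for the UI.

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

// Hypothetical listener, for illustration only: it watches executor metrics updates and
// skips the -1 placeholder that a not-yet-updated SQLMetric reports after this patch.
class RawSqlMetricListener extends SparkListener {
  override def onExecutorMetricsUpdate(update: SparkListenerExecutorMetricsUpdate): Unit = {
    update.accumUpdates.foreach { case (taskId, stageId, stageAttemptId, accumInfos) =>
      accumInfos.foreach { info =>
        info.update match {
          case Some(v: Long) if v == -1L =>
            // No data yet for this metric (e.g. empty partitions); ignore it instead of
            // treating it as a real 0, which is what the masked value looked like before.
          case Some(v) =>
            println(s"task $taskId (stage $stageId.$stageAttemptId) " +
              s"${info.name.getOrElse("<unnamed>")} = $v")
          case None => // this event carried no update for the accumulator
        }
      }
    }
  }
}

object RawSqlMetricListener {
  // Usage sketch: register the listener on an existing SparkContext.
  def register(sc: SparkContext): Unit = sc.addSparkListener(new RawSqlMetricListener())
}

The raw `-1` seen here is exactly what the new `SQLMetric#toInfoUpdate` override preserves; before this fix the update arrived as `0`, which is why min values could be wrong when some partitions had no data.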