[SPARK-49038][SQL] SQLMetric should report the raw value in the accumulator update event

### What changes were proposed in this pull request?

Some `SQLMetric`s set their initial value to `-1` so that no-update metrics (e.g. when there is no input data and the metric is never updated) can be recognized and filtered out later in the UI.
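
For instance (a sketch using the `SQLMetrics` factory methods, assuming a live `SparkContext` named `sparkContext` is in scope):

```scala
import org.apache.spark.sql.execution.metric.SQLMetrics

// Size and timing metrics start at -1 so that "never updated" can be told
// apart from a legitimate 0; plain count metrics start at 0. The new test
// in SQLMetricsSuite below asserts exactly this.
val sizeMetric = SQLMetrics.createSizeMetric(sparkContext, name = "data size")
val rowMetric  = SQLMetrics.createMetric(sparkContext, name = "number of output rows")
```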

However, there is a bug here. Spark turns accumulator updates into `AccumulableInfo` using `AccumulatorV2#value`, and to avoid exposing the internal `-1` value to end users, `SQLMetric#value` turns `-1` into `0` before returning it (see apache#39311 for more details). The UI therefore never sees `-1` and cannot filter these metrics out.
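
Concretely (a simplified model of the pre-fix interaction, not the exact `SQLMetric` code):

```scala
// Toy model for illustration only.
class ToyMetric(initValue: Long = -1L) {
  private var _value: Long = initValue              // raw; -1 means "never updated"
  def add(v: Long): Unit = { if (_value < 0L) _value = 0L; _value += v }
  def value: Long = if (_value < 0L) 0L else _value // clamped for end users
}

val m = new ToyMetric()   // never updated
// Pre-fix, the accumulator update event was built from the clamped `value`:
assert(m.value == 0L)     // the -1 sentinel is gone before the UI sees it
```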

This PR fixes the bug by using the raw value of `SQLMetric` to create the `AccumulableInfo`, so that the UI can still see `-1` and filter it out.

### Why are the changes needed?

To avoid reporting a wrong minimum value for certain SQL metrics when some partitions have no data.
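
For example, with three tasks whose raw updates are `-1` (no input), `10`, and `5`:

```scala
val raw = Seq(-1L, 10L, 5L)            // one task saw no data at all
// Pre-fix: -1 was clamped to 0 before reaching the UI...
val clamped = raw.map(v => math.max(v, 0L))
assert(clamped.min == 0L)              // ...so the reported min is a bogus 0
// Post-fix: the UI gets the raw values and filters the sentinel out.
assert(raw.filter(_ >= 0L).min == 5L)  // the true min of the real updates
```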

### Does this PR introduce _any_ user-facing change?

Yes. Users who write Spark listeners that watch the `SparkListenerExecutorMetricsUpdate` event will now see the correct (raw) values of SQL metrics.
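
For example, a listener along these lines (a sketch; `MetricWatcher` is a hypothetical name) will now observe the raw update values, including the `-1` sentinel for metrics that were never updated:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

// Hypothetical listener, for illustration.
class MetricWatcher extends SparkListener {
  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
    event.accumUpdates.foreach { case (taskId, stageId, stageAttemptId, infos) =>
      infos.foreach { info =>
        // With this fix, `info.update` carries the raw metric value,
        // which may be -1 for SQL metrics that received no data.
        println(s"stage $stageId task $taskId: ${info.name.getOrElse("?")} = ${info.update}")
      }
    }
  }
}
// Register it with: sparkContext.addSparkListener(new MetricWatcher)
```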

### How was this patch tested?

Manual UI tests; we do not yet have an end-to-end UI test framework for SQL metrics.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47721 from cloud-fan/metrics.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
cloud-fan authored and IvanK-db committed Sep 19, 2024
1 parent 949ba88 commit 25b4d26
Showing 9 changed files with 27 additions and 7 deletions.
```diff
@@ -550,7 +550,7 @@ private[spark] class Executor(
       // Collect latest accumulator values to report back to the driver
       val accums: Seq[AccumulatorV2[_, _]] =
         Option(task).map(_.collectAccumulatorUpdates(taskFailed = true)).getOrElse(Seq.empty)
-      val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
+      val accUpdates = accums.map(acc => acc.toInfoUpdate)

       setTaskFinishedAndClearInterruptStatus()
       (accums, accUpdates)
```
```diff
@@ -875,7 +875,7 @@ private[spark] class TaskSchedulerImpl(
             executorRunTime = acc.value.asInstanceOf[Long]
           }
         }
-        acc.toInfo(Some(acc.value), None)
+        acc.toInfoUpdate
       }
       val taskProcessRate = if (efficientTaskCalcualtionEnabled) {
         getTaskProcessRate(recordsRead, executorRunTime)
```
`core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala` (9 additions, 1 deletion)

```diff
@@ -102,16 +102,24 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
     metadata.countFailedValues
   }

+  private def isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
+
   /**
    * Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] with the provided
    * values.
    */
   private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
-    val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
     AccumulableInfo(id, name, internOption(update), internOption(value), isInternal,
       countFailedValues)
   }

+  /**
+   * Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] as an update.
+   */
+  private[spark] def toInfoUpdate: AccumulableInfo = {
+    AccumulableInfo(id, name, internOption(Some(value)), None, isInternal, countFailedValues)
+  }
+
   final private[spark] def isAtDriverSide: Boolean = atDriverSide

   /**
```
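In effect, `toInfoUpdate` is shorthand for the `toInfo(Some(acc.value), None)` pattern at the old call sites: it fills the `update` field and leaves `value` empty. A rough sketch of the equivalence (illustrative only, since both methods are `private[spark]`):

```scala
// For a base AccumulatorV2 `acc`, the two calls build the same AccumulableInfo:
//   acc.toInfo(Some(acc.value), None)   // old pattern at call sites
//   acc.toInfoUpdate                    // new shorthand
// i.e. AccumulableInfo(id, name, update = Some(acc.value), value = None, ...).
// The point of the indirection is that SQLMetric can now override toInfoUpdate
// to report its raw value (see the SQLMetric change below).
```
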
```diff
@@ -1385,7 +1385,7 @@ private[spark] object JsonProtocol extends JsonUtils {
         val accumUpdates = jsonOption(json.get("Accumulator Updates"))
           .map(_.extractElements.map(accumulableInfoFromJson).toArray.toImmutableArraySeq)
           .getOrElse(taskMetricsFromJson(json.get("Metrics")).accumulators().map(acc => {
-            acc.toInfo(Some(acc.value), None)
+            acc.toInfoUpdate
           }).toArray.toImmutableArraySeq)
         ExceptionFailure(className, description, stackTrace, fullStackTrace, None, accumUpdates)
       case `taskResultLost` => TaskResultLost
```
```diff
@@ -147,7 +147,7 @@ private[spark] object AccumulatorSuite {
   * Make an `AccumulableInfo` out of an `AccumulatorV2` with the intent to use the
   * info as an accumulator update.
   */
-  def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfo(Some(a.value), None)
+  def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfoUpdate

  /**
   * Run one or more Spark jobs and verify that in at least one job the peak execution memory
```
```diff
@@ -105,6 +105,13 @@ class SQLMetric(
     AccumulableInfo(id, name, internOption(update), internOption(value), true, true,
       SQLMetrics.cachedSQLAccumIdentifier)
   }
+
+  // We should provide the raw value which can be -1, so that `SQLMetrics.stringValue` can correctly
+  // filter out the invalid -1 values.
+  override def toInfoUpdate: AccumulableInfo = {
+    AccumulableInfo(id, name, internOption(Some(_value)), None, true, true,
+      SQLMetrics.cachedSQLAccumIdentifier)
+  }
 }

 object SQLMetrics {
```
```diff
@@ -182,7 +182,7 @@ class SQLAppStatusListener(
     event.taskMetrics.withExternalAccums(_.flatMap { a =>
       // This call may fail if the accumulator is gc'ed, so account for that.
       try {
-        Some(a.toInfo(Some(a.value), None))
+        Some(a.toInfoUpdate)
       } catch {
         case _: IllegalAccessError => None
       }
```
```diff
@@ -959,6 +959,11 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
       SQLMetrics.createNanoTimingMetric(sparkContext, name = "m", initValue = 5)
     }
   }
+
+  test("SQLMetric#toInfoUpdate") {
+    assert(SQLMetrics.createSizeMetric(sparkContext, name = "m").toInfoUpdate.update === Some(-1))
+    assert(SQLMetrics.createMetric(sparkContext, name = "m").toInfoUpdate.update === Some(0))
+  }
 }

 case class CustomFileCommitProtocol(
```
```diff
@@ -312,7 +312,7 @@ object InputOutputMetricsHelper {

       var maxOutputRows = 0L
       taskEnd.taskMetrics.withExternalAccums(_.foreach { accum =>
-        val info = accum.toInfo(Some(accum.value), None)
+        val info = accum.toInfoUpdate
         if (info.name.toString.contains("number of output rows")) {
           info.update match {
             case Some(n: Number) =>
```
