[SPARK-49038][SQL][3.5] SQLMetric should report the raw value in the accumulator update event

This is a backport of apache#47721 to branch-3.5.

Some `SQLMetrics` set their initial value to `-1` so that never-updated metrics (e.g., a metric whose task received no input data and never touched it) can be recognized and filtered out later in the UI.
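For intuition, the UI-side aggregation works roughly like the sketch below. This is a simplification of what `SQLMetrics.stringValue` does, not the actual implementation, and `summarize` is a hypothetical helper: the point is that the `-1` sentinels must be dropped before computing min/med/max across tasks.

```scala
// Simplified sketch (not the real SQLMetrics.stringValue): -1 means
// "this task never updated the metric", so it must be filtered out
// before aggregating the per-task values.
def summarize(taskValues: Seq[Long]): String = {
  val valid = taskValues.filter(_ >= 0) // drop the -1 sentinels
  if (valid.isEmpty) "0"
  else {
    val sorted = valid.sorted
    s"min=${sorted.head}, med=${sorted(sorted.size / 2)}, max=${sorted.last}"
  }
}

summarize(Seq(-1L, 10L, 4L)) // "min=4, med=10, max=10": the empty partition is ignored
```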

However, there is a bug here. Spark turns accumulator updates into `AccumulableInfo` using `AccumulatorV2#value`. To avoid exposing the internal `-1` value to end users, `SQLMetric#value` turns `-1` into `0` before returning the value (see apache#39311 for details). As a result, the UI can no longer see the `-1` values and filter them out.

This PR fixes the bug by using the raw value of `SQLMetric` to create `AccumulableInfo`, so that the UI can still see `-1` and filter it out.
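The intended contract is captured by the PR's own test in `SQLMetricsSuite` (see the diff below). As a rough sketch, assuming an active `SparkContext` in scope as `sparkContext` and code compiled inside Spark's own packages (both `SQLMetric` and `toInfoUpdate` are internal, `private[spark]`-level APIs):

```scala
import org.apache.spark.sql.execution.metric.SQLMetrics

// A size metric starts at the -1 sentinel ("never updated").
val m = SQLMetrics.createSizeMetric(sparkContext, name = "data size")

// `value` sanitizes the sentinel for end users: -1 is reported as 0.
assert(m.value == 0L)

// `toInfoUpdate` (added by this PR) preserves the raw -1 in the update
// event, so the UI can still recognize and filter the unused metric.
assert(m.toInfoUpdate.update == Some(-1L))
```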

These changes are needed to avoid reporting a wrong min value for certain SQL metrics when some partitions have no data.

This is a user-facing change: if people write Spark listeners that watch the `SparkListenerExecutorMetricsUpdate` event, they will now see the correct values of SQL metrics.
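For example, a listener along these lines (a hypothetical user-written listener, not part of this PR) would previously have seen `0` for never-updated SQL metrics, and now sees the raw `-1`:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

// Hypothetical user listener: logs the raw accumulator updates that
// executors ship with SparkListenerExecutorMetricsUpdate. After this
// fix, a never-updated SQL metric shows up as -1 rather than 0.
class RawSqlMetricLogger extends SparkListener {
  override def onExecutorMetricsUpdate(
      event: SparkListenerExecutorMetricsUpdate): Unit = {
    for ((taskId, stageId, stageAttemptId, updates) <- event.accumUpdates) {
      updates.foreach { info =>
        println(s"task $taskId (stage $stageId.$stageAttemptId): " +
          s"${info.name.getOrElse("unnamed")} = ${info.update.getOrElse("n/a")}")
      }
    }
  }
}

// Registration, assuming an existing SparkContext `sc`:
// sc.addSparkListener(new RawSqlMetricLogger)
```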

This was verified with manual UI tests; we do not have an end-to-end UI test framework for SQL metrics yet.

No generative AI tooling was used in authoring this patch.

Closes apache#47749 from cloud-fan/branch-3.5.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit bd2cbd6)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
cloud-fan authored and viirya committed Aug 17, 2024
1 parent ede986c commit 6be1e19
Showing 9 changed files with 27 additions and 7 deletions.
```diff
@@ -478,7 +478,7 @@ private[spark] class Executor(
       // Collect latest accumulator values to report back to the driver
       val accums: Seq[AccumulatorV2[_, _]] =
         Option(task).map(_.collectAccumulatorUpdates(taskFailed = true)).getOrElse(Seq.empty)
-      val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
+      val accUpdates = accums.map(acc => acc.toInfoUpdate)

      setTaskFinishedAndClearInterruptStatus()
      (accums, accUpdates)
```
```diff
@@ -898,7 +898,7 @@ private[spark] class TaskSchedulerImpl(
           executorRunTime = acc.value.asInstanceOf[Long]
         }
       }
-      acc.toInfo(Some(acc.value), None)
+      acc.toInfoUpdate
     }
     val taskProcessRate = if (efficientTaskCalcualtionEnabled) {
       getTaskProcessRate(recordsRead, executorRunTime)
```
core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala (9 additions & 1 deletion):

```diff
@@ -102,16 +102,24 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
     metadata.countFailedValues
   }

+  private def isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
+
   /**
    * Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] with the provided
    * values.
    */
   private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
-    val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
     AccumulableInfo(id, name, internOption(update), internOption(value), isInternal,
       countFailedValues)
   }

+  /**
+   * Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] as an update.
+   */
+  private[spark] def toInfoUpdate: AccumulableInfo = {
+    AccumulableInfo(id, name, internOption(Some(value)), None, isInternal, countFailedValues)
+  }
+
   final private[spark] def isAtDriverSide: Boolean = atDriverSide

   /**
```
```diff
@@ -1365,7 +1365,7 @@ private[spark] object JsonProtocol {
         val accumUpdates = jsonOption(json.get("Accumulator Updates"))
           .map(_.extractElements.map(accumulableInfoFromJson).toArray.toSeq)
           .getOrElse(taskMetricsFromJson(json.get("Metrics")).accumulators().map(acc => {
-            acc.toInfo(Some(acc.value), None)
+            acc.toInfoUpdate
           }).toArray.toSeq)
         ExceptionFailure(className, description, stackTrace, fullStackTrace, None, accumUpdates)
       case `taskResultLost` => TaskResultLost
```
```diff
@@ -113,7 +113,7 @@ private[spark] object AccumulatorSuite {
    * Make an `AccumulableInfo` out of an `AccumulatorV2` with the intent to use the
    * info as an accumulator update.
    */
-  def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfo(Some(a.value), None)
+  def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfoUpdate

   /**
    * Run one or more Spark jobs and verify that in at least one job the peak execution memory
```
```diff
@@ -90,6 +90,13 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
     AccumulableInfo(id, name, internOption(update), internOption(value), true, true,
       SQLMetrics.cachedSQLAccumIdentifier)
   }
+
+  // We should provide the raw value which can be -1, so that `SQLMetrics.stringValue` can correctly
+  // filter out the invalid -1 values.
+  override def toInfoUpdate: AccumulableInfo = {
+    AccumulableInfo(id, name, internOption(Some(_value)), None, true, true,
+      SQLMetrics.cachedSQLAccumIdentifier)
+  }
 }

 object SQLMetrics {
```
```diff
@@ -181,7 +181,7 @@ class SQLAppStatusListener(
       event.taskMetrics.externalAccums.flatMap { a =>
         // This call may fail if the accumulator is gc'ed, so account for that.
         try {
-          Some(a.toInfo(Some(a.value), None))
+          Some(a.toInfoUpdate)
         } catch {
           case _: IllegalAccessError => None
         }
```
```diff
@@ -899,6 +899,11 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
         }))))
     )
   }
+
+  test("SQLMetric#toInfoUpdate") {
+    assert(SQLMetrics.createSizeMetric(sparkContext, name = "m").toInfoUpdate.update === Some(-1))
+    assert(SQLMetrics.createMetric(sparkContext, name = "m").toInfoUpdate.update === Some(0))
+  }
 }

 case class CustomFileCommitProtocol(
```
```diff
@@ -361,7 +361,7 @@ object InputOutputMetricsHelper {

     var maxOutputRows = 0L
     for (accum <- taskEnd.taskMetrics.externalAccums) {
-      val info = accum.toInfo(Some(accum.value), None)
+      val info = accum.toInfoUpdate
       if (info.name.toString.contains("number of output rows")) {
         info.update match {
           case Some(n: Number) =>
```
