[SPARK-49038][SQL][3.5] SQLMetric should report the raw value in the accumulator update event #47749

Status: Closed · 1 commit
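
This 3.5 backport adds an `AccumulatorV2.toInfoUpdate` helper that renders an accumulator as an update event (`update = Some(value)`, `value = None`) and migrates every call site that previously built such updates by hand as `acc.toInfo(Some(acc.value), None)`. The indirection lets `SQLMetric` override the helper to report its raw `_value`, which can be the invalid `-1` sentinel, so that `SQLMetrics.stringValue` can filter never-updated metrics out of the displayed aggregates.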
Executor.scala:

```diff
@@ -530,7 +530,7 @@ private[spark] class Executor(
         // Collect latest accumulator values to report back to the driver
         val accums: Seq[AccumulatorV2[_, _]] =
           Option(task).map(_.collectAccumulatorUpdates(taskFailed = true)).getOrElse(Seq.empty)
-        val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
+        val accUpdates = accums.map(acc => acc.toInfoUpdate)
 
         setTaskFinishedAndClearInterruptStatus()
         (accums, accUpdates)
```
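
Even when a task fails, the executor still collects final accumulator values to report back to the driver (`taskFailed = true` above), so failure events now carry raw `SQLMetric` values as well.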
TaskSchedulerImpl.scala:

```diff
@@ -901,7 +901,7 @@ private[spark] class TaskSchedulerImpl(
           executorRunTime = acc.value.asInstanceOf[Long]
         }
       }
-      acc.toInfo(Some(acc.value), None)
+      acc.toInfoUpdate
     }
     val taskProcessRate = if (efficientTaskCalcualtionEnabled) {
       getTaskProcessRate(recordsRead, executorRunTime)
```
core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala (9 additions, 1 deletion):

```diff
@@ -102,16 +102,24 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
     metadata.countFailedValues
   }
 
+  private def isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
+
   /**
    * Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] with the provided
    * values.
    */
   private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
-    val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
     AccumulableInfo(id, name, internOption(update), internOption(value), isInternal,
       countFailedValues)
   }
 
+  /**
+   * Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] as an update.
+   */
+  private[spark] def toInfoUpdate: AccumulableInfo = {
+    AccumulableInfo(id, name, internOption(Some(value)), None, isInternal, countFailedValues)
+  }
+
   final private[spark] def isAtDriverSide: Boolean = atDriverSide
 
   /**
```
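
Hoisting `isInternal` into a private def lets `toInfo` and the new `toInfoUpdate` share the check. For a plain (non-SQL) accumulator the new helper is behavior-preserving, as the sketch below illustrates. The sketch is not part of the diff: the placement under `package org.apache.spark` and the `ToInfoUpdateDemo` object are assumptions, needed only because both methods are `private[spark]`.

```scala
package org.apache.spark

// Hypothetical demo, compiled inside Spark's own source tree so that the
// private[spark] methods toInfo/toInfoUpdate are visible.
import org.apache.spark.util.LongAccumulator

object ToInfoUpdateDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[1]", "toInfoUpdate-demo")
    try {
      // Registering through the SparkContext gives the accumulator an id and name.
      val acc: LongAccumulator = sc.longAccumulator("answer")
      acc.add(42L)
      // Both calls yield AccumulableInfo(update = Some(42), value = None, ...):
      // for base accumulators, toInfoUpdate simply packages `value` as the update.
      assert(acc.toInfoUpdate == acc.toInfo(Some(acc.value), None))
    } finally {
      sc.stop()
    }
  }
}
```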
JsonProtocol.scala:

```diff
@@ -1350,7 +1350,7 @@ private[spark] object JsonProtocol extends JsonUtils {
         val accumUpdates = jsonOption(json.get("Accumulator Updates"))
           .map(_.extractElements.map(accumulableInfoFromJson).toArray.toSeq)
           .getOrElse(taskMetricsFromJson(json.get("Metrics")).accumulators().map(acc => {
-            acc.toInfo(Some(acc.value), None)
+            acc.toInfoUpdate
           }).toArray.toSeq)
         ExceptionFailure(className, description, stackTrace, fullStackTrace, None, accumUpdates)
       case `taskResultLost` => TaskResultLost
```
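
`JsonProtocol` (de)serializes history-server event logs; this fallback path rebuilds accumulator updates from serialized task metrics when a log lacks an explicit "Accumulator Updates" field, so switching it to `toInfoUpdate` keeps replayed updates shaped like live ones.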
AccumulatorSuite.scala:

```diff
@@ -147,7 +147,7 @@ private[spark] object AccumulatorSuite {
    * Make an `AccumulableInfo` out of an `AccumulatorV2` with the intent to use the
    * info as an accumulator update.
    */
-  def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfo(Some(a.value), None)
+  def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfoUpdate
 
   /**
    * Run one or more Spark jobs and verify that in at least one job the peak execution memory
```
SQLMetrics.scala (class `SQLMetric`):

```diff
@@ -90,6 +90,13 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
     AccumulableInfo(id, name, internOption(update), internOption(value), true, true,
       SQLMetrics.cachedSQLAccumIdentifier)
   }
+
+  // We should provide the raw value which can be -1, so that `SQLMetrics.stringValue` can
+  // correctly filter out the invalid -1 values.
+  override def toInfoUpdate: AccumulableInfo = {
+    AccumulableInfo(id, name, internOption(Some(_value)), None, true, true,
+      SQLMetrics.cachedSQLAccumIdentifier)
+  }
 }
 
 object SQLMetrics {
```
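
The override exists because `SQLMetric`'s public `value` getter normalizes the sentinel (assuming the 3.5 behavior where a negative `_value` is reported as `0`), so the old `toInfo(Some(acc.value), None)` pattern erased the very `-1` that `SQLMetrics.stringValue` uses to drop never-updated metrics from its summaries. A sketch of the difference; the `RawUpdateDemo` object and its placement in the `org.apache.spark.sql.execution.metric` package are assumptions made to reach the `private[spark]` members:

```scala
package org.apache.spark.sql.execution.metric

// Hypothetical sketch in Spark-internal scope; assumes SQLMetric.value
// normalizes a negative _value to 0, the behavior this PR works around.
import org.apache.spark.SparkContext

object RawUpdateDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[1]", "raw-update-demo")
    try {
      val bytes = SQLMetrics.createSizeMetric(sc, name = "bytes read") // starts at -1
      println(bytes.value)               // 0: the getter hides the invalid sentinel
      println(bytes.toInfoUpdate.update) // Some(-1): the raw value survives
    } finally {
      sc.stop()
    }
  }
}
```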
SQLAppStatusListener.scala:

```diff
@@ -181,7 +181,7 @@ class SQLAppStatusListener(
     event.taskMetrics.withExternalAccums(_.flatMap { a =>
       // This call may fail if the accumulator is gc'ed, so account for that.
       try {
-        Some(a.toInfo(Some(a.value), None))
+        Some(a.toInfoUpdate)
       } catch {
         case _: IllegalAccessError => None
       }
```
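
`SQLAppStatusListener` feeds the metric aggregation behind the SQL UI, so the updates it records now carry the raw values that `SQLMetrics.stringValue` expects; the existing guard for garbage-collected accumulators is unchanged.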
SQLMetricsSuite.scala:

```diff
@@ -960,6 +960,11 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m", initValue = -1).isZero())
     assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m", initValue = 5).isZero())
   }
+
+  test("SQLMetric#toInfoUpdate") {
+    assert(SQLMetrics.createSizeMetric(sparkContext, name = "m").toInfoUpdate.update === Some(-1))
+    assert(SQLMetrics.createMetric(sparkContext, name = "m").toInfoUpdate.update === Some(0))
+  }
 }
 
 case class CustomFileCommitProtocol(
```
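
The two assertions pin down the intended contract: `createSizeMetric` starts at the invalid `-1` sentinel, so its raw update is `Some(-1)`, while a plain `createMetric` starts at `0` and reports `Some(0)`.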
object `InputOutputMetricsHelper` (test helper):

```diff
@@ -312,7 +312,7 @@ object InputOutputMetricsHelper {
 
     var maxOutputRows = 0L
     taskEnd.taskMetrics.withExternalAccums(_.foreach { accum =>
-      val info = accum.toInfo(Some(accum.value), None)
+      val info = accum.toInfoUpdate
       if (info.name.toString.contains("number of output rows")) {
         info.update match {
           case Some(n: Number) =>
```