diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 9290b5b36a8f7..69a91839bbeb5 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -530,7 +530,7 @@ private[spark] class Executor(
       // Collect latest accumulator values to report back to the driver
       val accums: Seq[AccumulatorV2[_, _]] =
         Option(task).map(_.collectAccumulatorUpdates(taskFailed = true)).getOrElse(Seq.empty)
-      val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
+      val accUpdates = accums.map(acc => acc.toInfoUpdate)
 
       setTaskFinishedAndClearInterruptStatus()
       (accums, accUpdates)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 0cb970fd27880..5e9716dfcfe90 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -901,7 +901,7 @@ private[spark] class TaskSchedulerImpl(
           executorRunTime = acc.value.asInstanceOf[Long]
         }
       }
-      acc.toInfo(Some(acc.value), None)
+      acc.toInfoUpdate
     }
     val taskProcessRate = if (efficientTaskCalcualtionEnabled) {
       getTaskProcessRate(recordsRead, executorRunTime)
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 181033c9d20c8..aadde1e20226a 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -102,16 +102,24 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
     metadata.countFailedValues
   }
 
+  private def isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
+
   /**
    * Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] with the provided
    * values.
    */
   private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
-    val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
     AccumulableInfo(id, name, internOption(update), internOption(value), isInternal,
       countFailedValues)
   }
 
+  /**
+   * Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] as an update.
+   */
+  private[spark] def toInfoUpdate: AccumulableInfo = {
+    AccumulableInfo(id, name, internOption(Some(value)), None, isInternal, countFailedValues)
+  }
+
   final private[spark] def isAtDriverSide: Boolean = atDriverSide
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 025e5d5bac94b..377caf776deb0 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -1350,7 +1350,7 @@ private[spark] object JsonProtocol extends JsonUtils {
         val accumUpdates = jsonOption(json.get("Accumulator Updates"))
           .map(_.extractElements.map(accumulableInfoFromJson).toArray.toSeq)
           .getOrElse(taskMetricsFromJson(json.get("Metrics")).accumulators().map(acc => {
-            acc.toInfo(Some(acc.value), None)
+            acc.toInfoUpdate
           }).toArray.toSeq)
         ExceptionFailure(className, description, stackTrace, fullStackTrace, None, accumUpdates)
       case `taskResultLost` => TaskResultLost
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 9b70ccdf07e1b..a9d7e8a0f2eda 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -147,7 +147,7 @@ private[spark] object AccumulatorSuite {
    * Make an `AccumulableInfo` out of an `AccumulatorV2` with the intent to use the
    * info as an accumulator update.
    */
-  def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfo(Some(a.value), None)
+  def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfoUpdate
 
   /**
    * Run one or more Spark jobs and verify that in at least one job the peak execution memory
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 3326c5d4cb994..09406345ed770 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -90,6 +90,13 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
     AccumulableInfo(id, name, internOption(update), internOption(value), true, true,
       SQLMetrics.cachedSQLAccumIdentifier)
   }
+
+  // We should provide the raw value which can be -1, so that `SQLMetrics.stringValue` can correctly
+  // filter out the invalid -1 values.
+  override def toInfoUpdate: AccumulableInfo = {
+    AccumulableInfo(id, name, internOption(Some(_value)), None, true, true,
+      SQLMetrics.cachedSQLAccumIdentifier)
+  }
 }
 
 object SQLMetrics {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 067f7305ab113..17de4d42257b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -181,7 +181,7 @@ class SQLAppStatusListener(
       event.taskMetrics.withExternalAccums(_.flatMap { a =>
         // This call may fail if the accumulator is gc'ed, so account for that.
         try {
-          Some(a.toInfo(Some(a.value), None))
+          Some(a.toInfoUpdate)
         } catch {
           case _: IllegalAccessError => None
         }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 6347757e178c0..5cdbdc27b3259 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -960,6 +960,11 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m", initValue = -1).isZero())
     assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m", initValue = 5).isZero())
   }
+
+  test("SQLMetric#toInfoUpdate") {
+    assert(SQLMetrics.createSizeMetric(sparkContext, name = "m").toInfoUpdate.update === Some(-1))
+    assert(SQLMetrics.createMetric(sparkContext, name = "m").toInfoUpdate.update === Some(0))
+  }
 }
 
 case class CustomFileCommitProtocol(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 46dc84c0582f8..e964867cb86ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -312,7 +312,7 @@ object InputOutputMetricsHelper {
       var maxOutputRows = 0L
 
      taskEnd.taskMetrics.withExternalAccums(_.foreach { accum =>
-        val info = accum.toInfo(Some(accum.value), None)
+        val info = accum.toInfoUpdate
         if (info.name.toString.contains("number of output rows")) {
           info.update match {
             case Some(n: Number) =>
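
Reviewer note, not part of the patch: the sketch below illustrates what `toInfoUpdate` replaces at the call sites changed above, assuming the patch is applied, Spark core is on the classpath, and the file lives under the `org.apache.spark` package so the `private[spark]` helpers are visible. The object name, app name, and accumulator name are hypothetical.

```scala
package org.apache.spark

import org.apache.spark.scheduler.AccumulableInfo

// Hypothetical illustration: an accumulator "update" is an AccumulableInfo whose
// `update` field carries the current value and whose `value` field is None.
object ToInfoUpdateSketch {
  def main(args: Array[String]): Unit = {
    // A local SparkContext is needed because an accumulator must be registered
    // (and thus have an id and name) before toInfo / toInfoUpdate can be called.
    val conf = new SparkConf().setMaster("local[1]").setAppName("toInfoUpdate-sketch")
    val sc = new SparkContext(conf)
    try {
      val acc = sc.longAccumulator("my accumulator")
      acc.add(42L)

      // Old call-site pattern removed by this patch:
      val before: AccumulableInfo = acc.toInfo(Some(acc.value), None)
      // New helper introduced by this patch:
      val after: AccumulableInfo = acc.toInfoUpdate

      // For a plain accumulator the two agree: the update field carries the
      // current value and the value field stays None.
      assert(before.update == after.update && after.value.isEmpty)
    } finally {
      sc.stop()
    }
  }
}
```

The point of the dedicated helper is that a subclass such as `SQLMetric` can override how the update is reported (its raw `_value`, including the -1 sentinel) without every call site having to spell out `toInfo(Some(acc.value), None)` by hand.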