diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 4ea11d753e119..78d7231a19789 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -478,7 +478,7 @@ private[spark] class Executor(
       // Collect latest accumulator values to report back to the driver
       val accums: Seq[AccumulatorV2[_, _]] =
         Option(task).map(_.collectAccumulatorUpdates(taskFailed = true)).getOrElse(Seq.empty)
-      val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
+      val accUpdates = accums.map(acc => acc.toInfoUpdate)
 
       setTaskFinishedAndClearInterruptStatus()
       (accums, accUpdates)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 91b0c983e4a87..1970e34679329 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -898,7 +898,7 @@ private[spark] class TaskSchedulerImpl(
           executorRunTime = acc.value.asInstanceOf[Long]
         }
       }
-      acc.toInfo(Some(acc.value), None)
+      acc.toInfoUpdate
     }
     val taskProcessRate = if (efficientTaskCalcualtionEnabled) {
       getTaskProcessRate(recordsRead, executorRunTime)
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 181033c9d20c8..aadde1e20226a 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -102,16 +102,24 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
     metadata.countFailedValues
   }
 
+  private def isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
+
   /**
    * Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] with the provided
    * values.
    */
   private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
-    val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
     AccumulableInfo(id, name, internOption(update), internOption(value), isInternal,
       countFailedValues)
   }
 
+  /**
+   * Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] as an update.
+   */
+  private[spark] def toInfoUpdate: AccumulableInfo = {
+    AccumulableInfo(id, name, internOption(Some(value)), None, isInternal, countFailedValues)
+  }
+
   final private[spark] def isAtDriverSide: Boolean = atDriverSide
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index c1d52484049db..e258ccb9b2cf1 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -1365,7 +1365,7 @@ private[spark] object JsonProtocol {
         val accumUpdates = jsonOption(json.get("Accumulator Updates"))
           .map(_.extractElements.map(accumulableInfoFromJson).toArray.toSeq)
           .getOrElse(taskMetricsFromJson(json.get("Metrics")).accumulators().map(acc => {
-            acc.toInfo(Some(acc.value), None)
+            acc.toInfoUpdate
           }).toArray.toSeq)
         ExceptionFailure(className, description, stackTrace, fullStackTrace, None, accumUpdates)
       case `taskResultLost` => TaskResultLost
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index e4dfa149a7d20..109dd7b1b5da2 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -113,7 +113,7 @@ private[spark] object AccumulatorSuite {
    * Make an `AccumulableInfo` out of an `AccumulatorV2` with the intent to use the
    * info as an accumulator update.
    */
-  def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfo(Some(a.value), None)
+  def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfoUpdate
 
   /**
    * Run one or more Spark jobs and verify that in at least one job the peak execution memory
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 6d2578c3576da..e645c557c4e86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -90,6 +90,13 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
     AccumulableInfo(id, name, internOption(update), internOption(value), true, true,
       SQLMetrics.cachedSQLAccumIdentifier)
   }
+
+  // We should provide the raw value which can be -1, so that `SQLMetrics.stringValue` can correctly
+  // filter out the invalid -1 values.
+  override def toInfoUpdate: AccumulableInfo = {
+    AccumulableInfo(id, name, internOption(Some(_value)), None, true, true,
+      SQLMetrics.cachedSQLAccumIdentifier)
+  }
 }
 
 object SQLMetrics {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 7b9f877bdef5a..dd8606a2c902f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -181,7 +181,7 @@ class SQLAppStatusListener(
       event.taskMetrics.externalAccums.flatMap { a =>
         // This call may fail if the accumulator is gc'ed, so account for that.
        try {
-          Some(a.toInfo(Some(a.value), None))
+          Some(a.toInfoUpdate)
        } catch {
          case _: IllegalAccessError => None
        }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index cb09d7e116a0e..d9072f27c2196 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -899,6 +899,11 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
       }))))
     )
   }
+
+  test("SQLMetric#toInfoUpdate") {
+    assert(SQLMetrics.createSizeMetric(sparkContext, name = "m").toInfoUpdate.update === Some(-1))
+    assert(SQLMetrics.createMetric(sparkContext, name = "m").toInfoUpdate.update === Some(0))
+  }
 }
 
 case class CustomFileCommitProtocol(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 0b6dc6903245e..1a57f2f029d97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -361,7 +361,7 @@ object InputOutputMetricsHelper {
 
       var maxOutputRows = 0L
      for (accum <- taskEnd.taskMetrics.externalAccums) {
-        val info = accum.toInfo(Some(accum.value), None)
+        val info = accum.toInfoUpdate
        if (info.name.toString.contains("number of output rows")) {
          info.update match {
            case Some(n: Number) =>
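
Not part of the patch: the sketch below illustrates, under stated assumptions, what the new helper returns. It assumes it is compiled somewhere under the org.apache.spark package tree, since both toInfo and toInfoUpdate are private[spark]; the package and object names are made up for illustration, and spark-core plus sql/core are assumed to be on the classpath. For a plain AccumulatorV2 the refactor is behavior-preserving (same update-shaped AccumulableInfo as the old toInfo(Some(acc.value), None) call), while the SQLMetric override exposes the raw value so an untouched size metric still reports its -1 sentinel.

package org.apache.spark.example // hypothetical package, only needed for private[spark] access

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.execution.metric.SQLMetrics

object ToInfoUpdateSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("toInfoUpdate-sketch"))
    try {
      val acc = sc.longAccumulator("records")
      acc.add(42L)

      // Legacy shape used at the call sites touched by this diff: the current value goes into
      // the `update` slot and the final `value` slot stays empty.
      val legacy = acc.toInfo(Some(acc.value), None)
      // New dedicated helper; for a plain AccumulatorV2 it yields the same update-shaped info.
      val asUpdate = acc.toInfoUpdate
      assert(legacy.update == asUpdate.update) // Some(42)
      assert(legacy.value.isEmpty && asUpdate.value.isEmpty)

      // The SQLMetric override reports the raw value, so an unused size metric surfaces its
      // -1 sentinel in the update, matching the new SQLMetricsSuite test above.
      val size = SQLMetrics.createSizeMetric(sc, name = "bytes")
      assert(size.toInfoUpdate.update == Some(-1L))
    } finally {
      sc.stop()
    }
  }
}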