[SPARK-49038][SQL] Fix regression in Spark UI SQL operator metrics calculation to filter out invalid accumulator values correctly #47516

Closed · wants to merge 3 commits (changes shown from 2 commits)
@@ -96,9 +96,12 @@ class SQLMetric(

def +=(v: Long): Unit = add(v)

// _value may be uninitialized, in many cases being -1. We should not expose it to the user
// and instead return 0.
override def value: Long = if (isZero) 0 else _value
// We use -1 as the initial value of the SIZE and TIMING accumulators (0 is a valid metric value).
// We need to return it as-is so that the SQL UI can filter out the invalid accumulator
// values in `SQLMetrics.stringValue` when calculating min, max, etc.
// However, users accessing the values in the physical plan programmatically still get -1. They
// may use `SQLMetric.isZero` before consuming this value.
override def value: Long = _value
Member commented:

Well, actually, this is a direct revert of #39311.

  • In this case, we need a review from the original author, since he proposed that change to fix what he considered a regression in accumulator values.
  • In addition, could you add a new unit test case for your case? IIUC, this PR only seems to update the existing test cases.

cc @cloud-fan and @viirya and @HyukjinKwon from #39311, too

Author commented:

Added the test

Contributor commented:

-1 is an internal initial value, and we can't expose it to users. People can get the physical plan and directly access the `SQLMetric` instances to get the values.


// Provide special identifier as metadata so we can tell that this is a `SQLMetric` later
override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
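The inline comments above suggest checking `SQLMetric.isZero` when reading metric values from the physical plan programmatically, since an accumulator that was never updated now reports its -1 initial value. A minimal sketch of that guard (the helper name and the `pruningTime` lookup are illustrative, not part of this patch):

    import org.apache.spark.sql.execution.SparkPlan

    // Hypothetical helper: read a metric from a physical plan, treating the -1
    // sentinel (an accumulator that was never updated) as "no value".
    def metricValue(plan: SparkPlan, name: String): Option[Long] =
      plan.metrics.get(name) match {
        case Some(m) if !m.isZero => Some(m.value) // metric was actually updated
        case _ => None                             // missing, or still at its initial value
      }

    // e.g. metricValue(scan, "pruningTime") returns None when no pruning happened.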
@@ -1746,7 +1746,7 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
val allFilesNum = scan1.metrics("numFiles").value
val allFilesSize = scan1.metrics("filesSize").value
assert(scan1.metrics("numPartitions").value === numPartitions)
assert(scan1.metrics("pruningTime").value === 0)
assert(scan1.metrics("pruningTime").value === -1)

// No dynamic partition pruning, so no static metrics
// Only files from fid = 5 partition are scanned
@@ -1760,7 +1760,7 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
assert(0 < partFilesNum && partFilesNum < allFilesNum)
assert(0 < partFilesSize && partFilesSize < allFilesSize)
assert(scan2.metrics("numPartitions").value === 1)
assert(scan2.metrics("pruningTime").value === 0)
assert(scan2.metrics("pruningTime").value === -1)

// Dynamic partition pruning is used
// Static metrics are as-if reading the whole fact table
@@ -2291,8 +2291,16 @@ class AdaptiveQueryExecSuite
assert(aqeReads.length == 2)
aqeReads.foreach { c =>
  val stats = c.child.asInstanceOf[QueryStageExec].getRuntimeStatistics
  assert(stats.sizeInBytes >= 0)
  assert(stats.rowCount.get >= 0)
  val rowCount = stats.rowCount.get
  val sizeInBytes = stats.sizeInBytes
  assert(rowCount >= 0)
  if (rowCount == 0) {
    // For an empty relation, the query stage doesn't serialize any bytes,
    // so the SQLMetric keeps its initial value.
    assert(sizeInBytes == -1)
  } else {
    assert(sizeInBytes > 0)
  }
}
}
}
@@ -815,7 +815,11 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils

testMetricsInSparkPlanOperator(exchanges.head,
Map("dataSize" -> 3200, "shuffleRecordsWritten" -> 100))
testMetricsInSparkPlanOperator(exchanges(1), Map("dataSize" -> 0, "shuffleRecordsWritten" -> 0))
// `testData2.filter($"b" === 0)` is an empty relation, so the exchange doesn't
// serialize any bytes and the SQLMetric keeps its initial value.
testMetricsInSparkPlanOperator(exchanges(1),
Map("dataSize" -> -1, "shuffleRecordsWritten" -> 0))
}
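The distinction drawn in the comment above can be reproduced with standalone metrics (a small sketch assuming the same `SharedSparkSession` test context; the metric names are illustrative):

    // A record counter that observed zero records reports a real 0 ...
    val records = SQLMetrics.createMetric(sparkContext, name = "records")
    records.add(0)
    assert(records.value == 0)

    // ... while a size metric that was never updated keeps the -1 sentinel.
    val dataSize = SQLMetrics.createSizeMetric(sparkContext, name = "dataSize")
    assert(dataSize.isZero && dataSize.value == -1)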

test("Add numRows to metric of BroadcastExchangeExec") {
@@ -935,21 +939,46 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
assert(windowGroupLimit.get.metrics("numOutputRows").value == 2L)
}

test("SPARK-49038: Correctly filter out invalid accumulator metrics") {
val metric1 = SQLMetrics.createTimingMetric(sparkContext, name = "m")
val metric2 = SQLMetrics.createTimingMetric(sparkContext, name = "m")
val metric3 = SQLMetrics.createTimingMetric(sparkContext, name = "m")
val metric4 = SQLMetrics.createTimingMetric(sparkContext, name = "m")
val metric5 = SQLMetrics.createTimingMetric(sparkContext, name = "m")
val metric6 = SQLMetrics.createTimingMetric(sparkContext, name = "m")
val metric7 = SQLMetrics.createTimingMetric(sparkContext, name = "m")

metric3.add(0)
metric4.add(2)
metric5.add(4)
metric6.add(5)
metric7.add(10)

val metricTypeTiming = "timing"
val values = Array(metric1.value, metric2.value,
metric3.value, metric4.value, metric5.value, metric6.value, metric7.value)
val maxMetrics = Array(metric7.value, 2, 0, 4) // (maxValue, stageId, attemptId, taskId)
val expectedOutput =
"total (min, med, max (stageId: taskId))\n21 ms (0 ms, 4 ms, 10 ms (stage 2.0: task 4))"

assert(SQLMetrics.stringValue(metricTypeTiming, values, maxMetrics).equals(expectedOutput))
}
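The expected string above follows from dropping the two metrics that were never updated (metric1 and metric2, still at -1) before aggregating, while keeping the legitimate 0 from metric3. A rough sketch of that arithmetic (not the actual `SQLMetrics.stringValue` implementation):

    val raw = Seq(-1L, -1L, 0L, 2L, 4L, 5L, 10L) // metric1..metric7 values
    val valid = raw.filter(_ >= 0).sorted        // the -1 sentinels are dropped, 0 is kept
    val total = valid.sum                        // 21
    val minVal = valid.head                      // 0
    val median = valid(valid.length / 2)         // 4
    val maxVal = valid.last                      // 10
    // formatted as "21 ms (0 ms, 4 ms, 10 ms (stage 2.0: task 4))"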

test("Creating metrics with initial values") {
assert(SQLMetrics.createSizeMetric(sparkContext, name = "m").value === 0)
assert(SQLMetrics.createSizeMetric(sparkContext, name = "m", initValue = -1).value === 0)
assert(SQLMetrics.createSizeMetric(sparkContext, name = "m").value === -1)
assert(SQLMetrics.createSizeMetric(sparkContext, name = "m", initValue = -1).value === -1)

assert(SQLMetrics.createSizeMetric(sparkContext, name = "m").isZero)
assert(SQLMetrics.createSizeMetric(sparkContext, name = "m", initValue = -1).isZero)

assert(SQLMetrics.createTimingMetric(sparkContext, name = "m").value === 0)
assert(SQLMetrics.createTimingMetric(sparkContext, name = "m", initValue = -1).value === 0)
assert(SQLMetrics.createTimingMetric(sparkContext, name = "m").value === -1)
assert(SQLMetrics.createTimingMetric(sparkContext, name = "m", initValue = -1).value === -1)

assert(SQLMetrics.createTimingMetric(sparkContext, name = "m").isZero)
assert(SQLMetrics.createTimingMetric(sparkContext, name = "m", initValue = -1).isZero)

assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m").value === 0)
assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m", initValue = -1).value === 0)
assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m").value === -1)
assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m", initValue = -1).value === -1)

assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m").isZero)
assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m", initValue = -1).isZero)
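These assertion changes reflect the new behavior: a size/timing metric that was never updated now reports its -1 initial value instead of 0, and reports a real value once something is added. A small usage sketch under the same test context (the metric name is illustrative):

    val elapsed = SQLMetrics.createTimingMetric(sparkContext, name = "elapsed")
    assert(elapsed.isZero && elapsed.value == -1) // never updated: the -1 sentinel is exposed
    elapsed.add(7)
    assert(!elapsed.isZero && elapsed.value == 7) // updated: the real value is exposed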