
[SPARK-49038][SQL] Fix regression in Spark UI SQL operator metrics calculation to filter out invalid accumulator values correctly #47516

Closed

Conversation


@virrrat commented Jul 29, 2024

What changes were proposed in this pull request?

This patch fixes an issue in the driver-hosted Spark UI SQL tab DAG view where invalid SQL metric values are not filtered out correctly, and hence incorrect minimum and median metric values are shown in the UI. This regression was introduced in #39311.

Why are the changes needed?

SIZE, TIMING and NS_TIMING metrics are created with an initial value of -1 (since 0 is a valid metric value for them). The SQLMetrics.stringValue method filters out the invalid values using the condition value >= 0 before calculating the min, med and max values. However, #39311, introduced in Spark 3.4.0, caused a regression where SQLMetric.value is always >= 0. This means that accumulators still holding the invalid initial value -1 are no longer filtered out correctly. This needs to be fixed.
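
To illustrate the effect (a minimal sketch with made-up numbers, not the actual Spark code):

// Per-task updates reported for a TIMING metric; -1 marks tasks that never
// updated the metric.
val reported = Seq(-1L, -1L, 40L, 60L, 100L)

// Before #39311: -1 reaches SQLMetrics.stringValue and is dropped by the
// `value >= 0` filter, so min/med/max are computed over Seq(40, 60, 100).
val valid = reported.filter(_ >= 0)

// After #39311: the value is clamped to be >= 0 before reaching stringValue,
// so the untouched tasks show up as extra zeros: Seq(0, 0, 40, 60, 100)
// gives min 0 and median 40 instead of min 40 and median 60.
val clamped = reported.map(v => math.max(v, 0L))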

Does this PR introduce any user-facing change?

Yes, as end users can access accumulator values directly. Users accessing the values in the physical plan programmatically should check SQLMetric.isZero before consuming a metric's value.
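
For example, a guarded read could look like this (an illustrative sketch; `readMetric` is a hypothetical helper, not part of Spark):

import org.apache.spark.sql.execution.metric.SQLMetric

// Read a metric's value only if it has actually been updated.
def readMetric(metric: SQLMetric): Option[Long] =
  if (metric.isZero) None else Some(metric.value)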

How was this patch tested?

Existing tests. Also built a new jar for Spark 3.5.1 and confirmed that the previously incorrect data is now shown correctly in the Spark UI.

Old UI view:
old_spark_ui_view_3_5_1.pdf

Fixed UI view:
new_spark_ui_view_3_5_1.pdf

Was this patch authored or co-authored using generative AI tooling?

No

This change correctly filters out invalid accumulator values for SIZE and TIMING metrics before the data is shown in the UI.
Given that SIZE and TIMING metrics are assigned an initial value of -1, we need to return it as is so that the `SQLMetrics.stringValue` function can filter out the invalid values correctly.
github-actions bot added the SQL label Jul 29, 2024
@abmodi commented Jul 31, 2024

@dongjoon-hyun could you please review - thanks!

@dongjoon-hyun (Member)

Sure, @abmodi and @virrrat.

// values in `SQLMetrics.stringValue` when calculating min, max, etc.
// However, users accessing the values in the physical plan programmatically still gets -1. They
// may use `SQLMetric.isZero` before consuming this value.
override def value: Long = _value
Member

Well, actually, this is a direct revert of #39311.

  • In this case, we need a review from that PR's author, because he proposed that change to address a kind of regression for accumulator values.
  • In addition, could you add a new unit test case for your case? IIUC, this PR only seems to update the existing test cases.

cc @cloud-fan and @viirya and @HyukjinKwon from #39311, too

Author

Added the test

Contributor

-1 is an internal initial value, and we can't expose it to users. People can get the physical plan and directly access the SQLMetric instances to get the values.

@cloud-fan (Contributor)

Do we have a test case to demonstrate the issue? AFAIK Spark filters out 0-value accumulators at the executor side.

@virrrat commented Jul 31, 2024

Do we have a test case to demonstrate the issue? AFAIK Spark filters out 0-value accumulators at the executor side.

SQLMetrics.stringValue has different logic for different metric types to filter out invalid accumulator values.

  1. For AVERAGE_METRIC, zero and negative values are filtered out here.
  2. For SIZE, TIMING and NS_TIMING, only negative values are filtered out here; since zero is a valid metric value for them, zeros are kept (as sketched below).
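
Roughly, the two rules look like this (an assumed shape for illustration, not the literal Spark source):

val values: Seq[Long] = Seq(-1L, 0L, 3L, 7L)

// AVERAGE_METRIC: both zero and negative values are treated as invalid.
val avgValid  = values.filter(_ > 0)    // Seq(3, 7)

// SIZE / TIMING / NS_TIMING: only negative values are invalid; 0 is a real measurement.
val sizeValid = values.filter(_ >= 0)   // Seq(0, 3, 7)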

So the change made in #39311 essentially converts an invalid -1 accumulator value into a valid 0 value that no longer gets filtered out, resulting in wrong min and median values for size and time metrics, while the max and cumulative values still match.

I think there is no existing test for this, that's why it was never caught. Let me see if I can add one to demonstrate the issue.

@virrrat commented Jul 31, 2024

Just to note, SIZE, TIMING and NS_TIMING metrics are created with initial value -1.
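
For reference, these metrics come from the SQLMetrics factory methods, which set the initial value to -1 internally (runnable in spark-shell, where `spark` is predefined):

import org.apache.spark.sql.execution.metric.SQLMetrics

// Both of these start out at -1 so that a genuine 0 can be distinguished
// from "never updated".
val sizeMetric   = SQLMetrics.createSizeMetric(spark.sparkContext, "data size")
val timingMetric = SQLMetrics.createTimingMetric(spark.sparkContext, "build time")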

@virrrat commented Jul 31, 2024

Do we have a test case to demonstrate the issue?

Added the test @cloud-fan

@cloud-fan (Contributor)

I'm not convinced by the added test, as it calls SQLMetrics.stringValue directly. In reality, the values parameter of this method comes from accumulator values in Spark events. Are you sure it may contain -1?

@cloud-fan (Contributor)

I don't see any problem with the current SQLMetric framework. Let me give a summary here:

  1. SQLMetric has an initial value. When a SQL operator executes on the worker side and a metric is not updated at all, its value remains the initial value, so it is filtered out and not sent back to the driver.
  2. The initial value is not always 0, since 0 can itself be a valid value, e.g. for max/min stats.
  3. The UI is not the only way to access the metrics. People can access SQLMetric instances in the physical plan programmatically. We need to hide -1, as it's an internal value used for filtering metric values at the worker side.
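
As a sketch of point 3 (assuming `df` is a DataFrame whose query has already been executed; the plan traversal and metrics map below are public Spark accessors):

// Walk the executed physical plan and read each operator's SQL metrics directly,
// bypassing the UI. This is why the internal -1 must not leak out of `value`.
val plan = df.queryExecution.executedPlan
plan.foreach { node =>
  node.metrics.foreach { case (name, metric) =>
    println(s"${node.nodeName}: $name = ${metric.value}")
  }
}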

@virrrat commented Aug 1, 2024

Are you sure it may contain -1?

You can run the TPC-DS benchmark q1 and check the SQL tab DAG views in the driver-hosted Spark UI. The minimum and median values are not correct for multiple operator metrics, and they don't match the history server values either (as pointed out by @abmodi in the JIRA ticket). For incorrect metric cases, the minimum value is always zero while the median value is always less than the actual median value. The reason is that there are extra zeros in the values parameter of the SQLMetrics.stringValue method which were actually -1 and should have been filtered out.

I'm not a Spark expert, but the fact that the SQLMetrics.stringValue method has this logic to fetch validValues itself indicates that -1 is expected in the values parameter.

People can access SQLMetric instances in the physical plan programmatically.

Agreed. If users consume values directly from a listener, they can check SQLMetric.isZero before consuming them. We can't compromise the integrity of SQLMetric.value just for this, given that it is causing a regression in the data shown in the Spark UI. Otherwise, we would need to expose two different variables: one used for the Spark UI metric calculation and another used by users programmatically.

@cloud-fan (Contributor)

For incorrect metric cases, the minimum value is always zero while the median value is always less than the actual median value.

As I explained earlier, this should not happen as we will filter out -1 values at the executor side. So the 0 values in the UI may be valid values from certain tasks. Do you have a simple repro (end-to-end query) to trigger this bug?

@virrrat commented Aug 2, 2024

Do you have a simple repro (end-to-end query) to trigger this bug?

Can you please use the reproducer below? It is a join between two tables that shuffles data, and it can be run in spark-shell.

import scala.util._

// Random 30-character payload per row.
def randString() = Random.alphanumeric.take(30).mkString

// Two datasets with disjoint key ranges, each spread across 100 partitions.
val x = sc.parallelize(0 until 100000, 100)
val y = sc.parallelize(100000 until 2000000, 100)

val a = x.map(k => (k, randString()))
val b = y.map(k => (k, randString()))

val df1 = spark.createDataFrame(a).toDF("col1", "col2")
val df2 = spark.createDataFrame(b).toDF("col3", "col4")

df1.createOrReplaceTempView("t1")
df2.createOrReplaceTempView("t2")

// Join that shuffles data; afterwards inspect the SQL tab DAG view in the Spark UI.
spark.sql("select * from t1, t2 where t1.col1 = t2.col3").collect()

Attaching screenshots; the data in the Spark UI is not correct and it doesn't match between the Spark UI and the history server for Spark 3.5.0. The data in the Spark UI for Spark 3.3.2 is correct.

3.5.0 Spark UI: spark_ui_350.pdf
3.5.0 History Server: history_server_350.pdf
3.3.2 Spark UI: spark_ui_332.pdf

@virrrat commented Aug 7, 2024

@cloud-fan waiting for your response to unblock the review - thanks!

@virrrat commented Aug 12, 2024

Can you please use the reproducer below? It is a join between two tables that shuffles data, and it can be run in spark-shell.

@cloud-fan were you able to reproduce the issue? It is a very simple scenario that reproduces the issue on Spark 3.4.0 onwards; the issue wasn't there in previous versions.

cc @abmodi and @dongjoon-hyun too.

@cloud-fan (Contributor)

I confirmed that the bug exists. I was wrong about executor-side accumulator update filtering: we only filter out zero values for task metrics, not for SQL metrics.

But I don't think this PR is the right fix, as it makes SQLMetric#value return the wrong result. I've proposed a different fix: #47721

@virrrat commented Aug 12, 2024

Thanks for checking and confirming! I was able to track down that the bug started with #39311, which is why I proposed this fix. But if there is a better way to fix this bug, I am fine with it as long as we fix it.


github-actions bot commented Dec 4, 2024

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions bot added the Stale label Dec 4, 2024
github-actions bot closed this Dec 5, 2024