
[SPARK-45439][SQL][UI] Reduce memory usage of LiveStageMetrics.accumIdsToMetricType #43250

Conversation

@JoshRosen (Contributor) commented Oct 6, 2023

What changes were proposed in this pull request?

This PR aims to reduce the memory consumption of LiveStageMetrics.accumIdsToMetricType, which should help to reduce driver memory usage when running complex SQL queries that contain many operators and run many jobs.

In SQLAppStatusListener, the LiveStageMetrics.accumIdsToMetricType field holds a map which is used to look up the type of accumulators in order to perform conditional processing of a stage’s metrics.

Currently, that field is derived from LiveExecutionData.metrics, which contains metrics for all operators used anywhere in the query. Whenever a job is submitted, we construct a fresh map containing all metrics that have ever been registered for that SQL query. If a query runs a single job, this isn't an issue: in that case, all LiveStageMetrics instances will hold the same immutable accumIdsToMetricType.

The problem arises if we have a query that runs many jobs (e.g. a complex query with many joins which gets divided into many jobs due to AQE): in that case, each job submission results in a new accumIdsToMetricType map being created.
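The duplicated-map pattern described above can be illustrated with a minimal sketch. The names here are simplified stand-ins for the real Spark classes (the actual `LiveStageMetrics` and `SQLMetricInfo` carry more fields), but the shape of the problem is the same: each job submission snapshots every metric registered so far into a fresh immutable map.

```scala
// Hypothetical, simplified model of the pre-fix behavior.
case class SQLMetricInfo(name: String, accumulatorId: Long, metricType: String)

class LiveStageMetrics(val accumIdsToMetricType: Map[Long, String])

// Called on every job submission: builds a brand-new map from ALL
// metrics ever registered for the query. With ~3800 operators and
// ~200 jobs per query, these duplicated maps add up to gigabytes.
def onJobStart(allMetrics: Seq[SQLMetricInfo]): LiveStageMetrics =
  new LiveStageMetrics(
    allMetrics.map(m => m.accumulatorId -> m.metricType).toMap)
```

Each call pays O(#metrics) time and heap, and none of the resulting maps share structure, which is why memory usage scales with (jobs × operators) rather than just operators.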

This PR fixes this by changing accumIdsToMetricType to be a mutable.HashMap that is shared across all LiveStageMetrics instances belonging to the same LiveExecutionData.
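A minimal sketch of the shared-map approach, again with simplified stand-in class names rather than the exact Spark internals: the execution owns one `mutable.HashMap`, new metrics are added to it in place, and every stage holds the same map by reference instead of its own copy.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the post-fix behavior.
class LiveStageMetrics(
    val accumIdsToMetricType: mutable.HashMap[Long, String])

class LiveExecutionData {
  // Single map per execution; entries are added in place as metrics
  // register, so memory scales with #operators, not #jobs x #operators.
  val accumIdsToMetricType = new mutable.HashMap[Long, String]()

  def onJobStart(): LiveStageMetrics =
    new LiveStageMetrics(accumIdsToMetricType) // shared by reference
}
```

Because the map is shared rather than snapshotted, a metric registered after a stage's creation is still visible to that stage's lookups, which matches how the listener uses it for conditional metric processing.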

The modified classes are private and are used only in SQLAppStatusListener, so I don't think this change poses any realistic risk of binary incompatibility for third-party code.

Why are the changes needed?

Addresses one contributing factor behind high driver memory / OOMs when executing complex queries.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing unit tests.

To demonstrate the memory reduction, I performed manual benchmarking and heap dump inspection using a benchmark that ran concurrent copies of a complex query: each test query launches ~200 jobs (so at least 200 stages) and contains ~3800 total operators, resulting in a huge number of metric accumulators. Prior to this PR's fix, ~3700 LiveStageMetrics instances (from multiple concurrent runs of the query) consumed a combined ~3.3 GB of heap. After this PR's fix, I observed negligible memory usage from LiveStageMetrics.

Was this patch authored or co-authored using generative AI tooling?

No.

@JoshRosen JoshRosen changed the title [SPARK-45439][SQL] Reduce memory usage of LiveStageMetrics.accumIdsToMetricType [SPARK-45439][SQL][UI] Reduce memory usage of LiveStageMetrics.accumIdsToMetricType Oct 6, 2023
@jiangxb1987 (Contributor) left a comment:

LGTM

@Ngone51 (Member) left a comment:

Nice work!

@JoshRosen (Contributor, Author):

Hmm, it looks like the OracleIntegrationSuite tests are flaky but I don't think that's related to this PR's changes:

[info] OracleIntegrationSuite:
[info] org.apache.spark.sql.jdbc.OracleIntegrationSuite *** ABORTED *** (7 minutes, 38 seconds)
[info]   The code passed to eventually never returned normally. Attempted 429 times over 7.003095079966667 minutes. Last failure message: ORA-12514: Cannot connect to database. Service freepdb1 is not registered with the listener at host 10.1.0.126 port 45139. (CONNECTION_ID=CC2wkBm6SPGoMF7IghzCeQ==). (DockerJDBCIntegrationSuite.scala:166)
[info]   org.scalatest.exceptions.TestFailedDueToTimeoutException:
[info]   at org.scalatest.enablers.Retrying$$anon$4.tryTryAgain$2(Retrying.scala:219)
[info]   at org.scalatest.enablers.Retrying$$anon$4.retry(Retrying.scala:226)
[info]   at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:313)
[info]   at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:312)
[info]   at org.apache.spark.sql.jdbc.DockerJDBCIntegrationSuite.eventually(DockerJDBCIntegrationSuite.scala:95)

@mridulm (Contributor) commented Oct 14, 2023

The test failures are not related; unfortunately, a reattempt did not fix them.
Merging to master.
Thanks for fixing this @JoshRosen !
Thanks for the reviews @jiangxb1987, @beliefer, @Ngone51 :-)

@mridulm closed this in 2f6cca5 Oct 14, 2023