From 2f6cca55286a97bb20ed665c741d0607ed04d0ce Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sat, 14 Oct 2023 02:28:15 -0500 Subject: [PATCH] [SPARK-45439][SQL][UI] Reduce memory usage of LiveStageMetrics.accumIdsToMetricType MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? This PR aims to reduce the memory consumption of `LiveStageMetrics.accumIdsToMetricType`, which should help to reduce driver memory usage when running complex SQL queries that contain many operators and run many jobs. In SQLAppStatusListener, the LiveStageMetrics.accumIdsToMetricType field holds a map which is used to look up the type of accumulators in order to perform conditional processing of a stage’s metrics. Currently, that field is derived from `LiveExecutionData.metrics`, which contains metrics for _all_ operators used anywhere in the query. Whenever a job is submitted, we construct a fresh map containing all metrics that have ever been registered for that SQL query. If a query runs a single job, this isn't an issue: in that case, all `LiveStageMetrics` instances will hold the same immutable `accumIdsToMetricType`. The problem arises if we have a query that runs many jobs (e.g. a complex query with many joins which gets divided into many jobs due to AQE): in that case, each job submission results in a new `accumIdsToMetricType` map being created. This PR fixes this by changing `accumIdsToMetricType` to be a mutable `mutable.HashMap` which is shared across all `LivestageMetrics` instances belonging to the same `LiveExecutionData`. The modified classes are `private` and are used only in SQLAppStatusListener, so I don't think this change poses any realistic risk of binary incompatibility risks to third party code. ### Why are the changes needed? Addresses one contributing factor behind high driver memory / OOMs when executing complex queries. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing unit tests. To demonstrate memory reduction, I performed manual benchmarking and heap dump inspection using benchmark that ran copies of a complex query: each test query launches ~200 jobs (so at least 200 stages) and contains ~3800 total operators, resulting in a huge number metric accumulators. Prior to this PR's fix, ~3700 LiveStageMetrics instances (from multiple concurrent runs of the query) consumed a combined ~3.3 GB of heap. After this PR's fix, I observed negligible memory usage from LiveStageMetrics. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43250 from JoshRosen/reduce-accum-ids-to-metric-type-mem-overhead. Authored-by: Josh Rosen Signed-off-by: Mridul Muralidharan gmail.com> --- .../execution/ui/SQLAppStatusListener.scala | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 7e31d40e51196..0de16a459717e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -95,7 +95,7 @@ class SQLAppStatusListener( executionData.details = sqlStoreData.details executionData.physicalPlanDescription = sqlStoreData.physicalPlanDescription executionData.modifiedConfigs = sqlStoreData.modifiedConfigs - executionData.metrics = sqlStoreData.metrics + executionData.addMetrics(sqlStoreData.metrics) executionData.submissionTime = sqlStoreData.submissionTime executionData.completionTime = sqlStoreData.completionTime executionData.jobs = sqlStoreData.jobs @@ -111,7 +111,7 @@ class SQLAppStatusListener( // Record the accumulator IDs and metric types for the stages of this job, so that the code // that keeps track of the metrics knows which accumulators to look at. - val accumIdsAndType = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap + val accumIdsAndType = exec.metricAccumulatorIdToMetricType if (accumIdsAndType.nonEmpty) { event.stageInfos.foreach { stage => stageMetrics.put(stage.stageId, new LiveStageMetrics(stage.stageId, 0, @@ -361,7 +361,7 @@ class SQLAppStatusListener( exec.details = details exec.physicalPlanDescription = physicalPlanDescription exec.modifiedConfigs = modifiedConfigs - exec.metrics = sqlPlanMetrics + exec.addMetrics(sqlPlanMetrics) exec.submissionTime = time update(exec) } @@ -383,7 +383,7 @@ class SQLAppStatusListener( val exec = getOrCreateExecution(executionId) exec.physicalPlanDescription = physicalPlanDescription - exec.metrics ++= sqlPlanMetrics + exec.addMetrics(sqlPlanMetrics) update(exec) } @@ -391,7 +391,7 @@ class SQLAppStatusListener( val SparkListenerSQLAdaptiveSQLMetricUpdates(executionId, sqlPlanMetrics) = event val exec = getOrCreateExecution(executionId) - exec.metrics ++= sqlPlanMetrics + exec.addMetrics(sqlPlanMetrics) update(exec) } @@ -490,7 +490,12 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity { var details: String = null var physicalPlanDescription: String = null var modifiedConfigs: Map[String, String] = _ - var metrics = collection.Seq[SQLPlanMetric]() + private var _metrics = collection.Seq[SQLPlanMetric]() + def metrics: collection.Seq[SQLPlanMetric] = _metrics + // This mapping is shared across all LiveStageMetrics instances associated with + // this LiveExecutionData, helping to reduce memory overhead by avoiding waste + // from separate immutable maps with largely overlapping sets of entries. + val metricAccumulatorIdToMetricType = new mutable.HashMap[Long, String]() var submissionTime = -1L var completionTime: Option[Date] = None var errorMessage: Option[String] = None @@ -522,13 +527,19 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity { metricsValues) } + def addMetrics(newMetrics: collection.Seq[SQLPlanMetric]): Unit = { + _metrics ++= newMetrics + newMetrics.foreach { m => + metricAccumulatorIdToMetricType.put(m.accumulatorId, m.metricType) + } + } } private class LiveStageMetrics( val stageId: Int, val attemptId: Int, val numTasks: Int, - val accumIdsToMetricType: Map[Long, String]) { + val accumIdsToMetricType: mutable.Map[Long, String]) { /** * Mapping of task IDs to their respective index. Note this may contain more elements than the