diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 7e31d40e51196..0de16a459717e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -95,7 +95,7 @@ class SQLAppStatusListener(
     executionData.details = sqlStoreData.details
     executionData.physicalPlanDescription = sqlStoreData.physicalPlanDescription
     executionData.modifiedConfigs = sqlStoreData.modifiedConfigs
-    executionData.metrics = sqlStoreData.metrics
+    executionData.addMetrics(sqlStoreData.metrics)
     executionData.submissionTime = sqlStoreData.submissionTime
     executionData.completionTime = sqlStoreData.completionTime
     executionData.jobs = sqlStoreData.jobs
@@ -111,7 +111,7 @@ class SQLAppStatusListener(
 
     // Record the accumulator IDs and metric types for the stages of this job, so that the code
     // that keeps track of the metrics knows which accumulators to look at.
-    val accumIdsAndType = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
+    val accumIdsAndType = exec.metricAccumulatorIdToMetricType
    if (accumIdsAndType.nonEmpty) {
       event.stageInfos.foreach { stage =>
         stageMetrics.put(stage.stageId, new LiveStageMetrics(stage.stageId, 0,
@@ -361,7 +361,7 @@ class SQLAppStatusListener(
     exec.details = details
     exec.physicalPlanDescription = physicalPlanDescription
     exec.modifiedConfigs = modifiedConfigs
-    exec.metrics = sqlPlanMetrics
+    exec.addMetrics(sqlPlanMetrics)
     exec.submissionTime = time
     update(exec)
   }
@@ -383,7 +383,7 @@ class SQLAppStatusListener(
 
     val exec = getOrCreateExecution(executionId)
     exec.physicalPlanDescription = physicalPlanDescription
-    exec.metrics ++= sqlPlanMetrics
+    exec.addMetrics(sqlPlanMetrics)
     update(exec)
   }
 
@@ -391,7 +391,7 @@ class SQLAppStatusListener(
     val SparkListenerSQLAdaptiveSQLMetricUpdates(executionId, sqlPlanMetrics) = event
 
     val exec = getOrCreateExecution(executionId)
-    exec.metrics ++= sqlPlanMetrics
+    exec.addMetrics(sqlPlanMetrics)
     update(exec)
   }
 
@@ -490,7 +490,12 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
   var details: String = null
   var physicalPlanDescription: String = null
   var modifiedConfigs: Map[String, String] = _
-  var metrics = collection.Seq[SQLPlanMetric]()
+  private var _metrics = collection.Seq[SQLPlanMetric]()
+  def metrics: collection.Seq[SQLPlanMetric] = _metrics
+  // This mapping is shared across all LiveStageMetrics instances associated with
+  // this LiveExecutionData, helping to reduce memory overhead by avoiding waste
+  // from separate immutable maps with largely overlapping sets of entries.
+  val metricAccumulatorIdToMetricType = new mutable.HashMap[Long, String]()
   var submissionTime = -1L
   var completionTime: Option[Date] = None
   var errorMessage: Option[String] = None
@@ -522,13 +527,19 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
       metricsValues)
   }
 
+  def addMetrics(newMetrics: collection.Seq[SQLPlanMetric]): Unit = {
+    _metrics ++= newMetrics
+    newMetrics.foreach { m =>
+      metricAccumulatorIdToMetricType.put(m.accumulatorId, m.metricType)
+    }
+  }
 }
 
 private class LiveStageMetrics(
     val stageId: Int,
     val attemptId: Int,
     val numTasks: Int,
-    val accumIdsToMetricType: Map[Long, String]) {
+    val accumIdsToMetricType: mutable.Map[Long, String]) {
 
   /**
    * Mapping of task IDs to their respective index. Note this may contain more elements than the