Skip to content

Commit

Permalink
Reduce memory usage of LiveStageMetrics.accumIdsToMetricType
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Oct 6, 2023
1 parent 39d43e0 commit b9adb6b
Showing 1 changed file with 21 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class SQLAppStatusListener(
executionData.details = sqlStoreData.details
executionData.physicalPlanDescription = sqlStoreData.physicalPlanDescription
executionData.modifiedConfigs = sqlStoreData.modifiedConfigs
executionData.metrics = sqlStoreData.metrics
executionData.addMetrics(sqlStoreData.metrics)
executionData.submissionTime = sqlStoreData.submissionTime
executionData.completionTime = sqlStoreData.completionTime
executionData.jobs = sqlStoreData.jobs
Expand All @@ -111,8 +111,8 @@ class SQLAppStatusListener(

// Record the accumulator IDs and metric types for the stages of this job, so that the code
// that keeps track of the metrics knows which accumulators to look at.
val accumIdsAndType = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
if (accumIdsAndType.nonEmpty) {
val accumIdsAndType = exec.metricAccumulatorIdToMetricType
if (!accumIdsAndType.isEmpty) {
event.stageInfos.foreach { stage =>
stageMetrics.put(stage.stageId, new LiveStageMetrics(stage.stageId, 0,
stage.numTasks, accumIdsAndType))
Expand Down Expand Up @@ -361,7 +361,7 @@ class SQLAppStatusListener(
exec.details = details
exec.physicalPlanDescription = physicalPlanDescription
exec.modifiedConfigs = modifiedConfigs
exec.metrics = sqlPlanMetrics
exec.addMetrics(sqlPlanMetrics)
exec.submissionTime = time
update(exec)
}
Expand All @@ -383,15 +383,15 @@ class SQLAppStatusListener(

val exec = getOrCreateExecution(executionId)
exec.physicalPlanDescription = physicalPlanDescription
exec.metrics ++= sqlPlanMetrics
exec.addMetrics(sqlPlanMetrics)
update(exec)
}

private def onAdaptiveSQLMetricUpdate(event: SparkListenerSQLAdaptiveSQLMetricUpdates): Unit = {
  // Destructure the event into the execution it targets and the metrics it carries.
  val SparkListenerSQLAdaptiveSQLMetricUpdates(executionId, sqlPlanMetrics) = event
  val execution = getOrCreateExecution(executionId)
  // Register the adaptive-execution metrics on the live execution, then persist the change.
  execution.addMetrics(sqlPlanMetrics)
  update(execution)
}

Expand Down Expand Up @@ -490,7 +490,12 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
var details: String = null
var physicalPlanDescription: String = null
var modifiedConfigs: Map[String, String] = _
var metrics = collection.Seq[SQLPlanMetric]()
private var _metrics = collection.Seq[SQLPlanMetric]()
def metrics: collection.Seq[SQLPlanMetric] = _metrics
// This mapping is shared across all LiveStageMetrics instances associated with
// this LiveExecutionData, helping to reduce memory overhead by avoiding waste
// from separate immutable maps with largely overlapping sets of entries.
val metricAccumulatorIdToMetricType = new ConcurrentHashMap[Long, String]()
var submissionTime = -1L
var completionTime: Option[Date] = None
var errorMessage: Option[String] = None
Expand Down Expand Up @@ -522,13 +527,19 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
metricsValues)
}

/**
 * Appends `newMetrics` to this execution's metric list and records each metric's
 * accumulator-ID -> metric-type entry in the shared `metricAccumulatorIdToMetricType` map.
 */
def addMetrics(newMetrics: collection.Seq[SQLPlanMetric]): Unit = {
  _metrics = _metrics ++ newMetrics
  for (metric <- newMetrics) {
    metricAccumulatorIdToMetricType.put(metric.accumulatorId, metric.metricType)
  }
}
}

private class LiveStageMetrics(
val stageId: Int,
val attemptId: Int,
val numTasks: Int,
val accumIdsToMetricType: Map[Long, String]) {
val accumIdsToMetricType: ConcurrentHashMap[Long, String]) {

/**
* Mapping of task IDs to their respective index. Note this may contain more elements than the
Expand Down Expand Up @@ -575,7 +586,7 @@ private class LiveStageMetrics(
}

accumUpdates
.filter { acc => acc.update.isDefined && accumIdsToMetricType.contains(acc.id) }
.filter { acc => acc.update.isDefined && accumIdsToMetricType.containsKey(acc.id) }
.foreach { acc =>
// In a live application, accumulators have Long values, but when reading from event
// logs, they have String values. For now, assume all accumulators are Long and convert
Expand All @@ -589,7 +600,7 @@ private class LiveStageMetrics(
val metricValues = taskMetrics.computeIfAbsent(acc.id, _ => new Array(numTasks))
metricValues(taskIdx) = value

if (SQLMetrics.metricNeedsMax(accumIdsToMetricType(acc.id))) {
if (SQLMetrics.metricNeedsMax(Option(accumIdsToMetricType.get(acc.id)).get)) {
val maxMetricsTaskId = metricsIdToMaxTaskValue.computeIfAbsent(acc.id, _ => Array(value,
taskId))

Expand Down

0 comments on commit b9adb6b

Please sign in to comment.