From 5bc6ffc45723419260b7f7aa06c044326a5f24b7 Mon Sep 17 00:00:00 2001
From: skakker
Date: Thu, 11 Jan 2018 11:24:42 +0530
Subject: [PATCH] Changing GC thresholds and calculation in spill heuristic (#319)

---
 .../fetchers/statusapiv1/statusapiv1.scala    |  2 ++
 .../heuristics/ExecutorGcHeuristic.scala      | 10 +++++++-
 .../ExecutorStorageSpillHeuristic.scala       | 25 +++++++++----------
 .../legacydata/LegacyDataConverters.scala     |  1 +
 .../spark/legacydata/SparkExecutorData.java   |  3 ++-
 .../spark/SparkMetricsAggregatorTest.scala    |  1 +
 .../heuristics/ExecutorGcHeuristicTest.scala  |  1 +
 .../ExecutorStorageSpillHeuristicTest.scala   |  5 ++--
 .../heuristics/ExecutorsHeuristicTest.scala   |  1 +
 .../JvmUsedMemoryHeuristicTest.scala          |  1 +
 .../UnifiedMemoryHeuristicTest.scala          |  1 +
 11 files changed, 34 insertions(+), 17 deletions(-)

diff --git a/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala b/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala
index 63d99eba4..808a3f4b0 100644
--- a/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala
+++ b/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala
@@ -81,6 +81,7 @@ trait ExecutorSummary{
   def failedTasks: Int
   def completedTasks: Int
   def totalTasks: Int
+  def maxTasks: Int
   def totalDuration: Long
   def totalInputBytes: Long
   def totalShuffleRead: Long
@@ -291,6 +292,7 @@ class ExecutorSummaryImpl(
   var failedTasks: Int,
   var completedTasks: Int,
   var totalTasks: Int,
+  var maxTasks: Int,
   var totalDuration: Long,
   var totalInputBytes: Long,
   var totalShuffleRead: Long,
diff --git a/app/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristic.scala b/app/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristic.scala
index 23da7db28..d4ef405dc 100644
--- a/app/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristic.scala
+++ b/app/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristic.scala
@@ -78,7 +78,7 @@ object ExecutorGcHeuristic {
   /** The ascending severity thresholds for the ratio of JVM GC Time and executor Run Time (checking whether ratio is above normal)
     * These thresholds are experimental and are likely to change */
   val DEFAULT_GC_SEVERITY_A_THRESHOLDS =
-    SeverityThresholds(low = 0.08D, moderate = 0.1D, severe = 0.15D, critical = 0.2D, ascending = true)
+    SeverityThresholds(low = 0.08D, moderate = 0.09D, severe = 0.1D, critical = 0.15D, ascending = true)
 
   /** The descending severity thresholds for the ratio of JVM GC Time and executor Run Time (checking whether ratio is below normal)
     * These thresholds are experimental and are likely to change */
@@ -90,7 +90,15 @@ object ExecutorGcHeuristic {
 
   class Evaluator(executorGcHeuristic: ExecutorGcHeuristic, data: SparkApplicationData) {
     lazy val executorAndDriverSummaries: Seq[ExecutorSummary] = data.executorSummaries
+    if (executorAndDriverSummaries == null) {
+      throw new Exception("Executors Summary is null.")
+    }
+
     lazy val executorSummaries: Seq[ExecutorSummary] = executorAndDriverSummaries.filterNot(_.id.equals("driver"))
+    if (executorSummaries.isEmpty) {
+      throw new Exception("No executor information available.")
+    }
+
     lazy val appConfigurationProperties: Map[String, String] = data.appConfigurationProperties
 
     var (jvmTime, executorRunTimeTotal) = getTimeValues(executorSummaries)
diff --git a/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala b/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala
index a625b441e..7001c8e27 100644
--- a/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala
+++ b/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala
@@ -96,30 +96,29 @@ object ExecutorStorageSpillHeuristic {
   val DEFAULT_SPARK_EXECUTOR_MEMORY_THRESHOLD : String ="10GB"
 
   class Evaluator(executorStorageSpillHeuristic: ExecutorStorageSpillHeuristic, data: SparkApplicationData) {
-    lazy val executorSummaries: Seq[ExecutorSummary] = data.executorSummaries
+    lazy val executorAndDriverSummaries: Seq[ExecutorSummary] = data.executorSummaries
+    lazy val executorSummaries: Seq[ExecutorSummary] = executorAndDriverSummaries.filterNot(_.id.equals("driver"))
     lazy val appConfigurationProperties: Map[String, String] = data.appConfigurationProperties
+    val maxTasks: Int = executorSummaries.head.maxTasks
     val maxMemorySpilled: Long = executorSummaries.map(_.totalMemoryBytesSpilled).max
     val meanMemorySpilled = executorSummaries.map(_.totalMemoryBytesSpilled).sum / executorSummaries.size
-    val totalMemorySpilled = executorSummaries.map(_.totalMemoryBytesSpilled).sum
+    val totalMemorySpilledPerTask = totalMemorySpilled/(executorSummaries.map(_.totalTasks).sum)
+    lazy val totalMemorySpilled = executorSummaries.map(_.totalMemoryBytesSpilled).sum
     val fractionOfExecutorsHavingBytesSpilled: Double = executorSummaries.count(_.totalMemoryBytesSpilled > 0).toDouble / executorSummaries.size.toDouble
 
     val severity: Severity = {
       if (fractionOfExecutorsHavingBytesSpilled != 0) {
         if (fractionOfExecutorsHavingBytesSpilled < executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold
-          && maxMemorySpilled < executorStorageSpillHeuristic.spillMaxMemoryThreshold * sparkExecutorMemory) {
+          && totalMemorySpilledPerTask < executorStorageSpillHeuristic.spillMaxMemoryThreshold * (sparkExecutorMemory/maxTasks)) {
           Severity.LOW
-        }
-        else if (fractionOfExecutorsHavingBytesSpilled < executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold
-          && meanMemorySpilled < executorStorageSpillHeuristic.spillMaxMemoryThreshold * sparkExecutorMemory) {
+        } else if (fractionOfExecutorsHavingBytesSpilled < executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold
+          && totalMemorySpilledPerTask < executorStorageSpillHeuristic.spillMaxMemoryThreshold * (sparkExecutorMemory/maxTasks)) {
           Severity.MODERATE
-        }
-
-        else if (fractionOfExecutorsHavingBytesSpilled >= executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold
-          && meanMemorySpilled < executorStorageSpillHeuristic.spillMaxMemoryThreshold * sparkExecutorMemory) {
+        } else if (fractionOfExecutorsHavingBytesSpilled >= executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold
+          && totalMemorySpilledPerTask < executorStorageSpillHeuristic.spillMaxMemoryThreshold * (sparkExecutorMemory/maxTasks)) {
           Severity.SEVERE
-        }
-        else if (fractionOfExecutorsHavingBytesSpilled >= executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold
-          && meanMemorySpilled >= executorStorageSpillHeuristic.spillMaxMemoryThreshold * sparkExecutorMemory) {
+        } else if (fractionOfExecutorsHavingBytesSpilled >= executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold
+          && totalMemorySpilledPerTask >= executorStorageSpillHeuristic.spillMaxMemoryThreshold * (sparkExecutorMemory/maxTasks)) {
           Severity.CRITICAL
         } else Severity.NONE
       }
diff --git a/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala b/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala
index 2403dd8ae..4c0d6f727 100644
--- a/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala
+++ b/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala
@@ -198,6 +198,7 @@ object LegacyDataConverters {
       executorInfo.failedTasks,
       executorInfo.completedTasks,
       executorInfo.totalTasks,
+      executorInfo.maxTasks,
       executorInfo.duration,
       executorInfo.inputBytes,
       executorInfo.shuffleRead,
diff --git a/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java b/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java
index e107a5c4d..50f27ba00 100644
--- a/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java
+++ b/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java
@@ -39,6 +39,7 @@ public static class ExecutorInfo {
     public int completedTasks = 0;
     public int failedTasks = 0;
     public int totalTasks = 0;
+    public int maxTasks = 0;
     public long duration = 0L;
     public long inputBytes = 0L;
     public long outputBytes = 0L;
@@ -49,7 +50,7 @@ public static class ExecutorInfo {
 
     public String toString() {
       return "{execId: " + execId + ", hostPort:" + hostPort + " , rddBlocks: " + rddBlocks + ", memUsed: " + memUsed
-          + ", maxMem: " + maxMem + ", diskUsed: " + diskUsed + ", totalTasks" + totalTasks + ", tasksActive: "
+          + ", maxMem: " + maxMem + ", diskUsed: " + diskUsed + ", totalTasks" + totalTasks + ", maxTasks" + maxTasks + ", tasksActive: "
           + activeTasks + ", tasksComplete: " + completedTasks + ", tasksFailed: " + failedTasks + ", duration: " + duration
           + ", inputBytes: " + inputBytes + ", outputBytes:" + outputBytes + ", shuffleRead: " + shuffleRead + ", shuffleWrite: "
           + shuffleWrite + ", totalGCTime: " + totalGCTime + ", totalMemoryBytesSpilled: " + totalMemoryBytesSpilled + "}";
diff --git a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala
index e708d7859..70064494c 100644
--- a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala
+++ b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala
@@ -185,6 +185,7 @@ object SparkMetricsAggregatorTest {
       failedTasks = 0,
       completedTasks = 0,
       totalTasks = 0,
+      maxTasks = 0,
       totalDuration,
       totalInputBytes = 0,
       totalShuffleRead = 0,
diff --git a/test/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristicTest.scala
index 373cc56b0..8bec2b3fa 100644
--- a/test/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristicTest.scala
+++ b/test/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristicTest.scala
@@ -113,6 +113,7 @@ object ExecutorGcHeuristicTest {
       failedTasks = 0,
       completedTasks = 0,
       totalTasks = 0,
+      maxTasks = 0,
       totalDuration,
       totalInputBytes=0,
       totalShuffleRead=0,
diff --git a/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala
index d11ca0dd1..6b083c5d5 100644
--- a/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala
+++ b/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala
@@ -103,12 +103,13 @@ object ExecutorStorageSpillHeuristicTest {
       activeTasks = 0,
       failedTasks = 0,
       completedTasks = 0,
-      totalTasks = 0,
+      totalTasks = 10,
+      maxTasks = 10,
       totalDuration=0,
       totalInputBytes=0,
       totalShuffleRead=0,
       totalShuffleWrite= 0,
-      maxMemory= 0,
+      maxMemory= 2000,
       totalGCTime = 0,
       totalMemoryBytesSpilled,
       executorLogs = Map.empty,
diff --git a/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala
index 8844b3aaf..2ef2580e9 100644
--- a/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala
+++ b/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala
@@ -244,6 +244,7 @@ object ExecutorsHeuristicTest {
       failedTasks = 0,
       completedTasks = 0,
       totalTasks = 0,
+      maxTasks = 0,
       totalDuration,
       totalInputBytes,
       totalShuffleRead,
diff --git a/test/com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristicTest.scala
index 8f80a7a39..d9b2e2106 100644
--- a/test/com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristicTest.scala
+++ b/test/com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristicTest.scala
@@ -80,6 +80,7 @@ object JvmUsedMemoryHeuristicTest {
       failedTasks = 0,
       completedTasks = 0,
       totalTasks = 0,
+      maxTasks = 0,
       totalDuration = 0,
       totalInputBytes = 0,
       totalShuffleRead = 0,
diff --git a/test/com/linkedin/drelephant/spark/heuristics/UnifiedMemoryHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/UnifiedMemoryHeuristicTest.scala
index a1685838d..746b32741 100644
--- a/test/com/linkedin/drelephant/spark/heuristics/UnifiedMemoryHeuristicTest.scala
+++ b/test/com/linkedin/drelephant/spark/heuristics/UnifiedMemoryHeuristicTest.scala
@@ -56,6 +56,7 @@ object UnifiedMemoryHeuristicTest {
       failedTasks = 0,
       completedTasks = 0,
       totalTasks = 0,
+      maxTasks = 0,
       totalDuration = 0,
       totalInputBytes = 0,
       totalShuffleRead = 0,
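
Note on the revised spill calculation (illustration appended after the patch, not part of it): the heuristic no longer compares the maximum or mean spill of an executor against the whole executor heap. It now divides the total spill by the total number of tasks and compares that per-task figure against the memory one task slot gets (sparkExecutorMemory / maxTasks), scaled by spillMaxMemoryThreshold. As written, the LOW and MODERATE branches appear to test the identical condition, so MODERATE looks unreachable. Below is a minimal, self-contained Scala sketch of the new arithmetic; the 10 GB executor, 4 task slots, 400 tasks, 80 GB total spill and 0.05 threshold are illustrative assumptions, not values taken from this patch.

  object SpillPerTaskExample extends App {
    // Assumed inputs; the real heuristic reads these from ExecutorSummary and its configuration.
    val sparkExecutorMemory: Long = 10L * 1024 * 1024 * 1024       // 10 GB heap per executor (assumed)
    val maxTasks: Int = 4                                           // concurrent task slots per executor (assumed)
    val totalTasks: Long = 400L                                     // tasks run across all executors (assumed)
    val totalMemoryBytesSpilled: Long = 80L * 1024 * 1024 * 1024    // total spill across all executors (assumed)
    val spillMaxMemoryThreshold: Double = 0.05                      // hypothetical threshold fraction

    // New comparison: memory spilled per task vs. the memory available to one task slot.
    val totalMemorySpilledPerTask: Long = totalMemoryBytesSpilled / totalTasks
    val perTaskSpillBudget: Double = spillMaxMemoryThreshold * (sparkExecutorMemory.toDouble / maxTasks)

    println(s"spill per task     = $totalMemorySpilledPerTask bytes")
    println(s"per-task threshold = $perTaskSpillBudget bytes")
    println(s"above threshold    = ${totalMemorySpilledPerTask >= perTaskSpillBudget}")
  }

With these numbers each task spills roughly 205 MB against a 128 MB per-slot budget, so the per-task check trips; the same spill measured against the full 10 GB executor heap, as the old comparison did, would have looked negligible.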