From 8be3bafaf3c224bed5969ea7d06ed364ccfb0d07 Mon Sep 17 00:00:00 2001 From: swasti Date: Tue, 9 Jan 2018 14:29:26 +0530 Subject: [PATCH] acknowledging review comments --- .../heuristics/JvmUsedMemoryHeuristic.scala | 35 +++++++++++++------ .../JvmUsedMemoryHeuristicTest.scala | 4 --- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/app/com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristic.scala b/app/com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristic.scala index 0489fa3b0..f57aba948 100644 --- a/app/com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristic.scala +++ b/app/com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristic.scala @@ -26,7 +26,7 @@ import scala.collection.JavaConverters /** - * A heuristic based on peak JVM used memory for the spark executors and drivers + * A heuristic based on peak JVM used memory for the spark executors and driver * */ class JvmUsedMemoryHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData) @@ -37,6 +37,9 @@ class JvmUsedMemoryHeuristic(private val heuristicConfigurationData: HeuristicCo override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData + lazy val executorPeakJvmMemoryThresholdString: String = heuristicConfigurationData.getParamMap.get(MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLD_KEY) + lazy val driverPeakJvmMemoryThresholdString: String = heuristicConfigurationData.getParamMap.get(MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLD_KEY) + override def apply(data: SparkApplicationData): HeuristicResult = { val evaluator = new Evaluator(this, data) @@ -71,10 +74,13 @@ object JvmUsedMemoryHeuristic { val JVM_USED_MEMORY = "jvmUsedMemory" val SPARK_EXECUTOR_MEMORY = "spark.executor.memory" val SPARK_DRIVER_MEMORY = "spark.driver.memory" + // 300 * FileUtils.ONE_MB (300 * 1024 * 1024) val reservedMemory : Long = 314572800 val BUFFER_PERCENT : Int = 20 + val MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLD_KEY = "executor_peak_jvm_memory_threshold" + val MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLD_KEY = "driver_peak_jvm_memory_threshold" - class Evaluator(memoryFractionHeuristic: JvmUsedMemoryHeuristic, data: SparkApplicationData) { + class Evaluator(jvmUsedMemoryHeuristic: JvmUsedMemoryHeuristic, data: SparkApplicationData) { lazy val appConfigurationProperties: Map[String, String] = data.appConfigurationProperties @@ -84,22 +90,31 @@ object JvmUsedMemoryHeuristic { val executorList : Seq[ExecutorSummary] = executorSummaries.filterNot(_.id.equals("driver")) val sparkExecutorMemory : Long = (appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY).map(MemoryFormatUtils.stringToBytes)).getOrElse(0L) val sparkDriverMemory : Long = appConfigurationProperties.get(SPARK_DRIVER_MEMORY).map(MemoryFormatUtils.stringToBytes).getOrElse(0L) - val medianPeakJvmUsedMemory: Long = if (executorList.isEmpty) 0L else executorList.map { - _.peakJvmUsedMemory.getOrElse(JVM_USED_MEMORY, 0L).asInstanceOf[Number].longValue - }.sortWith(_< _).drop(executorList.size/2).head lazy val maxExecutorPeakJvmUsedMemory: Long = if (executorList.isEmpty) 0L else executorList.map { _.peakJvmUsedMemory.getOrElse(JVM_USED_MEMORY, 0).asInstanceOf[Number].longValue }.max - val DEFAULT_MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS = + lazy val DEFAULT_MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS = SeverityThresholds(low = 1.5 * (maxExecutorPeakJvmUsedMemory + reservedMemory), moderate = 2 * (maxExecutorPeakJvmUsedMemory + reservedMemory), severe = 4 * (maxExecutorPeakJvmUsedMemory + reservedMemory), critical = 8 * (maxExecutorPeakJvmUsedMemory + reservedMemory), ascending = true) - val DEFAULT_MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS = + lazy val DEFAULT_MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS = SeverityThresholds(low = 1.5 * (maxDriverPeakJvmUsedMemory + reservedMemory), moderate = 2 * (maxDriverPeakJvmUsedMemory + reservedMemory), severe = 4 * (maxDriverPeakJvmUsedMemory + reservedMemory), critical = 8 * (maxDriverPeakJvmUsedMemory + reservedMemory), ascending = true) - val severityExecutor = DEFAULT_MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS.severityOf(sparkExecutorMemory) - val severityDriver = DEFAULT_MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS.severityOf(sparkDriverMemory) - + val MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS : SeverityThresholds = if(jvmUsedMemoryHeuristic.executorPeakJvmMemoryThresholdString == null) { + DEFAULT_MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS + } else { + SeverityThresholds.parse(jvmUsedMemoryHeuristic.executorPeakJvmMemoryThresholdString.split(",").map(_.toDouble * (maxExecutorPeakJvmUsedMemory + reservedMemory)).toString, ascending = false).getOrElse(DEFAULT_MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS) + } + + val MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS : SeverityThresholds = if(jvmUsedMemoryHeuristic.driverPeakJvmMemoryThresholdString == null) { + DEFAULT_MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS + } else { + SeverityThresholds.parse(jvmUsedMemoryHeuristic.driverPeakJvmMemoryThresholdString.split(",").map(_.toDouble * (maxDriverPeakJvmUsedMemory + reservedMemory)).toString, ascending = false).getOrElse(DEFAULT_MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS) + } + + val severityExecutor = MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS.severityOf(sparkExecutorMemory) + val severityDriver = MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS.severityOf(sparkDriverMemory) + /** * disabling the skew check for executors * val severitySkew = DEFAULT_JVM_MEMORY_SKEW_THRESHOLDS.severityOf(maxExecutorPeakJvmUsedMemory) diff --git a/test/com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristicTest.scala index 540af3bb6..f138de427 100644 --- a/test/com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristicTest.scala +++ b/test/com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristicTest.scala @@ -49,10 +49,6 @@ class JvmUsedMemoryHeuristicTest extends FunSpec with Matchers { evaluator.severityDriver should be(Severity.CRITICAL) } - it("has median peak jvm memory") { - evaluator.medianPeakJvmUsedMemory should be (334569) - } - it("has max peak jvm memory") { evaluator.maxExecutorPeakJvmUsedMemory should be (394567123) }