diff --git a/app-conf/FetcherConf.xml b/app-conf/FetcherConf.xml
index 1a6bb4a01..1f813cf15 100644
--- a/app-conf/FetcherConf.xml
+++ b/app-conf/FetcherConf.xml
@@ -56,13 +56,12 @@
mapreduce
com.linkedin.drelephant.mapreduce.fetchers.MapReduceFSFetcherHadoop2
- false
500
- PST
+ false
+ UTC
-
+
spark
Spark Job Metrics
com.linkedin.drelephant.spark.heuristics.JobsHeuristic
views.html.help.spark.helpJobsHeuristic
+
spark
Spark Stage Metrics
com.linkedin.drelephant.spark.heuristics.StagesHeuristic
views.html.help.spark.helpStagesHeuristic
+
spark
- Peak Unified Memory
+ Executor Peak Unified Memory
com.linkedin.drelephant.spark.heuristics.UnifiedMemoryHeuristic
views.html.help.spark.helpUnifiedMemoryHeuristic
+
+
spark
- JVM Used Memory
+ Executor JVM Used Memory
com.linkedin.drelephant.spark.heuristics.JvmUsedMemoryHeuristic
views.html.help.spark.helpJvmUsedMemoryHeuristic
+
+
spark
Stages with failed tasks
com.linkedin.drelephant.spark.heuristics.StagesWithFailedTasksHeuristic
views.html.help.spark.helpStagesWithFailedTasks
+
spark
Executor GC
com.linkedin.drelephant.spark.heuristics.ExecutorGcHeuristic
views.html.help.spark.helpExecutorGcHeuristic
+
+
spark
Executor spill
com.linkedin.drelephant.spark.heuristics.ExecutorStorageSpillHeuristic
views.html.help.spark.helpExecutorStorageSpillHeuristic
+
+
+
+
+ spark
+ Driver Metrics
+ com.linkedin.drelephant.spark.heuristics.DriverHeuristic
+ views.html.help.spark.helpDriverHeuristic
+
diff --git a/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala b/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala
index 3855954d7..a5f174df7 100644
--- a/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala
+++ b/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala
@@ -215,7 +215,7 @@ class SparkRestClient(sparkConf: SparkConf) {
}
private def getExecutorSummaries(attemptTarget: WebTarget): Seq[ExecutorSummaryImpl] = {
- val target = attemptTarget.path("executors")
+ val target = attemptTarget.path("allexecutors")
try {
get(target, SparkRestObjectMapper.readValue[Seq[ExecutorSummaryImpl]])
} catch {
diff --git a/app/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristic.scala b/app/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristic.scala
index 0c0193ef5..5af1ed94f 100644
--- a/app/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristic.scala
+++ b/app/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristic.scala
@@ -29,7 +29,7 @@ import com.linkedin.drelephant.math.Statistics
* A heuristic based on an app's known configuration.
*
* The results from this heuristic primarily inform users about key app configuration settings, including
- * driver memory, driver cores, executor cores, executor instances, executor memory, and the serializer.
+ * executor cores, executor instances, executor memory, and the serializer.
*
* It also checks whether the values specified are within threshold.
*/
@@ -55,10 +55,6 @@ class ConfigurationHeuristic(private val heuristicConfigurationData: HeuristicCo
property.getOrElse("Not presented. Using default.")
val resultDetails = Seq(
- new HeuristicResultDetails(
- SPARK_DRIVER_MEMORY_KEY,
- formatProperty(evaluator.driverMemoryBytes.map(MemoryFormatUtils.bytesToString))
- ),
new HeuristicResultDetails(
SPARK_EXECUTOR_MEMORY_KEY,
formatProperty(evaluator.executorMemoryBytes.map(MemoryFormatUtils.bytesToString))
@@ -80,16 +76,16 @@ class ConfigurationHeuristic(private val heuristicConfigurationData: HeuristicCo
formatProperty(evaluator.isDynamicAllocationEnabled.map(_.toString))
),
new HeuristicResultDetails(
- SPARK_DRIVER_CORES_KEY,
- formatProperty(evaluator.driverCores.map(_.toString))
+ SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD,
+ evaluator.sparkYarnExecutorMemoryOverhead
),
new HeuristicResultDetails(
- SPARK_YARN_DRIVER_MEMORY_OVERHEAD,
- evaluator.sparkYarnDriverMemoryOverhead
+ SPARK_DYNAMIC_ALLOCATION_MIN_EXECUTORS,
+ evaluator.dynamicMinExecutors.getOrElse(0).toString
),
new HeuristicResultDetails(
- SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD,
- evaluator.sparkYarnExecutorMemoryOverhead
+ SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS,
+ evaluator.dynamicMaxExecutors.getOrElse(0).toString
)
)
// Constructing a mutable ArrayList for resultDetails, otherwise addResultDetail method HeuristicResult cannot be used.
@@ -109,21 +105,24 @@ class ConfigurationHeuristic(private val heuristicConfigurationData: HeuristicCo
result.addResultDetail(SPARK_SHUFFLE_SERVICE_ENABLED, formatProperty(evaluator.isShuffleServiceEnabled.map(_.toString)),
"Spark shuffle service is not enabled.")
}
- if (evaluator.severityMinExecutors == Severity.CRITICAL) {
- result.addResultDetail("Minimum Executors", "The minimum executors for Dynamic Allocation should be <=1. Please change it in the " + SPARK_DYNAMIC_ALLOCATION_MIN_EXECUTORS + " field.")
+ if (evaluator.severityMinExecutors != Severity.NONE) {
+ result.addResultDetail("Minimum Executors", "The minimum executors for Dynamic Allocation should be "+ THRESHOLD_MIN_EXECUTORS + ". Please change it in the " + SPARK_DYNAMIC_ALLOCATION_MIN_EXECUTORS + " field.")
}
- if (evaluator.severityMaxExecutors == Severity.CRITICAL) {
- result.addResultDetail("Maximum Executors", "The maximum executors for Dynamic Allocation should be <=900. Please change it in the " + SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS + " field.")
+ if (evaluator.severityMaxExecutors != Severity.NONE) {
+ result.addResultDetail("Maximum Executors", "The maximum executors for Dynamic Allocation should be <=" + THRESHOLD_MAX_EXECUTORS + ". Please change it in the " + SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS + " field.")
}
- if (evaluator.jarsSeverity == Severity.CRITICAL) {
+ if (evaluator.jarsSeverity != Severity.NONE) {
result.addResultDetail("Jars notation", "It is recommended to not use * notation while specifying jars in the field " + SPARK_YARN_JARS)
}
- if(evaluator.severityDriverMemoryOverhead.getValue >= Severity.SEVERE.getValue) {
- result.addResultDetail("Driver Overhead Memory", "Please do not specify excessive amount of overhead memory for Driver. Change it in the field " + SPARK_YARN_DRIVER_MEMORY_OVERHEAD)
- }
- if(evaluator.severityExecutorMemoryOverhead.getValue >= Severity.SEVERE.getValue) {
+ if(evaluator.severityExecutorMemoryOverhead != Severity.NONE) {
result.addResultDetail("Executor Overhead Memory", "Please do not specify excessive amount of overhead memory for Executors. Change it in the field " + SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD)
}
+ if(evaluator.severityExecutorCores != Severity.NONE) {
+ result.addResultDetail("Executor cores", "The number of executor cores should be <=" + evaluator.DEFAULT_SPARK_CORES_THRESHOLDS.low + ". Please change it in the field " + SPARK_EXECUTOR_CORES_KEY)
+ }
+ if(evaluator.severityExecutorMemory != Severity.NONE) {
+ result.addResultDetail("Executor memory", "Please do not specify excessive amount of executor memory. Change it in the field " + SPARK_EXECUTOR_MEMORY_KEY)
+ }
result
}
}
@@ -134,7 +133,6 @@ object ConfigurationHeuristic {
val SERIALIZER_IF_NON_NULL_RECOMMENDATION_KEY = "serializer_if_non_null_recommendation"
- val SPARK_DRIVER_MEMORY_KEY = "spark.driver.memory"
val SPARK_EXECUTOR_MEMORY_KEY = "spark.executor.memory"
val SPARK_EXECUTOR_INSTANCES_KEY = "spark.executor.instances"
val SPARK_EXECUTOR_CORES_KEY = "spark.executor.cores"
@@ -142,12 +140,10 @@ object ConfigurationHeuristic {
val SPARK_APPLICATION_DURATION = "spark.application.duration"
val SPARK_SHUFFLE_SERVICE_ENABLED = "spark.shuffle.service.enabled"
val SPARK_DYNAMIC_ALLOCATION_ENABLED = "spark.dynamicAllocation.enabled"
- val SPARK_DRIVER_CORES_KEY = "spark.driver.cores"
val SPARK_DYNAMIC_ALLOCATION_MIN_EXECUTORS = "spark.dynamicAllocation.minExecutors"
val SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS = "spark.dynamicAllocation.maxExecutors"
val SPARK_YARN_JARS = "spark.yarn.secondary.jars"
val SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD = "spark.yarn.executor.memoryOverhead"
- val SPARK_YARN_DRIVER_MEMORY_OVERHEAD = "spark.yarn.driver.memoryOverhead"
val THRESHOLD_MIN_EXECUTORS: Int = 1
val THRESHOLD_MAX_EXECUTORS: Int = 900
val SPARK_OVERHEAD_MEMORY_THRESHOLD_KEY = "spark.overheadMemory.thresholds.key"
@@ -159,9 +155,6 @@ object ConfigurationHeuristic {
lazy val appConfigurationProperties: Map[String, String] =
data.appConfigurationProperties
- lazy val driverMemoryBytes: Option[Long] =
- Try(getProperty(SPARK_DRIVER_MEMORY_KEY).map(MemoryFormatUtils.stringToBytes)).getOrElse(None)
-
lazy val executorMemoryBytes: Option[Long] =
Try(getProperty(SPARK_EXECUTOR_MEMORY_KEY).map(MemoryFormatUtils.stringToBytes)).getOrElse(None)
@@ -171,9 +164,6 @@ object ConfigurationHeuristic {
lazy val executorCores: Option[Int] =
Try(getProperty(SPARK_EXECUTOR_CORES_KEY).map(_.toInt)).getOrElse(None)
- lazy val driverCores: Option[Int] =
- Try(getProperty(SPARK_DRIVER_CORES_KEY).map(_.toInt)).getOrElse(None)
-
lazy val dynamicMinExecutors: Option[Int] =
Try(getProperty(SPARK_DYNAMIC_ALLOCATION_MIN_EXECUTORS).map(_.toInt)).getOrElse(None)
@@ -196,8 +186,6 @@ object ConfigurationHeuristic {
lazy val sparkYarnExecutorMemoryOverhead: String = if (getProperty(SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD).getOrElse("0").matches("(.*)[0-9]"))
MemoryFormatUtils.bytesToString(MemoryFormatUtils.stringToBytes(getProperty(SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD).getOrElse("0") + "MB")) else (getProperty(SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD).getOrElse("0"))
- lazy val sparkYarnDriverMemoryOverhead: String = if (getProperty(SPARK_YARN_DRIVER_MEMORY_OVERHEAD).getOrElse("0").matches("(.*)[0-9]"))
- MemoryFormatUtils.bytesToString(MemoryFormatUtils.stringToBytes(getProperty(SPARK_YARN_DRIVER_MEMORY_OVERHEAD).getOrElse("0") + "MB")) else getProperty(SPARK_YARN_DRIVER_MEMORY_OVERHEAD).getOrElse("0")
lazy val serializer: Option[String] = getProperty(SPARK_SERIALIZER_KEY)
@@ -211,7 +199,7 @@ object ConfigurationHeuristic {
case Some(_) => DEFAULT_SERIALIZER_IF_NON_NULL_SEVERITY_IF_RECOMMENDATION_UNMET
}
- //The following thresholds are for checking if the memory and cores values (executor and driver) are above normal. These thresholds are experimental, and may change in the future.
+ //The following thresholds are for checking if the memory and cores values of executors are above normal. These thresholds are experimental, and may change in the future.
val DEFAULT_SPARK_MEMORY_THRESHOLDS =
SeverityThresholds(low = MemoryFormatUtils.stringToBytes("10G"), MemoryFormatUtils.stringToBytes("15G"),
severe = MemoryFormatUtils.stringToBytes("20G"), critical = MemoryFormatUtils.stringToBytes("25G"), ascending = true)
@@ -219,8 +207,6 @@ object ConfigurationHeuristic {
SeverityThresholds(low = 4, moderate = 6, severe = 8, critical = 10, ascending = true)
val severityExecutorMemory = DEFAULT_SPARK_MEMORY_THRESHOLDS.severityOf(executorMemoryBytes.getOrElse(0).asInstanceOf[Number].longValue)
- val severityDriverMemory = DEFAULT_SPARK_MEMORY_THRESHOLDS.severityOf(driverMemoryBytes.getOrElse(0).asInstanceOf[Number].longValue)
- val severityDriverCores = DEFAULT_SPARK_CORES_THRESHOLDS.severityOf(driverCores.getOrElse(0).asInstanceOf[Number].intValue)
val severityExecutorCores = DEFAULT_SPARK_CORES_THRESHOLDS.severityOf(executorCores.getOrElse(0).asInstanceOf[Number].intValue)
val severityMinExecutors = if (dynamicMinExecutors.getOrElse(0).asInstanceOf[Number].intValue > THRESHOLD_MIN_EXECUTORS) {
Severity.CRITICAL
@@ -233,12 +219,10 @@ object ConfigurationHeuristic {
Severity.NONE
}
val severityExecutorMemoryOverhead = configurationHeuristic.sparkOverheadMemoryThreshold.severityOf(MemoryFormatUtils.stringToBytes(sparkYarnExecutorMemoryOverhead))
- val severityDriverMemoryOverhead = configurationHeuristic.sparkOverheadMemoryThreshold.severityOf(MemoryFormatUtils.stringToBytes(sparkYarnDriverMemoryOverhead))
-
//Severity for the configuration thresholds
- val severityConfThresholds: Severity = Severity.max(severityDriverCores, severityDriverMemory, severityExecutorCores, severityExecutorMemory,
- severityMinExecutors, severityMaxExecutors, jarsSeverity, severityExecutorMemoryOverhead, severityDriverMemoryOverhead)
+ val severityConfThresholds: Severity = Severity.max(severityExecutorCores, severityExecutorMemory,
+ severityMinExecutors, severityMaxExecutors, jarsSeverity, severityExecutorMemoryOverhead)
/**
* The following logic computes severity based on shuffle service and dynamic allocation flags.
diff --git a/app/com/linkedin/drelephant/spark/heuristics/DriverHeuristic.scala b/app/com/linkedin/drelephant/spark/heuristics/DriverHeuristic.scala
new file mode 100644
index 000000000..b52356b07
--- /dev/null
+++ b/app/com/linkedin/drelephant/spark/heuristics/DriverHeuristic.scala
@@ -0,0 +1,190 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.drelephant.spark.heuristics
+
+import java.util.ArrayList
+
+import scala.collection.JavaConverters
+import scala.util.Try
+import com.linkedin.drelephant.analysis._
+import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
+import com.linkedin.drelephant.spark.data.SparkApplicationData
+import com.linkedin.drelephant.util.MemoryFormatUtils
+import com.linkedin.drelephant.spark.fetchers.statusapiv1.ExecutorSummary
+
+/**
+ * A heuristic based the driver's configurations and memory used.
+ * It checks whether the configuration values specified are within the threshold range.
+ * It also analyses the peak JVM memory used and time spent in GC by the job.
+ */
+class DriverHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData)
+ extends Heuristic[SparkApplicationData] {
+
+ import DriverHeuristic._
+ import JavaConverters._
+
+ val gcSeverityThresholds: SeverityThresholds =
+ SeverityThresholds.parse(heuristicConfigurationData.getParamMap.get(GC_SEVERITY_THRESHOLDS_KEY), ascending = true)
+ .getOrElse(DEFAULT_GC_SEVERITY_THRESHOLDS)
+
+ val sparkOverheadMemoryThreshold: SeverityThresholds = SeverityThresholds.parse(heuristicConfigurationData.getParamMap.get(SPARK_OVERHEAD_MEMORY_THRESHOLD_KEY), ascending = true)
+ .getOrElse(DEFAULT_SPARK_OVERHEAD_MEMORY_THRESHOLDS)
+
+ val sparkExecutorMemoryThreshold: String = heuristicConfigurationData.getParamMap.getOrDefault(SPARK_EXECUTOR_MEMORY_THRESHOLD_KEY, DEFAULT_SPARK_EXECUTOR_MEMORY_THRESHOLD)
+
+ override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData
+
+ lazy val driverPeakJvmMemoryThresholdString: String = heuristicConfigurationData.getParamMap.get(MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLD_KEY)
+
+ override def apply(data: SparkApplicationData): HeuristicResult = {
+ val evaluator = new Evaluator(this, data)
+
+ def formatProperty(property: Option[String]): String =
+ property.getOrElse("Not presented. Using default.")
+
+ var resultDetails = Seq(
+ new HeuristicResultDetails(
+ SPARK_DRIVER_MEMORY_KEY,
+ formatProperty(evaluator.driverMemoryBytes.map(MemoryFormatUtils.bytesToString))
+ ),
+ new HeuristicResultDetails(
+ "Ratio of time spent in GC to total time", evaluator.ratio.toString
+ ),
+ new HeuristicResultDetails(
+ SPARK_DRIVER_CORES_KEY,
+ formatProperty(evaluator.driverCores.map(_.toString))
+ ),
+ new HeuristicResultDetails(
+ SPARK_YARN_DRIVER_MEMORY_OVERHEAD,
+ evaluator.sparkYarnDriverMemoryOverhead
+ ),
+ new HeuristicResultDetails("Max driver peak JVM used memory", MemoryFormatUtils.bytesToString(evaluator.maxDriverPeakJvmUsedMemory))
+ )
+ if(evaluator.severityJvmUsedMemory != Severity.NONE) {
+ resultDetails = resultDetails :+ new HeuristicResultDetails("Driver Peak JVM used Memory", "The allocated memory for the driver (in " + SPARK_DRIVER_MEMORY_KEY + ") is much more than the peak JVM used memory by the driver.")
+ }
+ if (evaluator.severityGc != Severity.NONE) {
+ resultDetails = resultDetails :+ new HeuristicResultDetails("Gc ratio high", "The driver is spending too much time on GC. We recommend increasing the driver memory.")
+ }
+ if(evaluator.severityDriverCores != Severity.NONE) {
+ resultDetails = resultDetails :+ new HeuristicResultDetails("Driver Cores", "Please do not specify excessive number of driver cores. Change it in the field : " + SPARK_DRIVER_CORES_KEY)
+ }
+ if(evaluator.severityDriverMemoryOverhead != Severity.NONE) {
+ resultDetails = resultDetails :+ new HeuristicResultDetails("Driver Overhead Memory", "Please do not specify excessive amount of overhead memory for Driver. Change it in the field " + SPARK_YARN_DRIVER_MEMORY_OVERHEAD)
+ }
+ if(evaluator.severityDriverMemory != Severity.NONE) {
+ resultDetails = resultDetails :+ new HeuristicResultDetails("Spark Driver Memory", "Please do not specify excessive amount of memory for Driver. Change it in the field " + SPARK_DRIVER_MEMORY_KEY)
+ }
+
+ // Constructing a mutable ArrayList for resultDetails, otherwise addResultDetail method HeuristicResult cannot be used.
+ val mutableResultDetailsArrayList = new ArrayList(resultDetails.asJava)
+ val result = new HeuristicResult(
+ heuristicConfigurationData.getClassName,
+ heuristicConfigurationData.getHeuristicName,
+ evaluator.severity,
+ 0,
+ mutableResultDetailsArrayList
+ )
+ result
+ }
+}
+
+object DriverHeuristic {
+
+ val SPARK_DRIVER_MEMORY_KEY = "spark.driver.memory"
+ val SPARK_DRIVER_CORES_KEY = "spark.driver.cores"
+ val SPARK_YARN_DRIVER_MEMORY_OVERHEAD = "spark.yarn.driver.memoryOverhead"
+ val SPARK_OVERHEAD_MEMORY_THRESHOLD_KEY = "spark.overheadMemory.thresholds.key"
+ val SPARK_EXECUTOR_MEMORY_THRESHOLD_KEY = "spark_executor_memory_threshold_key"
+ val EXECUTION_MEMORY = "executionMemory"
+ val STORAGE_MEMORY = "storageMemory"
+ val JVM_USED_MEMORY = "jvmUsedMemory"
+
+ // 300 * FileUtils.ONE_MB (300 * 1024 * 1024)
+ val reservedMemory : Long = 314572800
+ val MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLD_KEY = "peak_jvm_memory_threshold"
+ val GC_SEVERITY_THRESHOLDS_KEY: String = "gc_severity_threshold"
+ val DEFAULT_GC_SEVERITY_THRESHOLDS =
+ SeverityThresholds(low = 0.08D, moderate = 0.09D, severe = 0.1D, critical = 0.15D, ascending = true)
+
+ val DEFAULT_SPARK_OVERHEAD_MEMORY_THRESHOLDS =
+ SeverityThresholds(low = MemoryFormatUtils.stringToBytes("2G"), MemoryFormatUtils.stringToBytes("4G"),
+ severe = MemoryFormatUtils.stringToBytes("6G"), critical = MemoryFormatUtils.stringToBytes("8G"), ascending = true)
+
+ val DEFAULT_SPARK_EXECUTOR_MEMORY_THRESHOLD = "2G"
+
+ class Evaluator(driverHeuristic: DriverHeuristic, data: SparkApplicationData) {
+ lazy val appConfigurationProperties: Map[String, String] =
+ data.appConfigurationProperties
+
+ lazy val executorSummaries : Seq[ExecutorSummary] = data.executorSummaries
+ lazy val driver : ExecutorSummary = executorSummaries.find(_.id == "driver").getOrElse(null)
+
+ if(driver == null) {
+ throw new Exception("No driver found!")
+ }
+
+ //peakJvmMemory calculations
+ val maxDriverPeakJvmUsedMemory : Long = driver.peakJvmUsedMemory.getOrElse(JVM_USED_MEMORY, 0L).asInstanceOf[Number].longValue
+
+ lazy val DEFAULT_MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS =
+ SeverityThresholds(low = 1.25 * (maxDriverPeakJvmUsedMemory + reservedMemory), moderate = 1.5 * (maxDriverPeakJvmUsedMemory + reservedMemory),
+ severe = 2 * (maxDriverPeakJvmUsedMemory + reservedMemory), critical = 3 * (maxDriverPeakJvmUsedMemory + reservedMemory), ascending = true)
+
+ val MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS : SeverityThresholds = if(driverHeuristic.driverPeakJvmMemoryThresholdString == null) {
+ DEFAULT_MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS
+ } else {
+ SeverityThresholds.parse(driverHeuristic.driverPeakJvmMemoryThresholdString.split(",").map(_.toDouble * (maxDriverPeakJvmUsedMemory + reservedMemory)).toString, ascending = false).getOrElse(DEFAULT_MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS)
+ }
+
+ lazy val severityJvmUsedMemory : Severity = if (driverMemoryBytes.getOrElse(0L).asInstanceOf[Number].longValue <= MemoryFormatUtils.stringToBytes(driverHeuristic.sparkExecutorMemoryThreshold)) {
+ Severity.NONE
+ } else {
+ MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS.severityOf(driverMemoryBytes.getOrElse(0L).asInstanceOf[Number].longValue)
+ }
+
+ //Gc Calculations
+ val ratio : Double = driver.totalGCTime.toDouble / driver.totalDuration.toDouble
+ val severityGc = driverHeuristic.gcSeverityThresholds.severityOf(ratio)
+
+ lazy val driverMemoryBytes: Option[Long] =
+ Try(getProperty(SPARK_DRIVER_MEMORY_KEY).map(MemoryFormatUtils.stringToBytes)).getOrElse(None)
+
+ lazy val driverCores: Option[Int] =
+ Try(getProperty(SPARK_DRIVER_CORES_KEY).map(_.toInt)).getOrElse(None)
+
+ lazy val sparkYarnDriverMemoryOverhead: String = if (getProperty(SPARK_YARN_DRIVER_MEMORY_OVERHEAD).getOrElse("0").matches("(.*)[0-9]"))
+ MemoryFormatUtils.bytesToString(MemoryFormatUtils.stringToBytes(getProperty(SPARK_YARN_DRIVER_MEMORY_OVERHEAD).getOrElse("0") + "MB")) else getProperty(SPARK_YARN_DRIVER_MEMORY_OVERHEAD).getOrElse("0")
+
+ //The following thresholds are for checking if the memory and cores values (driver) are above normal. These thresholds are experimental, and may change in the future.
+ val DEFAULT_SPARK_MEMORY_THRESHOLDS =
+ SeverityThresholds(low = MemoryFormatUtils.stringToBytes("10G"), MemoryFormatUtils.stringToBytes("15G"),
+ severe = MemoryFormatUtils.stringToBytes("20G"), critical = MemoryFormatUtils.stringToBytes("25G"), ascending = true)
+ val DEFAULT_SPARK_CORES_THRESHOLDS =
+ SeverityThresholds(low = 4, moderate = 6, severe = 8, critical = 10, ascending = true)
+
+ val severityDriverMemory = DEFAULT_SPARK_MEMORY_THRESHOLDS.severityOf(driverMemoryBytes.getOrElse(0).asInstanceOf[Number].longValue)
+ val severityDriverCores = DEFAULT_SPARK_CORES_THRESHOLDS.severityOf(driverCores.getOrElse(0).asInstanceOf[Number].intValue)
+ val severityDriverMemoryOverhead = driverHeuristic.sparkOverheadMemoryThreshold.severityOf(MemoryFormatUtils.stringToBytes(sparkYarnDriverMemoryOverhead))
+
+ //Severity for the configuration thresholds
+ val severityConfThresholds: Severity = Severity.max(severityDriverCores, severityDriverMemory, severityDriverMemoryOverhead)
+ lazy val severity: Severity = Severity.max(severityConfThresholds, severityGc, severityJvmUsedMemory)
+ private def getProperty(key: String): Option[String] = appConfigurationProperties.get(key)
+ }
+
+}
diff --git a/app/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristic.scala b/app/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristic.scala
index d4ef405dc..7b8929ed7 100644
--- a/app/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristic.scala
+++ b/app/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristic.scala
@@ -21,6 +21,8 @@ import com.linkedin.drelephant.spark.fetchers.statusapiv1._
import com.linkedin.drelephant.analysis._
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.spark.data.SparkApplicationData
+import com.linkedin.drelephant.math.Statistics
+
import scala.collection.JavaConverters
@@ -47,8 +49,8 @@ class ExecutorGcHeuristic(private val heuristicConfigurationData: HeuristicConfi
val evaluator = new Evaluator(this, data)
var resultDetails = Seq(
new HeuristicResultDetails("GC time to Executor Run time ratio", evaluator.ratio.toString),
- new HeuristicResultDetails("Total GC time", evaluator.jvmTime.toString),
- new HeuristicResultDetails("Total Executor Runtime", evaluator.executorRunTimeTotal.toString)
+ new HeuristicResultDetails("Total GC time", evaluator.msecToString(evaluator.jvmTime)),
+ new HeuristicResultDetails("Total Executor Runtime", evaluator.msecToString(evaluator.executorRunTimeTotal))
)
//adding recommendations to the result, severityTimeA corresponds to the ascending severity calculation
@@ -98,7 +100,7 @@ object ExecutorGcHeuristic {
if (executorSummaries.isEmpty) {
throw new Exception("No executor information available.")
}
-
+
lazy val appConfigurationProperties: Map[String, String] =
data.appConfigurationProperties
var (jvmTime, executorRunTimeTotal) = getTimeValues(executorSummaries)
@@ -122,6 +124,25 @@ object ExecutorGcHeuristic {
})
(jvmGcTimeTotal, executorRunTimeTotal)
}
+
+ //convert millisec to units
+ def msecToString (milliSec: Long): String = {
+ var value : Long = milliSec
+ if(value < Statistics.SECOND_IN_MS){
+ return value.toString + " msec"
+ } else if(value < Statistics.MINUTE_IN_MS) {
+ return (value/Statistics.SECOND_IN_MS).toString + " Seconds"
+ } else if(value < Statistics.HOUR_IN_MS) {
+ return (value/Statistics.MINUTE_IN_MS).toString + " Minutes"
+ }else {
+ var minutes = (value % Statistics.HOUR_IN_MS)/Statistics.MINUTE_IN_MS
+ if(minutes == 0) {
+ return (value/Statistics.HOUR_IN_MS).toString + " Hours"
+ } else {
+ return (value/Statistics.HOUR_IN_MS).toString + " Hours " + minutes.toString + " Minutes"
+ }
+ }
+ }
}
}
diff --git a/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala b/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala
index 7001c8e27..90571ed75 100644
--- a/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala
+++ b/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala
@@ -97,13 +97,20 @@ object ExecutorStorageSpillHeuristic {
class Evaluator(executorStorageSpillHeuristic: ExecutorStorageSpillHeuristic, data: SparkApplicationData) {
lazy val executorAndDriverSummaries: Seq[ExecutorSummary] = data.executorSummaries
+ if (executorAndDriverSummaries == null) {
+ throw new Exception("Executors Summary is null.")
+ }
lazy val executorSummaries: Seq[ExecutorSummary] = executorAndDriverSummaries.filterNot(_.id.equals("driver"))
+ if (executorSummaries.isEmpty) {
+ throw new Exception("No executor information available.")
+ }
lazy val appConfigurationProperties: Map[String, String] =
data.appConfigurationProperties
val maxTasks: Int = executorSummaries.head.maxTasks
val maxMemorySpilled: Long = executorSummaries.map(_.totalMemoryBytesSpilled).max
val meanMemorySpilled = executorSummaries.map(_.totalMemoryBytesSpilled).sum / executorSummaries.size
- val totalMemorySpilledPerTask = totalMemorySpilled/(executorSummaries.map(_.totalTasks).sum)
+ lazy val totalTasks = Integer.max(executorSummaries.map(_.totalTasks).sum, 1)
+ val totalMemorySpilledPerTask = totalMemorySpilled/totalTasks
lazy val totalMemorySpilled = executorSummaries.map(_.totalMemoryBytesSpilled).sum
val fractionOfExecutorsHavingBytesSpilled: Double = executorSummaries.count(_.totalMemoryBytesSpilled > 0).toDouble / executorSummaries.size.toDouble
val severity: Severity = {
diff --git a/app/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristic.scala b/app/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristic.scala
index dae604124..d125bc492 100644
--- a/app/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristic.scala
+++ b/app/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristic.scala
@@ -16,16 +16,16 @@
package com.linkedin.drelephant.spark.heuristics
-import scala.collection.JavaConverters
-import scala.collection.mutable.ArrayBuffer
-
-import com.linkedin.drelephant.analysis.{Heuristic, HeuristicResult, HeuristicResultDetails, Severity, SeverityThresholds}
+import com.linkedin.drelephant.analysis._
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.math.Statistics
import com.linkedin.drelephant.spark.data.SparkApplicationData
import com.linkedin.drelephant.spark.fetchers.statusapiv1.ExecutorSummary
import com.linkedin.drelephant.util.MemoryFormatUtils
+import scala.collection.JavaConverters
+import scala.collection.mutable.ArrayBuffer
+
/**
* A heuristic based on metrics for a Spark app's executors.
@@ -37,6 +37,7 @@ import com.linkedin.drelephant.util.MemoryFormatUtils
class ExecutorsHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData)
extends Heuristic[SparkApplicationData] {
import ExecutorsHeuristic._
+
import JavaConverters._
val maxToMedianRatioSeverityThresholds: SeverityThresholds =
diff --git a/app/com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristic.scala b/app/com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristic.scala
index f57aba948..5875151ed 100644
--- a/app/com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristic.scala
+++ b/app/com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristic.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConverters
/**
- * A heuristic based on peak JVM used memory for the spark executors and driver
+ * A heuristic based on peak JVM used memory for the spark executors
*
*/
class JvmUsedMemoryHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData)
@@ -38,25 +38,19 @@ class JvmUsedMemoryHeuristic(private val heuristicConfigurationData: HeuristicCo
override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData
lazy val executorPeakJvmMemoryThresholdString: String = heuristicConfigurationData.getParamMap.get(MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLD_KEY)
- lazy val driverPeakJvmMemoryThresholdString: String = heuristicConfigurationData.getParamMap.get(MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLD_KEY)
+ lazy val sparkExecutorMemoryThreshold: String = heuristicConfigurationData.getParamMap.getOrDefault(SPARK_EXECUTOR_MEMORY_THRESHOLD_KEY, DEFAULT_SPARK_EXECUTOR_MEMORY_THRESHOLD)
override def apply(data: SparkApplicationData): HeuristicResult = {
val evaluator = new Evaluator(this, data)
var resultDetails = Seq(
new HeuristicResultDetails("Max executor peak JVM used memory", MemoryFormatUtils.bytesToString(evaluator.maxExecutorPeakJvmUsedMemory)),
- new HeuristicResultDetails("Max driver peak JVM used memory", MemoryFormatUtils.bytesToString(evaluator.maxDriverPeakJvmUsedMemory)),
- new HeuristicResultDetails("spark.executor.memory", MemoryFormatUtils.bytesToString(evaluator.sparkExecutorMemory)),
- new HeuristicResultDetails("spark.driver.memory", MemoryFormatUtils.bytesToString(evaluator.sparkDriverMemory))
+ new HeuristicResultDetails("spark.executor.memory", MemoryFormatUtils.bytesToString(evaluator.sparkExecutorMemory))
)
- if(evaluator.severityExecutor.getValue > Severity.LOW.getValue) {
- resultDetails :+ new HeuristicResultDetails("Executor Memory", "The allocated memory for the executor (in " + SPARK_EXECUTOR_MEMORY +") is much more than the peak JVM used memory by executors.")
- resultDetails :+ new HeuristicResultDetails("Reasonable size for executor memory", ((1+BUFFER_PERCENT.toDouble/100.0)*evaluator.maxExecutorPeakJvmUsedMemory).toString)
- }
-
- if(evaluator.severityDriver.getValue > Severity.LOW.getValue) {
- resultDetails :+ new HeuristicResultDetails("Driver Memory", "The allocated memory for the driver (in " + SPARK_DRIVER_MEMORY + ") is much more than the peak JVM used memory by the driver.")
+ if (evaluator.severity != Severity.NONE) {
+ resultDetails = resultDetails :+ new HeuristicResultDetails("Executor Memory", "The allocated memory for the executor (in " + SPARK_EXECUTOR_MEMORY + ") is much more than the peak JVM used memory by executors.")
+ resultDetails = resultDetails :+ new HeuristicResultDetails("Suggested spark.executor.memory", MemoryFormatUtils.roundOffMemoryStringToNextInteger((MemoryFormatUtils.bytesToString(((1 + BUFFER_FRACTION) * evaluator.maxExecutorPeakJvmUsedMemory).toLong))))
}
val result = new HeuristicResult(
@@ -73,52 +67,39 @@ class JvmUsedMemoryHeuristic(private val heuristicConfigurationData: HeuristicCo
object JvmUsedMemoryHeuristic {
val JVM_USED_MEMORY = "jvmUsedMemory"
val SPARK_EXECUTOR_MEMORY = "spark.executor.memory"
- val SPARK_DRIVER_MEMORY = "spark.driver.memory"
+ val SPARK_EXECUTOR_MEMORY_THRESHOLD_KEY = "spark_executor_memory_threshold"
+
// 300 * FileUtils.ONE_MB (300 * 1024 * 1024)
- val reservedMemory : Long = 314572800
- val BUFFER_PERCENT : Int = 20
+ val reservedMemory: Long = 314572800
+ val BUFFER_FRACTION: Double = 0.2
val MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLD_KEY = "executor_peak_jvm_memory_threshold"
- val MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLD_KEY = "driver_peak_jvm_memory_threshold"
+ lazy val DEFAULT_SPARK_EXECUTOR_MEMORY_THRESHOLD = "2G"
class Evaluator(jvmUsedMemoryHeuristic: JvmUsedMemoryHeuristic, data: SparkApplicationData) {
lazy val appConfigurationProperties: Map[String, String] =
data.appConfigurationProperties
lazy val executorSummaries: Seq[ExecutorSummary] = data.executorSummaries
- lazy val driverSummary : Option[ExecutorSummary] = executorSummaries.find(_.id.equals("driver"))
- val maxDriverPeakJvmUsedMemory : Long = driverSummary.get.peakJvmUsedMemory.getOrElse(JVM_USED_MEMORY, 0L).asInstanceOf[Number].longValue
- val executorList : Seq[ExecutorSummary] = executorSummaries.filterNot(_.id.equals("driver"))
- val sparkExecutorMemory : Long = (appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY).map(MemoryFormatUtils.stringToBytes)).getOrElse(0L)
- val sparkDriverMemory : Long = appConfigurationProperties.get(SPARK_DRIVER_MEMORY).map(MemoryFormatUtils.stringToBytes).getOrElse(0L)
+ val executorList: Seq[ExecutorSummary] = executorSummaries.filterNot(_.id.equals("driver"))
+ val sparkExecutorMemory: Long = (appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY).map(MemoryFormatUtils.stringToBytes)).getOrElse(0L)
lazy val maxExecutorPeakJvmUsedMemory: Long = if (executorList.isEmpty) 0L else executorList.map {
_.peakJvmUsedMemory.getOrElse(JVM_USED_MEMORY, 0).asInstanceOf[Number].longValue
}.max
lazy val DEFAULT_MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS =
- SeverityThresholds(low = 1.5 * (maxExecutorPeakJvmUsedMemory + reservedMemory), moderate = 2 * (maxExecutorPeakJvmUsedMemory + reservedMemory), severe = 4 * (maxExecutorPeakJvmUsedMemory + reservedMemory), critical = 8 * (maxExecutorPeakJvmUsedMemory + reservedMemory), ascending = true)
-
- lazy val DEFAULT_MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS =
- SeverityThresholds(low = 1.5 * (maxDriverPeakJvmUsedMemory + reservedMemory), moderate = 2 * (maxDriverPeakJvmUsedMemory + reservedMemory), severe = 4 * (maxDriverPeakJvmUsedMemory + reservedMemory), critical = 8 * (maxDriverPeakJvmUsedMemory + reservedMemory), ascending = true)
+ SeverityThresholds(low = 1.25 * (maxExecutorPeakJvmUsedMemory + reservedMemory), moderate = 1.5 * (maxExecutorPeakJvmUsedMemory + reservedMemory), severe = 2 * (maxExecutorPeakJvmUsedMemory + reservedMemory), critical = 3 * (maxExecutorPeakJvmUsedMemory + reservedMemory), ascending = true)
- val MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS : SeverityThresholds = if(jvmUsedMemoryHeuristic.executorPeakJvmMemoryThresholdString == null) {
+ val MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS: SeverityThresholds = if (jvmUsedMemoryHeuristic.executorPeakJvmMemoryThresholdString == null) {
DEFAULT_MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS
} else {
SeverityThresholds.parse(jvmUsedMemoryHeuristic.executorPeakJvmMemoryThresholdString.split(",").map(_.toDouble * (maxExecutorPeakJvmUsedMemory + reservedMemory)).toString, ascending = false).getOrElse(DEFAULT_MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS)
}
- val MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS : SeverityThresholds = if(jvmUsedMemoryHeuristic.driverPeakJvmMemoryThresholdString == null) {
- DEFAULT_MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS
+ lazy val severity = if (sparkExecutorMemory <= MemoryFormatUtils.stringToBytes(jvmUsedMemoryHeuristic.sparkExecutorMemoryThreshold)) {
+ Severity.NONE
} else {
- SeverityThresholds.parse(jvmUsedMemoryHeuristic.driverPeakJvmMemoryThresholdString.split(",").map(_.toDouble * (maxDriverPeakJvmUsedMemory + reservedMemory)).toString, ascending = false).getOrElse(DEFAULT_MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS)
+ MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS.severityOf(sparkExecutorMemory)
}
-
- val severityExecutor = MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS.severityOf(sparkExecutorMemory)
- val severityDriver = MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS.severityOf(sparkDriverMemory)
-
- /**
- * disabling the skew check for executors
- * val severitySkew = DEFAULT_JVM_MEMORY_SKEW_THRESHOLDS.severityOf(maxExecutorPeakJvmUsedMemory)
- */
- val severity : Severity = Severity.max(severityDriver, severityExecutor)
}
+
}
diff --git a/app/com/linkedin/drelephant/spark/heuristics/StagesHeuristic.scala b/app/com/linkedin/drelephant/spark/heuristics/StagesHeuristic.scala
index b2c36f90b..baa0426bd 100644
--- a/app/com/linkedin/drelephant/spark/heuristics/StagesHeuristic.scala
+++ b/app/com/linkedin/drelephant/spark/heuristics/StagesHeuristic.scala
@@ -192,7 +192,8 @@ object StagesHeuristic {
}
private def averageExecutorRuntimeAndSeverityOf(stageData: StageData): (Long, Severity) = {
- val averageExecutorRuntime = stageData.executorRunTime / executorInstances
+ val allTasks : Int = Integer.max((stageData.numActiveTasks + stageData.numCompleteTasks + stageData.numFailedTasks), 1)
+ val averageExecutorRuntime = stageData.executorRunTime / allTasks
(averageExecutorRuntime, stageRuntimeMillisSeverityThresholds.severityOf(averageExecutorRuntime))
}
}
diff --git a/app/com/linkedin/drelephant/spark/heuristics/StagesWithFailedTasksHeuristic.scala b/app/com/linkedin/drelephant/spark/heuristics/StagesWithFailedTasksHeuristic.scala
index 162d1021a..c062db2c1 100644
--- a/app/com/linkedin/drelephant/spark/heuristics/StagesWithFailedTasksHeuristic.scala
+++ b/app/com/linkedin/drelephant/spark/heuristics/StagesWithFailedTasksHeuristic.scala
@@ -43,7 +43,7 @@ class StagesWithFailedTasksHeuristic(private val heuristicConfigurationData: Heu
new HeuristicResultDetails("Stages with Overhead memory errors", evaluator.stagesWithOverheadError.toString)
)
if (evaluator.severityOverheadStages.getValue >= Severity.MODERATE.getValue)
- resultDetails = resultDetails :+ new HeuristicResultDetails("Overhead memory errors", "Some tasks have failed due to overhead memory error. Please try increasing spark.yarn.executor.memoryOverhead by 500MB in spark.yarn.executor.memoryOverhead")
+ resultDetails = resultDetails :+ new HeuristicResultDetails("Overhead memory errors", "Some tasks have failed due to overhead memory error. Please try increasing spark.yarn.executor.memoryOverhead by " + increaseMemoryBy +" in spark.yarn.executor.memoryOverhead")
//TODO: refine recommendations
if (evaluator.severityOOMStages.getValue >= Severity.MODERATE.getValue)
resultDetails = resultDetails :+ new HeuristicResultDetails("OOM errors", "Some tasks have failed due to OOM error. Try increasing spark.executor.memory or decreasing spark.memory.fraction (take a look at unified memory heuristic) or decreasing number of cores.")
@@ -63,6 +63,7 @@ object StagesWithFailedTasksHeuristic {
val OOM_ERROR = "java.lang.OutOfMemoryError"
val OVERHEAD_MEMORY_ERROR = "killed by YARN for exceeding memory limits"
val ratioThreshold: Double = 2
+ val increaseMemoryBy: String = "1G"
class Evaluator(memoryFractionHeuristic: StagesWithFailedTasksHeuristic, data: SparkApplicationData) {
lazy val stagesWithFailedTasks: Seq[StageData] = data.stagesWithFailedTasks
diff --git a/app/com/linkedin/drelephant/spark/heuristics/UnifiedMemoryHeuristic.scala b/app/com/linkedin/drelephant/spark/heuristics/UnifiedMemoryHeuristic.scala
index 3b6f54cfb..53d261d9b 100644
--- a/app/com/linkedin/drelephant/spark/heuristics/UnifiedMemoryHeuristic.scala
+++ b/app/com/linkedin/drelephant/spark/heuristics/UnifiedMemoryHeuristic.scala
@@ -39,6 +39,7 @@ class UnifiedMemoryHeuristic(private val heuristicConfigurationData: HeuristicCo
override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData
lazy val peakUnifiedMemoryThresholdString: String = heuristicConfigurationData.getParamMap.get(PEAK_UNIFIED_MEMORY_THRESHOLD_KEY)
+ val sparkExecutorMemoryThreshold: String = heuristicConfigurationData.getParamMap.getOrDefault(SPARK_EXECUTOR_MEMORY_THRESHOLD_KEY, DEFAULT_SPARK_EXECUTOR_MEMORY_THRESHOLD)
override def apply(data: SparkApplicationData): HeuristicResult = {
val evaluator = new Evaluator(this, data)
@@ -69,6 +70,8 @@ object UnifiedMemoryHeuristic {
val SPARK_EXECUTOR_MEMORY_KEY = "spark.executor.memory"
val SPARK_MEMORY_FRACTION_KEY = "spark.memory.fraction"
val PEAK_UNIFIED_MEMORY_THRESHOLD_KEY = "peak_unified_memory_threshold"
+ val SPARK_EXECUTOR_MEMORY_THRESHOLD_KEY = "spark_executor_memory_threshold"
+ val DEFAULT_SPARK_EXECUTOR_MEMORY_THRESHOLD = "2G"
class Evaluator(unifiedMemoryHeuristic: UnifiedMemoryHeuristic, data: SparkApplicationData) {
lazy val appConfigurationProperties: Map[String, String] =
@@ -95,39 +98,28 @@ object UnifiedMemoryHeuristic {
SeverityThresholds.parse(unifiedMemoryHeuristic.peakUnifiedMemoryThresholdString.split(",").map(_.toDouble * maxMemory).toString, ascending = false).getOrElse(DEFAULT_PEAK_UNIFIED_MEMORY_THRESHOLD)
}
- def getPeakUnifiedMemoryExecutorSeverity(executorSummary: ExecutorSummary): Severity = {
- return PEAK_UNIFIED_MEMORY_THRESHOLDS.severityOf(executorSummary.peakUnifiedMemory.getOrElse(EXECUTION_MEMORY, 0).asInstanceOf[Number].longValue
- + executorSummary.peakUnifiedMemory.getOrElse(STORAGE_MEMORY, 0).asInstanceOf[Number].longValue)
- }
-
val sparkExecutorMemory: Long = (appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY_KEY).map(MemoryFormatUtils.stringToBytes)).getOrElse(0L)
- val sparkMemoryFraction: Double = appConfigurationProperties.getOrElse(SPARK_MEMORY_FRACTION_KEY, 0.6D).asInstanceOf[Number].doubleValue
+ lazy val sparkMemoryFraction: Double = appConfigurationProperties.getOrElse(SPARK_MEMORY_FRACTION_KEY, "0.6").toDouble
lazy val meanUnifiedMemory: Long = (executorList.map {
executorSummary => {
- executorSummary.peakUnifiedMemory.getOrElse(EXECUTION_MEMORY, 0).asInstanceOf[Number].longValue
- +executorSummary.peakUnifiedMemory.getOrElse(STORAGE_MEMORY, 0).asInstanceOf[Number].longValue
+ (executorSummary.peakUnifiedMemory.getOrElse(EXECUTION_MEMORY, 0).asInstanceOf[Number].longValue
+ + executorSummary.peakUnifiedMemory.getOrElse(STORAGE_MEMORY, 0).asInstanceOf[Number].longValue)
}
}.sum) / executorList.size
lazy val maxUnifiedMemory: Long = executorList.map {
executorSummary => {
- executorSummary.peakUnifiedMemory.getOrElse(EXECUTION_MEMORY, 0).asInstanceOf[Number].longValue
- +executorSummary.peakUnifiedMemory.getOrElse(STORAGE_MEMORY, 0).asInstanceOf[Number].longValue
+ (executorSummary.peakUnifiedMemory.getOrElse(EXECUTION_MEMORY, 0).asInstanceOf[Number].longValue
+ + executorSummary.peakUnifiedMemory.getOrElse(STORAGE_MEMORY, 0).asInstanceOf[Number].longValue)
}
}.max
- lazy val severity: Severity = {
- var severityPeakUnifiedMemoryVariable: Severity = Severity.NONE
- for (executorSummary <- executorList) {
- var peakUnifiedMemoryExecutorSeverity: Severity = getPeakUnifiedMemoryExecutorSeverity(executorSummary)
- if (peakUnifiedMemoryExecutorSeverity.getValue > severityPeakUnifiedMemoryVariable.getValue) {
- severityPeakUnifiedMemoryVariable = peakUnifiedMemoryExecutorSeverity
- }
- }
- severityPeakUnifiedMemoryVariable
+ lazy val severity: Severity = if (sparkExecutorMemory <= MemoryFormatUtils.stringToBytes(unifiedMemoryHeuristic.sparkExecutorMemoryThreshold)) {
+ Severity.NONE
+ } else {
+ PEAK_UNIFIED_MEMORY_THRESHOLDS.severityOf(maxUnifiedMemory)
}
}
-
}
diff --git a/app/com/linkedin/drelephant/util/MemoryFormatUtils.java b/app/com/linkedin/drelephant/util/MemoryFormatUtils.java
index 8ed49fbc0..0d7a8f584 100644
--- a/app/com/linkedin/drelephant/util/MemoryFormatUtils.java
+++ b/app/com/linkedin/drelephant/util/MemoryFormatUtils.java
@@ -90,7 +90,7 @@ public static String bytesToString(long value) {
* @return The bytes value
*/
public static long stringToBytes(String formattedString) {
- if (formattedString == null) {
+ if (formattedString == null || formattedString.isEmpty()) {
return 0L;
}
@@ -124,4 +124,41 @@ public static long stringToBytes(String formattedString) {
+ "] does not match any unit. The supported units are (case-insensitive, and also the 'B' is ignorable): ["
+ StringUtils.join(UNITS) + "].");
}
+
+ /**
+ * Given a memory value in string format, it rounds off the double value to next integer.
+ * @param formattedString
+ * @return : formatted String with int value to next integer.
+ */
+ public static String roundOffMemoryStringToNextInteger(String formattedString) {
+ if (formattedString == null || formattedString.isEmpty()) {
+ return "";
+ }
+
+ //handling if the string has , for eg. 1,000MB
+ formattedString = formattedString.replace(",", "");
+
+ Matcher matcher = REGEX_MATCHER.matcher(formattedString);
+ if (!matcher.matches()) {
+ throw new IllegalArgumentException(
+ "The formatted string [" + formattedString + "] does not match with the regex /" + REGEX_MATCHER.toString()
+ + "/");
+ }
+ if (matcher.groupCount() != 1 && matcher.groupCount() != 2) {
+ throw new IllegalArgumentException();
+ }
+
+ double numPart = Double.parseDouble(matcher.group(1));
+ if (numPart < 0) {
+ throw new IllegalArgumentException("The number part of the memory cannot be less than zero: [" + numPart + "].");
+ }
+
+ int numPartInt = (int) Math.ceil(numPart);
+
+ String unitPart = matcher.groupCount() == 2 ? matcher.group(2).toUpperCase() : "";
+ if (!unitPart.endsWith("B")) {
+ unitPart += "B";
+ }
+ return (numPartInt + " " + unitPart);
+ }
}
diff --git a/app/views/help/spark/helpConfigurationHeuristic.scala.html b/app/views/help/spark/helpConfigurationHeuristic.scala.html
index 1d9ec968f..6bf673521 100644
--- a/app/views/help/spark/helpConfigurationHeuristic.scala.html
+++ b/app/views/help/spark/helpConfigurationHeuristic.scala.html
@@ -14,8 +14,7 @@
* the License.
*@
The results from this heuristic primarily inform you about key app
-configuration settings, including driver memory, driver cores, executor cores,
-executor instances, executor memory, and the serializer.
-It also checks the values of dynamically allocated min and max executors, the specified yarn jars, executor and driver memory overhead and whether other configuration values are within threshold.
+configuration settings, including executor cores, executor instances, executor memory, and the serializer.
+It also checks the values of dynamically allocated min and max executors, the specified yarn jars, executor memory overhead and whether other configuration values are within threshold.
Suggestions
Suggestions based on the configurations you have set are given in the heuristic result itself.
\ No newline at end of file
diff --git a/app/views/help/spark/helpDriverHeuristic.scala.html b/app/views/help/spark/helpDriverHeuristic.scala.html
new file mode 100644
index 000000000..83f65a674
--- /dev/null
+++ b/app/views/help/spark/helpDriverHeuristic.scala.html
@@ -0,0 +1,23 @@
+@*
+* Copyright 2016 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the "License"); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*@
+This is a heuristic for checking whether the driver is well tuned and the configurations are set to a good value.
+It checks the following properties
+Driver Max Peak JVM Used Memory
+This analyses whether the driver memory is set to a good value. To avoid wasted memory, it checks if the peak JVM used memory by the driver is reasonably close to the user allocated driver memory which is specified in spark.driver.memory. If the peak JVM memory is much smaller, then the driver memory should be reduced.
+Time spent by driver in GC
+This checks if your job spends too much time in GC. We recommend increasing spark.driver.memory if it does.
+Checking configuration thresholds
+The values of spark.driver.memory, spark.driver.cores and spark.yarn.driver.memoryOverhead are checked to verify if they are within threshold values.
\ No newline at end of file
diff --git a/app/views/help/spark/helpJvmUsedMemoryHeuristic.scala.html b/app/views/help/spark/helpJvmUsedMemoryHeuristic.scala.html
index f58462821..8db78a999 100644
--- a/app/views/help/spark/helpJvmUsedMemoryHeuristic.scala.html
+++ b/app/views/help/spark/helpJvmUsedMemoryHeuristic.scala.html
@@ -15,6 +15,5 @@
*@
This is a heuristic for peak JVM used memory.
Executor Max Peak JVM Used Memory
-This is to analyse whether the executor memory is set to a good value. To avoid wasted memory, it checks if the peak JVM used memory by the executor is reasonably close to the blocked executor memory which is specified in spark.executor.memory. If the peak JVM memory is much smaller, then the executor memory should be reduced.
-Driver Max Peak JVM Used Memory
-This is to analyse whether the driver memory is set to a good value. To avoid wasted memory, it checks if the peak JVM used memory by the driver is reasonably close to the blocked driver memory which is specified in spark.driver.memory. If the peak JVM memory is much smaller, then the driver memory should be reduced.
+This is to analyse whether the executor memory is set to a good value. To avoid wasted memory, it checks if the peak JVM used memory by the executor is reasonably close to the user allocated executor memory which is specified in spark.executor.memory. If the peak JVM memory is much smaller, then the executor memory should be reduced.
+ Note: Please note that for calculation purposes Dr. Elephant considers 1024 Bytes in 1 KB whereas the spark history server considers 1000 Bytes. So please don't get confused if you find discrepancy in values from these two places.
\ No newline at end of file
diff --git a/app/views/help/spark/helpUnifiedMemoryHeuristic.scala.html b/app/views/help/spark/helpUnifiedMemoryHeuristic.scala.html
index 9ea156004..352e9e3ff 100644
--- a/app/views/help/spark/helpUnifiedMemoryHeuristic.scala.html
+++ b/app/views/help/spark/helpUnifiedMemoryHeuristic.scala.html
@@ -22,3 +22,4 @@ Action Items
2. If your job's Executor Memory is high, then we recommend reducing the spark.executor.memory itself which will lower the Allocated Unified Memory space.
Note:
spark.memory.fraction: This is the fraction of JVM Used Memory (Executor memory - Reserved memory) dedicated to the unified memory region (execution + storage). It basically partitions user memory from execution and storage memory.
+ Note: Please note that for calculation purposes Dr. Elephant considers 1024 Bytes in 1 KB whereas the spark history server considers 1000 Bytes. So please don't get confused if you find discrepancy in values from these two places.
\ No newline at end of file
diff --git a/test/com/linkedin/drelephant/spark/fetchers/SparkRestClientTest.scala b/test/com/linkedin/drelephant/spark/fetchers/SparkRestClientTest.scala
index 947fc8e7b..c20223fb8 100644
--- a/test/com/linkedin/drelephant/spark/fetchers/SparkRestClientTest.scala
+++ b/test/com/linkedin/drelephant/spark/fetchers/SparkRestClientTest.scala
@@ -303,7 +303,7 @@ object SparkRestClientTest {
@Path("applications/{appId}/{attemptId}/stages")
def getStages(): StagesResource = new StagesResource()
- @Path("applications/{appId}/{attemptId}/executors")
+ @Path("applications/{appId}/{attemptId}/allexecutors")
def getExecutors(): ExecutorsResource = new ExecutorsResource()
@Path("applications/{appId}/{attemptId}/logs")
@@ -385,7 +385,7 @@ object SparkRestClientTest {
@Path("applications/{appId}/stages")
def getStages(): StagesResource = new StagesResource()
- @Path("applications/{appId}/executors")
+ @Path("applications/{appId}/allexecutors")
def getExecutors(): ExecutorsResource = new ExecutorsResource()
@Path("applications/{appId}/logs")
diff --git a/test/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristicTest.scala
index f04777d5b..f22b5379e 100644
--- a/test/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristicTest.scala
+++ b/test/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristicTest.scala
@@ -27,7 +27,9 @@ import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
import org.scalatest.{FunSpec, Matchers}
import java.util.Date
-
+/**
+ * Test class for Configuration Heuristic. It checks whether all the values used in the heuristic are calculated correctly.
+ */
class ConfigurationHeuristicTest extends FunSpec with Matchers {
import ConfigurationHeuristicTest._
@@ -60,67 +62,50 @@ class ConfigurationHeuristicTest extends FunSpec with Matchers {
val heuristicResultDetails = heuristicResult.getHeuristicResultDetails
it("returns the size of result details") {
- heuristicResultDetails.size() should be(9)
+ heuristicResultDetails.size() should be(8)
}
it("returns the severity") {
heuristicResult.getSeverity should be(Severity.NONE)
}
- it("returns the driver memory") {
- val details = heuristicResultDetails.get(0)
- details.getName should include("spark.driver.memory")
- details.getValue should be("2 GB")
- }
-
it("returns the executor memory") {
- val details = heuristicResultDetails.get(1)
+ val details = heuristicResultDetails.get(0)
details.getName should include("spark.executor.memory")
details.getValue should be("1 GB")
}
it("returns the executor instances") {
- val details = heuristicResultDetails.get(2)
+ val details = heuristicResultDetails.get(1)
details.getName should include("spark.executor.instances")
details.getValue should be("900")
}
it("returns the executor cores") {
- val details = heuristicResultDetails.get(3)
+ val details = heuristicResultDetails.get(2)
details.getName should include("spark.executor.cores")
details.getValue should include("default")
}
it("returns the application duration") {
- val details = heuristicResultDetails.get(4)
+ val details = heuristicResultDetails.get(3)
details.getName should include("spark.application.duration")
details.getValue should include("10")
}
it("returns the dynamic allocation flag") {
- val details = heuristicResultDetails.get(5)
+ val details = heuristicResultDetails.get(4)
details.getName should include("spark.dynamicAllocation.enabled")
details.getValue should be("true")
}
-
- it("returns the driver cores") {
- val details = heuristicResultDetails.get(6)
- details.getName should include("spark.driver.cores")
- details.getValue should include("default")
- }
-
- it("returns the driver overhead memory") {
- val details = heuristicResultDetails.get(7)
- details.getName should include("spark.yarn.driver.memoryOverhead")
- details.getValue should include("500 MB")
- }
}
describe("apply with Severity") {
val configurationProperties = Map(
"spark.serializer" -> "dummySerializer",
"spark.shuffle.service.enabled" -> "false",
- "spark.dynamicAllocation.enabled" -> "true"
+ "spark.dynamicAllocation.enabled" -> "true",
+ "spark.executor.cores" -> "5"
)
val data = newFakeSparkApplicationData(configurationProperties)
@@ -136,24 +121,30 @@ class ConfigurationHeuristicTest extends FunSpec with Matchers {
}
it("returns the dynamic allocation flag") {
- val details = heuristicResultDetails.get(5)
+ val details = heuristicResultDetails.get(4)
details.getName should include("spark.dynamicAllocation.enabled")
details.getValue should be("true")
}
it("returns the serializer") {
- val details = heuristicResultDetails.get(9)
+ val details = heuristicResultDetails.get(8)
details.getName should include("spark.serializer")
details.getValue should be("dummySerializer")
details.getDetails should be("KyroSerializer is Not Enabled.")
}
it("returns the shuffle service flag") {
- val details = heuristicResultDetails.get(10)
+ val details = heuristicResultDetails.get(9)
details.getName should include("spark.shuffle.service.enabled")
details.getValue should be("false")
details.getDetails should be("Spark shuffle service is not enabled.")
}
+
+ it("returns executor cores") {
+ val details = heuristicResultDetails.get(10)
+ details.getName should include("Executor cores")
+ details.getValue should be("The number of executor cores should be <=4. Please change it in the field spark.executor.cores")
+ }
}
describe(".Evaluator") {
@@ -163,16 +154,6 @@ class ConfigurationHeuristicTest extends FunSpec with Matchers {
new Evaluator(configurationHeuristic, newFakeSparkApplicationData(configurationProperties))
}
- it("has the driver memory bytes when they're present") {
- val evaluator = newEvaluatorWithConfigurationProperties(Map("spark.driver.memory" -> "2G"))
- evaluator.driverMemoryBytes should be(Some(2L * 1024 * 1024 * 1024))
- }
-
- it("has no driver memory bytes when they're absent") {
- val evaluator = newEvaluatorWithConfigurationProperties(Map.empty)
- evaluator.driverMemoryBytes should be(None)
- }
-
it("has the executor memory bytes when they're present") {
val evaluator = newEvaluatorWithConfigurationProperties(Map("spark.executor.memory" -> "1g"))
evaluator.executorMemoryBytes should be(Some(1L * 1024 * 1024 * 1024))
@@ -198,21 +179,11 @@ class ConfigurationHeuristicTest extends FunSpec with Matchers {
evaluator.executorCores should be(Some(2))
}
- it("has the driver cores when they're present") {
- val evaluator = newEvaluatorWithConfigurationProperties(Map("spark.driver.cores" -> "3"))
- evaluator.driverCores should be(Some(3))
- }
-
it("has no executor cores when they're absent") {
val evaluator = newEvaluatorWithConfigurationProperties(Map.empty)
evaluator.executorCores should be(None)
}
- it("has no driver cores when they're absent") {
- val evaluator = newEvaluatorWithConfigurationProperties(Map.empty)
- evaluator.driverCores should be(None)
- }
-
it("has the serializer when it's present") {
val evaluator = newEvaluatorWithConfigurationProperties(Map("spark.serializer" -> "org.apache.spark.serializer.KryoSerializer"))
evaluator.serializer should be(Some("org.apache.spark.serializer.KryoSerializer"))
diff --git a/test/com/linkedin/drelephant/spark/heuristics/DriverHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/DriverHeuristicTest.scala
new file mode 100644
index 000000000..fe0da4ec6
--- /dev/null
+++ b/test/com/linkedin/drelephant/spark/heuristics/DriverHeuristicTest.scala
@@ -0,0 +1,104 @@
+package com.linkedin.drelephant.spark.heuristics
+
+import com.linkedin.drelephant.analysis.{ApplicationType, Severity}
+import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
+import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkRestDerivedData}
+import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, ExecutorSummaryImpl}
+import com.linkedin.drelephant.spark.heuristics.DriverHeuristic.Evaluator
+import org.scalatest.{FunSpec, Matchers}
+
+import scala.collection.JavaConverters
+import scala.concurrent.duration.Duration
+
+/**
+ * Test class for Driver Metrics Heuristic. It checks whether all the values used in the heuristic are calculated correctly.
+ */
+class DriverHeuristicTest extends FunSpec with Matchers {
+
+ import DriverHeuristicTest._
+
+ val heuristicConfigurationData = newFakeHeuristicConfigurationData()
+
+ val driverHeuristic = new DriverHeuristic(heuristicConfigurationData)
+
+ val executorData = Seq(
+ newDummyExecutorData("1", 400000, Map("executionMemory" -> 300000, "storageMemory" -> 94567), null, 0, 0),
+ newDummyExecutorData("2", 400000, Map("executionMemory" -> 200000, "storageMemory" -> 34568), null, 0, 0),
+ newDummyExecutorData("3", 400000, Map("executionMemory" -> 300000, "storageMemory" -> 34569), null, 0, 0),
+ newDummyExecutorData("4", 400000, Map("executionMemory" -> 20000, "storageMemory" -> 3456), null, 0, 0),
+ newDummyExecutorData("5", 400000, Map("executionMemory" -> 200000, "storageMemory" -> 34564), null, 0, 0),
+ newDummyExecutorData("driver", 400000, Map("executionMemory" -> 300000, "storageMemory" -> 94561), Map("jvmUsedMemory" -> 394567123),
+ totalGCTime = Duration("2min").toMillis, totalDuration = Duration("15min").toMillis)
+ )
+ describe(".apply") {
+ val data = newFakeSparkApplicationData(executorData)
+ val heuristicResult = driverHeuristic.apply(data)
+ val heuristicResultDetails = heuristicResult.getHeuristicResultDetails
+
+ it("has severity") {
+ heuristicResult.getSeverity should be(Severity.SEVERE)
+ }
+
+ describe("Evaluator") {
+ val evaluator = new Evaluator(driverHeuristic, data)
+ it("has max driver peak JVM memory") {
+ evaluator.maxDriverPeakJvmUsedMemory should be(394567123)
+ }
+ it("ratio of time spend in Gc to total duration") {
+ evaluator.ratio should be(0.13333333333333333)
+ }
+ }
+ }
+}
+
+object DriverHeuristicTest {
+
+ import JavaConverters._
+
+ def newFakeHeuristicConfigurationData(params: Map[String, String] = Map.empty): HeuristicConfigurationData =
+ new HeuristicConfigurationData("heuristic", "class", "view", new ApplicationType("type"), params.asJava)
+
+ def newDummyExecutorData(
+ id: String,
+ maxMemory: Long,
+ peakUnifiedMemory: Map[String, Long],
+ peakJvmUsedMemory: Map[String, Long],
+ totalGCTime: Long,
+ totalDuration: Long
+ ): ExecutorSummaryImpl = new ExecutorSummaryImpl(
+ id,
+ hostPort = "",
+ rddBlocks = 0,
+ memoryUsed = 0,
+ diskUsed = 0,
+ activeTasks = 0,
+ failedTasks = 0,
+ completedTasks = 0,
+ totalTasks = 0,
+ maxTasks = 0,
+ totalDuration,
+ totalInputBytes = 0,
+ totalShuffleRead = 0,
+ totalShuffleWrite = 0,
+ maxMemory,
+ totalGCTime,
+ totalMemoryBytesSpilled = 0,
+ executorLogs = Map.empty,
+ peakJvmUsedMemory,
+ peakUnifiedMemory
+ )
+
+ def newFakeSparkApplicationData(executorSummaries: Seq[ExecutorSummaryImpl]): SparkApplicationData = {
+ val appId = "application_1"
+ val restDerivedData = SparkRestDerivedData(
+ new ApplicationInfoImpl(appId, name = "app", Seq.empty),
+ jobDatas = Seq.empty,
+ stageDatas = Seq.empty,
+ executorSummaries = executorSummaries,
+ stagesWithFailedTasks = Seq.empty
+ )
+
+ SparkApplicationData(appId, restDerivedData, logDerivedData = None)
+ }
+}
+
diff --git a/test/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristicTest.scala
index 8bec2b3fa..2204922c1 100644
--- a/test/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristicTest.scala
+++ b/test/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristicTest.scala
@@ -26,18 +26,14 @@ import org.scalatest.{FunSpec, Matchers}
import scala.concurrent.duration.Duration
-
+/**
+ * Test class for Executor GC Heuristic. It checks whether all the values used in the heuristic are calculated correctly.
+ */
class ExecutorGcHeuristicTest extends FunSpec with Matchers {
import ExecutorGcHeuristicTest._
describe("ExecutorGcHeuristic") {
- val heuristicConfigurationData = newFakeHeuristicConfigurationData(
- Map(
- "max_to_median_ratio_severity_thresholds" -> "1.414,2,4,16",
- "ignore_max_bytes_less_than_threshold" -> "4000000",
- "ignore_max_millis_less_than_threshold" -> "4000001"
- )
- )
+ val heuristicConfigurationData = newFakeHeuristicConfigurationData()
val executorGcHeuristic = new ExecutorGcHeuristic(heuristicConfigurationData)
val executorSummaries = Seq(
@@ -63,10 +59,21 @@ class ExecutorGcHeuristicTest extends FunSpec with Matchers {
)
)
+ val executorSummaries1 = Seq(
+ newFakeExecutorSummary(
+ id = "1",
+ totalGCTime = 500,
+ totalDuration = 700
+ )
+ )
+
describe(".apply") {
- val data1 = newFakeSparkApplicationData(executorSummaries)
- val heuristicResult = executorGcHeuristic.apply(data1)
+ val data = newFakeSparkApplicationData(executorSummaries)
+ val data1 = newFakeSparkApplicationData(executorSummaries1)
+ val heuristicResult = executorGcHeuristic.apply(data)
+ val heuristicResult1 = executorGcHeuristic.apply(data1)
val heuristicResultDetails = heuristicResult.getHeuristicResultDetails
+ val heuristicResultDetails1 = heuristicResult1.getHeuristicResultDetails
it("returns the severity") {
heuristicResult.getSeverity should be(Severity.CRITICAL)
@@ -81,13 +88,25 @@ class ExecutorGcHeuristicTest extends FunSpec with Matchers {
it("returns the total GC time") {
val details = heuristicResultDetails.get(1)
details.getName should include("Total GC time")
- details.getValue should be("1200000")
+ details.getValue should be("20 Minutes")
}
it("returns the executor's run time") {
val details = heuristicResultDetails.get(2)
details.getName should include("Total Executor Runtime")
- details.getValue should be("4740000")
+ details.getValue should be("1 Hours 19 Minutes")
+ }
+
+ it("returns total Gc Time in millisec") {
+ val details = heuristicResultDetails1.get(1)
+ details.getName should include("Total GC time")
+ details.getValue should be("500 msec")
+ }
+
+ it("returns executor run Time in millisec") {
+ val details = heuristicResultDetails1.get(2)
+ details.getName should include("Total Executor Runtime")
+ details.getValue should be("700 msec")
}
}
}
diff --git a/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala
index 6b083c5d5..e02e4f2fb 100644
--- a/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala
+++ b/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala
@@ -21,10 +21,13 @@ import com.linkedin.drelephant.analysis.{ApplicationType, Severity, SeverityThre
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, ExecutorSummaryImpl, StageDataImpl}
+import com.linkedin.drelephant.spark.heuristics.ExecutorStorageSpillHeuristic.Evaluator
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
import org.scalatest.{FunSpec, Matchers}
-
+/**
+ * Test class for Executor Storage Spill Heuristic. It checks whether all the values used in the heuristic are calculated correctly.
+ */
class ExecutorStorageSpillHeuristicTest extends FunSpec with Matchers {
import ExecutorStorageSpillHeuristicTest._
@@ -59,6 +62,7 @@ class ExecutorStorageSpillHeuristicTest extends FunSpec with Matchers {
val data1 = newFakeSparkApplicationData(executorSummaries, appConfigurationProperties)
val heuristicResult = executorStorageSpillHeuristic.apply(data1)
val heuristicResultDetails = heuristicResult.getHeuristicResultDetails
+ val evaluator = new Evaluator(executorStorageSpillHeuristic, data1)
it("returns the severity") {
heuristicResult.getSeverity should be(Severity.SEVERE)
@@ -81,6 +85,10 @@ class ExecutorStorageSpillHeuristicTest extends FunSpec with Matchers {
details.getName should include("Mean memory spilled")
details.getValue should be("195.31 KB")
}
+
+ it("has the memory spilled per task") {
+ evaluator.totalMemorySpilledPerTask should be(800000)
+ }
}
}
}
@@ -103,7 +111,7 @@ object ExecutorStorageSpillHeuristicTest {
activeTasks = 0,
failedTasks = 0,
completedTasks = 0,
- totalTasks = 10,
+ totalTasks = 0,
maxTasks = 10,
totalDuration=0,
totalInputBytes=0,
diff --git a/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala
index 2ef2580e9..c1996fe41 100644
--- a/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala
+++ b/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala
@@ -25,7 +25,9 @@ import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl,
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
import org.scalatest.{FunSpec, Matchers}
-
+/**
+ * Test class for Executors Heuristic. It checks whether all the values used in the heuristic are calculated correctly.
+ */
class ExecutorsHeuristicTest extends FunSpec with Matchers {
import ExecutorsHeuristicTest._
diff --git a/test/com/linkedin/drelephant/spark/heuristics/JobsHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/JobsHeuristicTest.scala
index c112a312e..435eddcdb 100644
--- a/test/com/linkedin/drelephant/spark/heuristics/JobsHeuristicTest.scala
+++ b/test/com/linkedin/drelephant/spark/heuristics/JobsHeuristicTest.scala
@@ -26,7 +26,9 @@ import org.apache.spark.JobExecutionStatus
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
import org.scalatest.{FunSpec, Matchers}
-
+/**
+ * Test class for Jobs Heuristic. It checks whether all the values used in the heuristic are calculated correctly.
+ */
class JobsHeuristicTest extends FunSpec with Matchers {
import JobsHeuristicTest._
diff --git a/test/com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristicTest.scala
index d9b2e2106..e0dc6641f 100644
--- a/test/com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristicTest.scala
+++ b/test/com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristicTest.scala
@@ -9,6 +9,9 @@ import org.scalatest.{FunSpec, Matchers}
import scala.collection.JavaConverters
+/**
+ * Test class for JVM Used memory. It checks whether all the values used in the heuristic are calculated correctly.
+ */
class JvmUsedMemoryHeuristicTest extends FunSpec with Matchers {
import JvmUsedMemoryHeuristicTest._
@@ -17,44 +20,49 @@ class JvmUsedMemoryHeuristicTest extends FunSpec with Matchers {
val peakJvmUsedMemoryHeuristic = new JvmUsedMemoryHeuristic(heuristicConfigurationData)
- val appConfigurationProperties = Map("spark.driver.memory"->"40000000000", "spark.executor.memory"->"500000000")
+ val appConfigurationProperties = Map("spark.driver.memory"->"40000000000", "spark.executor.memory"->"50000000000")
val executorData = Seq(
newDummyExecutorData("1", Map("jvmUsedMemory" -> 394567123)),
- newDummyExecutorData("2", Map("jvmUsedMemory" -> 23456834)),
- newDummyExecutorData("3", Map("jvmUsedMemory" -> 334569)),
- newDummyExecutorData("4", Map("jvmUsedMemory" -> 134563)),
- newDummyExecutorData("5", Map("jvmUsedMemory" -> 234564)),
+ newDummyExecutorData("2", Map("jvmUsedMemory" -> 2834)),
+ newDummyExecutorData("3", Map("jvmUsedMemory" -> 3945667)),
+ newDummyExecutorData("4", Map("jvmUsedMemory" -> 16367890)),
+ newDummyExecutorData("5", Map("jvmUsedMemory" -> 2345647)),
newDummyExecutorData("driver", Map("jvmUsedMemory" -> 394561))
)
describe(".apply") {
val data = newFakeSparkApplicationData(appConfigurationProperties, executorData)
val heuristicResult = peakJvmUsedMemoryHeuristic.apply(data)
+ val heuristicResultDetails = heuristicResult.getHeuristicResultDetails
it("has severity") {
heuristicResult.getSeverity should be(Severity.CRITICAL)
}
+ it("has all the details") {
+ heuristicResultDetails.size() should be(4)
+ }
+
describe(".Evaluator") {
import JvmUsedMemoryHeuristic.Evaluator
val data = newFakeSparkApplicationData(appConfigurationProperties, executorData)
+ val heuristicResult = peakJvmUsedMemoryHeuristic.apply(data)
+ val heuristicResultDetails = heuristicResult.getHeuristicResultDetails
val evaluator = new Evaluator(peakJvmUsedMemoryHeuristic, data)
it("has severity executor") {
- evaluator.severityExecutor should be(Severity.NONE)
- }
-
- it("has severity driver") {
- evaluator.severityDriver should be(Severity.CRITICAL)
+ evaluator.severity should be(Severity.CRITICAL)
}
it("has max peak jvm memory") {
evaluator.maxExecutorPeakJvmUsedMemory should be (394567123)
}
- it("has max driver peak jvm memory") {
- evaluator.maxDriverPeakJvmUsedMemory should be (394561)
+ it("has reasonable size") {
+ val details = heuristicResultDetails.get(3)
+ details.getName should be ("Suggested spark.executor.memory")
+ details.getValue should be ("452 MB")
}
}
}
diff --git a/test/com/linkedin/drelephant/spark/heuristics/StagesHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/StagesHeuristicTest.scala
index 723f69250..e6aae4fe1 100644
--- a/test/com/linkedin/drelephant/spark/heuristics/StagesHeuristicTest.scala
+++ b/test/com/linkedin/drelephant/spark/heuristics/StagesHeuristicTest.scala
@@ -27,6 +27,9 @@ import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
import org.scalatest.{FunSpec, Matchers}
+/**
+ * Test class for Stages Heuristic. It checks whether all the values used in the heuristic are calculated correctly.
+ */
class StagesHeuristicTest extends FunSpec with Matchers {
import StagesHeuristicTest._
@@ -81,13 +84,6 @@ class StagesHeuristicTest extends FunSpec with Matchers {
|stage 4, attempt 0 (task failure rate: 0.800)""".stripMargin
)
}
-
- it("returns the list of stages with long runtimes") {
- heuristicResultDetails.get(4).getValue should be(
- s"""|stage 8, attempt 0 (runtime: 45 min)
- |stage 9, attempt 0 (runtime: 1 hr)""".stripMargin
- )
- }
}
describe(".Evaluator") {
@@ -113,15 +109,6 @@ class StagesHeuristicTest extends FunSpec with Matchers {
evaluator.stagesWithHighTaskFailureRates.map { case (stageData, taskFailureRate) => (stageData.stageId, taskFailureRate) }
stageIdsAndTaskFailureRates should contain theSameElementsInOrderAs(Seq((3, 0.6D), (4, 0.8D)))
}
-
- it("has the list of stages with long average executor runtimes") {
- val stageIdsAndRuntimes =
- evaluator.stagesWithLongAverageExecutorRuntimes.map { case (stageData, runtime) => (stageData.stageId, runtime) }
- stageIdsAndRuntimes should contain theSameElementsInOrderAs(
- Seq((8, Duration("45min").toMillis), (9, Duration("60min").toMillis))
- )
- }
-
it("computes the overall severity") {
evaluator.severity should be(Severity.CRITICAL)
}
diff --git a/test/com/linkedin/drelephant/spark/heuristics/StagesWithFailedTasksHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/StagesWithFailedTasksHeuristicTest.scala
index 219e3ec05..cdfdc11ea 100644
--- a/test/com/linkedin/drelephant/spark/heuristics/StagesWithFailedTasksHeuristicTest.scala
+++ b/test/com/linkedin/drelephant/spark/heuristics/StagesWithFailedTasksHeuristicTest.scala
@@ -28,7 +28,9 @@ import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationDa
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, StageDataImpl, TaskDataImpl}
-
+/**
+ * Test class for Stages With Failed Tasks Heuristic. It checks whether all the values used in the heuristic are calculated correctly.
+ */
class StagesWithFailedTasksHeuristicTest extends FunSpec with Matchers {
import StagesWithFailedTasksHeuristicTest._
diff --git a/test/com/linkedin/drelephant/spark/heuristics/UnifiedMemoryHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/UnifiedMemoryHeuristicTest.scala
index 746b32741..68f668efe 100644
--- a/test/com/linkedin/drelephant/spark/heuristics/UnifiedMemoryHeuristicTest.scala
+++ b/test/com/linkedin/drelephant/spark/heuristics/UnifiedMemoryHeuristicTest.scala
@@ -2,36 +2,89 @@ package com.linkedin.drelephant.spark.heuristics
import com.linkedin.drelephant.analysis.{ApplicationType, Severity}
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
-import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkRestDerivedData}
+import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, ExecutorSummaryImpl}
+import com.linkedin.drelephant.spark.heuristics.UnifiedMemoryHeuristic.Evaluator
+import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
import org.scalatest.{FunSpec, Matchers}
import scala.collection.JavaConverters
+/**
+ * Test class for Unified Memory Heuristic. It checks whether all the values used in the heuristic are calculated correctly.
+ */
class UnifiedMemoryHeuristicTest extends FunSpec with Matchers {
import UnifiedMemoryHeuristicTest._
val heuristicConfigurationData = newFakeHeuristicConfigurationData()
-
- val memoryFractionHeuristic = new UnifiedMemoryHeuristic(heuristicConfigurationData)
+ val unifiedMemoryHeuristic = new UnifiedMemoryHeuristic(heuristicConfigurationData)
+ val appConfigurationProperties = Map("spark.executor.memory"->"3147483647")
+ val appConfigurationProperties1 = Map("spark.executor.memory"->"214567874847")
val executorData = Seq(
- newDummyExecutorData("1", 400000, Map("executionMemory" -> 300000, "storageMemory" -> 94567)),
+ newDummyExecutorData("1", 999999999, Map("executionMemory" -> 300000, "storageMemory" -> 94567)),
newDummyExecutorData("2", 400000, Map("executionMemory" -> 200000, "storageMemory" -> 34568)),
newDummyExecutorData("3", 400000, Map("executionMemory" -> 300000, "storageMemory" -> 34569)),
newDummyExecutorData("4", 400000, Map("executionMemory" -> 20000, "storageMemory" -> 3456)),
newDummyExecutorData("5", 400000, Map("executionMemory" -> 200000, "storageMemory" -> 34564)),
newDummyExecutorData("6", 400000, Map("executionMemory" -> 300000, "storageMemory" -> 94561))
)
+
+ val executorData1 = Seq(
+ newDummyExecutorData("driver", 400000, Map("executionMemory" -> 300000, "storageMemory" -> 94567)),
+ newDummyExecutorData("2", 999999999, Map("executionMemory" -> 200, "storageMemory" -> 200))
+ )
+
+ val executorData2 = Seq(
+ newDummyExecutorData("driver", 400000, Map("executionMemory" -> 300000, "storageMemory" -> 94567)),
+ newDummyExecutorData("2", 999999999, Map("executionMemory" -> 999999990, "storageMemory" -> 9))
+ )
+
describe(".apply") {
- val data = newFakeSparkApplicationData(executorData)
- val heuristicResult = memoryFractionHeuristic.apply(data)
- val heuristicResultDetails = heuristicResult.getHeuristicResultDetails
+ val data = newFakeSparkApplicationData(appConfigurationProperties, executorData)
+ val data1 = newFakeSparkApplicationData(appConfigurationProperties1, executorData1)
+ val data2 = newFakeSparkApplicationData(appConfigurationProperties1, executorData2)
+ val heuristicResult = unifiedMemoryHeuristic.apply(data)
+ val heuristicResult1 = unifiedMemoryHeuristic.apply(data1)
+ val heuristicResult2 = unifiedMemoryHeuristic.apply(data2)
+ val evaluator = new Evaluator(unifiedMemoryHeuristic, data1)
it("has severity") {
heuristicResult.getSeverity should be(Severity.CRITICAL)
}
+
+ it("has max value") {
+ val details = heuristicResult.getHeuristicResultDetails.get(2)
+ details.getName should be("Max peak unified memory")
+ details.getValue should be("385.32 KB")
+ }
+
+ it("has mean value") {
+ val details = heuristicResult.getHeuristicResultDetails.get(1)
+ details.getName should be("Mean peak unified memory")
+ details.getValue should be("263.07 KB")
+ }
+
+ it("data1 has severity") {
+ heuristicResult1.getSeverity should be(Severity.CRITICAL)
+ }
+
+ it("data1 has maxMemory") {
+ evaluator.maxMemory should be(999999999)
+ }
+
+ it("data1 has max memory") {
+ evaluator.maxUnifiedMemory should be(400)
+ }
+
+ it("data1 has mean memory") {
+ evaluator.meanUnifiedMemory should be(400)
+ }
+
+ it("has no severity when max and allocated memory are the same") {
+ heuristicResult2.getSeverity should be(Severity.NONE)
+ }
}
}
@@ -69,7 +122,10 @@ object UnifiedMemoryHeuristicTest {
peakUnifiedMemory
)
- def newFakeSparkApplicationData(executorSummaries: Seq[ExecutorSummaryImpl]): SparkApplicationData = {
+ def newFakeSparkApplicationData(
+ appConfigurationProperties: Map[String, String],
+ executorSummaries: Seq[ExecutorSummaryImpl]): SparkApplicationData =
+ {
val appId = "application_1"
val restDerivedData = SparkRestDerivedData(
new ApplicationInfoImpl(appId, name = "app", Seq.empty),
@@ -78,7 +134,9 @@ object UnifiedMemoryHeuristicTest {
executorSummaries = executorSummaries,
stagesWithFailedTasks = Seq.empty
)
-
- SparkApplicationData(appId, restDerivedData, logDerivedData = None)
+ val logDerivedData = SparkLogDerivedData(
+ SparkListenerEnvironmentUpdate(Map("Spark Properties" -> appConfigurationProperties.toSeq))
+ )
+ SparkApplicationData(appId, restDerivedData, Some(logDerivedData))
}
}
diff --git a/test/com/linkedin/drelephant/util/MemoryFormatUtilsTest.java b/test/com/linkedin/drelephant/util/MemoryFormatUtilsTest.java
index 0ae064ebc..067f7427d 100644
--- a/test/com/linkedin/drelephant/util/MemoryFormatUtilsTest.java
+++ b/test/com/linkedin/drelephant/util/MemoryFormatUtilsTest.java
@@ -105,4 +105,16 @@ public void testStringToBytes() {
}
}
}
+
+ public void testRoundOffMemoryStringToNextInteger() {
+ assertEquals("157 MB", MemoryFormatUtils.roundOffMemoryStringToNextInteger("156.1 MB"));
+ assertEquals("155 MB", MemoryFormatUtils.roundOffMemoryStringToNextInteger("155.0 MB"));
+ assertEquals("0 MB", MemoryFormatUtils.roundOffMemoryStringToNextInteger("0 MB"));
+ assertEquals("156 GB", MemoryFormatUtils.roundOffMemoryStringToNextInteger("155.1 G"));
+ assertEquals("", MemoryFormatUtils.roundOffMemoryStringToNextInteger(null));
+ assertEquals("500 MB", MemoryFormatUtils.roundOffMemoryStringToNextInteger("500M"));
+ assertEquals("600 GB", MemoryFormatUtils.roundOffMemoryStringToNextInteger("600 gb"));
+ assertEquals("600 GB", MemoryFormatUtils.roundOffMemoryStringToNextInteger("600 g"));
+ assertEquals("", MemoryFormatUtils.roundOffMemoryStringToNextInteger(""));
+ }
}