Spark Heuristics for Dr. Elephant #324

Merged 14 commits on Feb 5, 2018
42 changes: 40 additions & 2 deletions app-conf/HeuristicConf.xml
@@ -175,53 +175,91 @@
<classname>com.linkedin.drelephant.spark.heuristics.ConfigurationHeuristic</classname>
<viewname>views.html.help.spark.helpConfigurationHeuristic</viewname>
</heuristic>
- <heuristic>
+ <!--<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Spark Executor Metrics</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.ExecutorsHeuristic</classname>
<viewname>views.html.help.spark.helpExecutorsHeuristic</viewname>
- </heuristic>
+ </heuristic>-->

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Spark Job Metrics</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.JobsHeuristic</classname>
<viewname>views.html.help.spark.helpJobsHeuristic</viewname>
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Spark Stage Metrics</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.StagesHeuristic</classname>
<viewname>views.html.help.spark.helpStagesHeuristic</viewname>
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Peak Unified Memory</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.UnifiedMemoryHeuristic</classname>
<viewname>views.html.help.spark.helpUnifiedMemoryHeuristic</viewname>
<!--<params>
<peak_unified_memory_threshold>0.7,0.6,0.4,0.2</peak_unified_memory_threshold>
<spark_executor_memory_threshold_key>2G</spark_executor_memory_threshold_key>
</params>-->
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>JVM Used Memory</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.JvmUsedMemoryHeuristic</classname>
<viewname>views.html.help.spark.helpJvmUsedMemoryHeuristic</viewname>
<!--<params>
<executor_peak_jvm_memory_threshold>1.25,1.5,2,3</executor_peak_jvm_memory_threshold>
<spark_executor_memory_threshold_key>2G</spark_executor_memory_threshold_key>
</params>-->
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Stages with failed tasks</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.StagesWithFailedTasksHeuristic</classname>
<viewname>views.html.help.spark.helpStagesWithFailedTasks</viewname>
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Executor GC</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.ExecutorGcHeuristic</classname>
<viewname>views.html.help.spark.helpExecutorGcHeuristic</viewname>
<!--<params>
<gc_severity_A_threshold>0.08,0.09,0.1,0.15</gc_severity_A_threshold>
<gc_severity_D_threshold>0.05,0.04,0.03,0.01</gc_severity_D_threshold>
</params>-->
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Executor spill</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.ExecutorStorageSpillHeuristic</classname>
<viewname>views.html.help.spark.helpExecutorStorageSpillHeuristic</viewname>
<!--<params>
<spill_fraction_of_executors_threshold>0.2</spill_fraction_of_executors_threshold>
<spill_max_memory_threshold>0.05</spill_max_memory_threshold>
<spark_executor_cores_threshold>4</spark_executor_cores_threshold>
<spark_executor_memory_threshold>10GB</spark_executor_memory_threshold>
</params>-->
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Driver</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.DriverHeuristic</classname>
<viewname>views.html.help.spark.helpDriverHeuristic</viewname>
<!--<params>
<driver_peak_jvm_memory_threshold>1.25,1.5,2,3</driver_peak_jvm_memory_threshold>
<gc_severity_threshold>0.08,0.09,0.1,0.15</gc_severity_threshold>
<spark_executor_memory_threshold_key>2G</spark_executor_memory_threshold_key>
</params>-->
</heuristic>

</heuristics>
SparkRestClient.scala
@@ -215,7 +215,7 @@ class SparkRestClient(sparkConf: SparkConf) {
}

private def getExecutorSummaries(attemptTarget: WebTarget): Seq[ExecutorSummaryImpl] = {
- val target = attemptTarget.path("executors")
+ val target = attemptTarget.path("allexecutors")
try {
get(target, SparkRestObjectMapper.readValue[Seq[ExecutorSummaryImpl]])
} catch {
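The one-line change above swaps the REST endpoint from "executors" to "allexecutors". In the Spark monitoring API, /executors lists only currently-active executors, while /allexecutors also includes dead ones, which matters once dynamic allocation has released executors before Dr. Elephant fetches metrics. A minimal sketch of the two URLs, not part of the diff; the host, application id, and attempt id below are placeholders:

// Hypothetical history-server URLs; every identifier here is a placeholder.
val base = "http://history-server:18080/api/v1/applications"
val appId = "application_1517000000000_0001"
val activeOnly = s"$base/$appId/1/executors"    // active executors only
val withDead = s"$base/$appId/1/allexecutors"   // active + dead executors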
ConfigurationHeuristic.scala
@@ -29,7 +29,7 @@ import com.linkedin.drelephant.math.Statistics
* A heuristic based on an app's known configuration.
*
* The results from this heuristic primarily inform users about key app configuration settings, including
- * driver memory, driver cores, executor cores, executor instances, executor memory, and the serializer.
+ * executor cores, executor instances, executor memory, and the serializer.
*
* It also checks whether the values specified are within threshold.
*/
@@ -55,10 +55,6 @@ class ConfigurationHeuristic(private val heuristicConfigurationData: HeuristicCo
property.getOrElse("Not presented. Using default.")

val resultDetails = Seq(
- new HeuristicResultDetails(
-   SPARK_DRIVER_MEMORY_KEY,
-   formatProperty(evaluator.driverMemoryBytes.map(MemoryFormatUtils.bytesToString))
- ),
new HeuristicResultDetails(
SPARK_EXECUTOR_MEMORY_KEY,
formatProperty(evaluator.executorMemoryBytes.map(MemoryFormatUtils.bytesToString))
@@ -80,16 +76,16 @@ class ConfigurationHeuristic(private val heuristicConfigurationData: HeuristicCo
formatProperty(evaluator.isDynamicAllocationEnabled.map(_.toString))
),
new HeuristicResultDetails(
-   SPARK_DRIVER_CORES_KEY,
-   formatProperty(evaluator.driverCores.map(_.toString))
+   SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD,
+   evaluator.sparkYarnExecutorMemoryOverhead
),
new HeuristicResultDetails(
-   SPARK_YARN_DRIVER_MEMORY_OVERHEAD,
-   evaluator.sparkYarnDriverMemoryOverhead
+   SPARK_DYNAMIC_ALLOCATION_MIN_EXECUTORS,
+   evaluator.dynamicMinExecutors.getOrElse(0).toString
),
new HeuristicResultDetails(
-   SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD,
-   evaluator.sparkYarnExecutorMemoryOverhead
+   SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS,
+   evaluator.dynamicMaxExecutors.getOrElse(0).toString
)
)
// Construct a mutable ArrayList for resultDetails; otherwise the addResultDetail method of HeuristicResult cannot be used.
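A short sketch of the conversion that comment describes, assuming HeuristicResult#addResultDetail appends to the list it was constructed with (which is why an immutable Scala Seq cannot be passed directly):

import java.util
import scala.collection.JavaConverters._

// Copy the immutable Seq into a mutable ArrayList so that addResultDetail
// can append further entries after construction.
val mutableDetails: util.ArrayList[HeuristicResultDetails] =
  new util.ArrayList[HeuristicResultDetails](resultDetails.asJava)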
@@ -109,21 +105,24 @@ class ConfigurationHeuristic(private val heuristicConfigurationData: HeuristicCo
result.addResultDetail(SPARK_SHUFFLE_SERVICE_ENABLED, formatProperty(evaluator.isShuffleServiceEnabled.map(_.toString)),
"Spark shuffle service is not enabled.")
}
- if (evaluator.severityMinExecutors == Severity.CRITICAL) {
+ if (evaluator.severityMinExecutors != Severity.NONE) {
result.addResultDetail("Minimum Executors", "The minimum executors for Dynamic Allocation should be <=1. Please change it in the " + SPARK_DYNAMIC_ALLOCATION_MIN_EXECUTORS + " field.")

Review comment: Please change to "should be = 1". We don't want users to set min executors to 0 or negative values. Also, please use constant THRESHOLD_MIN_EXECUTORS instead of hard-coding the 1, in case of changes later.

}
- if (evaluator.severityMaxExecutors == Severity.CRITICAL) {
+ if (evaluator.severityMaxExecutors != Severity.NONE) {
result.addResultDetail("Maximum Executors", "The maximum executors for Dynamic Allocation should be <=900. Please change it in the " + SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS + " field.")

Review comment: Please use constant THRESHOLD_MAX_EXECUTORS instead of hard-coding 900.

}
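A sketch of what both review comments are asking for, built from the THRESHOLD_MIN_EXECUTORS and THRESHOLD_MAX_EXECUTORS constants defined in the companion object further down (the merged wording may differ):

// Sketch only: derive the advice strings from the constants so the messages
// stay in sync if the thresholds ever change.
if (evaluator.severityMinExecutors != Severity.NONE) {
  result.addResultDetail("Minimum Executors",
    s"The minimum executors for Dynamic Allocation should be = $THRESHOLD_MIN_EXECUTORS. " +
    s"Please change it in the $SPARK_DYNAMIC_ALLOCATION_MIN_EXECUTORS field.")
}
if (evaluator.severityMaxExecutors != Severity.NONE) {
  result.addResultDetail("Maximum Executors",
    s"The maximum executors for Dynamic Allocation should be <= $THRESHOLD_MAX_EXECUTORS. " +
    s"Please change it in the $SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS field.")
}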
- if (evaluator.jarsSeverity == Severity.CRITICAL) {
+ if (evaluator.jarsSeverity != Severity.NONE) {
result.addResultDetail("Jars notation", "It is recommended to not use * notation while specifying jars in the field " + SPARK_YARN_JARS)
}
- if(evaluator.severityDriverMemoryOverhead.getValue >= Severity.SEVERE.getValue) {
-   result.addResultDetail("Driver Overhead Memory", "Please do not specify excessive amount of overhead memory for Driver. Change it in the field " + SPARK_YARN_DRIVER_MEMORY_OVERHEAD)
- }
- if(evaluator.severityExecutorMemoryOverhead.getValue >= Severity.SEVERE.getValue) {
+ if(evaluator.severityExecutorMemoryOverhead != Severity.NONE) {
result.addResultDetail("Executor Overhead Memory", "Please do not specify excessive amount of overhead memory for Executors. Change it in the field " + SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD)
}
if(evaluator.severityExecutorCores != Severity.NONE) {
result.addResultDetail("Executor cores", "The number of executor cores should be <=4. Please change it in the field " + SPARK_EXECUTOR_CORES_KEY)

Review comment: Could the comment for executor cores also use a constant, or take the value from the configurations?

}
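For the executor-cores message, one way to follow that review comment is to read the advertised limit off the thresholds object; this sketch assumes SeverityThresholds exposes its low bound as a field named low:

// Sketch: take the "<= N" value from the same thresholds used by severityOf,
// instead of hard-coding 4.
if (evaluator.severityExecutorCores != Severity.NONE) {
  result.addResultDetail("Executor cores",
    s"The number of executor cores should be <= ${DEFAULT_SPARK_CORES_THRESHOLDS.low}. " +
    s"Please change it in the field $SPARK_EXECUTOR_CORES_KEY")
}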
if(evaluator.severityExecutorMemory != Severity.NONE) {
result.addResultDetail("Executor memory", "Please do not specify excessive amount of executor memory. Change it in the field " + SPARK_EXECUTOR_MEMORY_KEY)
}
result
}
}
@@ -134,20 +133,17 @@ object ConfigurationHeuristic {

val SERIALIZER_IF_NON_NULL_RECOMMENDATION_KEY = "serializer_if_non_null_recommendation"

- val SPARK_DRIVER_MEMORY_KEY = "spark.driver.memory"
val SPARK_EXECUTOR_MEMORY_KEY = "spark.executor.memory"
val SPARK_EXECUTOR_INSTANCES_KEY = "spark.executor.instances"
val SPARK_EXECUTOR_CORES_KEY = "spark.executor.cores"
val SPARK_SERIALIZER_KEY = "spark.serializer"
val SPARK_APPLICATION_DURATION = "spark.application.duration"
val SPARK_SHUFFLE_SERVICE_ENABLED = "spark.shuffle.service.enabled"
val SPARK_DYNAMIC_ALLOCATION_ENABLED = "spark.dynamicAllocation.enabled"
- val SPARK_DRIVER_CORES_KEY = "spark.driver.cores"
val SPARK_DYNAMIC_ALLOCATION_MIN_EXECUTORS = "spark.dynamicAllocation.minExecutors"
val SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS = "spark.dynamicAllocation.maxExecutors"
val SPARK_YARN_JARS = "spark.yarn.secondary.jars"
val SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD = "spark.yarn.executor.memoryOverhead"
- val SPARK_YARN_DRIVER_MEMORY_OVERHEAD = "spark.yarn.driver.memoryOverhead"
val THRESHOLD_MIN_EXECUTORS: Int = 1
val THRESHOLD_MAX_EXECUTORS: Int = 900
val SPARK_OVERHEAD_MEMORY_THRESHOLD_KEY = "spark.overheadMemory.thresholds.key"
@@ -159,9 +155,6 @@ object ConfigurationHeuristic {
lazy val appConfigurationProperties: Map[String, String] =
data.appConfigurationProperties

- lazy val driverMemoryBytes: Option[Long] =
-   Try(getProperty(SPARK_DRIVER_MEMORY_KEY).map(MemoryFormatUtils.stringToBytes)).getOrElse(None)

lazy val executorMemoryBytes: Option[Long] =
Try(getProperty(SPARK_EXECUTOR_MEMORY_KEY).map(MemoryFormatUtils.stringToBytes)).getOrElse(None)

@@ -171,9 +164,6 @@ object ConfigurationHeuristic {
lazy val executorCores: Option[Int] =
Try(getProperty(SPARK_EXECUTOR_CORES_KEY).map(_.toInt)).getOrElse(None)

- lazy val driverCores: Option[Int] =
-   Try(getProperty(SPARK_DRIVER_CORES_KEY).map(_.toInt)).getOrElse(None)

lazy val dynamicMinExecutors: Option[Int] =
Try(getProperty(SPARK_DYNAMIC_ALLOCATION_MIN_EXECUTORS).map(_.toInt)).getOrElse(None)

@@ -196,8 +186,6 @@ object ConfigurationHeuristic {

lazy val sparkYarnExecutorMemoryOverhead: String =
  if (getProperty(SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD).getOrElse("0").matches("(.*)[0-9]"))
    MemoryFormatUtils.bytesToString(MemoryFormatUtils.stringToBytes(getProperty(SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD).getOrElse("0") + "MB"))
  else getProperty(SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD).getOrElse("0")
- lazy val sparkYarnDriverMemoryOverhead: String = if (getProperty(SPARK_YARN_DRIVER_MEMORY_OVERHEAD).getOrElse("0").matches("(.*)[0-9]"))
-   MemoryFormatUtils.bytesToString(MemoryFormatUtils.stringToBytes(getProperty(SPARK_YARN_DRIVER_MEMORY_OVERHEAD).getOrElse("0") + "MB")) else getProperty(SPARK_YARN_DRIVER_MEMORY_OVERHEAD).getOrElse("0")
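The regex in the overhead parsing deserves a gloss: a value whose last character is a digit (for example "384") carries no unit suffix, and spark.yarn.executor.memoryOverhead is interpreted as megabytes in that case, hence the appended "MB" before normalizing. Assumed behavior, as two worked examples:

"384".matches("(.*)[0-9]")  // true  -> normalized via stringToBytes("384MB")
"2G".matches("(.*)[0-9]")   // false -> passed through unchanged as "2G"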

lazy val serializer: Option[String] = getProperty(SPARK_SERIALIZER_KEY)

@@ -211,16 +199,14 @@ object ConfigurationHeuristic {
case Some(_) => DEFAULT_SERIALIZER_IF_NON_NULL_SEVERITY_IF_RECOMMENDATION_UNMET
}

- //The following thresholds are for checking if the memory and cores values (executor and driver) are above normal. These thresholds are experimental, and may change in the future.
+ //The following thresholds are for checking if the memory and cores values of executors are above normal. These thresholds are experimental, and may change in the future.
val DEFAULT_SPARK_MEMORY_THRESHOLDS =
SeverityThresholds(low = MemoryFormatUtils.stringToBytes("10G"), MemoryFormatUtils.stringToBytes("15G"),
severe = MemoryFormatUtils.stringToBytes("20G"), critical = MemoryFormatUtils.stringToBytes("25G"), ascending = true)
val DEFAULT_SPARK_CORES_THRESHOLDS =
SeverityThresholds(low = 4, moderate = 6, severe = 8, critical = 10, ascending = true)
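For intuition, assuming severityOf returns the highest band whose ascending bound the value reaches, the thresholds above map out as follows (an illustration, not code from the PR):

// value >= critical -> CRITICAL, >= severe -> SEVERE, >= moderate -> MODERATE,
// >= low -> LOW, otherwise NONE.
DEFAULT_SPARK_MEMORY_THRESHOLDS.severityOf(MemoryFormatUtils.stringToBytes("12G")) // LOW (>= 10G, < 15G)
DEFAULT_SPARK_MEMORY_THRESHOLDS.severityOf(MemoryFormatUtils.stringToBytes("26G")) // CRITICAL (>= 25G)
DEFAULT_SPARK_CORES_THRESHOLDS.severityOf(6)                                       // MODERATE (>= 6, < 8)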

val severityExecutorMemory = DEFAULT_SPARK_MEMORY_THRESHOLDS.severityOf(executorMemoryBytes.getOrElse(0).asInstanceOf[Number].longValue)
- val severityDriverMemory = DEFAULT_SPARK_MEMORY_THRESHOLDS.severityOf(driverMemoryBytes.getOrElse(0).asInstanceOf[Number].longValue)
- val severityDriverCores = DEFAULT_SPARK_CORES_THRESHOLDS.severityOf(driverCores.getOrElse(0).asInstanceOf[Number].intValue)
val severityExecutorCores = DEFAULT_SPARK_CORES_THRESHOLDS.severityOf(executorCores.getOrElse(0).asInstanceOf[Number].intValue)
val severityMinExecutors = if (dynamicMinExecutors.getOrElse(0).asInstanceOf[Number].intValue > THRESHOLD_MIN_EXECUTORS) {
Severity.CRITICAL
@@ -233,12 +219,10 @@ object ConfigurationHeuristic {
Severity.NONE
}
val severityExecutorMemoryOverhead = configurationHeuristic.sparkOverheadMemoryThreshold.severityOf(MemoryFormatUtils.stringToBytes(sparkYarnExecutorMemoryOverhead))
- val severityDriverMemoryOverhead = configurationHeuristic.sparkOverheadMemoryThreshold.severityOf(MemoryFormatUtils.stringToBytes(sparkYarnDriverMemoryOverhead))


//Severity for the configuration thresholds
- val severityConfThresholds: Severity = Severity.max(severityDriverCores, severityDriverMemory, severityExecutorCores, severityExecutorMemory,
-   severityMinExecutors, severityMaxExecutors, jarsSeverity, severityExecutorMemoryOverhead, severityDriverMemoryOverhead)
+ val severityConfThresholds: Severity = Severity.max(severityExecutorCores, severityExecutorMemory,
+   severityMinExecutors, severityMaxExecutors, jarsSeverity, severityExecutorMemoryOverhead)

/**
* The following logic computes severity based on shuffle service and dynamic allocation flags.