Spark Heuristics for Dr. Elephant #324
Conversation
app-conf/HeuristicConf.xml
Outdated
<!--<params>
  <driver_peak_jvm_memory_threshold>1.25,1.5,2,3</driver_peak_jvm_memory_threshold>
  <gc_severity_threshold>0.08,0.09,0.1,0.15</gc_severity_threshold>
  <peak_unified_memory_threshold>0.7,0.6,0.4,0.2</peak_unified_memory_threshold>
We probably don't need to check unified memory for the driver -- it looks like the driver can have non-zero storage memory, but there isn't a separate spark.memory.fraction or spark.memory.storageFraction for the driver. There are usually more executors, so it makes sense to tune for the executors.
import com.linkedin.drelephant.spark.fetchers.statusapiv1.ExecutorSummary

/**
 * A heuristic based on the driver's configurations and memory used. It checks whether the configuration values specified are within the threshold range. It also analyses the peak JVM memory used, unified memory and time spent in GC by the job.
Could this comment be broken into multiple lines, to make it easier to read? Is there a max char limit for Dr. Elephant code? Same for the following code.
override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData

lazy val peakUnifiedMemoryThresholdString: String = heuristicConfigurationData.getParamMap.get(PEAK_UNIFIED_MEMORY_THRESHOLD_KEY)
As noted earlier, let's skip the check for driver unified memory.
if (peakUnifiedMemoryExecutorSeverity.getValue > severityPeakUnifiedMemoryVariable.getValue) {
  severityPeakUnifiedMemoryVariable = peakUnifiedMemoryExecutorSeverity

lazy val severity: Severity = if (sparkExecutorMemory <= MemoryFormatUtils.stringToBytes("2G")) {
This should use the constant declared for JVM used memory.
  DEFAULT_MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS
} else {
  SeverityThresholds.parse(jvmUsedMemoryHeuristic.executorPeakJvmMemoryThresholdString.split(",").map(_.toDouble * (maxExecutorPeakJvmUsedMemory + reservedMemory)).toString, ascending = false).getOrElse(DEFAULT_MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS)
}

val MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS : SeverityThresholds = if(jvmUsedMemoryHeuristic.driverPeakJvmMemoryThresholdString == null) {
  DEFAULT_MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS
lazy val severity = if (sparkExecutorMemory <= MemoryFormatUtils.stringToBytes("2G")) {
Please declare "2G" as a constant, and add some comments.
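A minimal sketch of the suggested change, hoisting the inline `"2G"` into a named constant with a comment. The constant and method names here are illustrative, not the PR's final code:

```scala
object DriverHeuristicConstants {
  // Executors at or below this size ("2G") fall back to the default
  // peak-JVM-used-memory severity thresholds; larger executors get
  // thresholds scaled from the configured parameters.
  val SMALL_EXECUTOR_MEMORY_THRESHOLD_BYTES: Long = 2L * 1024 * 1024 * 1024

  def usesDefaultThresholds(sparkExecutorMemoryBytes: Long): Boolean =
    sparkExecutorMemoryBytes <= SMALL_EXECUTOR_MEMORY_THRESHOLD_BYTES
}
```

With the literal named once, both threshold branches and any future checks can reference the same value.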
import scala.collection.JavaConverters
import scala.concurrent.duration.Duration

class DriverHeuristicTest extends FunSpec with Matchers {
Please add some comments.
<p>This is a heuristic for checking whether driver is well tuned and the configurations are set to a good value.</p>
<p>It checks the following properties</p>
<h4>Driver Max Peak JVM Used Memory</h4>
<p>This is to analyse whether the driver memory is set to a good value. To avoid wasted memory, it checks if the peak JVM used memory by the driver is reasonably close to the blocked driver memory which is specified in spark.driver.memory. If the peak JVM memory is much smaller, then the driver memory should be reduced.</p>
Change "This is to analyse" to "This analyses"
@@ -192,7 +192,12 @@ object StagesHeuristic {
}

private def averageExecutorRuntimeAndSeverityOf(stageData: StageData): (Long, Severity) = {
  val averageExecutorRuntime = stageData.executorRunTime / executorInstances
  val allTasks : Int = if((stageData.numActiveTasks + stageData.numCompleteTasks + stageData.numFailedTasks) == 0) {
Another option would be max((stageData.numActiveTasks + stageData.numCompleteTasks + stageData.numFailedTasks), 1)
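The reviewer's alternative can be sketched as a one-liner: clamping the combined task count to at least 1 replaces the explicit zero-check branch and guarantees the later average-runtime division never divides by zero (field names follow `StageData`; the helper itself is illustrative):

```scala
object StageTaskCounts {
  // Clamp the total task count to a minimum of 1, so callers dividing
  // by this value (e.g. for average task runtime) cannot divide by zero.
  def allTasks(numActiveTasks: Int, numCompleteTasks: Int, numFailedTasks: Int): Int =
    math.max(numActiveTasks + numCompleteTasks + numFailedTasks, 1)
}
```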
}

//peakJvmMemory calculations

Extra newline.
}

//Gc Calculations

Extra newline.
 * A heuristic based on the driver's configurations and memory used. It checks whether the configuration values specified are within the threshold range. It also analyses the peak JVM memory used, unified memory and time spent in GC by the job.
 *
 */
class DriverHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData)
This heuristic contains multiple checks. Is it possible to add some information about which checks are problematic? Another option could be to split it into separate heuristics.
@@ -105,18 +105,24 @@ class ConfigurationHeuristic(private val heuristicConfigurationData: HeuristicCo
  result.addResultDetail(SPARK_SHUFFLE_SERVICE_ENABLED, formatProperty(evaluator.isShuffleServiceEnabled.map(_.toString)),
    "Spark shuffle service is not enabled.")
}
if (evaluator.severityMinExecutors == Severity.CRITICAL) {
if (evaluator.severityMinExecutors != Severity.NONE) {
  result.addResultDetail("Minimum Executors", "The minimum executors for Dynamic Allocation should be <=1. Please change it in the " + SPARK_DYNAMIC_ALLOCATION_MIN_EXECUTORS + " field.")
Please change to "should be = 1". We don't want users to set min executors to 0 or negative values. Also, please use constant THRESHOLD_MIN_EXECUTORS instead of hard-coding the 1, in case of changes later.
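The suggested fix can be sketched as follows. `THRESHOLD_MIN_EXECUTORS` is the constant the reviewer refers to; the message wiring below is an illustrative sketch, not the PR's final code:

```scala
object MinExecutorsCheck {
  // Constant referenced by the reviewer, so the message tracks future changes.
  val THRESHOLD_MIN_EXECUTORS = 1
  val SPARK_DYNAMIC_ALLOCATION_MIN_EXECUTORS = "spark.dynamicAllocation.minExecutors"

  // "= 1" rather than "<=1": users should not set min executors to 0 or negative.
  def detailMessage: String =
    s"The minimum executors for Dynamic Allocation should be = $THRESHOLD_MIN_EXECUTORS. " +
      s"Please change it in the $SPARK_DYNAMIC_ALLOCATION_MIN_EXECUTORS field."
}
```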
  result.addResultDetail("Minimum Executors", "The minimum executors for Dynamic Allocation should be <=1. Please change it in the " + SPARK_DYNAMIC_ALLOCATION_MIN_EXECUTORS + " field.")
}
if (evaluator.severityMaxExecutors == Severity.CRITICAL) {
if (evaluator.severityMaxExecutors != Severity.NONE) {
  result.addResultDetail("Maximum Executors", "The maximum executors for Dynamic Allocation should be <=900. Please change it in the " + SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS + " field.")
Please use constant THRESHOLD_MAX_EXECUTORS instead of hard-coding 900.
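Analogously for the upper bound, a sketch with the constant the reviewer names (the surrounding wiring is illustrative):

```scala
object MaxExecutorsCheck {
  // Single source of truth for the max-executors limit, in case it changes later.
  val THRESHOLD_MAX_EXECUTORS = 900
  val SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS = "spark.dynamicAllocation.maxExecutors"

  def detailMessage: String =
    s"The maximum executors for Dynamic Allocation should be <=$THRESHOLD_MAX_EXECUTORS. " +
      s"Please change it in the $SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS field."
}
```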
result.addResultDetail("Executor Overhead Memory", "Please do not specify excessive amount of overhead memory for Executors. Change it in the field " + SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD)
}
if(evaluator.severityExecutorCores != Severity.NONE) {
  result.addResultDetail("Executor cores", "The number of executor cores should be <=4. Please change it in the field " + SPARK_EXECUTOR_CORES_KEY)
Could the comment for executor cores also use a constant, or take the value from the configurations?
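One way to take the limit from the configurations, as suggested. The param key `max_executor_cores` and the fallback of "4" are hypothetical here; the real key would live in the heuristic's params in HeuristicConf.xml:

```scala
import java.util.{Map => JMap}

object ExecutorCoresCheck {
  // Hypothetical param key; the actual heuristic would read its configured param map.
  val MAX_EXECUTOR_CORES_KEY = "max_executor_cores"
  val SPARK_EXECUTOR_CORES_KEY = "spark.executor.cores"

  def detailMessage(paramMap: JMap[String, String]): String = {
    // Fall back to the current hard-coded limit of 4 when no param is configured.
    val limit = Option(paramMap.get(MAX_EXECUTOR_CORES_KEY)).getOrElse("4")
    s"The number of executor cores should be <=$limit. " +
      s"Please change it in the field $SPARK_EXECUTOR_CORES_KEY"
  }
}
```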
<p>It checks the following properties</p>
<h4>Driver Max Peak JVM Used Memory</h4>
<p>This is to analyse whether the driver memory is set to a good value. To avoid wasted memory, it checks if the peak JVM used memory by the driver is reasonably close to the blocked driver memory which is specified in spark.driver.memory. If the peak JVM memory is much smaller, then the driver memory should be reduced.</p>
<h4>Driver Max Unified Memory</h4>
<p>If the driver's Peak Unified Memory Consumption is much smaller than the allocated Unified Memory space, then we recommend decreasing the allocated Unified Memory Region. You can try decreasing <i>spark.driver.memory</i> which would decrease the unified memory space.</p>
What is "blocked driver memory"? Could this just be "driver memory"?
@@ -18,3 +18,4 @@ <h4>Executor Max Peak JVM Used Memory</h4>
<p>This is to analyse whether the executor memory is set to a good value. To avoid wasted memory, it checks if the peak JVM used memory by the executor is reasonably close to the blocked executor memory which is specified in spark.executor.memory. If the peak JVM memory is much smaller, then the executor memory should be reduced.</p>
<h4>Driver Max Peak JVM Used Memory</h4>
<p>This is to analyse whether the driver memory is set to a good value. To avoid wasted memory, it checks if the peak JVM used memory by the driver is reasonably close to the blocked driver memory which is specified in spark.driver.memory. If the peak JVM memory is much smaller, then the driver memory should be reduced.</p>
Similarly, there is a reference to "blocked driver memory". Could you please explain blocked memory, or change to "driver memory"?
@@ -18,3 +18,4 @@ <h4>Executor Max Peak JVM Used Memory</h4>
<p>This is to analyse whether the executor memory is set to a good value. To avoid wasted memory, it checks if the peak JVM used memory by the executor is reasonably close to the blocked executor memory which is specified in spark.executor.memory. If the peak JVM memory is much smaller, then the executor memory should be reduced.</p>
This has a reference to "blocked executor memory." Could you please explain blocked memory, or change to "executor memory"?
… to suggested memory
The comments have been addressed.
Separated out Driver checks into a separate Driver Metrics heuristic: Checks driver configurations, driver GC time and JVM used memory.
This PR adds new Spark heuristics to Dr. Elephant:
Executor JVM Memory Heuristic: Checks whether the allocated memory is much more than the peak JVM used memory.
Executor Peak Unified Memory: Checks whether the memory allocated for the Unified Memory region is much more than the memory used by it.
Executor Spill: Checks whether executor memory is spilled and gives suggestions.
Executor GC: Checks how much time is spent in GC compared to the total running time of the job.
Driver: Checks driver configurations, driver GC time and JVM used memory.
Stages With Failed Tasks: Checks for stages with failed tasks and examines the error messages of the failed tasks.