Skip to content

Commit

Permalink
now dividing JvmGcTime by exectorRunTime instead of executorCpuTime
Browse files Browse the repository at this point in the history
  • Loading branch information
swasti committed Nov 15, 2017
1 parent 3a90264 commit 34ffbe2
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ trait TaskData{
trait TaskMetrics{
def executorDeserializeTime: Long
def executorRunTime: Long
def executorCpuTime: Long
def resultSize: Long
def jvmGcTime: Long
def resultSerializationTime: Long
Expand Down Expand Up @@ -384,7 +383,6 @@ class TaskDataImpl(
class TaskMetricsImpl(
var executorDeserializeTime: Long,
var executorRunTime: Long,
var executorCpuTime: Long,
var resultSize: Long,
var jvmGcTime: Long,
var resultSerializationTime: Long,
Expand Down
21 changes: 10 additions & 11 deletions app/com/linkedin/drelephant/spark/heuristics/StagesHeuristic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ class StagesHeuristic(private val heuristicConfigurationData: HeuristicConfigura
"Spark stages with long average executor runtimes",
formatStagesWithLongAverageExecutorRuntimes(evaluator.stagesWithLongAverageExecutorRuntimes)
),
new HeuristicResultDetails("JVM GC time to Executor CPU time ratio", evaluator.ratio.toString),
new HeuristicResultDetails("JVM GC time to Executor Run time ratio", evaluator.ratio.toString),
new HeuristicResultDetails("Jvm GC total time", evaluator.jvmTime.toString),
new HeuristicResultDetails("Executor CPU time", evaluator.executorCpuTime.toString)
new HeuristicResultDetails("Executor Run time", evaluator.executorRunTimeTotal.toString)
)

//adding recommendations to the result, severityTimeA corresponds to the ascending severity calculation
Expand Down Expand Up @@ -130,12 +130,12 @@ object StagesHeuristic {
ascending = true
)

/** The ascending severity thresholds for the ratio of JVM GC Time and executor CPU Time (checking whether ratio is above normal)
/** The ascending severity thresholds for the ratio of JVM GC Time and executor Run Time (checking whether ratio is above normal)
* These thresholds are experimental and are likely to change */
val DEFAULT_GC_SEVERITY_A_THRESHOLDS =
SeverityThresholds(low = 0.08D, moderate = 0.1D, severe = 0.15D, critical = 0.2D, ascending = true)

/** The descending severity thresholds for the ratio of JVM GC Time and executor CPU Time (checking whether ratio is below normal)
/** The descending severity thresholds for the ratio of JVM GC Time and executor Run Time (checking whether ratio is below normal)
* These thresholds are experimental and are likely to change */
val DEFAULT_GC_SEVERITY_D_THRESHOLDS =
SeverityThresholds(low = 0.05D, moderate = 0.04D, severe = 0.03D, critical = 0.01D, ascending = false)
Expand Down Expand Up @@ -176,10 +176,10 @@ object StagesHeuristic {

lazy val severity: Severity = Severity.max((stageFailureRateSeverity +: severityTimeA +: severityTimeD +: (taskFailureRateSeverities ++ runtimeSeverities)): _*)

var (jvmTime, executorCpuTime) = getTimeValues(stageDatas)
var (jvmTime, executorRunTimeTotal) = getTimeValues(stageDatas)

var ratio: Double = {
ratio = jvmTime.toDouble / executorCpuTime.toDouble
ratio = jvmTime.toDouble / executorRunTimeTotal.toDouble
ratio
}
lazy val severityTimeA: Severity = DEFAULT_GC_SEVERITY_A_THRESHOLDS.severityOf(ratio)
Expand Down Expand Up @@ -236,17 +236,16 @@ object StagesHeuristic {
}

/**
* returns the total JVM GC Time and total executor CPU Time across all stages
* returns the total JVM GC Time and total executor Run Time across all stages
* @param stageDatas
* @return
*/
private def getTimeValues(stageDatas: Seq[StageData]): (Long, Long) = {
var jvmGcTimeTotal: Long = 0
var executorCpuTime: Long = 0
var executorRunTimeTotal: Long = 0
var taskMetricsDummy: TaskMetricsImpl = new Some(new TaskMetricsImpl(
executorDeserializeTime = 0,
executorRunTime = 0,
executorCpuTime = 0,
resultSize = 0,
jvmGcTime = 0,
resultSerializationTime = 0,
Expand All @@ -259,13 +258,13 @@ object StagesHeuristic {
//ignoring the exception as there are cases when there is no task data, in such cases, 0 is taken as the default value
Exception.ignoring(classOf[NoSuchElementException]) {
stageDatas.foreach((stageData: StageData) => {
executorRunTimeTotal += stageData.executorRunTime
stageData.tasks.get.values.foreach((taskData: TaskData) => {
jvmGcTimeTotal += taskData.taskMetrics.getOrElse(taskMetricsDummy).jvmGcTime
executorCpuTime += taskData.taskMetrics.getOrElse(taskMetricsDummy).executorCpuTime
})
})
}
(jvmGcTimeTotal, executorCpuTime)
(jvmGcTimeTotal, executorRunTimeTotal)
}
}

Expand Down
2 changes: 1 addition & 1 deletion app/views/help/spark/helpStagesHeuristic.scala.html
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<p>Stage/task failures can occur for a number of reasons, so it is
recommended to look at the YARN application error logs.</p>
<p>
It also checks the ratio of jvmGcTime to executorCpuTime, to see if GC is taking too much time (providing more memory could help) or too little time (memory may be over provisioned, and can be reduced). <br/>
It also checks the ratio of jvmGcTime to executorRunTime, to see if GC is taking too much time (providing more memory could help) or too little time (memory may be over provisioned, and can be reduced). <br/>
It is recommended to increase executor memory if too much time is being spent in GC <br/>
If unified memory has too much excess capacity, it is recommended to decrease spark.memory.fraction to increase the amount allocated to user memory, instead of increasing executor memory. <br/>
We recommend decreasing executor memory if too little time is being spent in GC. <br/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,16 @@ class StagesHeuristicTest extends FunSpec with Matchers {
)
val stagesHeuristic = new StagesHeuristic(heuristicConfigurationData)
val stageDatas = Seq(
newFakeStageData(StageStatus.COMPLETE, 0, jvmGcTime = Duration("2min").toMillis, executorCpuTime = Duration("2min").toMillis, numCompleteTasks = 10, numFailedTasks = 0, executorRunTime = Duration("2min").toMillis, "foo"),
newFakeStageData(StageStatus.COMPLETE, 1, jvmGcTime = Duration("2min").toMillis, executorCpuTime = Duration("2min").toMillis, numCompleteTasks = 8, numFailedTasks = 2, executorRunTime = Duration("2min").toMillis, "bar"),
newFakeStageData(StageStatus.COMPLETE, 2, jvmGcTime = Duration("2min").toMillis, executorCpuTime = Duration("2min").toMillis, numCompleteTasks = 6, numFailedTasks = 4, executorRunTime = Duration("2min").toMillis, "baz"),
newFakeStageData(StageStatus.FAILED, 3, jvmGcTime = Duration("1min").toMillis, executorCpuTime = Duration("2min").toMillis, numCompleteTasks = 4, numFailedTasks = 6, executorRunTime = Duration("2min").toMillis, "aaa"),
newFakeStageData(StageStatus.FAILED, 4, jvmGcTime = Duration("1min").toMillis, executorCpuTime = Duration("2min").toMillis, numCompleteTasks = 2, numFailedTasks = 8, executorRunTime = Duration("2min").toMillis, "zzz"),
newFakeStageData(StageStatus.COMPLETE, 5, jvmGcTime = Duration("1min").toMillis, executorCpuTime = Duration("0min").toMillis, numCompleteTasks = 10, numFailedTasks = 0, executorRunTime = Duration("0min").toMillis, "bbb"),
newFakeStageData(StageStatus.COMPLETE, 6, jvmGcTime = Duration("1min").toMillis, executorCpuTime = Duration("30min").toMillis, numCompleteTasks = 10, numFailedTasks = 0, executorRunTime = Duration("30min").toMillis, "ccc"),
newFakeStageData(StageStatus.COMPLETE, 7, jvmGcTime = Duration("1min").toMillis, executorCpuTime = Duration("60min").toMillis, numCompleteTasks = 10, numFailedTasks = 0, executorRunTime = Duration("60min").toMillis, "ddd"),
newFakeStageData(StageStatus.COMPLETE, 8, jvmGcTime = Duration("1min").toMillis, executorCpuTime = Duration("90min").toMillis, numCompleteTasks = 10, numFailedTasks = 0, executorRunTime = Duration("90min").toMillis, "eee"),
newFakeStageData(StageStatus.COMPLETE, 9, jvmGcTime = Duration("2min").toMillis, executorCpuTime = Duration("120min").toMillis, numCompleteTasks = 10, numFailedTasks = 0, executorRunTime = Duration("120min").toMillis, "fff")
newFakeStageData(StageStatus.COMPLETE, 0, jvmGcTime = Duration("2min").toMillis, numCompleteTasks = 10, numFailedTasks = 0, executorRunTime = Duration("2min").toMillis, "foo"),
newFakeStageData(StageStatus.COMPLETE, 1, jvmGcTime = Duration("2min").toMillis, numCompleteTasks = 8, numFailedTasks = 2, executorRunTime = Duration("2min").toMillis, "bar"),
newFakeStageData(StageStatus.COMPLETE, 2, jvmGcTime = Duration("2min").toMillis, numCompleteTasks = 6, numFailedTasks = 4, executorRunTime = Duration("2min").toMillis, "baz"),
newFakeStageData(StageStatus.FAILED, 3, jvmGcTime = Duration("1min").toMillis, numCompleteTasks = 4, numFailedTasks = 6, executorRunTime = Duration("2min").toMillis, "aaa"),
newFakeStageData(StageStatus.FAILED, 4, jvmGcTime = Duration("1min").toMillis, numCompleteTasks = 2, numFailedTasks = 8, executorRunTime = Duration("2min").toMillis, "zzz"),
newFakeStageData(StageStatus.COMPLETE, 5, jvmGcTime = Duration("1min").toMillis, numCompleteTasks = 10, numFailedTasks = 0, executorRunTime = Duration("0min").toMillis, "bbb"),
newFakeStageData(StageStatus.COMPLETE, 6, jvmGcTime = Duration("1min").toMillis, numCompleteTasks = 10, numFailedTasks = 0, executorRunTime = Duration("30min").toMillis, "ccc"),
newFakeStageData(StageStatus.COMPLETE, 7, jvmGcTime = Duration("1min").toMillis, numCompleteTasks = 10, numFailedTasks = 0, executorRunTime = Duration("60min").toMillis, "ddd"),
newFakeStageData(StageStatus.COMPLETE, 8, jvmGcTime = Duration("1min").toMillis, numCompleteTasks = 10, numFailedTasks = 0, executorRunTime = Duration("90min").toMillis, "eee"),
newFakeStageData(StageStatus.COMPLETE, 9, jvmGcTime = Duration("2min").toMillis, numCompleteTasks = 10, numFailedTasks = 0, executorRunTime = Duration("120min").toMillis, "fff")
)

val appConfigurationProperties = Map("spark.executor.instances" -> "2")
Expand Down Expand Up @@ -133,15 +133,15 @@ class StagesHeuristicTest extends FunSpec with Matchers {
evaluator.jvmTime should be(840000)
}

it("has the total executor cpu time") {
evaluator.executorCpuTime should be(18600000)
it("has the total executor Run time") {
evaluator.executorRunTimeTotal should be(18600000)
}

it("has ascending severity for ratio of JVM GC time to executor cpu time") {
it("has ascending severity for ratio of JVM GC time to executor run time") {
evaluator.severityTimeA should be(Severity.NONE)
}

it("has descending severity for ratio of JVM GC time to executor cpu time") {
it("has descending severity for ratio of JVM GC time to executor run time") {
evaluator.severityTimeD should be(Severity.LOW)
}
}
Expand All @@ -159,7 +159,6 @@ object StagesHeuristicTest {
status: StageStatus,
stageId: Int,
jvmGcTime: Long,
executorCpuTime: Long,
numCompleteTasks: Int,
numFailedTasks: Int,
executorRunTime: Long,
Expand Down Expand Up @@ -200,7 +199,6 @@ object StagesHeuristicTest {
taskMetrics = new Some(new TaskMetricsImpl(
executorDeserializeTime = 0,
executorRunTime = 0,
executorCpuTime,
resultSize = 0,
jvmGcTime,
resultSerializationTime = 0,
Expand Down

0 comments on commit 34ffbe2

Please sign in to comment.