Commit fb80795
adding JVM GC Time and Executor CPU time heuristic
swasti committed Aug 23, 2017
1 parent 752a94b commit fb80795
Showing 4 changed files with 160 additions and 42 deletions.
@@ -179,6 +179,7 @@ trait TaskData{
trait TaskMetrics{
def executorDeserializeTime: Long
def executorRunTime: Long
+  def executorCpuTime: Long
def resultSize: Long
def jvmGcTime: Long
def resultSerializationTime: Long
@@ -384,6 +385,7 @@ class TaskDataImpl(
class TaskMetricsImpl(
var executorDeserializeTime: Long,
var executorRunTime: Long,
+  var executorCpuTime: Long,
var resultSize: Long,
var jvmGcTime: Long,
var resultSerializationTime: Long,
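The two additions above expose executorCpuTime through the TaskMetrics trait and its implementation. As a quick orientation (an illustrative sketch, not part of the commit), this is how a caller can consume the new field alongside the existing jvmGcTime, assuming only the trait shown above:

// Sketch: total up GC time and CPU time over a set of task metrics.
// TaskMetrics is the trait extended above; totalGcAndCpu is illustrative.
def totalGcAndCpu(metrics: Seq[TaskMetrics]): (Long, Long) =
  metrics.foldLeft((0L, 0L)) { case ((gc, cpu), m) =>
    (gc + m.jvmGcTime, cpu + m.executorCpuTime)
  }

This is essentially what the new getTimeValues helper in StagesHeuristic.scala below does, with extra handling for stages that have no task data.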
94 changes: 80 additions & 14 deletions app/com/linkedin/drelephant/spark/heuristics/StagesHeuristic.scala
@@ -16,17 +16,17 @@

package com.linkedin.drelephant.spark.heuristics

-import com.linkedin.drelephant.spark.fetchers.statusapiv1.ExecutorSummary
+import java.util.NoSuchElementException
+import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ExecutorSummary, StageData, TaskData, TaskMetricsImpl}
import scala.collection.JavaConverters
import scala.concurrent.duration
import scala.concurrent.duration.Duration

import com.linkedin.drelephant.analysis.{Heuristic, HeuristicResult, HeuristicResultDetails, Severity, SeverityThresholds}
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.math.Statistics
import com.linkedin.drelephant.spark.data.SparkApplicationData
-import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageData
import org.apache.spark.status.api.v1.StageStatus
+import scala.util.control.Exception


/**
@@ -36,7 +36,8 @@ import org.apache.spark.status.api.v1.StageStatus
* each stage.
*/
class StagesHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData)
  extends Heuristic[SparkApplicationData] {

import StagesHeuristic._
import JavaConverters._

@@ -67,14 +68,14 @@ class StagesHeuristic(private val heuristicConfigurationData: HeuristicConfigura
f"stage ${stageData.stageId}, attempt ${stageData.attemptId} (task failure rate: ${taskFailureRate}%1.3f)"

def formatStagesWithLongAverageExecutorRuntimes(stagesWithLongAverageExecutorRuntimes: Seq[(StageData, Long)]): String =
    stagesWithLongAverageExecutorRuntimes
      .map { case (stageData, runtime) => formatStageWithLongRuntime(stageData, runtime) }
      .mkString("\n")

def formatStageWithLongRuntime(stageData: StageData, runtime: Long): String =
f"stage ${stageData.stageId}, attempt ${stageData.attemptId} (runtime: ${Statistics.readableTimespan(runtime)})"

-    val resultDetails = Seq(
+    var resultDetails = Seq(
new HeuristicResultDetails("Spark completed stages count", evaluator.numCompletedStages.toString),
new HeuristicResultDetails("Spark failed stages count", evaluator.numFailedStages.toString),
new HeuristicResultDetails("Spark stage failure rate", f"${evaluator.stageFailureRate.getOrElse(0.0D)}%.3f"),
@@ -85,8 +86,20 @@ class StagesHeuristic(private val heuristicConfigurationData: HeuristicConfigura
new HeuristicResultDetails(
"Spark stages with long average executor runtimes",
formatStagesWithLongAverageExecutorRuntimes(evaluator.stagesWithLongAverageExecutorRuntimes)
-      )
+      ),
+      new HeuristicResultDetails("JVM GC time to Executor CPU time ratio", evaluator.ratio.toString),
+      new HeuristicResultDetails("JVM GC total time", evaluator.jvmTime.toString),
+      new HeuristicResultDetails("Executor CPU time", evaluator.executorCpuTime.toString)
)

+    // Add recommendations to the result
+    if (evaluator.severityTimeA.getValue != Severity.NONE.getValue) {
+      resultDetails = resultDetails :+ new HeuristicResultDetails("Note", "The ratio of JVM GC time to executor CPU time is above normal; we recommend increasing the executor memory.")
+    }
+    if (evaluator.severityTimeD.getValue != Severity.NONE.getValue) {
+      resultDetails = resultDetails :+ new HeuristicResultDetails("Note", "The ratio of JVM GC time to executor CPU time is below normal; we recommend decreasing the executor memory.")
+    }

val result = new HeuristicResult(
heuristicConfigurationData.getClassName,
heuristicConfigurationData.getHeuristicName,
@@ -116,6 +129,15 @@ object StagesHeuristic {
critical = Duration("60min").toMillis,
ascending = true
)

+  /** Ascending severity thresholds for the ratio of JVM GC time to executor CPU time (flags too much GC) */
+  val DEFAULT_GC_SEVERITY_A_THRESHOLDS =
+    SeverityThresholds(low = 0.08D, moderate = 0.1D, severe = 0.15D, critical = 0.2D, ascending = true)
+
+  /** Descending severity thresholds for the ratio of JVM GC time to executor CPU time (flags too little GC) */
+  val DEFAULT_GC_SEVERITY_D_THRESHOLDS =
+    SeverityThresholds(low = 0.05D, moderate = 0.04D, severe = 0.03D, critical = 0.01D, ascending = false)


val STAGE_FAILURE_RATE_SEVERITY_THRESHOLDS_KEY = "stage_failure_rate_severity_thresholds"
val TASK_FAILURE_RATE_SEVERITY_THRESHOLDS_KEY = "stage_task_failure_rate_severity_thresholds"
@@ -131,9 +153,13 @@ object StagesHeuristic {

lazy val executorSummaries: Seq[ExecutorSummary] = data.executorSummaries

-    lazy val numCompletedStages: Int = stageDatas.count { _.status == StageStatus.COMPLETE }
+    lazy val numCompletedStages: Int = stageDatas.count {
+      _.status == StageStatus.COMPLETE
+    }

-    lazy val numFailedStages: Int = stageDatas.count { _.status == StageStatus.FAILED }
+    lazy val numFailedStages: Int = stageDatas.count {
+      _.status == StageStatus.FAILED
+    }

lazy val stageFailureRate: Option[Double] = {
val numStages = numCompletedStages + numFailedStages
@@ -147,7 +173,17 @@ object StagesHeuristic {
stagesAndAverageExecutorRuntimeSeverities
.collect { case (stageData, runtime, severity) if severity.getValue > Severity.MODERATE.getValue => (stageData, runtime) }

-    lazy val severity: Severity = Severity.max((stageFailureRateSeverity +: (taskFailureRateSeverities ++ runtimeSeverities)): _*)
+    lazy val severity: Severity = Severity.max((stageFailureRateSeverity +: severityTimeA +: severityTimeD +: (taskFailureRateSeverities ++ runtimeSeverities)): _*)

+    val (jvmTime, executorCpuTime) = getTimeValues(stageDatas)
+
+    val ratio: Double = jvmTime.toDouble / executorCpuTime.toDouble
+
+    lazy val severityTimeA: Severity = DEFAULT_GC_SEVERITY_A_THRESHOLDS.severityOf(ratio)
+    lazy val severityTimeD: Severity = DEFAULT_GC_SEVERITY_D_THRESHOLDS.severityOf(ratio)


private lazy val stageFailureRateSeverityThresholds = stagesHeuristic.stageFailureRateSeverityThresholds

@@ -179,6 +215,7 @@ object StagesHeuristic {
private lazy val executorInstances: Int =
appConfigurationProperties.get(SPARK_EXECUTOR_INSTANCES_KEY).map(_.toInt).getOrElse(executorSummaries.size)


private def taskFailureRateAndSeverityOf(stageData: StageData): (Double, Severity) = {
val taskFailureRate = taskFailureRateOf(stageData).getOrElse(0.0D)
(taskFailureRate, taskFailureRateSeverityThresholds.severityOf(taskFailureRate))
@@ -196,15 +233,44 @@ object StagesHeuristic {
val averageExecutorRuntime = stageData.executorRunTime / executorInstances
(averageExecutorRuntime, stageRuntimeMillisSeverityThresholds.severityOf(averageExecutorRuntime))
}

+    private def getTimeValues(stageDatas: Seq[StageData]): (Long, Long) = {
+      var jvmGcTimeTotal: Long = 0
+      var executorCpuTime: Long = 0
+      val taskMetricsDummy = new TaskMetricsImpl(
+        executorDeserializeTime = 0,
+        executorRunTime = 0,
+        executorCpuTime = 0,
+        resultSize = 0,
+        jvmGcTime = 0,
+        resultSerializationTime = 0,
+        memoryBytesSpilled = 0,
+        diskBytesSpilled = 0,
+        inputMetrics = None,
+        outputMetrics = None,
+        shuffleReadMetrics = None,
+        shuffleWriteMetrics = None)
+      // Ignore the exception: some stages have no task data, in which case 0 is used as the default.
+      Exception.ignoring(classOf[NoSuchElementException]) {
+        stageDatas.foreach { stageData =>
+          stageData.tasks.get.values.foreach { taskData =>
+            jvmGcTimeTotal += taskData.taskMetrics.getOrElse(taskMetricsDummy).jvmGcTime
+            executorCpuTime += taskData.taskMetrics.getOrElse(taskMetricsDummy).executorCpuTime
+          }
+        }
+      }
+      (jvmGcTimeTotal, executorCpuTime)
+    }
}

def minutesSeverityThresholdsToMillisSeverityThresholds(
    minutesSeverityThresholds: SeverityThresholds
  ): SeverityThresholds = SeverityThresholds(
Duration(minutesSeverityThresholds.low.longValue, duration.MINUTES).toMillis,
Duration(minutesSeverityThresholds.moderate.longValue, duration.MINUTES).toMillis,
Duration(minutesSeverityThresholds.severe.longValue, duration.MINUTES).toMillis,
Duration(minutesSeverityThresholds.critical.longValue, duration.MINUTES).toMillis,
minutesSeverityThresholds.ascending
)
}

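A note on the two GC threshold sets added above (an illustrative sketch, not part of the commit): SeverityThresholds is applied to the same ratio in both ascending and descending mode, so a deviation in either direction is flagged. Assuming severityOf behaves as its name and the ascending flag suggest, the defaults classify sample ratios as follows:

// Illustrative only: classify sample GC-time/CPU-time ratios against the
// two default threshold sets defined in object StagesHeuristic above.
import StagesHeuristic.{DEFAULT_GC_SEVERITY_A_THRESHOLDS, DEFAULT_GC_SEVERITY_D_THRESHOLDS}

Seq(0.005, 0.045, 0.12, 0.25).foreach { r =>
  val tooMuchGc   = DEFAULT_GC_SEVERITY_A_THRESHOLDS.severityOf(r) // ascending: flags high ratios
  val tooLittleGc = DEFAULT_GC_SEVERITY_D_THRESHOLDS.severityOf(r) // descending: flags low ratios
  println(f"ratio $r%.3f -> too much GC: $tooMuchGc, too little GC: $tooLittleGc")
}
// Under the defaults: 0.005 is CRITICAL on the descending set, 0.045 is LOW on
// the descending set, 0.12 is MODERATE on the ascending set, and 0.25 is
// CRITICAL on the ascending set.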
7 changes: 7 additions & 0 deletions app/views/help/spark/helpStagesHeuristic.scala.html
@@ -18,3 +18,10 @@

<p>Stage/task failures can occur for a number of reasons, so it is
recommended to look at the YARN application error logs.</p>
+<p>
+  It also checks the ratio of jvmGcTime to executorCpuTime, to see whether GC is taking too much time (providing more memory could help) or too little time (memory may be over-provisioned and can be reduced). <br/>
+  If too much time is being spent in GC, it is recommended to increase executor memory. <br/>
+  However, if unified memory has too much excess capacity, it is recommended to decrease spark.memory.fraction to increase the amount allocated to user memory, instead of increasing executor memory. <br/>
+  If too little time is being spent in GC, it is recommended to decrease executor memory. <br/>
+  However, if there is execution or storage spill, it is recommended to increase spark.memory.fraction instead of decreasing executor memory.
+</p>
99 changes: 71 additions & 28 deletions test/com/linkedin/drelephant/spark/heuristics/StagesHeuristicTest.scala
@@ -16,19 +16,21 @@

package com.linkedin.drelephant.spark.heuristics

+import java.util.Date

import scala.collection.JavaConverters
import scala.concurrent.duration.Duration

import com.linkedin.drelephant.analysis.{ApplicationType, Severity}
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
-import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, JobDataImpl, StageDataImpl}
+import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, JobDataImpl, StageDataImpl, TaskMetricsImpl, TaskDataImpl}
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
import org.apache.spark.status.api.v1.StageStatus
import org.scalatest.{FunSpec, Matchers}


class StagesHeuristicTest extends FunSpec with Matchers {

import StagesHeuristicTest._

describe("StagesHeuristic") {
@@ -41,16 +43,16 @@ class StagesHeuristicTest extends FunSpec with Matchers {
)
val stagesHeuristic = new StagesHeuristic(heuristicConfigurationData)
val stageDatas = Seq(
-      newFakeStageData(StageStatus.COMPLETE, 0, numCompleteTasks = 10, numFailedTasks = 0, executorRunTime = Duration("2min").toMillis, "foo"),
-      newFakeStageData(StageStatus.COMPLETE, 1, numCompleteTasks = 8, numFailedTasks = 2, executorRunTime = Duration("2min").toMillis, "bar"),
-      newFakeStageData(StageStatus.COMPLETE, 2, numCompleteTasks = 6, numFailedTasks = 4, executorRunTime = Duration("2min").toMillis, "baz"),
-      newFakeStageData(StageStatus.FAILED, 3, numCompleteTasks = 4, numFailedTasks = 6, executorRunTime = Duration("2min").toMillis, "aaa"),
-      newFakeStageData(StageStatus.FAILED, 4, numCompleteTasks = 2, numFailedTasks = 8, executorRunTime = Duration("2min").toMillis, "zzz"),
-      newFakeStageData(StageStatus.COMPLETE, 5, numCompleteTasks = 10, numFailedTasks = 0, executorRunTime = Duration("0min").toMillis, "bbb"),
-      newFakeStageData(StageStatus.COMPLETE, 6, numCompleteTasks = 10, numFailedTasks = 0, executorRunTime = Duration("30min").toMillis, "ccc"),
-      newFakeStageData(StageStatus.COMPLETE, 7, numCompleteTasks = 10, numFailedTasks = 0, executorRunTime = Duration("60min").toMillis, "ddd"),
-      newFakeStageData(StageStatus.COMPLETE, 8, numCompleteTasks = 10, numFailedTasks = 0, executorRunTime = Duration("90min").toMillis, "eee"),
-      newFakeStageData(StageStatus.COMPLETE, 9, numCompleteTasks = 10, numFailedTasks = 0, executorRunTime = Duration("120min").toMillis, "fff")
+      newFakeStageData(StageStatus.COMPLETE, 0, jvmGcTime = Duration("2min").toMillis, executorCpuTime = Duration("2min").toMillis, numCompleteTasks = 10, numFailedTasks = 0, executorRunTime = Duration("2min").toMillis, "foo"),
+      newFakeStageData(StageStatus.COMPLETE, 1, jvmGcTime = Duration("2min").toMillis, executorCpuTime = Duration("2min").toMillis, numCompleteTasks = 8, numFailedTasks = 2, executorRunTime = Duration("2min").toMillis, "bar"),
+      newFakeStageData(StageStatus.COMPLETE, 2, jvmGcTime = Duration("2min").toMillis, executorCpuTime = Duration("2min").toMillis, numCompleteTasks = 6, numFailedTasks = 4, executorRunTime = Duration("2min").toMillis, "baz"),
+      newFakeStageData(StageStatus.FAILED, 3, jvmGcTime = Duration("1min").toMillis, executorCpuTime = Duration("2min").toMillis, numCompleteTasks = 4, numFailedTasks = 6, executorRunTime = Duration("2min").toMillis, "aaa"),
+      newFakeStageData(StageStatus.FAILED, 4, jvmGcTime = Duration("1min").toMillis, executorCpuTime = Duration("2min").toMillis, numCompleteTasks = 2, numFailedTasks = 8, executorRunTime = Duration("2min").toMillis, "zzz"),
+      newFakeStageData(StageStatus.COMPLETE, 5, jvmGcTime = Duration("1min").toMillis, executorCpuTime = Duration("0min").toMillis, numCompleteTasks = 10, numFailedTasks = 0, executorRunTime = Duration("0min").toMillis, "bbb"),
+      newFakeStageData(StageStatus.COMPLETE, 6, jvmGcTime = Duration("1min").toMillis, executorCpuTime = Duration("30min").toMillis, numCompleteTasks = 10, numFailedTasks = 0, executorRunTime = Duration("30min").toMillis, "ccc"),
+      newFakeStageData(StageStatus.COMPLETE, 7, jvmGcTime = Duration("1min").toMillis, executorCpuTime = Duration("60min").toMillis, numCompleteTasks = 10, numFailedTasks = 0, executorRunTime = Duration("60min").toMillis, "ddd"),
+      newFakeStageData(StageStatus.COMPLETE, 8, jvmGcTime = Duration("1min").toMillis, executorCpuTime = Duration("90min").toMillis, numCompleteTasks = 10, numFailedTasks = 0, executorRunTime = Duration("90min").toMillis, "eee"),
+      newFakeStageData(StageStatus.COMPLETE, 9, jvmGcTime = Duration("2min").toMillis, executorCpuTime = Duration("120min").toMillis, numCompleteTasks = 10, numFailedTasks = 0, executorRunTime = Duration("120min").toMillis, "fff")
)

val appConfigurationProperties = Map("spark.executor.instances" -> "2")
@@ -112,38 +114,55 @@ class StagesHeuristicTest extends FunSpec with Matchers {
it("has the list of stages with high task failure rates") {
val stageIdsAndTaskFailureRates =
evaluator.stagesWithHighTaskFailureRates.map { case (stageData, taskFailureRate) => (stageData.stageId, taskFailureRate) }
      stageIdsAndTaskFailureRates should contain theSameElementsInOrderAs (Seq((3, 0.6D), (4, 0.8D)))
}

it("has the list of stages with long average executor runtimes") {
val stageIdsAndRuntimes =
evaluator.stagesWithLongAverageExecutorRuntimes.map { case (stageData, runtime) => (stageData.stageId, runtime) }
      stageIdsAndRuntimes should contain theSameElementsInOrderAs (
        Seq((8, Duration("45min").toMillis), (9, Duration("60min").toMillis))
      )
}

it("has the ratio") {
evaluator.ratio should be(0.04516129032258064)
}

it("computes the overall severity") {
evaluator.severity should be(Severity.CRITICAL)
it("has the total Jvm Gc Time") {
evaluator.jvmTime should be(840000)
}

it("has the total executor cpu time") {
evaluator.executorCpuTime should be(18600000)
}
it("has ascending severity for ratio of JVM GC time to executor cpu time") {
evaluator.severityTimeA should be(Severity.NONE)
}
it("has descending severity for ratio of JVM GC time to executor cpu time") {
evaluator.severityTimeD should be(Severity.LOW)
}
}
}
}

object StagesHeuristicTest {

import JavaConverters._

def newFakeHeuristicConfigurationData(params: Map[String, String] = Map.empty): HeuristicConfigurationData =
new HeuristicConfigurationData("heuristic", "class", "view", new ApplicationType("type"), params.asJava)

def newFakeStageData(
-    status: StageStatus,
-    stageId: Int,
-    numCompleteTasks: Int,
-    numFailedTasks: Int,
-    executorRunTime: Long,
-    name: String
-  ): StageDataImpl = new StageDataImpl(
+    status: StageStatus,
+    stageId: Int,
+    jvmGcTime: Long,
+    executorCpuTime: Long,
+    numCompleteTasks: Int,
+    numFailedTasks: Int,
+    executorRunTime: Long,
+    name: String
+  ): StageDataImpl = new StageDataImpl(
status,
stageId,
attemptId = 0,
@@ -165,14 +184,38 @@ object StagesHeuristicTest {
details = "",
schedulingPool = "",
accumulatorUpdates = Seq.empty,
-    tasks = None,
+    tasks = Some(Map(0L -> new TaskDataImpl(
+      taskId = 0,
+      index = 1,
+      attempt = 0,
+      launchTime = new Date(),
+      executorId = "1",
+      host = "SomeHost",
+      taskLocality = "ANY",
+      speculative = false,
+      accumulatorUpdates = Seq(),
+      errorMessage = None,
+      taskMetrics = Some(new TaskMetricsImpl(
+        executorDeserializeTime = 0,
+        executorRunTime = 0,
+        executorCpuTime = executorCpuTime,
+        resultSize = 0,
+        jvmGcTime = jvmGcTime,
+        resultSerializationTime = 0,
+        memoryBytesSpilled = 0,
+        diskBytesSpilled = 0,
+        inputMetrics = None,
+        outputMetrics = None,
+        shuffleReadMetrics = None,
+        shuffleWriteMetrics = None))
+    ))),
executorSummary = None
)

def newFakeSparkApplicationData(
    stageDatas: Seq[StageDataImpl],
    appConfigurationProperties: Map[String, String]
  ): SparkApplicationData = {
val appId = "application_1"

val restDerivedData = SparkRestDerivedData(

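For reference, the values asserted in the test above follow directly from the fake stage data (a quick arithmetic check, not part of the commit):

// Per-stage jvmGcTime and executorCpuTime in minutes, as passed to newFakeStageData:
val gcMinutes  = Seq(2, 2, 2, 1, 1, 1, 1, 1, 1, 2)      // sums to 14
val cpuMinutes = Seq(2, 2, 2, 2, 2, 0, 30, 60, 90, 120) // sums to 310
val jvmTime         = gcMinutes.sum * 60000L             // 840000 ms
val executorCpuTime = cpuMinutes.sum * 60000L            // 18600000 ms
val ratio = jvmTime.toDouble / executorCpuTime           // 0.04516129032258064
// 0.045 is below the ascending low bound (0.08), so severityTimeA is NONE; it
// falls between the descending moderate and low bounds (0.04, 0.05), so
// severityTimeD is LOW.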