Skip to content

Commit

Permalink
adding Gc time heuristic
Browse files Browse the repository at this point in the history
  • Loading branch information
swasti committed Dec 7, 2017
1 parent 204d5a7 commit ebc2c2e
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 104 deletions.
6 changes: 6 additions & 0 deletions app-conf/HeuristicConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -193,5 +193,11 @@
<classname>com.linkedin.drelephant.spark.heuristics.StagesHeuristic</classname>
<viewname>views.html.help.spark.helpStagesHeuristic</viewname>
</heuristic>
<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Spark GC Time to CPU Time</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.GcCpuTimeHeuristic</classname>
<viewname>views.html.help.spark.helpGcCpuTimeHeuristic</viewname>
</heuristic>

</heuristics>
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ trait ExecutorSummary{
def totalShuffleRead: Long
def totalShuffleWrite: Long
def maxMemory: Long
def totalMemoryBytesSpilled: Long
def totalGCTime: Long
def executorLogs: Map[String, String]}

trait JobData{
Expand Down Expand Up @@ -293,7 +293,7 @@ class ExecutorSummaryImpl(
var totalShuffleRead: Long,
var totalShuffleWrite: Long,
var maxMemory: Long,
var totalMemoryBytesSpilled: Long,
var totalGCTime: Long,
var executorLogs: Map[String, String]) extends ExecutorSummary

class JobDataImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,39 @@
package com.linkedin.drelephant.spark.heuristics

import com.linkedin.drelephant.analysis.Severity
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ExecutorStageSummary, ExecutorSummary, StageData}
import com.linkedin.drelephant.spark.fetchers.statusapiv1._
import com.linkedin.drelephant.analysis._
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.spark.data.SparkApplicationData
import com.linkedin.drelephant.util.MemoryFormatUtils

import scala.collection.JavaConverters


/**
* A heuristic based on memory spilled.
* A heuristic based on GC time and CPU run time
*
*/
class ExecutorStorageSpillHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData)
class GcCpuTimeHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData)
extends Heuristic[SparkApplicationData] {

import ExecutorStorageSpillHeuristic._
import GcCpuTimeHeuristic._
import JavaConverters._

override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData

override def apply(data: SparkApplicationData): HeuristicResult = {
val evaluator = new Evaluator(this, data)
var resultDetails = Seq(
new HeuristicResultDetails("Total memory spilled", MemoryFormatUtils.bytesToString(evaluator.totalMemorySpilled)),
new HeuristicResultDetails("Max memory spilled", MemoryFormatUtils.bytesToString(evaluator.maxMemorySpilled)),
new HeuristicResultDetails("Mean memory spilled", MemoryFormatUtils.bytesToString(evaluator.meanMemorySpilled)),
new HeuristicResultDetails("Fraction of executors having non zero bytes spilled", evaluator.fractionOfExecutorsHavingBytesSpilled.toString)
new HeuristicResultDetails("GC time to Executor Run time ratio", evaluator.ratio.toString),
new HeuristicResultDetails("GC total time", evaluator.jvmTime.toString),
new HeuristicResultDetails("Executor Run time", evaluator.executorRunTimeTotal.toString)
)

if(evaluator.severity != Severity.NONE){
resultDetails :+ new HeuristicResultDetails("Note", "Your memory is being spilled. Kindly look into it.")
if(evaluator.sparkExecutorCores >=4 && evaluator.sparkExecutorMemory >= MemoryFormatUtils.stringToBytes("10GB")) {
resultDetails :+ new HeuristicResultDetails("Recommendation", "You can try decreasing the number of cores to reduce the number of concurrently running tasks.")
}
//adding recommendations to the result, severityTimeA corresponds to the ascending severity calculation
if (evaluator.severityTimeA.getValue > Severity.LOW.getValue) {
resultDetails = resultDetails :+ new HeuristicResultDetails("Note", "The ratio of JVM GC Time and executor Time is above normal, we recommend to increase the executor memory")
}
//severityTimeD corresponds to the descending severity calculation
if (evaluator.severityTimeD.getValue > Severity.LOW.getValue) {
resultDetails = resultDetails :+ new HeuristicResultDetails("Note", "The ratio of JVM GC Time and executor Time is below normal, we recommend to decrease the executor memory")
}

val result = new HeuristicResult(
Expand All @@ -65,40 +63,46 @@ class ExecutorStorageSpillHeuristic(private val heuristicConfigurationData: Heur
}
}

object ExecutorStorageSpillHeuristic {
object GcCpuTimeHeuristic {
val SPARK_EXECUTOR_MEMORY = "spark.executor.memory"
val SPARK_EXECUTOR_CORES = "spark.executor.cores"

class Evaluator(memoryFractionHeuristic: ExecutorStorageSpillHeuristic, data: SparkApplicationData) {
/** The ascending severity thresholds for the ratio of JVM GC Time and executor Run Time (checking whether ratio is above normal)
* These thresholds are experimental and are likely to change */
val DEFAULT_GC_SEVERITY_A_THRESHOLDS =
SeverityThresholds(low = 0.08D, moderate = 0.1D, severe = 0.15D, critical = 0.2D, ascending = true)

/** The descending severity thresholds for the ratio of JVM GC Time and executor Run Time (checking whether ratio is below normal)
* These thresholds are experimental and are likely to change */
val DEFAULT_GC_SEVERITY_D_THRESHOLDS =
SeverityThresholds(low = 0.05D, moderate = 0.04D, severe = 0.03D, critical = 0.01D, ascending = false)

class Evaluator(memoryFractionHeuristic: GcCpuTimeHeuristic, data: SparkApplicationData) {
lazy val executorSummaries: Seq[ExecutorSummary] = data.executorSummaries
lazy val appConfigurationProperties: Map[String, String] =
data.appConfigurationProperties
val ratioMemoryCores: Long = (sparkExecutorMemory / sparkExecutorCores)
val maxMemorySpilled: Long = executorSummaries.map(_.totalMemoryBytesSpilled).max
val meanMemorySpilled = executorSummaries.map(_.totalMemoryBytesSpilled).sum / executorSummaries.size
val totalMemorySpilled = executorSummaries.map(_.totalMemoryBytesSpilled).sum
val fractionOfExecutorsHavingBytesSpilled: Double = executorSummaries.count(_.totalMemoryBytesSpilled > 0).toDouble / executorSummaries.size.toDouble
val severity: Severity = {
if (fractionOfExecutorsHavingBytesSpilled != 0) {
if (fractionOfExecutorsHavingBytesSpilled < 0.2 && maxMemorySpilled < 0.05 * ratioMemoryCores) {
Severity.LOW
}
else if (fractionOfExecutorsHavingBytesSpilled < 0.2 && meanMemorySpilled < 0.05 * ratioMemoryCores) {
Severity.MODERATE
}

else if (fractionOfExecutorsHavingBytesSpilled >= 0.2 && meanMemorySpilled < 0.05 * ratioMemoryCores) {
Severity.SEVERE
}
else if (fractionOfExecutorsHavingBytesSpilled >= 0.2 && meanMemorySpilled >= 0.05 * ratioMemoryCores) {
Severity.CRITICAL
} else Severity.NONE
}
else Severity.NONE
}
var (jvmTime, executorRunTimeTotal) = getTimeValues(executorSummaries)

var ratio: Double = jvmTime.toDouble / executorRunTimeTotal.toDouble

lazy val sparkExecutorMemory: Long = (appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY).map(MemoryFormatUtils.stringToBytes)).getOrElse(0)
lazy val sparkExecutorCores: Int = (appConfigurationProperties.get(SPARK_EXECUTOR_CORES).map(_.toInt)).getOrElse(0)
lazy val severityTimeA: Severity = DEFAULT_GC_SEVERITY_A_THRESHOLDS.severityOf(ratio)
lazy val severityTimeD: Severity = DEFAULT_GC_SEVERITY_D_THRESHOLDS.severityOf(ratio)
lazy val severity : Severity = Severity.max(severityTimeA, severityTimeD)

/**
* returns the total JVM GC Time and total executor Run Time across all stages
* @param executorSummaries
* @return
*/
private def getTimeValues(executorSummaries: Seq[ExecutorSummary]): (Long, Long) = {
var jvmGcTimeTotal: Long = 0
var executorRunTimeTotal: Long = 0
executorSummaries.foreach(executorSummary => {
jvmGcTimeTotal+=executorSummary.totalGCTime
executorRunTimeTotal+=executorSummary.totalDuration
})
(jvmGcTimeTotal, executorRunTimeTotal)
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ object LegacyDataConverters {
executorInfo.shuffleRead,
executorInfo.shuffleWrite,
executorInfo.maxMem,
executorInfo.totalMemoryBytesSpilled,
executorInfo.totalGCTime,
executorLogs = Map.empty
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ public static class ExecutorInfo {
public long inputBytes = 0L;
public long outputBytes = 0L;
public long shuffleRead = 0L;
public long totalMemoryBytesSpilled = 0L;
public long totalGCTime = 0L;
public long shuffleWrite = 0L;

public String toString() {
return "{execId: " + execId + ", hostPort:" + hostPort + " , rddBlocks: " + rddBlocks + ", memUsed: " + memUsed
+ ", maxMem: " + maxMem + ", diskUsed: " + diskUsed + ", totalTasks" + totalTasks + ", tasksActive: "
+ activeTasks + ", tasksComplete: " + completedTasks + ", tasksFailed: " + failedTasks + ", duration: "
+ duration + ", inputBytes: " + inputBytes + ", outputBytes:" + outputBytes + ", shuffleRead: " + shuffleRead
+ ", shuffleWrite: " + shuffleWrite + ", totalMemoryBytesSpilled: " + totalMemoryBytesSpilled + "}";
+ ", shuffleWrite: " + shuffleWrite + ", totalGCTime: " + totalGCTime + "}";
}
}

Expand Down
20 changes: 0 additions & 20 deletions app/views/help/spark/helpExecutorStorageSpillHeuristic.scala.html

This file was deleted.

6 changes: 6 additions & 0 deletions app/views/help/spark/helpGcCpuTimeHeuristic.scala.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<p>The ratio of jvmGcTime to executorCpuTime is checked, to see if GC is taking too much time (providing more memory could help) or too little time (memory may be over provisioned, and can be reduced).</p>
<p>The severity thresholds are as follows : </p>
<p>Low: avg (jvmGcTime / executorCpuTime) >= .08</p>
<p>Moderate: avg (jvmGcTime / executorCpuTime) >= .1</p>
<p>Critical: avg (jvmGcTime / executorCpuTime) >= .15</p>
<p>Severe:avg (jvmGcTime / executorCpuTime) >= .2</p>
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ object SparkMetricsAggregatorTest {
totalShuffleRead = 0,
totalShuffleWrite = 0,
maxMemory = 0,
totalMemoryBytesSpilled = 0,
totalGCTime = 0,
executorLogs = Map.empty
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,80 +24,85 @@ import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl,
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
import org.scalatest.{FunSpec, Matchers}

import scala.concurrent.duration.Duration

class ExecutorStorageSpillHeuristicTest extends FunSpec with Matchers {
import ExecutorStorageSpillHeuristicTest._

describe("ExecutorStorageSpillHeuristic") {
class GcCpuTimeHeuristicTest extends FunSpec with Matchers {
import GcCpuTimeHeuristicTest._

describe("GcCpuTimeHeuristic") {
val heuristicConfigurationData = newFakeHeuristicConfigurationData(
Map(
"max_to_median_ratio_severity_thresholds" -> "1.414,2,4,16",
"ignore_max_bytes_less_than_threshold" -> "4000000",
"ignore_max_millis_less_than_threshold" -> "4000001"
)
)
val executorStorageSpillHeuristic = new ExecutorStorageSpillHeuristic(heuristicConfigurationData)

val appConfigurationProperties = Map("spark.executor.memory" -> "4g", "spark.executor.cores"->"4")
val gcCpuTimeHeuristic = new GcCpuTimeHeuristic(heuristicConfigurationData)

val executorSummaries = Seq(
newFakeExecutorSummary(
id = "1",
totalMemoryBytesSpilled = 200000L
totalGCTime = Duration("2min").toMillis,
totalDuration = Duration("15min").toMillis
),
newFakeExecutorSummary(
id = "2",
totalMemoryBytesSpilled = 100000L
totalGCTime = Duration("6min").toMillis,
totalDuration = Duration("14min").toMillis
),
newFakeExecutorSummary(
id = "3",
totalMemoryBytesSpilled = 300000L
totalGCTime = Duration("4min").toMillis,
totalDuration = Duration("20min").toMillis
),
newFakeExecutorSummary(
id = "4",
totalMemoryBytesSpilled = 200000L
totalGCTime = Duration("8min").toMillis,
totalDuration = Duration("30min").toMillis
)
)

describe(".apply") {
val data1 = newFakeSparkApplicationData(executorSummaries, appConfigurationProperties)
val heuristicResult = executorStorageSpillHeuristic.apply(data1)
val data1 = newFakeSparkApplicationData(executorSummaries)
val heuristicResult = gcCpuTimeHeuristic.apply(data1)
val heuristicResultDetails = heuristicResult.getHeuristicResultDetails

it("returns the severity") {
heuristicResult.getSeverity should be(Severity.SEVERE)
heuristicResult.getSeverity should be(Severity.CRITICAL)
}

it("returns the total memory spilled") {
it("returns the JVM GC time to Executor Run time duration") {
val details = heuristicResultDetails.get(0)
details.getName should include("Total memory spilled")
details.getValue should be("781.25 KB")
details.getName should include("GC time to Executor Run time ratio")
details.getValue should include("0.2531")
}

it("returns the max memory spilled") {
it("returns the total GC time") {
val details = heuristicResultDetails.get(1)
details.getName should include("Max memory spilled")
details.getValue should be("292.97 KB")
details.getName should include("GC total time")
details.getValue should be("1200000")
}

it("returns the mean memory spilled") {
it("returns the executor's run time") {
val details = heuristicResultDetails.get(2)
details.getName should include("Mean memory spilled")
details.getValue should be("195.31 KB")
details.getName should include("Executor Run time")
details.getValue should be("4740000")
}
}
}
}

object ExecutorStorageSpillHeuristicTest {
object GcCpuTimeHeuristicTest {
import JavaConverters._

def newFakeHeuristicConfigurationData(params: Map[String, String] = Map.empty): HeuristicConfigurationData =
new HeuristicConfigurationData("heuristic", "class", "view", new ApplicationType("type"), params.asJava)

def newFakeExecutorSummary(
id: String,
totalMemoryBytesSpilled: Long
totalGCTime: Long,
totalDuration: Long
): ExecutorSummaryImpl = new ExecutorSummaryImpl(
id,
hostPort = "",
Expand All @@ -108,18 +113,17 @@ object ExecutorStorageSpillHeuristicTest {
failedTasks = 0,
completedTasks = 0,
totalTasks = 0,
totalDuration=0,
totalDuration,
totalInputBytes=0,
totalShuffleRead=0,
totalShuffleWrite= 0,
maxMemory= 0,
totalMemoryBytesSpilled,
totalGCTime,
executorLogs = Map.empty
)

def newFakeSparkApplicationData(
executorSummaries: Seq[ExecutorSummaryImpl],
appConfigurationProperties: Map[String, String]
executorSummaries: Seq[ExecutorSummaryImpl]
): SparkApplicationData = {
val appId = "application_1"

Expand All @@ -129,11 +133,6 @@ object ExecutorStorageSpillHeuristicTest {
stageDatas = Seq.empty,
executorSummaries = executorSummaries
)

val logDerivedData = SparkLogDerivedData(
SparkListenerEnvironmentUpdate(Map("Spark Properties" -> appConfigurationProperties.toSeq))
)

SparkApplicationData(appId, restDerivedData, Some(logDerivedData))
SparkApplicationData(appId, restDerivedData, None)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ object ExecutorsHeuristicTest {
totalShuffleRead,
totalShuffleWrite,
maxMemory,
totalMemoryBytesSpilled = 0,
totalGCTime = 0,
executorLogs = Map.empty
)

Expand Down

0 comments on commit ebc2c2e

Please sign in to comment.