Skip to content

Commit

Permalink
Changing GC thresholds and calculation in spill heuristic (#319)
Browse files Browse the repository at this point in the history
  • Loading branch information
skakker authored and akshayrai committed Mar 30, 2018
1 parent 5f7fc68 commit b2e8a14
Show file tree
Hide file tree
Showing 11 changed files with 34 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ trait ExecutorSummary{
def failedTasks: Int
def completedTasks: Int
def totalTasks: Int
def maxTasks: Int
def totalDuration: Long
def totalInputBytes: Long
def totalShuffleRead: Long
Expand Down Expand Up @@ -291,6 +292,7 @@ class ExecutorSummaryImpl(
var failedTasks: Int,
var completedTasks: Int,
var totalTasks: Int,
var maxTasks: Int,
var totalDuration: Long,
var totalInputBytes: Long,
var totalShuffleRead: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ object ExecutorGcHeuristic {
/** The ascending severity thresholds for the ratio of JVM GC time to executor run time (checking whether the ratio is above normal).
* These thresholds are experimental and are likely to change. */
val DEFAULT_GC_SEVERITY_A_THRESHOLDS =
SeverityThresholds(low = 0.08D, moderate = 0.1D, severe = 0.15D, critical = 0.2D, ascending = true)
SeverityThresholds(low = 0.08D, moderate = 0.09D, severe = 0.1D, critical = 0.15D, ascending = true)

/** The descending severity thresholds for the ratio of JVM GC time to executor run time (checking whether the ratio is below normal).
* These thresholds are experimental and are likely to change. */
Expand All @@ -90,7 +90,15 @@ object ExecutorGcHeuristic {

class Evaluator(executorGcHeuristic: ExecutorGcHeuristic, data: SparkApplicationData) {
lazy val executorAndDriverSummaries: Seq[ExecutorSummary] = data.executorSummaries
if (executorAndDriverSummaries == null) {
throw new Exception("Executors Summary is null.")
}

lazy val executorSummaries: Seq[ExecutorSummary] = executorAndDriverSummaries.filterNot(_.id.equals("driver"))
if (executorSummaries.isEmpty) {
throw new Exception("No executor information available.")
}

lazy val appConfigurationProperties: Map[String, String] =
data.appConfigurationProperties
var (jvmTime, executorRunTimeTotal) = getTimeValues(executorSummaries)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,30 +96,29 @@ object ExecutorStorageSpillHeuristic {
val DEFAULT_SPARK_EXECUTOR_MEMORY_THRESHOLD : String ="10GB"

class Evaluator(executorStorageSpillHeuristic: ExecutorStorageSpillHeuristic, data: SparkApplicationData) {
lazy val executorSummaries: Seq[ExecutorSummary] = data.executorSummaries
lazy val executorAndDriverSummaries: Seq[ExecutorSummary] = data.executorSummaries
lazy val executorSummaries: Seq[ExecutorSummary] = executorAndDriverSummaries.filterNot(_.id.equals("driver"))
lazy val appConfigurationProperties: Map[String, String] =
data.appConfigurationProperties
val maxTasks: Int = executorSummaries.head.maxTasks
val maxMemorySpilled: Long = executorSummaries.map(_.totalMemoryBytesSpilled).max
val meanMemorySpilled = executorSummaries.map(_.totalMemoryBytesSpilled).sum / executorSummaries.size
val totalMemorySpilled = executorSummaries.map(_.totalMemoryBytesSpilled).sum
val totalMemorySpilledPerTask = totalMemorySpilled/(executorSummaries.map(_.totalTasks).sum)
lazy val totalMemorySpilled = executorSummaries.map(_.totalMemoryBytesSpilled).sum
val fractionOfExecutorsHavingBytesSpilled: Double = executorSummaries.count(_.totalMemoryBytesSpilled > 0).toDouble / executorSummaries.size.toDouble
val severity: Severity = {
if (fractionOfExecutorsHavingBytesSpilled != 0) {
if (fractionOfExecutorsHavingBytesSpilled < executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold
&& maxMemorySpilled < executorStorageSpillHeuristic.spillMaxMemoryThreshold * sparkExecutorMemory) {
&& totalMemorySpilledPerTask < executorStorageSpillHeuristic.spillMaxMemoryThreshold * (sparkExecutorMemory/maxTasks)) {
Severity.LOW
}
else if (fractionOfExecutorsHavingBytesSpilled < executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold
&& meanMemorySpilled < executorStorageSpillHeuristic.spillMaxMemoryThreshold * sparkExecutorMemory) {
} else if (fractionOfExecutorsHavingBytesSpilled < executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold
&& totalMemorySpilledPerTask < executorStorageSpillHeuristic.spillMaxMemoryThreshold * (sparkExecutorMemory/maxTasks)) {
Severity.MODERATE
}

else if (fractionOfExecutorsHavingBytesSpilled >= executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold
&& meanMemorySpilled < executorStorageSpillHeuristic.spillMaxMemoryThreshold * sparkExecutorMemory) {
} else if (fractionOfExecutorsHavingBytesSpilled >= executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold
&& totalMemorySpilledPerTask < executorStorageSpillHeuristic.spillMaxMemoryThreshold * (sparkExecutorMemory/maxTasks)) {
Severity.SEVERE
}
else if (fractionOfExecutorsHavingBytesSpilled >= executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold
&& meanMemorySpilled >= executorStorageSpillHeuristic.spillMaxMemoryThreshold * sparkExecutorMemory) {
} else if (fractionOfExecutorsHavingBytesSpilled >= executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold
&& totalMemorySpilledPerTask >= executorStorageSpillHeuristic.spillMaxMemoryThreshold * (sparkExecutorMemory/maxTasks)) {
Severity.CRITICAL
} else Severity.NONE
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ object LegacyDataConverters {
executorInfo.failedTasks,
executorInfo.completedTasks,
executorInfo.totalTasks,
executorInfo.maxTasks,
executorInfo.duration,
executorInfo.inputBytes,
executorInfo.shuffleRead,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public static class ExecutorInfo {
public int completedTasks = 0;
public int failedTasks = 0;
public int totalTasks = 0;
public int maxTasks = 0;
public long duration = 0L;
public long inputBytes = 0L;
public long outputBytes = 0L;
Expand All @@ -49,7 +50,7 @@ public static class ExecutorInfo {

public String toString() {
return "{execId: " + execId + ", hostPort:" + hostPort + " , rddBlocks: " + rddBlocks + ", memUsed: " + memUsed
+ ", maxMem: " + maxMem + ", diskUsed: " + diskUsed + ", totalTasks" + totalTasks + ", tasksActive: "
+ ", maxMem: " + maxMem + ", diskUsed: " + diskUsed + ", totalTasks" + totalTasks + ", maxTasks" + maxTasks + ", tasksActive: "
+ activeTasks + ", tasksComplete: " + completedTasks + ", tasksFailed: " + failedTasks + ", duration: "
+ duration + ", inputBytes: " + inputBytes + ", outputBytes:" + outputBytes + ", shuffleRead: " + shuffleRead
+ ", shuffleWrite: " + shuffleWrite + ", totalGCTime: " + totalGCTime + ", totalMemoryBytesSpilled: " + totalMemoryBytesSpilled + "}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ object SparkMetricsAggregatorTest {
failedTasks = 0,
completedTasks = 0,
totalTasks = 0,
maxTasks = 0,
totalDuration,
totalInputBytes = 0,
totalShuffleRead = 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ object ExecutorGcHeuristicTest {
failedTasks = 0,
completedTasks = 0,
totalTasks = 0,
maxTasks = 0,
totalDuration,
totalInputBytes=0,
totalShuffleRead=0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,13 @@ object ExecutorStorageSpillHeuristicTest {
activeTasks = 0,
failedTasks = 0,
completedTasks = 0,
totalTasks = 0,
totalTasks = 10,
maxTasks = 10,
totalDuration=0,
totalInputBytes=0,
totalShuffleRead=0,
totalShuffleWrite= 0,
maxMemory= 0,
maxMemory= 2000,
totalGCTime = 0,
totalMemoryBytesSpilled,
executorLogs = Map.empty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ object ExecutorsHeuristicTest {
failedTasks = 0,
completedTasks = 0,
totalTasks = 0,
maxTasks = 0,
totalDuration,
totalInputBytes,
totalShuffleRead,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ object JvmUsedMemoryHeuristicTest {
failedTasks = 0,
completedTasks = 0,
totalTasks = 0,
maxTasks = 0,
totalDuration = 0,
totalInputBytes = 0,
totalShuffleRead = 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ object UnifiedMemoryHeuristicTest {
failedTasks = 0,
completedTasks = 0,
totalTasks = 0,
maxTasks = 0,
totalDuration = 0,
totalInputBytes = 0,
totalShuffleRead = 0,
Expand Down

0 comments on commit b2e8a14

Please sign in to comment.