Gc time #311

Merged 5 commits on Jan 8, 2018
Changes from 4 commits
6 changes: 6 additions & 0 deletions app-conf/HeuristicConf.xml
Original file line number Diff line number Diff line change
@@ -193,5 +193,11 @@
<classname>com.linkedin.drelephant.spark.heuristics.StagesHeuristic</classname>
<viewname>views.html.help.spark.helpStagesHeuristic</viewname>
</heuristic>
<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Spark GC Time to Run Time</heuristicname>
Contributor

I would prefer naming this heuristic simply "Executor GC".

Also reflect this change elsewhere in the Heuristic class and Help page.

Contributor Author

Done

<classname>com.linkedin.drelephant.spark.heuristics.GcCpuTimeHeuristic</classname>

Please change the references to CPUTime to runtime, since we are using executor run/duration time now.

<viewname>views.html.help.spark.helpGcCpuTimeHeuristic</viewname>
</heuristic>

</heuristics>
Original file line number Diff line number Diff line change
@@ -87,6 +87,7 @@ trait ExecutorSummary{
def totalShuffleRead: Long
def totalShuffleWrite: Long
def maxMemory: Long
def totalGCTime: Long
def executorLogs: Map[String, String]}

trait JobData{
@@ -292,6 +293,7 @@ class ExecutorSummaryImpl(
var totalShuffleRead: Long,
var totalShuffleWrite: Long,
var maxMemory: Long,
var totalGCTime: Long,
var executorLogs: Map[String, String]) extends ExecutorSummary

class JobDataImpl(
119 changes: 119 additions & 0 deletions app/com/linkedin/drelephant/spark/heuristics/GcCpuTimeHeuristic.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.spark.heuristics

import com.linkedin.drelephant.analysis.Severity
import com.linkedin.drelephant.spark.fetchers.statusapiv1._
import com.linkedin.drelephant.analysis._
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.spark.data.SparkApplicationData

import scala.collection.JavaConverters

/**
* A heuristic based on GC time and CPU run time
Contributor

Rephrase to something more intuitive. It should say what this class/heuristic does. Refer to the comment I added on the help page.

*/
class GcCpuTimeHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData)
extends Heuristic[SparkApplicationData] {

import GcCpuTimeHeuristic._
import JavaConverters._

val gcSeverityAThresholds: SeverityThresholds =
SeverityThresholds.parse(heuristicConfigurationData.getParamMap.get(GC_SEVERITY_A_THRESHOLDS_KEY), ascending = true)
.getOrElse(DEFAULT_GC_SEVERITY_A_THRESHOLDS)

val gcSeverityDThresholds: SeverityThresholds =
SeverityThresholds.parse(heuristicConfigurationData.getParamMap.get(GC_SEVERITY_D_THRESHOLDS_KEY), ascending = true)
.getOrElse(DEFAULT_GC_SEVERITY_D_THRESHOLDS)

override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData

override def apply(data: SparkApplicationData): HeuristicResult = {
val evaluator = new Evaluator(this, data)
var resultDetails = Seq(
new HeuristicResultDetails("GC time to Executor Run time ratio", evaluator.ratio.toString),
new HeuristicResultDetails("GC total time", evaluator.jvmTime.toString),
Contributor

Change "GC total time" to "Total GC Time".

Contributor Author

Done

new HeuristicResultDetails("Executor Run time", evaluator.executorRunTimeTotal.toString)
Contributor

Change "Executor Run time" to "Total Executor Runtimes".

Contributor Author

Done

)

//adding recommendations to the result, severityTimeA corresponds to the ascending severity calculation
if (evaluator.severityTimeA.getValue > Severity.LOW.getValue) {
resultDetails = resultDetails :+ new HeuristicResultDetails("Note", "The ratio of JVM GC Time and executor Time is above normal, we recommend to increase the executor memory")
Contributor

Rephrase: The job is spending too much time on GC. We recommend increasing the executor memory.

Contributor Author

Done

}
//severityTimeD corresponds to the descending severity calculation
if (evaluator.severityTimeD.getValue > Severity.LOW.getValue) {
resultDetails = resultDetails :+ new HeuristicResultDetails("Note", "The ratio of JVM GC Time and executor Time is below normal, we recommend to decrease the executor memory")
Contributor

I am not sure if this makes sense.

}

val result = new HeuristicResult(
heuristicConfigurationData.getClassName,
heuristicConfigurationData.getHeuristicName,
evaluator.severity,
0,
resultDetails.asJava
)
result
}
}

object GcCpuTimeHeuristic {
val SPARK_EXECUTOR_MEMORY = "spark.executor.memory"
val SPARK_EXECUTOR_CORES = "spark.executor.cores"

/** The ascending severity thresholds for the ratio of JVM GC Time and executor Run Time (checking whether ratio is above normal)
* These thresholds are experimental and are likely to change */
val DEFAULT_GC_SEVERITY_A_THRESHOLDS =
SeverityThresholds(low = 0.08D, moderate = 0.1D, severe = 0.15D, critical = 0.2D, ascending = true)

/** The descending severity thresholds for the ratio of JVM GC Time and executor Run Time (checking whether ratio is below normal)
* These thresholds are experimental and are likely to change */
val DEFAULT_GC_SEVERITY_D_THRESHOLDS =
SeverityThresholds(low = 0.05D, moderate = 0.04D, severe = 0.03D, critical = 0.01D, ascending = false)

val GC_SEVERITY_A_THRESHOLDS_KEY: String = "gc_severity_A_threshold"
val GC_SEVERITY_D_THRESHOLDS_KEY: String = "gc_severity_D_threshold"

class Evaluator(gcCpuTimeHeuristic: GcCpuTimeHeuristic, data: SparkApplicationData) {
lazy val executorSummaries: Seq[ExecutorSummary] = data.executorSummaries
lazy val appConfigurationProperties: Map[String, String] =
data.appConfigurationProperties
var (jvmTime, executorRunTimeTotal) = getTimeValues(executorSummaries)

var ratio: Double = jvmTime.toDouble / executorRunTimeTotal.toDouble

lazy val severityTimeA: Severity = gcCpuTimeHeuristic.gcSeverityAThresholds.severityOf(ratio)
lazy val severityTimeD: Severity = gcCpuTimeHeuristic.gcSeverityDThresholds.severityOf(ratio)
lazy val severity : Severity = Severity.max(severityTimeA, severityTimeD)

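/**
* Returns the total JVM GC time and the total executor run time, summed across all executors.
* @param executorSummaries the per-executor summaries to aggregate
* @return a pair of (total JVM GC time, total executor run time)
*/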
private def getTimeValues(executorSummaries: Seq[ExecutorSummary]): (Long, Long) = {
var jvmGcTimeTotal: Long = 0
var executorRunTimeTotal: Long = 0
executorSummaries.foreach(executorSummary => {
Contributor

Ensure executorSummaries doesn't include driver info. If it does, exclude the driver from the executors.

Contributor Author

Done.

jvmGcTimeTotal+=executorSummary.totalGCTime
executorRunTimeTotal+=executorSummary.totalDuration
})
(jvmGcTimeTotal, executorRunTimeTotal)
}
}
}
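
A minimal sketch of the aggregation done in `getTimeValues`, written without mutable state and with two guards: the driver entry is skipped, as the review comment asks, and a zero total run time yields a ratio of 0 instead of NaN. `Exec`, `GcRatioSketch`, and the `"driver"` id are assumptions made for this illustration, not names from this PR; how the driver is labelled in the executor list should be checked against the Spark version in use.

```scala
// Hypothetical stand-in for ExecutorSummary, reduced to the fields used here.
final case class Exec(id: String, totalGCTime: Long, totalDuration: Long)

object GcRatioSketch {
  // Assumption: the driver appears in the executor list under the id "driver".
  val DriverId = "driver"

  /** Sums GC time and run time over all non-driver executors. */
  def totals(executors: Seq[Exec]): (Long, Long) =
    executors
      .filterNot(_.id == DriverId)
      .foldLeft((0L, 0L)) { case ((gc, run), e) =>
        (gc + e.totalGCTime, run + e.totalDuration)
      }

  /** Ratio of total GC time to total run time; 0.0 when nothing ran. */
  def gcToRunTimeRatio(executors: Seq[Exec]): Double = {
    val (gc, run) = totals(executors)
    if (run == 0L) 0.0 else gc.toDouble / run.toDouble
  }
}
```

Returning 0.0 for an empty or idle application is one possible choice; a caller could just as well skip the heuristic in that case.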

Original file line number Diff line number Diff line change
@@ -173,6 +173,7 @@ object LegacyDataConverters {
executorInfo.shuffleRead,
executorInfo.shuffleWrite,
executorInfo.maxMem,
executorInfo.totalGCTime,
executorLogs = Map.empty
)
}
Original file line number Diff line number Diff line change
@@ -43,14 +43,15 @@ public static class ExecutorInfo {
public long inputBytes = 0L;
public long outputBytes = 0L;
public long shuffleRead = 0L;
public long totalGCTime = 0L;
public long shuffleWrite = 0L;

public String toString() {
return "{execId: " + execId + ", hostPort:" + hostPort + " , rddBlocks: " + rddBlocks + ", memUsed: " + memUsed
+ ", maxMem: " + maxMem + ", diskUsed: " + diskUsed + ", totalTasks" + totalTasks + ", tasksActive: "
+ activeTasks + ", tasksComplete: " + completedTasks + ", tasksFailed: " + failedTasks + ", duration: "
+ duration + ", inputBytes: " + inputBytes + ", outputBytes:" + outputBytes + ", shuffleRead: " + shuffleRead
+ ", shuffleWrite: " + shuffleWrite + "}";
+ ", shuffleWrite: " + shuffleWrite + ", totalGCTime: " + totalGCTime + "}";
}
}

12 changes: 12 additions & 0 deletions app/views/help/spark/helpGcCpuTimeHeuristic.scala.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<p>The ratio of jvmGcTime to executorRunTime is checked, to see if GC is taking too much time (providing more memory could help) or too little time (memory may be over provisioned, and can be reduced).</p>
Contributor

Rephrase this. Help page should be simple and more intuitive for the users.

GC Time Heuristic flags jobs which are spending too much time on Garbage Collection.

If GC is taking too much time, then providing more memory could help. On the other hand, if GC is taking too little time, it could mean that the memory is over provisioned and it can be reduced.

Remove all the threshold values from the help page. I don't think they will help the users in any way.

<p>The severity thresholds are as follows:</p>
<p>Low: avg (jvmGcTime / executorRunTime) >= .08</p>
<p>Moderate: avg (jvmGcTime / executorRunTime) >= .1</p>
<p>Severe: avg (jvmGcTime / executorRunTime) >= .15</p>
<p>Critical: avg (jvmGcTime / executorRunTime) >= .2</p>
<p>The severity thresholds in case it is taking too little time are as follows:</p>
<p>Low: avg (jvmGcTime / executorRunTime) < .05</p>
<p>Moderate: avg (jvmGcTime / executorRunTime) < .04</p>
<p>Severe: avg (jvmGcTime / executorRunTime) < .03</p>
<p>Critical: avg (jvmGcTime / executorRunTime) < .01</p>
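
To make the two threshold lists concrete, here is a sketch of how a ratio could be mapped to a severity for the ascending check (too much GC) and the descending check (too little GC). `Level`, `Thresholds`, and `overall` are simplified stand-ins written for this note. They assume `>=` comparisons for ascending bounds and `<` for descending bounds, as on the help page, and they are not Dr. Elephant's actual `SeverityThresholds` implementation.

```scala
object SeveritySketch {
  // Hypothetical severity levels, ordered from least to most severe.
  sealed abstract class Level(val rank: Int)
  case object NoIssue extends Level(0)
  case object Low extends Level(1)
  case object Moderate extends Level(2)
  case object Severe extends Level(3)
  case object Critical extends Level(4)

  final case class Thresholds(low: Double, moderate: Double, severe: Double,
                              critical: Double, ascending: Boolean) {
    // Ascending: larger values are worse. Descending: smaller values are worse.
    private def reached(bound: Double, value: Double): Boolean =
      if (ascending) value >= bound else value < bound

    // Check the most severe bound first so the worst matching level wins.
    def severityOf(value: Double): Level =
      if (reached(critical, value)) Critical
      else if (reached(severe, value)) Severe
      else if (reached(moderate, value)) Moderate
      else if (reached(low, value)) Low
      else NoIssue
  }

  // Default values taken from the heuristic's companion object in this diff.
  val tooMuchGc = Thresholds(0.08, 0.1, 0.15, 0.2, ascending = true)
  val tooLittleGc = Thresholds(0.05, 0.04, 0.03, 0.01, ascending = false)

  /** Overall severity is the worse of the two checks. */
  def overall(ratio: Double): Level = {
    val a = tooMuchGc.severityOf(ratio)
    val d = tooLittleGc.severityOf(ratio)
    if (a.rank >= d.rank) a else d
  }
}
```

Under these assumptions a ratio between 0.05 and 0.08 raises no flag, since neither list is triggered in that band.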

Original file line number Diff line number Diff line change
@@ -194,6 +194,7 @@ object SparkMetricsAggregatorTest {
totalShuffleRead = 0,
totalShuffleWrite = 0,
maxMemory = 0,
totalGCTime = 0,
executorLogs = Map.empty
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*

The file name seems to be ExecutorStorageSpillHeuristicTest.scala -- should this be renamed to ExecutorGcRuntimeHeuristicTest.scala?

Contributor Author

Done

Contributor

Can you push the changes you have made? The PR doesn't reflect this change.

* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.spark.heuristics

import scala.collection.JavaConverters
import com.linkedin.drelephant.analysis.{ApplicationType, Severity, SeverityThresholds}
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, ExecutorSummaryImpl, StageDataImpl}
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
import org.scalatest.{FunSpec, Matchers}

import scala.concurrent.duration.Duration


class GcCpuTimeHeuristicTest extends FunSpec with Matchers {
import GcCpuTimeHeuristicTest._

describe("GcCpuTimeHeuristic") {
val heuristicConfigurationData = newFakeHeuristicConfigurationData(
Map(
"max_to_median_ratio_severity_thresholds" -> "1.414,2,4,16",
"ignore_max_bytes_less_than_threshold" -> "4000000",
"ignore_max_millis_less_than_threshold" -> "4000001"
)
)
val gcCpuTimeHeuristic = new GcCpuTimeHeuristic(heuristicConfigurationData)

val executorSummaries = Seq(
newFakeExecutorSummary(
id = "1",
totalGCTime = Duration("2min").toMillis,
totalDuration = Duration("15min").toMillis
),
newFakeExecutorSummary(
id = "2",
totalGCTime = Duration("6min").toMillis,
totalDuration = Duration("14min").toMillis
),
newFakeExecutorSummary(
id = "3",
totalGCTime = Duration("4min").toMillis,
totalDuration = Duration("20min").toMillis
),
newFakeExecutorSummary(
id = "4",
totalGCTime = Duration("8min").toMillis,
totalDuration = Duration("30min").toMillis
)
)

describe(".apply") {
val data1 = newFakeSparkApplicationData(executorSummaries)
val heuristicResult = gcCpuTimeHeuristic.apply(data1)
val heuristicResultDetails = heuristicResult.getHeuristicResultDetails

it("returns the severity") {
heuristicResult.getSeverity should be(Severity.CRITICAL)
}

it("returns the JVM GC time to Executor Run time ratio") {
val details = heuristicResultDetails.get(0)
details.getName should include("GC time to Executor Run time ratio")
details.getValue should include("0.2531")
}

it("returns the total GC time") {
val details = heuristicResultDetails.get(1)
details.getName should include("GC total time")
details.getValue should be("1200000")
}

it("returns the executor's run time") {
val details = heuristicResultDetails.get(2)
details.getName should include("Executor Run time")
details.getValue should be("4740000")
}
}
}
}

object GcCpuTimeHeuristicTest {
import JavaConverters._

def newFakeHeuristicConfigurationData(params: Map[String, String] = Map.empty): HeuristicConfigurationData =
new HeuristicConfigurationData("heuristic", "class", "view", new ApplicationType("type"), params.asJava)

def newFakeExecutorSummary(
id: String,
totalGCTime: Long,
totalDuration: Long
): ExecutorSummaryImpl = new ExecutorSummaryImpl(
id,
hostPort = "",
rddBlocks = 0,
memoryUsed=0,
diskUsed = 0,
activeTasks = 0,
failedTasks = 0,
completedTasks = 0,
totalTasks = 0,
totalDuration,
totalInputBytes=0,
totalShuffleRead=0,
totalShuffleWrite= 0,
maxMemory= 0,
totalGCTime,
executorLogs = Map.empty
)

def newFakeSparkApplicationData(
executorSummaries: Seq[ExecutorSummaryImpl]
): SparkApplicationData = {
val appId = "application_1"

val restDerivedData = SparkRestDerivedData(
new ApplicationInfoImpl(appId, name = "app", Seq.empty),
jobDatas = Seq.empty,
stageDatas = Seq.empty,
executorSummaries = executorSummaries
)
SparkApplicationData(appId, restDerivedData, None)
}
}
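
The expected values asserted in this test can be checked by hand: the four fake executors add up to 20 minutes of GC time and 79 minutes of run time. The short sketch below redoes that arithmetic with `scala.concurrent.duration.Duration`; the object and value names are made up for this note.

```scala
import scala.concurrent.duration.Duration

object ExpectedValuesSketch {
  // (GC time, run time) per fake executor, copied from the test fixture.
  val fixtures: Seq[(String, String)] =
    Seq("2min" -> "15min", "6min" -> "14min", "4min" -> "20min", "8min" -> "30min")

  // Totals in milliseconds, matching the units the test compares against.
  val totalGcMillis: Long = fixtures.map { case (gc, _) => Duration(gc).toMillis }.sum
  val totalRunMillis: Long = fixtures.map { case (_, run) => Duration(run).toMillis }.sum

  // Ratio of the totals, which is what the heuristic's Evaluator computes.
  val ratio: Double = totalGcMillis.toDouble / totalRunMillis.toDouble
}
```

The totals come to 1200000 ms and 4740000 ms, a ratio of roughly 0.2531. That is above the default critical bound of 0.2 for the ascending thresholds, which is why the test expects `Severity.CRITICAL`.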
Original file line number Diff line number Diff line change
@@ -249,6 +249,7 @@ object ExecutorsHeuristicTest {
totalShuffleRead,
totalShuffleWrite,
maxMemory,
totalGCTime = 0,
executorLogs = Map.empty
)
