-
Notifications
You must be signed in to change notification settings - Fork 856
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Gc time #311
Gc time #311
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
/* | ||
* Copyright 2016 LinkedIn Corp. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package com.linkedin.drelephant.spark.heuristics | ||
|
||
import com.linkedin.drelephant.analysis.Severity | ||
import com.linkedin.drelephant.spark.fetchers.statusapiv1._ | ||
import com.linkedin.drelephant.analysis._ | ||
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData | ||
import com.linkedin.drelephant.spark.data.SparkApplicationData | ||
import scala.collection.JavaConverters | ||
|
||
/** | ||
* A heuristic based on GC time and CPU run time | ||
* | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extra line. |
||
*/ | ||
class GcCpuTimeHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData) | ||
extends Heuristic[SparkApplicationData] { | ||
|
||
import GcCpuTimeHeuristic._ | ||
import JavaConverters._ | ||
|
||
override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData | ||
|
||
override def apply(data: SparkApplicationData): HeuristicResult = { | ||
val evaluator = new Evaluator(this, data) | ||
var resultDetails = Seq( | ||
new HeuristicResultDetails("GC time to Executor Run time ratio", evaluator.ratio.toString), | ||
new HeuristicResultDetails("GC total time", evaluator.jvmTime.toString), | ||
new HeuristicResultDetails("Executor Run time", evaluator.executorRunTimeTotal.toString) | ||
) | ||
|
||
//adding recommendations to the result, severityTimeA corresponds to the ascending severity calculation | ||
if (evaluator.severityTimeA.getValue > Severity.LOW.getValue) { | ||
resultDetails = resultDetails :+ new HeuristicResultDetails("Note", "The ratio of JVM GC Time and executor Time is above normal, we recommend to increase the executor memory") | ||
} | ||
//severityTimeD corresponds to the descending severity calculation | ||
if (evaluator.severityTimeD.getValue > Severity.LOW.getValue) { | ||
resultDetails = resultDetails :+ new HeuristicResultDetails("Note", "The ratio of JVM GC Time and executor Time is below normal, we recommend to decrease the executor memory") | ||
} | ||
|
||
val result = new HeuristicResult( | ||
heuristicConfigurationData.getClassName, | ||
heuristicConfigurationData.getHeuristicName, | ||
evaluator.severity, | ||
0, | ||
resultDetails.asJava | ||
) | ||
result | ||
} | ||
} | ||
|
||
object GcCpuTimeHeuristic { | ||
val SPARK_EXECUTOR_MEMORY = "spark.executor.memory" | ||
val SPARK_EXECUTOR_CORES = "spark.executor.cores" | ||
|
||
/** The ascending severity thresholds for the ratio of JVM GC Time and executor Run Time (checking whether ratio is above normal) | ||
* These thresholds are experimental and are likely to change */ | ||
val DEFAULT_GC_SEVERITY_A_THRESHOLDS = | ||
SeverityThresholds(low = 0.08D, moderate = 0.1D, severe = 0.15D, critical = 0.2D, ascending = true) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make these threshold values configurable just like the following: |
||
|
||
/** The descending severity thresholds for the ratio of JVM GC Time and executor Run Time (checking whether ratio is below normal) | ||
* These thresholds are experimental and are likely to change */ | ||
val DEFAULT_GC_SEVERITY_D_THRESHOLDS = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above. |
||
SeverityThresholds(low = 0.05D, moderate = 0.04D, severe = 0.03D, critical = 0.01D, ascending = false) | ||
|
||
class Evaluator(memoryFractionHeuristic: GcCpuTimeHeuristic, data: SparkApplicationData) { | ||
lazy val executorSummaries: Seq[ExecutorSummary] = data.executorSummaries | ||
lazy val appConfigurationProperties: Map[String, String] = | ||
data.appConfigurationProperties | ||
var (jvmTime, executorRunTimeTotal) = getTimeValues(executorSummaries) | ||
|
||
var ratio: Double = jvmTime.toDouble / executorRunTimeTotal.toDouble | ||
|
||
lazy val severityTimeA: Severity = DEFAULT_GC_SEVERITY_A_THRESHOLDS.severityOf(ratio) | ||
lazy val severityTimeD: Severity = DEFAULT_GC_SEVERITY_D_THRESHOLDS.severityOf(ratio) | ||
lazy val severity : Severity = Severity.max(severityTimeA, severityTimeD) | ||
|
||
/** | ||
* returns the total JVM GC Time and total executor Run Time across all stages | ||
* @param executorSummaries | ||
* @return | ||
*/ | ||
private def getTimeValues(executorSummaries: Seq[ExecutorSummary]): (Long, Long) = { | ||
var jvmGcTimeTotal: Long = 0 | ||
var executorRunTimeTotal: Long = 0 | ||
executorSummaries.foreach(executorSummary => { | ||
jvmGcTimeTotal+=executorSummary.totalGCTime | ||
executorRunTimeTotal+=executorSummary.totalDuration | ||
}) | ||
(jvmGcTimeTotal, executorRunTimeTotal) | ||
} | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
<p>The ratio of jvmGcTime to executorCpuTime is checked, to see if GC is taking too much time (providing more memory could help) or too little time (memory may be over provisioned, and can be reduced).</p> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this mention the descending thresholds as well? |
||
<p>The severity thresholds are as follows : </p> | ||
<p>Low: avg (jvmGcTime / executorCpuTime) >= .08</p> | ||
<p>Moderate: avg (jvmGcTime / executorCpuTime) >= .1</p> | ||
<p>Critical: avg (jvmGcTime / executorCpuTime) >= .15</p> | ||
<p>Severe:avg (jvmGcTime / executorCpuTime) >= .2</p> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
/* | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The file name seems to be ExecutorStorageSpillHeuristicTest.scala -- should this be renamed to ExecutorGcRuntimeHeuristicTest.scala? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you push the changes you have made? The PR doesn't reflect this change. |
||
* Copyright 2016 LinkedIn Corp. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package com.linkedin.drelephant.spark.heuristics | ||
|
||
import scala.collection.JavaConverters | ||
import com.linkedin.drelephant.analysis.{ApplicationType, Severity, SeverityThresholds} | ||
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData | ||
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData} | ||
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, ExecutorSummaryImpl, StageDataImpl} | ||
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate | ||
import org.scalatest.{FunSpec, Matchers} | ||
|
||
import scala.concurrent.duration.Duration | ||
|
||
|
||
class GcCpuTimeHeuristicTest extends FunSpec with Matchers { | ||
import GcCpuTimeHeuristicTest._ | ||
|
||
describe("GcCpuTimeHeuristic") { | ||
val heuristicConfigurationData = newFakeHeuristicConfigurationData( | ||
Map( | ||
"max_to_median_ratio_severity_thresholds" -> "1.414,2,4,16", | ||
"ignore_max_bytes_less_than_threshold" -> "4000000", | ||
"ignore_max_millis_less_than_threshold" -> "4000001" | ||
) | ||
) | ||
val gcCpuTimeHeuristic = new GcCpuTimeHeuristic(heuristicConfigurationData) | ||
|
||
val executorSummaries = Seq( | ||
newFakeExecutorSummary( | ||
id = "1", | ||
totalGCTime = Duration("2min").toMillis, | ||
totalDuration = Duration("15min").toMillis | ||
), | ||
newFakeExecutorSummary( | ||
id = "2", | ||
totalGCTime = Duration("6min").toMillis, | ||
totalDuration = Duration("14min").toMillis | ||
), | ||
newFakeExecutorSummary( | ||
id = "3", | ||
totalGCTime = Duration("4min").toMillis, | ||
totalDuration = Duration("20min").toMillis | ||
), | ||
newFakeExecutorSummary( | ||
id = "4", | ||
totalGCTime = Duration("8min").toMillis, | ||
totalDuration = Duration("30min").toMillis | ||
) | ||
) | ||
|
||
describe(".apply") { | ||
val data1 = newFakeSparkApplicationData(executorSummaries) | ||
val heuristicResult = gcCpuTimeHeuristic.apply(data1) | ||
val heuristicResultDetails = heuristicResult.getHeuristicResultDetails | ||
|
||
it("returns the severity") { | ||
heuristicResult.getSeverity should be(Severity.CRITICAL) | ||
} | ||
|
||
it("returns the JVM GC time to Executor Run time duration") { | ||
val details = heuristicResultDetails.get(0) | ||
details.getName should include("GC time to Executor Run time ratio") | ||
details.getValue should include("0.2531") | ||
} | ||
|
||
it("returns the total GC time") { | ||
val details = heuristicResultDetails.get(1) | ||
details.getName should include("GC total time") | ||
details.getValue should be("1200000") | ||
} | ||
|
||
it("returns the executor's run time") { | ||
val details = heuristicResultDetails.get(2) | ||
details.getName should include("Executor Run time") | ||
details.getValue should be("4740000") | ||
} | ||
} | ||
} | ||
} | ||
|
||
object GcCpuTimeHeuristicTest { | ||
import JavaConverters._ | ||
|
||
def newFakeHeuristicConfigurationData(params: Map[String, String] = Map.empty): HeuristicConfigurationData = | ||
new HeuristicConfigurationData("heuristic", "class", "view", new ApplicationType("type"), params.asJava) | ||
|
||
def newFakeExecutorSummary( | ||
id: String, | ||
totalGCTime: Long, | ||
totalDuration: Long | ||
): ExecutorSummaryImpl = new ExecutorSummaryImpl( | ||
id, | ||
hostPort = "", | ||
rddBlocks = 0, | ||
memoryUsed=0, | ||
diskUsed = 0, | ||
activeTasks = 0, | ||
failedTasks = 0, | ||
completedTasks = 0, | ||
totalTasks = 0, | ||
totalDuration, | ||
totalInputBytes=0, | ||
totalShuffleRead=0, | ||
totalShuffleWrite= 0, | ||
maxMemory= 0, | ||
totalGCTime, | ||
executorLogs = Map.empty | ||
) | ||
|
||
def newFakeSparkApplicationData( | ||
executorSummaries: Seq[ExecutorSummaryImpl] | ||
): SparkApplicationData = { | ||
val appId = "application_1" | ||
|
||
val restDerivedData = SparkRestDerivedData( | ||
new ApplicationInfoImpl(appId, name = "app", Seq.empty), | ||
jobDatas = Seq.empty, | ||
stageDatas = Seq.empty, | ||
executorSummaries = executorSummaries | ||
) | ||
SparkApplicationData(appId, restDerivedData, None) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please change the references to CPUTime to runtime, since we are using executor run/duration time now.