From ebc2c2eb3719abb6f628f87d17b2ed0a66a374fc Mon Sep 17 00:00:00 2001 From: swasti Date: Thu, 7 Dec 2017 15:59:16 +0530 Subject: [PATCH] adding Gc time heuristic --- app-conf/HeuristicConf.xml | 6 ++ .../fetchers/statusapiv1/statusapiv1.scala | 4 +- .../ExecutorStorageSpillHeuristic.scala | 90 ++++++++++--------- .../legacydata/LegacyDataConverters.scala | 2 +- .../spark/legacydata/SparkExecutorData.java | 4 +- ...lpExecutorStorageSpillHeuristic.scala.html | 20 ----- .../spark/helpGcCpuTimeHeuristic.scala.html | 6 ++ .../spark/SparkMetricsAggregatorTest.scala | 2 +- .../ExecutorStorageSpillHeuristicTest.scala | 67 +++++++------- .../heuristics/ExecutorsHeuristicTest.scala | 2 +- 10 files changed, 99 insertions(+), 104 deletions(-) delete mode 100644 app/views/help/spark/helpExecutorStorageSpillHeuristic.scala.html create mode 100644 app/views/help/spark/helpGcCpuTimeHeuristic.scala.html diff --git a/app-conf/HeuristicConf.xml b/app-conf/HeuristicConf.xml index 833fa2bc3..2cd6c2b6c 100644 --- a/app-conf/HeuristicConf.xml +++ b/app-conf/HeuristicConf.xml @@ -193,5 +193,11 @@ com.linkedin.drelephant.spark.heuristics.StagesHeuristic views.html.help.spark.helpStagesHeuristic + + spark + Spark GC Time to CPU Time + com.linkedin.drelephant.spark.heuristics.GcCpuTimeHeuristic + views.html.help.spark.helpGcCpuTimeHeuristic + diff --git a/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala b/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala index 75bfdc975..9c4b534a0 100644 --- a/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala +++ b/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala @@ -87,7 +87,7 @@ trait ExecutorSummary{ def totalShuffleRead: Long def totalShuffleWrite: Long def maxMemory: Long - def totalMemoryBytesSpilled: Long + def totalGCTime: Long def executorLogs: Map[String, String]} trait JobData{ @@ -293,7 +293,7 @@ class ExecutorSummaryImpl( var totalShuffleRead: Long, 
var totalShuffleWrite: Long, var maxMemory: Long, - var totalMemoryBytesSpilled: Long, + var totalGCTime: Long, var executorLogs: Map[String, String]) extends ExecutorSummary class JobDataImpl( diff --git a/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala b/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala index affbd918a..97008a522 100644 --- a/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala +++ b/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala @@ -17,23 +17,20 @@ package com.linkedin.drelephant.spark.heuristics import com.linkedin.drelephant.analysis.Severity -import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ExecutorStageSummary, ExecutorSummary, StageData} +import com.linkedin.drelephant.spark.fetchers.statusapiv1._ import com.linkedin.drelephant.analysis._ import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData import com.linkedin.drelephant.spark.data.SparkApplicationData -import com.linkedin.drelephant.util.MemoryFormatUtils - import scala.collection.JavaConverters - /** - * A heuristic based on memory spilled. 
+ * A heuristic based on GC time and CPU run time * */ -class ExecutorStorageSpillHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData) +class GcCpuTimeHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData) extends Heuristic[SparkApplicationData] { - import ExecutorStorageSpillHeuristic._ + import GcCpuTimeHeuristic._ import JavaConverters._ override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData @@ -41,17 +38,18 @@ class ExecutorStorageSpillHeuristic(private val heuristicConfigurationData: Heur override def apply(data: SparkApplicationData): HeuristicResult = { val evaluator = new Evaluator(this, data) var resultDetails = Seq( - new HeuristicResultDetails("Total memory spilled", MemoryFormatUtils.bytesToString(evaluator.totalMemorySpilled)), - new HeuristicResultDetails("Max memory spilled", MemoryFormatUtils.bytesToString(evaluator.maxMemorySpilled)), - new HeuristicResultDetails("Mean memory spilled", MemoryFormatUtils.bytesToString(evaluator.meanMemorySpilled)), - new HeuristicResultDetails("Fraction of executors having non zero bytes spilled", evaluator.fractionOfExecutorsHavingBytesSpilled.toString) + new HeuristicResultDetails("GC time to Executor Run time ratio", evaluator.ratio.toString), + new HeuristicResultDetails("GC total time", evaluator.jvmTime.toString), + new HeuristicResultDetails("Executor Run time", evaluator.executorRunTimeTotal.toString) ) - if(evaluator.severity != Severity.NONE){ - resultDetails :+ new HeuristicResultDetails("Note", "Your memory is being spilled. 
Kindly look into it.") - if(evaluator.sparkExecutorCores >=4 && evaluator.sparkExecutorMemory >= MemoryFormatUtils.stringToBytes("10GB")) { - resultDetails :+ new HeuristicResultDetails("Recommendation", "You can try decreasing the number of cores to reduce the number of concurrently running tasks.") - } + //adding recommendations to the result, severityTimeA corresponds to the ascending severity calculation + if (evaluator.severityTimeA.getValue > Severity.LOW.getValue) { + resultDetails = resultDetails :+ new HeuristicResultDetails("Note", "The ratio of JVM GC Time and executor Time is above normal, we recommend to increase the executor memory") + } + //severityTimeD corresponds to the descending severity calculation + if (evaluator.severityTimeD.getValue > Severity.LOW.getValue) { + resultDetails = resultDetails :+ new HeuristicResultDetails("Note", "The ratio of JVM GC Time and executor Time is below normal, we recommend to decrease the executor memory") } val result = new HeuristicResult( @@ -65,40 +63,46 @@ class ExecutorStorageSpillHeuristic(private val heuristicConfigurationData: Heur } } -object ExecutorStorageSpillHeuristic { +object GcCpuTimeHeuristic { val SPARK_EXECUTOR_MEMORY = "spark.executor.memory" val SPARK_EXECUTOR_CORES = "spark.executor.cores" - class Evaluator(memoryFractionHeuristic: ExecutorStorageSpillHeuristic, data: SparkApplicationData) { + /** The ascending severity thresholds for the ratio of JVM GC Time and executor Run Time (checking whether ratio is above normal) + * These thresholds are experimental and are likely to change */ + val DEFAULT_GC_SEVERITY_A_THRESHOLDS = + SeverityThresholds(low = 0.08D, moderate = 0.1D, severe = 0.15D, critical = 0.2D, ascending = true) + + /** The descending severity thresholds for the ratio of JVM GC Time and executor Run Time (checking whether ratio is below normal) + * These thresholds are experimental and are likely to change */ + val DEFAULT_GC_SEVERITY_D_THRESHOLDS = + SeverityThresholds(low = 
0.05D, moderate = 0.04D, severe = 0.03D, critical = 0.01D, ascending = false) + + class Evaluator(memoryFractionHeuristic: GcCpuTimeHeuristic, data: SparkApplicationData) { lazy val executorSummaries: Seq[ExecutorSummary] = data.executorSummaries lazy val appConfigurationProperties: Map[String, String] = data.appConfigurationProperties - val ratioMemoryCores: Long = (sparkExecutorMemory / sparkExecutorCores) - val maxMemorySpilled: Long = executorSummaries.map(_.totalMemoryBytesSpilled).max - val meanMemorySpilled = executorSummaries.map(_.totalMemoryBytesSpilled).sum / executorSummaries.size - val totalMemorySpilled = executorSummaries.map(_.totalMemoryBytesSpilled).sum - val fractionOfExecutorsHavingBytesSpilled: Double = executorSummaries.count(_.totalMemoryBytesSpilled > 0).toDouble / executorSummaries.size.toDouble - val severity: Severity = { - if (fractionOfExecutorsHavingBytesSpilled != 0) { - if (fractionOfExecutorsHavingBytesSpilled < 0.2 && maxMemorySpilled < 0.05 * ratioMemoryCores) { - Severity.LOW - } - else if (fractionOfExecutorsHavingBytesSpilled < 0.2 && meanMemorySpilled < 0.05 * ratioMemoryCores) { - Severity.MODERATE - } - - else if (fractionOfExecutorsHavingBytesSpilled >= 0.2 && meanMemorySpilled < 0.05 * ratioMemoryCores) { - Severity.SEVERE - } - else if (fractionOfExecutorsHavingBytesSpilled >= 0.2 && meanMemorySpilled >= 0.05 * ratioMemoryCores) { - Severity.CRITICAL - } else Severity.NONE - } - else Severity.NONE - } + var (jvmTime, executorRunTimeTotal) = getTimeValues(executorSummaries) + + var ratio: Double = jvmTime.toDouble / executorRunTimeTotal.toDouble - lazy val sparkExecutorMemory: Long = (appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY).map(MemoryFormatUtils.stringToBytes)).getOrElse(0) - lazy val sparkExecutorCores: Int = (appConfigurationProperties.get(SPARK_EXECUTOR_CORES).map(_.toInt)).getOrElse(0) + lazy val severityTimeA: Severity = DEFAULT_GC_SEVERITY_A_THRESHOLDS.severityOf(ratio) + lazy val severityTimeD: 
Severity = DEFAULT_GC_SEVERITY_D_THRESHOLDS.severityOf(ratio) + lazy val severity : Severity = Severity.max(severityTimeA, severityTimeD) + + /** + * returns the total JVM GC Time and total executor Run Time across all stages + * @param executorSummaries + * @return + */ + private def getTimeValues(executorSummaries: Seq[ExecutorSummary]): (Long, Long) = { + var jvmGcTimeTotal: Long = 0 + var executorRunTimeTotal: Long = 0 + executorSummaries.foreach(executorSummary => { + jvmGcTimeTotal+=executorSummary.totalGCTime + executorRunTimeTotal+=executorSummary.totalDuration + }) + (jvmGcTimeTotal, executorRunTimeTotal) + } } } diff --git a/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala b/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala index 719856f74..4abce291d 100644 --- a/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala +++ b/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala @@ -173,7 +173,7 @@ object LegacyDataConverters { executorInfo.shuffleRead, executorInfo.shuffleWrite, executorInfo.maxMem, - executorInfo.totalMemoryBytesSpilled, + executorInfo.totalGCTime, executorLogs = Map.empty ) } diff --git a/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java b/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java index fd8377d73..4e2ad4de3 100644 --- a/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java +++ b/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java @@ -43,7 +43,7 @@ public static class ExecutorInfo { public long inputBytes = 0L; public long outputBytes = 0L; public long shuffleRead = 0L; - public long totalMemoryBytesSpilled = 0L; + public long totalGCTime = 0L; public long shuffleWrite = 0L; public String toString() { @@ -51,7 +51,7 @@ public String toString() { + ", maxMem: " + maxMem + ", diskUsed: " + diskUsed + ", totalTasks" + totalTasks + ", tasksActive: " + activeTasks + ", tasksComplete: " + 
completedTasks + ", tasksFailed: " + failedTasks + ", duration: " + duration + ", inputBytes: " + inputBytes + ", outputBytes:" + outputBytes + ", shuffleRead: " + shuffleRead - + ", shuffleWrite: " + shuffleWrite + ", totalMemoryBytesSpilled: " + totalMemoryBytesSpilled + "}"; + + ", shuffleWrite: " + shuffleWrite + ", totalGCTime: " + totalGCTime + "}"; } } diff --git a/app/views/help/spark/helpExecutorStorageSpillHeuristic.scala.html b/app/views/help/spark/helpExecutorStorageSpillHeuristic.scala.html deleted file mode 100644 index 10f4ff9fe..000000000 --- a/app/views/help/spark/helpExecutorStorageSpillHeuristic.scala.html +++ /dev/null @@ -1,20 +0,0 @@ -@* -* Copyright 2016 LinkedIn Corp. -* -* Licensed under the Apache License, Version 2.0 (the "License"); you may not -* use this file except in compliance with the License. You may obtain a copy of -* the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -* License for the specific language governing permissions and limitations under -* the License. -*@ -

Spark performs best when data is kept in memory. Spilled execution memory is tracked by memoryBytesSpilled, which is available at the task level, and each stage will have this information for its tasks. memoryBytesSpilled is cumulative for a task -- it is incremented whenever more execution memory is spilled. If execution memory is being spilled, then the warnings are as follows:

-

Low: memoryBytesSpilled is non-zero for 1 or more executors, greater than zero for < 20% of executors, and max size is < .05 * spark.executor.memory / spark.executor.cores.

-

Moderate: memoryBytesSpilled is non-zero for 1 or more executors, greater than zero for < 20% of executors, and avg size is < .05 * spark.executor.memory / spark.executor.cores.

-

Severe: memoryBytes Spilled is greater than zero for > 20% of executors and avg size is < .05 spark.executor memory / spark.executor.cores.

-

Critical: memoryBytes Spilled is greater than zero for > 20% of executors and/or avg size is >= .05 spark.executor memory / spark.executor.cores.

diff --git a/app/views/help/spark/helpGcCpuTimeHeuristic.scala.html b/app/views/help/spark/helpGcCpuTimeHeuristic.scala.html new file mode 100644 index 000000000..a1969ebe2 --- /dev/null +++ b/app/views/help/spark/helpGcCpuTimeHeuristic.scala.html @@ -0,0 +1,6 @@ +

The ratio of jvmGcTime to executorCpuTime is checked, to see if GC is taking too much time (providing more memory could help) or too little time (memory may be over provisioned, and can be reduced).

+

The severity thresholds are as follows:

+

Low: avg (jvmGcTime / executorCpuTime) >= .08

+

Moderate: avg (jvmGcTime / executorCpuTime) >= .1

+

Severe: avg (jvmGcTime / executorCpuTime) >= .15

+

Critical: avg (jvmGcTime / executorCpuTime) >= .2

diff --git a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala index 5befb6500..77e3e1d29 100644 --- a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala +++ b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala @@ -194,7 +194,7 @@ object SparkMetricsAggregatorTest { totalShuffleRead = 0, totalShuffleWrite = 0, maxMemory = 0, - totalMemoryBytesSpilled = 0, + totalGCTime = 0, executorLogs = Map.empty ) } diff --git a/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala index 82392e88e..31a2d1635 100644 --- a/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala +++ b/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala @@ -24,11 +24,13 @@ import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate import org.scalatest.{FunSpec, Matchers} +import scala.concurrent.duration.Duration -class ExecutorStorageSpillHeuristicTest extends FunSpec with Matchers { - import ExecutorStorageSpillHeuristicTest._ - describe("ExecutorStorageSpillHeuristic") { +class GcCpuTimeHeuristicTest extends FunSpec with Matchers { + import GcCpuTimeHeuristicTest._ + + describe("GcCpuTimeHeuristic") { val heuristicConfigurationData = newFakeHeuristicConfigurationData( Map( "max_to_median_ratio_severity_thresholds" -> "1.414,2,4,16", @@ -36,60 +38,62 @@ class ExecutorStorageSpillHeuristicTest extends FunSpec with Matchers { "ignore_max_millis_less_than_threshold" -> "4000001" ) ) - val executorStorageSpillHeuristic = new ExecutorStorageSpillHeuristic(heuristicConfigurationData) - - val appConfigurationProperties = Map("spark.executor.memory" -> "4g", "spark.executor.cores"->"4") + val gcCpuTimeHeuristic = new 
GcCpuTimeHeuristic(heuristicConfigurationData) val executorSummaries = Seq( newFakeExecutorSummary( id = "1", - totalMemoryBytesSpilled = 200000L + totalGCTime = Duration("2min").toMillis, + totalDuration = Duration("15min").toMillis ), newFakeExecutorSummary( id = "2", - totalMemoryBytesSpilled = 100000L + totalGCTime = Duration("6min").toMillis, + totalDuration = Duration("14min").toMillis ), newFakeExecutorSummary( id = "3", - totalMemoryBytesSpilled = 300000L + totalGCTime = Duration("4min").toMillis, + totalDuration = Duration("20min").toMillis ), newFakeExecutorSummary( id = "4", - totalMemoryBytesSpilled = 200000L + totalGCTime = Duration("8min").toMillis, + totalDuration = Duration("30min").toMillis ) ) describe(".apply") { - val data1 = newFakeSparkApplicationData(executorSummaries, appConfigurationProperties) - val heuristicResult = executorStorageSpillHeuristic.apply(data1) + val data1 = newFakeSparkApplicationData(executorSummaries) + val heuristicResult = gcCpuTimeHeuristic.apply(data1) val heuristicResultDetails = heuristicResult.getHeuristicResultDetails it("returns the severity") { - heuristicResult.getSeverity should be(Severity.SEVERE) + heuristicResult.getSeverity should be(Severity.CRITICAL) } - it("returns the total memory spilled") { + it("returns the JVM GC time to Executor Run time duration") { val details = heuristicResultDetails.get(0) - details.getName should include("Total memory spilled") - details.getValue should be("781.25 KB") + details.getName should include("GC time to Executor Run time ratio") + details.getValue should include("0.2531") } - it("returns the max memory spilled") { + it("returns the total GC time") { val details = heuristicResultDetails.get(1) - details.getName should include("Max memory spilled") - details.getValue should be("292.97 KB") + details.getName should include("GC total time") + details.getValue should be("1200000") } - it("returns the mean memory spilled") { + it("returns the executor's run time") { val 
details = heuristicResultDetails.get(2) - details.getName should include("Mean memory spilled") - details.getValue should be("195.31 KB") + details.getName should include("Executor Run time") + details.getValue should be("4740000") } } } } -object ExecutorStorageSpillHeuristicTest { +object GcCpuTimeHeuristicTest { import JavaConverters._ def newFakeHeuristicConfigurationData(params: Map[String, String] = Map.empty): HeuristicConfigurationData = @@ -97,7 +101,8 @@ object ExecutorStorageSpillHeuristicTest { def newFakeExecutorSummary( id: String, - totalMemoryBytesSpilled: Long + totalGCTime: Long, + totalDuration: Long ): ExecutorSummaryImpl = new ExecutorSummaryImpl( id, hostPort = "", @@ -108,18 +113,17 @@ object ExecutorStorageSpillHeuristicTest { failedTasks = 0, completedTasks = 0, totalTasks = 0, - totalDuration=0, + totalDuration, totalInputBytes=0, totalShuffleRead=0, totalShuffleWrite= 0, maxMemory= 0, - totalMemoryBytesSpilled, + totalGCTime, executorLogs = Map.empty ) def newFakeSparkApplicationData( - executorSummaries: Seq[ExecutorSummaryImpl], - appConfigurationProperties: Map[String, String] + executorSummaries: Seq[ExecutorSummaryImpl] ): SparkApplicationData = { val appId = "application_1" @@ -129,11 +133,6 @@ object ExecutorStorageSpillHeuristicTest { stageDatas = Seq.empty, executorSummaries = executorSummaries ) - - val logDerivedData = SparkLogDerivedData( - SparkListenerEnvironmentUpdate(Map("Spark Properties" -> appConfigurationProperties.toSeq)) - ) - - SparkApplicationData(appId, restDerivedData, Some(logDerivedData)) + SparkApplicationData(appId, restDerivedData, None) } } diff --git a/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala index cf07fe377..7dbeea921 100644 --- a/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala +++ b/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala @@ -249,7 
+249,7 @@ object ExecutorsHeuristicTest { totalShuffleRead, totalShuffleWrite, maxMemory, - totalMemoryBytesSpilled = 0, + totalGCTime = 0, executorLogs = Map.empty )