From 939fb3eed7d6252dd228c8fa3c52be422e7a04c0 Mon Sep 17 00:00:00 2001 From: swasti Date: Fri, 1 Dec 2017 12:37:21 +0530 Subject: [PATCH 1/5] adding spill heuristic --- .../ExecutorStorageSpillHeuristic.scala | 111 ++++++++++++++++++ ...lpExecutorStorageSpillHeuristic.scala.html | 20 ++++ 2 files changed, 131 insertions(+) create mode 100644 app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala create mode 100644 app/views/help/spark/helpExecutorStorageSpillHeuristic.scala.html diff --git a/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala b/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala new file mode 100644 index 000000000..da55bed6b --- /dev/null +++ b/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala @@ -0,0 +1,111 @@ +/* + * Copyright 2016 LinkedIn Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.linkedin.drelephant.spark.heuristics + +import com.linkedin.drelephant.analysis.Severity +import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ExecutorStageSummary, StageData} +import com.linkedin.drelephant.analysis._ +import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData +import com.linkedin.drelephant.spark.data.SparkApplicationData +import com.linkedin.drelephant.util.MemoryFormatUtils + +import scala.collection.JavaConverters + + +/** + * A heuristic based on memory spilled. + * + */ +class ExecutorStorageSpillHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData) + extends Heuristic[SparkApplicationData] { + + import ExecutorStorageSpillHeuristic._ + import JavaConverters._ + + override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData + + override def apply(data: SparkApplicationData): HeuristicResult = { + val evaluator = new Evaluator(this, data) + var resultDetails = Seq( + new HeuristicResultDetails("Total memory spilled", MemoryFormatUtils.bytesToString(evaluator.totalMemorySpilled)) + ) + + if(evaluator.severity != Severity.NONE){ + resultDetails :+ new HeuristicResultDetails("Note", "Your memory is being spilled. 
Kindly look into it.") + if(evaluator.sparkExecutorCores >=4 && evaluator.sparkExecutorMemory >= MemoryFormatUtils.stringToBytes("10GB")) { + resultDetails :+ new HeuristicResultDetails("Recommendation", "You can try decreasing the number of cores to reduce the number of concurrently running tasks.") + } + } + + val result = new HeuristicResult( + heuristicConfigurationData.getClassName, + heuristicConfigurationData.getHeuristicName, + evaluator.severity, + 0, + resultDetails.asJava + ) + result + } +} + +object ExecutorStorageSpillHeuristic { + val SPARK_EXECUTOR_MEMORY = "spark.executor.memory" + val SPARK_EXECUTOR_CORES = "spark.executor.cores" + + class Evaluator(memoryFractionHeuristic: ExecutorStorageSpillHeuristic, data: SparkApplicationData) { + lazy val appConfigurationProperties: Map[String, String] = + data.appConfigurationProperties + lazy val stageDatas: Seq[StageData] = data.stageDatas + val ratioMemoryCores: Double = sparkExecutorMemory.toDouble / sparkExecutorCores.toDouble + lazy val (severity, totalMemorySpilled : Long) = getExecutionSpillSeverity() + def getExecutionSpillSeverity(): (Severity, Long) = { + var bytesSpilled : Long = 0 + var executionSpillSeverity = Severity.NONE + stageDatas.foreach(stageData => { + val executorStageList: collection.Map[String, ExecutorStageSummary] = stageData.executorSummary.getOrElse(Map.empty) + val maxMemorySpilled: Long = executorStageList.values.map(_.memoryBytesSpilled).max + val meanMemorySpilled = executorStageList.values.map(_.memoryBytesSpilled).sum / executorStageList.values.size + val ratioMemoryBytesSpilled: Double = executorStageList.values.count(_.memoryBytesSpilled > 0).toDouble / executorStageList.values.size.toDouble + bytesSpilled += executorStageList.values.count(_.memoryBytesSpilled > 0).toLong + val severityExecutionSpillStage: Severity = { + if (ratioMemoryBytesSpilled != 0) { + if (ratioMemoryBytesSpilled < 0.2 && maxMemorySpilled < 0.05 * ratioMemoryCores) { + Severity.LOW + } + else if (ratioMemoryBytesSpilled < 0.2 && meanMemorySpilled < 0.05 * ratioMemoryCores) { + Severity.MODERATE + } + + else if (ratioMemoryBytesSpilled >= 0.2 && meanMemorySpilled < 0.05 * ratioMemoryCores) { + Severity.SEVERE + } + else if (ratioMemoryBytesSpilled >= 0.2 && meanMemorySpilled >= 0.05 * ratioMemoryCores) { + Severity.CRITICAL + } + } + Severity.NONE + } + executionSpillSeverity = Severity.max(executionSpillSeverity, severityExecutionSpillStage) + }) + (executionSpillSeverity, bytesSpilled) + } + + val sparkExecutorMemory: Long = (appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY).map(MemoryFormatUtils.stringToBytes)).getOrElse(0L) + val sparkExecutorCores: Int = (appConfigurationProperties.get(SPARK_EXECUTOR_CORES).map(_.toInt)).getOrElse(0) + } +} + diff --git a/app/views/help/spark/helpExecutorStorageSpillHeuristic.scala.html b/app/views/help/spark/helpExecutorStorageSpillHeuristic.scala.html new file mode 100644 index 000000000..10f4ff9fe --- /dev/null +++ b/app/views/help/spark/helpExecutorStorageSpillHeuristic.scala.html @@ -0,0 +1,20 @@ +@* +* Copyright 2016 LinkedIn Corp. +* +* Licensed under the Apache License, Version 2.0 (the "License"); you may not +* use this file except in compliance with the License. You may obtain a copy of +* the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +* License for the specific language governing permissions and limitations under +* the License. +*@
+<p>Spark performs best when data is kept in memory. Spilled execution memory is tracked by memoryBytesSpilled, which is available at the task level; each stage has this information for its tasks. memoryBytesSpilled is cumulative for a task -- it is incremented whenever more execution memory is spilled. If execution memory is being spilled, the warnings are as follows:</p>
+<p>Low: memoryBytesSpilled is greater than zero for at least one executor but for < 20% of executors, and the max spilled size is < .05 * spark.executor.memory / spark.executor.cores.</p>
+<p>Moderate: memoryBytesSpilled is greater than zero for at least one executor but for < 20% of executors, and the avg spilled size is < .05 * spark.executor.memory / spark.executor.cores.</p>
+<p>Severe: memoryBytesSpilled is greater than zero for >= 20% of executors and the avg spilled size is < .05 * spark.executor.memory / spark.executor.cores.</p>
+<p>Critical: memoryBytesSpilled is greater than zero for >= 20% of executors and the avg spilled size is >= .05 * spark.executor.memory / spark.executor.cores.</p>
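To make the decision table above concrete, here is a minimal, self-contained Scala sketch of the same rules; the names (spills, executorMemory, executorCores) are illustrative stand-ins for the patch's evaluator fields, not its actual API:

// Minimal sketch of the spill-severity rules described above (names are illustrative).
object SpillSeveritySketch {
  def severity(spills: Seq[Long], executorMemory: Long, executorCores: Int): String = {
    val fractionSpilling = spills.count(_ > 0).toDouble / spills.size
    val threshold = 0.05 * executorMemory / executorCores // .05 * spark.executor.memory / spark.executor.cores
    val maxSpill = spills.max.toDouble
    val avgSpill = spills.sum.toDouble / spills.size
    if (fractionSpilling == 0) "NONE"
    else if (fractionSpilling < 0.2 && maxSpill < threshold) "LOW"
    else if (fractionSpilling < 0.2 && avgSpill < threshold) "MODERATE"
    else if (avgSpill < threshold) "SEVERE"   // here fractionSpilling >= 0.2
    else "CRITICAL"                           // fractionSpilling >= 0.2 and avg spill above threshold
  }
}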

From 050e31178c01e107ce16ca6718bb269843b0fe6b Mon Sep 17 00:00:00 2001 From: swasti Date: Thu, 7 Dec 2017 14:21:16 +0530 Subject: [PATCH 2/5] changed logic to fetch spilled bytes information from executors --- .../fetchers/statusapiv1/statusapiv1.scala | 2 + .../ExecutorStorageSpillHeuristic.scala | 63 ++++---- .../legacydata/LegacyDataConverters.scala | 1 + .../spark/legacydata/SparkExecutorData.java | 3 +- .../spark/SparkMetricsAggregatorTest.scala | 1 + .../ExecutorStorageSpillHeuristicTest.scala | 139 ++++++++++++++++++ .../heuristics/ExecutorsHeuristicTest.scala | 1 + 7 files changed, 174 insertions(+), 36 deletions(-) create mode 100644 test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala diff --git a/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala b/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala index 1b013c0f3..75bfdc975 100644 --- a/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala +++ b/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala @@ -87,6 +87,7 @@ trait ExecutorSummary{ def totalShuffleRead: Long def totalShuffleWrite: Long def maxMemory: Long + def totalMemoryBytesSpilled: Long def executorLogs: Map[String, String]} trait JobData{ @@ -292,6 +293,7 @@ class ExecutorSummaryImpl( var totalShuffleRead: Long, var totalShuffleWrite: Long, var maxMemory: Long, + var totalMemoryBytesSpilled: Long, var executorLogs: Map[String, String]) extends ExecutorSummary class JobDataImpl( diff --git a/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala b/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala index da55bed6b..affbd918a 100644 --- a/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala +++ b/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala @@ -17,7 +17,7 @@ package com.linkedin.drelephant.spark.heuristics import com.linkedin.drelephant.analysis.Severity -import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ExecutorStageSummary, StageData} +import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ExecutorStageSummary, ExecutorSummary, StageData} import com.linkedin.drelephant.analysis._ import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData import com.linkedin.drelephant.spark.data.SparkApplicationData @@ -41,7 +41,10 @@ class ExecutorStorageSpillHeuristic(private val heuristicConfigurationData: Heur override def apply(data: SparkApplicationData): HeuristicResult = { val evaluator = new Evaluator(this, data) var resultDetails = Seq( - new HeuristicResultDetails("Total memory spilled", MemoryFormatUtils.bytesToString(evaluator.totalMemorySpilled)) + new HeuristicResultDetails("Total memory spilled", MemoryFormatUtils.bytesToString(evaluator.totalMemorySpilled)), + new HeuristicResultDetails("Max memory spilled", MemoryFormatUtils.bytesToString(evaluator.maxMemorySpilled)), + new HeuristicResultDetails("Mean memory spilled", MemoryFormatUtils.bytesToString(evaluator.meanMemorySpilled)), + new HeuristicResultDetails("Fraction of executors having non zero bytes spilled", evaluator.fractionOfExecutorsHavingBytesSpilled.toString) ) if(evaluator.severity != Severity.NONE){ @@ -67,45 +70,35 @@ object ExecutorStorageSpillHeuristic { val SPARK_EXECUTOR_CORES = "spark.executor.cores" class Evaluator(memoryFractionHeuristic: ExecutorStorageSpillHeuristic, data: SparkApplicationData) { + lazy val executorSummaries: 
Seq[ExecutorSummary] = data.executorSummaries lazy val appConfigurationProperties: Map[String, String] = data.appConfigurationProperties - lazy val stageDatas: Seq[StageData] = data.stageDatas - val ratioMemoryCores: Double = sparkExecutorMemory.toDouble / sparkExecutorCores.toDouble - lazy val (severity, totalMemorySpilled : Long) = getExecutionSpillSeverity() - def getExecutionSpillSeverity(): (Severity, Long) = { - var bytesSpilled : Long = 0 - var executionSpillSeverity = Severity.NONE - stageDatas.foreach(stageData => { - val executorStageList: collection.Map[String, ExecutorStageSummary] = stageData.executorSummary.getOrElse(Map.empty) - val maxMemorySpilled: Long = executorStageList.values.map(_.memoryBytesSpilled).max - val meanMemorySpilled = executorStageList.values.map(_.memoryBytesSpilled).sum / executorStageList.values.size - val ratioMemoryBytesSpilled: Double = executorStageList.values.count(_.memoryBytesSpilled > 0).toDouble / executorStageList.values.size.toDouble - bytesSpilled += executorStageList.values.count(_.memoryBytesSpilled > 0).toLong - val severityExecutionSpillStage: Severity = { - if (ratioMemoryBytesSpilled != 0) { - if (ratioMemoryBytesSpilled < 0.2 && maxMemorySpilled < 0.05 * ratioMemoryCores) { - Severity.LOW - } - else if (ratioMemoryBytesSpilled < 0.2 && meanMemorySpilled < 0.05 * ratioMemoryCores) { - Severity.MODERATE - } + val ratioMemoryCores: Long = (sparkExecutorMemory / sparkExecutorCores) + val maxMemorySpilled: Long = executorSummaries.map(_.totalMemoryBytesSpilled).max + val meanMemorySpilled = executorSummaries.map(_.totalMemoryBytesSpilled).sum / executorSummaries.size + val totalMemorySpilled = executorSummaries.map(_.totalMemoryBytesSpilled).sum + val fractionOfExecutorsHavingBytesSpilled: Double = executorSummaries.count(_.totalMemoryBytesSpilled > 0).toDouble / executorSummaries.size.toDouble + val severity: Severity = { + if (fractionOfExecutorsHavingBytesSpilled != 0) { + if (fractionOfExecutorsHavingBytesSpilled < 0.2 && maxMemorySpilled < 0.05 * ratioMemoryCores) { + Severity.LOW + } + else if (fractionOfExecutorsHavingBytesSpilled < 0.2 && meanMemorySpilled < 0.05 * ratioMemoryCores) { + Severity.MODERATE + } - else if (ratioMemoryBytesSpilled >= 0.2 && meanMemorySpilled < 0.05 * ratioMemoryCores) { - Severity.SEVERE - } - else if (ratioMemoryBytesSpilled >= 0.2 && meanMemorySpilled >= 0.05 * ratioMemoryCores) { - Severity.CRITICAL - } - } - Severity.NONE + else if (fractionOfExecutorsHavingBytesSpilled >= 0.2 && meanMemorySpilled < 0.05 * ratioMemoryCores) { + Severity.SEVERE } - executionSpillSeverity = Severity.max(executionSpillSeverity, severityExecutionSpillStage) - }) - (executionSpillSeverity, bytesSpilled) + else if (fractionOfExecutorsHavingBytesSpilled >= 0.2 && meanMemorySpilled >= 0.05 * ratioMemoryCores) { + Severity.CRITICAL + } else Severity.NONE + } + else Severity.NONE } - val sparkExecutorMemory: Long = (appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY).map(MemoryFormatUtils.stringToBytes)).getOrElse(0L) - val sparkExecutorCores: Int = (appConfigurationProperties.get(SPARK_EXECUTOR_CORES).map(_.toInt)).getOrElse(0) + lazy val sparkExecutorMemory: Long = (appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY).map(MemoryFormatUtils.stringToBytes)).getOrElse(0) + lazy val sparkExecutorCores: Int = (appConfigurationProperties.get(SPARK_EXECUTOR_CORES).map(_.toInt)).getOrElse(0) } } diff --git a/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala 
b/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala index 0c7412fe0..719856f74 100644 --- a/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala +++ b/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala @@ -173,6 +173,7 @@ object LegacyDataConverters { executorInfo.shuffleRead, executorInfo.shuffleWrite, executorInfo.maxMem, + executorInfo.totalMemoryBytesSpilled, executorLogs = Map.empty ) } diff --git a/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java b/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java index 7b0fcb5c2..fd8377d73 100644 --- a/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java +++ b/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java @@ -43,6 +43,7 @@ public static class ExecutorInfo { public long inputBytes = 0L; public long outputBytes = 0L; public long shuffleRead = 0L; + public long totalMemoryBytesSpilled = 0L; public long shuffleWrite = 0L; public String toString() { @@ -50,7 +51,7 @@ public String toString() { + ", maxMem: " + maxMem + ", diskUsed: " + diskUsed + ", totalTasks" + totalTasks + ", tasksActive: " + activeTasks + ", tasksComplete: " + completedTasks + ", tasksFailed: " + failedTasks + ", duration: " + duration + ", inputBytes: " + inputBytes + ", outputBytes:" + outputBytes + ", shuffleRead: " + shuffleRead - + ", shuffleWrite: " + shuffleWrite + "}"; + + ", shuffleWrite: " + shuffleWrite + ", totalMemoryBytesSpilled: " + totalMemoryBytesSpilled + "}"; } } diff --git a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala index 3947fdf3f..5befb6500 100644 --- a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala +++ b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala @@ -194,6 +194,7 @@ object SparkMetricsAggregatorTest { totalShuffleRead = 0, totalShuffleWrite = 0, maxMemory = 0, + totalMemoryBytesSpilled = 0, executorLogs = Map.empty ) } diff --git a/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala new file mode 100644 index 000000000..82392e88e --- /dev/null +++ b/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala @@ -0,0 +1,139 @@ +/* + * Copyright 2016 LinkedIn Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package com.linkedin.drelephant.spark.heuristics + +import scala.collection.JavaConverters +import com.linkedin.drelephant.analysis.{ApplicationType, Severity, SeverityThresholds} +import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData +import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData} +import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, ExecutorSummaryImpl, StageDataImpl} +import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate +import org.scalatest.{FunSpec, Matchers} + + +class ExecutorStorageSpillHeuristicTest extends FunSpec with Matchers { + import ExecutorStorageSpillHeuristicTest._ + + describe("ExecutorStorageSpillHeuristic") { + val heuristicConfigurationData = newFakeHeuristicConfigurationData( + Map( + "max_to_median_ratio_severity_thresholds" -> "1.414,2,4,16", + "ignore_max_bytes_less_than_threshold" -> "4000000", + "ignore_max_millis_less_than_threshold" -> "4000001" + ) + ) + val executorStorageSpillHeuristic = new ExecutorStorageSpillHeuristic(heuristicConfigurationData) + + val appConfigurationProperties = Map("spark.executor.memory" -> "4g", "spark.executor.cores"->"4") + + val executorSummaries = Seq( + newFakeExecutorSummary( + id = "1", + totalMemoryBytesSpilled = 200000L + ), + newFakeExecutorSummary( + id = "2", + totalMemoryBytesSpilled = 100000L + ), + newFakeExecutorSummary( + id = "3", + totalMemoryBytesSpilled = 300000L + ), + newFakeExecutorSummary( + id = "4", + totalMemoryBytesSpilled = 200000L + ) + ) + + describe(".apply") { + val data1 = newFakeSparkApplicationData(executorSummaries, appConfigurationProperties) + val heuristicResult = executorStorageSpillHeuristic.apply(data1) + val heuristicResultDetails = heuristicResult.getHeuristicResultDetails + + it("returns the severity") { + heuristicResult.getSeverity should be(Severity.SEVERE) + } + + it("returns the total memory spilled") { + val details = heuristicResultDetails.get(0) + details.getName should include("Total memory spilled") + details.getValue should be("781.25 KB") + } + + it("returns the max memory spilled") { + val details = heuristicResultDetails.get(1) + details.getName should include("Max memory spilled") + details.getValue should be("292.97 KB") + } + + it("returns the mean memory spilled") { + val details = heuristicResultDetails.get(2) + details.getName should include("Mean memory spilled") + details.getValue should be("195.31 KB") + } + } + } +} + +object ExecutorStorageSpillHeuristicTest { + import JavaConverters._ + + def newFakeHeuristicConfigurationData(params: Map[String, String] = Map.empty): HeuristicConfigurationData = + new HeuristicConfigurationData("heuristic", "class", "view", new ApplicationType("type"), params.asJava) + + def newFakeExecutorSummary( + id: String, + totalMemoryBytesSpilled: Long + ): ExecutorSummaryImpl = new ExecutorSummaryImpl( + id, + hostPort = "", + rddBlocks = 0, + memoryUsed=0, + diskUsed = 0, + activeTasks = 0, + failedTasks = 0, + completedTasks = 0, + totalTasks = 0, + totalDuration=0, + totalInputBytes=0, + totalShuffleRead=0, + totalShuffleWrite= 0, + maxMemory= 0, + totalMemoryBytesSpilled, + executorLogs = Map.empty + ) + + def newFakeSparkApplicationData( + executorSummaries: Seq[ExecutorSummaryImpl], + appConfigurationProperties: Map[String, String] + ): SparkApplicationData = { + val appId = "application_1" + + val restDerivedData = SparkRestDerivedData( + new ApplicationInfoImpl(appId, name = "app", 
Seq.empty), + jobDatas = Seq.empty, + stageDatas = Seq.empty, + executorSummaries = executorSummaries + ) + + val logDerivedData = SparkLogDerivedData( + SparkListenerEnvironmentUpdate(Map("Spark Properties" -> appConfigurationProperties.toSeq)) + ) + + SparkApplicationData(appId, restDerivedData, Some(logDerivedData)) + } +} diff --git a/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala index dfdcf4a15..cf07fe377 100644 --- a/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala +++ b/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala @@ -249,6 +249,7 @@ object ExecutorsHeuristicTest { totalShuffleRead, totalShuffleWrite, maxMemory, + totalMemoryBytesSpilled = 0, executorLogs = Map.empty ) From d725653d2bf27b100243d3b64bb41b8fc3ed8cd8 Mon Sep 17 00:00:00 2001 From: swasti Date: Thu, 7 Dec 2017 15:59:16 +0530 Subject: [PATCH 3/5] adding Gc time heuristic --- app-conf/HeuristicConf.xml | 6 ++ .../fetchers/statusapiv1/statusapiv1.scala | 4 +- .../ExecutorStorageSpillHeuristic.scala | 90 ++++++++++--------- .../legacydata/LegacyDataConverters.scala | 2 +- .../spark/legacydata/SparkExecutorData.java | 4 +- ...lpExecutorStorageSpillHeuristic.scala.html | 20 ----- .../spark/helpGcCpuTimeHeuristic.scala.html | 6 ++ .../spark/SparkMetricsAggregatorTest.scala | 2 +- .../ExecutorStorageSpillHeuristicTest.scala | 67 +++++++------- .../heuristics/ExecutorsHeuristicTest.scala | 2 +- 10 files changed, 99 insertions(+), 104 deletions(-) delete mode 100644 app/views/help/spark/helpExecutorStorageSpillHeuristic.scala.html create mode 100644 app/views/help/spark/helpGcCpuTimeHeuristic.scala.html diff --git a/app-conf/HeuristicConf.xml b/app-conf/HeuristicConf.xml index 833fa2bc3..2cd6c2b6c 100644 --- a/app-conf/HeuristicConf.xml +++ b/app-conf/HeuristicConf.xml @@ -193,5 +193,11 @@ com.linkedin.drelephant.spark.heuristics.StagesHeuristic views.html.help.spark.helpStagesHeuristic + + spark + Spark GC Time to CPU Time + com.linkedin.drelephant.spark.heuristics.GcCpuTimeHeuristic + views.html.help.spark.helpGcCpuTimeHeuristic + diff --git a/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala b/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala index 75bfdc975..9c4b534a0 100644 --- a/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala +++ b/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala @@ -87,7 +87,7 @@ trait ExecutorSummary{ def totalShuffleRead: Long def totalShuffleWrite: Long def maxMemory: Long - def totalMemoryBytesSpilled: Long + def totalGCTime: Long def executorLogs: Map[String, String]} trait JobData{ @@ -293,7 +293,7 @@ class ExecutorSummaryImpl( var totalShuffleRead: Long, var totalShuffleWrite: Long, var maxMemory: Long, - var totalMemoryBytesSpilled: Long, + var totalGCTime: Long, var executorLogs: Map[String, String]) extends ExecutorSummary class JobDataImpl( diff --git a/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala b/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala index affbd918a..97008a522 100644 --- a/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala +++ b/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala @@ -17,23 +17,20 @@ package com.linkedin.drelephant.spark.heuristics import com.linkedin.drelephant.analysis.Severity -import 
com.linkedin.drelephant.spark.fetchers.statusapiv1.{ExecutorStageSummary, ExecutorSummary, StageData} +import com.linkedin.drelephant.spark.fetchers.statusapiv1._ import com.linkedin.drelephant.analysis._ import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData import com.linkedin.drelephant.spark.data.SparkApplicationData -import com.linkedin.drelephant.util.MemoryFormatUtils - import scala.collection.JavaConverters - /** - * A heuristic based on memory spilled. + * A heuristic based on GC time and CPU run time * */ -class ExecutorStorageSpillHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData) +class GcCpuTimeHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData) extends Heuristic[SparkApplicationData] { - import ExecutorStorageSpillHeuristic._ + import GcCpuTimeHeuristic._ import JavaConverters._ override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData @@ -41,17 +38,18 @@ class ExecutorStorageSpillHeuristic(private val heuristicConfigurationData: Heur override def apply(data: SparkApplicationData): HeuristicResult = { val evaluator = new Evaluator(this, data) var resultDetails = Seq( - new HeuristicResultDetails("Total memory spilled", MemoryFormatUtils.bytesToString(evaluator.totalMemorySpilled)), - new HeuristicResultDetails("Max memory spilled", MemoryFormatUtils.bytesToString(evaluator.maxMemorySpilled)), - new HeuristicResultDetails("Mean memory spilled", MemoryFormatUtils.bytesToString(evaluator.meanMemorySpilled)), - new HeuristicResultDetails("Fraction of executors having non zero bytes spilled", evaluator.fractionOfExecutorsHavingBytesSpilled.toString) + new HeuristicResultDetails("GC time to Executor Run time ratio", evaluator.ratio.toString), + new HeuristicResultDetails("GC total time", evaluator.jvmTime.toString), + new HeuristicResultDetails("Executor Run time", evaluator.executorRunTimeTotal.toString) ) - if(evaluator.severity != Severity.NONE){ - resultDetails :+ new HeuristicResultDetails("Note", "Your memory is being spilled. 
Kindly look into it.") - if(evaluator.sparkExecutorCores >=4 && evaluator.sparkExecutorMemory >= MemoryFormatUtils.stringToBytes("10GB")) { - resultDetails :+ new HeuristicResultDetails("Recommendation", "You can try decreasing the number of cores to reduce the number of concurrently running tasks.") - } + //adding recommendations to the result, severityTimeA corresponds to the ascending severity calculation + if (evaluator.severityTimeA.getValue > Severity.LOW.getValue) { + resultDetails = resultDetails :+ new HeuristicResultDetails("Note", "The ratio of JVM GC Time and executor Time is above normal, we recommend to increase the executor memory") + } + //severityTimeD corresponds to the descending severity calculation + if (evaluator.severityTimeD.getValue > Severity.LOW.getValue) { + resultDetails = resultDetails :+ new HeuristicResultDetails("Note", "The ratio of JVM GC Time and executor Time is below normal, we recommend to decrease the executor memory") } val result = new HeuristicResult( @@ -65,40 +63,46 @@ class ExecutorStorageSpillHeuristic(private val heuristicConfigurationData: Heur } } -object ExecutorStorageSpillHeuristic { +object GcCpuTimeHeuristic { val SPARK_EXECUTOR_MEMORY = "spark.executor.memory" val SPARK_EXECUTOR_CORES = "spark.executor.cores" - class Evaluator(memoryFractionHeuristic: ExecutorStorageSpillHeuristic, data: SparkApplicationData) { + /** The ascending severity thresholds for the ratio of JVM GC Time and executor Run Time (checking whether ratio is above normal) + * These thresholds are experimental and are likely to change */ + val DEFAULT_GC_SEVERITY_A_THRESHOLDS = + SeverityThresholds(low = 0.08D, moderate = 0.1D, severe = 0.15D, critical = 0.2D, ascending = true) + + /** The descending severity thresholds for the ratio of JVM GC Time and executor Run Time (checking whether ratio is below normal) + * These thresholds are experimental and are likely to change */ + val DEFAULT_GC_SEVERITY_D_THRESHOLDS = + SeverityThresholds(low = 0.05D, moderate = 0.04D, severe = 0.03D, critical = 0.01D, ascending = false) + + class Evaluator(memoryFractionHeuristic: GcCpuTimeHeuristic, data: SparkApplicationData) { lazy val executorSummaries: Seq[ExecutorSummary] = data.executorSummaries lazy val appConfigurationProperties: Map[String, String] = data.appConfigurationProperties - val ratioMemoryCores: Long = (sparkExecutorMemory / sparkExecutorCores) - val maxMemorySpilled: Long = executorSummaries.map(_.totalMemoryBytesSpilled).max - val meanMemorySpilled = executorSummaries.map(_.totalMemoryBytesSpilled).sum / executorSummaries.size - val totalMemorySpilled = executorSummaries.map(_.totalMemoryBytesSpilled).sum - val fractionOfExecutorsHavingBytesSpilled: Double = executorSummaries.count(_.totalMemoryBytesSpilled > 0).toDouble / executorSummaries.size.toDouble - val severity: Severity = { - if (fractionOfExecutorsHavingBytesSpilled != 0) { - if (fractionOfExecutorsHavingBytesSpilled < 0.2 && maxMemorySpilled < 0.05 * ratioMemoryCores) { - Severity.LOW - } - else if (fractionOfExecutorsHavingBytesSpilled < 0.2 && meanMemorySpilled < 0.05 * ratioMemoryCores) { - Severity.MODERATE - } - - else if (fractionOfExecutorsHavingBytesSpilled >= 0.2 && meanMemorySpilled < 0.05 * ratioMemoryCores) { - Severity.SEVERE - } - else if (fractionOfExecutorsHavingBytesSpilled >= 0.2 && meanMemorySpilled >= 0.05 * ratioMemoryCores) { - Severity.CRITICAL - } else Severity.NONE - } - else Severity.NONE - } + var (jvmTime, executorRunTimeTotal) = getTimeValues(executorSummaries) + + var 
ratio: Double = jvmTime.toDouble / executorRunTimeTotal.toDouble - lazy val sparkExecutorMemory: Long = (appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY).map(MemoryFormatUtils.stringToBytes)).getOrElse(0) - lazy val sparkExecutorCores: Int = (appConfigurationProperties.get(SPARK_EXECUTOR_CORES).map(_.toInt)).getOrElse(0) + lazy val severityTimeA: Severity = DEFAULT_GC_SEVERITY_A_THRESHOLDS.severityOf(ratio) + lazy val severityTimeD: Severity = DEFAULT_GC_SEVERITY_D_THRESHOLDS.severityOf(ratio) + lazy val severity : Severity = Severity.max(severityTimeA, severityTimeD) + + /** + * returns the total JVM GC Time and total executor Run Time across all stages + * @param executorSummaries + * @return + */ + private def getTimeValues(executorSummaries: Seq[ExecutorSummary]): (Long, Long) = { + var jvmGcTimeTotal: Long = 0 + var executorRunTimeTotal: Long = 0 + executorSummaries.foreach(executorSummary => { + jvmGcTimeTotal+=executorSummary.totalGCTime + executorRunTimeTotal+=executorSummary.totalDuration + }) + (jvmGcTimeTotal, executorRunTimeTotal) + } } } diff --git a/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala b/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala index 719856f74..4abce291d 100644 --- a/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala +++ b/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala @@ -173,7 +173,7 @@ object LegacyDataConverters { executorInfo.shuffleRead, executorInfo.shuffleWrite, executorInfo.maxMem, - executorInfo.totalMemoryBytesSpilled, + executorInfo.totalGCTime, executorLogs = Map.empty ) } diff --git a/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java b/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java index fd8377d73..4e2ad4de3 100644 --- a/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java +++ b/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java @@ -43,7 +43,7 @@ public static class ExecutorInfo { public long inputBytes = 0L; public long outputBytes = 0L; public long shuffleRead = 0L; - public long totalMemoryBytesSpilled = 0L; + public long totalGCTime = 0L; public long shuffleWrite = 0L; public String toString() { @@ -51,7 +51,7 @@ public String toString() { + ", maxMem: " + maxMem + ", diskUsed: " + diskUsed + ", totalTasks" + totalTasks + ", tasksActive: " + activeTasks + ", tasksComplete: " + completedTasks + ", tasksFailed: " + failedTasks + ", duration: " + duration + ", inputBytes: " + inputBytes + ", outputBytes:" + outputBytes + ", shuffleRead: " + shuffleRead - + ", shuffleWrite: " + shuffleWrite + ", totalMemoryBytesSpilled: " + totalMemoryBytesSpilled + "}"; + + ", shuffleWrite: " + shuffleWrite + ", totalGCTime: " + totalGCTime + "}"; } } diff --git a/app/views/help/spark/helpExecutorStorageSpillHeuristic.scala.html b/app/views/help/spark/helpExecutorStorageSpillHeuristic.scala.html deleted file mode 100644 index 10f4ff9fe..000000000 --- a/app/views/help/spark/helpExecutorStorageSpillHeuristic.scala.html +++ /dev/null @@ -1,20 +0,0 @@ -@* -* Copyright 2016 LinkedIn Corp. -* -* Licensed under the Apache License, Version 2.0 (the "License"); you may not -* use this file except in compliance with the License. 
You may obtain a copy of -* the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -* License for the specific language governing permissions and limitations under -* the License. -*@
-<p>Spark performs best when data is kept in memory. Spilled execution memory is tracked by memoryBytesSpilled, which is available at the task level; each stage has this information for its tasks. memoryBytesSpilled is cumulative for a task -- it is incremented whenever more execution memory is spilled. If execution memory is being spilled, the warnings are as follows:</p>
-<p>Low: memoryBytesSpilled is greater than zero for at least one executor but for < 20% of executors, and the max spilled size is < .05 * spark.executor.memory / spark.executor.cores.</p>
-<p>Moderate: memoryBytesSpilled is greater than zero for at least one executor but for < 20% of executors, and the avg spilled size is < .05 * spark.executor.memory / spark.executor.cores.</p>
-<p>Severe: memoryBytesSpilled is greater than zero for >= 20% of executors and the avg spilled size is < .05 * spark.executor.memory / spark.executor.cores.</p>
-<p>Critical: memoryBytesSpilled is greater than zero for >= 20% of executors and the avg spilled size is >= .05 * spark.executor.memory / spark.executor.cores.</p>

diff --git a/app/views/help/spark/helpGcCpuTimeHeuristic.scala.html b/app/views/help/spark/helpGcCpuTimeHeuristic.scala.html new file mode 100644 index 000000000..a1969ebe2 --- /dev/null +++ b/app/views/help/spark/helpGcCpuTimeHeuristic.scala.html @@ -0,0 +1,6 @@
+<p>The ratio of jvmGcTime to executorCpuTime is checked to see if GC is taking too much time (providing more memory could help) or too little time (memory may be over-provisioned and can be reduced).</p>
+<p>The severity thresholds are as follows:</p>
+<p>Low: avg (jvmGcTime / executorCpuTime) >= .08</p>
+<p>Moderate: avg (jvmGcTime / executorCpuTime) >= .1</p>
+<p>Severe: avg (jvmGcTime / executorCpuTime) >= .15</p>
+<p>Critical: avg (jvmGcTime / executorCpuTime) >= .2</p>
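A quick worked example of the ratio these thresholds apply to, using the same per-executor durations as the test added below; the object name is illustrative:

import scala.concurrent.duration.Duration

// Worked example: total GC time over total executor run time, as in the heuristic.
object GcRatioExample extends App {
  val gcTimes  = Seq("2min", "6min", "4min", "8min").map(Duration(_).toMillis)     // totalGCTime per executor
  val runTimes = Seq("15min", "14min", "20min", "30min").map(Duration(_).toMillis) // totalDuration per executor
  val ratio = gcTimes.sum.toDouble / runTimes.sum.toDouble
  println(s"ratio = $ratio") // 1200000.0 / 4740000.0 ≈ 0.2531, above the .2 threshold => CRITICAL
}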

diff --git a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala index 5befb6500..77e3e1d29 100644 --- a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala +++ b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala @@ -194,7 +194,7 @@ object SparkMetricsAggregatorTest { totalShuffleRead = 0, totalShuffleWrite = 0, maxMemory = 0, - totalMemoryBytesSpilled = 0, + totalGCTime = 0, executorLogs = Map.empty ) } diff --git a/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala index 82392e88e..31a2d1635 100644 --- a/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala +++ b/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala @@ -24,11 +24,13 @@ import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate import org.scalatest.{FunSpec, Matchers} +import scala.concurrent.duration.Duration -class ExecutorStorageSpillHeuristicTest extends FunSpec with Matchers { - import ExecutorStorageSpillHeuristicTest._ - describe("ExecutorStorageSpillHeuristic") { +class GcCpuTimeHeuristicTest extends FunSpec with Matchers { + import GcCpuTimeHeuristicTest._ + + describe("GcCpuTimeHeuristic") { val heuristicConfigurationData = newFakeHeuristicConfigurationData( Map( "max_to_median_ratio_severity_thresholds" -> "1.414,2,4,16", @@ -36,60 +38,62 @@ class ExecutorStorageSpillHeuristicTest extends FunSpec with Matchers { "ignore_max_millis_less_than_threshold" -> "4000001" ) ) - val executorStorageSpillHeuristic = new ExecutorStorageSpillHeuristic(heuristicConfigurationData) - - val appConfigurationProperties = Map("spark.executor.memory" -> "4g", "spark.executor.cores"->"4") + val gcCpuTimeHeuristic = new GcCpuTimeHeuristic(heuristicConfigurationData) val executorSummaries = Seq( newFakeExecutorSummary( id = "1", - totalMemoryBytesSpilled = 200000L + totalGCTime = Duration("2min").toMillis, + totalDuration = Duration("15min").toMillis ), newFakeExecutorSummary( id = "2", - totalMemoryBytesSpilled = 100000L + totalGCTime = Duration("6min").toMillis, + totalDuration = Duration("14min").toMillis ), newFakeExecutorSummary( id = "3", - totalMemoryBytesSpilled = 300000L + totalGCTime = Duration("4min").toMillis, + totalDuration = Duration("20min").toMillis ), newFakeExecutorSummary( id = "4", - totalMemoryBytesSpilled = 200000L + totalGCTime = Duration("8min").toMillis, + totalDuration = Duration("30min").toMillis ) ) describe(".apply") { - val data1 = newFakeSparkApplicationData(executorSummaries, appConfigurationProperties) - val heuristicResult = executorStorageSpillHeuristic.apply(data1) + val data1 = newFakeSparkApplicationData(executorSummaries) + val heuristicResult = gcCpuTimeHeuristic.apply(data1) val heuristicResultDetails = heuristicResult.getHeuristicResultDetails it("returns the severity") { - heuristicResult.getSeverity should be(Severity.SEVERE) + heuristicResult.getSeverity should be(Severity.CRITICAL) } - it("returns the total memory spilled") { + it("returns the JVM GC time to Executor Run time duration") { val details = heuristicResultDetails.get(0) - details.getName should include("Total memory spilled") - details.getValue should be("781.25 KB") + details.getName should include("GC time to Executor Run time ratio") + 
details.getValue should include("0.2531") } - it("returns the max memory spilled") { + it("returns the total GC time") { val details = heuristicResultDetails.get(1) - details.getName should include("Max memory spilled") - details.getValue should be("292.97 KB") + details.getName should include("GC total time") + details.getValue should be("1200000") } - it("returns the mean memory spilled") { + it("returns the executor's run time") { val details = heuristicResultDetails.get(2) - details.getName should include("Mean memory spilled") - details.getValue should be("195.31 KB") + details.getName should include("Executor Run time") + details.getValue should be("4740000") } } } } -object ExecutorStorageSpillHeuristicTest { +object GcCpuTimeHeuristicTest { import JavaConverters._ def newFakeHeuristicConfigurationData(params: Map[String, String] = Map.empty): HeuristicConfigurationData = @@ -97,7 +101,8 @@ object ExecutorStorageSpillHeuristicTest { def newFakeExecutorSummary( id: String, - totalMemoryBytesSpilled: Long + totalGCTime: Long, + totalDuration: Long ): ExecutorSummaryImpl = new ExecutorSummaryImpl( id, hostPort = "", @@ -108,18 +113,17 @@ object ExecutorStorageSpillHeuristicTest { failedTasks = 0, completedTasks = 0, totalTasks = 0, - totalDuration=0, + totalDuration, totalInputBytes=0, totalShuffleRead=0, totalShuffleWrite= 0, maxMemory= 0, - totalMemoryBytesSpilled, + totalGCTime, executorLogs = Map.empty ) def newFakeSparkApplicationData( - executorSummaries: Seq[ExecutorSummaryImpl], - appConfigurationProperties: Map[String, String] + executorSummaries: Seq[ExecutorSummaryImpl] ): SparkApplicationData = { val appId = "application_1" @@ -129,11 +133,6 @@ object ExecutorStorageSpillHeuristicTest { stageDatas = Seq.empty, executorSummaries = executorSummaries ) - - val logDerivedData = SparkLogDerivedData( - SparkListenerEnvironmentUpdate(Map("Spark Properties" -> appConfigurationProperties.toSeq)) - ) - - SparkApplicationData(appId, restDerivedData, Some(logDerivedData)) + SparkApplicationData(appId, restDerivedData, None) } } diff --git a/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala index cf07fe377..7dbeea921 100644 --- a/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala +++ b/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala @@ -249,7 +249,7 @@ object ExecutorsHeuristicTest { totalShuffleRead, totalShuffleWrite, maxMemory, - totalMemoryBytesSpilled = 0, + totalGCTime = 0, executorLogs = Map.empty ) From b2e23a40088d3ce50875ddd356f387ed97c6b176 Mon Sep 17 00:00:00 2001 From: swasti Date: Mon, 18 Dec 2017 15:59:29 +0530 Subject: [PATCH 4/5] review changes --- app-conf/HeuristicConf.xml | 2 +- ...uristic.scala => GcCpuTimeHeuristic.scala} | 19 +++++++++++++++---- .../spark/helpGcCpuTimeHeuristic.scala.html | 16 +++++++++++----- 3 files changed, 27 insertions(+), 10 deletions(-) rename app/com/linkedin/drelephant/spark/heuristics/{ExecutorStorageSpillHeuristic.scala => GcCpuTimeHeuristic.scala} (83%) diff --git a/app-conf/HeuristicConf.xml b/app-conf/HeuristicConf.xml index 2cd6c2b6c..5a9ba82e8 100644 --- a/app-conf/HeuristicConf.xml +++ b/app-conf/HeuristicConf.xml @@ -195,7 +195,7 @@ spark - Spark GC Time to CPU Time + Spark GC Time to Run Time com.linkedin.drelephant.spark.heuristics.GcCpuTimeHeuristic views.html.help.spark.helpGcCpuTimeHeuristic diff --git 
a/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala b/app/com/linkedin/drelephant/spark/heuristics/GcCpuTimeHeuristic.scala similarity index 83% rename from app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala rename to app/com/linkedin/drelephant/spark/heuristics/GcCpuTimeHeuristic.scala index 97008a522..b547adb8f 100644 --- a/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala +++ b/app/com/linkedin/drelephant/spark/heuristics/GcCpuTimeHeuristic.scala @@ -21,11 +21,11 @@ import com.linkedin.drelephant.spark.fetchers.statusapiv1._ import com.linkedin.drelephant.analysis._ import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData import com.linkedin.drelephant.spark.data.SparkApplicationData + import scala.collection.JavaConverters /** * A heuristic based on GC time and CPU run time - * */ class GcCpuTimeHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData) extends Heuristic[SparkApplicationData] { @@ -33,6 +33,14 @@ class GcCpuTimeHeuristic(private val heuristicConfigurationData: HeuristicConfig import GcCpuTimeHeuristic._ import JavaConverters._ + val gcSeverityAThresholds: SeverityThresholds = + SeverityThresholds.parse(heuristicConfigurationData.getParamMap.get(GC_SEVERITY_A_THRESHOLDS_KEY), ascending = true) + .getOrElse(DEFAULT_GC_SEVERITY_A_THRESHOLDS) + + val gcSeverityDThresholds: SeverityThresholds = + SeverityThresholds.parse(heuristicConfigurationData.getParamMap.get(GC_SEVERITY_D_THRESHOLDS_KEY), ascending = true) + .getOrElse(DEFAULT_GC_SEVERITY_D_THRESHOLDS) + override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData override def apply(data: SparkApplicationData): HeuristicResult = { @@ -77,7 +85,10 @@ object GcCpuTimeHeuristic { val DEFAULT_GC_SEVERITY_D_THRESHOLDS = SeverityThresholds(low = 0.05D, moderate = 0.04D, severe = 0.03D, critical = 0.01D, ascending = false) - class Evaluator(memoryFractionHeuristic: GcCpuTimeHeuristic, data: SparkApplicationData) { + val GC_SEVERITY_A_THRESHOLDS_KEY: String = "gc_severity_A_threshold" + val GC_SEVERITY_D_THRESHOLDS_KEY: String = "gc_severity_D_threshold" + + class Evaluator(gcCpuTimeHeuristic: GcCpuTimeHeuristic, data: SparkApplicationData) { lazy val executorSummaries: Seq[ExecutorSummary] = data.executorSummaries lazy val appConfigurationProperties: Map[String, String] = data.appConfigurationProperties @@ -85,8 +96,8 @@ object GcCpuTimeHeuristic { var ratio: Double = jvmTime.toDouble / executorRunTimeTotal.toDouble - lazy val severityTimeA: Severity = DEFAULT_GC_SEVERITY_A_THRESHOLDS.severityOf(ratio) - lazy val severityTimeD: Severity = DEFAULT_GC_SEVERITY_D_THRESHOLDS.severityOf(ratio) + lazy val severityTimeA: Severity = gcCpuTimeHeuristic.gcSeverityAThresholds.severityOf(ratio) + lazy val severityTimeD: Severity = gcCpuTimeHeuristic.gcSeverityAThresholds.severityOf(ratio) lazy val severity : Severity = Severity.max(severityTimeA, severityTimeD) /** diff --git a/app/views/help/spark/helpGcCpuTimeHeuristic.scala.html b/app/views/help/spark/helpGcCpuTimeHeuristic.scala.html index a1969ebe2..646b186d6 100644 --- a/app/views/help/spark/helpGcCpuTimeHeuristic.scala.html +++ b/app/views/help/spark/helpGcCpuTimeHeuristic.scala.html @@ -1,6 +1,12 @@ -

<p>The ratio of jvmGcTime to executorCpuTime is checked to see if GC is taking too much time (providing more memory could help) or too little time (memory may be over-provisioned and can be reduced).</p>
+<p>The ratio of jvmGcTime to executorRunTime is checked to see if GC is taking too much time (providing more memory could help) or too little time (memory may be over-provisioned and can be reduced).</p>
 <p>The severity thresholds are as follows:</p>
-<p>Low: avg (jvmGcTime / executorCpuTime) >= .08</p>
-<p>Moderate: avg (jvmGcTime / executorCpuTime) >= .1</p>
-<p>Severe: avg (jvmGcTime / executorCpuTime) >= .15</p>
-<p>Critical: avg (jvmGcTime / executorCpuTime) >= .2</p>
+<p>Low: avg (jvmGcTime / executorRunTime) >= .08</p>
+<p>Moderate: avg (jvmGcTime / executorRunTime) >= .1</p>
+<p>Severe: avg (jvmGcTime / executorRunTime) >= .15</p>
+<p>Critical: avg (jvmGcTime / executorRunTime) >= .2</p>
+<p>The severity thresholds when GC is taking too little time are as follows:</p>
+<p>Low: avg (jvmGcTime / executorRunTime) < .05</p>
+<p>Moderate: avg (jvmGcTime / executorRunTime) < .04</p>
+<p>Severe: avg (jvmGcTime / executorRunTime) < .03</p>
+<p>Critical: avg (jvmGcTime / executorRunTime) < .01</p>
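To see how the ascending and descending threshold sets combine, here is a small hedged sketch in plain Scala; the integer-coded severities (0 = NONE .. 4 = CRITICAL) stand in for Dr. Elephant's Severity values, and the numbers are the defaults from this patch:

// Sketch of the two-sided severity check: high ratios and suspiciously low ratios both flag.
object TwoSidedGcSeverity extends App {
  def ascending(r: Double): Int  = Seq(0.08, 0.1, 0.15, 0.2).count(r >= _)  // too much time in GC
  def descending(r: Double): Int = Seq(0.05, 0.04, 0.03, 0.01).count(r < _) // too little time in GC
  def severity(r: Double): Int   = math.max(ascending(r), descending(r))
  println(severity(0.12)) // 2 (MODERATE): above the .1 ascending threshold
  println(severity(0.02)) // 3 (SEVERE): below the .03 descending threshold
}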

+ From fa62ddf1eef28377310514bd25c1b1e6e3c5c4be Mon Sep 17 00:00:00 2001 From: swasti Date: Thu, 4 Jan 2018 11:49:48 +0530 Subject: [PATCH 5/5] removing driver, improving recommendations and renaming files --- app-conf/HeuristicConf.xml | 6 ++-- ...ristic.scala => ExecutorGcHeuristic.scala} | 28 +++++++++---------- .../spark/helpExecutorGcHeuristic.scala.html | 20 +++++++++++++ .../spark/helpGcCpuTimeHeuristic.scala.html | 12 -------- ...st.scala => ExecutorGcHeuristicTest.scala} | 16 +++++------ 5 files changed, 45 insertions(+), 37 deletions(-) rename app/com/linkedin/drelephant/spark/heuristics/{GcCpuTimeHeuristic.scala => ExecutorGcHeuristic.scala} (77%) create mode 100644 app/views/help/spark/helpExecutorGcHeuristic.scala.html delete mode 100644 app/views/help/spark/helpGcCpuTimeHeuristic.scala.html rename test/com/linkedin/drelephant/spark/heuristics/{ExecutorStorageSpillHeuristicTest.scala => ExecutorGcHeuristicTest.scala} (90%) diff --git a/app-conf/HeuristicConf.xml b/app-conf/HeuristicConf.xml index 5a9ba82e8..3e264e853 100644 --- a/app-conf/HeuristicConf.xml +++ b/app-conf/HeuristicConf.xml @@ -195,9 +195,9 @@ spark - Spark GC Time to Run Time - com.linkedin.drelephant.spark.heuristics.GcCpuTimeHeuristic - views.html.help.spark.helpGcCpuTimeHeuristic + Executor GC + com.linkedin.drelephant.spark.heuristics.ExecutorGcHeuristic + views.html.help.spark.helpExecutorGcHeuristic diff --git a/app/com/linkedin/drelephant/spark/heuristics/GcCpuTimeHeuristic.scala b/app/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristic.scala similarity index 77% rename from app/com/linkedin/drelephant/spark/heuristics/GcCpuTimeHeuristic.scala rename to app/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristic.scala index b547adb8f..23da7db28 100644 --- a/app/com/linkedin/drelephant/spark/heuristics/GcCpuTimeHeuristic.scala +++ b/app/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristic.scala @@ -25,12 +25,12 @@ import com.linkedin.drelephant.spark.data.SparkApplicationData import scala.collection.JavaConverters /** - * A heuristic based on GC time and CPU run time + * A heuristic based on GC time and CPU run time. It calculates the ratio of the total time a job spends in GC to the total run time of a job and warns if too much time is spent in GC. 
*/ -class GcCpuTimeHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData) +class ExecutorGcHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData) extends Heuristic[SparkApplicationData] { - import GcCpuTimeHeuristic._ + import ExecutorGcHeuristic._ import JavaConverters._ val gcSeverityAThresholds: SeverityThresholds = @@ -47,23 +47,23 @@ class GcCpuTimeHeuristic(private val heuristicConfigurationData: HeuristicConfig val evaluator = new Evaluator(this, data) var resultDetails = Seq( new HeuristicResultDetails("GC time to Executor Run time ratio", evaluator.ratio.toString), - new HeuristicResultDetails("GC total time", evaluator.jvmTime.toString), - new HeuristicResultDetails("Executor Run time", evaluator.executorRunTimeTotal.toString) + new HeuristicResultDetails("Total GC time", evaluator.jvmTime.toString), + new HeuristicResultDetails("Total Executor Runtime", evaluator.executorRunTimeTotal.toString) ) //adding recommendations to the result, severityTimeA corresponds to the ascending severity calculation if (evaluator.severityTimeA.getValue > Severity.LOW.getValue) { - resultDetails = resultDetails :+ new HeuristicResultDetails("Note", "The ratio of JVM GC Time and executor Time is above normal, we recommend to increase the executor memory") + resultDetails = resultDetails :+ new HeuristicResultDetails("Gc ratio high", "The job is spending too much time on GC. We recommend increasing the executor memory.") } //severityTimeD corresponds to the descending severity calculation if (evaluator.severityTimeD.getValue > Severity.LOW.getValue) { - resultDetails = resultDetails :+ new HeuristicResultDetails("Note", "The ratio of JVM GC Time and executor Time is below normal, we recommend to decrease the executor memory") + resultDetails = resultDetails :+ new HeuristicResultDetails("Gc ratio low", "The job is spending too less time in GC. 
Please check if you have asked for more executor memory than required.") } val result = new HeuristicResult( heuristicConfigurationData.getClassName, heuristicConfigurationData.getHeuristicName, - evaluator.severity, + evaluator.severityTimeA, 0, resultDetails.asJava ) @@ -71,7 +71,7 @@ class GcCpuTimeHeuristic(private val heuristicConfigurationData: HeuristicConfig } } -object GcCpuTimeHeuristic { +object ExecutorGcHeuristic { val SPARK_EXECUTOR_MEMORY = "spark.executor.memory" val SPARK_EXECUTOR_CORES = "spark.executor.cores" @@ -88,17 +88,17 @@ object GcCpuTimeHeuristic { val GC_SEVERITY_A_THRESHOLDS_KEY: String = "gc_severity_A_threshold" val GC_SEVERITY_D_THRESHOLDS_KEY: String = "gc_severity_D_threshold" - class Evaluator(gcCpuTimeHeuristic: GcCpuTimeHeuristic, data: SparkApplicationData) { - lazy val executorSummaries: Seq[ExecutorSummary] = data.executorSummaries + class Evaluator(executorGcHeuristic: ExecutorGcHeuristic, data: SparkApplicationData) { + lazy val executorAndDriverSummaries: Seq[ExecutorSummary] = data.executorSummaries + lazy val executorSummaries: Seq[ExecutorSummary] = executorAndDriverSummaries.filterNot(_.id.equals("driver")) lazy val appConfigurationProperties: Map[String, String] = data.appConfigurationProperties var (jvmTime, executorRunTimeTotal) = getTimeValues(executorSummaries) var ratio: Double = jvmTime.toDouble / executorRunTimeTotal.toDouble - lazy val severityTimeA: Severity = gcCpuTimeHeuristic.gcSeverityAThresholds.severityOf(ratio) - lazy val severityTimeD: Severity = gcCpuTimeHeuristic.gcSeverityAThresholds.severityOf(ratio) - lazy val severity : Severity = Severity.max(severityTimeA, severityTimeD) + lazy val severityTimeA: Severity = executorGcHeuristic.gcSeverityAThresholds.severityOf(ratio) + lazy val severityTimeD: Severity = executorGcHeuristic.gcSeverityDThresholds.severityOf(ratio) /** * returns the total JVM GC Time and total executor Run Time across all stages diff --git a/app/views/help/spark/helpExecutorGcHeuristic.scala.html b/app/views/help/spark/helpExecutorGcHeuristic.scala.html new file mode 100644 index 000000000..02ca91ac7 --- /dev/null +++ b/app/views/help/spark/helpExecutorGcHeuristic.scala.html @@ -0,0 +1,20 @@ +@* +* Copyright 2016 LinkedIn Corp. +* +* Licensed under the Apache License, Version 2.0 (the "License"); you may not +* use this file except in compliance with the License. You may obtain a copy of +* the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +* License for the specific language governing permissions and limitations under +* the License. +*@ + +

<p>This analysis shows how much time a job is spending in GC. To normalise the results across all jobs, the ratio of the time a job spends in GC to the total run time of the job is calculated.</p>
+<p>A job is flagged if the ratio is too high, meaning the job spends too much time in GC.</p>
+<p>Suggestions</p>
+<p>We recommend increasing the executor memory.</p>
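This patch also drops the driver from the calculation before computing the ratio; below is a small sketch of that step over illustrative (id, gcTime, runTime) tuples — the tuple shape is a stand-in for the patch's ExecutorSummary, not its actual type:

// Sketch: exclude the driver row before computing the GC ratio (values are illustrative).
object DriverFilterExample extends App {
  val summaries = Seq(("driver", 5000L, 600000L), ("1", 120000L, 900000L), ("2", 360000L, 840000L))
  val executors = summaries.filterNot(_._1 == "driver") // mirrors filterNot(_.id.equals("driver"))
  val ratio = executors.map(_._2).sum.toDouble / executors.map(_._3).sum
  println(s"GC ratio excluding driver = $ratio") // 480000 / 1740000 ≈ 0.2759
}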

\ No newline at end of file diff --git a/app/views/help/spark/helpGcCpuTimeHeuristic.scala.html b/app/views/help/spark/helpGcCpuTimeHeuristic.scala.html deleted file mode 100644 index 646b186d6..000000000 --- a/app/views/help/spark/helpGcCpuTimeHeuristic.scala.html +++ /dev/null @@ -1,12 +0,0 @@ -

<p>The ratio of jvmGcTime to executorRunTime is checked to see if GC is taking too much time (providing more memory could help) or too little time (memory may be over-provisioned and can be reduced).</p>
-<p>The severity thresholds are as follows:</p>
-<p>Low: avg (jvmGcTime / executorRunTime) >= .08</p>
-<p>Moderate: avg (jvmGcTime / executorRunTime) >= .1</p>
-<p>Severe: avg (jvmGcTime / executorRunTime) >= .15</p>
-<p>Critical: avg (jvmGcTime / executorRunTime) >= .2</p>
-<p>The severity thresholds when GC is taking too little time are as follows:</p>
-<p>Low: avg (jvmGcTime / executorRunTime) < .05</p>
-<p>Moderate: avg (jvmGcTime / executorRunTime) < .04</p>
-<p>Severe: avg (jvmGcTime / executorRunTime) < .03</p>
-<p>Critical: avg (jvmGcTime / executorRunTime) < .01</p>

- diff --git a/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristicTest.scala similarity index 90% rename from test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala rename to test/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristicTest.scala index 31a2d1635..869b9cb67 100644 --- a/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala +++ b/test/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristicTest.scala @@ -27,10 +27,10 @@ import org.scalatest.{FunSpec, Matchers} import scala.concurrent.duration.Duration -class GcCpuTimeHeuristicTest extends FunSpec with Matchers { - import GcCpuTimeHeuristicTest._ +class ExecutorGcHeuristicTest extends FunSpec with Matchers { + import ExecutorGcHeuristicTest._ - describe("GcCpuTimeHeuristic") { + describe("ExecutorGcHeuristic") { val heuristicConfigurationData = newFakeHeuristicConfigurationData( Map( "max_to_median_ratio_severity_thresholds" -> "1.414,2,4,16", @@ -38,7 +38,7 @@ class GcCpuTimeHeuristicTest extends FunSpec with Matchers { "ignore_max_millis_less_than_threshold" -> "4000001" ) ) - val gcCpuTimeHeuristic = new GcCpuTimeHeuristic(heuristicConfigurationData) + val executorGcHeuristic = new ExecutorGcHeuristic(heuristicConfigurationData) val executorSummaries = Seq( newFakeExecutorSummary( @@ -65,7 +65,7 @@ class GcCpuTimeHeuristicTest extends FunSpec with Matchers { describe(".apply") { val data1 = newFakeSparkApplicationData(executorSummaries) - val heuristicResult = gcCpuTimeHeuristic.apply(data1) + val heuristicResult = executorGcHeuristic.apply(data1) val heuristicResultDetails = heuristicResult.getHeuristicResultDetails it("returns the severity") { @@ -80,20 +80,20 @@ class GcCpuTimeHeuristicTest extends FunSpec with Matchers { it("returns the total GC time") { val details = heuristicResultDetails.get(1) - details.getName should include("GC total time") + details.getName should include("Total GC time") details.getValue should be("1200000") } it("returns the executor's run time") { val details = heuristicResultDetails.get(2) - details.getName should include("Executor Run time") + details.getName should include("Total Executor Runtime") details.getValue should be("4740000") } } } } -object GcCpuTimeHeuristicTest { +object ExecutorGcHeuristicTest { import JavaConverters._ def newFakeHeuristicConfigurationData(params: Map[String, String] = Map.empty): HeuristicConfigurationData =