From 55a3f2b5725727027a1978b07f605594a3ee23be Mon Sep 17 00:00:00 2001 From: skakker Date: Wed, 10 Jan 2018 11:31:11 +0530 Subject: [PATCH] Spark Executor Spill Heuristic - (Depends on Custom SHS - Requires totalMemoryBytesSpilled metric) (#310) --- app-conf/HeuristicConf.xml | 8 +- .../fetchers/statusapiv1/statusapiv1.scala | 6 +- .../ExecutorStorageSpillHeuristic.scala | 133 +++++++++++++++++ .../legacydata/LegacyDataConverters.scala | 1 + .../spark/legacydata/SparkExecutorData.java | 3 +- .../drelephant/util/MemoryFormatUtils.java | 3 + ...lpExecutorStorageSpillHeuristic.scala.html | 23 +++ .../spark/SparkMetricsAggregatorTest.scala | 1 + .../heuristics/ExecutorGcHeuristicTest.scala | 3 +- .../ExecutorStorageSpillHeuristicTest.scala | 136 ++++++++++++++++++ .../heuristics/ExecutorsHeuristicTest.scala | 1 + 11 files changed, 313 insertions(+), 5 deletions(-) create mode 100644 app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala create mode 100644 app/views/help/spark/helpExecutorStorageSpillHeuristic.scala.html create mode 100644 test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala diff --git a/app-conf/HeuristicConf.xml b/app-conf/HeuristicConf.xml index d4fc7b9c1..3a8d833d4 100644 --- a/app-conf/HeuristicConf.xml +++ b/app-conf/HeuristicConf.xml @@ -316,5 +316,11 @@ com.linkedin.drelephant.spark.heuristics.ExecutorGcHeuristic views.html.help.spark.helpExecutorGcHeuristic - + + spark + Executor spill + com.linkedin.drelephant.spark.heuristics.ExecutorStorageSpillHeuristic + views.html.help.spark.helpExecutorStorageSpillHeuristic + + diff --git a/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala b/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala index 41d88e1b8..ea3d29dbc 100644 --- a/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala +++ b/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala @@ -87,6 +87,7 @@ 
trait ExecutorSummary{ def totalShuffleWrite: Long def maxMemory: Long def totalGCTime: Long + def totalMemoryBytesSpilled: Long def executorLogs: Map[String, String]} trait JobData{ @@ -160,7 +161,7 @@ trait StageData{ def schedulingPool: String def accumulatorUpdates: Seq[AccumulableInfo] - def tasks: Option[Map[Long, TaskData]] + def tasks: Option[Map[Long, TaskDataImpl]] def executorSummary: Option[Map[String, ExecutorStageSummary]]} trait TaskData{ @@ -293,6 +294,7 @@ class ExecutorSummaryImpl( var totalShuffleWrite: Long, var maxMemory: Long, var totalGCTime: Long, + var totalMemoryBytesSpilled: Long, var executorLogs: Map[String, String]) extends ExecutorSummary class JobDataImpl( @@ -366,7 +368,7 @@ class StageDataImpl( var schedulingPool: String, var accumulatorUpdates: Seq[AccumulableInfoImpl], - var tasks: Option[Map[Long, TaskData]], + var tasks: Option[Map[Long, TaskDataImpl]], var executorSummary: Option[Map[String, ExecutorStageSummaryImpl]]) extends StageData class TaskDataImpl( diff --git a/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala b/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala new file mode 100644 index 000000000..a625b441e --- /dev/null +++ b/app/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristic.scala @@ -0,0 +1,133 @@ +/* + * Copyright 2016 LinkedIn Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package com.linkedin.drelephant.spark.heuristics + +import com.linkedin.drelephant.analysis.Severity +import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ExecutorStageSummary, ExecutorSummary, StageData} +import com.linkedin.drelephant.analysis._ +import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData +import com.linkedin.drelephant.spark.data.SparkApplicationData +import com.linkedin.drelephant.util.MemoryFormatUtils + +import scala.collection.JavaConverters + + +/** + * A heuristic based on memory spilled. + * + */ +class ExecutorStorageSpillHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData) + extends Heuristic[SparkApplicationData] { + + import ExecutorStorageSpillHeuristic._ + import JavaConverters._ + + val spillFractionOfExecutorsThreshold: Double = + if(heuristicConfigurationData.getParamMap.get(SPILL_FRACTION_OF_EXECUTORS_THRESHOLD_KEY) == null) DEFAULT_SPILL_FRACTION_OF_EXECUTORS_THRESHOLD + else heuristicConfigurationData.getParamMap.get(SPILL_FRACTION_OF_EXECUTORS_THRESHOLD_KEY).toDouble + + val spillMaxMemoryThreshold: Double = + if(heuristicConfigurationData.getParamMap.get(SPILL_MAX_MEMORY_THRESHOLD_KEY) == null) DEFAULT_SPILL_MAX_MEMORY_THRESHOLD + else heuristicConfigurationData.getParamMap.get(SPILL_MAX_MEMORY_THRESHOLD_KEY).toDouble + + val sparkExecutorCoresThreshold : Int = + if(heuristicConfigurationData.getParamMap.get(SPARK_EXECUTOR_CORES_THRESHOLD_KEY) == null) DEFAULT_SPARK_EXECUTOR_CORES_THRESHOLD + else heuristicConfigurationData.getParamMap.get(SPARK_EXECUTOR_CORES_THRESHOLD_KEY).toInt + + val sparkExecutorMemoryThreshold : String = + if(heuristicConfigurationData.getParamMap.get(SPARK_EXECUTOR_MEMORY_THRESHOLD_KEY) == null) DEFAULT_SPARK_EXECUTOR_MEMORY_THRESHOLD + else heuristicConfigurationData.getParamMap.get(SPARK_EXECUTOR_MEMORY_THRESHOLD_KEY) + + override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData + + override def 
apply(data: SparkApplicationData): HeuristicResult = { + val evaluator = new Evaluator(this, data) + var resultDetails = Seq( + new HeuristicResultDetails("Total memory spilled", MemoryFormatUtils.bytesToString(evaluator.totalMemorySpilled)), + new HeuristicResultDetails("Max memory spilled", MemoryFormatUtils.bytesToString(evaluator.maxMemorySpilled)), + new HeuristicResultDetails("Mean memory spilled", MemoryFormatUtils.bytesToString(evaluator.meanMemorySpilled)), + new HeuristicResultDetails("Fraction of executors having non zero bytes spilled", evaluator.fractionOfExecutorsHavingBytesSpilled.toString) + ) + + if(evaluator.severity != Severity.NONE){ + resultDetails :+ new HeuristicResultDetails("Note", "Your execution memory is being spilled. Kindly look into it.") + if(evaluator.sparkExecutorCores >= sparkExecutorCoresThreshold && evaluator.sparkExecutorMemory >= MemoryFormatUtils.stringToBytes(sparkExecutorMemoryThreshold)) { + resultDetails :+ new HeuristicResultDetails("Recommendation", "You can try decreasing the number of cores to reduce the number of concurrently running tasks.") + } else if (evaluator.sparkExecutorMemory <= MemoryFormatUtils.stringToBytes(sparkExecutorMemoryThreshold)) { + resultDetails :+ new HeuristicResultDetails("Recommendation", "You can try increasing the executor memory to reduce spill.") + } + } + + val result = new HeuristicResult( + heuristicConfigurationData.getClassName, + heuristicConfigurationData.getHeuristicName, + evaluator.severity, + 0, + resultDetails.asJava + ) + result + } +} + +object ExecutorStorageSpillHeuristic { + val SPARK_EXECUTOR_MEMORY = "spark.executor.memory" + val SPARK_EXECUTOR_CORES = "spark.executor.cores" + val SPILL_FRACTION_OF_EXECUTORS_THRESHOLD_KEY = "spill_fraction_of_executors_threshold" + val SPILL_MAX_MEMORY_THRESHOLD_KEY = "spill_max_memory_threshold" + val SPARK_EXECUTOR_CORES_THRESHOLD_KEY = "spark_executor_cores_threshold" + val SPARK_EXECUTOR_MEMORY_THRESHOLD_KEY = 
"spark_executor_memory_threshold" + val DEFAULT_SPILL_FRACTION_OF_EXECUTORS_THRESHOLD : Double = 0.2 + val DEFAULT_SPILL_MAX_MEMORY_THRESHOLD : Double = 0.05 + val DEFAULT_SPARK_EXECUTOR_CORES_THRESHOLD : Int = 4 + val DEFAULT_SPARK_EXECUTOR_MEMORY_THRESHOLD : String ="10GB" + + class Evaluator(executorStorageSpillHeuristic: ExecutorStorageSpillHeuristic, data: SparkApplicationData) { + lazy val executorSummaries: Seq[ExecutorSummary] = data.executorSummaries + lazy val appConfigurationProperties: Map[String, String] = + data.appConfigurationProperties + val maxMemorySpilled: Long = executorSummaries.map(_.totalMemoryBytesSpilled).max + val meanMemorySpilled = executorSummaries.map(_.totalMemoryBytesSpilled).sum / executorSummaries.size + val totalMemorySpilled = executorSummaries.map(_.totalMemoryBytesSpilled).sum + val fractionOfExecutorsHavingBytesSpilled: Double = executorSummaries.count(_.totalMemoryBytesSpilled > 0).toDouble / executorSummaries.size.toDouble + val severity: Severity = { + if (fractionOfExecutorsHavingBytesSpilled != 0) { + if (fractionOfExecutorsHavingBytesSpilled < executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold + && maxMemorySpilled < executorStorageSpillHeuristic.spillMaxMemoryThreshold * sparkExecutorMemory) { + Severity.LOW + } + else if (fractionOfExecutorsHavingBytesSpilled < executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold + && meanMemorySpilled < executorStorageSpillHeuristic.spillMaxMemoryThreshold * sparkExecutorMemory) { + Severity.MODERATE + } + + else if (fractionOfExecutorsHavingBytesSpilled >= executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold + && meanMemorySpilled < executorStorageSpillHeuristic.spillMaxMemoryThreshold * sparkExecutorMemory) { + Severity.SEVERE + } + else if (fractionOfExecutorsHavingBytesSpilled >= executorStorageSpillHeuristic.spillFractionOfExecutorsThreshold + && meanMemorySpilled >= executorStorageSpillHeuristic.spillMaxMemoryThreshold * sparkExecutorMemory) { 
+ Severity.CRITICAL + } else Severity.NONE + } + else Severity.NONE + } + + lazy val sparkExecutorMemory: Long = (appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY).map(MemoryFormatUtils.stringToBytes)).getOrElse(0) + lazy val sparkExecutorCores: Int = (appConfigurationProperties.get(SPARK_EXECUTOR_CORES).map(_.toInt)).getOrElse(0) + } +} + diff --git a/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala b/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala index 62a58695b..f9cc59394 100644 --- a/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala +++ b/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala @@ -174,6 +174,7 @@ object LegacyDataConverters { executorInfo.shuffleWrite, executorInfo.maxMem, executorInfo.totalGCTime, + executorInfo.totalMemoryBytesSpilled, executorLogs = Map.empty ) } diff --git a/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java b/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java index 4e2ad4de3..e107a5c4d 100644 --- a/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java +++ b/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java @@ -44,6 +44,7 @@ public static class ExecutorInfo { public long outputBytes = 0L; public long shuffleRead = 0L; public long totalGCTime = 0L; + public long totalMemoryBytesSpilled = 0L; public long shuffleWrite = 0L; public String toString() { @@ -51,7 +52,7 @@ public String toString() { + ", maxMem: " + maxMem + ", diskUsed: " + diskUsed + ", totalTasks" + totalTasks + ", tasksActive: " + activeTasks + ", tasksComplete: " + completedTasks + ", tasksFailed: " + failedTasks + ", duration: " + duration + ", inputBytes: " + inputBytes + ", outputBytes:" + outputBytes + ", shuffleRead: " + shuffleRead - + ", shuffleWrite: " + shuffleWrite + ", totalGCTime: " + totalGCTime + "}"; + + ", shuffleWrite: " + shuffleWrite + ", totalGCTime: " + totalGCTime + ", 
totalMemoryBytesSpilled: " + totalMemoryBytesSpilled + "}"; } } diff --git a/app/com/linkedin/drelephant/util/MemoryFormatUtils.java b/app/com/linkedin/drelephant/util/MemoryFormatUtils.java index b32f61fb9..8ed49fbc0 100644 --- a/app/com/linkedin/drelephant/util/MemoryFormatUtils.java +++ b/app/com/linkedin/drelephant/util/MemoryFormatUtils.java @@ -94,6 +94,9 @@ public static long stringToBytes(String formattedString) { return 0L; } + //handling if the string has , for eg. 1,000MB + formattedString = formattedString.replace(",", ""); + Matcher matcher = REGEX_MATCHER.matcher(formattedString); if (!matcher.matches()) { throw new IllegalArgumentException( diff --git a/app/views/help/spark/helpExecutorStorageSpillHeuristic.scala.html b/app/views/help/spark/helpExecutorStorageSpillHeuristic.scala.html new file mode 100644 index 000000000..a23efb735 --- /dev/null +++ b/app/views/help/spark/helpExecutorStorageSpillHeuristic.scala.html @@ -0,0 +1,23 @@ +@* +* Copyright 2016 LinkedIn Corp. +* +* Licensed under the Apache License, Version 2.0 (the "License"); you may not +* use this file except in compliance with the License. You may obtain a copy of +* the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +* License for the specific language governing permissions and limitations under +* the License. +*@ +

Spark performs best when data is kept in memory. Spilled execution memory is tracked by memoryBytesSpilled, which is available at the executor level. If execution memory is being spilled, then the warnings are as follows:

+

Low: memoryBytesSpilled is non-zero for 1 or more executors, greater than zero for < 20% of executors, and max size is < .05 * spark.executor.memory.

+

Moderate: memoryBytesSpilled is non-zero for 1 or more executors, greater than zero for < 20% of executors, and avg size is < .05 * spark.executor.memory.

+

Severe: memoryBytesSpilled is greater than zero for > 20% of executors and avg size is < .05 * spark.executor.memory.

+

Critical: memoryBytesSpilled is greater than zero for > 20% of executors and/or avg size is >= .05 * spark.executor.memory.

+

Suggestions

+

If number of cores (spark.executor.cores) is more than 4 and executor memory is > 10GB: Try decreasing the number of cores which would decrease the number of tasks running in parallel, hence decreasing the number of bytes spilled.

+

You can also try increasing the spark.executor.memory which will reduce memory spilled.

diff --git a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala index 77e3e1d29..66605dd3e 100644 --- a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala +++ b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala @@ -195,6 +195,7 @@ object SparkMetricsAggregatorTest { totalShuffleWrite = 0, maxMemory = 0, totalGCTime = 0, + totalMemoryBytesSpilled = 0, executorLogs = Map.empty ) } diff --git a/test/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristicTest.scala index 869b9cb67..d9a8fc69f 100644 --- a/test/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristicTest.scala +++ b/test/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristicTest.scala @@ -117,8 +117,9 @@ object ExecutorGcHeuristicTest { totalInputBytes=0, totalShuffleRead=0, totalShuffleWrite= 0, - maxMemory= 0, + maxMemory = 0, totalGCTime, + totalMemoryBytesSpilled = 0, executorLogs = Map.empty ) diff --git a/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala new file mode 100644 index 000000000..2a061e8d3 --- /dev/null +++ b/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala @@ -0,0 +1,136 @@ +/* + * Copyright 2016 LinkedIn Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.linkedin.drelephant.spark.heuristics + +import scala.collection.JavaConverters +import com.linkedin.drelephant.analysis.{ApplicationType, Severity, SeverityThresholds} +import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData +import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData} +import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, ExecutorSummaryImpl, StageDataImpl} +import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate +import org.scalatest.{FunSpec, Matchers} + + +class ExecutorStorageSpillHeuristicTest extends FunSpec with Matchers { + import ExecutorStorageSpillHeuristicTest._ + + describe("ExecutorStorageSpillHeuristic") { + val heuristicConfigurationData = newFakeHeuristicConfigurationData( + Map.empty + ) + val executorStorageSpillHeuristic = new ExecutorStorageSpillHeuristic(heuristicConfigurationData) + + val appConfigurationProperties = Map("spark.executor.memory" -> "4g", "spark.executor.cores"->"4", "spark.executor.instances"->"4") + + val executorSummaries = Seq( + newFakeExecutorSummary( + id = "1", + totalMemoryBytesSpilled = 200000L + ), + newFakeExecutorSummary( + id = "2", + totalMemoryBytesSpilled = 100000L + ), + newFakeExecutorSummary( + id = "3", + totalMemoryBytesSpilled = 300000L + ), + newFakeExecutorSummary( + id = "4", + totalMemoryBytesSpilled = 200000L + ) + ) + + describe(".apply") { + val data1 = newFakeSparkApplicationData(executorSummaries, appConfigurationProperties) + val heuristicResult = executorStorageSpillHeuristic.apply(data1) + val heuristicResultDetails = heuristicResult.getHeuristicResultDetails + + it("returns the severity") { + heuristicResult.getSeverity should be(Severity.SEVERE) + } + + it("returns the total memory spilled") { + val details = heuristicResultDetails.get(0) + 
details.getName should include("Total memory spilled") + details.getValue should be("781.25 KB") + } + + it("returns the max memory spilled") { + val details = heuristicResultDetails.get(1) + details.getName should include("Max memory spilled") + details.getValue should be("292.97 KB") + } + + it("returns the mean memory spilled") { + val details = heuristicResultDetails.get(2) + details.getName should include("Mean memory spilled") + details.getValue should be("195.31 KB") + } + } + } +} + +object ExecutorStorageSpillHeuristicTest { + import JavaConverters._ + + def newFakeHeuristicConfigurationData(params: Map[String, String] = Map.empty): HeuristicConfigurationData = + new HeuristicConfigurationData("heuristic", "class", "view", new ApplicationType("type"), params.asJava) + + def newFakeExecutorSummary( + id: String, + totalMemoryBytesSpilled: Long + ): ExecutorSummaryImpl = new ExecutorSummaryImpl( + id, + hostPort = "", + rddBlocks = 0, + memoryUsed=0, + diskUsed = 0, + activeTasks = 0, + failedTasks = 0, + completedTasks = 0, + totalTasks = 0, + totalDuration=0, + totalInputBytes=0, + totalShuffleRead=0, + totalShuffleWrite= 0, + maxMemory= 0, + totalGCTime = 0, + totalMemoryBytesSpilled, + executorLogs = Map.empty + ) + + def newFakeSparkApplicationData( + executorSummaries: Seq[ExecutorSummaryImpl], + appConfigurationProperties: Map[String, String] + ): SparkApplicationData = { + val appId = "application_1" + + val restDerivedData = SparkRestDerivedData( + new ApplicationInfoImpl(appId, name = "app", Seq.empty), + jobDatas = Seq.empty, + stageDatas = Seq.empty, + executorSummaries = executorSummaries + ) + + val logDerivedData = SparkLogDerivedData( + SparkListenerEnvironmentUpdate(Map("Spark Properties" -> appConfigurationProperties.toSeq)) + ) + + SparkApplicationData(appId, restDerivedData, Some(logDerivedData)) + } +} diff --git a/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala 
b/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala index 7dbeea921..b4095ccf9 100644 --- a/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala +++ b/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala @@ -250,6 +250,7 @@ object ExecutorsHeuristicTest { totalShuffleWrite, maxMemory, totalGCTime = 0, + totalMemoryBytesSpilled = 0, executorLogs = Map.empty )