Skip to content

Commit

Permalink
Changed the spill heuristic to fetch spilled-bytes information from executor summaries
Browse files Browse the repository at this point in the history
  • Loading branch information
swasti committed Dec 7, 2017
1 parent af29f4b commit 204d5a7
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ trait ExecutorSummary{
def totalShuffleRead: Long
def totalShuffleWrite: Long
def maxMemory: Long
def totalMemoryBytesSpilled: Long
def executorLogs: Map[String, String]}

trait JobData{
Expand Down Expand Up @@ -292,6 +293,7 @@ class ExecutorSummaryImpl(
var totalShuffleRead: Long,
var totalShuffleWrite: Long,
var maxMemory: Long,
var totalMemoryBytesSpilled: Long,
var executorLogs: Map[String, String]) extends ExecutorSummary

class JobDataImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.linkedin.drelephant.spark.heuristics

import com.linkedin.drelephant.analysis.Severity
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ExecutorStageSummary, StageData}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ExecutorStageSummary, ExecutorSummary, StageData}
import com.linkedin.drelephant.analysis._
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.spark.data.SparkApplicationData
Expand All @@ -41,7 +41,10 @@ class ExecutorStorageSpillHeuristic(private val heuristicConfigurationData: Heur
override def apply(data: SparkApplicationData): HeuristicResult = {
val evaluator = new Evaluator(this, data)
var resultDetails = Seq(
new HeuristicResultDetails("Total memory spilled", MemoryFormatUtils.bytesToString(evaluator.totalMemorySpilled))
new HeuristicResultDetails("Total memory spilled", MemoryFormatUtils.bytesToString(evaluator.totalMemorySpilled)),
new HeuristicResultDetails("Max memory spilled", MemoryFormatUtils.bytesToString(evaluator.maxMemorySpilled)),
new HeuristicResultDetails("Mean memory spilled", MemoryFormatUtils.bytesToString(evaluator.meanMemorySpilled)),
new HeuristicResultDetails("Fraction of executors having non zero bytes spilled", evaluator.fractionOfExecutorsHavingBytesSpilled.toString)
)

if(evaluator.severity != Severity.NONE){
Expand All @@ -67,45 +70,35 @@ object ExecutorStorageSpillHeuristic {
val SPARK_EXECUTOR_CORES = "spark.executor.cores"

class Evaluator(memoryFractionHeuristic: ExecutorStorageSpillHeuristic, data: SparkApplicationData) {
lazy val executorSummaries: Seq[ExecutorSummary] = data.executorSummaries
lazy val appConfigurationProperties: Map[String, String] =
data.appConfigurationProperties
lazy val stageDatas: Seq[StageData] = data.stageDatas
val ratioMemoryCores: Double = sparkExecutorMemory.toDouble / sparkExecutorCores.toDouble
lazy val (severity, totalMemorySpilled : Long) = getExecutionSpillSeverity()
def getExecutionSpillSeverity(): (Severity, Long) = {
var bytesSpilled : Long = 0
var executionSpillSeverity = Severity.NONE
stageDatas.foreach(stageData => {
val executorStageList: collection.Map[String, ExecutorStageSummary] = stageData.executorSummary.getOrElse(Map.empty)
val maxMemorySpilled: Long = executorStageList.values.map(_.memoryBytesSpilled).max
val meanMemorySpilled = executorStageList.values.map(_.memoryBytesSpilled).sum / executorStageList.values.size
val ratioMemoryBytesSpilled: Double = executorStageList.values.count(_.memoryBytesSpilled > 0).toDouble / executorStageList.values.size.toDouble
bytesSpilled += executorStageList.values.count(_.memoryBytesSpilled > 0).toLong
val severityExecutionSpillStage: Severity = {
if (ratioMemoryBytesSpilled != 0) {
if (ratioMemoryBytesSpilled < 0.2 && maxMemorySpilled < 0.05 * ratioMemoryCores) {
Severity.LOW
}
else if (ratioMemoryBytesSpilled < 0.2 && meanMemorySpilled < 0.05 * ratioMemoryCores) {
Severity.MODERATE
}
val ratioMemoryCores: Long = (sparkExecutorMemory / sparkExecutorCores)
val maxMemorySpilled: Long = executorSummaries.map(_.totalMemoryBytesSpilled).max
val meanMemorySpilled = executorSummaries.map(_.totalMemoryBytesSpilled).sum / executorSummaries.size
val totalMemorySpilled = executorSummaries.map(_.totalMemoryBytesSpilled).sum
val fractionOfExecutorsHavingBytesSpilled: Double = executorSummaries.count(_.totalMemoryBytesSpilled > 0).toDouble / executorSummaries.size.toDouble
val severity: Severity = {
if (fractionOfExecutorsHavingBytesSpilled != 0) {
if (fractionOfExecutorsHavingBytesSpilled < 0.2 && maxMemorySpilled < 0.05 * ratioMemoryCores) {
Severity.LOW
}
else if (fractionOfExecutorsHavingBytesSpilled < 0.2 && meanMemorySpilled < 0.05 * ratioMemoryCores) {
Severity.MODERATE
}

else if (ratioMemoryBytesSpilled >= 0.2 && meanMemorySpilled < 0.05 * ratioMemoryCores) {
Severity.SEVERE
}
else if (ratioMemoryBytesSpilled >= 0.2 && meanMemorySpilled >= 0.05 * ratioMemoryCores) {
Severity.CRITICAL
}
}
Severity.NONE
else if (fractionOfExecutorsHavingBytesSpilled >= 0.2 && meanMemorySpilled < 0.05 * ratioMemoryCores) {
Severity.SEVERE
}
executionSpillSeverity = Severity.max(executionSpillSeverity, severityExecutionSpillStage)
})
(executionSpillSeverity, bytesSpilled)
else if (fractionOfExecutorsHavingBytesSpilled >= 0.2 && meanMemorySpilled >= 0.05 * ratioMemoryCores) {
Severity.CRITICAL
} else Severity.NONE
}
else Severity.NONE
}

val sparkExecutorMemory: Long = (appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY).map(MemoryFormatUtils.stringToBytes)).getOrElse(0L)
val sparkExecutorCores: Int = (appConfigurationProperties.get(SPARK_EXECUTOR_CORES).map(_.toInt)).getOrElse(0)
lazy val sparkExecutorMemory: Long = (appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY).map(MemoryFormatUtils.stringToBytes)).getOrElse(0)
lazy val sparkExecutorCores: Int = (appConfigurationProperties.get(SPARK_EXECUTOR_CORES).map(_.toInt)).getOrElse(0)
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ object LegacyDataConverters {
executorInfo.shuffleRead,
executorInfo.shuffleWrite,
executorInfo.maxMem,
executorInfo.totalMemoryBytesSpilled,
executorLogs = Map.empty
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ public static class ExecutorInfo {
public long inputBytes = 0L;
public long outputBytes = 0L;
public long shuffleRead = 0L;
public long totalMemoryBytesSpilled = 0L;
public long shuffleWrite = 0L;

public String toString() {
return "{execId: " + execId + ", hostPort:" + hostPort + " , rddBlocks: " + rddBlocks + ", memUsed: " + memUsed
+ ", maxMem: " + maxMem + ", diskUsed: " + diskUsed + ", totalTasks" + totalTasks + ", tasksActive: "
+ activeTasks + ", tasksComplete: " + completedTasks + ", tasksFailed: " + failedTasks + ", duration: "
+ duration + ", inputBytes: " + inputBytes + ", outputBytes:" + outputBytes + ", shuffleRead: " + shuffleRead
+ ", shuffleWrite: " + shuffleWrite + "}";
+ ", shuffleWrite: " + shuffleWrite + ", totalMemoryBytesSpilled: " + totalMemoryBytesSpilled + "}";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ object SparkMetricsAggregatorTest {
totalShuffleRead = 0,
totalShuffleWrite = 0,
maxMemory = 0,
totalMemoryBytesSpilled = 0,
executorLogs = Map.empty
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.spark.heuristics

import scala.collection.JavaConverters
import com.linkedin.drelephant.analysis.{ApplicationType, Severity, SeverityThresholds}
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, ExecutorSummaryImpl, StageDataImpl}
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
import org.scalatest.{FunSpec, Matchers}


/**
 * Unit tests for the executor-storage-spill heuristic.
 *
 * Builds fake executor summaries with known spill byte counts and verifies
 * both the overall severity and each heuristic result detail the heuristic
 * emits (total, max, mean, and the fraction of spilling executors).
 */
class ExecutorStorageSpillHeuristicTest extends FunSpec with Matchers {
  import ExecutorStorageSpillHeuristicTest._

  describe("ExecutorStorageSpillHeuristic") {
    val heuristicConfigurationData = newFakeHeuristicConfigurationData(
      Map(
        "max_to_median_ratio_severity_thresholds" -> "1.414,2,4,16",
        "ignore_max_bytes_less_than_threshold" -> "4000000",
        "ignore_max_millis_less_than_threshold" -> "4000001"
      )
    )
    val executorStorageSpillHeuristic = new ExecutorStorageSpillHeuristic(heuristicConfigurationData)

    // 4g of executor memory over 4 cores gives 1 GB per core; the heuristic
    // compares spill sizes against 5% of that (~53.7 MB).
    val appConfigurationProperties = Map("spark.executor.memory" -> "4g", "spark.executor.cores" -> "4")

    // Spill totals: sum = 800,000 B (781.25 KB), max = 300,000 B (292.97 KB),
    // mean = 200,000 B (195.31 KB). All four executors spilled, so the
    // spill fraction is 1.0.
    val executorSummaries = Seq(
      newFakeExecutorSummary(
        id = "1",
        totalMemoryBytesSpilled = 200000L
      ),
      newFakeExecutorSummary(
        id = "2",
        totalMemoryBytesSpilled = 100000L
      ),
      newFakeExecutorSummary(
        id = "3",
        totalMemoryBytesSpilled = 300000L
      ),
      newFakeExecutorSummary(
        id = "4",
        totalMemoryBytesSpilled = 200000L
      )
    )

    describe(".apply") {
      val data1 = newFakeSparkApplicationData(executorSummaries, appConfigurationProperties)
      val heuristicResult = executorStorageSpillHeuristic.apply(data1)
      val heuristicResultDetails = heuristicResult.getHeuristicResultDetails

      it("returns the severity") {
        // Spill fraction (1.0) >= 0.2 while mean spill stays below 5% of
        // memory-per-core, which the heuristic maps to SEVERE.
        heuristicResult.getSeverity should be(Severity.SEVERE)
      }

      it("returns the total memory spilled") {
        val details = heuristicResultDetails.get(0)
        details.getName should include("Total memory spilled")
        details.getValue should be("781.25 KB")
      }

      it("returns the max memory spilled") {
        val details = heuristicResultDetails.get(1)
        details.getName should include("Max memory spilled")
        details.getValue should be("292.97 KB")
      }

      it("returns the mean memory spilled") {
        val details = heuristicResultDetails.get(2)
        details.getName should include("Mean memory spilled")
        details.getValue should be("195.31 KB")
      }

      // Previously untested: the fourth detail, reporting the fraction of
      // executors with non-zero bytes spilled. All 4 of 4 executors spilled.
      it("returns the fraction of executors having non zero bytes spilled") {
        val details = heuristicResultDetails.get(3)
        details.getName should include("Fraction of executors having non zero bytes spilled")
        details.getValue should be("1.0")
      }
    }
  }
}

/**
 * Fixture builders for [[ExecutorStorageSpillHeuristicTest]].
 */
object ExecutorStorageSpillHeuristicTest {
  import JavaConverters._

  /** Wraps the given parameter map in a minimal heuristic configuration. */
  def newFakeHeuristicConfigurationData(params: Map[String, String] = Map.empty): HeuristicConfigurationData =
    new HeuristicConfigurationData("heuristic", "class", "view", new ApplicationType("type"), params.asJava)

  /**
   * Builds an executor summary in which every metric other than the id and
   * the spilled-bytes counter is zeroed out.
   */
  def newFakeExecutorSummary(
    id: String,
    totalMemoryBytesSpilled: Long
  ): ExecutorSummaryImpl = new ExecutorSummaryImpl(
    id,
    hostPort = "",
    rddBlocks = 0,
    memoryUsed = 0,
    diskUsed = 0,
    activeTasks = 0,
    failedTasks = 0,
    completedTasks = 0,
    totalTasks = 0,
    totalDuration = 0,
    totalInputBytes = 0,
    totalShuffleRead = 0,
    totalShuffleWrite = 0,
    maxMemory = 0,
    totalMemoryBytesSpilled = totalMemoryBytesSpilled,
    executorLogs = Map.empty
  )

  /**
   * Assembles a SparkApplicationData carrying the supplied executor
   * summaries plus log-derived Spark properties; job and stage data are
   * deliberately left empty since the spill heuristic ignores them.
   */
  def newFakeSparkApplicationData(
    executorSummaries: Seq[ExecutorSummaryImpl],
    appConfigurationProperties: Map[String, String]
  ): SparkApplicationData = {
    val appId = "application_1"

    val applicationInfo = new ApplicationInfoImpl(appId, name = "app", Seq.empty)
    val restDerivedData = SparkRestDerivedData(
      applicationInfo,
      jobDatas = Seq.empty,
      stageDatas = Seq.empty,
      executorSummaries = executorSummaries
    )

    val environmentUpdate =
      SparkListenerEnvironmentUpdate(Map("Spark Properties" -> appConfigurationProperties.toSeq))
    val logDerivedData = SparkLogDerivedData(environmentUpdate)

    SparkApplicationData(appId, restDerivedData, Some(logDerivedData))
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ object ExecutorsHeuristicTest {
totalShuffleRead,
totalShuffleWrite,
maxMemory,
totalMemoryBytesSpilled = 0,
executorLogs = Map.empty
)

Expand Down

0 comments on commit 204d5a7

Please sign in to comment.