Rewrite Spark fetcher/heuristics. #162

Merged: 13 commits, Dec 13, 2016
6 changes: 2 additions & 4 deletions app-conf/AggregatorConf.xml
@@ -35,11 +35,9 @@
</aggregator>
<aggregator>
<applicationtype>spark</applicationtype>
<classname>org.apache.spark.SparkMetricsAggregator</classname>
<!--
<classname>com.linkedin.drelephant.spark.SparkMetricsAggregator</classname>
<params>
<storage_mem_wastage_buffer>0.5</storage_mem_wastage_buffer>
<allocated_memory_waste_buffer_percentage>0.5</allocated_memory_waste_buffer_percentage>
</params>
-->
</aggregator>
</aggregators>
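A note on the parameter shown above: allocated_memory_waste_buffer_percentage is the fraction of allocated memory treated as expected overhead before any waste is counted; SparkMetricsAggregator (added later in this diff) defaults it to 0.5 when the parameter is absent.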
13 changes: 2 additions & 11 deletions app-conf/FetcherConf.xml
@@ -56,18 +56,9 @@
</params>
</fetcher>
-->

<fetcher>
<applicationtype>spark</applicationtype>
<classname>org.apache.spark.deploy.history.SparkFSFetcher</classname>
<!--
<params>
<event_log_size_limit_in_mb>100</event_log_size_limit_in_mb>
<event_log_dir>/system/spark-history</event_log_dir>
<spark_log_ext>_1.snappy</spark_log_ext>

#the values specified in namenode_addresses will be used for obtaining spark logs. The cluster configuration will be ignored.
<namenode_addresses>address1,address2</namenode_addresses>
</params>
-->
<classname>com.linkedin.drelephant.spark.fetchers.SparkFetcher</classname>
</fetcher>
</fetchers>
62 changes: 12 additions & 50 deletions app-conf/HeuristicConf.xml
@@ -158,65 +158,27 @@

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Spark Configuration Best Practice</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.BestPropertiesConventionHeuristic</classname>
<viewname>views.html.help.spark.helpBestProperties</viewname>
<!--<params>
<driver_memory_severity_in_gb>4, 4, 8, 8</driver_memory_severity_in_gb>
<num_core_severity>2</num_core_severity>
</params>-->
<heuristicname>Spark Configuration</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.ConfigurationHeuristic</classname>
<viewname>views.html.help.spark.helpConfigurationHeuristic</viewname>
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Spark Memory Limit</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.MemoryLimitHeuristic</classname>
<viewname>views.html.help.spark.helpMemoryLimit</viewname>
<!--<params>
<total_mem_severity_in_tb>0.5, 1, 1.5, 2</total_mem_severity_in_tb>
<mem_util_severity>0.8, 0.6, 0.4, 0.2</mem_util_severity>
</params>-->
<heuristicname>Spark Executor Metrics</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.ExecutorsHeuristic</classname>
<viewname>views.html.help.spark.helpExecutorsHeuristic</viewname>
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Spark Stage Runtime</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.StageRuntimeHeuristic</classname>
<viewname>views.html.help.spark.helpStageRuntime</viewname>
<!--<params>
<stage_runtime_severity_in_min>15, 30, 60, 60</stage_runtime_severity_in_min>
<stage_failure_rate_severity>0.3, 0.3, 0.5, 0.5</stage_failure_rate_severity>
<single_stage_tasks_failure_rate_severity>0.0, 0.3, 0.5, 0.5</single_stage_tasks_failure_rate_severity>
</params>-->
<heuristicname>Spark Job Metrics</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.JobsHeuristic</classname>
<viewname>views.html.help.spark.helpJobsHeuristic</viewname>
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Spark Job Runtime</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.JobRuntimeHeuristic</classname>
<viewname>views.html.help.spark.helpJobRuntime</viewname>
<!--<params>
<avg_job_failure_rate_severity>0.1, 0.3, 0.5, 0.5</avg_job_failure_rate_severity>
<single_job_failure_rate_severity>0.0, 0.3, 0.5, 0.5</single_job_failure_rate_severity>
</params>-->
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Spark Executor Load Balance</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.ExecutorLoadHeuristic</classname>
<viewname>views.html.help.spark.helpExecutorLoad</viewname>
<!--<params>
<looser_metric_deviation_severity>0.8, 1, 1.2, 1.4</looser_metric_deviation_severity>
<metric_deviation_severity>0.4, 0.6, 0.8, 1.0</metric_deviation_severity>
</params>-->
</heuristic>

<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Spark Event Log Limit</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.EventLogLimitHeuristic</classname>
<viewname>views.html.help.spark.helpEventLogLimit</viewname>
<heuristicname>Spark Stage Metrics</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.StagesHeuristic</classname>
<viewname>views.html.help.spark.helpStagesHeuristic</viewname>
</heuristic>

</heuristics>
1 change: 0 additions & 1 deletion app/com/linkedin/drelephant/ElephantContext.java
@@ -47,7 +47,6 @@
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.apache.spark.SparkMetricsAggregator;
import org.w3c.dom.Document;
import play.api.Play;
import play.api.templates.Html;
16 changes: 8 additions & 8 deletions app/com/linkedin/drelephant/analysis/HadoopAggregatedData.java
@@ -26,32 +26,32 @@ public class HadoopAggregatedData {
private long totalDelay = 0;

/**
* Returns the resource usage of the job
* @return The resource usage of the job
* Returns the resource usage (in MBSeconds) of the job
* @return The resource usage (in MBSeconds) of the job
*/
public long getResourceUsed() {
return resourceUsed;
}

/**
* Setter for the resource usage of the job
* @param resourceUsed The resource usage of the job
* Setter for the resource usage (in MBSeconds) of the job
* @param resourceUsed The resource usage (in MBSeconds) of the job
*/
public void setResourceUsed(long resourceUsed) {
this.resourceUsed = resourceUsed;
}

/**
* Returns the wasted resources of the job
* @return The wasted resources of the job
* Returns the wasted resources (in MBSeconds) of the job
* @return The wasted resources (in MBSeconds) of the job
*/
public long getResourceWasted() {
return resourceWasted;
}

/**
* Setter for the wasted resources
* @param resourceWasted The wasted resources of the job
* Setter for the wasted resources (in MBSeconds)
* @param resourceWasted The wasted resources (in MBSeconds) of the job
*/
public void setResourceWasted(long resourceWasted) {
this.resourceWasted = resourceWasted;
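For a concrete sense of the MBSeconds unit used in the javadoc above: a 2048 MB executor held for 60 seconds accounts for 2048 * 60 = 122,880 MBSeconds.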
app/com/linkedin/drelephant/analysis/HeuristicResultDetails.java
@@ -29,6 +29,10 @@ public class HeuristicResultDetails {
private String _value;
private String _details;

public HeuristicResultDetails(String name, String value) {
this(name, value, null);
}

public HeuristicResultDetails(String name, String value, String details) {
this._name = name;
this._value = value;
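The new two-argument constructor simply delegates with a null details field. A minimal usage sketch (the name/value strings here are hypothetical):

// Scala sketch; details defaults to null when only a name/value pair is needed.
val detail = new HeuristicResultDetails("spark.executor.memory", "4g")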
53 changes: 53 additions & 0 deletions app/com/linkedin/drelephant/analysis/SeverityThresholds.scala
@@ -0,0 +1,53 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.analysis

import com.linkedin.drelephant.util.Utils


/**
* A convenience case class for containing severity thresholds and calculating severity.
*/
case class SeverityThresholds(low: Number, moderate: Number, severe: Number, critical: Number, ascending: Boolean) {
if (ascending) {
require(low.doubleValue <= moderate.doubleValue)
Contributor:
Use < instead. I can't think of a scenario where it will be equal.

Contributor (Author):
The old code used equal numbers for severe and critical for some thresholds, e.g.
private double[] avgJobFailureLimits = {0.1d, 0.3d, 0.5d, 0.5d}; // The avg job failure rate
Also, public static Severity getSeverityAscending(Number value, Number low, Number moderate, Number severe, …), for example, will check for equal values.

require(moderate.doubleValue <= severe.doubleValue)
require(severe.doubleValue <= critical.doubleValue)
} else {
require(low.doubleValue >= moderate.doubleValue)
require(moderate.doubleValue >= severe.doubleValue)
require(severe.doubleValue >= critical.doubleValue)
}

def severityOf(value: Number): Severity = if (ascending) {
Severity.getSeverityAscending(value, low, moderate, severe, critical)
} else {
Severity.getSeverityDescending(value, low, moderate, severe, critical)
}
}

object SeverityThresholds {
val NUM_THRESHOLDS = 4

/** Returns a SeverityThresholds object from a Dr. Elephant configuration string parseable by Utils.getParam(String, int). */
def parse(
rawString: String,
ascending: Boolean
): Option[SeverityThresholds] = Option(Utils.getParam(rawString, NUM_THRESHOLDS)).map { thresholds =>
SeverityThresholds(low = thresholds(0), moderate = thresholds(1), severe = thresholds(2), critical = thresholds(3), ascending)
}
}
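A brief usage sketch of the new case class (threshold values hypothetical; note that equal severe and critical values, as discussed in the review thread above, are intentionally allowed):

import com.linkedin.drelephant.analysis.SeverityThresholds

// Ascending thresholds: larger values are worse. severe == critical mirrors
// old defaults such as {0.1, 0.3, 0.5, 0.5}.
val thresholds = SeverityThresholds(low = 0.1, moderate = 0.3, severe = 0.5, critical = 0.5, ascending = true)
val severity = thresholds.severityOf(0.4)  // likely MODERATE: 0.4 >= moderate but < severe

// Parsing from a configuration string; None when Utils.getParam yields null.
val parsed = SeverityThresholds.parse("0.1,0.3,0.5,0.5", ascending = true)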
5 changes: 4 additions & 1 deletion app/com/linkedin/drelephant/math/Statistics.java
@@ -98,7 +98,10 @@ public static long percentile(List<Long> values, int percentile) {
}

Collections.sort(values);
int position = (int) Math.ceil(values.size() * percentile / 100);

// Use Nearest Rank method.
// https://en.wikipedia.org/wiki/Percentile#The_Nearest_Rank_method
int position = (int) Math.ceil(values.size() * percentile / 100.0);

// should never happen.
if (position == 0) {
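The switch from 100 to 100.0 fixes an integer-division bug: for 5 values at the 30th percentile, the old code computed 5 * 30 / 100 == 1 (150 / 100 truncates before ceil runs), while nearest rank calls for ceil(1.5) == 2. A minimal sketch of the corrected position calculation:

// Nearest-rank position, 1-based; dividing by 100.0 keeps the fraction alive for ceil.
def nearestRankPosition(count: Int, percentile: Int): Int =
  math.ceil(count * percentile / 100.0).toInt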
120 changes: 120 additions & 0 deletions app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala
@@ -0,0 +1,120 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.spark

import com.linkedin.drelephant.analysis.{HadoopAggregatedData, HadoopApplicationData, HadoopMetricsAggregator}
import com.linkedin.drelephant.configurations.aggregator.AggregatorConfigurationData
import com.linkedin.drelephant.math.Statistics
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.util.MemoryFormatUtils
import org.apache.commons.io.FileUtils
import org.apache.log4j.Logger
import scala.util.Try


class SparkMetricsAggregator(private val aggregatorConfigurationData: AggregatorConfigurationData)
extends HadoopMetricsAggregator {
import SparkMetricsAggregator._

private val logger: Logger = Logger.getLogger(classOf[SparkMetricsAggregator])

private val allocatedMemoryWasteBufferPercentage: Double =
Option(aggregatorConfigurationData.getParamMap.get(ALLOCATED_MEMORY_WASTE_BUFFER_PERCENTAGE_KEY))
.flatMap { value => Try(value.toDouble).toOption }
.getOrElse(DEFAULT_ALLOCATED_MEMORY_WASTE_BUFFER_PERCENTAGE)

private val hadoopAggregatedData: HadoopAggregatedData = new HadoopAggregatedData()

override def getResult(): HadoopAggregatedData = hadoopAggregatedData

override def aggregate(data: HadoopApplicationData): Unit = data match {
case (data: SparkApplicationData) => aggregate(data)
case _ => throw new IllegalArgumentException("data should be SparkApplicationData")
}

private def aggregate(data: SparkApplicationData): Unit = for {
executorInstances <- executorInstancesOf(data)
executorMemoryBytes <- executorMemoryBytesOf(data)
} {
val applicationDurationMillis = applicationDurationMillisOf(data)
val totalExecutorTaskTimeMillis = totalExecutorTaskTimeMillisOf(data)

val resourcesAllocatedMBSeconds =
aggregateResourcesAllocatedMBSeconds(executorInstances, executorMemoryBytes, applicationDurationMillis)
val resourcesUsedMBSeconds = aggregateResourcesUsedMBSeconds(executorMemoryBytes, totalExecutorTaskTimeMillis)

val resourcesWastedMBSeconds =
((BigDecimal(resourcesAllocatedMBSeconds) * (1.0 - allocatedMemoryWasteBufferPercentage)) - BigDecimal(resourcesUsedMBSeconds))
.toBigInt

if (resourcesUsedMBSeconds.isValidLong) {
hadoopAggregatedData.setResourceUsed(resourcesUsedMBSeconds.toLong)
} else {
logger.info(s"resourcesUsedMBSeconds exceeds Long.MaxValue: ${resourcesUsedMBSeconds}")
}

if (resourcesWastedMBSeconds.isValidLong) {
hadoopAggregatedData.setResourceWasted(resourcesWastedMBSeconds.toLong)
} else {
logger.info(s"resourcesWastedMBSeconds exceeds Long.MaxValue: ${resourcesWastedMBSeconds}")
}
}

private def aggregateResourcesUsedMBSeconds(executorMemoryBytes: Long, totalExecutorTaskTimeMillis: BigInt): BigInt = {
val bytesMillis = BigInt(executorMemoryBytes) * totalExecutorTaskTimeMillis
(bytesMillis / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS)))
}

private def aggregateResourcesAllocatedMBSeconds(
executorInstances: Int,
executorMemoryBytes: Long,
applicationDurationMillis: Long
): BigInt = {
val bytesMillis = BigInt(executorInstances) * BigInt(executorMemoryBytes) * BigInt(applicationDurationMillis)
(bytesMillis / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS)))
}

private def executorInstancesOf(data: SparkApplicationData): Option[Int] = {
val appConfigurationProperties = data.appConfigurationProperties
appConfigurationProperties.get(SPARK_EXECUTOR_INSTANCES_KEY).map(_.toInt)
}

private def executorMemoryBytesOf(data: SparkApplicationData): Option[Long] = {
val appConfigurationProperties = data.appConfigurationProperties
appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY_KEY).map(MemoryFormatUtils.stringToBytes)
}

private def applicationDurationMillisOf(data: SparkApplicationData): Long = {
require(data.applicationInfo.attempts.nonEmpty)
val lastApplicationAttemptInfo = data.applicationInfo.attempts.last
lastApplicationAttemptInfo.endTime.getTime - lastApplicationAttemptInfo.startTime.getTime
}

private def totalExecutorTaskTimeMillisOf(data: SparkApplicationData): BigInt = {
data.executorSummaries.map { executorSummary => BigInt(executorSummary.totalDuration) }.sum
}
}

object SparkMetricsAggregator {
/** The percentage of allocated memory we expect to waste because of overhead. */
val DEFAULT_ALLOCATED_MEMORY_WASTE_BUFFER_PERCENTAGE = 0.5D

val ALLOCATED_MEMORY_WASTE_BUFFER_PERCENTAGE_KEY = "allocated_memory_waste_buffer_percentage"

val SPARK_EXECUTOR_INSTANCES_KEY = "spark.executor.instances"
val SPARK_EXECUTOR_MEMORY_KEY = "spark.executor.memory"
}
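To make the aggregation concrete, a worked sketch with hypothetical numbers, mirroring the arithmetic above:

// 10 executors of 4 GB (spark.executor.instances = 10, spark.executor.memory = 4g)
// running for 1 hour:
val executorInstances = 10
val executorMemoryBytes = 4L * 1024 * 1024 * 1024
val applicationDurationMillis = 60L * 60 * 1000

// allocated MBSeconds = instances * memoryMB * durationSeconds
//                     = 10 * 4096 * 3600 = 147,456,000
val allocatedMBSeconds =
  (BigInt(executorInstances) * BigInt(executorMemoryBytes) * BigInt(applicationDurationMillis)) /
    (BigInt(1024 * 1024) * BigInt(1000))

// With the default 0.5 waste buffer and, say, 50,000,000 MBSeconds actually used:
// wasted = 147,456,000 * (1.0 - 0.5) - 50,000,000 = 23,728,000
val wastedMBSeconds =
  (BigDecimal(allocatedMBSeconds) * (1.0 - 0.5) - BigDecimal(50000000L)).toBigInt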