diff --git a/app-conf/FetcherConf.xml b/app-conf/FetcherConf.xml
index d06ce8bf7..6c91a4817 100644
--- a/app-conf/FetcherConf.xml
+++ b/app-conf/FetcherConf.xml
@@ -37,18 +37,18 @@
+ To work properly, this fetcher should use the same timezone as the job history server.
+ If not set, the local timezone will be used.
+ -->
+
     <applicationtype>spark</applicationtype>
-    <classname>com.linkedin.drelephant.spark.fetchers.SparkFetcher</classname>
+    <classname>com.linkedin.drelephant.spark.fetchers.FSFetcher</classname>
diff --git a/app/com/linkedin/drelephant/spark/fetchers/FSFetcher.scala b/app/com/linkedin/drelephant/spark/fetchers/FSFetcher.scala
new file mode 100644
index 000000000..e85196c2c
--- /dev/null
+++ b/app/com/linkedin/drelephant/spark/fetchers/FSFetcher.scala
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.drelephant.spark.fetchers
+
+import com.linkedin.drelephant.analysis.{AnalyticJob, ElephantFetcher}
+import com.linkedin.drelephant.configurations.fetcher.FetcherConfigurationData
+import com.linkedin.drelephant.spark.data.SparkApplicationData
+import com.linkedin.drelephant.spark.legacydata.LegacyDataConverters
+import org.apache.spark.deploy.history.SparkFSFetcher
+
+/**
+ * Wraps the legacy SparkFSFetcher, which contains the actual event log fetching logic, and adapts its result to
+ * the new SparkApplicationData interface.
+ *
+ * @param fetcherConfigurationData the fetcher configuration from FetcherConf.xml
+ */
+class FSFetcher(fetcherConfigurationData: FetcherConfigurationData)
+ extends ElephantFetcher[SparkApplicationData] {
+ lazy val legacyFetcher = new SparkFSFetcher(fetcherConfigurationData)
+
+ override def fetchData(analyticJob: AnalyticJob): SparkApplicationData = {
+ val legacyData = legacyFetcher.fetchData(analyticJob)
+ LegacyDataConverters.convert(legacyData)
+ }
+}
+
+object FSFetcher {
+}
diff --git a/app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala b/app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala
index 161b84e3f..698064ac6 100644
--- a/app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala
+++ b/app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala
@@ -19,6 +19,7 @@ package com.linkedin.drelephant.spark.fetchers
import scala.async.Async
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.{Duration, SECONDS}
+import scala.util.{Try, Success, Failure}
import scala.util.control.NonFatal
import com.linkedin.drelephant.analysis.{AnalyticJob, ElephantFetcher}
@@ -36,17 +37,21 @@ import org.apache.spark.SparkConf
class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
extends ElephantFetcher[SparkApplicationData] {
import SparkFetcher._
+ import Async.{async, await}
import ExecutionContext.Implicits.global
private val logger: Logger = Logger.getLogger(classOf[SparkFetcher])
+ val eventLogUri = Option(fetcherConfigurationData.getParamMap.get(LOG_LOCATION_URI_XML_FIELD))
+  logger.info(s"The event log location of the Spark application is set to ${eventLogUri}")
+
private[fetchers] lazy val hadoopConfiguration: Configuration = new Configuration()
private[fetchers] lazy val sparkUtils: SparkUtils = SparkUtils
private[fetchers] lazy val sparkConf: SparkConf = {
val sparkConf = new SparkConf()
- sparkUtils.getDefaultPropertiesFile(sparkUtils.defaultEnv) match {
+ sparkUtils.getDefaultPropertiesFile() match {
case Some(filename) => sparkConf.setAll(sparkUtils.getPropertiesFromFile(filename))
case None => throw new IllegalStateException("can't find Spark conf; please set SPARK_HOME or SPARK_CONF_DIR")
}
@@ -65,25 +70,51 @@ class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
private[fetchers] lazy val sparkRestClient: SparkRestClient = new SparkRestClient(sparkConf)
private[fetchers] lazy val sparkLogClient: SparkLogClient = {
- new SparkLogClient(hadoopConfiguration, sparkConf)
+ new SparkLogClient(hadoopConfiguration, sparkConf, eventLogUri)
}
override def fetchData(analyticJob: AnalyticJob): SparkApplicationData = {
+ doFetchData(analyticJob) match {
+ case Success(data) => data
+ case Failure(e) => throw e
+ }
+ }
+
+ private def doFetchData(analyticJob: AnalyticJob): Try[SparkApplicationData] = {
val appId = analyticJob.getAppId
logger.info(s"Fetching data for ${appId}")
- try {
- Await.result(doFetchData(sparkRestClient, sparkLogClient, appId, eventLogSource),
- DEFAULT_TIMEOUT)
- } catch {
- case NonFatal(e) =>
+ Try {
+ Await.result(doFetchDataUsingRestAndLogClients(analyticJob), DEFAULT_TIMEOUT)
+ }.transform(
+ data => {
+ logger.info(s"Succeeded fetching data for ${appId}")
+ Success(data)
+ },
+ e => {
logger.error(s"Failed fetching data for ${appId}", e)
- throw e
+ Failure(e)
+ }
+ )
+ }
+
+ private def doFetchDataUsingRestAndLogClients(analyticJob: AnalyticJob): Future[SparkApplicationData] = async {
+ val appId = analyticJob.getAppId
+ val restDerivedData = await(sparkRestClient.fetchData(appId, eventLogSource == EventLogSource.Rest))
+
+ val logDerivedData = eventLogSource match {
+ case EventLogSource.None => None
+ case EventLogSource.Rest => restDerivedData.logDerivedData
+ case EventLogSource.WebHdfs =>
+ val lastAttemptId = restDerivedData.applicationInfo.attempts.maxBy { _.startTime }.attemptId
+ Some(await(sparkLogClient.fetchData(appId, lastAttemptId)))
}
+
+ SparkApplicationData(appId, restDerivedData, logDerivedData)
}
+
}
object SparkFetcher {
- import Async.{async, await}
sealed trait EventLogSource
@@ -97,27 +128,6 @@ object SparkFetcher {
}
val SPARK_EVENT_LOG_ENABLED_KEY = "spark.eventLog.enabled"
- val DEFAULT_TIMEOUT = Duration(30, SECONDS)
-
- private def doFetchData(
- sparkRestClient: SparkRestClient,
- sparkLogClient: SparkLogClient,
- appId: String,
- eventLogSource: EventLogSource
- )(
- implicit ec: ExecutionContext
- ): Future[SparkApplicationData] = async {
- val restDerivedData = await(sparkRestClient.fetchData(
- appId, eventLogSource == EventLogSource.Rest))
-
- val logDerivedData = eventLogSource match {
- case EventLogSource.None => None
- case EventLogSource.Rest => restDerivedData.logDerivedData
- case EventLogSource.WebHdfs =>
- val lastAttemptId = restDerivedData.applicationInfo.attempts.maxBy { _.startTime }.attemptId
- Some(await(sparkLogClient.fetchData(appId, lastAttemptId)))
- }
-
- SparkApplicationData(appId, restDerivedData, logDerivedData)
- }
+ val DEFAULT_TIMEOUT = Duration(60, SECONDS)
+ val LOG_LOCATION_URI_XML_FIELD = "event_log_location_uri"
}
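
For reference, the new `event_log_location_uri` parameter read above is supplied through the fetcher's `<params>` block in FetcherConf.xml (the first hunk of this diff configures FSFetcher instead, but the same layout applies). A hypothetical entry, with the directory URI as an assumption:

```xml
<fetcher>
  <applicationtype>spark</applicationtype>
  <classname>com.linkedin.drelephant.spark.fetchers.SparkFetcher</classname>
  <params>
    <!-- Optional: where to look for Spark event logs, overriding spark.eventLog.dir. -->
    <event_log_location_uri>webhdfs://namenode.example.com:50070/system/spark-history</event_log_location_uri>
  </params>
</fetcher>
```
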
diff --git a/app/com/linkedin/drelephant/spark/fetchers/SparkLogClient.scala b/app/com/linkedin/drelephant/spark/fetchers/SparkLogClient.scala
index 2461c9cf1..fcd05bf04 100644
--- a/app/com/linkedin/drelephant/spark/fetchers/SparkLogClient.scala
+++ b/app/com/linkedin/drelephant/spark/fetchers/SparkLogClient.scala
@@ -16,72 +16,56 @@
package com.linkedin.drelephant.spark.fetchers
-import java.io.{BufferedInputStream, FileNotFoundException, InputStream}
-import java.net.URI
+import java.io.InputStream
+import java.security.PrivilegedAction
import scala.async.Async
-import scala.collection.mutable.HashMap
import scala.concurrent.{ExecutionContext, Future}
import scala.io.Source
+import com.linkedin.drelephant.security.HadoopSecurity
import com.linkedin.drelephant.spark.data.SparkLogDerivedData
+import com.linkedin.drelephant.util.SparkUtils
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
-import org.apache.spark.io.{CompressionCodec, LZ4CompressionCodec, LZFCompressionCodec, SnappyCompressionCodec}
import org.apache.spark.scheduler.{SparkListenerEnvironmentUpdate, SparkListenerEvent}
import org.json4s.{DefaultFormats, JsonAST}
import org.json4s.jackson.JsonMethods
/**
- * A client for getting data from the Spark event logs, using the location configured for spark.eventLog.dir.
- *
- * This client uses webhdfs to access the location, even if spark.eventLog.dir is provided as an hdfs URL.
- *
- * The codecs used by this client use JNI, which results in some weird classloading issues (at least when testing in the console),
- * so some of the client's implementation is non-lazy or synchronous when needed.
+ * A client for getting data from the Spark event logs.
*/
-class SparkLogClient(hadoopConfiguration: Configuration, sparkConf: SparkConf) {
+class SparkLogClient(hadoopConfiguration: Configuration, sparkConf: SparkConf, eventLogUri: Option[String]) {
import SparkLogClient._
import Async.async
private val logger: Logger = Logger.getLogger(classOf[SparkLogClient])
- private[fetchers] val webhdfsEventLogUri: URI = {
- val eventLogUri = sparkConf.getOption(SPARK_EVENT_LOG_DIR_KEY).map(new URI(_))
- val dfsNamenodeHttpAddress = Option(hadoopConfiguration.get(HADOOP_DFS_NAMENODE_HTTP_ADDRESS_KEY))
- (eventLogUri, dfsNamenodeHttpAddress) match {
- case (Some(eventLogUri), _) if eventLogUri.getScheme == "webhdfs" =>
- eventLogUri
- case (Some(eventLogUri), Some(dfsNamenodeHttpAddress)) if eventLogUri.getScheme == "hdfs" =>
- val dfsNamenodeHttpUri = new URI(null, dfsNamenodeHttpAddress, null, null, null)
- new URI(s"webhdfs://${eventLogUri.getHost}:${dfsNamenodeHttpUri.getPort}${eventLogUri.getPath}")
- case _ =>
- throw new IllegalArgumentException(
- s"""|${SPARK_EVENT_LOG_DIR_KEY} must be provided as webhdfs:// or hdfs://;
- |if hdfs, ${HADOOP_DFS_NAMENODE_HTTP_ADDRESS_KEY} must also be provided for port""".stripMargin.replaceAll("\n", " ")
- )
- }
- }
+ private lazy val security: HadoopSecurity = new HadoopSecurity()
- private[fetchers] lazy val fs: FileSystem = FileSystem.get(webhdfsEventLogUri, hadoopConfiguration)
+ protected lazy val sparkUtils: SparkUtils = SparkUtils
- private lazy val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", defaultValue = false)
- private lazy val compressionCodec = if (shouldCompress) Some(compressionCodecFromConf(sparkConf)) else None
- private lazy val compressionCodecShortName = compressionCodec.map(shortNameOfCompressionCodec)
+ def fetchData(appId: String, attemptId: Option[String])(implicit ec: ExecutionContext): Future[SparkLogDerivedData] =
+ doAsPrivilegedAction { () => doFetchData(appId, attemptId) }
- def fetchData(appId: String, attemptId: Option[String])(implicit ec: ExecutionContext): Future[SparkLogDerivedData] = {
- val logPath = getLogPath(webhdfsEventLogUri, appId, attemptId, compressionCodecShortName)
- logger.info(s"looking for logs at ${logPath}")
+ protected def doAsPrivilegedAction[T](action: () => T): T =
+ security.doAs[T](new PrivilegedAction[T] { override def run(): T = action() })
- val codec = compressionCodecForLogName(sparkConf, logPath.getName)
+ protected def doFetchData(
+ appId: String,
+ attemptId: Option[String]
+ )(
+ implicit ec: ExecutionContext
+ ): Future[SparkLogDerivedData] = {
+ val (eventLogFileSystem, baseEventLogPath) =
+ sparkUtils.fileSystemAndPathForEventLogDir(hadoopConfiguration, sparkConf, eventLogUri)
+ val (eventLogPath, eventLogCodec) =
+ sparkUtils.pathAndCodecforEventLog(sparkConf, eventLogFileSystem, baseEventLogPath, appId, attemptId)
- // Limit scope of async.
async {
- resource.managed { openEventLog(sparkConf, logPath, fs) }
- .acquireAndGet { in => findDerivedData(codec.map { _.compressedInputStream(in) }.getOrElse(in)) }
+ sparkUtils.withEventLog(eventLogFileSystem, eventLogPath, eventLogCodec)(findDerivedData(_))
}
}
}
@@ -89,9 +73,6 @@ class SparkLogClient(hadoopConfiguration: Configuration, sparkConf: SparkConf) {
object SparkLogClient {
import JsonAST._
- val SPARK_EVENT_LOG_DIR_KEY = "spark.eventLog.dir"
- val HADOOP_DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address"
-
private implicit val formats: DefaultFormats = DefaultFormats
def findDerivedData(in: InputStream, eventsLimit: Option[Int] = None): SparkLogDerivedData = {
@@ -123,85 +104,6 @@ object SparkLogClient {
// https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
// https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/util/Utils.scala
- private val IN_PROGRESS = ".inprogress"
- private val DEFAULT_COMPRESSION_CODEC = "snappy"
-
- private val compressionCodecClassNamesByShortName = Map(
- "lz4" -> classOf[LZ4CompressionCodec].getName,
- "lzf" -> classOf[LZFCompressionCodec].getName,
- "snappy" -> classOf[SnappyCompressionCodec].getName
- )
-
- // A cache for compression codecs to avoid creating the same codec many times
- private val compressionCodecMap = HashMap.empty[String, CompressionCodec]
-
- private def compressionCodecFromConf(conf: SparkConf): CompressionCodec = {
- val codecName = conf.get("spark.io.compression.codec", DEFAULT_COMPRESSION_CODEC)
- loadCompressionCodec(conf, codecName)
- }
-
- private def loadCompressionCodec(conf: SparkConf, codecName: String): CompressionCodec = {
- val codecClass = compressionCodecClassNamesByShortName.getOrElse(codecName.toLowerCase, codecName)
- val classLoader = Option(Thread.currentThread().getContextClassLoader).getOrElse(getClass.getClassLoader)
- val codec = try {
- val ctor = Class.forName(codecClass, true, classLoader).getConstructor(classOf[SparkConf])
- Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec])
- } catch {
- case e: ClassNotFoundException => None
- case e: IllegalArgumentException => None
- }
- codec.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not available. "))
- }
-
- private def shortNameOfCompressionCodec(compressionCodec: CompressionCodec): String = {
- val codecName = compressionCodec.getClass.getName
- if (compressionCodecClassNamesByShortName.contains(codecName)) {
- codecName
- } else {
- compressionCodecClassNamesByShortName
- .collectFirst { case (k, v) if v == codecName => k }
- .getOrElse { throw new IllegalArgumentException(s"No short name for codec $codecName.") }
- }
- }
-
- private def getLogPath(
- logBaseDir: URI,
- appId: String,
- appAttemptId: Option[String],
- compressionCodecName: Option[String] = None
- ): Path = {
- val base = logBaseDir.toString.stripSuffix("/") + "/" + sanitize(appId)
- val codec = compressionCodecName.map("." + _).getOrElse("")
- if (appAttemptId.isDefined) {
- new Path(base + "_" + sanitize(appAttemptId.get) + codec)
- } else {
- new Path(base + codec)
- }
- }
-
- private def openEventLog(conf: SparkConf, logPath: Path, fs: FileSystem): InputStream = {
- // It's not clear whether FileSystem.open() throws FileNotFoundException or just plain
- // IOException when a file does not exist, so try our best to throw a proper exception.
- if (!fs.exists(logPath)) {
- throw new FileNotFoundException(s"File ${logPath} does not exist.")
- }
-
- new BufferedInputStream(fs.open(logPath))
- }
-
- private[fetchers] def compressionCodecForLogName(conf: SparkConf, logName: String): Option[CompressionCodec] = {
- // Compression codec is encoded as an extension, e.g. app_123.lzf
- // Since we sanitize the app ID to not include periods, it is safe to split on it
- val logBaseName = logName.stripSuffix(IN_PROGRESS)
- logBaseName.split("\\.").tail.lastOption.map { codecName =>
- compressionCodecMap.getOrElseUpdate(codecName, loadCompressionCodec(conf, codecName))
- }
- }
-
- private def sanitize(str: String): String = {
- str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
- }
-
private def sparkEventFromJson(json: JValue): Option[SparkListenerEvent] = {
val environmentUpdate = getFormattedClassName(SparkListenerEnvironmentUpdate)
diff --git a/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala b/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala
index a5c1bb31e..55381831c 100644
--- a/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala
+++ b/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala
@@ -30,6 +30,7 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.linkedin.drelephant.spark.data.{SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfo, ExecutorSummary, JobData, StageData}
+import com.linkedin.drelephant.util.SparkUtils
import javax.ws.rs.client.{Client, ClientBuilder, WebTarget}
import javax.ws.rs.core.MediaType
@@ -124,7 +125,7 @@ class SparkRestClient(sparkConf: SparkConf) {
logger.warn(s"failed to resolve log for ${target.getUri}")
None
} else {
- val codec = SparkLogClient.compressionCodecForLogName(sparkConf, entry.getName)
+ val codec = SparkUtils.compressionCodecForLogName(sparkConf, entry.getName)
Some(SparkLogClient.findDerivedData(
codec.map { _.compressedInputStream(zis) }.getOrElse(zis)))
}
diff --git a/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala b/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala
new file mode 100644
index 000000000..2276a00f7
--- /dev/null
+++ b/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala
@@ -0,0 +1,186 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.drelephant.spark.legacydata
+
+import java.util.Date
+
+import scala.collection.JavaConverters
+import scala.util.Try
+
+import com.linkedin.drelephant.spark.fetchers.statusapiv1._
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.status.api.v1.StageStatus
+
+/**
+ * Converters for legacy SparkApplicationData to current SparkApplicationData.
+ *
+ * The converters make a best effort, providing default values for attributes the legacy data doesn't provide.
+ * In practice, the Dr. Elephant Spark heuristics end up using a relatively small subset of the converted data.
+ */
+object LegacyDataConverters {
+ import JavaConverters._
+
+ def convert(legacyData: SparkApplicationData): com.linkedin.drelephant.spark.data.SparkApplicationData = {
+ com.linkedin.drelephant.spark.data.SparkApplicationData(
+ legacyData.getAppId,
+ extractAppConfigurationProperties(legacyData),
+ extractApplicationInfo(legacyData),
+ extractJobDatas(legacyData),
+ extractStageDatas(legacyData),
+ extractExecutorSummaries(legacyData)
+ )
+ }
+
+ def extractAppConfigurationProperties(legacyData: SparkApplicationData): Map[String, String] =
+ legacyData.getEnvironmentData.getSparkProperties.asScala.toMap
+
+ def extractApplicationInfo(legacyData: SparkApplicationData): ApplicationInfo = {
+ val generalData = legacyData.getGeneralData
+ new ApplicationInfo(
+ generalData.getApplicationId,
+ generalData.getApplicationName,
+ Seq(
+ new ApplicationAttemptInfo(
+ Some("1"),
+ new Date(generalData.getStartTime),
+ new Date(generalData.getEndTime),
+ generalData.getSparkUser,
+ completed = true
+ )
+ )
+ )
+ }
+
+ def extractJobDatas(legacyData: SparkApplicationData): Seq[JobData] = {
+ val jobProgressData = legacyData.getJobProgressData
+
+ def extractJobData(jobId: Int): JobData = {
+ val jobInfo = jobProgressData.getJobInfo(jobId)
+ new JobData(
+ jobInfo.jobId,
+ jobInfo.jobId.toString,
+ description = None,
+ submissionTime = None,
+ completionTime = None,
+ jobInfo.stageIds.asScala.map { _.toInt },
+ Option(jobInfo.jobGroup),
+ extractJobExecutionStatus(jobId),
+ jobInfo.numTasks,
+ jobInfo.numActiveTasks,
+ jobInfo.numCompletedTasks,
+ jobInfo.numSkippedTasks,
+ jobInfo.numFailedTasks,
+ jobInfo.numActiveStages,
+ jobInfo.completedStageIndices.size(),
+ jobInfo.numSkippedStages,
+ jobInfo.numFailedStages
+ )
+ }
+
+ def extractJobExecutionStatus(jobId: Int): JobExecutionStatus = {
+ if (jobProgressData.getCompletedJobs.contains(jobId)) {
+ JobExecutionStatus.SUCCEEDED
+ } else if (jobProgressData.getFailedJobs.contains(jobId)) {
+ JobExecutionStatus.FAILED
+ } else {
+ JobExecutionStatus.UNKNOWN
+ }
+ }
+
+ val sortedJobIds = jobProgressData.getJobIds.asScala.toSeq.sorted
+ sortedJobIds.map { jobId => extractJobData(jobId) }
+ }
+
+ def extractStageDatas(legacyData: SparkApplicationData): Seq[StageData] = {
+ val jobProgressData = legacyData.getJobProgressData
+
+ def extractStageData(stageAttemptId: SparkJobProgressData.StageAttemptId): StageData = {
+ val stageInfo = jobProgressData.getStageInfo(stageAttemptId.stageId, stageAttemptId.attemptId)
+ new StageData(
+ extractStageStatus(stageAttemptId),
+ stageAttemptId.stageId,
+ stageAttemptId.attemptId,
+ stageInfo.numActiveTasks,
+ stageInfo.numCompleteTasks,
+ stageInfo.numFailedTasks,
+ stageInfo.executorRunTime,
+ stageInfo.inputBytes,
+ inputRecords = 0,
+ stageInfo.outputBytes,
+ outputRecords = 0,
+ stageInfo.shuffleReadBytes,
+ shuffleReadRecords = 0,
+ stageInfo.shuffleWriteBytes,
+ shuffleWriteRecords = 0,
+ stageInfo.memoryBytesSpilled,
+ stageInfo.diskBytesSpilled,
+ stageInfo.name,
+ stageInfo.description,
+ schedulingPool = "",
+ accumulatorUpdates = Seq.empty,
+ tasks = None,
+ executorSummary = None
+ )
+ }
+
+ def extractStageStatus(stageAttemptId: SparkJobProgressData.StageAttemptId): StageStatus = {
+ if (jobProgressData.getCompletedStages.contains(stageAttemptId)) {
+ StageStatus.COMPLETE
+ } else if (jobProgressData.getFailedStages.contains(stageAttemptId)) {
+ StageStatus.FAILED
+ } else {
+ StageStatus.PENDING
+ }
+ }
+
+ val sortedStageAttemptIds = jobProgressData.getStageAttemptIds.asScala.toSeq.sortBy { stageAttemptId =>
+ (stageAttemptId.stageId, stageAttemptId.attemptId)
+ }
+ sortedStageAttemptIds.map { stageAttemptId => extractStageData(stageAttemptId) }
+ }
+
+ def extractExecutorSummaries(legacyData: SparkApplicationData): Seq[ExecutorSummary] = {
+ val executorData = legacyData.getExecutorData
+
+ def extractExecutorSummary(executorId: String): ExecutorSummary = {
+ val executorInfo = executorData.getExecutorInfo(executorId)
+ new ExecutorSummary(
+ executorInfo.execId,
+ executorInfo.hostPort,
+ executorInfo.rddBlocks,
+ executorInfo.memUsed,
+ executorInfo.diskUsed,
+ executorInfo.activeTasks,
+ executorInfo.failedTasks,
+ executorInfo.completedTasks,
+ executorInfo.totalTasks,
+ executorInfo.duration,
+ executorInfo.inputBytes,
+ executorInfo.shuffleRead,
+ executorInfo.shuffleWrite,
+ executorInfo.maxMem,
+ executorLogs = Map.empty
+ )
+ }
+
+ val sortedExecutorIds = {
+ val executorIds = executorData.getExecutors.asScala.toSeq
+ Try(executorIds.sortBy { _.toInt }).getOrElse(executorIds.sorted)
+ }
+ sortedExecutorIds.map { executorId => extractExecutorSummary(executorId) }
+ }
+}
diff --git a/app/com/linkedin/drelephant/spark/legacydata/SparkApplicationData.java b/app/com/linkedin/drelephant/spark/legacydata/SparkApplicationData.java
new file mode 100644
index 000000000..dfb5b9d3f
--- /dev/null
+++ b/app/com/linkedin/drelephant/spark/legacydata/SparkApplicationData.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.drelephant.spark.legacydata;
+
+import com.linkedin.drelephant.analysis.HadoopApplicationData;
+
+
+/**
+ * This interface provides access to all of the data collected for a legacy Spark application.
+ */
+public interface SparkApplicationData extends HadoopApplicationData {
+
+ public boolean isThrottled();
+
+ public SparkGeneralData getGeneralData();
+
+ public SparkEnvironmentData getEnvironmentData();
+
+ public SparkExecutorData getExecutorData();
+
+ public SparkJobProgressData getJobProgressData();
+
+ public SparkStorageData getStorageData();
+}
diff --git a/app/com/linkedin/drelephant/spark/legacydata/SparkEnvironmentData.java b/app/com/linkedin/drelephant/spark/legacydata/SparkEnvironmentData.java
new file mode 100644
index 000000000..1afc7f1b2
--- /dev/null
+++ b/app/com/linkedin/drelephant/spark/legacydata/SparkEnvironmentData.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.drelephant.spark.legacydata;
+
+import java.util.Properties;
+
+
+/**
+ * This data class holds Spark environment data (Spark properties, JVM system properties, etc.).
+ */
+public class SparkEnvironmentData {
+ private final Properties _sparkProperties;
+ private final Properties _systemProperties;
+
+ public SparkEnvironmentData() {
+ _sparkProperties = new Properties();
+ _systemProperties = new Properties();
+ }
+
+ public void addSparkProperty(String key, String value) {
+ _sparkProperties.put(key, value);
+ }
+
+ public void addSystemProperty(String key, String value) {
+ _systemProperties.put(key, value);
+ }
+
+ public String getSparkProperty(String key) {
+ return _sparkProperties.getProperty(key);
+ }
+
+ public String getSparkProperty(String key, String defaultValue) {
+ String val = getSparkProperty(key);
+ if (val == null) {
+ return defaultValue;
+ }
+ return val;
+ }
+
+ public String getSystemProperty(String key) {
+ return _systemProperties.getProperty(key);
+ }
+
+ public Properties getSparkProperties() {
+ return _sparkProperties;
+ }
+
+ public Properties getSystemProperties() {
+ return _systemProperties;
+ }
+
+ @Override
+ public String toString() {
+ return _sparkProperties.toString() + "\n\n\n" + _systemProperties.toString();
+ }
+}
diff --git a/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java b/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java
new file mode 100644
index 000000000..7b0fcb5c2
--- /dev/null
+++ b/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.drelephant.spark.legacydata;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * This class contains Spark executor information.
+ */
+public class SparkExecutorData {
+ public static final String EXECUTOR_DRIVER_NAME = "driver";
+
+ public static class ExecutorInfo {
+ public String execId;
+ public String hostPort;
+ public int rddBlocks = 0;
+ public long memUsed = 0L;
+ public long maxMem = 0L;
+ public long diskUsed = 0L;
+
+ public int activeTasks = 0;
+ public int completedTasks = 0;
+ public int failedTasks = 0;
+ public int totalTasks = 0;
+ public long duration = 0L;
+ public long inputBytes = 0L;
+ public long outputBytes = 0L;
+ public long shuffleRead = 0L;
+ public long shuffleWrite = 0L;
+
+ public String toString() {
+ return "{execId: " + execId + ", hostPort:" + hostPort + " , rddBlocks: " + rddBlocks + ", memUsed: " + memUsed
+ + ", maxMem: " + maxMem + ", diskUsed: " + diskUsed + ", totalTasks" + totalTasks + ", tasksActive: "
+ + activeTasks + ", tasksComplete: " + completedTasks + ", tasksFailed: " + failedTasks + ", duration: "
+ + duration + ", inputBytes: " + inputBytes + ", outputBytes:" + outputBytes + ", shuffleRead: " + shuffleRead
+ + ", shuffleWrite: " + shuffleWrite + "}";
+ }
+ }
+
+  private final Map<String, ExecutorInfo> _executorInfoMap = new HashMap<String, ExecutorInfo>();
+
+ public void setExecutorInfo(String executorId, ExecutorInfo info) {
+ _executorInfoMap.put(executorId, info);
+ }
+
+ public ExecutorInfo getExecutorInfo(String executorId) {
+ return _executorInfoMap.get(executorId);
+ }
+
+  public Set<String> getExecutors() {
+ return _executorInfoMap.keySet();
+ }
+}
diff --git a/app/com/linkedin/drelephant/spark/legacydata/SparkGeneralData.java b/app/com/linkedin/drelephant/spark/legacydata/SparkGeneralData.java
new file mode 100644
index 000000000..ed251446a
--- /dev/null
+++ b/app/com/linkedin/drelephant/spark/legacydata/SparkGeneralData.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.drelephant.spark.legacydata;
+
+import java.util.Set;
+
+
+/**
+ * This class holds general Spark application information (id, name, user, ACLs, and start/end times).
+ */
+public class SparkGeneralData {
+  private Set<String> _adminAcls;
+  private Set<String> _viewAcls;
+ private String _applicationId;
+ private String _applicationName;
+ private String _sparkUser;
+ private long _startTime;
+ private long _endTime;
+
+  public Set<String> getAdminAcls() {
+    return _adminAcls;
+  }
+
+  public void setAdminAcls(Set<String> adminAcls) {
+    _adminAcls = adminAcls;
+  }
+
+  public Set<String> getViewAcls() {
+    return _viewAcls;
+  }
+
+  public void setViewAcls(Set<String> viewAcls) {
+    _viewAcls = viewAcls;
+  }
+
+ public String getApplicationId() {
+ return _applicationId;
+ }
+
+ public void setApplicationId(String applicationId) {
+ _applicationId = applicationId;
+ }
+
+ public String getApplicationName() {
+ return _applicationName;
+ }
+
+ public void setApplicationName(String applicationName) {
+ _applicationName = applicationName;
+ }
+
+ public String getSparkUser() {
+ return _sparkUser;
+ }
+
+ public void setSparkUser(String sparkUser) {
+ _sparkUser = sparkUser;
+ }
+
+ public long getStartTime() {
+ return _startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ _startTime = startTime;
+ }
+
+ public long getEndTime() {
+ return _endTime;
+ }
+
+ public void setEndTime(long endTime) {
+ _endTime = endTime;
+ }
+}
diff --git a/app/com/linkedin/drelephant/spark/legacydata/SparkJobProgressData.java b/app/com/linkedin/drelephant/spark/legacydata/SparkJobProgressData.java
new file mode 100644
index 000000000..81a0f269c
--- /dev/null
+++ b/app/com/linkedin/drelephant/spark/legacydata/SparkJobProgressData.java
@@ -0,0 +1,273 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.drelephant.spark.legacydata;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+
+
+/**
+ * This class holds job and stage progress information collected over an application's run.
+ */
+public class SparkJobProgressData {
+ private static final Logger logger = Logger.getLogger(SparkJobProgressData.class);
+  private final Map<Integer, JobInfo> _jobIdToInfo = new HashMap<Integer, JobInfo>();
+  private final Set<Integer> _completedJobs = new HashSet<Integer>();
+  private final Set<Integer> _failedJobs = new HashSet<Integer>();
+
+  private final Map<StageAttemptId, StageInfo> _stageIdToInfo = new HashMap<StageAttemptId, StageInfo>();
+  private final Set<StageAttemptId> _completedStages = new HashSet<StageAttemptId>();
+  private final Set<StageAttemptId> _failedStages = new HashSet<StageAttemptId>();
+
+ public void addJobInfo(int jobId, JobInfo info) {
+ _jobIdToInfo.put(jobId, info);
+ }
+
+ public void addCompletedJob(int jobId) {
+ _completedJobs.add(jobId);
+ }
+
+ public void addFailedJob(int jobId) {
+ _failedJobs.add(jobId);
+ }
+
+ public void addStageInfo(int stageId, int attemptId, StageInfo info) {
+ _stageIdToInfo.put(new StageAttemptId(stageId, attemptId), info);
+ }
+
+ public void addCompletedStages(int stageId, int attemptId) {
+ _completedStages.add(new StageAttemptId(stageId, attemptId));
+ }
+
+ public void addFailedStages(int stageId, int attemptId) {
+ _failedStages.add(new StageAttemptId(stageId, attemptId));
+ }
+
+  public Set<Integer> getJobIds() {
+ return _jobIdToInfo.keySet();
+ }
+
+  public Set<StageAttemptId> getStageAttemptIds() {
+ return _stageIdToInfo.keySet();
+ }
+
+  public Set<Integer> getCompletedJobs() {
+ return _completedJobs;
+ }
+
+  public Set<Integer> getFailedJobs() {
+ return _failedJobs;
+ }
+
+ private static double getFailureRate(int numCompleted, int numFailed) {
+ int num = numCompleted + numFailed;
+
+ if (num == 0) {
+ return 0d;
+ }
+
+ return numFailed * 1.0d / num;
+ }
+
+ public double getJobFailureRate() {
+ return getFailureRate(_completedJobs.size(), _failedJobs.size());
+ }
+
+ public double getStageFailureRate() {
+ return getFailureRate(_completedStages.size(), _failedStages.size());
+ }
+
+ public JobInfo getJobInfo(int jobId) {
+ return _jobIdToInfo.get(jobId);
+ }
+
+ public StageInfo getStageInfo(int stageId, int attemptId) {
+ return _stageIdToInfo.get(new StageAttemptId(stageId, attemptId));
+ }
+
+  public Set<StageAttemptId> getCompletedStages() {
+ return _completedStages;
+ }
+
+  public Set<StageAttemptId> getFailedStages() {
+ return _failedStages;
+ }
+
+ /**
+   * A job does not have a name of its own, so the name of its latest stage is used instead.
+   *
+   * @param jobId the job id
+   * @return the name of the job's latest stage, or null if the job contains no stages
+ */
+ public String getJobDescription(int jobId) {
+    List<Integer> stageIds = _jobIdToInfo.get(jobId).stageIds;
+ int id = -1;
+ for (int stageId : stageIds) {
+ id = Math.max(id, stageId);
+ }
+ if (id == -1) {
+ logger.error("Spark Job id [" + jobId + "] does not contain any stage.");
+ return null;
+ }
+ return _stageIdToInfo.get(new StageAttemptId(id, 0)).name;
+ }
+
+  public List<String> getFailedJobDescriptions() {
+    List<String> result = new ArrayList<String>();
+ for (int id : _failedJobs) {
+ result.add(getJobDescription(id));
+ }
+ return result;
+ }
+
+  // For debugging purposes
+ public String toString() {
+ StringBuilder s = new StringBuilder();
+ s.append("JobInfo: [");
+
+    for (Map.Entry<Integer, JobInfo> entry : _jobIdToInfo.entrySet()) {
+ s.append("{id:" + entry.getKey() + ", value: " + entry.getValue() + "}");
+ }
+
+ s.append("]\nStageInfo: [");
+    for (Map.Entry<StageAttemptId, StageInfo> entry : _stageIdToInfo.entrySet()) {
+ s.append("{id:" + entry.getKey() + ", value: " + entry.getValue() + "}");
+ }
+ s.append("]");
+
+ return s.toString();
+ }
+
+ public static class StageAttemptId {
+ public int stageId;
+ public int attemptId;
+
+ public StageAttemptId(int stageId, int attemptId) {
+ this.stageId = stageId;
+ this.attemptId = attemptId;
+ }
+
+ @Override
+ public int hashCode() {
+ return new Integer(stageId).hashCode() * 31 + new Integer(attemptId).hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof StageAttemptId) {
+ StageAttemptId other = (StageAttemptId) obj;
+ return stageId == other.stageId && attemptId == other.attemptId;
+ }
+ return false;
+ }
+
+ public String toString() {
+ return "id: " + stageId + " # attemptId: " + attemptId;
+ }
+ }
+
+ public static class JobInfo {
+ public int jobId;
+ public String jobGroup;
+ public long startTime;
+ public long endTime;
+    public final List<Integer> stageIds = new ArrayList<Integer>();
+
+ /* Tasks */
+ public int numTasks = 0;
+ public int numActiveTasks = 0;
+ public int numCompletedTasks = 0;
+ public int numSkippedTasks = 0;
+ public int numFailedTasks = 0;
+
+ /* Stages */
+ public int numActiveStages = 0;
+ // This needs to be a set instead of a simple count to prevent double-counting of rerun stages:
+    public final Set<Integer> completedStageIndices = new HashSet<Integer>();
+ public int numSkippedStages = 0;
+ public int numFailedStages = 0;
+
+ public void addStageId(int stageId) {
+ stageIds.add(stageId);
+ }
+
+ public double getFailureRate() {
+ return SparkJobProgressData.getFailureRate(numCompletedTasks, numFailedTasks);
+ }
+
+ public String toString() {
+ return String.format("{jobId:%s, jobGroup:%s, startTime:%s, endTime:%s, numTask:%s, numActiveTasks:%s, "
+ + "numCompletedTasks:%s, numSkippedTasks:%s, numFailedTasks:%s, numActiveStages:%s, "
+ + "completedStageIndices:%s, stages:%s, numSkippedStages:%s, numFailedStages:%s}", jobId, jobGroup,
+ startTime, endTime, numTasks, numActiveTasks, numCompletedTasks, numSkippedTasks, numFailedTasks,
+ numActiveStages, getListString(completedStageIndices), getListString(stageIds), numSkippedStages,
+ numFailedStages);
+ }
+ }
+
+ public static class StageInfo {
+ public int numActiveTasks;
+ public int numCompleteTasks;
+    public final Set<Integer> completedIndices = new HashSet<Integer>();
+ public int numFailedTasks;
+
+ // Total accumulated executor runtime
+ public long executorRunTime;
+ // Total stage duration
+ public long duration;
+
+    // Note: calculating I/O speed at the stage level currently does not make sense,
+    // since we do not have information about the specific I/O time.
+ public long inputBytes = 0;
+ public long outputBytes = 0;
+ public long shuffleReadBytes = 0;
+ public long shuffleWriteBytes = 0;
+ public long memoryBytesSpilled = 0;
+ public long diskBytesSpilled = 0;
+
+ public String name;
+ public String description;
+
+ public double getFailureRate() {
+ return SparkJobProgressData.getFailureRate(numCompleteTasks, numFailedTasks);
+ }
+
+    // TODO: accumulables info seems to be unnecessary for now, but might be useful later on
+ // sample code from Spark source: var accumulables = new HashMap[Long, AccumulableInfo]
+
+ @Override
+ public String toString() {
+ return String.format("{numActiveTasks:%s, numCompleteTasks:%s, completedIndices:%s, numFailedTasks:%s,"
+ + " executorRunTime:%s, inputBytes:%s, outputBytes:%s, shuffleReadBytes:%s, shuffleWriteBytes:%s,"
+ + " memoryBytesSpilled:%s, diskBytesSpilled:%s, name:%s, description:%s}",
+ numActiveTasks, numCompleteTasks, getListString(completedIndices), numFailedTasks, executorRunTime,
+ inputBytes, outputBytes, shuffleReadBytes, shuffleWriteBytes, memoryBytesSpilled, diskBytesSpilled, name,
+ description);
+ }
+ }
+
+ private static String getListString(Collection collection) {
+ return "[" + StringUtils.join(collection, ",") + "]";
+ }
+}
diff --git a/app/com/linkedin/drelephant/spark/legacydata/SparkStorageData.java b/app/com/linkedin/drelephant/spark/legacydata/SparkStorageData.java
new file mode 100644
index 000000000..0145848a3
--- /dev/null
+++ b/app/com/linkedin/drelephant/spark/legacydata/SparkStorageData.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.drelephant.spark.legacydata;
+
+import java.util.List;
+import org.apache.spark.storage.RDDInfo;
+import org.apache.spark.storage.StorageStatus;
+
+
+/**
+ * This class holds information related to Spark storage (specifically, RDDs).
+ */
+public class SparkStorageData {
+  private List<RDDInfo> _rddInfoList;
+  private List<StorageStatus> _storageStatusList;
+
+  public List<RDDInfo> getRddInfoList() {
+    return _rddInfoList;
+  }
+
+  public void setRddInfoList(List<RDDInfo> rddInfoList) {
+    _rddInfoList = rddInfoList;
+  }
+
+  public List<StorageStatus> getStorageStatusList() {
+    return _storageStatusList;
+  }
+
+  public void setStorageStatusList(List<StorageStatus> storageStatusList) {
+    _storageStatusList = storageStatusList;
+  }
+}
diff --git a/app/com/linkedin/drelephant/util/HadoopUtils.scala b/app/com/linkedin/drelephant/util/HadoopUtils.scala
new file mode 100644
index 000000000..8f37b4a32
--- /dev/null
+++ b/app/com/linkedin/drelephant/util/HadoopUtils.scala
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.drelephant.util
+
+import java.io.InputStream
+import java.net.{HttpURLConnection, URL}
+
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL
+import org.apache.log4j.Logger
+
+trait HadoopUtils {
+ val DFS_NAMESERVICES_KEY = "dfs.nameservices"
+ val DFS_HA_NAMENODES_KEY = "dfs.ha.namenodes"
+ val DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address"
+
+ protected def logger: Logger
+
+ def findHaNameNodeAddress(conf: Configuration): Option[String] = {
+
+ def findNameNodeAddressInNameServices(nameServices: Array[String]): Option[String] = nameServices match {
+ case Array(nameService) => {
+ val ids = Option(conf.get(s"${DFS_HA_NAMENODES_KEY}.${nameService}")).map { _.split(",") }
+ val namenodeAddress = ids.flatMap { findNameNodeAddressInNameService(nameService, _) }
+ namenodeAddress match {
+ case Some(address) => logger.info(s"Active namenode for ${nameService}: ${address}")
+ case None => logger.info(s"No active namenode for ${nameService}.")
+ }
+ namenodeAddress
+ }
+ case Array() => {
+ logger.info("No name services found.")
+ None
+ }
+ case _ => {
+ logger.info("Multiple name services found. HDFS federation is not supported right now.")
+ None
+ }
+ }
+
+ def findNameNodeAddressInNameService(nameService: String, nameNodeIds: Array[String]): Option[String] =
+ nameNodeIds
+ .flatMap { id => Option(conf.get(s"${DFS_NAMENODE_HTTP_ADDRESS_KEY}.${nameService}.${id}")) }
+ .find(isActiveNameNode)
+
+ val nameServices = Option(conf.get(DFS_NAMESERVICES_KEY)).map { _.split(",") }
+ nameServices.flatMap(findNameNodeAddressInNameServices)
+ }
+
+ def httpNameNodeAddress(conf: Configuration): Option[String] = Option(conf.get(DFS_NAMENODE_HTTP_ADDRESS_KEY))
+
+ def isActiveNameNode(hostAndPort: String): Boolean = {
+ val url = new URL(s"http://${hostAndPort}/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus")
+ val conn = newAuthenticatedConnection(url)
+ try {
+ val in = conn.getInputStream()
+ try {
+ isActiveNameNode(in)
+ } finally {
+ in.close()
+ }
+ } finally {
+ conn.disconnect()
+ }
+ }
+
+ protected def isActiveNameNode(in: InputStream): Boolean =
+ new ObjectMapper().readTree(in).path("beans").get(0).path("State").textValue() == "active"
+
+ protected def newAuthenticatedConnection(url: URL): HttpURLConnection = {
+ val token = new AuthenticatedURL.Token()
+ val authenticatedURL = new AuthenticatedURL()
+ authenticatedURL.openConnection(url, token)
+ }
+}
+
+object HadoopUtils extends HadoopUtils {
+ override protected lazy val logger = Logger.getLogger(classOf[HadoopUtils])
+}
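
The active-namenode check above queries the NameNode's JMX servlet. A minimal sketch of the JSON shape it expects, exercising the same parsing logic on a hypothetical response (the real check goes over an authenticated HTTP connection):

```scala
import java.io.ByteArrayInputStream
import com.fasterxml.jackson.databind.ObjectMapper

object NameNodeStatusSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical response from http://<namenode>:50070/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus
    val sampleJson = """{"beans":[{"name":"Hadoop:service=NameNode,name=NameNodeStatus","State":"active"}]}"""
    val in = new ByteArrayInputStream(sampleJson.getBytes("UTF-8"))
    // Same check as HadoopUtils.isActiveNameNode: the first bean's "State" field must equal "active".
    val isActive = new ObjectMapper().readTree(in).path("beans").get(0).path("State").textValue() == "active"
    println(s"namenode is active: ${isActive}") // prints: namenode is active: true
  }
}
```
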
diff --git a/app/com/linkedin/drelephant/util/SparkUtils.scala b/app/com/linkedin/drelephant/util/SparkUtils.scala
index 3a0354070..e7efd9d84 100644
--- a/app/com/linkedin/drelephant/util/SparkUtils.scala
+++ b/app/com/linkedin/drelephant/util/SparkUtils.scala
@@ -16,16 +16,130 @@
package com.linkedin.drelephant.util
-import java.io.{File, FileInputStream, InputStreamReader}
+import java.io.{BufferedInputStream, File, FileInputStream, FileNotFoundException, InputStream, InputStreamReader}
+import java.net.URI
import java.util.Properties
import scala.collection.JavaConverters
+import scala.collection.mutable.HashMap
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path, PathFilter, FileStatus}
+import org.apache.log4j.Logger
+import org.apache.spark.SparkConf
+import org.apache.spark.io.{CompressionCodec, LZ4CompressionCodec, LZFCompressionCodec, SnappyCompressionCodec}
trait SparkUtils {
import JavaConverters._
- def defaultEnv: Map[String, String]
+ protected def logger: Logger
+
+ protected def hadoopUtils: HadoopUtils
+
+ protected def defaultEnv: Map[String, String]
+
+ val SPARK_EVENT_LOG_DIR_KEY = "spark.eventLog.dir"
+ val SPARK_EVENT_LOG_COMPRESS_KEY = "spark.eventLog.compress"
+ val DFS_HTTP_PORT = 50070
+
+ /**
+   * Returns the webhdfs FileSystem and Path for the configured Spark event log directory, optionally using the
+   * configured Hadoop namenode.
+   *
+   * The FileSystem and Path are primarily based on spark.eventLog.dir, but if spark.eventLog.dir is a simple path
+   * (neither webhdfs:// nor hdfs://), it is combined with the namenode info from the Hadoop configuration.
+   *
+   * @param hadoopConfiguration a Hadoop configuration containing namenode info
+   * @param sparkConf a Spark configuration with the Spark event log directory setting
+   * @param uriFromFetcherConf an optional event log location URI from the fetcher configuration, which takes
+   *                           precedence over spark.eventLog.dir when set
+   * @return a tuple (FileSystem, Path) for the configured Spark event log directory
+ */
+ def fileSystemAndPathForEventLogDir(hadoopConfiguration: Configuration,
+ sparkConf: SparkConf,
+ uriFromFetcherConf : Option[String]): (FileSystem, Path) = {
+ if(uriFromFetcherConf.isDefined) {
+ logger.info(s"Using log location from FetcherConf ${uriFromFetcherConf}")
+ val uri = new URI(uriFromFetcherConf.get)
+ (FileSystem.get(uri, hadoopConfiguration), new Path(uri.getPath))
+ } else {
+ val eventLogUri = sparkConf.getOption(SPARK_EVENT_LOG_DIR_KEY).map(new URI(_))
+ eventLogUri match {
+ case Some(uri) if uri.getScheme == "webhdfs" =>
+ (FileSystem.get(uri, hadoopConfiguration), new Path(uri.getPath))
+ case Some(uri) if uri.getScheme == "hdfs" =>
+ (FileSystem.get(new URI(s"webhdfs://${uri.getHost}:${DFS_HTTP_PORT}${uri.getPath}"), hadoopConfiguration), new Path(uri.getPath))
+ case Some(uri) =>
+          val nameNodeAddress = hadoopUtils.findHaNameNodeAddress(hadoopConfiguration)
+            .orElse(hadoopUtils.httpNameNodeAddress(hadoopConfiguration))
+ nameNodeAddress match {
+ case Some(address) =>
+ (FileSystem.get(new URI(s"webhdfs://${address}${uri.getPath}"), hadoopConfiguration), new Path(uri.getPath))
+ case None =>
+ throw new IllegalArgumentException("Couldn't find configured namenode")
+ }
+ case None =>
+          throw new IllegalArgumentException(s"${SPARK_EVENT_LOG_DIR_KEY} not provided")
+ }
+ }
+ }
+
+ /**
+ * Returns the path and codec for the event log for the given app and attempt.
+ *
+ * This invokes JNI to get the codec, so it must be done synchronously, otherwise weird classloading issues will
+ * manifest (at least they manifest during testing).
+ *
+ * The path and codec can then be passed to withEventLog, which can be called asynchronously.
+ *
+ * @param sparkConf the Spark configuration with the setting for whether Spark event logs are compressed
+ * @param fs the filesystem which contains the logs
+ * @param basePath the base path for logs on the given filesystem
+ * @param appId the app identifier to use for the specific log file
+ * @param attemptId the attempt identifier to use for the specific log file
+ * @return a tuple (Path, Option[CompressionCodec]) for the specific event log file and the codec to use
+ */
+ def pathAndCodecforEventLog(
+ sparkConf: SparkConf,
+ fs: FileSystem,
+ basePath: Path,
+ appId: String,
+ attemptId: Option[String]
+ ): (Path, Option[CompressionCodec]) = {
+    attemptId match {
+      // If the attempt id is given, construct the log path directly from the app id and attempt id.
+      case Some(_) =>
+        val path = {
+          val shouldUseCompression = sparkConf.getBoolean(SPARK_EVENT_LOG_COMPRESS_KEY, defaultValue = false)
+          val compressionCodecShortName =
+            if (shouldUseCompression) Some(shortNameOfCompressionCodec(compressionCodecFromConf(sparkConf))) else None
+          getLogPath(fs.getUri.resolve(basePath.toUri), appId, attemptId, compressionCodecShortName)
+        }
+        val codec = compressionCodecForLogName(sparkConf, path.getName())
+        (path, codec)
+      // Otherwise, list the log directory to find the latest attempt and determine its codec from the file name.
+      case None =>
+        val (logPath, codecName) = getLogPathAndCodecName(fs, fs.getUri.resolve(basePath.toUri), appId)
+        (logPath, Some(compressionCodecMap.getOrElseUpdate(codecName, loadCompressionCodec(sparkConf, codecName))))
+    }
+  }
+
+ /**
+   * A loan method that applies the given function to the loaned event log input stream and closes the stream
+   * after use.
+   *
+   * The method arguments should have been obtained from fileSystemAndPathForEventLogDir and pathAndCodecforEventLog.
+   *
+   * @param fs the filesystem which contains the log
+   * @param path the full path to the log
+   * @param codec the codec to use for the log, if any
+   * @param f the function to apply to the (decompressed) event log input stream
+   * @return the result of applying f
+ */
+ def withEventLog[T](fs: FileSystem, path: Path, codec: Option[CompressionCodec])(f: InputStream => T): T = {
+ resource.managed { openEventLog(path, fs) }
+ .map { in => codec.map { _.compressedInputStream(in) }.getOrElse(in) }
+ .acquireAndGet(f)
+ }
// Below this line are modified utility methods from
// https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -55,8 +169,148 @@ trait SparkUtils {
inReader.close()
}
}
+
+ def compressionCodecForLogName(conf: SparkConf, logName: String): Option[CompressionCodec] = {
+ // Compression codec is encoded as an extension, e.g. app_123.lzf
+ // Since we sanitize the app ID to not include periods, it is safe to split on it
+ val logBaseName = logName.stripSuffix(IN_PROGRESS)
+ logBaseName.split("\\.").tail.lastOption.map { codecName =>
+ compressionCodecMap.getOrElseUpdate(codecName, loadCompressionCodec(conf, codecName))
+ }
+ }
+
+ private val IN_PROGRESS = ".inprogress"
+ private val DEFAULT_COMPRESSION_CODEC = "snappy"
+
+ private val compressionCodecClassNamesByShortName = Map(
+ "lz4" -> classOf[LZ4CompressionCodec].getName,
+ "lzf" -> classOf[LZFCompressionCodec].getName,
+ "snappy" -> classOf[SnappyCompressionCodec].getName
+ )
+
+ // A cache for compression codecs to avoid creating the same codec many times
+ private val compressionCodecMap = HashMap.empty[String, CompressionCodec]
+
+ private def compressionCodecFromConf(conf: SparkConf): CompressionCodec = {
+ val codecName = conf.get("spark.io.compression.codec", DEFAULT_COMPRESSION_CODEC)
+ loadCompressionCodec(conf, codecName)
+ }
+
+ private def loadCompressionCodec(conf: SparkConf, codecName: String): CompressionCodec = {
+ val codecClass = compressionCodecClassNamesByShortName.getOrElse(codecName.toLowerCase, codecName)
+ val classLoader = Option(Thread.currentThread().getContextClassLoader).getOrElse(getClass.getClassLoader)
+ val codec = try {
+ val ctor = Class.forName(codecClass, true, classLoader).getConstructor(classOf[SparkConf])
+ Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec])
+ } catch {
+ case e: ClassNotFoundException => None
+ case e: IllegalArgumentException => None
+ }
+ codec.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not available. "))
+ }
+
+ private def shortNameOfCompressionCodec(compressionCodec: CompressionCodec): String = {
+ val codecName = compressionCodec.getClass.getName
+ if (compressionCodecClassNamesByShortName.contains(codecName)) {
+ codecName
+ } else {
+ compressionCodecClassNamesByShortName
+ .collectFirst { case (k, v) if v == codecName => k }
+ .getOrElse { throw new IllegalArgumentException(s"No short name for codec $codecName.") }
+ }
+ }
+
+  // Splits an event log file name of the form "<appId>[_<attemptId>].<codec>" into (appId, attemptId, codec);
+  // any component that cannot be parsed is returned as None. For example,
+  // "application_1472236917070_0001_1.snappy" yields (Some("application_1472236917070_0001"), Some("1"), Some("snappy")).
+  private def splitLogPath(logPath: String): (Option[String], Option[String], Option[String]) = {
+    var extension: Option[String] = None
+    var attempt: Option[String] = None
+    var appId: Option[String] = None
+    val nameAndExtension = logPath.split('.')
+    if (nameAndExtension.length == 2) {
+      extension = Some(nameAndExtension(1))
+      val name = nameAndExtension(0)
+      val appIdAndAttempt = name.split('_')
+      if (appIdAndAttempt.length == 4) {
+        attempt = Some(appIdAndAttempt(3))
+        appId = Some(appIdAndAttempt.dropRight(1).mkString("_"))
+      } else {
+        appId = Some(name)
+      }
+    }
+    (appId, attempt, extension)
+  }
+  private def getLogPathAndCodecName(
+      fs: FileSystem,
+      logBaseDir: URI,
+      appId: String
+  ): (Path, String) = {
+    val base = logBaseDir.toString.stripSuffix("/")
+    val filter = new PathFilter() {
+      override def accept(file: Path): Boolean = file.getName().startsWith(appId)
+    }
+    val attemptsList = fs.listStatus(new Path(base), filter)
+    // When several log files match the app id, pick the attempt with the highest attempt id.
+    val finalAttempt = attemptsList.length match {
+      case 0 => throw new FileNotFoundException(s"logfile does not exist for ${appId}.")
+      case 1 => splitLogPath(attemptsList(0).getPath().getName())
+      case _ =>
+        attemptsList
+          .map { x => splitLogPath(x.getPath().getName()) }
+          .sortWith { (x, y) => x._2.getOrElse("-1").toInt > y._2.getOrElse("-1").toInt }
+          .head
+    }
+
+    finalAttempt match {
+      // If no attempt id was parsed but a codec extension is present, use the app id with no attempt suffix.
+      case (Some(parsedAppId), None, Some(codecName)) =>
+        (new Path(s"${base}/${parsedAppId}.${codecName}"), codecName)
+      // If both an attempt id and a codec extension are present, use the app id with the attempt id suffix.
+      case (Some(parsedAppId), Some(attempt), Some(codecName)) =>
+        (new Path(s"${base}/${parsedAppId}_${sanitize(attempt)}.${codecName}"), codecName)
+      // If no codec extension could be parsed but a file matching the app id was found, use that file's path.
+      case (Some(_), _, None) =>
+        (attemptsList(0).getPath(), DEFAULT_COMPRESSION_CODEC)
+      // Reached only if the file name could not be parsed at all; fall back to a generically constructed path.
+      case _ =>
+        (new Path(s"${base}/${appId}.${DEFAULT_COMPRESSION_CODEC}"), DEFAULT_COMPRESSION_CODEC)
+    }
+  }
+
+ private def getLogPath(
+ logBaseDir: URI,
+ appId: String,
+ appAttemptId: Option[String],
+ compressionCodecName: Option[String] = None
+ ): Path = {
+ val base = logBaseDir.toString.stripSuffix("/") + "/" + sanitize(appId)
+ val codec = compressionCodecName.map("." + _).getOrElse("")
+ if (appAttemptId.isDefined) {
+ new Path(base + "_" + sanitize(appAttemptId.get) + codec)
+ } else {
+ new Path(base + codec)
+ }
+ }
+ private def openEventLog(logPath: Path, fs: FileSystem): InputStream = {
+ // It's not clear whether FileSystem.open() throws FileNotFoundException or just plain
+ // IOException when a file does not exist, so try our best to throw a proper exception.
+ if (!fs.exists(logPath)) {
+ throw new FileNotFoundException(s"File ${logPath} does not exist.")
+ }
+
+ new BufferedInputStream(fs.open(logPath))
+ }
+
+ private def sanitize(str: String): String = {
+ str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
+ }
}
object SparkUtils extends SparkUtils {
- override val defaultEnv = sys.env
+ override protected lazy val logger = Logger.getLogger(classOf[SparkUtils])
+ override protected lazy val hadoopUtils = HadoopUtils
+ override protected lazy val defaultEnv = sys.env
}
diff --git a/app/org/apache/spark/deploy/history/SparkDataCollection.scala b/app/org/apache/spark/deploy/history/SparkDataCollection.scala
new file mode 100644
index 000000000..f60fcfa19
--- /dev/null
+++ b/app/org/apache/spark/deploy/history/SparkDataCollection.scala
@@ -0,0 +1,330 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.InputStream
+import java.util.{Set => JSet, Properties, List => JList, HashSet => JHashSet, ArrayList => JArrayList}
+
+import scala.collection.mutable
+
+import com.linkedin.drelephant.analysis.ApplicationType
+import com.linkedin.drelephant.spark.legacydata._
+import com.linkedin.drelephant.spark.legacydata.SparkExecutorData.ExecutorInfo
+import com.linkedin.drelephant.spark.legacydata.SparkJobProgressData.JobInfo
+
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.{ApplicationEventListener, ReplayListenerBus, StageInfo}
+import org.apache.spark.storage.{RDDInfo, StorageStatus, StorageStatusListener, StorageStatusTrackingListener}
+import org.apache.spark.ui.env.EnvironmentListener
+import org.apache.spark.ui.exec.ExecutorsListener
+import org.apache.spark.ui.jobs.JobProgressListener
+import org.apache.spark.ui.storage.StorageListener
+import org.apache.spark.util.collection.OpenHashSet
+
+/**
+ * Wraps the logic of collecting data from Spark event listeners into HadoopApplicationData
+ * instances.
+ *
+ * Note: this has to live in Spark's package scope because ApplicationEventListener is
+ * private[spark], and it is problematic to compile if written in Java.
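+ *
+ * Illustrative usage (added for documentation; `sparkUtils`, `fs`, `eventLogPath`, and `codec` are
+ * assumed to be provided by the surrounding fetcher, as in SparkFSFetcher):
+ * {{{
+ *   val dataCollection = new SparkDataCollection()
+ *   sparkUtils.withEventLog(fs, eventLogPath, codec) { in =>
+ *     dataCollection.load(in, eventLogPath.toString)
+ *   }
+ *   val appId = dataCollection.getAppId
+ * }}}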
+ */
+class SparkDataCollection extends SparkApplicationData {
+ import SparkDataCollection._
+
+ lazy val applicationEventListener = new ApplicationEventListener()
+ lazy val jobProgressListener = new JobProgressListener(new SparkConf())
+ lazy val environmentListener = new EnvironmentListener()
+ lazy val storageStatusListener = new StorageStatusListener()
+ lazy val executorsListener = new ExecutorsListener(storageStatusListener)
+ lazy val storageListener = new StorageListener(storageStatusListener)
+
+  // This is a customized listener that tracks the peak memory used.
+  // The original listener only tracks the memory currently in use, which is not useful in an offline scenario.
+ lazy val storageStatusTrackingListener = new StorageStatusTrackingListener()
+
+  private var _applicationData: SparkGeneralData = null
+  private var _jobProgressData: SparkJobProgressData = null
+  private var _environmentData: SparkEnvironmentData = null
+  private var _executorData: SparkExecutorData = null
+  private var _storageData: SparkStorageData = null
+  private var _isThrottled: Boolean = false
+
+ def throttle(): Unit = {
+ _isThrottled = true
+ }
+
+ override def isThrottled(): Boolean = _isThrottled
+
+ override def getApplicationType(): ApplicationType = APPLICATION_TYPE
+
+ override def getConf(): Properties = getEnvironmentData().getSparkProperties()
+
+ override def isEmpty(): Boolean = !isThrottled() && getExecutorData().getExecutors.isEmpty()
+
+ override def getGeneralData(): SparkGeneralData = {
+ if (_applicationData == null) {
+ _applicationData = new SparkGeneralData()
+
+      applicationEventListener.adminAcls.foreach { s => _applicationData.setAdminAcls(stringToSet(s)) }
+      applicationEventListener.viewAcls.foreach { s => _applicationData.setViewAcls(stringToSet(s)) }
+      applicationEventListener.appId.foreach { s => _applicationData.setApplicationId(s) }
+      applicationEventListener.appName.foreach { s => _applicationData.setApplicationName(s) }
+      applicationEventListener.sparkUser.foreach { s => _applicationData.setSparkUser(s) }
+      applicationEventListener.startTime.foreach { t => _applicationData.setStartTime(t) }
+      applicationEventListener.endTime.foreach { t => _applicationData.setEndTime(t) }
+ }
+ _applicationData
+ }
+
+ override def getEnvironmentData(): SparkEnvironmentData = {
+ if (_environmentData == null) {
+ // Notice: we ignore jvmInformation and classpathEntries, because they are less likely to be used by any analyzer.
+ _environmentData = new SparkEnvironmentData()
+ environmentListener.systemProperties.foreach { case (name, value) =>
+ _environmentData.addSystemProperty(name, value)
+ }
+ environmentListener.sparkProperties.foreach { case (name, value) =>
+ _environmentData.addSparkProperty(name, value)
+ }
+ }
+ _environmentData
+ }
+
+ override def getExecutorData(): SparkExecutorData = {
+ if (_executorData == null) {
+ _executorData = new SparkExecutorData()
+
+ for (statusId <- 0 until executorsListener.storageStatusList.size) {
+ val info = new ExecutorInfo()
+
+ val status = executorsListener.storageStatusList(statusId)
+
+ info.execId = status.blockManagerId.executorId
+ info.hostPort = status.blockManagerId.hostPort
+ info.rddBlocks = status.numBlocks
+
+        // Use a customized listener to fetch the peak memory used; the data contained in status is
+        // the memory currently in use, which is not useful in offline settings.
+ info.memUsed = storageStatusTrackingListener.executorIdToMaxUsedMem.getOrElse(info.execId, 0L)
+ info.maxMem = status.maxMem
+ info.diskUsed = status.diskUsed
+ info.activeTasks = executorsListener.executorToTasksActive.getOrElse(info.execId, 0)
+ info.failedTasks = executorsListener.executorToTasksFailed.getOrElse(info.execId, 0)
+ info.completedTasks = executorsListener.executorToTasksComplete.getOrElse(info.execId, 0)
+ info.totalTasks = info.activeTasks + info.failedTasks + info.completedTasks
+ info.duration = executorsListener.executorToDuration.getOrElse(info.execId, 0L)
+ info.inputBytes = executorsListener.executorToInputBytes.getOrElse(info.execId, 0L)
+ info.shuffleRead = executorsListener.executorToShuffleRead.getOrElse(info.execId, 0L)
+ info.shuffleWrite = executorsListener.executorToShuffleWrite.getOrElse(info.execId, 0L)
+
+ _executorData.setExecutorInfo(info.execId, info)
+ }
+ }
+ _executorData
+ }
+
+ override def getJobProgressData(): SparkJobProgressData = {
+ if (_jobProgressData == null) {
+ _jobProgressData = new SparkJobProgressData()
+
+ // Add JobInfo
+ jobProgressListener.jobIdToData.foreach { case (id, data) =>
+ val jobInfo = new JobInfo()
+
+ jobInfo.jobId = data.jobId
+ jobInfo.jobGroup = data.jobGroup.getOrElse("")
+ jobInfo.numActiveStages = data.numActiveStages
+ jobInfo.numActiveTasks = data.numActiveTasks
+ jobInfo.numCompletedTasks = data.numCompletedTasks
+ jobInfo.numFailedStages = data.numFailedStages
+ jobInfo.numFailedTasks = data.numFailedTasks
+ jobInfo.numSkippedStages = data.numSkippedStages
+ jobInfo.numSkippedTasks = data.numSkippedTasks
+ jobInfo.numTasks = data.numTasks
+
+ jobInfo.startTime = data.submissionTime.getOrElse(0)
+ jobInfo.endTime = data.completionTime.getOrElse(0)
+
+ data.stageIds.foreach{ case (id: Int) => jobInfo.addStageId(id)}
+ addIntSetToJSet(data.completedStageIndices, jobInfo.completedStageIndices)
+
+ _jobProgressData.addJobInfo(id, jobInfo)
+ }
+
+ // Add Stage Info
+ jobProgressListener.stageIdToData.foreach { case (id, data) =>
+ val stageInfo = new SparkJobProgressData.StageInfo()
+ val sparkStageInfo = jobProgressListener.stageIdToInfo.get(id._1)
+ stageInfo.name = sparkStageInfo match {
+ case Some(info: StageInfo) => {
+ info.name
+ }
+ case None => {
+ ""
+ }
+ }
+ stageInfo.description = data.description.getOrElse("")
+ stageInfo.diskBytesSpilled = data.diskBytesSpilled
+ stageInfo.executorRunTime = data.executorRunTime
+ stageInfo.duration = sparkStageInfo match {
+ case Some(info: StageInfo) => {
+ val submissionTime = info.submissionTime.getOrElse(0L)
+ info.completionTime.getOrElse(submissionTime) - submissionTime
+ }
+ case _ => 0L
+ }
+ stageInfo.inputBytes = data.inputBytes
+ stageInfo.memoryBytesSpilled = data.memoryBytesSpilled
+ stageInfo.numActiveTasks = data.numActiveTasks
+ stageInfo.numCompleteTasks = data.numCompleteTasks
+ stageInfo.numFailedTasks = data.numFailedTasks
+ stageInfo.outputBytes = data.outputBytes
+ stageInfo.shuffleReadBytes = data.shuffleReadTotalBytes
+ stageInfo.shuffleWriteBytes = data.shuffleWriteBytes
+ addIntSetToJSet(data.completedIndices, stageInfo.completedIndices)
+
+ _jobProgressData.addStageInfo(id._1, id._2, stageInfo)
+ }
+
+ // Add completed jobs
+ jobProgressListener.completedJobs.foreach { case (data) => _jobProgressData.addCompletedJob(data.jobId) }
+ // Add failed jobs
+ jobProgressListener.failedJobs.foreach { case (data) => _jobProgressData.addFailedJob(data.jobId) }
+ // Add completed stages
+ jobProgressListener.completedStages.foreach { case (data) =>
+ _jobProgressData.addCompletedStages(data.stageId, data.attemptId)
+ }
+ // Add failed stages
+ jobProgressListener.failedStages.foreach { case (data) =>
+ _jobProgressData.addFailedStages(data.stageId, data.attemptId)
+ }
+ }
+ _jobProgressData
+ }
+
+ // This method returns a combined information from StorageStatusListener and StorageListener
+ override def getStorageData(): SparkStorageData = {
+ if (_storageData == null) {
+ _storageData = new SparkStorageData()
+ _storageData.setRddInfoList(toJList[RDDInfo](storageListener.rddInfoList))
+ _storageData.setStorageStatusList(toJList[StorageStatus](storageStatusListener.storageStatusList))
+ }
+ _storageData
+ }
+
+ override def getAppId: String = {
+ getGeneralData().getApplicationId
+ }
+
+ def load(in: InputStream, sourceName: String): Unit = {
+ val replayBus = new ReplayListenerBus()
+ replayBus.addListener(applicationEventListener)
+ replayBus.addListener(jobProgressListener)
+ replayBus.addListener(environmentListener)
+ replayBus.addListener(storageStatusListener)
+ replayBus.addListener(executorsListener)
+ replayBus.addListener(storageListener)
+ replayBus.addListener(storageStatusTrackingListener)
+ replayBus.replay(in, sourceName, maybeTruncated = false)
+ }
+}
+
+object SparkDataCollection {
+ private val APPLICATION_TYPE = new ApplicationType("SPARK")
+
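+  // For example (added comment): stringToSet("alice,bob") returns a java.util.Set
+  // containing "alice" and "bob".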
+ def stringToSet(str: String): JSet[String] = {
+ val set = new JHashSet[String]()
+ str.split(",").foreach { case t: String => set.add(t)}
+ set
+ }
+
+ def toJList[T](seq: Seq[T]): JList[T] = {
+ val list = new JArrayList[T]()
+ seq.foreach { case (item: T) => list.add(item)}
+ list
+ }
+
+ def addIntSetToJSet(set: OpenHashSet[Int], jset: JSet[Integer]): Unit = {
+ val it = set.iterator
+ while (it.hasNext) {
+ jset.add(it.next())
+ }
+ }
+
+ def addIntSetToJSet(set: mutable.HashSet[Int], jset: JSet[Integer]): Unit = {
+ val it = set.iterator
+ while (it.hasNext) {
+ jset.add(it.next())
+ }
+ }
+}
diff --git a/app/org/apache/spark/deploy/history/SparkFSFetcher.scala b/app/org/apache/spark/deploy/history/SparkFSFetcher.scala
new file mode 100644
index 000000000..6788cccf5
--- /dev/null
+++ b/app/org/apache/spark/deploy/history/SparkFSFetcher.scala
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.InputStream
+import java.security.PrivilegedAction
+
+import com.linkedin.drelephant.analysis.{AnalyticJob, ElephantFetcher}
+import com.linkedin.drelephant.configurations.fetcher.FetcherConfigurationData
+import com.linkedin.drelephant.security.HadoopSecurity
+import com.linkedin.drelephant.spark.legacydata.SparkApplicationData
+import com.linkedin.drelephant.util.{HadoopUtils, SparkUtils, Utils}
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.log4j.Logger
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.{ApplicationEventListener, ReplayListenerBus}
+import org.apache.spark.storage.{StorageStatusListener, StorageStatusTrackingListener}
+import org.apache.spark.ui.env.EnvironmentListener
+import org.apache.spark.ui.exec.ExecutorsListener
+import org.apache.spark.ui.jobs.JobProgressListener
+import org.apache.spark.ui.storage.StorageListener
+
+
+/**
+ * A wrapper that replays Spark event history from files and then fills in the proper data objects.
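+ *
+ * Illustrative usage (added for documentation; in practice the fetcher is instantiated by the
+ * framework from the fetcher configuration rather than by hand):
+ * {{{
+ *   val fetcher = new SparkFSFetcher(fetcherConfData)
+ *   val data = fetcher.fetchData(analyticJob)  // replays the event log for analyticJob.getAppId()
+ * }}}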
+ */
+class SparkFSFetcher(fetcherConfData: FetcherConfigurationData) extends ElephantFetcher[SparkApplicationData] {
+ import SparkFSFetcher._
+
+ val eventLogSizeLimitMb =
+ Option(fetcherConfData.getParamMap.get(LOG_SIZE_XML_FIELD))
+ .flatMap { x => Option(Utils.getParam(x, 1)) }
+ .map { _(0) }
+ .getOrElse(DEFAULT_EVENT_LOG_SIZE_LIMIT_MB)
+ logger.info("The event log limit of Spark application is set to " + eventLogSizeLimitMb + " MB")
+ val eventLogUri = Option(fetcherConfData.getParamMap.get(LOG_LOCATION_URI_XML_FIELD))
+ logger.info("The event log location of Spark application is set to " + eventLogUri)
+
+ private lazy val security = new HadoopSecurity()
+
+ protected lazy val hadoopUtils: HadoopUtils = HadoopUtils
+
+ protected lazy val sparkUtils: SparkUtils = SparkUtils
+
+ protected lazy val hadoopConfiguration: Configuration = new Configuration()
+
+ protected lazy val sparkConf: SparkConf = {
+ val sparkConf = new SparkConf()
+ sparkUtils.getDefaultPropertiesFile() match {
+ case Some(filename) => sparkConf.setAll(sparkUtils.getPropertiesFromFile(filename))
+ case None => throw new IllegalStateException("can't find Spark conf; please set SPARK_HOME or SPARK_CONF_DIR")
+ }
+ sparkConf
+ }
+
+ def fetchData(analyticJob: AnalyticJob): SparkApplicationData = {
+ val appId = analyticJob.getAppId()
+ doAsPrivilegedAction { () => doFetchData(appId) }
+ }
+
+ protected def doAsPrivilegedAction[T](action: () => T): T =
+ security.doAs[T](new PrivilegedAction[T] { override def run(): T = action() })
+
+ protected def doFetchData(appId: String): SparkDataCollection = {
+ val dataCollection = new SparkDataCollection()
+
+ val (eventLogFileSystem, baseEventLogPath) =
+ sparkUtils.fileSystemAndPathForEventLogDir(hadoopConfiguration, sparkConf, eventLogUri)
+ val (eventLogPath, eventLogCodec) =
+ sparkUtils.pathAndCodecforEventLog(sparkConf, eventLogFileSystem, baseEventLogPath, appId, None)
+
+ // Check if the log parser should be throttled when the file is too large.
+ val shouldThrottle = eventLogFileSystem.getFileStatus(eventLogPath).getLen() > (eventLogSizeLimitMb * FileUtils.ONE_MB)
+ if (shouldThrottle) {
+ dataCollection.throttle()
+      // Since the data set is empty, we need to set the application id,
+      // so that we can still detect that this is a Spark job.
+ dataCollection.getGeneralData().setApplicationId(appId)
+ dataCollection.getConf().setProperty("spark.app.id", appId)
+
+ logger.info("The event log of Spark application: " + appId + " is over the limit size of "
+ + eventLogSizeLimitMb + " MB, the parsing process gets throttled.")
+ } else {
+ logger.info("Replaying Spark logs for application: " + appId +
+ " withlogPath: " + eventLogPath +
+ " with codec:" + eventLogCodec)
+
+ sparkUtils.withEventLog(eventLogFileSystem, eventLogPath, eventLogCodec) { in =>
+ dataCollection.load(in, eventLogPath.toString())
+ }
+
+ logger.info("Replay completed for application: " + appId)
+ }
+
+    dataCollection
+  }
+}
+
+object SparkFSFetcher {
+ private val logger = Logger.getLogger(SparkFSFetcher.getClass)
+
+  val DEFAULT_EVENT_LOG_SIZE_LIMIT_MB = 100d // 100 MB
+
+ val LOG_SIZE_XML_FIELD = "event_log_size_limit_in_mb"
+
+ val LOG_LOCATION_URI_XML_FIELD = "event_log_location_uri"
+
+ val DEFAULT_ATTEMPT_ID = Some("1")
+}
diff --git a/app/org/apache/spark/storage/StorageStatusTrackingListener.scala b/app/org/apache/spark/storage/StorageStatusTrackingListener.scala
new file mode 100644
index 000000000..5d30a2887
--- /dev/null
+++ b/app/org/apache/spark/storage/StorageStatusTrackingListener.scala
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.spark.storage
+
+
+import scala.collection.mutable
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.scheduler._
+
+
+/**
+ * :: DeveloperApi ::
+ * A modified version of StorageStatusListener that tracks the peak memory usage during the entire application runtime.
+ *
+ * NOTICE: this class copies StorageStatusListener's code instead of extending it, because the methods
+ * that would need to be overridden are all in private scope.
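+ *
+ * Illustrative usage (added for documentation; `replayBus` and `executorId` are assumed to come
+ * from the surrounding replay code, as in SparkDataCollection):
+ * {{{
+ *   val listener = new StorageStatusTrackingListener()
+ *   replayBus.addListener(listener)
+ *   // after the event log has been replayed:
+ *   val peakMemUsed = listener.executorIdToMaxUsedMem.getOrElse(executorId, 0L)
+ * }}}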
+ */
+@DeveloperApi
+class StorageStatusTrackingListener extends SparkListener {
+ // This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
+ private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
+
+ def storageStatusList = executorIdToStorageStatus.values.toSeq
+
+ val executorIdToMaxUsedMem = mutable.Map[String, Long]()
+
+ /** Update storage status list to reflect updated block statuses */
+ private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]): Unit = {
+ executorIdToStorageStatus.get(execId).foreach { storageStatus =>
+ updatedBlocks.foreach { case (blockId, updatedStatus) =>
+ if (updatedStatus.storageLevel == StorageLevel.NONE) {
+ storageStatus.removeBlock(blockId)
+ } else {
+ storageStatus.updateBlock(blockId, updatedStatus)
+ }
+ }
+ }
+ updateUsedMem()
+ }
+
+ /** Update storage status list to reflect the removal of an RDD from the cache */
+ private def updateStorageStatus(unpersistedRDDId: Int): Unit = {
+ storageStatusList.foreach { storageStatus =>
+ storageStatus.rddBlocksById(unpersistedRDDId).foreach { case (blockId, _) =>
+ storageStatus.removeBlock(blockId)
+ }
+ }
+ updateUsedMem()
+ }
+
+ private def updateUsedMem(): Unit = {
+ executorIdToStorageStatus.foreach { case (execId, storageStatus) =>
+ val currentMemUsed = storageStatus.memUsed
+ if (currentMemUsed > executorIdToMaxUsedMem.getOrElse(execId, 0L)) {
+ executorIdToMaxUsedMem(execId) = currentMemUsed
+ }
+ }
+ }
+
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+ synchronized {
+ val info = taskEnd.taskInfo
+ val metrics = taskEnd.taskMetrics
+ if (info != null && metrics != null) {
+ val updatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
+ if (updatedBlocks.length > 0) {
+ updateStorageStatus(info.executorId, updatedBlocks)
+ }
+ }
+ }
+ }
+
+ override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = {
+ synchronized {
+ updateStorageStatus(unpersistRDD.rddId)
+ }
+ }
+
+ override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
+ synchronized {
+ val blockManagerId = blockManagerAdded.blockManagerId
+ val executorId = blockManagerId.executorId
+ val maxMem = blockManagerAdded.maxMem
+ val storageStatus = new StorageStatus(blockManagerId, maxMem)
+ executorIdToStorageStatus(executorId) = storageStatus
+ }
+ }
+
+ override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = {
+ synchronized {
+ val executorId = blockManagerRemoved.blockManagerId.executorId
+ executorIdToStorageStatus.remove(executorId)
+ }
+ }
+}
diff --git a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala
index 8efd3d11b..a3c0e1cf2 100644
--- a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala
+++ b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala
@@ -134,7 +134,7 @@ object SparkMetricsAggregatorTest {
import JavaConverters._
def newFakeAggregatorConfigurationData(params: Map[String, String] = Map.empty): AggregatorConfigurationData =
- new AggregatorConfigurationData("org.apache.spark.SparkMetricsAggregator", new ApplicationType("SPARK"), params.asJava)
+ new AggregatorConfigurationData("org.apache.spark.SparkMetricsAggregator", new ApplicationType("SPARK"), params.asJava)
def newFakeSparkListenerEnvironmentUpdate(appConfigurationProperties: Map[String, String]): SparkListenerEnvironmentUpdate =
SparkListenerEnvironmentUpdate(Map("Spark Properties" -> appConfigurationProperties.toSeq))
diff --git a/test/com/linkedin/drelephant/spark/fetchers/SparkFetcherTest.scala b/test/com/linkedin/drelephant/spark/fetchers/SparkFetcherTest.scala
index b1879ba5c..e422b2499 100644
--- a/test/com/linkedin/drelephant/spark/fetchers/SparkFetcherTest.scala
+++ b/test/com/linkedin/drelephant/spark/fetchers/SparkFetcherTest.scala
@@ -16,24 +16,30 @@
package com.linkedin.drelephant.spark.fetchers
-import java.io.{File, FileOutputStream, InputStream, OutputStream}
+import java.io.InputStream
+import java.nio.file.Files
import java.util.Date
import scala.concurrent.{ExecutionContext, Future}
-import com.google.common.io.Files
import com.linkedin.drelephant.analysis.{AnalyticJob, ApplicationType}
import com.linkedin.drelephant.configurations.fetcher.FetcherConfigurationData
import com.linkedin.drelephant.spark.data.{SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.fetchers.SparkFetcher.EventLogSource
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationAttemptInfo, ApplicationInfo}
-import com.linkedin.drelephant.util.SparkUtils
+import com.linkedin.drelephant.spark.legacydata.{MockSparkApplicationData, SparkGeneralData}
+import com.linkedin.drelephant.spark.fetchers.FSFetcher
+import com.linkedin.drelephant.util.{SparkUtils, HadoopUtils}
+import org.apache.hadoop.fs.Path
+import org.apache.log4j.Logger
import org.apache.spark.SparkConf
+import org.apache.spark.deploy.history.SparkFSFetcher
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
import org.mockito.Mockito
import org.scalatest.{FunSpec, Matchers}
+import org.scalatest.mockito.MockitoSugar
-class SparkFetcherTest extends FunSpec with Matchers {
+class SparkFetcherTest extends FunSpec with Matchers with MockitoSugar {
import SparkFetcherTest._
describe("SparkFetcher") {
@@ -97,22 +103,25 @@ class SparkFetcherTest extends FunSpec with Matchers {
}
it("gets its SparkConf when SPARK_CONF_DIR is set") {
- val tempDir = Files.createTempDir()
+ val tempDir = Files.createTempDirectory(null)
val testResourceIn = getClass.getClassLoader.getResourceAsStream("spark-defaults.conf")
- val testResourceFile = new File(tempDir, "spark-defaults.conf")
- val testResourceOut = new FileOutputStream(testResourceFile)
- managedCopyInputStreamToOutputStream(testResourceIn, testResourceOut)
+ val testResourceFile = tempDir.resolve("spark-defaults.conf")
+ Files.copy(testResourceIn, testResourceFile)
val fetcherConfigurationData = newFakeFetcherConfigurationData()
val sparkFetcher = new SparkFetcher(fetcherConfigurationData) {
override lazy val sparkUtils = new SparkUtils() {
- override val defaultEnv = Map("SPARK_CONF_DIR" -> tempDir.toString)
+ override lazy val logger = mock[Logger]
+ override lazy val hadoopUtils = mock[HadoopUtils]
+ override lazy val defaultEnv = Map("SPARK_CONF_DIR" -> tempDir.toString)
}
}
val sparkConf = sparkFetcher.sparkConf
- tempDir.delete()
+ testResourceIn.close()
+ Files.delete(testResourceFile)
+ Files.delete(tempDir)
sparkConf.get("spark.yarn.historyServer.address") should be("jh1.grid.example.com:18080")
sparkConf.get("spark.eventLog.enabled") should be("true")
@@ -121,24 +130,27 @@ class SparkFetcherTest extends FunSpec with Matchers {
}
it("gets its SparkConf when SPARK_HOME is set") {
- val tempDir = Files.createTempDir()
- val tempConfDir = new File(tempDir, "conf")
- tempConfDir.mkdir()
+ val tempDir = Files.createTempDirectory(null)
+ val tempConfDir = Files.createDirectory(tempDir.resolve("conf"))
val testResourceIn = getClass.getClassLoader.getResourceAsStream("spark-defaults.conf")
- val testResourceFile = new File(tempConfDir, "spark-defaults.conf")
- val testResourceOut = new FileOutputStream(testResourceFile)
- managedCopyInputStreamToOutputStream(testResourceIn, testResourceOut)
+ val testResourceFile = tempConfDir.resolve("spark-defaults.conf")
+ Files.copy(testResourceIn, testResourceFile)
val fetcherConfigurationData = newFakeFetcherConfigurationData()
val sparkFetcher = new SparkFetcher(fetcherConfigurationData) {
override lazy val sparkUtils = new SparkUtils() {
- override val defaultEnv = Map("SPARK_HOME" -> tempDir.toString)
+ override lazy val logger = mock[Logger]
+ override lazy val hadoopUtils = mock[HadoopUtils]
+ override lazy val defaultEnv = Map("SPARK_HOME" -> tempDir.toString)
}
}
val sparkConf = sparkFetcher.sparkConf
- tempDir.delete()
+ testResourceIn.close()
+ Files.delete(testResourceFile)
+ Files.delete(tempConfDir)
+ Files.delete(tempDir)
sparkConf.get("spark.yarn.historyServer.address") should be("jh1.grid.example.com:18080")
sparkConf.get("spark.eventLog.enabled") should be("true")
@@ -149,7 +161,11 @@ class SparkFetcherTest extends FunSpec with Matchers {
it("throws an exception if neither SPARK_CONF_DIR nor SPARK_HOME are set") {
val fetcherConfigurationData = newFakeFetcherConfigurationData()
val sparkFetcher = new SparkFetcher(fetcherConfigurationData) {
- override lazy val sparkUtils = new SparkUtils() { override val defaultEnv = Map.empty[String, String] }
+ override lazy val sparkUtils = new SparkUtils() {
+ override lazy val logger = mock[Logger]
+ override lazy val hadoopUtils = mock[HadoopUtils]
+ override lazy val defaultEnv = Map.empty[String, String]
+ }
}
an[IllegalStateException] should be thrownBy { sparkFetcher.sparkConf }
@@ -239,21 +255,4 @@ object SparkFetcherTest {
Mockito.when(sparkLogClient.fetchData(appId, attemptId)).thenReturn(logDerivedData)
sparkLogClient
}
-
- def managedCopyInputStreamToOutputStream(in: => InputStream, out: => OutputStream): Unit = {
- for {
- input <- resource.managed(in)
- output <- resource.managed(out)
- } {
- val buffer = new Array[Byte](512)
- def read(): Unit = input.read(buffer) match {
- case -1 => ()
- case bytesRead => {
- output.write(buffer, 0, bytesRead)
- read()
- }
- }
- read()
- }
- }
}
diff --git a/test/com/linkedin/drelephant/spark/fetchers/SparkLogClientTest.scala b/test/com/linkedin/drelephant/spark/fetchers/SparkLogClientTest.scala
index d5fd38927..994af486f 100644
--- a/test/com/linkedin/drelephant/spark/fetchers/SparkLogClientTest.scala
+++ b/test/com/linkedin/drelephant/spark/fetchers/SparkLogClientTest.scala
@@ -16,73 +16,54 @@
package com.linkedin.drelephant.spark.fetchers
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
+import java.io.{ByteArrayOutputStream, InputStream}
import java.net.URI
import scala.concurrent.ExecutionContext
+import com.linkedin.drelephant.util.{SparkUtils, SparkUtilsTest}
+import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path, PositionedReadable}
-import org.apache.hadoop.io.compress.CompressionInputStream
+import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
-import org.mockito.BDDMockito
import org.scalatest.{AsyncFunSpec, Matchers}
import org.scalatest.mockito.MockitoSugar
import org.xerial.snappy.SnappyOutputStream
class SparkLogClientTest extends AsyncFunSpec with Matchers with MockitoSugar {
- import SparkLogClientTest._
-
describe("SparkLogClient") {
- it("throws an exception if spark.eventLog.dir is missing") {
- an[IllegalArgumentException] should be thrownBy { new SparkLogClient(new Configuration(), new SparkConf()) }
- }
-
- it("uses spark.eventLog.dir if it is already an webhdfs URI") {
- val hadoopConfiguration = new Configuration()
- val sparkConf = new SparkConf().set("spark.eventLog.dir", "webhdfs://nn1.grid.example.com:50070/logs/spark")
- val sparkLogClient = new SparkLogClient(hadoopConfiguration, sparkConf)
- sparkLogClient.webhdfsEventLogUri should be(new URI("webhdfs://nn1.grid.example.com:50070/logs/spark"))
- }
-
- it("uses a webhdfs URI constructed from spark.eventLog.dir and dfs.namenode.http-address if spark.eventLog.dir is an hdfs URI") {
- val hadoopConfiguration = new Configuration()
- hadoopConfiguration.set("dfs.namenode.http-address", "0.0.0.0:50070")
- val sparkConf = new SparkConf().set("spark.eventLog.dir", "hdfs://nn1.grid.example.com:9000/logs/spark")
- val sparkLogClient = new SparkLogClient(hadoopConfiguration, sparkConf)
- sparkLogClient.webhdfsEventLogUri should be(new URI("webhdfs://nn1.grid.example.com:50070/logs/spark"))
- }
-
- it("returns the desired data from the Spark event logs") {
- import ExecutionContext.Implicits.global
-
- val hadoopConfiguration = new Configuration()
- hadoopConfiguration.set("dfs.namenode.http-address", "0.0.0.0:50070")
+ it("returns log-derived data") {
+ val hadoopConfiguration = new Configuration(false)
val sparkConf =
new SparkConf()
- .set("spark.eventLog.dir", "hdfs://nn1.grid.example.com:9000/logs/spark")
+ .set("spark.eventLog.dir", "webhdfs://nn1.grid.example.com:50070/logs/spark")
.set("spark.eventLog.compress", "true")
.set("spark.io.compression.codec", "snappy")
val appId = "application_1"
val attemptId = Some("1")
- val testResourceIn = getClass.getClassLoader.getResourceAsStream("spark_event_logs/event_log_2")
- val byteOut = new ByteArrayOutputStream()
- val snappyOut = new SnappyOutputStream(byteOut)
- managedCopyInputStreamToOutputStream(testResourceIn, snappyOut)
-
- val sparkLogClient = new SparkLogClient(hadoopConfiguration, sparkConf) {
- override lazy val fs: FileSystem = {
- val fs = mock[FileSystem]
- val expectedPath = new Path("webhdfs://nn1.grid.example.com:50070/logs/spark/application_1_1.snappy")
- BDDMockito.given(fs.exists(expectedPath)).willReturn(true)
- BDDMockito.given(fs.open(expectedPath)).willReturn(
- new FSDataInputStream(new FakeCompressionInputStream(new ByteArrayInputStream(byteOut.toByteArray)))
- )
- fs
+ val eventLogBytes = {
+ val bout = new ByteArrayOutputStream()
+ for {
+ in <- resource.managed(getClass.getClassLoader.getResourceAsStream("spark_event_logs/event_log_2"))
+ out <- resource.managed(new SnappyOutputStream(bout))
+ } {
+ IOUtils.copy(in, out)
}
+ bout.toByteArray
+ }
+
+ val sparkLogClient = new SparkLogClient(hadoopConfiguration, sparkConf, None) {
+ override lazy val sparkUtils = SparkUtilsTest.newFakeSparkUtilsForEventLog(
+ new URI("webhdfs://nn1.grid.example.com:50070"),
+ new Path("/logs/spark"),
+ new Path("application_1_1.snappy"),
+ eventLogBytes
+ )
+
+ override protected def doAsPrivilegedAction[T](action: () => T): T = action()
}
sparkLogClient.fetchData(appId, attemptId).map { logDerivedData =>
@@ -100,31 +81,3 @@ class SparkLogClientTest extends AsyncFunSpec with Matchers with MockitoSugar {
}
}
}
-
-object SparkLogClientTest {
- class FakeCompressionInputStream(in: InputStream) extends CompressionInputStream(in) with PositionedReadable {
- override def read(): Int = in.read()
- override def read(b: Array[Byte], off: Int, len: Int): Int = in.read(b, off, len)
- override def read(pos: Long, buffer: Array[Byte], off: Int, len: Int): Int = ???
- override def readFully(pos: Long, buffer: Array[Byte], off: Int, len: Int): Unit = ???
- override def readFully(pos: Long, buffer: Array[Byte]): Unit = ???
- override def resetState(): Unit = ???
- }
-
- def managedCopyInputStreamToOutputStream(in: => InputStream, out: => OutputStream): Unit = {
- for {
- input <- resource.managed(in)
- output <- resource.managed(out)
- } {
- val buffer = new Array[Byte](512)
- def read(): Unit = input.read(buffer) match {
- case -1 => ()
- case bytesRead => {
- output.write(buffer, 0, bytesRead)
- read()
- }
- }
- read()
- }
- }
-}
diff --git a/test/com/linkedin/drelephant/spark/fetchers/SparkRestClientTest.scala b/test/com/linkedin/drelephant/spark/fetchers/SparkRestClientTest.scala
index e004b855b..f428902c8 100644
--- a/test/com/linkedin/drelephant/spark/fetchers/SparkRestClientTest.scala
+++ b/test/com/linkedin/drelephant/spark/fetchers/SparkRestClientTest.scala
@@ -44,10 +44,6 @@ class SparkRestClientTest extends AsyncFunSpec with Matchers {
import SparkRestClientTest._
describe("SparkRestClient") {
- it("throws an exception if spark.eventLog.dir is missing") {
- an[IllegalArgumentException] should be thrownBy(new SparkRestClient(new SparkConf()))
- }
-
it("returns the desired data from the Spark REST API for cluster mode application") {
import ExecutionContext.Implicits.global
val fakeJerseyServer = new FakeJerseyServer() {
@@ -169,6 +165,24 @@ class SparkRestClientTest extends AsyncFunSpec with Matchers {
assertion
}
}
+
+ it("throws an exception if spark.yarn.historyServer.address is missing") {
+ an[IllegalArgumentException] should be thrownBy(new SparkRestClient(new SparkConf()))
+ }
+
+ it("handles unrecognized fields gracefully when parsing") {
+ val objectMapper = SparkRestClient.SparkRestObjectMapper
+ val json = s"""{
+ "startTime" : "2016-09-12T19:30:18.101GMT",
+ "endTime" : "1969-12-31T23:59:59.999GMT",
+ "sparkUser" : "foo",
+ "completed" : false,
+ "unrecognized" : "bar"
+ }"""
+
+ val applicationAttemptInfo = objectMapper.readValue[ApplicationAttemptInfo](json)
+ applicationAttemptInfo.sparkUser should be("foo")
+ }
}
}
diff --git a/test/com/linkedin/drelephant/spark/legacydata/LegacyDataConvertersTest.scala b/test/com/linkedin/drelephant/spark/legacydata/LegacyDataConvertersTest.scala
new file mode 100644
index 000000000..ad8e7511c
--- /dev/null
+++ b/test/com/linkedin/drelephant/spark/legacydata/LegacyDataConvertersTest.scala
@@ -0,0 +1,320 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.drelephant.spark.legacydata
+
+import java.util.Date
+
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.status.api.v1.StageStatus
+import org.scalatest.{FunSpec, Matchers}
+
+
+class LegacyDataConvertersTest extends FunSpec with Matchers {
+ describe("LegacyDataConverters") {
+ describe(".convert") {
+ }
+
+ describe(".extractAppConfigurationProperties") {
+ it("returns a Map of Spark properties extracted from the given legacy SparkApplicationData") {
+ val legacyData = new MockSparkApplicationData() {
+ val environmentData = {
+ val environmentData = new SparkEnvironmentData()
+ environmentData.addSparkProperty("a", "b")
+ environmentData.addSparkProperty("c", "d")
+ environmentData
+ }
+
+ override def getEnvironmentData(): SparkEnvironmentData = environmentData
+ }
+
+ val appConfigurationProperties = LegacyDataConverters.extractAppConfigurationProperties(legacyData)
+ appConfigurationProperties should contain theSameElementsAs Map("a" -> "b", "c" -> "d")
+ }
+ }
+
+ describe(".extractApplicationInfo") {
+ it("returns an ApplicationInfo extracted from the given legacy SparkApplicationData") {
+ val legacyData = new MockSparkApplicationData() {
+ val generalData = {
+ val generalData = new SparkGeneralData()
+ generalData.setApplicationId("application_1")
+ generalData.setApplicationName("app")
+ generalData.setStartTime(1000L)
+ generalData.setEndTime(2000L)
+ generalData.setSparkUser("foo")
+ generalData
+ }
+
+ override def getGeneralData(): SparkGeneralData = generalData
+ }
+
+ val applicationInfo = LegacyDataConverters.extractApplicationInfo(legacyData)
+ applicationInfo.id should be("application_1")
+ applicationInfo.name should be("app")
+ applicationInfo.attempts.size should be(1)
+
+ val applicationAttemptInfo = applicationInfo.attempts.last
+ applicationAttemptInfo.attemptId should be(Some("1"))
+ applicationAttemptInfo.startTime should be(new Date(1000L))
+ applicationAttemptInfo.endTime should be(new Date(2000L))
+ applicationAttemptInfo.sparkUser should be("foo")
+ applicationAttemptInfo.completed should be(true)
+ }
+ }
+
+ describe(".extractJobDatas") {
+ it("returns JobDatas extracted from the given legacy SparkApplicationData") {
+ val legacyData = new MockSparkApplicationData() {
+ val jobProgressData = {
+ val jobProgressData = new SparkJobProgressData()
+
+ val jobInfo1 = {
+ val jobInfo = new SparkJobProgressData.JobInfo()
+ jobInfo.jobId = 1
+
+ jobInfo.numTasks = 10
+ jobInfo.numActiveTasks = 1
+ jobInfo.numCompletedTasks = 2
+ jobInfo.numSkippedTasks = 3
+ jobInfo.numFailedTasks = 4
+
+ for (i <- 1 to 100) { jobInfo.stageIds.add(i) }
+ jobInfo.numActiveStages = 10
+ for (i <- 1 to 20) { jobInfo.completedStageIndices.add(i) }
+ jobInfo.numSkippedStages = 30
+ jobInfo.numFailedStages = 40
+
+ jobInfo
+ }
+ jobProgressData.addJobInfo(1, jobInfo1)
+ jobProgressData.addCompletedJob(1)
+
+ val jobInfo2 = {
+ val jobInfo = new SparkJobProgressData.JobInfo()
+ jobInfo.jobId = 2
+ jobInfo
+ }
+ jobProgressData.addJobInfo(2, jobInfo2)
+ jobProgressData.addFailedJob(2)
+
+ jobProgressData
+ }
+
+ override def getJobProgressData(): SparkJobProgressData = jobProgressData
+ }
+
+ val jobDatas = LegacyDataConverters.extractJobDatas(legacyData)
+ jobDatas.size should be(2)
+
+ val jobData1 = jobDatas(0)
+ jobData1.jobId should be(1)
+ jobData1.name should be("1")
+ jobData1.description should be(None)
+ jobData1.submissionTime should be(None)
+ jobData1.completionTime should be(None)
+ jobData1.stageIds should be((1 to 100).toSeq)
+ jobData1.jobGroup should be(None)
+ jobData1.status should be(JobExecutionStatus.SUCCEEDED)
+ jobData1.numTasks should be(10)
+ jobData1.numActiveTasks should be(1)
+ jobData1.numCompletedTasks should be(2)
+ jobData1.numSkippedTasks should be(3)
+ jobData1.numFailedTasks should be(4)
+ jobData1.numActiveStages should be(10)
+ jobData1.numCompletedStages should be(20)
+ jobData1.numSkippedStages should be(30)
+ jobData1.numFailedStages should be(40)
+
+ val jobData2 = jobDatas(1)
+ jobData2.jobId should be(2)
+ jobData2.name should be("2")
+ jobData2.status should be(JobExecutionStatus.FAILED)
+ }
+ }
+
+ describe(".extractStageDatas") {
+ it("returns StageDatas extracted from the given legacy SparkApplicationData") {
+ val legacyData = new MockSparkApplicationData() {
+ val jobProgressData = {
+ val jobProgressData = new SparkJobProgressData()
+
+ val stageInfoS1A1 = {
+ val stageInfo = new SparkJobProgressData.StageInfo()
+
+ stageInfo.numActiveTasks = 1
+ stageInfo.numCompleteTasks = 2
+ stageInfo.numFailedTasks = 3
+
+ stageInfo.executorRunTime = 1000L
+
+ stageInfo.inputBytes = 10000L
+ stageInfo.outputBytes = 20000L
+ stageInfo.shuffleReadBytes = 30000L
+ stageInfo.shuffleWriteBytes = 40000L
+ stageInfo.memoryBytesSpilled = 50000L
+ stageInfo.diskBytesSpilled = 60000L
+
+ stageInfo.name = "1,1"
+ stageInfo.description = "a"
+
+ stageInfo
+ }
+ jobProgressData.addStageInfo(1, 1, stageInfoS1A1)
+ jobProgressData.addCompletedStages(1, 1)
+
+ val stageInfoS1A2 = {
+ val stageInfo = new SparkJobProgressData.StageInfo()
+ stageInfo.name = "1,2"
+ stageInfo
+ }
+ jobProgressData.addStageInfo(1, 2, stageInfoS1A2)
+ jobProgressData.addCompletedStages(1, 2)
+
+ val stageInfoS2A1 = {
+ val stageInfo = new SparkJobProgressData.StageInfo()
+ stageInfo.name = "2,1"
+ stageInfo
+ }
+ jobProgressData.addStageInfo(2, 1, stageInfoS2A1)
+ jobProgressData.addFailedStages(2, 1)
+
+ jobProgressData
+ }
+
+ override def getJobProgressData(): SparkJobProgressData = jobProgressData
+ }
+
+ val stageDatas = LegacyDataConverters.extractStageDatas(legacyData)
+ stageDatas.size should be(3)
+
+ val stageDataS1A1 = stageDatas(0)
+ stageDataS1A1.status should be(StageStatus.COMPLETE)
+ stageDataS1A1.stageId should be(1)
+ stageDataS1A1.attemptId should be(1)
+ stageDataS1A1.numActiveTasks should be(1)
+ stageDataS1A1.numCompleteTasks should be(2)
+ stageDataS1A1.numFailedTasks should be(3)
+ stageDataS1A1.executorRunTime should be(1000L)
+ stageDataS1A1.inputBytes should be(10000L)
+ stageDataS1A1.inputRecords should be(0L)
+ stageDataS1A1.outputBytes should be(20000L)
+ stageDataS1A1.outputRecords should be(0L)
+ stageDataS1A1.shuffleReadBytes should be(30000L)
+ stageDataS1A1.shuffleReadRecords should be(0L)
+ stageDataS1A1.shuffleWriteBytes should be(40000L)
+ stageDataS1A1.shuffleWriteRecords should be(0L)
+ stageDataS1A1.memoryBytesSpilled should be(50000L)
+ stageDataS1A1.diskBytesSpilled should be(60000L)
+ stageDataS1A1.name should be("1,1")
+ stageDataS1A1.details should be("a")
+ stageDataS1A1.schedulingPool should be("")
+ stageDataS1A1.accumulatorUpdates should be(Seq.empty)
+ stageDataS1A1.tasks should be(None)
+ stageDataS1A1.executorSummary should be(None)
+
+ val stageDataS1A2 = stageDatas(1)
+ stageDataS1A2.status should be(StageStatus.COMPLETE)
+ stageDataS1A2.stageId should be(1)
+ stageDataS1A2.attemptId should be(2)
+ stageDataS1A2.name should be("1,2")
+
+ val stageDataS2A1 = stageDatas(2)
+ stageDataS2A1.status should be(StageStatus.FAILED)
+ stageDataS2A1.stageId should be(2)
+ stageDataS2A1.attemptId should be(1)
+ stageDataS2A1.name should be("2,1")
+ }
+ }
+
+ describe(".extractExecutorSummaries") {
+ it("returns ExecutorSummaries extracted from the given legacy SparkApplicationData") {
+ val legacyData = new MockSparkApplicationData() {
+ val executorData = {
+ val executorData = new SparkExecutorData()
+
+ val executorInfo1 = {
+ val executorInfo = new SparkExecutorData.ExecutorInfo()
+
+ executorInfo.execId = "1"
+ executorInfo.hostPort = "9090"
+
+ executorInfo.rddBlocks = 10
+ executorInfo.memUsed = 10000L
+ executorInfo.maxMem = 20000L
+ executorInfo.diskUsed = 30000L
+
+ executorInfo.activeTasks = 1
+ executorInfo.completedTasks = 2
+ executorInfo.failedTasks = 3
+ executorInfo.totalTasks = 6
+
+ executorInfo.duration = 1000L
+
+ executorInfo.inputBytes = 100000L
+ executorInfo.shuffleRead = 200000L
+ executorInfo.shuffleWrite = 300000L
+
+ executorInfo
+ }
+ executorData.setExecutorInfo("1", executorInfo1)
+
+ val executorInfo2 = {
+ val executorInfo = new SparkExecutorData.ExecutorInfo()
+ executorInfo.execId = "2"
+ executorInfo
+ }
+ executorData.setExecutorInfo("2", executorInfo2)
+
+ executorData
+ }
+
+ override def getExecutorData(): SparkExecutorData = executorData
+ }
+
+ val executorSummaries = LegacyDataConverters.extractExecutorSummaries(legacyData)
+ executorSummaries.size should be(2)
+
+ val executorSummary1 = executorSummaries(0)
+ executorSummary1.id should be("1")
+ executorSummary1.hostPort should be("9090")
+ executorSummary1.rddBlocks should be(10)
+ executorSummary1.memoryUsed should be(10000L)
+ executorSummary1.diskUsed should be(30000L)
+ executorSummary1.activeTasks should be(1)
+ executorSummary1.failedTasks should be(3)
+ executorSummary1.completedTasks should be(2)
+ executorSummary1.totalTasks should be(6)
+ executorSummary1.totalDuration should be(1000L)
+ executorSummary1.totalInputBytes should be(100000L)
+ executorSummary1.totalShuffleRead should be(200000L)
+ executorSummary1.totalShuffleWrite should be(300000L)
+ executorSummary1.maxMemory should be(20000L)
+ executorSummary1.executorLogs should be(Map.empty)
+
+ val executorSummary2 = executorSummaries(1)
+ executorSummary2.id should be("2")
+ }
+ }
+
+ describe(".") {
+ }
+ }
+}
+
+object LegacyDataConvertersTest {
+
+}
diff --git a/test/com/linkedin/drelephant/spark/legacydata/MockSparkApplicationData.java b/test/com/linkedin/drelephant/spark/legacydata/MockSparkApplicationData.java
new file mode 100644
index 000000000..34917f6a4
--- /dev/null
+++ b/test/com/linkedin/drelephant/spark/legacydata/MockSparkApplicationData.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.drelephant.spark.legacydata;
+
+import com.linkedin.drelephant.analysis.ApplicationType;
+import java.util.Properties;
+
+
+/**
+ * A pseudo-local implementation of the SparkApplicationData interface, intended for test purposes.
+ */
+public class MockSparkApplicationData implements SparkApplicationData {
+ private static final ApplicationType APPLICATION_TYPE = new ApplicationType("SPARK");
+
+ private final SparkGeneralData _sparkGeneralData;
+ private final SparkEnvironmentData _sparkEnvironmentData;
+ private final SparkExecutorData _sparkExecutorData;
+ private final SparkJobProgressData _sparkJobProgressData;
+ private final SparkStorageData _sparkStorageData;
+
+ public MockSparkApplicationData() {
+ _sparkGeneralData = new SparkGeneralData();
+ _sparkEnvironmentData = new SparkEnvironmentData();
+ _sparkExecutorData = new SparkExecutorData();
+ _sparkJobProgressData = new SparkJobProgressData();
+ _sparkStorageData = new SparkStorageData();
+ }
+
+ @Override
+ public boolean isThrottled() {
+ return false;
+ }
+
+ @Override
+ public SparkGeneralData getGeneralData() {
+ return _sparkGeneralData;
+ }
+
+ @Override
+ public SparkEnvironmentData getEnvironmentData() {
+ return _sparkEnvironmentData;
+ }
+
+ @Override
+ public SparkExecutorData getExecutorData() {
+ return _sparkExecutorData;
+ }
+
+ @Override
+ public SparkJobProgressData getJobProgressData() {
+ return _sparkJobProgressData;
+ }
+
+ @Override
+ public SparkStorageData getStorageData() {
+ return _sparkStorageData;
+ }
+
+ @Override
+ public Properties getConf() {
+ return getEnvironmentData().getSparkProperties();
+ }
+
+ @Override
+ public String getAppId() {
+ return getGeneralData().getApplicationId();
+ }
+
+ @Override
+ public ApplicationType getApplicationType() {
+ return APPLICATION_TYPE;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return getExecutorData().getExecutors().isEmpty();
+ }
+}
diff --git a/test/com/linkedin/drelephant/util/HadoopUtilsTest.scala b/test/com/linkedin/drelephant/util/HadoopUtilsTest.scala
new file mode 100644
index 000000000..753e087a7
--- /dev/null
+++ b/test/com/linkedin/drelephant/util/HadoopUtilsTest.scala
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.drelephant.util
+
+import java.io.{ByteArrayInputStream, IOException}
+import java.net.{HttpURLConnection, URL}
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.JsonNodeFactory
+import org.apache.hadoop.conf.Configuration
+import org.apache.log4j.Logger
+import org.mockito.Mockito
+import org.scalatest.{FunSpec, Matchers}
+import org.scalatest.mockito.MockitoSugar
+
+class HadoopUtilsTest extends FunSpec with Matchers {
+ import HadoopUtilsTest._
+
+ describe("HadoopUtils") {
+ describe(".findHaNameNodeAddress") {
+ it("returns the first active HA name node it can find") {
+ val hadoopUtils = HadoopUtilsTest.newFakeHadoopUtilsForNameNode(
+ ("sample-ha1.grid.example.com", ("sample-ha1.grid.example.com", "standby")),
+ ("sample-ha2.grid.example.com", ("sample-ha2.grid.example.com", "active"))
+ )
+ val conf = new Configuration(false)
+ conf.addResource("core-site.xml")
+ val haNameNodeAddress = hadoopUtils.findHaNameNodeAddress(conf)
+ haNameNodeAddress should be(Some("sample-ha2.grid.example.com:50070"))
+ }
+
+ it("returns no HA name node if one isn't configured") {
+ val hadoopUtils = HadoopUtilsTest.newFakeHadoopUtilsForNameNode(
+ ("sample-ha1.grid.example.com", ("sample-ha1.grid.example.com", "standby")),
+ ("sample-ha2.grid.example.com", ("sample-ha2.grid.example.com", "active"))
+ )
+ val conf = new Configuration(false)
+ val haNameNodeAddress = hadoopUtils.findHaNameNodeAddress(conf)
+ haNameNodeAddress should be(None)
+ }
+ }
+
+ describe(".httpNameNodeAddress") {
+ it("returns the default name node") {
+ val hadoopUtils = HadoopUtilsTest.newFakeHadoopUtilsForNameNode(
+ ("sample-ha1.grid.example.com", ("sample-ha1.grid.example.com", "standby")),
+ ("sample-ha2.grid.example.com", ("sample-ha2.grid.example.com", "active"))
+ )
+ val conf = new Configuration(false)
+ conf.addResource("core-site.xml")
+ val haNameNodeAddress = hadoopUtils.httpNameNodeAddress(conf)
+ haNameNodeAddress should be(Some("sample.grid.example.com:50070"))
+ }
+ }
+
+ describe(".isActiveNameNode") {
+ it("returns true for active name nodes") {
+ val hadoopUtils =
+ newFakeHadoopUtilsForNameNode(Map(("nn1.grid.example.com", ("nn1-ha1.grid.example.com", "active"))))
+ hadoopUtils.isActiveNameNode("nn1.grid.example.com") should be(true)
+ }
+
+ it("returns false for standby name nodes") {
+ val hadoopUtils =
+ newFakeHadoopUtilsForNameNode(Map(("nn1.grid.example.com", ("nn1-ha1.grid.example.com", "standby"))))
+ hadoopUtils.isActiveNameNode("nn1.grid.example.com") should be(false)
+ }
+ }
+ }
+}
+
+object HadoopUtilsTest extends MockitoSugar {
+ import scala.annotation.varargs
+
+ @varargs
+ def newFakeHadoopUtilsForNameNode(nameNodeHostsAndStatesByJmxHost: (String, (String, String))*): HadoopUtils =
+ newFakeHadoopUtilsForNameNode(nameNodeHostsAndStatesByJmxHost.toMap)
+
+ def newFakeHadoopUtilsForNameNode(nameNodeHostsAndStatesByJmxHost: Map[String, (String, String)]): HadoopUtils =
+ new HadoopUtils {
+ override lazy val logger = mock[Logger]
+
+ override def newAuthenticatedConnection(url: URL): HttpURLConnection = {
+ val conn = mock[HttpURLConnection]
+ val jmxHost = url.getHost
+ nameNodeHostsAndStatesByJmxHost.get(jmxHost) match {
+ case Some((host, state)) => {
+ val jsonNode = newFakeNameNodeStatus(host, state)
+ val bytes = jsonNode.toString.getBytes("UTF-8")
+ Mockito.when(conn.getInputStream()).thenReturn(new ByteArrayInputStream(bytes))
+ }
+ case None => {
+ Mockito.when(conn.getInputStream()).thenThrow(new IOException())
+ }
+ }
+ conn
+ }
+ }
+
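+  // Builds a fake NameNodeStatus JMX payload of the form (added comment; abbreviated):
+  //   {"beans":[{"name":"Hadoop:service=NameNode, name=NameNodeStatus", ..., "State":<state>}]}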
+ def newFakeNameNodeStatus(host: String, state: String): JsonNode = {
+    val jsonNodeFactory = JsonNodeFactory.instance
+
+ val beanJsonNode =
+ jsonNodeFactory.objectNode()
+ .put("name", "Hadoop:service=NameNode, name=NameNodeStatus")
+ .put("modelerType", "org.apache.hadoop.hdfs.server.namenode.NameNode")
+ .put("NNRole", "NameNode")
+ .put("HostAndPort", "s${host}:9000")
+ .put("SecurityEnabled", "true")
+ .put("State", state)
+
+ val beansJsonNode =
+ jsonNodeFactory.arrayNode().add(beanJsonNode)
+
+ jsonNodeFactory.objectNode().set("beans", beansJsonNode)
+ }
+}
diff --git a/test/com/linkedin/drelephant/util/SparkUtilsTest.scala b/test/com/linkedin/drelephant/util/SparkUtilsTest.scala
new file mode 100644
index 000000000..632b49536
--- /dev/null
+++ b/test/com/linkedin/drelephant/util/SparkUtilsTest.scala
@@ -0,0 +1,320 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.linkedin.drelephant.util
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
+import java.net.URI
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataInputStream, FileStatus, FileSystem, Path, PathFilter, PositionedReadable}
+import org.apache.hadoop.io.compress.CompressionInputStream
+import org.apache.log4j.Logger
+import org.apache.spark.SparkConf
+import org.apache.spark.io.SnappyCompressionCodec
+import org.mockito.BDDMockito
+import org.mockito.Matchers
+import org.scalatest.{FunSpec, Matchers, OptionValues}
+import org.scalatest.mockito.MockitoSugar
+import org.xerial.snappy.SnappyOutputStream
+
+
+class SparkUtilsTest extends FunSpec with org.scalatest.Matchers with OptionValues with MockitoSugar {
+ describe("SparkUtils") {
+ describe(".fileSystemAndPathForEventLogDir") {
+ it("returns a filesystem + path based on uri from fetcherConfg") {
+ val hadoopConfiguration = new Configuration(false)
+ val sparkConf = new SparkConf()
+ val sparkUtils = new SparkUtils {
+ override lazy val logger = mock[Logger]
+ override lazy val hadoopUtils = mock[HadoopUtils]
+ override lazy val defaultEnv = Map.empty[String, String]
+ }
+
+ val (fs, path) = sparkUtils.fileSystemAndPathForEventLogDir(hadoopConfiguration,
+ sparkConf,
+ Some("webhdfs://nn1.grid.example.com:50070/logs/spark"))
+ fs.getUri.toString should be("webhdfs://nn1.grid.example.com:50070")
+ path should be(new Path("/logs/spark"))
+ }
+
+ it("returns a webhdfs filesystem + path based on spark.eventLog.dir when it is a webhdfs URL") {
+ val hadoopConfiguration = new Configuration(false)
+ val sparkConf = new SparkConf().set("spark.eventLog.dir", "webhdfs://nn1.grid.example.com:50070/logs/spark")
+ val sparkUtils = new SparkUtils {
+ override lazy val logger = mock[Logger]
+ override lazy val hadoopUtils = mock[HadoopUtils]
+ override lazy val defaultEnv = Map.empty[String, String]
+ }
+
+ val (fs, path) = sparkUtils.fileSystemAndPathForEventLogDir(hadoopConfiguration, sparkConf, None)
+ fs.getUri.toString should be("webhdfs://nn1.grid.example.com:50070")
+ path should be(new Path("/logs/spark"))
+ }
+
+ it("returns a webhdfs filesystem + path based on spark.eventLog.dir when it is an hdfs URL") {
+ val hadoopConfiguration = new Configuration(false)
+ val sparkConf = new SparkConf().set("spark.eventLog.dir", "hdfs://nn1.grid.example.com:9000/logs/spark")
+ val sparkUtils = new SparkUtils {
+ override lazy val logger = mock[Logger]
+ override lazy val hadoopUtils = mock[HadoopUtils]
+ override lazy val defaultEnv = Map.empty[String, String]
+ }
+
+ val (fs, path) = sparkUtils.fileSystemAndPathForEventLogDir(hadoopConfiguration, sparkConf, None)
+ fs.getUri.toString should be("webhdfs://nn1.grid.example.com:50070")
+ path should be(new Path("/logs/spark"))
+ }
+
+ it("returns a webhdfs filesystem + path based on dfs.nameservices and spark.eventLog.dir when the latter is a path and the dfs.nameservices is configured and available") {
+ val hadoopConfiguration = new Configuration(false)
+ hadoopConfiguration.set("dfs.nameservices", "sample")
+ hadoopConfiguration.set("dfs.ha.namenodes.sample", "ha1,ha2")
+ hadoopConfiguration.set("dfs.namenode.http-address.sample.ha1", "sample-ha1.grid.example.com:50070")
+ hadoopConfiguration.set("dfs.namenode.http-address.sample.ha2", "sample-ha2.grid.example.com:50070")
+
+ val sparkConf = new SparkConf().set("spark.eventLog.dir", "/logs/spark")
+
+ val sparkUtils = new SparkUtils {
+ override lazy val logger = mock[Logger]
+
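+ // The fake HadoopUtils below reports sample-ha2 as the active namenode, so it should be chosen for the webhdfs URI.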
+ override lazy val hadoopUtils = HadoopUtilsTest.newFakeHadoopUtilsForNameNode(
+ ("sample-ha1.grid.example.com", ("sample-ha1.grid.example.com", "standby")),
+ ("sample-ha2.grid.example.com", ("sample-ha2.grid.example.com", "active"))
+ )
+
+ override lazy val defaultEnv = Map.empty[String, String]
+ }
+
+ val (fs, path) = sparkUtils.fileSystemAndPathForEventLogDir(hadoopConfiguration, sparkConf, None)
+ fs.getUri.toString should be("webhdfs://sample-ha2.grid.example.com:50070")
+ path should be(new Path("/logs/spark"))
+ }
+
+ it("returns a webhdfs filesystem + path based on dfs.nameservices and spark.eventLog.dir when the latter is a path and the dfs.nameservices is configured but unavailable") {
+ val hadoopConfiguration = new Configuration(false)
+ hadoopConfiguration.set("dfs.nameservices", "sample")
+ hadoopConfiguration.set("dfs.ha.namenodes.sample", "ha1,ha2")
+ hadoopConfiguration.set("dfs.namenode.http-address.sample.ha1", "sample-ha1.grid.example.com:50070")
+ hadoopConfiguration.set("dfs.namenode.http-address.sample.ha2", "sample-ha2.grid.example.com:50070")
+ hadoopConfiguration.set("dfs.namenode.http-address", "sample.grid.example.com:50070")
+
+ val sparkConf = new SparkConf().set("spark.eventLog.dir", "/logs/spark")
+
+ val sparkUtils = new SparkUtils {
+ override lazy val logger = mock[Logger]
+
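+ // Both namenodes are reported as standby, so resolution should fall back to the plain dfs.namenode.http-address.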
+ override lazy val hadoopUtils = HadoopUtilsTest.newFakeHadoopUtilsForNameNode(
+ ("sample-ha1.grid.example.com", ("sample-ha1.grid.example.com", "standby")),
+ ("sample-ha2.grid.example.com", ("sample-ha2.grid.example.com", "standby"))
+ )
+
+ override lazy val defaultEnv = Map.empty[String, String]
+ }
+
+ val (fs, path) = sparkUtils.fileSystemAndPathForEventLogDir(hadoopConfiguration, sparkConf, None)
+ fs.getUri.toString should be("webhdfs://sample.grid.example.com:50070")
+ path should be(new Path("/logs/spark"))
+ }
+
+ it("returns a webhdfs filesystem + path based on dfs.namenode.http-address and spark.eventLog.dir when the latter is a path and dfs.nameservices is not configured") {
+ val hadoopConfiguration = new Configuration(false)
+ hadoopConfiguration.set("dfs.namenode.http-address", "sample.grid.example.com:50070")
+
+ val sparkConf = new SparkConf().set("spark.eventLog.dir", "/logs/spark")
+
+ val sparkUtils = new SparkUtils {
+ override lazy val logger = mock[Logger]
+
+ override lazy val hadoopUtils = HadoopUtilsTest.newFakeHadoopUtilsForNameNode(
+ ("sample-ha1.grid.example.com", ("sample-ha1.grid.example.com", "standby")),
+ ("sample-ha2.grid.example.com", ("sample-ha2.grid.example.com", "active"))
+ )
+
+ override lazy val defaultEnv = Map.empty[String, String]
+ }
+
+ val (fs, path) = sparkUtils.fileSystemAndPathForEventLogDir(hadoopConfiguration, sparkConf, None)
+ fs.getUri.toString should be("webhdfs://sample.grid.example.com:50070")
+ path should be(new Path("/logs/spark"))
+ }
+
+ it("throws an exception when spark.eventLog.dir is a path and no namenode is configured at all") {
+ val hadoopConfiguration = new Configuration(false)
+
+ val sparkConf = new SparkConf().set("spark.eventLog.dir", "/logs/spark")
+
+ val sparkUtils = new SparkUtils {
+ override lazy val logger = mock[Logger]
+ override lazy val hadoopUtils = mock[HadoopUtils]
+ override lazy val defaultEnv = Map.empty[String, String]
+ }
+
+ an[Exception] should be thrownBy { sparkUtils.fileSystemAndPathForEventLogDir(hadoopConfiguration, sparkConf, None) }
+ }
+ }
+
+ describe(".pathAndCodecforEventLog") {
+ it("returns the path and codec for the event log, given the base path and app/attempt information") {
+ val hadoopConfiguration = new Configuration(false)
+
+ val sparkConf =
+ new SparkConf()
+ .set("spark.eventLog.dir", "/logs/spark")
+ .set("spark.eventLog.compress", "true")
+
+ val sparkUtils = SparkUtilsTest.newFakeSparkUtilsForEventLog(
+ new URI("webhdfs://nn1.grid.example.com:50070"),
+ new Path("/logs/spark"),
+ new Path("application_1_1.snappy"),
+ Array.empty[Byte]
+ )
+
+ val (fs, basePath) = sparkUtils.fileSystemAndPathForEventLogDir(hadoopConfiguration, sparkConf, None)
+
+ val (path, codec) =
+ sparkUtils.pathAndCodecforEventLog(sparkConf, fs, basePath, "application_1", Some("1"))
+
+ path should be(new Path("webhdfs://nn1.grid.example.com:50070/logs/spark/application_1_1.snappy"))
+ codec.value should be(a[SnappyCompressionCodec])
+ }
+ it("returns the path and codec for the event log, given the base path and appid. Extracts attempt and codec from path") {
+ val hadoopConfiguration = new Configuration(false)
+
+ val sparkConf =
+ new SparkConf()
+ .set("spark.eventLog.dir", "/logs/spark")
+ .set("spark.eventLog.compress", "true")
+
+ val sparkUtils = SparkUtilsTest.newFakeSparkUtilsForEventLog(
+ new URI("webhdfs://nn1.grid.example.com:50070"),
+ new Path("/logs/spark"),
+ new Path("application_1_1.snappy"),
+ Array.empty[Byte]
+ )
+
+ val (fs, basePath) = sparkUtils.fileSystemAndPathForEventLogDir(hadoopConfiguration, sparkConf, None)
+
+ val (path, codec) =
+ sparkUtils.pathAndCodecforEventLog(sparkConf, fs, basePath, "application_1", None)
+
+ path should be(new Path("webhdfs://nn1.grid.example.com:50070/logs/spark/application_1_1.snappy"))
+ codec.value should be(a[SnappyCompressionCodec])
+ }
+ }
+
+ describe(".withEventLog") {
+ it("loans the input stream for the event log") {
+ val expectedLog =
+ """{"Event":"SparkListenerApplicationStart","App Name":"app","App ID":"application_1","Timestamp":1,"User":"foo"}"""
+
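+ // Snappy-compress the expected log so withEventLog has to decompress it through the resolved codec.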
+ val eventLogBytes = {
+ val bout = new ByteArrayOutputStream()
+ for {
+ in <- resource.managed(new ByteArrayInputStream(expectedLog.getBytes("UTF-8")))
+ out <- resource.managed(new SnappyOutputStream(bout))
+ } {
+ IOUtils.copy(in, out)
+ }
+ bout.toByteArray
+ }
+
+ val hadoopConfiguration = new Configuration(false)
+
+ val sparkConf =
+ new SparkConf()
+ .set("spark.eventLog.dir", "/logs/spark")
+ .set("spark.eventLog.compress", "true")
+
+ val sparkUtils = SparkUtilsTest.newFakeSparkUtilsForEventLog(
+ new URI("webhdfs://nn1.grid.example.com:50070"),
+ new Path("/logs/spark"),
+ new Path("application_1_1.snappy"),
+ eventLogBytes
+ )
+
+ val (fs, basePath) = sparkUtils.fileSystemAndPathForEventLogDir(hadoopConfiguration, sparkConf, None)
+
+ val (path, codec) =
+ sparkUtils.pathAndCodecforEventLog(sparkConf, fs, basePath, "application_1", None)
+
+ sparkUtils.withEventLog(fs, path, codec) { in =>
+ val bout = new ByteArrayOutputStream()
+ IOUtils.copy(in, bout)
+
+ val actualLog = new String(bout.toByteArray, "UTF-8")
+ actualLog should be(expectedLog)
+ }
+ }
+ }
+ }
+}
+
+object SparkUtilsTest extends MockitoSugar {
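+ /**
+ * Builds a SparkUtils stub whose event log directory resolves to a mocked FileSystem serving
+ * `bytes` as the contents of `filename` under `basePath` on `fileSystemUri`.
+ */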
+ def newFakeSparkUtilsForEventLog(
+ fileSystemUri: URI,
+ basePath: Path,
+ filename: Path,
+ bytes: Array[Byte]
+ ): SparkUtils = new SparkUtils() {
+ override lazy val logger = mock[Logger]
+ override lazy val hadoopUtils = mock[HadoopUtils]
+ override lazy val defaultEnv = Map.empty[String, String]
+
+ override def fileSystemAndPathForEventLogDir(
+ hadoopConfiguration: Configuration,
+ sparkConf: SparkConf,
+ uriFromFetcherConf: Option[String]
+ ): (FileSystem, Path) = {
+ val fs = mock[FileSystem]
+ val expectedPath = new Path(new Path(fileSystemUri), new Path(basePath, filename))
+ val expectedFileStatus = {
+ val fileStatus = mock[FileStatus]
+ BDDMockito.given(fileStatus.getLen).willReturn(bytes.length.toLong)
+ BDDMockito.given(fileStatus.getPath()).willReturn(expectedPath)
+ fileStatus
+ }
+ val expectedStatusArray = Array(expectedFileStatus)
+
+ // This PathFilter exists only to provide a concrete filter class for the listStatus stub below;
+ // its accept logic is never invoked by these tests.
+ val filter = new PathFilter() {
+ override def accept(file: Path): Boolean = file.getName.startsWith("mockAppId")
+ }
+
+ BDDMockito.given(fs.getUri).willReturn(fileSystemUri)
+ BDDMockito.given(fs.exists(expectedPath)).willReturn(true)
+ BDDMockito.given(fs.getFileStatus(expectedPath)).willReturn(expectedFileStatus)
+ BDDMockito.given(fs.listStatus(org.mockito.Matchers.refEq(new Path(new Path(fileSystemUri), basePath)),
+ org.mockito.Matchers.any(filter.getClass)))
+ .willReturn(expectedStatusArray)
+ BDDMockito.given(fs.open(expectedPath)).willReturn(
+ new FSDataInputStream(new FakeCompressionInputStream(new ByteArrayInputStream(bytes)))
+ )
+ (fs, basePath)
+ }
+ }
+
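+ // Wraps a plain in-memory stream so the mocked FileSystem can hand out an FSDataInputStream;
+ // only the sequential read methods exercised by these tests are implemented.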
+ class FakeCompressionInputStream(in: InputStream) extends CompressionInputStream(in) with PositionedReadable {
+ override def read(): Int = in.read()
+ override def read(b: Array[Byte], off: Int, len: Int): Int = in.read(b, off, len)
+ override def read(pos: Long, buffer: Array[Byte], off: Int, len: Int): Int = ???
+ override def readFully(pos: Long, buffer: Array[Byte], off: Int, len: Int): Unit = ???
+ override def readFully(pos: Long, buffer: Array[Byte]): Unit = ???
+ override def resetState(): Unit = ???
+ }
+}
diff --git a/test/org/apache/spark/deploy/history/SparkDataCollectionTest.java b/test/org/apache/spark/deploy/history/SparkDataCollectionTest.java
new file mode 100644
index 000000000..0ed76b2e5
--- /dev/null
+++ b/test/org/apache/spark/deploy/history/SparkDataCollectionTest.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.spark.deploy.history;
+
+import com.linkedin.drelephant.spark.legacydata.SparkJobProgressData;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+
+public class SparkDataCollectionTest {
+
+ private static final String EVENT_LOG_DIR = "spark_event_logs/";
+
+ @Test
+ public void testCollectJobProgressData() throws IOException {
+ SparkDataCollection dataCollection = new SparkDataCollection();
+
+ InputStream in = new BufferedInputStream(
+ SparkDataCollectionTest.class.getClassLoader().getResourceAsStream(EVENT_LOG_DIR + "event_log_1"));
+ dataCollection.load(in, in.toString());
+ in.close();
+
+ SparkJobProgressData jobProgressData = dataCollection.getJobProgressData();
+ assertNotNull("can't get job progress data", jobProgressData);
+ }
+
+}
diff --git a/test/org/apache/spark/deploy/history/SparkFsFetcherTest.scala b/test/org/apache/spark/deploy/history/SparkFsFetcherTest.scala
new file mode 100644
index 000000000..50995b2a8
--- /dev/null
+++ b/test/org/apache/spark/deploy/history/SparkFsFetcherTest.scala
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.net.URI
+
+import com.linkedin.drelephant.analysis.AnalyticJob
+import com.linkedin.drelephant.configurations.fetcher.{FetcherConfiguration, FetcherConfigurationData}
+import com.linkedin.drelephant.util.{SparkUtils, SparkUtilsTest}
+import javax.xml.parsers.DocumentBuilderFactory
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.SparkConf
+import org.scalatest.{FunSpec, Matchers}
+import org.scalatest.mockito.MockitoSugar
+import org.w3c.dom.Document
+import org.xerial.snappy.SnappyOutputStream
+
+class SparkFsFetcherTest extends FunSpec with Matchers with MockitoSugar {
+ import SparkFsFetcherTest._
+
+ describe("SparkFsFetcher") {
+ describe("constructor") {
+ it("handles fetcher configurations with supplied values") {
+ val fetcher = newFetcher("configurations/fetcher/FetcherConfTest5.xml")
+ fetcher.eventLogSizeLimitMb should be(50)
+ }
+
+ it("handles fetcher configurations with empty values") {
+ val fetcher = newFetcher("configurations/fetcher/FetcherConfTest6.xml")
+ fetcher.eventLogSizeLimitMb should be(SparkFSFetcher.DEFAULT_EVENT_LOG_SIZE_LIMIT_MB)
+ }
+
+ it("handles fetcher configurations with missing values") {
+ val fetcher = newFetcher("configurations/fetcher/FetcherConfTest7.xml")
+ fetcher.eventLogSizeLimitMb should be(SparkFSFetcher.DEFAULT_EVENT_LOG_SIZE_LIMIT_MB)
+ }
+ }
+
+ describe(".fetchData") {
+ it("returns the data collected from the Spark event log for the given analytic job") {
+ val eventLogBytes = {
+ val eventLog =
+ """{"Event":"SparkListenerApplicationStart","App Name":"app","App ID":"application_1","Timestamp":1,"User":"foo"}"""
+ val bout = new ByteArrayOutputStream()
+ for {
+ in <- resource.managed(new ByteArrayInputStream(eventLog.getBytes("UTF-8")))
+ out <- resource.managed(new SnappyOutputStream(bout))
+ } {
+ IOUtils.copy(in, out)
+ }
+ bout.toByteArray
+ }
+
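+ // Wire the fetcher to a fake SparkUtils so fetchData reads the Snappy-compressed log above instead of a real cluster.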
+ val fetcherConfigurationData = newFetcherConfigurationData("configurations/fetcher/FetcherConfTest7.xml")
+ val fetcher = new SparkFSFetcher(fetcherConfigurationData) {
+ override lazy val hadoopConfiguration = new Configuration(false)
+
+ override lazy val sparkConf =
+ new SparkConf()
+ .set("spark.eventLog.dir", "webhdfs://nn1.grid.example.com:50070/logs/spark")
+ .set("spark.eventLog.compress", "true")
+ .set("spark.io.compression.codec", "snappy")
+
+ override lazy val sparkUtils = SparkUtilsTest.newFakeSparkUtilsForEventLog(
+ new URI("webhdfs://nn1.grid.example.com:50070"),
+ new Path("/logs/spark"),
+ new Path("application_1_1.snappy"),
+ eventLogBytes
+ )
+
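+ // Run the action directly rather than through the privileged-user wrapper, so no Hadoop security context is needed.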
+ override protected def doAsPrivilegedAction[T](action: () => T): T = action()
+ }
+ val analyticJob = new AnalyticJob().setAppId("application_1")
+
+ val data = fetcher.fetchData(analyticJob)
+ data.getAppId should be("application_1")
+
+ val generalData = data.getGeneralData
+ generalData.getApplicationId should be("application_1")
+ generalData.getApplicationName should be("app")
+ generalData.getSparkUser should be("foo")
+ }
+ }
+ }
+}
+
+object SparkFsFetcherTest {
+ def newFetcher(confResourcePath: String): SparkFSFetcher = {
+ val fetcherConfData = newFetcherConfigurationData(confResourcePath)
+ val fetcherClass = getClass.getClassLoader.loadClass(fetcherConfData.getClassName)
+ fetcherClass.getConstructor(classOf[FetcherConfigurationData]).newInstance(fetcherConfData).asInstanceOf[SparkFSFetcher]
+ }
+
+ def newFetcherConfigurationData(confResourcePath: String): FetcherConfigurationData = {
+ val document = parseDocument(confResourcePath)
+ val fetcherConf = new FetcherConfiguration(document.getDocumentElement())
+ fetcherConf.getFetchersConfigurationData().get(0)
+ }
+
+ def parseDocument(resourcePath: String): Document = {
+ val factory = DocumentBuilderFactory.newInstance()
+ val builder = factory.newDocumentBuilder()
+ builder.parse(getClass.getClassLoader.getResourceAsStream(resourcePath))
+ }
+}
diff --git a/test/resources/configurations/fetcher/FetcherConfTest5.xml b/test/resources/configurations/fetcher/FetcherConfTest5.xml
index 2372f0828..4004c5e3e 100644
--- a/test/resources/configurations/fetcher/FetcherConfTest5.xml
+++ b/test/resources/configurations/fetcher/FetcherConfTest5.xml
@@ -18,8 +18,9 @@
<applicationtype>spark</applicationtype>
- <classname>com.linkedin.drelephant.spark.fetchers.SparkFetcher</classname>
+ <classname>org.apache.spark.deploy.history.SparkFSFetcher</classname>
+ <event_log_size_limit_in_mb>50</event_log_size_limit_in_mb>
diff --git a/test/resources/configurations/fetcher/FetcherConfTest6.xml b/test/resources/configurations/fetcher/FetcherConfTest6.xml
new file mode 100644
index 000000000..a09588dfc
--- /dev/null
+++ b/test/resources/configurations/fetcher/FetcherConfTest6.xml
@@ -0,0 +1,26 @@
+<fetchers>
+ <fetcher>
+ <applicationtype>spark</applicationtype>
+ <classname>org.apache.spark.deploy.history.SparkFSFetcher</classname>
+ <params>
+ <event_log_size_limit_in_mb></event_log_size_limit_in_mb>
+ </params>
+ </fetcher>
+</fetchers>
diff --git a/test/resources/configurations/fetcher/FetcherConfTest7.xml b/test/resources/configurations/fetcher/FetcherConfTest7.xml
new file mode 100644
index 000000000..1564cb313
--- /dev/null
+++ b/test/resources/configurations/fetcher/FetcherConfTest7.xml
@@ -0,0 +1,25 @@
+<fetchers>
+ <fetcher>
+ <applicationtype>spark</applicationtype>
+ <classname>org.apache.spark.deploy.history.SparkFSFetcher</classname>
+ <params>
+ </params>
+ </fetcher>
+</fetchers>
diff --git a/test/resources/configurations/fetcher/FetcherConfTest8.xml b/test/resources/configurations/fetcher/FetcherConfTest8.xml
new file mode 100644
index 000000000..00fe9c055
--- /dev/null
+++ b/test/resources/configurations/fetcher/FetcherConfTest8.xml
@@ -0,0 +1,26 @@
+<fetchers>
+ <fetcher>
+ <applicationtype>spark</applicationtype>
+ <classname>org.apache.spark.deploy.history.SparkFSFetcher</classname>
+ <params>
+ <namenode_addresses>sample-ha3.grid.example.com:50070,sample-ha4.grid.example.com:50070</namenode_addresses>
+ </params>
+ </fetcher>
+</fetchers>
diff --git a/test/resources/core-site.xml b/test/resources/core-site.xml
index 403589589..7c26750fb 100644
--- a/test/resources/core-site.xml
+++ b/test/resources/core-site.xml
@@ -41,13 +41,16 @@
<property>
<name>dfs.namenode.http-address.sample.ha1</name>
- <value>sample-ha1.grid.company.com:50070</value>
+ <value>sample-ha1.grid.example.com:50070</value>
</property>
<property>
<name>dfs.namenode.http-address.sample.ha2</name>
- <value>sample-ha2.grid.company.com:50070</value>
+ <value>sample-ha2.grid.example.com:50070</value>
</property>
+ <property>
+ <name>dfs.namenode.http-address</name>
+ <value>sample.grid.example.com:50070</value>
+ </property>
-
diff --git a/test/resources/spark_event_logs/event_log_1 b/test/resources/spark_event_logs/event_log_1
new file mode 100644
index 000000000..0ed97efaf
--- /dev/null
+++ b/test/resources/spark_event_logs/event_log_1
@@ -0,0 +1,32 @@
+{"Event":"SparkListenerLogStart","Spark Version":"1.5.1"}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"10.20.0.71","Port":58838},"Maximum Memory":1111794647,"Timestamp":1458126388757}
+{"Event":"SparkListenerApplicationStart","App Name":"PythonPi","App ID":"application_1457600942802_0093","Timestamp":1458126354336,"User":"hdfs"}
+{"Event":"SparkListenerJobStart","Job ID":0,"Submission Time":1458126390170,"Stage Infos":[{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"reduce at pi.py:39","Number of Tasks":10,"RDD Info":[{"RDD ID":1,"Name":"PythonRDD","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use Memory":false,"Use ExternalBlockStore":false,"Deserialized":false,"Replication":1},"Number of Partitions":10,"Number of Cached Partitions":0,"Memory Size":0,"ExternalBlockStore Size":0,"Disk Size":0},{"RDD ID":0,"Name":"ParallelCollectionRDD","Scope":"{\"id\":\"0\",\"name\":\"parallelize\"}","Parent IDs":[],"Storage Level":{"Use Disk":false,"Use Memory":false,"Use ExternalBlockStore":false,"Deserialized":false,"Replication":1},"Number of Partitions":10,"Number of Cached Partitions":0,"Memory Size":0,"ExternalBlockStore Size":0,"Disk Size":0}],"Parent IDs":[],"Details":"","Accumulables":[]}],"Stage IDs":[0],"Properties":{"spark.rdd.scope.noOverride":"true","spark.rdd.scope":"{\"id\":\"1\",\"name\":\"collect\"}","callSite.short":"reduce at pi.py:39"}}
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"reduce at pi.py:39","Number of Tasks":10,"RDD Info":[{"RDD ID":1,"Name":"PythonRDD","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use Memory":false,"Use ExternalBlockStore":false,"Deserialized":false,"Replication":1},"Number of Partitions":10,"Number of Cached Partitions":0,"Memory Size":0,"ExternalBlockStore Size":0,"Disk Size":0},{"RDD ID":0,"Name":"ParallelCollectionRDD","Scope":"{\"id\":\"0\",\"name\":\"parallelize\"}","Parent IDs":[],"Storage Level":{"Use Disk":false,"Use Memory":false,"Use ExternalBlockStore":false,"Deserialized":false,"Replication":1},"Number of Partitions":10,"Number of Cached Partitions":0,"Memory Size":0,"ExternalBlockStore Size":0,"Disk Size":0}],"Parent IDs":[],"Details":"","Submission Time":1458126390256,"Accumulables":[]},"Properties":{"spark.rdd.scope.noOverride":"true","spark.rdd.scope":"{\"id\":\"1\",\"name\":\"collect\"}","callSite.short":"reduce at pi.py:39"}}
+{"Event":"SparkListenerExecutorAdded","Timestamp":1458126397624,"Executor ID":"2","Executor Info":{"Host":".hello.com","Total Cores":2,"Log Urls":{"stdout":"http://hello.com:8042/node/containerlogs/container_e38_1457600942802_0093_01_000003/hdfs/stdout?start=-4096","stderr":"http://hello.com:8042/node/containerlogs/container_e38_1457600942802_0093_01_000003/hdfs/stderr?start=-4096"}}}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"2","Host":".hello.com","Port":36478},"Maximum Memory":2223023063,"Timestamp":1458126398028}
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":0,"Index":0,"Attempt":0,"Launch Time":1458126398712,"Executor ID":"2","Host":".hello.com","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":1,"Index":1,"Attempt":0,"Launch Time":1458126398726,"Executor ID":"2","Host":".hello.com","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Accumulables":[]}}
+{"Event":"SparkListenerExecutorAdded","Timestamp":1458126398962,"Executor ID":"1","Executor Info":{"Host":".hello.com","Total Cores":2,"Log Urls":{"stdout":"http://.hello.com:8042/node/containerlogs/container_e38_1457600942802_0093_01_000002/hdfs/stdout?start=-4096","stderr":"http://.hello.com:8042/node/containerlogs/container_e38_1457600942802_0093_01_000002/hdfs/stderr?start=-4096"}}}
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":2,"Index":2,"Attempt":0,"Launch Time":1458126398970,"Executor ID":"1","Host":".hello.com","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":3,"Index":3,"Attempt":0,"Launch Time":1458126398973,"Executor ID":"1","Host":".hello.com","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Accumulables":[]}}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"1","Host":".hello.com","Port":38464},"Maximum Memory":2223023063,"Timestamp":1458126399357}
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":4,"Index":4,"Attempt":0,"Launch Time":1458126403532,"Executor ID":"2","Host":".hello.com","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":1,"Index":1,"Attempt":0,"Launch Time":1458126398726,"Executor ID":"2","Host":".hello.com","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1458126403558,"Failed":false,"Accumulables":[]},"Task Metrics":{"Host Name":"hello.com","Executor Deserialize Time":1091,"Executor Run Time":408,"Result Size":1018,"JVM GC Time":0,"Result Serialization Time":1,"Memory Bytes Spilled":0,"Disk Bytes Spilled":0}}
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":5,"Index":5,"Attempt":0,"Launch Time":1458126403563,"Executor ID":"2","Host":".hello.com","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":0,"Index":0,"Attempt":0,"Launch Time":1458126398712,"Executor ID":"2","Host":".hello.com","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1458126403565,"Failed":false,"Accumulables":[]},"Task Metrics":{"Host Name":"hello.com","Executor Deserialize Time":2605,"Executor Run Time":411,"Result Size":1018,"JVM GC Time":0,"Result Serialization Time":1,"Memory Bytes Spilled":0,"Disk Bytes Spilled":0}}
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":6,"Index":6,"Attempt":0,"Launch Time":1458126404784,"Executor ID":"1","Host":".hello.com","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":3,"Index":3,"Attempt":0,"Launch Time":1458126398973,"Executor ID":"1","Host":".hello.com","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1458126404787,"Failed":false,"Accumulables":[]},"Task Metrics":{"Host Name":".hello.com","Executor Deserialize Time":1375,"Executor Run Time":473,"Result Size":1018,"JVM GC Time":48,"Result Serialization Time":1,"Memory Bytes Spilled":0,"Disk Bytes Spilled":0}}
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":7,"Index":7,"Attempt":0,"Launch Time":1458126404791,"Executor ID":"1","Host":".hello.com","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":2,"Index":2,"Attempt":0,"Launch Time":1458126398970,"Executor ID":"1","Host":".hello.com","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1458126404793,"Failed":false,"Accumulables":[]},"Task Metrics":{"Host Name":".hello.com","Executor Deserialize Time":1385,"Executor Run Time":473,"Result Size":1018,"JVM GC Time":48,"Result Serialization Time":1,"Memory Bytes Spilled":0,"Disk Bytes Spilled":0}}
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":8,"Index":8,"Attempt":0,"Launch Time":1458126405270,"Executor ID":"2","Host":".hello.com","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":4,"Index":4,"Attempt":0,"Launch Time":1458126403532,"Executor ID":"2","Host":".hello.com","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1458126405273,"Failed":false,"Accumulables":[]},"Task Metrics":{"Host Name":"hello.com","Executor Deserialize Time":16,"Executor Run Time":115,"Result Size":1018,"JVM GC Time":0,"Result Serialization Time":0,"Memory Bytes Spilled":0,"Disk Bytes Spilled":0}}
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":9,"Index":9,"Attempt":0,"Launch Time":1458126406523,"Executor ID":"1","Host":".hello.com","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":6,"Index":6,"Attempt":0,"Launch Time":1458126404784,"Executor ID":"1","Host":".hello.com","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1458126406526,"Failed":false,"Accumulables":[]},"Task Metrics":{"Host Name":".hello.com","Executor Deserialize Time":14,"Executor Run Time":115,"Result Size":1018,"JVM GC Time":0,"Result Serialization Time":0,"Memory Bytes Spilled":0,"Disk Bytes Spilled":0}}
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":5,"Index":5,"Attempt":0,"Launch Time":1458126403563,"Executor ID":"2","Host":".hello.com","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1458126406809,"Failed":false,"Accumulables":[]},"Task Metrics":{"Host Name":"hello.com","Executor Deserialize Time":14,"Executor Run Time":125,"Result Size":1018,"JVM GC Time":0,"Result Serialization Time":1,"Memory Bytes Spilled":0,"Disk Bytes Spilled":0}}
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":7,"Index":7,"Attempt":0,"Launch Time":1458126404791,"Executor ID":"1","Host":".hello.com","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1458126408068,"Failed":false,"Accumulables":[]},"Task Metrics":{"Host Name":".hello.com","Executor Deserialize Time":15,"Executor Run Time":118,"Result Size":1018,"JVM GC Time":0,"Result Serialization Time":0,"Memory Bytes Spilled":0,"Disk Bytes Spilled":0}}
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":8,"Index":8,"Attempt":0,"Launch Time":1458126405270,"Executor ID":"2","Host":".hello.com","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1458126408323,"Failed":false,"Accumulables":[]},"Task Metrics":{"Host Name":"hello.com","Executor Deserialize Time":13,"Executor Run Time":116,"Result Size":1018,"JVM GC Time":0,"Result Serialization Time":0,"Memory Bytes Spilled":0,"Disk Bytes Spilled":0}}
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":9,"Index":9,"Attempt":0,"Launch Time":1458126406523,"Executor ID":"1","Host":".hello.com","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1458126409598,"Failed":false,"Accumulables":[]},"Task Metrics":{"Host Name":".hello.com","Executor Deserialize Time":12,"Executor Run Time":116,"Result Size":1018,"JVM GC Time":0,"Result Serialization Time":0,"Memory Bytes Spilled":0,"Disk Bytes Spilled":0}}
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"reduce at pi.py:39","Number of Tasks":10,"RDD Info":[{"RDD ID":1,"Name":"PythonRDD","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use Memory":false,"Use ExternalBlockStore":false,"Deserialized":false,"Replication":1},"Number of Partitions":10,"Number of Cached Partitions":0,"Memory Size":0,"ExternalBlockStore Size":0,"Disk Size":0},{"RDD ID":0,"Name":"ParallelCollectionRDD","Scope":"{\"id\":\"0\",\"name\":\"parallelize\"}","Parent IDs":[],"Storage Level":{"Use Disk":false,"Use Memory":false,"Use ExternalBlockStore":false,"Deserialized":false,"Replication":1},"Number of Partitions":10,"Number of Cached Partitions":0,"Memory Size":0,"ExternalBlockStore Size":0,"Disk Size":0}],"Parent IDs":[],"Details":"","Submission Time":1458126390256,"Completion Time":1458126409599,"Accumulables":[]}}
+{"Event":"SparkListenerJobEnd","Job ID":0,"Completion Time":1458126409602,"Job Result":{"Result":"JobSucceeded"}}
+{"Event":"SparkListenerApplicationEnd","Timestamp":1458126409609}