Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update #224 to add FSFetcher as a standalone fetcher #232

Merged
merged 13 commits into from
Apr 14, 2017
Merged
38 changes: 27 additions & 11 deletions app-conf/FetcherConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,18 @@
</params>
</fetcher>
<!--
This is an experimental replacement for the MapReduceFetcherHadoop2 that attempts to burn
through queues of jobs faster by pulling data directly from HDFS rather than going through
the job history server.
This is an experimental replacement for the MapReduceFetcherHadoop2 that attempts to burn
through queues of jobs faster by pulling data directly from HDFS rather than going through
the job history server.

Increasing the param history_log_size_limit_in_mb allows this fetcher to accept larger log
files, but also increase the risk of OutOfMemory error. The default heap size of Dr. Elephant
is 1024MB. To increase this, e.g. to 2048MB, run this before start.sh:
export OPTS="-mem 2048"
Increasing the param history_log_size_limit_in_mb allows this fetcher to accept larger log
files, but also increase the risk of OutOfMemory error. The default heap size of Dr. Elephant
is 1024MB. To increase this, e.g. to 2048MB, run this before start.sh:
export OPTS="-mem 2048"

To work properly, this fetcher should use the same timezone with the job history server.
If not set, the local timezone will be used.
-->
To work properly, this fetcher should use the same timezone with the job history server.
If not set, the local timezone will be used.
-->
<!--
<fetcher>
<applicationtype>mapreduce</applicationtype>
Expand All @@ -61,8 +61,24 @@
</fetcher>
-->

<!--
FSFetcher for Spark. Loads the eventlog from HDFS and replays to get the metrics and application properties

Param Description:
*event_log_size_limit_in_mb* sets the threshold for the size of the eventlog. Increasing it will necessiate
increase in heap size. default is 100

*event_log_location_uri* can be used to specify the fully qualified uri for the location in hdfs for eventlogs
if this is not specified, the fetcher will try to deduce it from the spark-conf

eg:
<params>
<event_log_size_limit_in_mb>500</event_log_size_limit_in_mb>
<event_log_location_uri>webhdfs://localhost:50070/system/spark-history</event_log_location_uri>
</params>
-->
<fetcher>
<applicationtype>spark</applicationtype>
<classname>com.linkedin.drelephant.spark.fetchers.SparkFetcher</classname>
<classname>com.linkedin.drelephant.spark.fetchers.FSFetcher</classname>
</fetcher>
</fetchers>
40 changes: 40 additions & 0 deletions app/com/linkedin/drelephant/spark/fetchers/FSFetcher.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.spark.fetchers

import com.linkedin.drelephant.analysis.{AnalyticJob, ElephantFetcher}
import com.linkedin.drelephant.configurations.fetcher.FetcherConfigurationData
import com.linkedin.drelephant.spark.data.SparkApplicationData
import com.linkedin.drelephant.spark.legacydata.LegacyDataConverters
import org.apache.spark.deploy.history.SparkFSFetcher

/**
* Wraps the SparkFSFetcher which has the actual logic to comply to the new SparkApplicationData interface
* @param fetcherConfigurationData
*/
class FSFetcher(fetcherConfigurationData: FetcherConfigurationData)
extends ElephantFetcher[SparkApplicationData] {
lazy val legacyFetcher = new SparkFSFetcher(fetcherConfigurationData)

override def fetchData(analyticJob: AnalyticJob): SparkApplicationData = {
val legacyData = legacyFetcher.fetchData(analyticJob)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need the legacy package and other legacy classes in it. Since we will be using the file system fetcher, so it's no longer a legacy code. We should instead have all the relevant classes in this fetchers package.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way SparkFSFetcher reads the event logs needs to be revisioned to not rely on older API like replaybus. That's why I have kept it as legacy for now. I will fix those and then move it to fetchers in a separate PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright. That makes sense.
Thank you.

LegacyDataConverters.convert(legacyData)
}
}

object FSFetcher {
}
74 changes: 42 additions & 32 deletions app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.linkedin.drelephant.spark.fetchers
import scala.async.Async
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.{Duration, SECONDS}
import scala.util.{Try, Success, Failure}
import scala.util.control.NonFatal

import com.linkedin.drelephant.analysis.{AnalyticJob, ElephantFetcher}
Expand All @@ -36,17 +37,21 @@ import org.apache.spark.SparkConf
class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
extends ElephantFetcher[SparkApplicationData] {
import SparkFetcher._
import Async.{async, await}
import ExecutionContext.Implicits.global

private val logger: Logger = Logger.getLogger(classOf[SparkFetcher])

val eventLogUri = Option(fetcherConfigurationData.getParamMap.get(LOG_LOCATION_URI_XML_FIELD))
logger.info("The event log location of Spark application is set to " + eventLogUri)

private[fetchers] lazy val hadoopConfiguration: Configuration = new Configuration()

private[fetchers] lazy val sparkUtils: SparkUtils = SparkUtils

private[fetchers] lazy val sparkConf: SparkConf = {
val sparkConf = new SparkConf()
sparkUtils.getDefaultPropertiesFile(sparkUtils.defaultEnv) match {
sparkUtils.getDefaultPropertiesFile() match {
case Some(filename) => sparkConf.setAll(sparkUtils.getPropertiesFromFile(filename))
case None => throw new IllegalStateException("can't find Spark conf; please set SPARK_HOME or SPARK_CONF_DIR")
}
Expand All @@ -65,25 +70,51 @@ class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
private[fetchers] lazy val sparkRestClient: SparkRestClient = new SparkRestClient(sparkConf)

private[fetchers] lazy val sparkLogClient: SparkLogClient = {
new SparkLogClient(hadoopConfiguration, sparkConf)
new SparkLogClient(hadoopConfiguration, sparkConf, eventLogUri)
}

override def fetchData(analyticJob: AnalyticJob): SparkApplicationData = {
doFetchData(analyticJob) match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this match block is not needed? Just call doFetchData method which either returns SparkApplicationData or throws an exception if an error occurs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

case Success(data) => data
case Failure(e) => throw e
}
}

private def doFetchData(analyticJob: AnalyticJob): Try[SparkApplicationData] = {
val appId = analyticJob.getAppId
logger.info(s"Fetching data for ${appId}")
try {
Await.result(doFetchData(sparkRestClient, sparkLogClient, appId, eventLogSource),
DEFAULT_TIMEOUT)
} catch {
case NonFatal(e) =>
Try {
Await.result(doFetchDataUsingRestAndLogClients(analyticJob), DEFAULT_TIMEOUT)
}.transform(
data => {
logger.info(s"Succeeded fetching data for ${appId}")
Success(data)
},
e => {
logger.error(s"Failed fetching data for ${appId}", e)
throw e
Failure(e)
}
)
}

private def doFetchDataUsingRestAndLogClients(analyticJob: AnalyticJob): Future[SparkApplicationData] = async {
val appId = analyticJob.getAppId
val restDerivedData = await(sparkRestClient.fetchData(appId, eventLogSource == EventLogSource.Rest))

val logDerivedData = eventLogSource match {
case EventLogSource.None => None
case EventLogSource.Rest => restDerivedData.logDerivedData
case EventLogSource.WebHdfs =>
val lastAttemptId = restDerivedData.applicationInfo.attempts.maxBy { _.startTime }.attemptId
Some(await(sparkLogClient.fetchData(appId, lastAttemptId)))
}

SparkApplicationData(appId, restDerivedData, logDerivedData)
}

}

object SparkFetcher {
import Async.{async, await}

sealed trait EventLogSource

Expand All @@ -97,27 +128,6 @@ object SparkFetcher {
}

val SPARK_EVENT_LOG_ENABLED_KEY = "spark.eventLog.enabled"
val DEFAULT_TIMEOUT = Duration(30, SECONDS)

private def doFetchData(
sparkRestClient: SparkRestClient,
sparkLogClient: SparkLogClient,
appId: String,
eventLogSource: EventLogSource
)(
implicit ec: ExecutionContext
): Future[SparkApplicationData] = async {
val restDerivedData = await(sparkRestClient.fetchData(
appId, eventLogSource == EventLogSource.Rest))

val logDerivedData = eventLogSource match {
case EventLogSource.None => None
case EventLogSource.Rest => restDerivedData.logDerivedData
case EventLogSource.WebHdfs =>
val lastAttemptId = restDerivedData.applicationInfo.attempts.maxBy { _.startTime }.attemptId
Some(await(sparkLogClient.fetchData(appId, lastAttemptId)))
}

SparkApplicationData(appId, restDerivedData, logDerivedData)
}
val DEFAULT_TIMEOUT = Duration(60, SECONDS)
val LOG_LOCATION_URI_XML_FIELD = "event_log_location_uri"
}
Loading