Update #224 to add FSFetcher as a standalone fetcher #232
Changes from 4 commits
SparkFetcher.scala:

@@ -19,12 +19,13 @@ package com.linkedin.drelephant.spark.fetchers
 import scala.async.Async
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration.{Duration, SECONDS}
-import scala.util.Try
+import scala.util.{Try, Success, Failure}
 import scala.util.control.NonFatal

 import com.linkedin.drelephant.analysis.{AnalyticJob, ElephantFetcher}
 import com.linkedin.drelephant.configurations.fetcher.FetcherConfigurationData
 import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
+import com.linkedin.drelephant.spark.legacyfetchers.FSFetcher
 import com.linkedin.drelephant.util.SparkUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.log4j.Logger

@@ -37,6 +38,7 @@ import org.apache.spark.SparkConf
 class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
   extends ElephantFetcher[SparkApplicationData] {
   import SparkFetcher._
+  import Async.{async, await}
   import ExecutionContext.Implicits.global

   private val logger: Logger = Logger.getLogger(classOf[SparkFetcher])

@@ -47,7 +49,7 @@ class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
   private[fetchers] lazy val sparkConf: SparkConf = {
     val sparkConf = new SparkConf()
-    sparkUtils.getDefaultPropertiesFile(sparkUtils.defaultEnv) match {
+    sparkUtils.getDefaultPropertiesFile() match {
       case Some(filename) => sparkConf.setAll(sparkUtils.getPropertiesFromFile(filename))
       case None => throw new IllegalStateException("can't find Spark conf; please set SPARK_HOME or SPARK_CONF_DIR")
     }

@@ -62,31 +64,31 @@ class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
   }

   override def fetchData(analyticJob: AnalyticJob): SparkApplicationData = {
-    val appId = analyticJob.getAppId
-    logger.info(s"Fetching data for ${appId}")
-    try {
-      Await.result(doFetchData(sparkRestClient, sparkLogClient, appId), DEFAULT_TIMEOUT)
-    } catch {
-      case NonFatal(e) =>
-        logger.error(s"Failed fetching data for ${appId}", e)
-        throw e
-    }
-  }
-}
-
-object SparkFetcher {
-  import Async.{async, await}
-
-  val SPARK_EVENT_LOG_ENABLED_KEY = "spark.eventLog.enabled"
-  val DEFAULT_TIMEOUT = Duration(30, SECONDS)
-
-  private def doFetchData(
-    sparkRestClient: SparkRestClient,
-    sparkLogClient: Option[SparkLogClient],
-    appId: String
-  )(
-    implicit ec: ExecutionContext
-  ): Future[SparkApplicationData] = async {
+    doFetchData(analyticJob) match {
+      case Success(data) => data
+      case Failure(e) => throw e
+    }
+  }
+
+  private def doFetchData(analyticJob: AnalyticJob): Try[SparkApplicationData] = {
+    val appId = analyticJob.getAppId
+    logger.info(s"Fetching data for ${appId}")
+    Try {
+      Await.result(doFetchDataUsingRestAndLogClients(analyticJob), DEFAULT_TIMEOUT)
+    }.transform(
+      data => {
+        logger.info(s"Succeeded fetching data for ${appId}")
+        Success(data)
+      },
+      e => {
+        logger.error(s"Failed fetching data for ${appId}", e)
+        Failure(e)
+      }
+    )
+  }
+
+  private def doFetchDataUsingRestAndLogClients(analyticJob: AnalyticJob): Future[SparkApplicationData] = async {
+    val appId = analyticJob.getAppId
     val restDerivedData = await(sparkRestClient.fetchData(appId))
     val lastAttemptId = restDerivedData.applicationInfo.attempts.maxBy { _.startTime }.attemptId

Review comment, on the new match block in fetchData:
"Maybe this match block is not needed? Just call [...]"

Author reply:
"fixed"
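A note on the pattern discussed in this thread: doFetchData wraps the blocking Await.result in scala.util.Try and uses Try.transform to log both outcomes before handing the Try back to fetchData, which unwraps it. A minimal, self-contained sketch of that pattern (hypothetical names, not the Dr. Elephant code itself):

import scala.util.{Try, Success, Failure}

object TryTransformSketch extends App {
  // Stand-in for a blocking fetch that may throw.
  def riskyFetch(appId: String): String =
    if (appId.nonEmpty) s"data for ${appId}"
    else throw new IllegalArgumentException("empty appId")

  // Wrap the call, log both outcomes, and keep the result as a Try.
  def doFetch(appId: String): Try[String] =
    Try(riskyFetch(appId)).transform(
      data => { println(s"Succeeded fetching data for ${appId}"); Success(data) },
      e => { println(s"Failed fetching data for ${appId}: ${e}"); Failure(e) }
    )

  // Unwrap at the public boundary, rethrowing any failure.
  def fetch(appId: String): String = doFetch(appId) match {
    case Success(data) => data
    case Failure(e) => throw e
  }

  println(fetch("app_123"))
}

Because Try.get rethrows the wrapped exception, doFetch(appId).get behaves the same as the match block here, which may be what the reviewer had in mind.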
@@ -98,4 +100,11 @@ object SparkFetcher {

     SparkApplicationData(appId, restDerivedData, logDerivedData)
   }
-}
+
+}
+
+object SparkFetcher {
+
+  val SPARK_EVENT_LOG_ENABLED_KEY = "spark.eventLog.enabled"
+  val DEFAULT_TIMEOUT = Duration(60, SECONDS)
+}
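The REST-then-logs flow itself is written with scala-async: doFetchDataUsingRestAndLogClients must await the REST-derived data first, since the last attempt id used by the log step comes out of the REST response. A minimal sketch of that sequencing with hypothetical stand-in clients (assumes the scala-async artifact is on the classpath, as in the project itself):

import scala.async.Async.{async, await}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.{Duration, SECONDS}
import ExecutionContext.Implicits.global

object AsyncSketch extends App {
  // Stand-in for SparkRestClient.fetchData.
  def fetchRest(appId: String): Future[Map[String, String]] =
    Future(Map("lastAttemptId" -> "2"))

  // Stand-in for the log client; its input comes from the REST step.
  def fetchLogs(attemptId: String): Future[String] =
    Future(s"log-derived data for attempt ${attemptId}")

  def fetchAll(appId: String): Future[(Map[String, String], String)] = async {
    val rest = await(fetchRest(appId))                    // first await: REST data
    val logs = await(fetchLogs(rest("lastAttemptId")))    // second await depends on the first
    (rest, logs)
  }

  println(Await.result(fetchAll("app_123"), Duration(30, SECONDS)))
}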
Review comment, on a configuration change elsewhere in this pull request:
"This disables MR fetcher by default. Is it intentional or committed by accident?"

Author reply:
"accident. I disabled it to test the Spark Fetcher alone. Will put it back"