Spark fetcher is now able to fetch event logs via REST API (#225)
This commit allows Dr. Elephant to fetch Spark event logs without universal
read access to spark.eventLog.dir on HDFS. SparkFetcher uses SparkRestClient
instead of SparkLogClient when configured with:

    <params>
      <use_rest_for_eventlogs>true</use_rest_for_eventlogs>
    </params>

The default behaviour is to fetch the logs via SparkLogClient/WebHDFS.
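
For reference, a sketch of what the full fetcher entry in FetcherConf.xml might look like with the REST path enabled. The surrounding layout and the <applicationtype>/<classname> values below are assumptions based on the usual Spark fetcher setup (the class name matches com.linkedin.drelephant.spark.fetchers.SparkFetcher from this change); only the use_rest_for_eventlogs param is the new piece:

    <fetchers>
      <!-- Assumed layout; adapt to the existing FetcherConf.xml. -->
      <fetcher>
        <applicationtype>spark</applicationtype>
        <classname>com.linkedin.drelephant.spark.fetchers.SparkFetcher</classname>
        <params>
          <use_rest_for_eventlogs>true</use_rest_for_eventlogs>
        </params>
      </fetcher>
    </fetchers>

Either way, no event logs are fetched at all when spark.eventLog.enabled is false.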
superbobry authored and akshayrai committed Apr 12, 2017
1 parent 5a98701 commit 8e4a094
Showing 9 changed files with 221 additions and 48 deletions.
@@ -23,5 +23,5 @@ case class SparkRestDerivedData(
applicationInfo: ApplicationInfo,
jobDatas: Seq[JobData],
stageDatas: Seq[StageData],
executorSummaries: Seq[ExecutorSummary]
)
executorSummaries: Seq[ExecutorSummary],
private[spark] val logDerivedData: Option[SparkLogDerivedData] = None)
52 changes: 37 additions & 15 deletions app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala
@@ -19,12 +19,11 @@ package com.linkedin.drelephant.spark.fetchers
import scala.async.Async
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.{Duration, SECONDS}
import scala.util.Try
import scala.util.control.NonFatal

import com.linkedin.drelephant.analysis.{AnalyticJob, ElephantFetcher}
import com.linkedin.drelephant.configurations.fetcher.FetcherConfigurationData
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.data.SparkApplicationData
import com.linkedin.drelephant.util.SparkUtils
import org.apache.hadoop.conf.Configuration
import org.apache.log4j.Logger
@@ -54,18 +53,27 @@ class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
sparkConf
}

private[fetchers] lazy val eventLogSource: EventLogSource = {
val eventLogEnabled = sparkConf.getBoolean(SPARK_EVENT_LOG_ENABLED_KEY, false)
val useRestForLogs = Option(fetcherConfigurationData.getParamMap.get("use_rest_for_eventlogs"))
.exists(_.toBoolean)
if (!eventLogEnabled) {
EventLogSource.None
} else if (useRestForLogs) EventLogSource.Rest else EventLogSource.WebHdfs
}

private[fetchers] lazy val sparkRestClient: SparkRestClient = new SparkRestClient(sparkConf)

private[fetchers] lazy val sparkLogClient: Option[SparkLogClient] = {
val eventLogEnabled = sparkConf.getBoolean(SPARK_EVENT_LOG_ENABLED_KEY, false)
if (eventLogEnabled) Some(new SparkLogClient(hadoopConfiguration, sparkConf)) else None
private[fetchers] lazy val sparkLogClient: SparkLogClient = {
new SparkLogClient(hadoopConfiguration, sparkConf)
}

override def fetchData(analyticJob: AnalyticJob): SparkApplicationData = {
val appId = analyticJob.getAppId
logger.info(s"Fetching data for ${appId}")
try {
Await.result(doFetchData(sparkRestClient, sparkLogClient, appId), DEFAULT_TIMEOUT)
Await.result(doFetchData(sparkRestClient, sparkLogClient, appId, eventLogSource),
DEFAULT_TIMEOUT)
} catch {
case NonFatal(e) =>
logger.error(s"Failed fetching data for ${appId}", e)
@@ -77,23 +85,37 @@ class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
object SparkFetcher {
import Async.{async, await}

sealed trait EventLogSource

object EventLogSource {
/** Fetch event logs through REST API. */
case object Rest extends EventLogSource
/** Fetch event logs through WebHDFS. */
case object WebHdfs extends EventLogSource
/** Event logs are not available. */
case object None extends EventLogSource
}

val SPARK_EVENT_LOG_ENABLED_KEY = "spark.eventLog.enabled"
val DEFAULT_TIMEOUT = Duration(30, SECONDS)

private def doFetchData(
sparkRestClient: SparkRestClient,
sparkLogClient: Option[SparkLogClient],
appId: String
sparkLogClient: SparkLogClient,
appId: String,
eventLogSource: EventLogSource
)(
implicit ec: ExecutionContext
): Future[SparkApplicationData] = async {
val restDerivedData = await(sparkRestClient.fetchData(appId))
val lastAttemptId = restDerivedData.applicationInfo.attempts.maxBy { _.startTime }.attemptId

// Would use .map but await doesn't like that construction.
val logDerivedData = sparkLogClient match {
case Some(sparkLogClient) => Some(await(sparkLogClient.fetchData(appId, lastAttemptId)))
case None => None
val restDerivedData = await(sparkRestClient.fetchData(
appId, eventLogSource == EventLogSource.Rest))

val logDerivedData = eventLogSource match {
case EventLogSource.None => None
case EventLogSource.Rest => restDerivedData.logDerivedData
case EventLogSource.WebHdfs =>
val lastAttemptId = restDerivedData.applicationInfo.attempts.maxBy { _.startTime }.attemptId
Some(await(sparkLogClient.fetchData(appId, lastAttemptId)))
}

SparkApplicationData(appId, restDerivedData, logDerivedData)
@@ -76,7 +76,7 @@ class SparkLogClient(hadoopConfiguration: Configuration, sparkConf: SparkConf) {
val logPath = getLogPath(webhdfsEventLogUri, appId, attemptId, compressionCodecShortName)
logger.info(s"looking for logs at ${logPath}")

val codec = compressionCodecForLogPath(sparkConf, logPath)
val codec = compressionCodecForLogName(sparkConf, logPath.getName)

// Limit scope of async.
async {
@@ -189,10 +189,10 @@ object SparkLogClient {
new BufferedInputStream(fs.open(logPath))
}

private def compressionCodecForLogPath(conf: SparkConf, logPath: Path): Option[CompressionCodec] = {
private[fetchers] def compressionCodecForLogName(conf: SparkConf, logName: String): Option[CompressionCodec] = {
// Compression codec is encoded as an extension, e.g. app_123.lzf
// Since we sanitize the app ID to not include periods, it is safe to split on it
val logBaseName = logPath.getName.stripSuffix(IN_PROGRESS)
val logBaseName = logName.stripSuffix(IN_PROGRESS)
logBaseName.split("\\.").tail.lastOption.map { codecName =>
compressionCodecMap.getOrElseUpdate(codecName, loadCompressionCodec(conf, codecName))
}
62 changes: 53 additions & 9 deletions app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala
@@ -16,25 +16,26 @@

package com.linkedin.drelephant.spark.fetchers

import java.io.BufferedInputStream
import java.net.URI
import java.text.SimpleDateFormat
import java.util.zip.ZipInputStream
import java.util.{Calendar, SimpleTimeZone}

import scala.async.Async
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.linkedin.drelephant.spark.data.SparkRestDerivedData
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationAttemptInfo, ApplicationInfo, ExecutorSummary, JobData, StageData}
import com.linkedin.drelephant.spark.data.{SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfo, ExecutorSummary, JobData, StageData}
import javax.ws.rs.client.{Client, ClientBuilder, WebTarget}
import javax.ws.rs.core.MediaType

import org.apache.log4j.Logger
import org.apache.spark.SparkConf


/**
* A client for getting data from the Spark monitoring REST API, e.g. <https://spark.apache.org/docs/1.4.1/monitoring.html#rest-api>.
*
@@ -67,24 +68,34 @@ class SparkRestClient(sparkConf: SparkConf) {

private val apiTarget: WebTarget = client.target(historyServerUri).path(API_V1_MOUNT_PATH)

def fetchData(appId: String)(implicit ec: ExecutionContext): Future[SparkRestDerivedData] = {
def fetchData(appId: String, fetchLogs: Boolean = false)(
implicit ec: ExecutionContext
): Future[SparkRestDerivedData] = {
val appTarget = apiTarget.path(s"applications/${appId}")
logger.info(s"calling REST API at ${appTarget.getUri}")

val applicationInfo = getApplicationInfo(appTarget)

// Limit scope of async.
// These are pure and cannot fail, therefore it is safe to have
// them outside of the async block.
val lastAttemptId = applicationInfo.attempts.maxBy {_.startTime}.attemptId
val attemptTarget = lastAttemptId.map(appTarget.path).getOrElse(appTarget)

// Limit the scope of async.
async {
val lastAttemptId = applicationInfo.attempts.maxBy {_.startTime}.attemptId
val attemptTarget = lastAttemptId.map(appTarget.path).getOrElse(appTarget)
val futureJobDatas = async { getJobDatas(attemptTarget) }
val futureStageDatas = async { getStageDatas(attemptTarget) }
val futureExecutorSummaries = async { getExecutorSummaries(attemptTarget) }
val futureLogData = if (fetchLogs) {
async { getLogData(attemptTarget)}
} else Future.successful(None)

SparkRestDerivedData(
applicationInfo,
await(futureJobDatas),
await(futureStageDatas),
await(futureExecutorSummaries)
await(futureExecutorSummaries),
await(futureLogData)
)
}
}
@@ -100,6 +111,39 @@ class SparkRestClient(sparkConf: SparkConf) {
}
}

private def getLogData(attemptTarget: WebTarget): Option[SparkLogDerivedData] = {
val target = attemptTarget.path("logs")
logger.info(s"calling REST API at ${target.getUri} to get eventlogs")

// The logs are stored in a ZIP archive with a single entry.
// It should be named as "$logPrefix.$archiveExtension", but
// we trust Spark to get it right.
resource.managed { getApplicationLogs(target) }.acquireAndGet { zis =>
val entry = zis.getNextEntry
if (entry == null) {
logger.warn(s"failed to resolve log for ${target.getUri}")
None
} else {
val codec = SparkLogClient.compressionCodecForLogName(sparkConf, entry.getName)
Some(SparkLogClient.findDerivedData(
codec.map { _.compressedInputStream(zis) }.getOrElse(zis)))
}
}
}

private def getApplicationLogs(logTarget: WebTarget): ZipInputStream = {
try {
val is = logTarget.request(MediaType.APPLICATION_OCTET_STREAM)
.get(classOf[java.io.InputStream])
new ZipInputStream(new BufferedInputStream(is))
} catch {
case NonFatal(e) => {
logger.error(s"error reading logs ${logTarget.getUri}", e)
throw e
}
}
}

private def getJobDatas(attemptTarget: WebTarget): Seq[JobData] = {
val target = attemptTarget.path("jobs")
try {
@@ -55,7 +55,7 @@ class SparkMetricsAggregatorTest extends FunSpec with Matchers {
applicationInfo,
jobDatas = Seq.empty,
stageDatas = Seq.empty,
executorSummaries
executorSummaries = executorSummaries
)
}

58 changes: 51 additions & 7 deletions test/com/linkedin/drelephant/spark/fetchers/SparkFetcherTest.scala
@@ -19,13 +19,13 @@ package com.linkedin.drelephant.spark.fetchers
import java.io.{File, FileOutputStream, InputStream, OutputStream}
import java.util.Date

import scala.collection.JavaConverters
import scala.concurrent.{ExecutionContext, Future}

import com.google.common.io.Files
import com.linkedin.drelephant.analysis.{AnalyticJob, ApplicationType}
import com.linkedin.drelephant.configurations.fetcher.FetcherConfigurationData
import com.linkedin.drelephant.spark.data.{SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.fetchers.SparkFetcher.EventLogSource
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationAttemptInfo, ApplicationInfo}
import com.linkedin.drelephant.util.SparkUtils
import org.apache.spark.SparkConf
@@ -69,7 +69,7 @@ class SparkFetcherTest extends FunSpec with Matchers {
val sparkFetcher = new SparkFetcher(fetcherConfigurationData) {
override lazy val sparkConf = new SparkConf()
override lazy val sparkRestClient = newFakeSparkRestClient(appId, Future(restDerivedData))
override lazy val sparkLogClient = Some(newFakeSparkLogClient(appId, Some("2"), Future(logDerivedData)))
override lazy val sparkLogClient = newFakeSparkLogClient(appId, Some("2"), Future(logDerivedData))
}
val data = sparkFetcher.fetchData(analyticJob)
data.appId should be(appId)
@@ -79,7 +79,7 @@
val sparkFetcher = new SparkFetcher(fetcherConfigurationData) {
override lazy val sparkConf = new SparkConf()
override lazy val sparkRestClient = newFakeSparkRestClient(appId, Future { throw new Exception() })
override lazy val sparkLogClient = Some(newFakeSparkLogClient(appId, Some("2"), Future(logDerivedData)))
override lazy val sparkLogClient = newFakeSparkLogClient(appId, Some("2"), Future(logDerivedData))
}

an[Exception] should be thrownBy { sparkFetcher.fetchData(analyticJob) }
@@ -88,8 +88,9 @@
it("throws an exception if the log client fails") {
val sparkFetcher = new SparkFetcher(fetcherConfigurationData) {
override lazy val sparkConf = new SparkConf()
.set(SparkFetcher.SPARK_EVENT_LOG_ENABLED_KEY, "true")
override lazy val sparkRestClient = newFakeSparkRestClient(appId, Future(restDerivedData))
override lazy val sparkLogClient = Some(newFakeSparkLogClient(appId, Some("2"), Future { throw new Exception() }))
override lazy val sparkLogClient = newFakeSparkLogClient(appId, Some("2"), Future { throw new Exception() })
}

an[Exception] should be thrownBy { sparkFetcher.fetchData(analyticJob) }
@@ -150,16 +151,59 @@ class SparkFetcherTest extends FunSpec with Matchers {
val sparkFetcher = new SparkFetcher(fetcherConfigurationData) {
override lazy val sparkUtils = new SparkUtils() { override val defaultEnv = Map.empty[String, String] }
}

an[IllegalStateException] should be thrownBy { sparkFetcher.sparkConf }
}

it("eventlog source defaults to WebHDFS") {
val fetcherConfigurationData = newFakeFetcherConfigurationData()
val sparkFetcher = new SparkFetcher(fetcherConfigurationData) {
override lazy val sparkConf: SparkConf = new SparkConf()
.set(SparkFetcher.SPARK_EVENT_LOG_ENABLED_KEY, "true")
}

sparkFetcher.eventLogSource should be(EventLogSource.WebHdfs)
}

it("eventlog source is WebHDFS if use_rest_for_eventlogs is false") {
val fetcherConfigurationData = newFakeFetcherConfigurationData(
Map("use_rest_for_eventlogs" -> "false"))
val sparkFetcher = new SparkFetcher(fetcherConfigurationData) {
override lazy val sparkConf: SparkConf = new SparkConf()
.set(SparkFetcher.SPARK_EVENT_LOG_ENABLED_KEY, "true")
}

sparkFetcher.eventLogSource should be(EventLogSource.WebHdfs)
}

it("eventlog source is REST if use_rest_for_eventlogs is true") {
val fetcherConfigurationData = newFakeFetcherConfigurationData(
Map("use_rest_for_eventlogs" -> "true"))
val sparkFetcher = new SparkFetcher(fetcherConfigurationData) {
override lazy val sparkConf: SparkConf = new SparkConf()
.set(SparkFetcher.SPARK_EVENT_LOG_ENABLED_KEY, "true")
}

sparkFetcher.eventLogSource should be(EventLogSource.Rest)
}

it("eventlog fetching is disabled when spark.eventLog is false") {
val fetcherConfigurationData = newFakeFetcherConfigurationData()
val sparkFetcher = new SparkFetcher(fetcherConfigurationData) {
override lazy val sparkConf: SparkConf = new SparkConf()
.set(SparkFetcher.SPARK_EVENT_LOG_ENABLED_KEY, "false")
}

sparkFetcher.eventLogSource should be(EventLogSource.None)
}
}
}

object SparkFetcherTest {
import JavaConverters._
import scala.collection.JavaConverters._

def newFakeFetcherConfigurationData(): FetcherConfigurationData =
new FetcherConfigurationData(classOf[SparkFetcher].getName, new ApplicationType("SPARK"), Map.empty.asJava)
def newFakeFetcherConfigurationData(paramMap: Map[String, String] = Map.empty): FetcherConfigurationData =
new FetcherConfigurationData(classOf[SparkFetcher].getName, new ApplicationType("SPARK"), paramMap.asJava)

def newFakeApplicationAttemptInfo(
attemptId: Option[String],