
Commit

Removing blocking keyword (#361)
Removing the blocking keyword to ensure that a large number of threads is not created when some of the threads are blocked.
skakker authored and akshayrai committed Apr 6, 2018
1 parent a0470a3 commit 50a7409
Showing 3 changed files with 41 additions and 51 deletions.
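
For context: scala.concurrent.blocking marks a section of code as a managed-blocking call. Execution contexts that support BlockContext, such as scala.concurrent.ExecutionContext.global, compensate for every worker parked inside such a section by spawning an extra thread so the pool keeps its parallelism level. When many fetcher tasks block at once, that compensation is what creates the large number of threads this commit avoids; without the wrapper, blocked tasks simply hold their pool threads and the remaining work queues. A minimal standalone sketch of the difference (the object name, latch, and counts are illustrative, not code from this repository):

// Standalone sketch (not from this repository): how `blocking` affects
// thread creation on scala.concurrent.ExecutionContext.global.
import java.util.concurrent.CountDownLatch

import scala.concurrent.{Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global

object BlockingDemo extends App {
  val latch = new CountDownLatch(1)

  // Each task parks until the latch opens. Wrapped in `blocking`, the
  // global ForkJoinPool compensates for every parked worker by spawning
  // a replacement thread, so the thread count grows with the task count.
  // Without the wrapper, at most ~availableProcessors tasks run (and
  // park) at once; the rest simply wait in the queue.
  (1 to 100).foreach { _ =>
    Future {
      blocking { // removing this wrapper caps the thread count
        latch.await()
      }
    }
  }

  Thread.sleep(2000)
  println(s"live threads: ${Thread.activeCount()}") // much larger with `blocking`
  latch.countDown()
}

With the wrapper in place this demo typically reports on the order of a hundred live threads; with it removed, the count stays near the pool's parallelism level.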
34 changes: 17 additions & 17 deletions app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala
@@ -18,7 +18,7 @@ package com.linkedin.drelephant.spark.fetchers

 import java.util.concurrent.TimeoutException

-import scala.concurrent.{Await, ExecutionContext, Future, blocking}
+import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration.{Duration, SECONDS}
 import scala.util.{Failure, Success, Try}
 import com.linkedin.drelephant.analysis.{AnalyticJob, ElephantFetcher}
@@ -110,37 +110,37 @@ class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
   }

   private def doFetchDataUsingRestAndLogClients(analyticJob: AnalyticJob): Future[SparkApplicationData] = Future {
-    blocking {
-      val appId = analyticJob.getAppId
-      val restDerivedData = Await.result(sparkRestClient.fetchData(appId, eventLogSource == EventLogSource.Rest), DEFAULT_TIMEOUT)
-
-      val logDerivedData = eventLogSource match {
-        case EventLogSource.None => None
-        case EventLogSource.Rest => restDerivedData.logDerivedData
-        case EventLogSource.WebHdfs =>
-          val lastAttemptId = restDerivedData.applicationInfo.attempts.maxBy {
-            _.startTime
-          }.attemptId
-          Some(Await.result(sparkLogClient.fetchData(appId, lastAttemptId), DEFAULT_TIMEOUT))
-      }
-
-      SparkApplicationData(appId, restDerivedData, logDerivedData)
-    }
+    val appId = analyticJob.getAppId
+    val restDerivedData = Await.result(sparkRestClient.fetchData(appId, eventLogSource == EventLogSource.Rest), DEFAULT_TIMEOUT)
+
+    val logDerivedData = eventLogSource match {
+      case EventLogSource.None => None
+      case EventLogSource.Rest => restDerivedData.logDerivedData
+      case EventLogSource.WebHdfs =>
+        val lastAttemptId = restDerivedData.applicationInfo.attempts.maxBy {
+          _.startTime
+        }.attemptId
+        Some(Await.result(sparkLogClient.fetchData(appId, lastAttemptId), DEFAULT_TIMEOUT))
+    }
+    SparkApplicationData(appId, restDerivedData, logDerivedData)
   }

 }

 object SparkFetcher {

   sealed trait EventLogSource

   object EventLogSource {

     /** Fetch event logs through REST API. */
     case object Rest extends EventLogSource

     /** Fetch event logs through WebHDFS. */
     case object WebHdfs extends EventLogSource

     /** Event logs are not available. */
     case object None extends EventLogSource

   }

   val SPARK_EVENT_LOG_ENABLED_KEY = "spark.eventLog.enabled"
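
The EventLogSource context lines above are a standard sealed-trait enumeration; sealing is what lets the eventLogSource match in doFetchDataUsingRestAndLogClients stay exhaustive. A minimal sketch of the same pattern (the wrapper object and describe function are illustrative only):

object EventLogSourceSketch {
  sealed trait EventLogSource
  object EventLogSource {
    case object Rest extends EventLogSource
    case object WebHdfs extends EventLogSource
    case object None extends EventLogSource
  }

  // Because the trait is sealed, the compiler warns when a match misses
  // a case, so adding a new log source forces every fetch path to handle it.
  def describe(source: EventLogSource): String = source match {
    case EventLogSource.Rest    => "fetch event logs via the Spark REST API"
    case EventLogSource.WebHdfs => "read event logs from WebHDFS"
    case EventLogSource.None    => "no event logs available"
  }
}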
5 changes: 2 additions & 3 deletions app/com/linkedin/drelephant/spark/fetchers/SparkLogClient.scala
@@ -19,7 +19,7 @@ package com.linkedin.drelephant.spark.fetchers
 import java.io.InputStream
 import java.security.PrivilegedAction

-import scala.concurrent.{ExecutionContext, Future, blocking}
+import scala.concurrent.{ExecutionContext, Future}
 import scala.io.Source

 import com.linkedin.drelephant.security.HadoopSecurity
@@ -62,9 +62,8 @@ class SparkLogClient(hadoopConfiguration: Configuration, sparkConf: SparkConf, e
     val (eventLogPath, eventLogCodec) =
       sparkUtils.pathAndCodecforEventLog(sparkConf, eventLogFileSystem, baseEventLogPath, appId, attemptId)

-    Future { blocking {
+    Future {
       sparkUtils.withEventLog(eventLogFileSystem, eventLogPath, eventLogCodec)(findDerivedData(_))
     }
-    }
   }
 }
53 changes: 22 additions & 31 deletions app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala
@@ -25,7 +25,7 @@ import java.util.{Calendar, SimpleTimeZone}
 import com.linkedin.drelephant.spark.legacydata.LegacyDataConverters
 import org.apache.spark.deploy.history.SparkDataCollection

-import scala.concurrent.{Await, ExecutionContext, Future, blocking}
+import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.util.control.NonFatal
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
@@ -81,38 +81,29 @@ class SparkRestClient(sparkConf: SparkConf) {
     val (applicationInfo, attemptTarget) = getApplicationMetaData(appId)

     Future {
-      blocking {
-        val futureJobDatas = Future {
-          blocking {
-            getJobDatas(attemptTarget)
-          }
-        }
-        val futureStageDatas = Future {
-          blocking {
-            getStageDatas(attemptTarget)
-          }
-        }
-        val futureExecutorSummaries = Future {
-          blocking {
-            getExecutorSummaries(attemptTarget)
-          }
-        }
-        val futureLogData = if (fetchLogs) {
-          Future {
-            blocking {
-              getLogData(attemptTarget)
-            }
-          }
-        } else Future.successful(None)
-
-        SparkRestDerivedData(
-          applicationInfo,
-          Await.result(futureJobDatas, DEFAULT_TIMEOUT),
-          Await.result(futureStageDatas, DEFAULT_TIMEOUT),
-          Await.result(futureExecutorSummaries, Duration(5, SECONDS)),
-          Await.result(futureLogData, Duration(5, SECONDS))
-        )
-      }
+      val futureJobDatas = Future {
+        getJobDatas(attemptTarget)
+      }
+      val futureStageDatas = Future {
+        getStageDatas(attemptTarget)
+      }
+      val futureExecutorSummaries = Future {
+        getExecutorSummaries(attemptTarget)
+      }
+      val futureLogData = if (fetchLogs) {
+        Future {
+          getLogData(attemptTarget)
+        }
+      } else Future.successful(None)
+
+      SparkRestDerivedData(
+        applicationInfo,
+        Await.result(futureJobDatas, DEFAULT_TIMEOUT),
+        Await.result(futureStageDatas, DEFAULT_TIMEOUT),
+        Await.result(futureExecutorSummaries, Duration(5, SECONDS)),
+        Await.result(futureLogData, Duration(5, SECONDS))
+      )
     }
   }
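
The fetchData change above preserves a common fan-out pattern: start the independent REST calls as Futures on the shared execution context, then Await each with its own timeout. A self-contained sketch of that pattern (the fetchJobs/fetchStages helpers are hypothetical stand-ins for getJobDatas and getStageDatas):

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FanOutSketch extends App {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-ins for getJobDatas / getStageDatas.
  def fetchJobs(): Seq[String] = Seq("job-1", "job-2")
  def fetchStages(): Seq[String] = Seq("stage-1")

  // Both futures start immediately and run concurrently on the pool.
  val jobs   = Future { fetchJobs() }
  val stages = Future { fetchStages() }

  // Await blocks the calling thread until each result arrives, or fails
  // with a TimeoutException, mirroring the DEFAULT_TIMEOUT usage above.
  println(Await.result(jobs, 30.seconds))
  println(Await.result(stages, 5.seconds))
}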
