Simplify the request interface by asking for a total
This means we do not distinguish between the number of executors
pending vs running. We only ever used to add up the two anyway,
so functionally it makes no difference. The interface exposed to
the scheduler backend is also simpler to reason about.
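
As a condensed sketch of the change (message names taken from the diff below; the extends clauses and surrounding plumbing are elided):

    // Before: the driver reported a desired pending count plus the number
    // of executors it already had, and the receiving side added the two.
    case class RequestPendingExecutors(
        numPendingExecutors: Int,
        numExistingExecutors: Int)

    // After: the driver simply states the total it wants. A later request
    // overrides an earlier one, so a retried or duplicated message cannot
    // cause over-allocation.
    case class RequestExecutors(requestedTotal: Int)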
Andrew Or committed Oct 25, 2014
1 parent 04f625b commit 79aa2df
Showing 6 changed files with 42 additions and 81 deletions.
CoarseGrainedClusterMessages.scala
@@ -69,16 +69,12 @@ private[spark] object CoarseGrainedClusterMessages {
   case class AddWebUIFilter(filterName:String, filterParams: Map[String, String], proxyBase: String)
     extends CoarseGrainedClusterMessage
 
-  // Messages exchanged between the driver and the cluster manager for dynamic executor allocation
+  // Messages exchanged between the driver and the cluster manager for executor allocation
 
   case object RegisterClusterManager extends CoarseGrainedClusterMessage
 
-  // Request as many executors as needed to meet the specified number of pending executors
-  // Additionally, convey the existing number of executors to avoid allocating too many new ones
-  case class RequestPendingExecutors(
-      numPendingExecutors: Int,
-      numExistingExecutors: Int)
-    extends CoarseGrainedClusterMessage
+  // Request executors by specifying the new total number of executors desired
+  case class RequestExecutors(requestedTotal: Int) extends CoarseGrainedClusterMessage
 
   case class KillExecutors(executorIds: Seq[String]) extends CoarseGrainedClusterMessage
CoarseGrainedSchedulerBackend.scala
@@ -289,11 +289,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
     logInfo(s"Requesting $numAdditionalExecutors additional executors from the cluster manager")
     logDebug(s"Number of pending executors is now $numPendingExecutors")
     numPendingExecutors += numAdditionalExecutors
-    requestPendingExecutors(numPendingExecutors)
+    requestTotalExecutors(numPendingExecutors + totalRegisteredExecutors.get)
   }
 
   /**
-   * Send a request to the cluster manager to set the number of pending executors desired.
+   * Request executors from the cluster manager by specifying the total number desired.
    *
    * The semantics here guarantee that we do not over-allocate executors for this application,
    * since a later request overrides the value of any prior request. The alternative interface
@@ -303,7 +303,7 @@
    *
    * Return whether the request successfully reaches the cluster manager.
    */
-  protected def requestPendingExecutors(numPendingExecutors: Int): Boolean = false
+  protected def requestTotalExecutors(requestedTotal: Int): Boolean = false
 
   /**
    * Kill the given executor through the cluster manager.
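
To illustrate the override semantics described in the doc comment above, a hypothetical driver-side sequence (the numbers are invented):

    // Suppose 5 executors are registered and the driver wants 3 more.
    // With a delta-style interface, a duplicated request could add 6.
    // With the total-style interface, repeating the request is harmless:
    requestTotalExecutors(5 + 3)  // target is 8
    requestTotalExecutors(5 + 3)  // still 8; the second call restates, not adds

A request is thus idempotent: the cluster manager only ever acts on the latest total.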
YarnSchedulerBackend.scala
@@ -49,20 +49,19 @@ private[spark] abstract class YarnSchedulerBackend(
   private implicit val askTimeout = AkkaUtils.askTimeout(sc.conf)
 
   /**
-   * Request the given number of executors from the ApplicationMaster.
+   * Request executors from the ApplicationMaster by specifying the total number desired.
    */
-  override def requestPendingExecutors(numPendingExecutors: Int): Boolean = synchronized {
-    AkkaUtils.askWithAck(
-      RequestPendingExecutors(numPendingExecutors, totalRegisteredExecutors.get),
-      yarnSchedulerActor,
-      askTimeout)
+  override def requestTotalExecutors(requestedTotal: Int): Boolean = {
+    AkkaUtils.askWithReply[Boolean](
+      RequestExecutors(requestedTotal), yarnSchedulerActor, askTimeout)
   }
 
   /**
    * Request the ApplicationMaster to kill the specified executors.
    */
   override def killExecutors(executorIds: Seq[String]): Boolean = {
-    AkkaUtils.askWithAck(KillExecutors(executorIds), yarnSchedulerActor, askTimeout)
+    AkkaUtils.askWithReply[Boolean](
+      KillExecutors(executorIds), yarnSchedulerActor, askTimeout)
   }
 
   override def sufficientResourcesRegistered(): Boolean = {
@@ -107,23 +106,23 @@ private[spark] abstract class YarnSchedulerBackend(
       logInfo(s"ApplicationMaster registered as $sender")
       amActor = Some(sender)
 
-    case r: RequestPendingExecutors =>
-      val acked = amActor match {
-        case Some(actor) => AkkaUtils.askWithAck(r, actor, askTimeout)
-        case None => logWarning(
-          "Attempted to request executors before the ApplicationMaster has registered!")
-          false
-      }
-      sender ! acked
+    case r: RequestExecutors =>
+      amActor match {
+        case Some(actor) =>
+          sender ! AkkaUtils.askWithReply[Boolean](r, actor, askTimeout)
+        case None =>
+          logWarning("Attempted to request executors before the AM has registered!")
+          sender ! false
+      }
 
     case k: KillExecutors =>
-      val acked = amActor match {
-        case Some(actor) => AkkaUtils.askWithAck(k, actor, askTimeout)
-        case None => logWarning(
-          "Attempted to kill executors before the ApplicationMaster has registered!")
-          false
-      }
-      sender ! acked
+      amActor match {
+        case Some(actor) =>
+          sender ! AkkaUtils.askWithReply[Boolean](k, actor, askTimeout)
+        case None =>
+          logWarning("Attempted to kill executors before the AM has registered!")
+          sender ! false
+      }
 
     case AddWebUIFilter(filterName, filterParams, proxyBase) =>
       addWebUIFilter(filterName, filterParams, proxyBase)
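
The actor above relays each request to the ApplicationMaster's actor and answers the original sender with the AM's Boolean reply, or false if no AM has registered yet. A minimal self-contained sketch of that relay pattern, using plain Akka in place of the AkkaUtils helpers (RelayActor and its register message are hypothetical, not part of the patch):

    import akka.actor.{Actor, ActorRef}
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._

    class RelayActor extends Actor {
      private var peer: Option[ActorRef] = None   // stands in for amActor
      private implicit val timeout = Timeout(30.seconds)

      def receive = {
        case ("register", ref: ActorRef) =>
          peer = Some(ref)                        // like RegisterClusterManager
        case msg =>
          peer match {
            case Some(p) =>
              // Ask the peer, block for its Boolean answer, and relay it to
              // the original sender, mirroring askWithReply[Boolean] above.
              sender ! Await.result((p ? msg).mapTo[Boolean], timeout.duration)
            case None =>
              sender ! false                      // no peer registered yet
          }
      }
    }

Blocking inside receive mirrors the patch's synchronous ask; a production actor would more commonly pipe the future back to the sender instead.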
core/src/main/scala/org/apache/spark/util/AkkaUtils.scala (0 additions, 15 deletions)
@@ -152,21 +152,6 @@ private[spark] object AkkaUtils extends Logging {
     conf.getInt("spark.akka.retry.wait", 3000)
   }
 
-  /**
-   * Send a message to the given actor and return whether the message was acknowledged.
-   * If an exception is thrown during the ask, return false.
-   */
-  def askWithAck(message: Any, actor: ActorRef, timeout: FiniteDuration): Boolean = {
-    try {
-      askWithReply(message, actor, timeout)
-      true
-    } catch {
-      case e: SparkException =>
-        logError(s"Exception when asking actor with acknowledgment", e)
-        false
-    }
-  }
-
   /**
    * Send a message to the given actor and get its result within a default timeout, or
    * throw a SparkException if this fails.
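
The helper can be deleted because the call sites now want the Boolean payload itself rather than a delivery acknowledgment. Roughly (an illustrative fragment, where message, actor, and askTimeout stand for the call-site values):

    // askWithAck collapsed any successful reply to `true` and swallowed
    // SparkExceptions as `false`, hiding the remote actor's actual answer.
    // Asking for the typed reply lets a deliberate `sender ! false`
    // (e.g. before the AM registers) propagate to the caller:
    val acked: Boolean = AkkaUtils.askWithReply[Boolean](message, actor, askTimeout)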
ApplicationMaster.scala
@@ -479,26 +479,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     userThread
   }
 
-  /**
-   * Send a request to the ResourceManager to set the number of pending executors.
-   */
-  private def requestPendingExecutors(numPendingExecutors: Int, numExistingExecutors: Int): Unit = {
-    Option(allocator) match {
-      case Some(a) => a.requestPendingExecutors(numPendingExecutors, numExistingExecutors)
-      case None => logWarning("Container allocator is not ready to request executors yet.")
-    }
-  }
-
-  /**
-   * Send a request to the ResourceManager to kill the containers running the specified executors.
-   */
-  private def killExecutors(executorIds: Seq[String]): Unit = {
-    Option(allocator) match {
-      case Some(a) => executorIds.foreach(a.killExecutor)
-      case None => logWarning("Container allocator is not ready to kill executors yet.")
-    }
-  }
-
   /**
    * Actor that communicates with the driver in client deploy mode.
    */
@@ -524,14 +504,20 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       logInfo(s"Add WebUI Filter. $x")
       driver ! x
 
-    case RequestPendingExecutors(numPendingExecutors, numExistingExecutors) =>
-      logInfo(s"Driver requested $numPendingExecutors executors to be pending.")
-      requestPendingExecutors(numPendingExecutors, numExistingExecutors)
+    case RequestExecutors(requestedTotal) =>
+      logInfo(s"Driver requested $requestedTotal total number of executors.")
+      Option(allocator) match {
+        case Some(a) => a.requestTotalExecutors(requestedTotal)
+        case None => logWarning("Container allocator is not ready to request executors yet.")
+      }
+      sender ! true
 
     case KillExecutors(executorIds) =>
       logInfo(s"Driver requested to kill executors ${executorIds.mkString(", ")}.")
-      killExecutors(executorIds)
+      Option(allocator) match {
+        case Some(a) => executorIds.foreach(a.killExecutor)
+        case None => logWarning("Container allocator is not ready to kill executors yet.")
+      }
+      sender ! true
   }
 }
YarnAllocator.scala
@@ -115,24 +115,19 @@ private[yarn] abstract class YarnAllocator(
   def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
 
   /**
-   * Request a number of executors from the RM such that the total number pending for this
-   * application is at least the value specified.
+   * Request as many executors from the ResourceManager as needed to reach the desired total.
    */
-  def requestPendingExecutors(
-      numPendingRequested: Int,
-      numExistingExecutors: Int): Unit = synchronized {
-    // Take into account the executors already pending and running
-    val requestedTotalExecutors = numPendingRequested + numExistingExecutors
-    val currentTotalExecutors = numPendingAllocate.get + numExecutorsRunning.get
-    if (requestedTotalExecutors > currentTotalExecutors) {
-      maxExecutors += (requestedTotalExecutors - currentTotalExecutors)
+  def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
+    val currentTotal = numPendingAllocate.get + numExecutorsRunning.get
+    if (requestedTotal > currentTotal) {
+      maxExecutors += (requestedTotal - currentTotal)
       // We need to call `allocateResources` here to avoid the following race condition:
       // If we request executors twice before `allocateResources` is called, then we will end up
       // double counting the number requested because `numPendingAllocate` is not updated yet.
       allocateResources()
     } else {
-      logInfo(s"Not allocating more executors because there are already $currentTotalExecutors " +
-        s"pending and running executors (application requested $requestedTotalExecutors total)")
+      logInfo(s"Not allocating more executors because there are already $currentTotal " +
+        s"(application requested $requestedTotal total)")
     }
   }
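
A quick worked example of the arithmetic above, with invented numbers:

    // Assume 2 container requests are pending and 4 executors are running:
    val currentTotal = 2 + 4  // numPendingAllocate + numExecutorsRunning = 6
    // The driver asks for 10 in total:
    //   requestedTotal (10) > currentTotal (6), so the allocator raises its
    //   ceiling by the shortfall: maxExecutors += 4.
    // allocateResources() is then called immediately, so a second request
    // arriving before the next allocation cycle sees an updated
    // numPendingAllocate and the shortfall is not counted twice.
    // A request for 6 or fewer would change nothing and only log a message.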
