Skip to content

Commit

Permalink
[SPARK-26392][YARN] Cancel pending allocate requests by taking locali…
Browse files Browse the repository at this point in the history
…ty preference into account

## What changes were proposed in this pull request?

Right now, we cancel pending allocate requests by its sending order. I thing we can take

locality preference into account when do this to perfom least impact on task locality preference.

## How was this patch tested?

N.A.

Closes apache#23344 from Ngone51/dev-cancel-pending-allocate-requests-by-taking-locality-preference-into-account.

Authored-by: Ngone51 <ngone_5451@163.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
  • Loading branch information
Ngone51 authored and holdenk committed Jan 5, 2019
1 parent f4e4c1a commit ec539d1
Showing 1 changed file with 12 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,15 @@ private[yarn] class YarnAllocator(
s"pending: $numPendingAllocate, running: ${runningExecutors.size}, " +
s"executorsStarting: ${numExecutorsStarting.get}")

// Split the pending container request into three groups: locality matched list, locality
// unmatched list and non-locality list. Take the locality matched container request into
// consideration of container placement, treat as allocated containers.
// For locality unmatched and locality free container requests, cancel these container
// requests, since required locality preference has been changed, recalculating using
// container placement strategy.
val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
hostToLocalTaskCounts, pendingAllocate)

if (missing > 0) {
if (log.isInfoEnabled()) {
var requestContainerMessage = s"Will request $missing executor container(s), each with " +
Expand All @@ -306,15 +315,6 @@ private[yarn] class YarnAllocator(
logInfo(requestContainerMessage)
}

// Split the pending container request into three groups: locality matched list, locality
// unmatched list and non-locality list. Take the locality matched container request into
// consideration of container placement, treat as allocated containers.
// For locality unmatched and locality free container requests, cancel these container
// requests, since required locality preference has been changed, recalculating using
// container placement strategy.
val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
hostToLocalTaskCounts, pendingAllocate)

// cancel "stale" requests for locations that are no longer needed
staleRequests.foreach { stale =>
amClient.removeContainerRequest(stale)
Expand Down Expand Up @@ -374,14 +374,9 @@ private[yarn] class YarnAllocator(
val numToCancel = math.min(numPendingAllocate, -missing)
logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " +
s"total $targetNumExecutors executors.")

val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)
if (!matchingRequests.isEmpty) {
matchingRequests.iterator().next().asScala
.take(numToCancel).foreach(amClient.removeContainerRequest)
} else {
logWarning("Expected to find pending requests, but found none.")
}
// cancel pending allocate requests by taking locality preference into account
val cancelRequests = (staleRequests ++ anyHostRequests ++ localRequests).take(numToCancel)
cancelRequests.foreach(amClient.removeContainerRequest)
}
}

Expand Down

0 comments on commit ec539d1

Please sign in to comment.