Skip to content

Commit

Permalink
cancel pending allocate requests by taking locality preference into a…
Browse files Browse the repository at this point in the history
…ccount
  • Loading branch information
Ngone51 committed Dec 18, 2018
1 parent 5960a82 commit 23f809c
Showing 1 changed file with 20 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,15 @@ private[yarn] class YarnAllocator(
s"pending: $numPendingAllocate, running: ${runningExecutors.size}, " +
s"executorsStarting: ${numExecutorsStarting.get}")

// Split the pending container request into three groups: locality matched list, locality
// unmatched list and non-locality list. Take the locality matched container request into
// consideration of container placement, treat as allocated containers.
// For locality unmatched and locality free container requests, cancel these container
// requests, since required locality preference has been changed, recalculating using
// container placement strategy.
val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
hostToLocalTaskCounts, pendingAllocate)

if (missing > 0) {
if (log.isInfoEnabled()) {
var requestContainerMessage = s"Will request $missing executor container(s), each with " +
Expand All @@ -306,15 +315,6 @@ private[yarn] class YarnAllocator(
logInfo(requestContainerMessage)
}

// Split the pending container request into three groups: locality matched list, locality
// unmatched list and non-locality list. Take the locality matched container request into
// consideration of container placement, treat as allocated containers.
// For locality unmatched and locality free container requests, cancel these container
// requests, since required locality preference has been changed, recalculating using
// container placement strategy.
val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
hostToLocalTaskCounts, pendingAllocate)

// cancel "stale" requests for locations that are no longer needed
staleRequests.foreach { stale =>
amClient.removeContainerRequest(stale)
Expand Down Expand Up @@ -374,14 +374,18 @@ private[yarn] class YarnAllocator(
val numToCancel = math.min(numPendingAllocate, -missing)
logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " +
s"total $targetNumExecutors executors.")

val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)
if (!matchingRequests.isEmpty) {
matchingRequests.iterator().next().asScala
.take(numToCancel).foreach(amClient.removeContainerRequest)
} else {
logWarning("Expected to find pending requests, but found none.")
// cancel pending allocate requests by taking locality preference into account
val cancelRequests = {
if (staleRequests.size >= numToCancel) {
staleRequests.take(numToCancel)
} else if (staleRequests.size + anyHostRequests.size >= numToCancel) {
staleRequests ++ anyHostRequests.take(numToCancel - staleRequests.size)
} else {
staleRequests ++ anyHostRequests ++
localRequests.take(numToCancel - staleRequests.size - anyHostRequests.size)
}
}
cancelRequests.foreach(amClient.removeContainerRequest)
}
}

Expand Down

0 comments on commit 23f809c

Please sign in to comment.