From 23f809c2ac7869de47942c020db4939574ee8dac Mon Sep 17 00:00:00 2001 From: Ngone51 Date: Tue, 18 Dec 2018 17:09:58 +0800 Subject: [PATCH 1/2] cancel pending allocate requests by taking locality preference into account --- .../spark/deploy/yarn/YarnAllocator.scala | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index d37d0d66d8ae2..0c036c451a10f 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -294,6 +294,15 @@ private[yarn] class YarnAllocator( s"pending: $numPendingAllocate, running: ${runningExecutors.size}, " + s"executorsStarting: ${numExecutorsStarting.get}") + // Split the pending container request into three groups: locality matched list, locality + // unmatched list and non-locality list. Take the locality matched container request into + // consideration of container placement, treat as allocated containers. + // For locality unmatched and locality free container requests, cancel these container + // requests, since required locality preference has been changed, recalculating using + // container placement strategy. + val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality( + hostToLocalTaskCounts, pendingAllocate) + if (missing > 0) { if (log.isInfoEnabled()) { var requestContainerMessage = s"Will request $missing executor container(s), each with " + @@ -306,15 +315,6 @@ private[yarn] class YarnAllocator( logInfo(requestContainerMessage) } - // Split the pending container request into three groups: locality matched list, locality - // unmatched list and non-locality list. Take the locality matched container request into - // consideration of container placement, treat as allocated containers. - // For locality unmatched and locality free container requests, cancel these container - // requests, since required locality preference has been changed, recalculating using - // container placement strategy. - val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality( - hostToLocalTaskCounts, pendingAllocate) - // cancel "stale" requests for locations that are no longer needed staleRequests.foreach { stale => amClient.removeContainerRequest(stale) @@ -374,14 +374,18 @@ private[yarn] class YarnAllocator( val numToCancel = math.min(numPendingAllocate, -missing) logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " + s"total $targetNumExecutors executors.") - - val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource) - if (!matchingRequests.isEmpty) { - matchingRequests.iterator().next().asScala - .take(numToCancel).foreach(amClient.removeContainerRequest) - } else { - logWarning("Expected to find pending requests, but found none.") + // cancel pending allocate requests by taking locality preference into account + val cancelRequests = { + if (staleRequests.size >= numToCancel) { + staleRequests.take(numToCancel) + } else if (staleRequests.size + anyHostRequests.size >= numToCancel) { + staleRequests ++ anyHostRequests.take(numToCancel - staleRequests.size) + } else { + staleRequests ++ anyHostRequests ++ + localRequests.take(numToCancel - staleRequests.size - anyHostRequests.size) + } } + cancelRequests.foreach(amClient.removeContainerRequest) } } From d29eb7a7212d84bb8f3d3122bde91863408fb9a1 Mon Sep 17 00:00:00 2001 From: Ngone51 Date: Wed, 19 Dec 2018 09:24:03 +0800 Subject: [PATCH 2/2] address comment --- .../org/apache/spark/deploy/yarn/YarnAllocator.scala | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 0c036c451a10f..54b1ec266113f 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -375,16 +375,7 @@ private[yarn] class YarnAllocator( logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " + s"total $targetNumExecutors executors.") // cancel pending allocate requests by taking locality preference into account - val cancelRequests = { - if (staleRequests.size >= numToCancel) { - staleRequests.take(numToCancel) - } else if (staleRequests.size + anyHostRequests.size >= numToCancel) { - staleRequests ++ anyHostRequests.take(numToCancel - staleRequests.size) - } else { - staleRequests ++ anyHostRequests ++ - localRequests.take(numToCancel - staleRequests.size - anyHostRequests.size) - } - } + val cancelRequests = (staleRequests ++ anyHostRequests ++ localRequests).take(numToCancel) cancelRequests.foreach(amClient.removeContainerRequest) } }