From 7a4deea8552664cc53db1e20876b42631d36fae3 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 18 Sep 2014 06:54:11 +0900 Subject: [PATCH] Modified Master.scala to use numWorkersVisited and numWorkersAlive instead of stopPos --- .../apache/spark/deploy/master/Master.scala | 38 +++++++++---------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index d52d720c00780..432b552c58cd8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -489,28 +489,24 @@ private[spark] class Master( // First schedule drivers, they take strict precedence over applications // Randomization helps balance drivers val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE)) - val aliveWorkerNum = shuffledAliveWorkers.size - - if (aliveWorkerNum > 0) { - var curPos = 0 - var stopPos = aliveWorkerNum - for (driver <- waitingDrivers.toList) { - // iterate over a copy of waitingDrivers - // We assign workers to each waiting driver in a round-robin fashion. For each driver, we - // start from the last worker that was assigned a driver, and continue onwards until we have - // explored all alive workers. - var launched = false - while (curPos != stopPos && !launched) { - val worker = shuffledAliveWorkers(curPos) - if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { - launchDriver(worker, driver) - waitingDrivers -= driver - launched = true - } - curPos = (curPos + 1) % aliveWorkerNum + val numWorkersAlive = shuffledAliveWorkers.size + var curPos = 0 + + for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers + // We assign workers to each waiting driver in a round-robin fashion. For each driver, we + // start from the last worker that was assigned a driver, and continue onwards until we have + // explored all alive workers. + var launched = false + var numWorkersVisited = 0 + while (numWorkersVisited < numWorkersAlive && !launched) { + val worker = shuffledAliveWorkers(curPos) + numWorkersVisited += 1 + if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { + launchDriver(worker, driver) + waitingDrivers -= driver + launched = true } - curPos = (stopPos + 1) % aliveWorkerNum - stopPos = curPos + aliveWorkerNum + curPos = (curPos + 1) % numWorkersAlive } }