From bc91bb15a9b667582af86932ffdc6a4f5ad45794 Mon Sep 17 00:00:00 2001 From: WangTao Date: Sat, 6 Sep 2014 10:54:34 +0800 Subject: [PATCH] Avoid shuffle every time we schedule the driver using round robin --- .../org/apache/spark/deploy/master/Master.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 3cc8edd4645a0..ee7bb356867ff 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -481,17 +481,21 @@ private[spark] class Master( if (state != RecoveryState.ALIVE) { return } // First schedule drivers, they take strict precedence over applications + val shuffledAliveWorkers = Random.shuffle(workers.filter(_.state == WorkerState.ALIVE)) // Randomization helps balance drivers + val aliveWorkerNum = shuffledAliveWorkers.size + var curPos = aliveWorkerNum - 1 for (driver <- List(waitingDrivers: _*)) { - val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers - val shuffledWorkersIter = shuffledWorkers.iterator + val startFlag = curPos + curPos = (curPos + 1) % aliveWorkerNum var launched = false - while(shuffledWorkersIter.hasNext && !launched) { - val worker = shuffledWorkersIter.next() - if (worker.state == WorkerState.ALIVE && worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { + while (curPos != startFlag && !launched) { + val worker = shuffledAliveWorkers(curPos) + if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { launchDriver(worker, driver) waitingDrivers -= driver launched = true } + curPos = (curPos + 1) % aliveWorkerNum } }