Skip to content

Commit

Permalink
Modified Master.scala to use numWorkersVisited and numWorkersAlive in…
Browse files Browse the repository at this point in the history
…stead of stopPos
  • Loading branch information
sarutak committed Sep 17, 2014
1 parent 4e51e35 commit 7a4deea
Showing 1 changed file with 17 additions and 21 deletions.
38 changes: 17 additions & 21 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -489,28 +489,24 @@ private[spark] class Master(
// First schedule drivers, they take strict precedence over applications
// Randomization helps balance drivers
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val aliveWorkerNum = shuffledAliveWorkers.size

if (aliveWorkerNum > 0) {
var curPos = 0
var stopPos = aliveWorkerNum
for (driver <- waitingDrivers.toList) {
// iterate over a copy of waitingDrivers
// We assign workers to each waiting driver in a round-robin fashion. For each driver, we
// start from the last worker that was assigned a driver, and continue onwards until we have
// explored all alive workers.
var launched = false
while (curPos != stopPos && !launched) {
val worker = shuffledAliveWorkers(curPos)
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
}
curPos = (curPos + 1) % aliveWorkerNum
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0

for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
// We assign workers to each waiting driver in a round-robin fashion. For each driver, we
// start from the last worker that was assigned a driver, and continue onwards until we have
// explored all alive workers.
var launched = false
var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
}
curPos = (stopPos + 1) % aliveWorkerNum
stopPos = curPos + aliveWorkerNum
curPos = (curPos + 1) % numWorkersAlive
}
}

Expand Down

0 comments on commit 7a4deea

Please sign in to comment.