Skip to content

Commit

Permalink
Merge pull request apache#494 from tyro89/worker_registration_issue
Browse files Browse the repository at this point in the history
Issue with failed worker registrations

I've been going through the spark source after having some odd issues with workers dying and not coming back. After some digging (I'm very new to scala and spark) I believe I've found a worker registration issue. It looks to me like a failed registration follows the same code path as a successful registration which end up with workers believing they are connected (since they received a `RegisteredWorker` event) even tho they are not registered on the Master.

This is a quick fix that I hope addresses this issue (assuming I didn't completely miss-read the code and I'm about to look like a silly person :P)

I'm opening this pr now to start a chat with you guys while I do some more testing on my side :)

Author: Erik Selin <erik.selin@jadedpixel.com>

== Merge branch commits ==

commit 973012f8a2dcf1ac1e68a69a2086a1b9a50f401b
Author: Erik Selin <erik.selin@jadedpixel.com>
Date:   Tue Jan 28 23:36:12 2014 -0500

    break logwarning into two lines to respect line character limit.

commit e3754dc5b94730f37e9806974340e6dd93400f85
Author: Erik Selin <erik.selin@jadedpixel.com>
Date:   Tue Jan 28 21:16:21 2014 -0500

    add log warning when worker registration fails due to attempt to re-register on same address.

commit 14baca241fa7823e1213cfc12a3ff2a9b865b1ed
Author: Erik Selin <erik.selin@jadedpixel.com>
Date:   Wed Jan 22 21:23:26 2014 -0500

    address code style comment

commit 71c0d7e6f59cd378d4e24994c21140ab893954ee
Author: Erik Selin <erik.selin@jadedpixel.com>
Date:   Wed Jan 22 16:01:42 2014 -0500

    Make a failed registration not persist, not send a `RegisteredWordker` event and not run `schedule` but rather send a `RegisterWorkerFailed` message to the worker attempting to register.
  • Loading branch information
Erik Selin authored and pwendell committed Jan 29, 2014
1 parent 7930209 commit 0ff38c2
Showing 1 changed file with 13 additions and 6 deletions.
19 changes: 13 additions & 6 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,16 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
sender, workerWebUiPort, publicAddress)
registerWorker(worker)
persistenceEngine.addWorker(worker)
sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
schedule()
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
schedule()
} else {
val workerAddress = worker.actor.path.address
logWarning("Worker registration failed. Attempted to re-register worker at same address: " +
workerAddress)
sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: " + workerAddress)
}
}
}

Expand Down Expand Up @@ -511,7 +517,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}

def registerWorker(worker: WorkerInfo): Unit = {
def registerWorker(worker: WorkerInfo): Boolean = {
// There may be one or more refs to dead workers on this same node (w/ different ID's),
// remove them.
workers.filter { w =>
Expand All @@ -523,13 +529,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val workerAddress = worker.actor.path.address
if (addressToWorker.contains(workerAddress)) {
logInfo("Attempted to re-register worker at same address: " + workerAddress)
return
return false
}

workers += worker
idToWorker(worker.id) = worker
actorToWorker(worker.actor) = worker
addressToWorker(workerAddress) = worker
true
}

def removeWorker(worker: WorkerInfo) {
Expand Down

0 comments on commit 0ff38c2

Please sign in to comment.