Skip to content

Commit

Permalink
[SPARK-4006] In long running contexts, we encountered the situation o…
Browse files Browse the repository at this point in the history
…f double registe...

...r without a remove in between. The cause for that is unknown, and assumed a temp network issue.

However, since the second register is with a BlockManagerId on a different port, blockManagerInfo.contains() returns false, while blockManagerIdByExecutor returns Some. This inconsistency is caught in a conditional statement that does System.exit(1), which is a huge robustness issue for us.

The fix - simply remove the old id from both maps during register when this happens. We are mimicking the behavior of expireDeadHosts(), by doing local cleanup of the maps before trying to add new ones.

Also - added some logging for register and unregister.

This is just like #2854 except it's on master

Author: Tal Sliwowicz <tal.s@taboola.com>

Closes #2886 from tsliwowicz/master-block-mgr-removal and squashes the following commits:

094d508 [Tal Sliwowicz] some more white space change undone
41a2217 [Tal Sliwowicz] some more whitspaces change undone
7bcfc3d [Tal Sliwowicz] whitspaces fix
df9d98f [Tal Sliwowicz] Code review comments fixed
f48bce9 [Tal Sliwowicz] In long running contexts, we encountered the situation of double register without a remove in between. The cause for that is unknown, and assumed a temp network issue.

(cherry picked from commit 6b48522)

Conflicts:
	core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
  • Loading branch information
tsliwowicz committed Oct 23, 2014
1 parent 5e191fa commit d122236
Showing 1 changed file with 13 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
}
}
listenerBus.post(SparkListenerBlockManagerRemoved(blockManagerId))
logInfo(s"Removing block manager $blockManagerId")
}

private def expireDeadHosts() {
Expand Down Expand Up @@ -327,20 +328,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
if (!blockManagerInfo.contains(id)) {
blockManagerIdByExecutor.get(id.executorId) match {
case Some(manager) =>
// A block manager of the same executor already exists.
// This should never happen. Let's just quit.
logError("Got two different block manager registrations on " + id.executorId)
System.exit(1)
case Some(oldId) =>
// A block manager of the same executor already exists, so remove it (assumed dead)
logError("Got two different block manager registrations on same executor - "
+ s" will replace old one $oldId with new one $id")
removeExecutor(id.executorId)
case None =>
blockManagerIdByExecutor(id.executorId) = id
}

logInfo("Registering block manager %s with %s RAM".format(
id.hostPort, Utils.bytesToString(maxMemSize)))

blockManagerInfo(id) =
new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor)
logInfo("Registering block manager %s with %s RAM, %s".format(
id.hostPort, Utils.bytesToString(maxMemSize), id))

blockManagerIdByExecutor(id.executorId) = id

blockManagerInfo(id) = new BlockManagerInfo(
id, System.currentTimeMillis(), maxMemSize, slaveActor)
}
listenerBus.post(SparkListenerBlockManagerAdded(id, maxMemSize))
}
Expand Down

0 comments on commit d122236

Please sign in to comment.