Skip to content

Commit

Permalink
In long running contexts, we encountered the situation of double regi…
Browse files Browse the repository at this point in the history
…ster without a remove in between. The cause of that is unknown; we assume it was a temporary network issue.

However, since the second register arrives with a BlockManagerId on a different port, blockManagerInfo.contains() returns false, while blockManagerIdByExecutor.get() returns Some. This inconsistency is caught by a conditional statement that calls System.exit(1), which is a huge robustness issue for us.

The fix - simply remove the old id from both maps during register when this happens. We are mimicking the behavior of expireDeadHosts(), by doing local cleanup of the maps before trying to add new ones.

Also - added some logging for register and unregister.
  • Loading branch information
tsliwowicz committed Oct 12, 2014
1 parent 4322c0b commit efd93f2
Showing 1 changed file with 21 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,17 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act

private def removeBlockManager(blockManagerId: BlockManagerId) {
val info = blockManagerInfo(blockManagerId)

// Remove the block manager from blockManagerIdByExecutor.
blockManagerIdByExecutor -= blockManagerId.executorId

logInfo("removed executorId %s from blockManagerIdByExecutor".format(blockManagerId.executorId))

// Remove it from blockManagerInfo and remove all the blocks.
blockManagerInfo.remove(blockManagerId)

logInfo("removed blockManagerId %s from blockManagerInfo".format(blockManagerId))

val iterator = info.blocks.keySet.iterator
while (iterator.hasNext) {
val blockId = iterator.next
Expand All @@ -160,6 +165,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
blockLocations.remove(locations)
}
}

logInfo("done with remove "+blockManagerId)
}

private def expireDeadHosts() {
Expand All @@ -180,6 +187,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
/**
 * Removes the block manager registered for the given executor, if there is one.
 *
 * Looks the executor up in `blockManagerIdByExecutor` and, when an entry exists,
 * hands the associated id to `removeBlockManager`. Both log lines are emitted
 * whether or not an entry was found.
 *
 * @param execId id of the executor whose block manager should be removed
 */
private def removeExecutor(execId: String) {
  logInfo(s"Trying to remove executor $execId from BlockManagerMaster.")
  blockManagerIdByExecutor.get(execId) match {
    case Some(registeredId) => removeBlockManager(registeredId)
    case None => // no block manager is registered for this executor
  }
  logInfo(s"removed executor $execId from BlockManagerMaster.")
}

private def heartBeat(blockManagerId: BlockManagerId): Boolean = {
Expand Down Expand Up @@ -223,18 +231,25 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
}

/**
 * Registers a block manager in `blockManagerIdByExecutor` and `blockManagerInfo`.
 *
 * Nothing beyond the initial log line happens when `id` is already present in
 * `blockManagerInfo`. When the executor is already mapped to a different
 * block manager id (a second registration without a remove in between), the
 * stale registration is removed via `removeExecutor` and the new one is recorded,
 * instead of terminating the process.
 *
 * @param id         id of the block manager being registered
 * @param maxMemSize memory size reported for this block manager, logged via
 *                   `Utils.bytesToString` and stored in its `BlockManagerInfo`
 * @param slaveActor actor reference stored in the new `BlockManagerInfo`
 */
private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
  logInfo("Registering block manager %s with %s RAM, %s".format(
    id.hostPort, Utils.bytesToString(maxMemSize), id))

  if (!blockManagerInfo.contains(id)) {
    blockManagerIdByExecutor.get(id.executorId) match {
      case Some(manager) =>
        // A block manager of the same executor already exists under another id.
        // Drop the old registration before the new one is added below, rather
        // than exiting the process.
        logError("Got two different block manager registrations on same executor - " +
          "will remove, new Id " + id + ", orig id - " + manager)
        removeExecutor(id.executorId)
      case None =>
        logInfo("about to register new id " + id)
    }

    // Record the new registration exactly once in each map.
    blockManagerIdByExecutor(id.executorId) = id
    logInfo("Added %s to blockManagerIdByExecutor".format(id.executorId))

    val info = new BlockManagerMasterActor.BlockManagerInfo(
      id, System.currentTimeMillis(), maxMemSize, slaveActor)
    blockManagerInfo(id) = info
    logInfo("Added %s, %s to blockManagerInfo".format(id, info))
  }
}

Expand Down

0 comments on commit efd93f2

Please sign in to comment.