diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 2c1a4e2f5d3a1..5cb9a50a3d25c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -145,12 +145,17 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act private def removeBlockManager(blockManagerId: BlockManagerId) { val info = blockManagerInfo(blockManagerId) - + // Remove the block manager from blockManagerIdByExecutor. blockManagerIdByExecutor -= blockManagerId.executorId + + logInfo("removed executorId %s from blockManagerIdByExecutor".format(blockManagerId.executorId)) // Remove it from blockManagerInfo and remove all the blocks. blockManagerInfo.remove(blockManagerId) + + logInfo("removed blockManagerId %s from blockManagerInfo".format(blockManagerId)) + val iterator = info.blocks.keySet.iterator while (iterator.hasNext) { val blockId = iterator.next @@ -160,6 +165,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act blockLocations.remove(locations) } } + + logInfo("done with remove "+blockManagerId) } private def expireDeadHosts() { @@ -180,6 +187,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act private def removeExecutor(execId: String) { logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.") blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) + logInfo("removed executor " + execId + " from BlockManagerMaster.") } private def heartBeat(blockManagerId: BlockManagerId): Boolean = { @@ -223,18 +231,25 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act } private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) { + logInfo("Registering block manager %s with %s RAM, %s".format(id.hostPort, Utils.bytesToString(maxMemSize), id)) + if (!blockManagerInfo.contains(id)) { blockManagerIdByExecutor.get(id.executorId) match { case Some(manager) => // A block manager of the same executor already exists. // This should never happen. Let's just quit. - logError("Got two different block manager registrations on " + id.executorId) - System.exit(1) + logError("Got two different block manager registrations on same executor - will remove, new Id " + id+", orig id - "+manager) + removeExecutor(id.executorId) case None => - blockManagerIdByExecutor(id.executorId) = id + logInfo("about to register new id "+id) } - blockManagerInfo(id) = new BlockManagerMasterActor.BlockManagerInfo( - id, System.currentTimeMillis(), maxMemSize, slaveActor) + + blockManagerIdByExecutor(id.executorId) = id + logInfo("Added %s to blockManagerIdByExecutor".format(id.executorId)) + + val info = new BlockManagerMasterActor.BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor) + blockManagerInfo(id) = info + logInfo("Added %s, %s to blockManagerInfo".format(id, info)) } }