From f48bce9cc25fa2672ea36bd90e64854159de8ead Mon Sep 17 00:00:00 2001 From: Tal Sliwowicz Date: Tue, 21 Oct 2014 17:29:39 +0300 Subject: [PATCH 1/5] In long-running contexts, we encountered the situation of double register without a remove in between. The cause is unknown, and is assumed to be a temporary network issue. However, since the second register is with a BlockManagerId on a different port, blockManagerInfo.contains() returns false, while blockManagerIdByExecutor returns Some. This inconsistency is caught in a conditional statement that does System.exit(1), which is a huge robustness issue for us. The fix - simply remove the old id from both maps during register when this happens. We are mimicking the behavior of expireDeadHosts(), by doing local cleanup of the maps before trying to add new ones. Also - added some logging for register and unregister. --- .../storage/BlockManagerMasterActor.scala | 33 ++++++++++--------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 088f06e389d83..8b70f3a406489 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -18,21 +18,19 @@ package org.apache.spark.storage import java.util.{HashMap => JHashMap} - import scala.collection.mutable import scala.collection.JavaConversions._ import scala.concurrent.Future import scala.concurrent.duration._ - import akka.actor.{Actor, ActorRef, Cancellable} import akka.pattern.ask - import org.apache.spark.{Logging, SparkConf, SparkException} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.scheduler._ import org.apache.spark.storage.BlockManagerMessages._ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils} + /** * BlockManagerMasterActor is an actor on the master node to 
track statuses of * all slaves' block managers. @@ -187,12 +185,13 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus private def removeBlockManager(blockManagerId: BlockManagerId) { val info = blockManagerInfo(blockManagerId) - + // Remove the block manager from blockManagerIdByExecutor. blockManagerIdByExecutor -= blockManagerId.executorId - + // Remove it from blockManagerInfo and remove all the blocks. blockManagerInfo.remove(blockManagerId) + val iterator = info.blocks.keySet.iterator while (iterator.hasNext) { val blockId = iterator.next @@ -203,6 +202,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus } } listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId)) + logInfo("removed " + blockManagerId) } private def expireDeadHosts() { @@ -325,22 +325,23 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) { val time = System.currentTimeMillis() + if (!blockManagerInfo.contains(id)) { blockManagerIdByExecutor.get(id.executorId) match { case Some(manager) => - // A block manager of the same executor already exists. - // This should never happen. Let's just quit. - logError("Got two different block manager registrations on " + id.executorId) - System.exit(1) + // A block manager of the same executor already exists so remove it (assumed dead). 
+ logError("Got two different block manager registrations on same executor - " + + " will remove, new Id " + id + ", orig id - " + manager) + removeExecutor(id.executorId) case None => - blockManagerIdByExecutor(id.executorId) = id } - - logInfo("Registering block manager %s with %s RAM".format( - id.hostPort, Utils.bytesToString(maxMemSize))) - - blockManagerInfo(id) = - new BlockManagerInfo(id, time, maxMemSize, slaveActor) + logInfo("Registering block manager %s with %s RAM, %s".format( + id.hostPort, Utils.bytesToString(maxMemSize), id)) + + blockManagerIdByExecutor(id.executorId) = id + + blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(), + maxMemSize, slaveActor) } listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize)) } From df9d98fe6703f6cc37fb0187fa55d140f37bb50e Mon Sep 17 00:00:00 2001 From: Tal Sliwowicz Date: Thu, 23 Oct 2014 06:36:06 -0400 Subject: [PATCH 2/5] Code review comments fixed --- .../spark/storage/BlockManagerMasterActor.scala | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 8b70f3a406489..2880a249ad7fa 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -190,8 +190,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus blockManagerIdByExecutor -= blockManagerId.executorId // Remove it from blockManagerInfo and remove all the blocks. 
- blockManagerInfo.remove(blockManagerId) - + blockManagerInfo.remove(blockManagerId) val iterator = info.blocks.keySet.iterator while (iterator.hasNext) { val blockId = iterator.next @@ -202,7 +201,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus } } listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId)) - logInfo("removed " + blockManagerId) + logInfo(s"Removing block manager $blockManagerId") } private def expireDeadHosts() { @@ -328,10 +327,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus if (!blockManagerInfo.contains(id)) { blockManagerIdByExecutor.get(id.executorId) match { - case Some(manager) => - // A block manager of the same executor already exists so remove it (assumed dead). + case Some(oldId) => + // already exists so remove it (assumed dead). logError("Got two different block manager registrations on same executor - " - + " will remove, new Id " + id + ", orig id - " + manager) + + " will remove, new Id %s, orig id - %s".format(id, oldId)) removeExecutor(id.executorId) case None => } @@ -340,8 +339,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus blockManagerIdByExecutor(id.executorId) = id - blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(), - maxMemSize, slaveActor) + blockManagerInfo(id) = new BlockManagerInfo( + id, System.currentTimeMillis(), maxMemSize, slaveActor) } listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize)) } From 7bcfc3dabaaedcfdc2f30f19e938543792f98e02 Mon Sep 17 00:00:00 2001 From: Tal Sliwowicz Date: Thu, 23 Oct 2014 06:39:37 -0400 Subject: [PATCH 3/5] whitespace fix --- .../org/apache/spark/storage/BlockManagerMasterActor.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 2880a249ad7fa..b6f17d11cde04 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -18,19 +18,21 @@ package org.apache.spark.storage import java.util.{HashMap => JHashMap} + import scala.collection.mutable import scala.collection.JavaConversions._ import scala.concurrent.Future import scala.concurrent.duration._ + import akka.actor.{Actor, ActorRef, Cancellable} import akka.pattern.ask + import org.apache.spark.{Logging, SparkConf, SparkException} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.scheduler._ import org.apache.spark.storage.BlockManagerMessages._ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils} - /** * BlockManagerMasterActor is an actor on the master node to track statuses of * all slaves' block managers. From 41a22176788bee43b4ee527c1fc295f81f07d343 Mon Sep 17 00:00:00 2001 From: Tal Sliwowicz Date: Thu, 23 Oct 2014 06:41:58 -0400 Subject: [PATCH 4/5] some more whitespace changes undone --- .../org/apache/spark/storage/BlockManagerMasterActor.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index b6f17d11cde04..944071c45f99a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -192,7 +192,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus blockManagerIdByExecutor -= blockManagerId.executorId // Remove it from blockManagerInfo and remove all the blocks. 
- blockManagerInfo.remove(blockManagerId) + blockManagerInfo.remove(blockManagerId) val iterator = info.blocks.keySet.iterator while (iterator.hasNext) { val blockId = iterator.next @@ -326,7 +326,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) { val time = System.currentTimeMillis() - if (!blockManagerInfo.contains(id)) { blockManagerIdByExecutor.get(id.executorId) match { case Some(oldId) => From 094d508fed9aa57beb60d7a571cbe7c1e3b334c1 Mon Sep 17 00:00:00 2001 From: Tal Sliwowicz Date: Thu, 23 Oct 2014 06:43:11 -0400 Subject: [PATCH 5/5] some more white space change undone --- .../org/apache/spark/storage/BlockManagerMasterActor.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 944071c45f99a..8a8316cbb84e5 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -187,10 +187,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus private def removeBlockManager(blockManagerId: BlockManagerId) { val info = blockManagerInfo(blockManagerId) - + // Remove the block manager from blockManagerIdByExecutor. blockManagerIdByExecutor -= blockManagerId.executorId - + // Remove it from blockManagerInfo and remove all the blocks. blockManagerInfo.remove(blockManagerId) val iterator = info.blocks.keySet.iterator