From e57e270906da88599cd240bff3d0fce6334bd954 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?= Date: Wed, 3 Dec 2014 15:19:50 +0800 Subject: [PATCH] add check info is already remove or not while having gotten info.syn --- .../apache/spark/storage/BlockManager.scala | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 308c59eda594d..1b7a1a326dc47 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1010,9 +1010,9 @@ private[spark] class BlockManager( info.synchronized { // required ? As of now, this will be invoked only for blocks which are ready // But in case this changes in future, adding for consistency sake. - if (!info.waitForReady()) { + if (blockInfo.get(blockId).isEmpty || !info.waitForReady()) { // If we get here, the block write failed. - logWarning(s"Block $blockId was marked as failure. Nothing to drop") + logWarning(s"Block $blockId was marked as failure or already dropped. Nothing to drop") return None } @@ -1089,15 +1089,17 @@ private[spark] class BlockManager( val info = blockInfo.get(blockId).orNull if (info != null) { info.synchronized { - // Removals are idempotent in disk store and memory store. At worst, we get a warning. - val removedFromMemory = memoryStore.remove(blockId) - val removedFromDisk = diskStore.remove(blockId) - val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false - if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) { - logWarning(s"Block $blockId could not be removed as it was not found in either " + - "the disk, memory, or tachyon store") + if (blockInfo.get(blockId).isEmpty) { + // Removals are idempotent in disk store and memory store. At worst, we get a warning. + val removedFromMemory = memoryStore.remove(blockId) + val removedFromDisk = diskStore.remove(blockId) + val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false + if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) { + logWarning(s"Block $blockId could not be removed as it was not found in either " + + "the disk, memory, or tachyon store") + } + blockInfo.remove(blockId) } - blockInfo.remove(blockId) if (tellMaster && info.tellMaster) { val status = getCurrentBlockStatus(blockId, info) reportBlockStatus(blockId, info, status) @@ -1126,12 +1128,14 @@ private[spark] class BlockManager( val (id, info, time) = (entry.getKey, entry.getValue.value, entry.getValue.timestamp) if (time < cleanupTime && shouldDrop(id)) { info.synchronized { - val level = info.level - if (level.useMemory) { memoryStore.remove(id) } - if (level.useDisk) { diskStore.remove(id) } - if (level.useOffHeap) { tachyonStore.remove(id) } - iterator.remove() - logInfo(s"Dropped block $id") + if (blockInfo.get(id).isEmpty) { + val level = info.level + if (level.useMemory) { memoryStore.remove(id) } + if (level.useDisk) { diskStore.remove(id) } + if (level.useOffHeap) { tachyonStore.remove(id) } + iterator.remove() + logInfo(s"Dropped block $id") + } } val status = getCurrentBlockStatus(id, info) reportBlockStatus(id, info, status)