Skip to content

Commit

Permalink
refine code
Browse files Browse the repository at this point in the history
  • Loading branch information
suyanNone committed Dec 8, 2014
1 parent e57e270 commit 55fa4ba
Showing 1 changed file with 19 additions and 20 deletions.
39 changes: 19 additions & 20 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1010,9 +1010,12 @@ private[spark] class BlockManager(
info.synchronized {
// required ? As of now, this will be invoked only for blocks which are ready
// But in case this changes in future, adding for consistency sake.
if (blockInfo.get(blockId).isEmpty || !info.waitForReady()) {
if (blockInfo.get(blockId).isEmpty) {
logWarning(s"Block $blockId was already dropped.")
return None
} else if(!info.waitForReady()) {
// If we get here, the block write failed.
logWarning(s"Block $blockId was marked as failure or already dropped. Nothing to drop")
logWarning(s"Block $blockId was marked as failure. Nothing to drop")
return None
}

Expand Down Expand Up @@ -1089,17 +1092,15 @@ private[spark] class BlockManager(
val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
if (blockInfo.get(blockId).isEmpty) {
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
logWarning(s"Block $blockId could not be removed as it was not found in either " +
"the disk, memory, or tachyon store")
}
blockInfo.remove(blockId)
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
logWarning(s"Block $blockId could not be removed as it was not found in either " +
"the disk, memory, or tachyon store")
}
blockInfo.remove(blockId)
if (tellMaster && info.tellMaster) {
val status = getCurrentBlockStatus(blockId, info)
reportBlockStatus(blockId, info, status)
Expand Down Expand Up @@ -1128,14 +1129,12 @@ private[spark] class BlockManager(
val (id, info, time) = (entry.getKey, entry.getValue.value, entry.getValue.timestamp)
if (time < cleanupTime && shouldDrop(id)) {
info.synchronized {
if (blockInfo.get(id).isEmpty) {
val level = info.level
if (level.useMemory) { memoryStore.remove(id) }
if (level.useDisk) { diskStore.remove(id) }
if (level.useOffHeap) { tachyonStore.remove(id) }
iterator.remove()
logInfo(s"Dropped block $id")
}
val level = info.level
if (level.useMemory) { memoryStore.remove(id) }
if (level.useDisk) { diskStore.remove(id) }
if (level.useOffHeap) { tachyonStore.remove(id) }
iterator.remove()
logInfo(s"Dropped block $id")
}
val status = getCurrentBlockStatus(id, info)
reportBlockStatus(id, info, status)
Expand Down

0 comments on commit 55fa4ba

Please sign in to comment.