Skip to content

Commit

Permalink
Merge pull request alteryx#7 from wannabeast/memorystore-fixes
Browse files Browse the repository at this point in the history
some minor fixes to MemoryStore

This is a repeat of alteryx#5, moved to its own branch in my repo.

This makes all updates to   on ; it skips on synchronizing the reads where it can get away with it.
  • Loading branch information
rxin committed Sep 26, 2013
2 parents 6566a19 + 9524b94 commit 560ee5c
Showing 1 changed file with 8 additions and 6 deletions.
14 changes: 8 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ import org.apache.spark.util.{SizeEstimator, Utils}
private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
extends BlockStore(blockManager) {

case class Entry(value: Any, size: Long, deserialized: Boolean, var dropPending: Boolean = false)
case class Entry(value: Any, size: Long, deserialized: Boolean)

private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
private var currentMemory = 0L
@volatile private var currentMemory = 0L
// Object used to ensure that only one thread is putting blocks and if necessary, dropping
// blocks from the memory store.
private val putLock = new Object()
Expand Down Expand Up @@ -110,9 +110,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)

override def remove(blockId: String): Boolean = {
entries.synchronized {
val entry = entries.get(blockId)
val entry = entries.remove(blockId)
if (entry != null) {
entries.remove(blockId)
currentMemory -= entry.size
logInfo("Block %s of size %d dropped from memory (free %d)".format(
blockId, entry.size, freeMemory))
Expand All @@ -126,6 +125,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
override def clear() {
entries.synchronized {
entries.clear()
currentMemory = 0
}
logInfo("MemoryStore cleared")
}
Expand Down Expand Up @@ -160,8 +160,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
putLock.synchronized {
if (ensureFreeSpace(blockId, size)) {
val entry = new Entry(value, size, deserialized)
entries.synchronized { entries.put(blockId, entry) }
currentMemory += size
entries.synchronized {
entries.put(blockId, entry)
currentMemory += size
}
if (deserialized) {
logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
Expand Down

0 comments on commit 560ee5c

Please sign in to comment.