Add unit tests + Fix bugs found through tests
This covers all JSON de/serialization logic, as well as the block manager's reporting of
blocks with updated storage statuses during put.
andrewor14 committed Mar 15, 2014
1 parent 45fd84c commit 650eb12
Showing 5 changed files with 647 additions and 44 deletions.
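As a hedged aside (not part of the commit itself): the behaviour under test is that put now returns the statuses of every block it updated. A minimal sketch of what a caller sees, assuming the BlockManagerSuite fixtures (actorSystem, master, serializer, conf, securityMgr) set up in the suite's before() block:

// Illustrative sketch only, in the spirit of BlockManagerSuite (same package, same fixtures);
// 1200 is the store's maximum memory in bytes.
val store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr)
val list = List.fill(2)(new Array[Byte](200))

// put now returns the status of every block it updated (here, just list1 itself).
val updatedBlocks: Seq[(BlockId, BlockStatus)] =
  store.put(TestBlockId("list1"), list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
updatedBlocks.foreach { case (id, status) => println(s"$id is now at ${status.storageLevel}") }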
33 changes: 22 additions & 11 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -268,7 +268,9 @@ private[spark] class BlockManager(
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
val storageLevel = StorageLevel(onDisk, inMem, level.deserialized, level.replication)
val deserialized = if (inMem) level.deserialized else false
val replication = if (inMem || onDisk) level.replication else 1
val storageLevel = StorageLevel(onDisk, inMem, deserialized, replication)
val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
(storageLevel, memSize, diskSize)
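To illustrate why the two new lines matter (an aside, not part of the diff): consider a block originally stored as MEMORY_ONLY_2 that has since been evicted from memory and was never written to disk. Carrying over the original level's deserialized and replication fields produces a level that never equals StorageLevel.NONE, so the eviction cannot be recognised downstream.

import org.apache.spark.storage.StorageLevel

val requested = StorageLevel.MEMORY_ONLY_2   // deserialized = true, replication = 2

// The block is now in neither store: inMem = false, onDisk = false.
val oldLevel = StorageLevel(false, false, requested.deserialized, requested.replication)
val newLevel = StorageLevel(false, false, false, 1)

assert(oldLevel != StorageLevel.NONE)  // old computation: the eviction is not visible
assert(newLevel == StorageLevel.NONE)  // new computation: the block reads as "not stored"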
@@ -498,8 +500,11 @@ private[spark] class BlockManager(
* This is currently used for writing shuffle files out. Callers should handle error
* cases.
*/
def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int)
: BlockObjectWriter = {
def getDiskWriter(
blockId: BlockId,
file: File,
serializer: Serializer,
bufferSize: Int) : BlockObjectWriter = {
val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream, syncWrites)
@@ -639,15 +644,17 @@ private[spark] class BlockManager(
}
}

// Now that the block is in either the memory or disk store, let other threads read it,
// and tell the master about it.
marked = true
putBlockInfo.markReady(size)
val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
if (tellMaster) {
reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
if (putBlockStatus.storageLevel != StorageLevel.NONE) {
// Now that the block is in either the memory or disk store, let other threads read it,
// and tell the master about it.
marked = true
putBlockInfo.markReady(size)
if (tellMaster) {
reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
}
updatedBlocks += ((blockId, putBlockStatus))
}
updatedBlocks += ((blockId, putBlockStatus))
} finally {
// If we failed in putting the block to memory/disk, notify other possible readers
// that it has failed, and then remove it from the block info map.
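A hedged note on the effect of the new guard: when a put fails to place the block in any store, its status stays at StorageLevel.NONE, so the block is not marked ready, nothing is reported to the master, and the returned sequence of updated statuses is empty. The sketch below mirrors the list5 case in the new test, reusing the suite-style 1200-byte store from the sketch above; the block id "big" is hypothetical.

// Roughly 1600 bytes of values into a 1200-byte MEMORY_ONLY store: the put cannot
// succeed, so no status is updated and nothing is reported to the master.
val bigList = List.fill(8)(new Array[Byte](200))
val updated = store.put(TestBlockId("big"), bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(updated.isEmpty)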
@@ -731,7 +738,11 @@ private[spark] class BlockManager(
/**
* Write a block consisting of a single object.
*/
def putSingle(blockId: BlockId, value: Any, level: StorageLevel, tellMaster: Boolean = true) {
def putSingle(
blockId: BlockId,
value: Any,
level: StorageLevel,
tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
put(blockId, Iterator(value), level, tellMaster)
}

core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -49,7 +49,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}

override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) : PutResult = {
override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
// Work on a duplicate - since the original input might be used elsewhere.
val bytes = _bytes.duplicate()
bytes.rewind()
27 changes: 2 additions & 25 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -269,18 +269,6 @@ private[spark] object JsonProtocol {
("Result" -> result) ~ json
}

def storageStatusToJson(storageStatus: StorageStatus): JValue = {
val blockManagerId = blockManagerIdToJson(storageStatus.blockManagerId)
val blocks = JArray(
storageStatus.blocks.toList.map { case (id, status) =>
("Block ID" -> blockIdToJson(id)) ~
("Status" -> blockStatusToJson(status))
})
("Block Manager ID" -> blockManagerId) ~
("Maximum Memory" -> storageStatus.maxMem) ~
("Blocks" -> blocks)
}

def rddInfoToJson(rddInfo: RDDInfo): JValue = {
val storageLevel = storageLevelToJson(rddInfo.storageLevel)
("RDD ID" -> rddInfo.id) ~
@@ -365,7 +353,7 @@ private[spark] object JsonProtocol {
}

def exceptionToJson(exception: Exception): JValue = {
("Message" -> exception.toString) ~
("Message" -> exception.getMessage) ~
("Stack Trace" -> stackTraceToJson(exception.getStackTrace))
}
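For reference (an aside, not part of the diff): Exception.toString prefixes the message with the exception class name, while getMessage returns only the message text, which is what a JSON round trip that rebuilds the exception from the "Message" field would be expected to preserve.

val e = new Exception("block not found")
println(e.toString)    // java.lang.Exception: block not found
println(e.getMessage)  // block not found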

@@ -516,6 +504,7 @@ private[spark] object JsonProtocol {
metrics.hostname = (json \ "Host Name").extract[String]
metrics.executorDeserializeTime = (json \ "Executor Deserialize Time").extract[Long]
metrics.executorRunTime = (json \ "Executor Run Time").extract[Long]
metrics.resultSize = (json \ "Result Size").extract[Long]
metrics.jvmGCTime = (json \ "JVM GC Time").extract[Long]
metrics.resultSerializationTime = (json \ "Result Serialization Time").extract[Long]
metrics.memoryBytesSpilled = (json \ "Memory Bytes Spilled").extract[Long]
@@ -605,18 +594,6 @@ private[spark] object JsonProtocol {
}
}

def storageStatusFromJson(json: JValue): StorageStatus = {
val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
val maxMem = (json \ "Maximum Memory").extract[Long]
val blocks = (json \ "Blocks").extract[List[JValue]].map { block =>
val id = blockIdFromJson(block \ "Block ID")
val status = blockStatusFromJson(block \ "Status")
(id, status)
}
val blockMap = mutable.Map[BlockId, BlockStatus](blocks: _*)
new StorageStatus(blockManagerId, maxMem, blockMap)
}

def rddInfoFromJson(json: JValue): RDDInfo = {
val rddId = (json \ "RDD ID").extract[Int]
val name = (json \ "Name").extract[String]
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -28,7 +28,7 @@ import org.scalatest.concurrent.Timeouts._
import org.scalatest.matchers.ShouldMatchers._
import org.scalatest.time.SpanSugar._

import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}

@@ -492,12 +492,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
// At this point LRU should not kick in because a3 is only on disk
assert(store.getSingle("a1").isDefined, "a2 was not in store")
assert(store.getSingle("a2").isDefined, "a3 was not in store")
assert(store.getSingle("a3").isDefined, "a1 was not in store")
assert(store.getSingle("a1").isDefined, "a2 was not in store")
assert(store.getSingle("a2").isDefined, "a3 was not in store")
assert(store.getSingle("a3").isDefined, "a1 was not in store")
assert(store.getSingle("a1").isDefined, "a1 was not in store")
assert(store.getSingle("a2").isDefined, "a2 was not in store")
assert(store.getSingle("a3").isDefined, "a3 was not in store")
// Now let's add in a4, which uses both disk and memory; a1 should drop out
store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
assert(store.getSingle("a1") == None, "a1 was in store")
@@ -663,6 +660,60 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester
}
}

test("updated block statuses") {
store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr)
val list = List.fill(2)(new Array[Byte](200))
val bigList = List.fill(8)(new Array[Byte](200))

// 1 updated block (i.e. list1)
val updatedBlocks1 =
store.put("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(updatedBlocks1.size === 1)
assert(updatedBlocks1.head._1 === TestBlockId("list1"))
assert(updatedBlocks1.head._2.storageLevel === StorageLevel.MEMORY_ONLY)

// 1 updated block (i.e. list2)
val updatedBlocks2 =
store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
assert(updatedBlocks2.size === 1)
assert(updatedBlocks2.head._1 === TestBlockId("list2"))
assert(updatedBlocks2.head._2.storageLevel === StorageLevel.MEMORY_ONLY)

// 2 updated blocks - list1 is kicked out of memory while list3 is added
val updatedBlocks3 =
store.put("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(updatedBlocks3.size === 2)
updatedBlocks3.foreach { case (id, status) =>
id match {
case TestBlockId("list1") => assert(status.storageLevel === StorageLevel.NONE)
case TestBlockId("list3") => assert(status.storageLevel === StorageLevel.MEMORY_ONLY)
case _ => fail("Updated block is neither list1 nor list3")
}
}
assert(store.get("list3").isDefined, "list3 was not in store")

// 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
val updatedBlocks4 =
store.put("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(updatedBlocks4.size === 2)
updatedBlocks4.foreach { case (id, status) =>
id match {
case TestBlockId("list2") => assert(status.storageLevel === StorageLevel.DISK_ONLY)
case TestBlockId("list4") => assert(status.storageLevel === StorageLevel.MEMORY_ONLY)
case _ => fail("Updated block is neither list2 nor list4")
}
}
assert(store.get("list4").isDefined, "list4 was not in store")

// No updated blocks - nothing is kicked out of memory because list5 is too big to be added
val updatedBlocks5 =
store.put("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(updatedBlocks5.size === 0)
assert(store.get("list2").isDefined, "list2 was not in store")
assert(store.get("list4").isDefined, "list4 was not in store")
assert(!store.get("list5").isDefined, "list5 was in store")
}

test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr)
store.putSingle(rdd(0, 0), new Array[Byte](400), StorageLevel.MEMORY_ONLY)