Added a test and feedback on Matei's review
pwendell committed Apr 27, 2014
1 parent a637a18 commit 4943351
Showing 4 changed files with 69 additions and 24 deletions.
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -41,9 +41,10 @@ private[spark] class BlockManager(
val master: BlockManagerMaster,
val defaultSerializer: Serializer,
maxMemory: Long,
-    val conf: SparkConf)
+    _conf: SparkConf)
extends Logging {

+  def conf = _conf
val shuffleBlockManager = new ShuffleBlockManager(this)
val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
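For readers skimming the diff, here is a minimal standalone sketch (not Spark code; all names are illustrative) of the pattern applied above: the constructor argument is taken under a private name and re-exposed through a def, so the public accessor is an ordinary method that a subclass or a Mockito stub can supply, as the new test below does with when(blockManager.conf).thenReturn(...).

  // Illustrative only; `Settings` and `Service` are stand-ins, not Spark classes.
  class Settings(val entries: Map[String, String])

  class Service(_settings: Settings) {
    // Expose the constructor argument through a method instead of a `val`,
    // so the accessor can be overridden (or stubbed by a mocking framework).
    def settings: Settings = _settings
  }

  // A hand-rolled test double simply overrides the accessor.
  class StubService extends Service(null) {
    override def settings: Settings = new Settings(Map("spark.test.key" -> "value"))
  }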
26 changes: 12 additions & 14 deletions core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -87,21 +87,19 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
val segment = diskManager.getBlockLocation(blockId)
val channel = new RandomAccessFile(segment.file, "r").getChannel()

-    val buffer =
-      try {
-        // For small files, directly read rather than memory map
-        if (segment.length < minMemoryMapBytes) {
-          val buf = ByteBuffer.allocate(segment.length.toInt)
-          channel.read(buf, segment.offset)
-          buf.rewind()
-          Some(buf)
-        } else {
-          Some(channel.map(MapMode.READ_ONLY, segment.offset, segment.length))
-        }
-      } finally {
-        channel.close()
-      }
-    buffer
+    try {
+      // For small files, directly read rather than memory map
+      if (segment.length < minMemoryMapBytes) {
+        val buf = ByteBuffer.allocate(segment.length.toInt)
+        channel.read(buf, segment.offset)
+        buf.flip()
+        Some(buf)
+      } else {
+        Some(channel.map(MapMode.READ_ONLY, segment.offset, segment.length))
+      }
+    } finally {
+      channel.close()
+    }
  }

override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
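A note on the rewind() -> flip() change above, with a small standalone ByteBuffer sketch (not Spark code): after a channel read has written into a buffer, flip() caps the limit at the number of bytes actually read, while rewind() leaves the limit at the full capacity. In getBytes the buffer is allocated to exactly the segment length, so the two usually coincide; flip() is simply the defensive choice if the read ever comes up short.

  import java.nio.ByteBuffer

  // Standalone illustration of flip() vs rewind() after a partial write.
  object FlipVsRewind extends App {
    val buf = ByteBuffer.allocate(16)
    buf.put(Array[Byte](1, 2, 3))       // position = 3, limit = 16 (capacity)

    val flipped = buf.duplicate()
    flipped.flip()                      // position = 0, limit = 3
    assert(flipped.remaining() == 3)    // readers see only the 3 bytes written

    val rewound = buf.duplicate()
    rewound.rewind()                    // position = 0, limit still 16
    assert(rewound.remaining() == 16)   // readers would also see 13 stale bytes
  }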
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -17,21 +17,20 @@

package org.apache.spark.storage

-import java.nio.ByteBuffer
+import java.nio.{ByteBuffer, MappedByteBuffer}
import java.util.Arrays

import akka.actor._
import org.scalatest.BeforeAndAfter
import org.scalatest.FunSuite
import org.scalatest.PrivateMethodTester
import org.apache.spark.SparkConf
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}
import org.mockito.Mockito.{mock, when}
import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.Timeouts._
import org.scalatest.matchers.ShouldMatchers._
import org.scalatest.time.SpanSugar._

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}

class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
private val conf = new SparkConf(false)
var store: BlockManager = null
@@ -657,4 +656,51 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
assert(store.getSingle("a1") == None, "a1 should not be in store")
}
}

test("reads of memory-mapped and non memory-mapped files are equivalent") {
val confKey = "spark.storage.memoryMapThreshold"

// Create a non-trivial (not all zeros) byte array
var counter = 0.toByte
def incr = {counter = (counter + 1).toByte; counter;}
val bytes = Array.fill[Byte](1000)(incr)
val byteBuffer = ByteBuffer.wrap(bytes)

val blockId = BlockId("rdd_1_2")

// This sequence of mocks makes these tests fairly brittle. It would
// be nice to refactor classes involved in disk storage in a way that
// allows for easier testing.
val blockManager = mock(classOf[BlockManager])
val shuffleBlockManager = mock(classOf[ShuffleBlockManager])
when(shuffleBlockManager.conf).thenReturn(conf)
val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
System.getProperty("java.io.tmpdir"))

when(blockManager.conf).thenReturn(conf.clone.set(confKey, 0.toString))
val diskStoreMapped = new DiskStore(blockManager, diskBlockManager)
diskStoreMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY)
val mapped = diskStoreMapped.getBytes(blockId).get

when(blockManager.conf).thenReturn(conf.clone.set(confKey, (1000 * 1000).toString))
val diskStoreNotMapped = new DiskStore(blockManager, diskBlockManager)
diskStoreNotMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY)
val notMapped = diskStoreNotMapped.getBytes(blockId).get

// Not possible to do isInstanceOf due to visibility of HeapByteBuffer
assert(notMapped.getClass.getName.endsWith("HeapByteBuffer"),
"Expected HeapByteBuffer for un-mapped read")
assert(mapped.isInstanceOf[MappedByteBuffer], "Expected MappedByteBuffer for mapped read")

def arrayFromByteBuffer(in: ByteBuffer): Array[Byte] = {
val array = new Array[Byte](in.remaining())
in.get(array)
array
}

val mappedAsArray = arrayFromByteBuffer(mapped)
val notMappedAsArray = arrayFromByteBuffer(notMapped)
assert(Arrays.equals(mappedAsArray, bytes))
assert(Arrays.equals(notMappedAsArray, bytes))
}
}
2 changes: 1 addition & 1 deletion docs/configuration.md
@@ -124,7 +124,7 @@ Apart from these, the following properties are also available, and may be useful
</tr>
<tr>
<td>spark.storage.memoryMapThreshold</td>
-  <td>2 * 4096</td>
+  <td>8192</td>
<td>
Size of a block, in bytes, above which Spark memory maps when reading a block from disk.
This prevents Spark from memory mapping very small blocks. In general, memory
mapping has high overhead for blocks close to or below the page size of the operating system.
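As a usage sketch for the setting documented above (the SparkConf API shown is standard; the 1 MB value is purely illustrative), an application can raise or lower the threshold itself:

  import org.apache.spark.SparkConf

  object MmapThresholdExample {
    // Illustrative only: blocks smaller than 1 MB are then read with a normal
    // read() into a heap buffer; blocks at or above the threshold are memory-mapped.
    val conf = new SparkConf()
      .setAppName("mmap-threshold-demo")
      .set("spark.storage.memoryMapThreshold", (1024 * 1024).toString)
  }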
