Commit b76b95f: Review feedback
pwendell committed Apr 23, 2014
1 parent 4e1514e
Showing 3 changed files with 15 additions and 4 deletions.
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala

@@ -149,8 +149,8 @@ object BlockFetcherIterator {
     // Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them
     // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
     // nodes, rather than blocking on reading output from one node.
-    val maxRequestSize = math.max(maxBytesInFlight / 5, 1L)
-    logInfo("maxBytesInFlight: " + maxBytesInFlight + ", maxRequestSize: " + maxRequestSize)
+    val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
+    logInfo("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize)
 
     // Split local and remote blocks. Remote blocks are further split into FetchRequests of size
     // at most maxBytesInFlight in order to limit the amount of data in flight.
@@ -177,7 +177,7 @@ object BlockFetcherIterator {
       } else if (size < 0) {
         throw new BlockException(blockId, "Negative block size " + size)
       }
-      if (curRequestSize >= targetRequestSize) {
+      if (curRequestSize >= targetRequestSize) {
         // Add this FetchRequest
         remoteRequests += new FetchRequest(address, curBlocks)
         curRequestSize = 0
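
The rename reflects what the code actually does: a request may overshoot the value by one block, so it is a target rather than a hard cap. A minimal sketch of the grouping logic this hunk touches, not the actual Spark implementation; the (String, Long) block pairs and the simplified FetchRequest here are stand-ins for the real types:

// Sketch only: group (blockId, size) pairs into requests of roughly
// targetRequestSize bytes each, so that about five requests can be in
// flight at once without exceeding maxBytesInFlight in total.
case class FetchRequest(blocks: Seq[(String, Long)])

def splitIntoRequests(blocks: Seq[(String, Long)], maxBytesInFlight: Long): Seq[FetchRequest] = {
  val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
  val requests = scala.collection.mutable.ArrayBuffer.empty[FetchRequest]
  var curBlocks = scala.collection.mutable.ArrayBuffer.empty[(String, Long)]
  var curRequestSize = 0L
  for ((id, size) <- blocks) {
    curBlocks += ((id, size))
    curRequestSize += size
    if (curRequestSize >= targetRequestSize) {
      // Close out the current request once it reaches the target size;
      // the last block added may push it past the target, which is fine.
      requests += FetchRequest(curBlocks.toSeq)
      curBlocks = scala.collection.mutable.ArrayBuffer.empty[(String, Long)]
      curRequestSize = 0L
    }
  }
  if (curBlocks.nonEmpty) requests += FetchRequest(curBlocks.toSeq)
  requests.toSeq
}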
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -33,6 +33,8 @@ import org.apache.spark.util.Utils
 private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
   extends BlockStore(blockManager) with Logging {
 
+  val minMemoryMapBytes = blockManager.conf.getLong("spark.storage.memoryMapThreshold", 2 * 4096L)
+
   override def getSize(blockId: BlockId): Long = {
     diskManager.getBlockLocation(blockId).length
   }
@@ -87,7 +89,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
 
     val buffer =
       // For small files, directly read rather than memory map
-      if (segment.length < 2 * 4096) {
+      if (segment.length < minMemoryMapBytes) {
         val buf = ByteBuffer.allocate(segment.length.toInt)
         try {
           channel.read(buf, segment.offset)
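
For context on what the new minMemoryMapBytes guards, here is a hedged sketch of the two read paths using plain java.nio; readSegment and its path/offset/length parameters are hypothetical illustrations, not the real DiskStore API:

import java.io.RandomAccessFile
import java.nio.ByteBuffer
import java.nio.channels.FileChannel

// Sketch only: read `length` bytes at `offset` from the file at `path`.
// Segments below the threshold (default 2 * 4096, i.e. two OS pages) are
// read into a heap buffer, since memory mapping such small segments costs
// more than it saves; larger segments are memory mapped.
def readSegment(path: String, offset: Long, length: Long,
                minMemoryMapBytes: Long = 2 * 4096L): ByteBuffer = {
  val channel = new RandomAccessFile(path, "r").getChannel
  try {
    if (length < minMemoryMapBytes) {
      // Direct read; a robust version would loop until the buffer is full.
      val buf = ByteBuffer.allocate(length.toInt)
      channel.read(buf, offset)
      buf.flip()
      buf
    } else {
      // The mapping remains valid after the channel is closed.
      channel.map(FileChannel.MapMode.READ_ONLY, offset, length)
    }
  } finally {
    channel.close()
  }
}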
9 changes: 9 additions & 0 deletions docs/configuration.md
@@ -122,6 +122,15 @@ Apart from these, the following properties are also available, and may be useful
     <code>spark.storage.memoryFraction</code>.
   </td>
 </tr>
+<tr>
+  <td>spark.storage.memoryMapThreshold</td>
+  <td>2 * 4096</td>
+  <td>
+    Size of a block, in bytes, above which Spark memory maps when reading a block from disk.
+    This prevents Spark from memory mapping very small blocks. In general, memory
+    mapping has high overhead for blocks close to or below the page size of the operating system.
+  </td>
+</tr>
 <tr>
   <td>spark.mesos.coarse</td>
   <td>false</td>
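
Since the new property takes a plain byte count, it can be tuned like any other Spark setting. A hypothetical example raising the threshold to eight pages (32768 bytes):

import org.apache.spark.SparkConf

// Hypothetical tuning: only memory map blocks of 32 KB or more.
val conf = new SparkConf()
  .setAppName("MemoryMapExample")
  .set("spark.storage.memoryMapThreshold", (8 * 4096).toString)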
