Merge pull request alteryx#130 from aarondav/shuffle
Memory-optimized shuffle file consolidation

Reduces the bookkeeping overhead of shuffle file consolidation from >300 bytes to 8 bytes (one primitive Long) per shuffle block. Verified via profiler testing with 1 million shuffle blocks: net overhead was ~8,400,000 bytes.

Despite the memory-optimized implementation incurring extra CPU overhead, the runtime of the shuffle phase in this test was only around 2% slower, while the reduce phase was 40% faster, when compared to not using any shuffle file consolidation.

This is accomplished by replacing the map from ShuffleBlockId to FileSegment (i.e., from block id to its location on disk), which had high overhead because it was a gigantic, timestamped, concurrent map, with a more space-efficient structure. Namely, the following are introduced (the word "Shuffle" is omitted from some names for clarity):
**ShuffleFile** - there is one ShuffleFile per consolidated shuffle file on disk. We store an array of offsets into the physical shuffle file, one per ShuffleMapTask that wrote into the file. This is sufficient to reconstruct the FileSegment of any mapper whose output is in the file.
**FileGroup** - contains a set of ShuffleFiles, one per reducer, that a MapTask can use to write its output. There is one FileGroup created per _concurrent_ MapTask. The FileGroup contains an array of the mapIds that have been written to all files in the group. The positions of elements in this array map directly onto the positions in each ShuffleFile's offsets array.
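
The sketch below shows one way these two structures could fit together. The class and field names (ShuffleFile, ShuffleFileGroup, offsets, mapIds, recordMapOutput) are illustrative assumptions, not the exact code introduced by this commit:

```scala
import java.io.File
import scala.collection.mutable.ArrayBuffer

// Illustrative sketch only: one ShuffleFile per consolidated on-disk file, one FileGroup
// per concurrently running map task, reused by later map tasks once released.
private class ShuffleFile(val file: File) {
  // Byte offset within `file` at which each map task's output begins, in write order.
  // Slot i spans [offsets(i), offsets(i + 1)), or up to file.length() for the last slot.
  val offsets = new ArrayBuffer[Long]()
  // Back-reference to the owning group, so a lookup can reach the shared mapIds array.
  var group: ShuffleFileGroup = null
}

private class ShuffleFileGroup(val shuffleId: Int, val files: Array[ShuffleFile]) {
  files.foreach(_.group = this)
  // mapIds(i) identifies the map task occupying slot i in every file of this group,
  // so positions here line up one-to-one with each ShuffleFile's offsets array.
  val mapIds = new ArrayBuffer[Int]()

  // Called once per finished map task, with the starting offset it observed in each
  // reducer's file before writing.
  def recordMapOutput(mapId: Int, startOffsets: Array[Long]) {
    mapIds += mapId
    files.zip(startOffsets).foreach { case (f, offset) => f.offsets += offset }
  }
}
```

Under a layout like this, each shuffle block costs roughly one Long offset, which is where the ~8 bytes per block figure above comes from.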

In order to locate the FileSegment associated with a BlockId, we keep another structure that maps each reducer to the set of ShuffleFiles created for it. (There will be as many ShuffleFiles per reducer as there are FileGroups.) To look up a given ShuffleBlockId (shuffleId, reducerId, mapId), we therefore search through all ShuffleFiles associated with that reducer.

As a time optimization, we ensure that FileGroups are only reused for MapTasks with monotonically increasing mapIds. This allows us to perform a binary search to locate a mapId inside a group, and also enables potential future optimization (based on the usual monotonic access order).
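
Continuing the same illustrative sketch (and reusing Spark's existing FileSegment(file, offset, length) class that appears in the DiskBlockManager diff below), a lookup might look roughly like this; filesByReducer and its key shape are assumptions about the reducer-to-files index described above, not the commit's exact structure:

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap}

// Hypothetical index from (shuffleId, reducerId) to the ShuffleFiles created for that
// reducer -- one per FileGroup.
private val filesByReducer = new HashMap[(Int, Int), ArrayBuffer[ShuffleFile]]()

def getBlockLocation(shuffleId: Int, reducerId: Int, mapId: Int): Option[FileSegment] = {
  val candidates = filesByReducer.getOrElse((shuffleId, reducerId), ArrayBuffer.empty[ShuffleFile])
  // Scan this reducer's files; within each, binary-search the group's mapIds array,
  // which is sorted because FileGroups are only reused for increasing mapIds.
  candidates.view.flatMap { shuffleFile =>
    val idx = java.util.Arrays.binarySearch(shuffleFile.group.mapIds.toArray, mapId)
    if (idx < 0) {
      None
    } else {
      val start = shuffleFile.offsets(idx)
      val end =
        if (idx + 1 < shuffleFile.offsets.length) shuffleFile.offsets(idx + 1)
        else shuffleFile.file.length()
      Some(new FileSegment(shuffleFile.file, start, end - start))
    }
  }.headOption
}
```

The per-lookup cost is then proportional to the number of FileGroups for that reducer times a binary search over the mapIds in each group, which is the extra CPU traded for the memory savings described above.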

(cherry picked from commit 7a26104)
Signed-off-by: Reynold Xin <rxin@apache.org>
rxin committed Nov 5, 2013
1 parent 1d11e43 commit 7e00dee
Showing 11 changed files with 333 additions and 110 deletions.
23 changes: 12 additions & 11 deletions core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -146,26 +146,26 @@ private[spark] class ShuffleMapTask(
metrics = Some(context.taskMetrics)

val blockManager = SparkEnv.get.blockManager
var shuffle: ShuffleBlocks = null
var buckets: ShuffleWriterGroup = null
val shuffleBlockManager = blockManager.shuffleBlockManager
var shuffle: ShuffleWriterGroup = null
var success = false

try {
// Obtain all the block writers for shuffle blocks.
val ser = SparkEnv.get.serializerManager.get(dep.serializerClass)
shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser)
buckets = shuffle.acquireWriters(partitionId)
shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)

// Write the map output to its associated buckets.
for (elem <- rdd.iterator(split, context)) {
val pair = elem.asInstanceOf[Product2[Any, Any]]
val bucketId = dep.partitioner.getPartition(pair._1)
buckets.writers(bucketId).write(pair)
shuffle.writers(bucketId).write(pair)
}

// Commit the writes. Get the size of each bucket block (total block size).
var totalBytes = 0L
var totalTime = 0L
val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter =>
val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
writer.commit()
val size = writer.fileSegment().length
totalBytes += size
@@ -179,19 +179,20 @@ private[spark] class ShuffleMapTask(
shuffleMetrics.shuffleWriteTime = totalTime
metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)

success = true
new MapStatus(blockManager.blockManagerId, compressedSizes)
} catch { case e: Exception =>
// If there is an exception from running the task, revert the partial writes
// and throw the exception upstream to Spark.
if (buckets != null) {
buckets.writers.foreach(_.revertPartialWrites())
if (shuffle != null) {
shuffle.writers.foreach(_.revertPartialWrites())
}
throw e
} finally {
// Release the writers back to the shuffle block manager.
if (shuffle != null && buckets != null) {
buckets.writers.foreach(_.close())
shuffle.releaseWriters(buckets)
if (shuffle != null && shuffle.writers != null) {
shuffle.writers.foreach(_.close())
shuffle.releaseWriters(success)
}
// Execute the callbacks on task completion.
context.executeOnCompleteCallbacks()
10 changes: 3 additions & 7 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -17,7 +17,7 @@

package org.apache.spark.storage

import java.io.{InputStream, OutputStream}
import java.io.{File, InputStream, OutputStream}
import java.nio.{ByteBuffer, MappedByteBuffer}

import scala.collection.mutable.{HashMap, ArrayBuffer, HashSet}
@@ -47,7 +47,7 @@ private[spark] class BlockManager(
extends Logging {

val shuffleBlockManager = new ShuffleBlockManager(this)
val diskBlockManager = new DiskBlockManager(
val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))

private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
@@ -517,15 +517,11 @@ private[spark] class BlockManager(
* This is currently used for writing shuffle files out. Callers should handle error
* cases.
*/
def getDiskWriter(blockId: BlockId, filename: String, serializer: Serializer, bufferSize: Int)
def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int)
: BlockObjectWriter = {
val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
val file = diskBlockManager.createBlockFile(blockId, filename, allowAppending = true)
val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
writer.registerCloseEventHandler(() => {
if (shuffleBlockManager.consolidateShuffleFiles) {
diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment())
}
val myInfo = new ShuffleBlockInfo()
blockInfo.put(blockId, myInfo)
myInfo.markReady(writer.fileSegment().length)
core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -78,11 +78,11 @@ abstract class BlockObjectWriter(val blockId: BlockId) {

/** BlockObjectWriter which writes directly to a file on disk. Appends to the given file. */
class DiskBlockObjectWriter(
blockId: BlockId,
file: File,
serializer: Serializer,
bufferSize: Int,
compressStream: OutputStream => OutputStream)
blockId: BlockId,
file: File,
serializer: Serializer,
bufferSize: Int,
compressStream: OutputStream => OutputStream)
extends BlockObjectWriter(blockId)
with Logging
{
@@ -111,16 +111,15 @@ class DiskBlockObjectWriter(
private var fos: FileOutputStream = null
private var ts: TimeTrackingOutputStream = null
private var objOut: SerializationStream = null
private var initialPosition = 0L
private var lastValidPosition = 0L
private val initialPosition = file.length()
private var lastValidPosition = initialPosition
private var initialized = false
private var _timeWriting = 0L

override def open(): BlockObjectWriter = {
fos = new FileOutputStream(file, true)
ts = new TimeTrackingOutputStream(fos)
channel = fos.getChannel()
initialPosition = channel.position
lastValidPosition = initialPosition
bs = compressStream(new FastBufferedOutputStream(ts, bufferSize))
objOut = serializer.newInstance().serializeStream(bs)
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -20,12 +20,11 @@ package org.apache.spark.storage
import java.io.File
import java.text.SimpleDateFormat
import java.util.{Date, Random}
import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.Logging
import org.apache.spark.executor.ExecutorExitCode
import org.apache.spark.network.netty.{PathResolver, ShuffleSender}
import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
import org.apache.spark.util.Utils

/**
* Creates and maintains the logical mapping between logical blocks and physical on-disk
@@ -35,7 +34,8 @@ import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedH
*
* @param rootDirs The directories to use for storing block files. Data will be hashed among these.
*/
private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver with Logging {
private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootDirs: String)
extends PathResolver with Logging {

private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
private val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt
@@ -47,54 +47,23 @@ private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver wit
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
private var shuffleSender : ShuffleSender = null

// Stores only Blocks which have been specifically mapped to segments of files
// (rather than the default, which maps a Block to a whole file).
// This keeps our bookkeeping down, since the file system itself tracks the standalone Blocks.
private val blockToFileSegmentMap = new TimeStampedHashMap[BlockId, FileSegment]

val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DISK_BLOCK_MANAGER, this.cleanup)

addShutdownHook()

/**
* Creates a logical mapping from the given BlockId to a segment of a file.
* This will cause any accesses of the logical BlockId to be directed to the specified
* physical location.
*/
def mapBlockToFileSegment(blockId: BlockId, fileSegment: FileSegment) {
blockToFileSegmentMap.put(blockId, fileSegment)
}

/**
* Returns the physical file segment in which the given BlockId is located.
* If the BlockId has been mapped to a specific FileSegment, that will be returned.
* Otherwise, we assume the Block is mapped to a whole file identified by the BlockId directly.
*/
def getBlockLocation(blockId: BlockId): FileSegment = {
if (blockToFileSegmentMap.internalMap.containsKey(blockId)) {
blockToFileSegmentMap.get(blockId).get
if (blockId.isShuffle && shuffleManager.consolidateShuffleFiles) {
shuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId])
} else {
val file = getFile(blockId.name)
new FileSegment(file, 0, file.length())
}
}

/**
* Simply returns a File to place the given Block into. This does not physically create the file.
* If filename is given, that file will be used. Otherwise, we will use the BlockId to get
* a unique filename.
*/
def createBlockFile(blockId: BlockId, filename: String = "", allowAppending: Boolean): File = {
val actualFilename = if (filename == "") blockId.name else filename
val file = getFile(actualFilename)
if (!allowAppending && file.exists()) {
throw new IllegalStateException(
"Attempted to create file that already exists: " + actualFilename)
}
file
}

private def getFile(filename: String): File = {
def getFile(filename: String): File = {
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename)
val dirId = hash % localDirs.length
@@ -119,6 +88,8 @@ private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver wit
new File(subDir, filename)
}

def getFile(blockId: BlockId): File = getFile(blockId.name)

private def createLocalDirs(): Array[File] = {
logDebug("Creating local directories at root dirs '" + rootDirs + "'")
val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
@@ -151,10 +122,6 @@ private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver wit
}
}

private def cleanup(cleanupTime: Long) {
blockToFileSegmentMap.clearOldValues(cleanupTime)
}

private def addShutdownHook() {
localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir))
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -44,7 +44,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
val bytes = _bytes.duplicate()
logDebug("Attempting to put block " + blockId)
val startTime = System.currentTimeMillis
val file = diskManager.createBlockFile(blockId, allowAppending = false)
val file = diskManager.getFile(blockId)
val channel = new FileOutputStream(file).getChannel()
while (bytes.remaining > 0) {
channel.write(bytes)
@@ -64,7 +64,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage

logDebug("Attempting to write values for block " + blockId)
val startTime = System.currentTimeMillis
val file = diskManager.createBlockFile(blockId, allowAppending = false)
val file = diskManager.getFile(blockId)
val outputStream = new FileOutputStream(file)
blockManager.dataSerializeStream(blockId, outputStream, values.iterator)
val length = file.length
