From b6cc1ebe63ada7a557fd1b5129481f30b6d3afc8 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 25 May 2015 01:31:07 -0700 Subject: [PATCH] Realize that bypass never buffers; proceed to delete tons of code --- .../sort/BypassMergeSortShuffleWriter.scala | 173 ++++-------------- .../shuffle/sort/SortShuffleWriter.scala | 3 +- .../collection/PartitionedAppendOnlyMap.scala | 4 - .../collection/PartitionedPairBuffer.scala | 4 - .../PartitionedSerializedPairBuffer.scala | 4 - .../WritablePartitionedPairCollection.scala | 36 ++-- 6 files changed, 48 insertions(+), 176 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.scala index 9130eb0d60d71..d18f0b1e826b1 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.scala @@ -19,8 +19,6 @@ package org.apache.spark.shuffle.sort import java.io.{File, FileInputStream, FileOutputStream} -import scala.collection.mutable.ArrayBuffer - import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.serializer.Serializer import org.apache.spark.storage.{BlockId, BlockManager, BlockObjectWriter} @@ -44,10 +42,9 @@ private[spark] class BypassMergeSortShuffleWriter[K, V]( conf: SparkConf, blockManager: BlockManager, partitioner: Partitioner, + writeMetrics: ShuffleWriteMetrics, serializer: Option[Serializer] = None) - extends Logging - with Spillable[WritablePartitionedPairCollection[K, V]] - with SortShuffleSorter[K, V] { + extends Logging with SortShuffleSorter[K, V] { private[this] val numPartitions = partitioner.numPartitions private[this] val shouldPartition = numPartitions > 1 @@ -55,7 +52,6 @@ private[spark] class BypassMergeSortShuffleWriter[K, V]( if (shouldPartition) partitioner.getPartition(key) else 0 } - private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true) // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided private val fileBufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024 private val transferToEnabled = conf.getBoolean("spark.file.transferTo", true) @@ -63,110 +59,36 @@ private[spark] class BypassMergeSortShuffleWriter[K, V]( private val ser = Serializer.getSerializer(serializer) private val serInstance = ser.newInstance() - /** - * Allocates a new buffer. Called in the constructor and after every spill. - */ - private def newBuffer: () => WritablePartitionedPairCollection[K, V] with SizeTracker = { - val useSerializedPairBuffer = - conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) && - ser.supportsRelocationOfSerializedObjects - if (useSerializedPairBuffer) { - val kvChunkSize = conf.getInt("spark.shuffle.sort.kvChunkSize", 1 << 22) // 4 MB - () => new PartitionedSerializedPairBuffer(metaInitialRecords = 256, kvChunkSize, serInstance) - } else { - () => new PartitionedPairBuffer[K, V] - } - } - private var buffer = newBuffer() - - private var _diskBytesSpilled = 0L - def diskBytesSpilled: Long = _diskBytesSpilled - - /** - * Information about a spilled file. 
- * - * @param file the file - * @param blockId the block id - * @param serializerBatchSizes sizes, in bytes, of "batches" written by the serializer as we - * periodically reset its stream - * @param elementsPerPartition the number of elements in each partition, used to efficiently - * kepe track of partitions when merging. - */ - private[this] case class SpilledFile( - file: File, - blockId: BlockId, - serializerBatchSizes: Array[Long], - elementsPerPartition: Array[Long]) - private val spills = new ArrayBuffer[SpilledFile] - - /** Array of file writers for each partition, used if we've spilled */ + /** Array of file writers for each partition */ private var partitionWriters: Array[BlockObjectWriter] = _ - /** - * Write metrics for spill. This is initialized when partitionWriters is created */ - private var spillWriteMetrics: ShuffleWriteMetrics = _ - def insertAll(records: Iterator[_ <: Product2[K, V]]): Unit = { - // SPARK-4479: Also bypass buffering if merge sort is bypassed to avoid defensive copies + assert (partitionWriters == null) if (records.hasNext) { - spill( - WritablePartitionedIterator.fromIterator(records.map { kv => - ((getPartition(kv._1), kv._1), kv._2.asInstanceOf[V]) - }) - ) - } - } - - /** - * Spill the current in-memory collection to disk if needed. - * - * @param usingMap whether we're using a map or buffer as our current in-memory collection - */ - private def maybeSpillCollection(usingMap: Boolean): Unit = { - if (spillingEnabled && maybeSpill(buffer, buffer.estimateSize())) { - buffer = newBuffer() - } - } - - /** - * Spill our in-memory collection to separate files, one for each partition, then clears the - * collection. - */ - override protected[this] def spill(collection: WritablePartitionedPairCollection[K, V]): Unit = { - spill(collection.writablePartitionedIterator()) - } - - private def spill(iterator: WritablePartitionedIterator): Unit = { - // Create our file writers if we haven't done so yet - if (partitionWriters == null) { - spillWriteMetrics = new ShuffleWriteMetrics() val openStartTime = System.nanoTime partitionWriters = Array.fill(numPartitions) { - // Because these files may be read during shuffle, their compression must be controlled by - // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use - // createTempShuffleBlock here; see SPARK-3426 for more context. val (blockId, file) = blockManager.diskBlockManager.createTempShuffleBlock() - val writer = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, - spillWriteMetrics) + val writer = + blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics) writer.open() } // Creating the file to write to and creating a disk writer both involve interacting with // the disk, and can take a long time in aggregate when we open many files, so should be // included in the shuffle write time. - spillWriteMetrics.incShuffleWriteTime(System.nanoTime - openStartTime) - } + writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime) - // No need to sort stuff, just write each element out - while (iterator.hasNext) { - val partitionId = iterator.nextPartition() - iterator.writeNext(partitionWriters(partitionId)) + while (records.hasNext) { + val record = records.next() + val key: K = record._1 + partitionWriters(getPartition(key)).write(key, record._2) + } } } /** - * Write all the data added into this ExternalSorter into a file in the disk store. This is + * Write all the data added into this writer into a single file in the disk store. 
This is
    * called by the SortShuffleWriter and can go through an efficient path of just concatenating
-   * binary files if we decided to avoid merge-sorting.
+   * the per-partition binary files.
    *
    * @param blockId block ID to write to. The index file will be blockId.name + ".index".
    * @param context a TaskContext for a running Spark task, for us to update shuffle metrics.
@@ -180,55 +102,28 @@ private[spark] class BypassMergeSortShuffleWriter[K, V](
     // Track location of each range in the output file
     val lengths = new Array[Long](numPartitions)
 
-    if (spills.isEmpty) {
-      // Case where we only have in-memory data
-      assert (partitionWriters == null)
-      assert (spillWriteMetrics == null)
-
-      val it = buffer.writablePartitionedIterator()
-      while (it.hasNext) {
-        val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
-          context.taskMetrics.shuffleWriteMetrics.get)
-        val partitionId = it.nextPartition()
-        while (it.hasNext && it.nextPartition() == partitionId) {
-          it.writeNext(writer)
-        }
-        writer.commitAndClose()
-        val segment = writer.fileSegment()
-        lengths(partitionId) = segment.length
-      }
-    } else {
-      // Case where we have both in-memory and spilled data.
-      assert (partitionWriters != null)
-      assert (spillWriteMetrics != null)
-      // For simplicity, spill out the current in-memory collection so that everything is in files.
-      spill(buffer)
-      partitionWriters.foreach(_.commitAndClose())
-      val out = new FileOutputStream(outputFile, true)
-      val writeStartTime = System.nanoTime
-      Utils.tryWithSafeFinally {
-        for (i <- 0 until numPartitions) {
-          val in = new FileInputStream(partitionWriters(i).fileSegment().file)
-          Utils.tryWithSafeFinally {
-            lengths(i) = Utils.copyStream(in, out, closeStreams = false, transferToEnabled)
-          } {
-            in.close()
-          }
+    // TODO: handle the case where partitionWriters is null (i.e. no records were written).
+
+    partitionWriters.foreach(_.commitAndClose())
+    // Concatenate the per-partition files.
+    val out = new FileOutputStream(outputFile, true)
+    val writeStartTime = System.nanoTime
+    Utils.tryWithSafeFinally {
+      for (i <- 0 until numPartitions) {
+        val in = new FileInputStream(partitionWriters(i).fileSegment().file)
+        Utils.tryWithSafeFinally {
+          lengths(i) = Utils.copyStream(in, out, closeStreams = false, transferToEnabled)
+        } {
+          in.close()
         }
-      } {
-        out.close()
-        context.taskMetrics.shuffleWriteMetrics.foreach { m =>
-          m.incShuffleWriteTime(System.nanoTime - writeStartTime)
+        if (!blockManager.diskBlockManager.getFile(partitionWriters(i).blockId).delete()) {
+          logError(s"Unable to delete file for partition $i.
") } } - } - context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled) - context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled) - context.taskMetrics.shuffleWriteMetrics.foreach { m => - if (spillWriteMetrics != null) { - m.incShuffleBytesWritten(spillWriteMetrics.shuffleBytesWritten) - m.incShuffleWriteTime(spillWriteMetrics.shuffleWriteTime) - m.incShuffleRecordsWritten(spillWriteMetrics.shuffleRecordsWritten) + } { + out.close() + context.taskMetrics.shuffleWriteMetrics.foreach { m => + m.incShuffleWriteTime(System.nanoTime - writeStartTime) } } @@ -236,8 +131,6 @@ private[spark] class BypassMergeSortShuffleWriter[K, V]( } def stop(): Unit = { - spills.foreach(s => s.file.delete()) - spills.clear() if (partitionWriters != null) { partitionWriters.foreach { w => w.revertPartialWritesAndClose() diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 039330a4d529e..45ae4d2c4a22b 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -65,7 +65,8 @@ private[spark] class SortShuffleWriter[K, V, C]( // them at the end. This avoids doing serialization and deserialization twice to merge // together the spilled files, which would happen with the normal code path. The downside is // having multiple files open at a time and thus more memory allocated to buffers. - new BypassMergeSortShuffleWriter[K, V](conf, blockManager, dep.partitioner, dep.serializer) + new BypassMergeSortShuffleWriter[K, V]( + conf, blockManager, dep.partitioner, writeMetrics, dep.serializer) } else { // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't // care whether the keys get sorted in each partition; that will be done on the reduce side diff --git a/core/src/main/scala/org/apache/spark/util/collection/PartitionedAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/PartitionedAppendOnlyMap.scala index e2e2f1faae9d1..d0d25b43d0477 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/PartitionedAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PartitionedAppendOnlyMap.scala @@ -34,10 +34,6 @@ private[spark] class PartitionedAppendOnlyMap[K, V] destructiveSortedIterator(comparator) } - def writablePartitionedIterator(): WritablePartitionedIterator = { - WritablePartitionedIterator.fromIterator(super.iterator) - } - def insert(partition: Int, key: K, value: V): Unit = { update((partition, key), value) } diff --git a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala index e8332e1a87eac..5a6e9a9580e9b 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala @@ -71,10 +71,6 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64) iterator } - override def writablePartitionedIterator(): WritablePartitionedIterator = { - WritablePartitionedIterator.fromIterator(iterator) - } - private def iterator(): Iterator[((Int, K), V)] = new Iterator[((Int, K), V)] { var pos = 0 diff --git a/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala 
b/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala index ac9ea6393628f..89fb4f105b178 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala @@ -119,10 +119,6 @@ private[spark] class PartitionedSerializedPairBuffer[K, V]( override def destructiveSortedWritablePartitionedIterator(keyComparator: Option[Comparator[K]]) : WritablePartitionedIterator = { sort(keyComparator) - writablePartitionedIterator - } - - override def writablePartitionedIterator(): WritablePartitionedIterator = { new WritablePartitionedIterator { // current position in the meta buffer in ints var pos = 0 diff --git a/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala b/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala index f26d1618c9200..dbf5e50ccf6af 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala @@ -47,13 +47,20 @@ private[spark] trait WritablePartitionedPairCollection[K, V] { */ def destructiveSortedWritablePartitionedIterator(keyComparator: Option[Comparator[K]]) : WritablePartitionedIterator = { - WritablePartitionedIterator.fromIterator(partitionedDestructiveSortedIterator(keyComparator)) - } + val it = partitionedDestructiveSortedIterator(keyComparator) + new WritablePartitionedIterator { + var cur = if (it.hasNext) it.next() else null - /** - * Iterate through the data and write out the elements instead of returning them. - */ - def writablePartitionedIterator(): WritablePartitionedIterator + def writeNext(writer: BlockObjectWriter): Unit = { + writer.write(cur._1._2, cur._2) + cur = if (it.hasNext) it.next() else null + } + + def hasNext(): Boolean = cur != null + + def nextPartition(): Int = cur._1._1 + } + } } private[spark] object WritablePartitionedPairCollection { @@ -94,20 +101,3 @@ private[spark] trait WritablePartitionedIterator { def nextPartition(): Int } - -private[spark] object WritablePartitionedIterator { - def fromIterator(it: Iterator[((Int, _), _)]): WritablePartitionedIterator = { - new WritablePartitionedIterator { - var cur = if (it.hasNext) it.next() else null - - def writeNext(writer: BlockObjectWriter): Unit = { - writer.write(cur._1._2, cur._2) - cur = if (it.hasNext) it.next() else null - } - - def hasNext(): Boolean = cur != null - - def nextPartition(): Int = cur._1._1 - } - } -}
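
Note for reviewers: the control flow that survives this deletion is small enough to show end to end. The sketch below is a standalone illustration in plain Scala using java.io streams rather than Spark's BlockObjectWriter/DiskBlockManager; every name in it (BypassShuffleSketch, the tab-separated record format, the buffer size) is an assumption made for the example, not code from this patch. It mirrors the two phases that remain: stream each record straight into a per-partition temp file with no in-memory buffering, then concatenate those files into one output file while recording each partition's byte length.

// Standalone sketch only: plain java.io instead of Spark's writer/block-manager APIs.
import java.io.{BufferedOutputStream, FileInputStream, FileOutputStream, PrintWriter}
import java.nio.file.Files

object BypassShuffleSketch {
  def main(args: Array[String]): Unit = {
    val numPartitions = 4
    def getPartition(key: Int): Int = math.abs(key.hashCode) % numPartitions

    // Phase 1: one writer per partition, opened up front; records are written straight
    // to disk with no in-memory buffering or sorting.
    val partitionFiles =
      Array.fill(numPartitions)(Files.createTempFile("partition-", ".tmp").toFile)
    val partitionWriters = partitionFiles.map(f => new PrintWriter(new FileOutputStream(f)))
    val records = (1 to 20).map(i => (i, s"value-$i"))
    records.foreach { case (k, v) => partitionWriters(getPartition(k)).println(s"$k\t$v") }
    partitionWriters.foreach(_.close())

    // Phase 2: concatenate the per-partition files into a single output file, tracking
    // the byte length of each partition's segment (the analogue of the `lengths` array
    // that feeds the index file).
    val outputFile = Files.createTempFile("shuffle-output-", ".data").toFile
    val lengths = new Array[Long](numPartitions)
    val out = new BufferedOutputStream(new FileOutputStream(outputFile, true))
    try {
      for (i <- 0 until numPartitions) {
        val in = new FileInputStream(partitionFiles(i))
        try {
          val buf = new Array[Byte](8192)
          var n = in.read(buf)
          while (n != -1) {
            out.write(buf, 0, n)
            lengths(i) += n
            n = in.read(buf)
          }
        } finally {
          in.close()
        }
        // Log rather than fail if a temp file cannot be deleted, mirroring the logError
        // in writePartitionedFile.
        if (!partitionFiles(i).delete()) {
          System.err.println(s"Unable to delete temp file for partition $i")
        }
      }
    } finally {
      out.close()
    }
    println(s"Wrote ${lengths.sum} bytes; per-partition lengths: ${lengths.mkString(", ")}")
  }
}

Opening one writer per partition up front is what makes the spill machinery removable: every record already lands in its final partition's file, so there is never an in-memory collection to spill or merge. The trade-off, as the comment in SortShuffleWriter notes, is holding numPartitions file handles and their buffers open at once.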