[SPARK-7855] Move bypassMergeSort-handling from ExternalSorter to own component
Spark's `ExternalSorter` writes shuffle output files during sort-based shuffle. Sort-based shuffle has a configuration, `spark.shuffle.sort.bypassMergeThreshold`: when the number of reduce partitions does not exceed this threshold (and no map-side aggregation or ordering is needed), ExternalSorter skips sorting and merging and simply writes a separate file per partition; these files are then concatenated together to form the final map output file.
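As a quick illustration (not part of this patch), this threshold is an ordinary Spark configuration property; its name and its default of 200 appear in `SortShuffleWriter.shouldBypassMergeSort` further down, while the value 400 and the app name below are purely illustrative:

```scala
import org.apache.spark.SparkConf

// Illustrative sketch: raise the bypass threshold so that shuffles with up to
// 400 reduce partitions (and no ordering or aggregation) take the bypass path.
// The default threshold is 200.
val conf = new SparkConf()
  .setAppName("bypass-merge-sort-demo") // hypothetical app name
  .set("spark.shuffle.sort.bypassMergeThreshold", "400")
```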

The code paths used during this bypass are almost completely separate from ExternalSorter's other code paths, so refactoring them into a separate file can significantly simplify the code.

In addition to re-arranging code, this patch deletes a bunch of dead code. The main entry point into ExternalSorter is `insertAll()`, and in SPARK-4479 / apache#3422 that method was modified to completely bypass in-memory buffering of records when `bypassMergeSort` takes effect. As a result, some of the spilling and merging code paths are no longer called when `bypassMergeSort` is used, so that code can be safely removed.

There's an open JIRA ([SPARK-6026](https://issues.apache.org/jira/browse/SPARK-6026)) for removing the `bypassMergeThreshold` parameter and code paths; I have not done that here, but the changes in this patch will make removing that parameter significantly easier if we ever decide to do that.

This patch also makes several improvements to shuffle-related tests and adds more defensive checks to certain shuffle classes:

- DiskBlockObjectWriter now throws an exception if `fileSegment()` is called before `commitAndClose()` has been called.
- DiskBlockObjectWriter's close methods are now idempotent, so calling any of them twice in a row no longer corrupts the shuffle write metrics. Calling `revertPartialWritesAndClose()` on a closed DiskBlockObjectWriter is now a no-op (previously, it could skew the metrics).
- The end-to-end shuffle record count metrics tests have been moved from InputOutputMetricsSuite to ShuffleSuite.  This means that these tests will now be run against all shuffle implementations rather than just the default shuffle configuration.
- The end-to-end metrics tests now include a test of a job which performs aggregation in the shuffle.
- Our tests now check that `shuffleBytesWritten == totalShuffleBytesRead`.
- FileSegment now throws IllegalArgumentException if it is constructed with a negative length or offset (see the sketch below).
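As a rough sketch of the new FileSegment check (not part of the patch; `FileSegment` is `private[spark]`, so code like this only compiles inside Spark's own packages, e.g. in a test suite, and the file path is made up):

```scala
import java.io.File

import org.apache.spark.storage.FileSegment

val file = new File("/tmp/shuffle-output.data") // hypothetical path

// Fine: non-negative offset and length.
val segment = new FileSegment(file, 0L, 128L)

// Now fails fast with an IllegalArgumentException from require(), rather than
// silently constructing a nonsensical segment:
// new FileSegment(file, -1L, 128L)
```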

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#6397 from JoshRosen/external-sorter-bypass-cleanup and squashes the following commits:

bf3f3f6 [Josh Rosen] Merge remote-tracking branch 'origin/master' into external-sorter-bypass-cleanup
8b216c4 [Josh Rosen] Guard against negative offsets and lengths in FileSegment
03f35a4 [Josh Rosen] Minor fix to cleanup logic.
b5cc35b [Josh Rosen] Move shuffle metrics tests to ShuffleSuite.
8b8fb9e [Josh Rosen] Add more tests + defensive programming to DiskBlockObjectWriter.
16564eb [Josh Rosen] Guard against calling fileSegment() before commitAndClose() has been called.
96811b4 [Josh Rosen] Remove confusing taskMetrics.shuffleWriteMetrics() optional call
8522b6a [Josh Rosen] Do not perform a map-side sort unless we're also doing map-side aggregation
08e40f3 [Josh Rosen] Remove excessively clever (and wrong) implementation of newBuffer()
d7f9938 [Josh Rosen] Add missing overrides; fix compilation
71d76ff [Josh Rosen] Update Javadoc
bf0d98f [Josh Rosen] Add comment to clarify confusing factory code
5197f73 [Josh Rosen] Add missing private[this]
30ef2c8 [Josh Rosen] Convert BypassMergeSortShuffleWriter to Java
bc1a820 [Josh Rosen] Fix bug when aggregator is used but map-side combine is disabled
0d3dcc0 [Josh Rosen] Remove unnecessary overloaded methods
25b964f [Josh Rosen] Rename SortShuffleSorter to SortShuffleFileWriter
0d9848c [Josh Rosen] Make it more clear that curWriteMetrics is now only used for spill metrics
7af7aea [Josh Rosen] Combine spill() and spillToMergeableFile()
6320112 [Josh Rosen] Add missing negation in deletion success check.
d267e0d [Josh Rosen] Fix style issue
7f15f7b [Josh Rosen] Back out extra cleanup-handling code, since this is already covered in stop()
25aa3bd [Josh Rosen] Make sure to delete outputFile after errors.
931ca68 [Josh Rosen] Refactor tests.
6a35716 [Josh Rosen] Refactor logic for deciding when to bypass
4b03539 [Josh Rosen] Move conf prior to first use
1265b25 [Josh Rosen] Fix some style errors and comments.
02355ef [Josh Rosen] More simplification
d4cb536 [Josh Rosen] Delete more unused code
bb96678 [Josh Rosen] Add missing interface file
b6cc1eb [Josh Rosen] Realize that bypass never buffers; proceed to delete tons of code
6185ee2 [Josh Rosen] WIP towards moving bypass code into own file.
8d0678c [Josh Rosen] Move diskBytesSpilled getter next to variable
19bccd6 [Josh Rosen] Remove duplicated buffer creation code.
18959bb [Josh Rosen] Move comparator methods closer together.
JoshRosen authored and jeanlyn committed Jun 12, 2015
1 parent 1b06853 commit 5ce893d
Showing 17 changed files with 741 additions and 426 deletions.
New file: BypassMergeSortShuffleWriter.java (org.apache.spark.shuffle.sort)
@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import scala.Product2;
import scala.Tuple2;
import scala.collection.Iterator;

import com.google.common.io.Closeables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.storage.*;
import org.apache.spark.util.Utils;

/**
* This class implements sort-based shuffle's hash-style shuffle fallback path. This write path
* writes incoming records to separate files, one file per reduce partition, then concatenates these
* per-partition files to form a single output file, regions of which are served to reducers.
* Records are not buffered in memory. This is essentially identical to
* {@link org.apache.spark.shuffle.hash.HashShuffleWriter}, except that it writes output in a format
* that can be served / consumed via {@link org.apache.spark.shuffle.IndexShuffleBlockResolver}.
* <p>
* This write path is inefficient for shuffles with large numbers of reduce partitions because it
* simultaneously opens separate serializers and file streams for all partitions. As a result,
* {@link SortShuffleManager} only selects this write path when
* <ul>
* <li>no Ordering is specified,</li>
* <li>no Aggregator is specified, and</li>
* <li>the number of partitions is less than
* <code>spark.shuffle.sort.bypassMergeThreshold</code>.</li>
* </ul>
*
* This code used to be part of {@link org.apache.spark.util.collection.ExternalSorter} but was
* refactored into its own class in order to reduce code complexity; see SPARK-7855 for details.
* <p>
* There have been proposals to completely remove this code path; see SPARK-6026 for details.
*/
final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter<K, V> {

private final Logger logger = LoggerFactory.getLogger(BypassMergeSortShuffleWriter.class);

private final int fileBufferSize;
private final boolean transferToEnabled;
private final int numPartitions;
private final BlockManager blockManager;
private final Partitioner partitioner;
private final ShuffleWriteMetrics writeMetrics;
private final Serializer serializer;

/** Array of file writers, one for each partition */
private BlockObjectWriter[] partitionWriters;

public BypassMergeSortShuffleWriter(
SparkConf conf,
BlockManager blockManager,
Partitioner partitioner,
ShuffleWriteMetrics writeMetrics,
Serializer serializer) {
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
this.numPartitions = partitioner.numPartitions();
this.blockManager = blockManager;
this.partitioner = partitioner;
this.writeMetrics = writeMetrics;
this.serializer = serializer;
}

@Override
public void insertAll(Iterator<Product2<K, V>> records) throws IOException {
assert (partitionWriters == null);
if (!records.hasNext()) {
return;
}
final SerializerInstance serInstance = serializer.newInstance();
final long openStartTime = System.nanoTime();
partitionWriters = new BlockObjectWriter[numPartitions];
for (int i = 0; i < numPartitions; i++) {
final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
blockManager.diskBlockManager().createTempShuffleBlock();
final File file = tempShuffleBlockIdPlusFile._2();
final BlockId blockId = tempShuffleBlockIdPlusFile._1();
partitionWriters[i] =
blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open();
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, and can take a long time in aggregate when we open many files, so should be
// included in the shuffle write time.
writeMetrics.incShuffleWriteTime(System.nanoTime() - openStartTime);

while (records.hasNext()) {
final Product2<K, V> record = records.next();
final K key = record._1();
partitionWriters[partitioner.getPartition(key)].write(key, record._2());
}

for (BlockObjectWriter writer : partitionWriters) {
writer.commitAndClose();
}
}

@Override
public long[] writePartitionedFile(
BlockId blockId,
TaskContext context,
File outputFile) throws IOException {
// Track location of the partition starts in the output file
final long[] lengths = new long[numPartitions];
if (partitionWriters == null) {
// We were passed an empty iterator
return lengths;
}

final FileOutputStream out = new FileOutputStream(outputFile, true);
final long writeStartTime = System.nanoTime();
boolean threwException = true;
try {
for (int i = 0; i < numPartitions; i++) {
final FileInputStream in = new FileInputStream(partitionWriters[i].fileSegment().file());
boolean copyThrewException = true;
try {
lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
copyThrewException = false;
} finally {
Closeables.close(in, copyThrewException);
}
if (!blockManager.diskBlockManager().getFile(partitionWriters[i].blockId()).delete()) {
logger.error("Unable to delete file for partition {}", i);
}
}
threwException = false;
} finally {
Closeables.close(out, threwException);
writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
}
partitionWriters = null;
return lengths;
}

@Override
public void stop() throws IOException {
if (partitionWriters != null) {
try {
final DiskBlockManager diskBlockManager = blockManager.diskBlockManager();
for (BlockObjectWriter writer : partitionWriters) {
// This method explicitly does _not_ throw exceptions:
writer.revertPartialWritesAndClose();
if (!diskBlockManager.getFile(writer.blockId()).delete()) {
logger.error("Error while deleting file for block {}", writer.blockId());
}
}
} finally {
partitionWriters = null;
}
}
}
}
New file: SortShuffleFileWriter.java (org.apache.spark.shuffle.sort)
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort;

import java.io.File;
import java.io.IOException;

import scala.Product2;
import scala.collection.Iterator;

import org.apache.spark.annotation.Private;
import org.apache.spark.TaskContext;
import org.apache.spark.storage.BlockId;

/**
* Interface for objects that {@link SortShuffleWriter} uses to write its output files.
*/
@Private
public interface SortShuffleFileWriter<K, V> {

void insertAll(Iterator<Product2<K, V>> records) throws IOException;

/**
* Write all the data added into this shuffle sorter into a file in the disk store. This is
* called by the SortShuffleWriter and can go through an efficient path of just concatenating
* binary files if we decided to avoid merge-sorting.
*
* @param blockId block ID to write to. The index file will be blockId.name + ".index".
* @param context a TaskContext for a running Spark task, for us to update shuffle metrics.
* @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
*/
long[] writePartitionedFile(
BlockId blockId,
TaskContext context,
File outputFile) throws IOException;

void stop() throws IOException;
}
Changes to SortShuffleWriter.scala (org.apache.spark.shuffle.sort)
@@ -17,9 +17,10 @@

package org.apache.spark.shuffle.sort

import org.apache.spark.{MapOutputTracker, SparkEnv, Logging, TaskContext}
import org.apache.spark._
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriter, BaseShuffleHandle}
import org.apache.spark.storage.ShuffleBlockId
import org.apache.spark.util.collection.ExternalSorter
@@ -35,7 +36,7 @@ private[spark] class SortShuffleWriter[K, V, C](

private val blockManager = SparkEnv.get.blockManager

private var sorter: ExternalSorter[K, V, _] = null
private var sorter: SortShuffleFileWriter[K, V] = null

// Are we in the process of stopping? Because map tasks can call stop() with success = true
// and then call stop() with success = false if they get an exception, we want to make sure
@@ -49,22 +50,31 @@

/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Unit = {
if (dep.mapSideCombine) {
sorter = if (dep.mapSideCombine) {
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
sorter = new ExternalSorter[K, V, C](
new ExternalSorter[K, V, C](
dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
sorter.insertAll(records)
} else if (dep.keyOrdering.isDefined) {
sorter = new ExternalSorter[K, V, V](
new ExternalSorter[K, V, V](
None, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
sorter.insertAll(records)
} else {
} else if (SortShuffleWriter.shouldBypassMergeSort(
SparkEnv.get.conf, dep.partitioner.numPartitions, aggregator = None, keyOrdering = None)) {
// If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
// need local aggregation and sorting, write numPartitions files directly and just concatenate
// them at the end. This avoids doing serialization and deserialization twice to merge
// together the spilled files, which would happen with the normal code path. The downside is
// having multiple files open at a time and thus more memory allocated to buffers.
new BypassMergeSortShuffleWriter[K, V](SparkEnv.get.conf, blockManager, dep.partitioner,
writeMetrics, Serializer.getSerializer(dep.serializer))

} else {
// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
// care whether the keys get sorted in each partition; that will be done on the reduce side
// if the operation being run is sortByKey.
sorter = new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer)
sorter.insertAll(records)
new ExternalSorter[K, V, V](
aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
sorter.insertAll(records)

// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
@@ -104,3 +114,13 @@
}
}

private[spark] object SortShuffleWriter {
def shouldBypassMergeSort(
conf: SparkConf,
numPartitions: Int,
aggregator: Option[Aggregator[_, _, _]],
keyOrdering: Option[Ordering[_]]): Boolean = {
val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
numPartitions <= bypassMergeThreshold && aggregator.isEmpty && keyOrdering.isEmpty
}
}
Changes to DiskBlockObjectWriter (org.apache.spark.storage)
@@ -95,6 +95,7 @@ private[spark] class DiskBlockObjectWriter(
private var objOut: SerializationStream = null
private var initialized = false
private var hasBeenClosed = false
private var commitAndCloseHasBeenCalled = false

/**
* Cursors used to represent positions in the file.
@@ -167,20 +168,22 @@
objOut.flush()
bs.flush()
close()
finalPosition = file.length()
// In certain compression codecs, more bytes are written after close() is called
writeMetrics.incShuffleBytesWritten(finalPosition - reportedPosition)
} else {
finalPosition = file.length()
}
finalPosition = file.length()
// In certain compression codecs, more bytes are written after close() is called
writeMetrics.incShuffleBytesWritten(finalPosition - reportedPosition)
commitAndCloseHasBeenCalled = true
}

// Discard current writes. We do this by flushing the outstanding writes and then
// truncating the file to its initial position.
override def revertPartialWritesAndClose() {
try {
writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition)
writeMetrics.decShuffleRecordsWritten(numRecordsWritten)

if (initialized) {
writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition)
writeMetrics.decShuffleRecordsWritten(numRecordsWritten)
objOut.flush()
bs.flush()
close()
@@ -228,6 +231,10 @@
}

override def fileSegment(): FileSegment = {
if (!commitAndCloseHasBeenCalled) {
throw new IllegalStateException(
"fileSegment() is only valid after commitAndClose() has been called")
}
new FileSegment(file, initialPosition, finalPosition - initialPosition)
}

Changes to FileSegment (org.apache.spark.storage)
@@ -24,6 +24,8 @@ import java.io.File
* based off an offset and a length.
*/
private[spark] class FileSegment(val file: File, val offset: Long, val length: Long) {
require(offset >= 0, s"File segment offset cannot be negative (got $offset)")
require(length >= 0, s"File segment length cannot be negative (got $length)")
override def toString: String = {
"(name=%s, offset=%d, length=%d)".format(file.getName, offset, length)
}