
[SPARK-7855] Move bypassMergeSort-handling from ExternalSorter to own component #6397

Closed
Commits: 35 (changes shown from 34 commits)
18959bb
Move comparator methods closer together.
JoshRosen May 25, 2015
19bccd6
Remove duplicated buffer creation code.
JoshRosen May 25, 2015
8d0678c
Move diskBytesSpilled getter next to variable
JoshRosen May 25, 2015
6185ee2
WIP towards moving bypass code into own file.
JoshRosen May 25, 2015
b6cc1eb
Realize that bypass never buffers; proceed to delete tons of code
JoshRosen May 25, 2015
bb96678
Add missing interface file
JoshRosen May 25, 2015
d4cb536
Delete more unused code
JoshRosen May 25, 2015
02355ef
More simplification
JoshRosen May 25, 2015
1265b25
Fix some style errors and comments.
JoshRosen May 25, 2015
4b03539
Move conf prior to first use
JoshRosen May 25, 2015
6a35716
Refactor logic for deciding when to bypass
JoshRosen May 25, 2015
931ca68
Refactor tests.
JoshRosen May 25, 2015
25aa3bd
Make sure to delete outputFile after errors.
JoshRosen May 25, 2015
7f15f7b
Back out extra cleanup-handling code, since this is already covered i…
JoshRosen May 25, 2015
d267e0d
Fix style issue
JoshRosen May 25, 2015
6320112
Add missing negation in deletion success check.
JoshRosen May 25, 2015
7af7aea
Combine spill() and spillToMergeableFile()
JoshRosen May 25, 2015
0d9848c
Make it more clear that curWriteMetrics is now only used for spill me…
JoshRosen May 25, 2015
25b964f
Rename SortShuffleSorter to SortShuffleFileWriter
JoshRosen May 26, 2015
0d3dcc0
Remove unnecessary overloaded methods
JoshRosen May 26, 2015
bc1a820
Fix bug when aggregator is used but map-side combine is disabled
JoshRosen May 26, 2015
30ef2c8
Convert BypassMergeSortShuffleWriter to Java
JoshRosen May 26, 2015
5197f73
Add missing private[this]
JoshRosen May 26, 2015
bf0d98f
Add comment to clarify confusing factory code
JoshRosen May 26, 2015
71d76ff
Update Javadoc
JoshRosen May 27, 2015
d7f9938
Add missing overrides; fix compilation
JoshRosen May 27, 2015
08e40f3
Remove excessively clever (and wrong) implementation of newBuffer()
JoshRosen May 27, 2015
8522b6a
Do not perform a map-side sort unless we're also doing map-side aggre…
JoshRosen May 27, 2015
96811b4
Remove confusing taskMetrics.shuffleWriteMetrics() optional call
JoshRosen May 28, 2015
16564eb
Guard against calling fileSegment() before commitAndClose() has been …
JoshRosen May 28, 2015
8b8fb9e
Add more tests + defensive programming to DiskBlockObjectWriter.
JoshRosen May 28, 2015
b5cc35b
Move shuffle metrics tests to ShuffleSuite.
JoshRosen May 28, 2015
03f35a4
Minor fix to cleanup logic.
JoshRosen May 28, 2015
8b216c4
Guard against negative offsets and lengths in FileSegment
JoshRosen May 29, 2015
bf3f3f6
Merge remote-tracking branch 'origin/master' into external-sorter-byp…
JoshRosen May 29, 2015
New file: org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import scala.Product2;
import scala.Tuple2;
import scala.collection.Iterator;

import com.google.common.io.Closeables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.storage.*;
import org.apache.spark.util.Utils;

/**
* This class implements sort-based shuffle's hash-style shuffle fallback path. This write path
* writes incoming records to separate files, one file per reduce partition, then concatenates these
* per-partition files to form a single output file, regions of which are served to reducers.
* Records are not buffered in memory. This is essentially identical to
* {@link org.apache.spark.shuffle.hash.HashShuffleWriter}, except that it writes output in a format
* that can be served / consumed via {@link org.apache.spark.shuffle.IndexShuffleBlockResolver}.
* <p>
* This write path is inefficient for shuffles with large numbers of reduce partitions because it
* simultaneously opens separate serializers and file streams for all partitions. As a result,
* {@link SortShuffleManager} only selects this write path when
* <ul>
* <li>no Ordering is specified,</li>
* <li>no Aggregator is specified, and</li>
* <li>the number of partitions is less than
* <code>spark.shuffle.sort.bypassMergeThreshold</code>.</li>
* </ul>
*
* This code used to be part of {@link org.apache.spark.util.collection.ExternalSorter} but was
* refactored into its own class in order to reduce code complexity; see SPARK-7855 for details.
* <p>
* There have been proposals to completely remove this code path; see SPARK-6026 for details.
*/
final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter<K, V> {

private final Logger logger = LoggerFactory.getLogger(BypassMergeSortShuffleWriter.class);

private final int fileBufferSize;
private final boolean transferToEnabled;
private final int numPartitions;
private final BlockManager blockManager;
private final Partitioner partitioner;
private final ShuffleWriteMetrics writeMetrics;
private final Serializer serializer;

/** Array of file writers, one for each partition */
private BlockObjectWriter[] partitionWriters;

public BypassMergeSortShuffleWriter(
SparkConf conf,
BlockManager blockManager,
Partitioner partitioner,
ShuffleWriteMetrics writeMetrics,
Serializer serializer) {
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
this.numPartitions = partitioner.numPartitions();
this.blockManager = blockManager;
this.partitioner = partitioner;
this.writeMetrics = writeMetrics;
this.serializer = serializer;
}

@Override
public void insertAll(Iterator<Product2<K, V>> records) throws IOException {
assert (partitionWriters == null);
if (!records.hasNext()) {
return;
}
final SerializerInstance serInstance = serializer.newInstance();
final long openStartTime = System.nanoTime();
partitionWriters = new BlockObjectWriter[numPartitions];
for (int i = 0; i < numPartitions; i++) {
final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
blockManager.diskBlockManager().createTempShuffleBlock();
final File file = tempShuffleBlockIdPlusFile._2();
final BlockId blockId = tempShuffleBlockIdPlusFile._1();
partitionWriters[i] =
blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open();
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, and can take a long time in aggregate when we open many files, so should be
// included in the shuffle write time.
writeMetrics.incShuffleWriteTime(System.nanoTime() - openStartTime);

while (records.hasNext()) {
final Product2<K, V> record = records.next();
final K key = record._1();
partitionWriters[partitioner.getPartition(key)].write(key, record._2());
[Inline review thread]

Member: partitioner.getPartition(key) is different from the previous code. The previous code calls getPartition, which skips partitioner.getPartition(key) when there is only one partition. I'm not sure whether that optimization matters.

Contributor Author (JoshRosen): Thanks for noticing this. I left this out on purpose, but probably should have commented on the diff to explain why.

I think the reason why ExternalSorter skips the partitioner.getPartition(key) call when there is only one partition is that ExternalSorter is also used in non-shuffle contexts where we don't define a partitioner (such as the reduce-side sort in sortByKey()). In those cases, we obviously want to avoid unnecessary hashing.

BypassMergeSortShuffleWriter is only used for shuffles, though, and I expect that it's extremely rare for a shuffle to send everything to a single partition (collecting results to the driver is handled by different code). Therefore, I chose to leave out that check.

Member: I see. Fair enough. LGTM now.
}

for (BlockObjectWriter writer : partitionWriters) {
writer.commitAndClose();
}
}

@Override
public long[] writePartitionedFile(
BlockId blockId,
TaskContext context,
File outputFile) throws IOException {
// Track location of the partition starts in the output file
final long[] lengths = new long[numPartitions];
if (partitionWriters == null) {
// We were passed an empty iterator
return lengths;
}

final FileOutputStream out = new FileOutputStream(outputFile, true);
final long writeStartTime = System.nanoTime();
boolean threwException = true;
try {
for (int i = 0; i < numPartitions; i++) {
final FileInputStream in = new FileInputStream(partitionWriters[i].fileSegment().file());
boolean copyThrewException = true;
try {
lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
copyThrewException = false;
} finally {
Closeables.close(in, copyThrewException);
}
if (!blockManager.diskBlockManager().getFile(partitionWriters[i].blockId()).delete()) {
logger.error("Unable to delete file for partition {}", i);
}
}
threwException = false;
} finally {
Closeables.close(out, threwException);
writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
}
partitionWriters = null;
return lengths;
}

@Override
public void stop() throws IOException {
if (partitionWriters != null) {
try {
final DiskBlockManager diskBlockManager = blockManager.diskBlockManager();
for (BlockObjectWriter writer : partitionWriters) {
// This method explicitly does _not_ throw exceptions:
writer.revertPartialWritesAndClose();
if (!diskBlockManager.getFile(writer.blockId()).delete()) {
logger.error("Error while deleting file for block {}", writer.blockId());
}
}
} finally {
partitionWriters = null;
}
}
}
}
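To make the writePartitionedFile return value concrete: the regions of the concatenated output file are addressed by running totals of the per-partition lengths. A self-contained sketch of that bookkeeping (illustrative only; it does not use Spark's actual index format):

```scala
// Given per-partition byte lengths, the byte range for reduce partition i in the
// concatenated file is [offsets(i), offsets(i + 1)).
def partitionOffsets(lengths: Array[Long]): Array[Long] =
  lengths.scanLeft(0L)(_ + _)

// Example: lengths Array(10L, 0L, 25L) yields offsets Array(0, 10, 10, 35),
// so partition 2 occupies bytes 10 until 35 of the concatenated file.
```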
New file: org/apache/spark/shuffle/sort/SortShuffleFileWriter.java
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort;

import java.io.File;
import java.io.IOException;

import scala.Product2;
import scala.collection.Iterator;

import org.apache.spark.annotation.Private;
import org.apache.spark.TaskContext;
import org.apache.spark.storage.BlockId;

/**
* Interface for objects that {@link SortShuffleWriter} uses to write its output files.
*/
@Private
public interface SortShuffleFileWriter<K, V> {

void insertAll(Iterator<Product2<K, V>> records) throws IOException;

/**
* Write all the data added into this shuffle sorter into a file in the disk store. This is
* called by the SortShuffleWriter and can go through an efficient path of just concatenating
* binary files if we decided to avoid merge-sorting.
*
* @param blockId block ID to write to. The index file will be blockId.name + ".index".
* @param context a TaskContext for a running Spark task, for us to update shuffle metrics.
* @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
*/
long[] writePartitionedFile(
BlockId blockId,
TaskContext context,
File outputFile) throws IOException;

void stop() throws IOException;
}
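For illustration of this contract, a hypothetical, self-contained analogue with simplified types (not Spark code, and unlike BypassMergeSortShuffleWriter it buffers records in memory): insert everything, then write one concatenated file and report each partition's length in bytes.

```scala
import java.io.{File, FileOutputStream}

class ToyPartitionedWriter[K, V](numPartitions: Int, partition: K => Int) {
  // One in-memory buffer per reduce partition (the real bypass writer uses
  // one on-disk temporary file per partition instead).
  private val buffers = Array.fill(numPartitions)(Vector.newBuilder[(K, V)])

  def insertAll(records: Iterator[(K, V)]): Unit =
    records.foreach { case (k, v) => buffers(partition(k)) += ((k, v)) }

  // Append every partition's records to outputFile and return the lengths array
  // that a caller would hand to the map output tracker.
  def writePartitionedFile(outputFile: File): Array[Long] = {
    val out = new FileOutputStream(outputFile, true)
    try {
      buffers.map { buf =>
        val bytes = buf.result().map { case (k, v) => s"$k\t$v\n" }.mkString.getBytes("UTF-8")
        out.write(bytes)
        bytes.length.toLong
      }
    } finally {
      out.close()
    }
  }
}
```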
File: org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.shuffle.sort
 
-import org.apache.spark.{MapOutputTracker, SparkEnv, Logging, TaskContext}
+import org.apache.spark._
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriter, BaseShuffleHandle}
 import org.apache.spark.storage.ShuffleBlockId
 import org.apache.spark.util.collection.ExternalSorter
@@ -35,7 +36,7 @@ private[spark] class SortShuffleWriter[K, V, C](
 
   private val blockManager = SparkEnv.get.blockManager
 
-  private var sorter: ExternalSorter[K, V, _] = null
+  private var sorter: SortShuffleFileWriter[K, V] = null
 
   // Are we in the process of stopping? Because map tasks can call stop() with success = true
   // and then call stop() with success = false if they get an exception, we want to make sure
@@ -49,18 +50,27 @@
 
   /** Write a bunch of records to this task's output */
   override def write(records: Iterator[Product2[K, V]]): Unit = {
-    if (dep.mapSideCombine) {
+    sorter = if (dep.mapSideCombine) {
       require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
-      sorter = new ExternalSorter[K, V, C](
+      new ExternalSorter[K, V, C](
         dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
-      sorter.insertAll(records)
+    } else if (SortShuffleWriter.shouldBypassMergeSort(
+        SparkEnv.get.conf, dep.partitioner.numPartitions, aggregator = None, keyOrdering = None)) {
+      // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
+      // need local aggregation and sorting, write numPartitions files directly and just concatenate
+      // them at the end. This avoids doing serialization and deserialization twice to merge
+      // together the spilled files, which would happen with the normal code path. The downside is
+      // having multiple files open at a time and thus more memory allocated to buffers.
+      new BypassMergeSortShuffleWriter[K, V](SparkEnv.get.conf, blockManager, dep.partitioner,
+        writeMetrics, Serializer.getSerializer(dep.serializer))
     } else {
       // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
       // care whether the keys get sorted in each partition; that will be done on the reduce side
       // if the operation being run is sortByKey.
-      sorter = new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer)
-      sorter.insertAll(records)
+      new ExternalSorter[K, V, V](
+        aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
     }
+    sorter.insertAll(records)
 
     // Don't bother including the time to open the merged output file in the shuffle write time,
     // because it just opens a single file, so is typically too fast to measure accurately
@@ -100,3 +110,13 @@
   }
 }
 
+private[spark] object SortShuffleWriter {
+  def shouldBypassMergeSort(
+      conf: SparkConf,
+      numPartitions: Int,
+      aggregator: Option[Aggregator[_, _, _]],
+      keyOrdering: Option[Ordering[_]]): Boolean = {
+    val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
+    numPartitions <= bypassMergeThreshold && aggregator.isEmpty && keyOrdering.isEmpty
+  }
+}
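A self-contained restatement of the shouldBypassMergeSort condition above, using the default threshold of 200 for spark.shuffle.sort.bypassMergeThreshold, with a few worked examples:

```scala
def shouldBypass(
    numPartitions: Int,
    hasAggregator: Boolean,
    hasOrdering: Boolean,
    threshold: Int = 200): Boolean = {
  numPartitions <= threshold && !hasAggregator && !hasOrdering
}

// shouldBypass(100, hasAggregator = false, hasOrdering = false)  => true
// shouldBypass(500, hasAggregator = false, hasOrdering = false)  => false (too many partitions)
// shouldBypass(100, hasAggregator = true,  hasOrdering = false)  => false (map-side aggregation)
```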
File: org/apache/spark/storage/BlockObjectWriter.scala (defines DiskBlockObjectWriter)
@@ -95,6 +95,7 @@ private[spark] class DiskBlockObjectWriter(
   private var objOut: SerializationStream = null
   private var initialized = false
   private var hasBeenClosed = false
+  private var commitAndCloseHasBeenCalled = false
 
   /**
    * Cursors used to represent positions in the file.
@@ -167,20 +168,22 @@
       objOut.flush()
       bs.flush()
       close()
+      finalPosition = file.length()
+      // In certain compression codecs, more bytes are written after close() is called
+      writeMetrics.incShuffleBytesWritten(finalPosition - reportedPosition)
+    } else {
+      finalPosition = file.length()
     }
-    finalPosition = file.length()
-    // In certain compression codecs, more bytes are written after close() is called
-    writeMetrics.incShuffleBytesWritten(finalPosition - reportedPosition)
+    commitAndCloseHasBeenCalled = true
   }
 
   // Discard current writes. We do this by flushing the outstanding writes and then
   // truncating the file to its initial position.
   override def revertPartialWritesAndClose() {
     try {
-      writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition)
-      writeMetrics.decShuffleRecordsWritten(numRecordsWritten)
-
       if (initialized) {
+        writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition)
+        writeMetrics.decShuffleRecordsWritten(numRecordsWritten)
         objOut.flush()
         bs.flush()
         close()
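To make the "more bytes are written after close()" comment concrete, here is a standalone sketch using java.util.zip.GZIPOutputStream (purely illustrative; it is not the codec Spark uses): the on-disk size grows when the stream is closed, which is why commitAndClose() measures file.length() only after close() and credits the delta to the write metrics.

```scala
import java.io.{File, FileOutputStream}
import java.util.zip.GZIPOutputStream

// Returns (bytes on disk before close(), extra bytes that appeared during close()).
def closeDelta(file: File, payload: Array[Byte]): (Long, Long) = {
  val out = new GZIPOutputStream(new FileOutputStream(file))
  out.write(payload)
  out.flush()
  val beforeClose = file.length()   // header plus whatever has been flushed so far
  out.close()                       // finishes compression and writes the trailer
  (beforeClose, file.length() - beforeClose)
}
```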
@@ -228,6 +231,10 @@
   }
 
   override def fileSegment(): FileSegment = {
+    if (!commitAndCloseHasBeenCalled) {
+      throw new IllegalStateException(
+        "fileSegment() is only valid after commitAndClose() has been called")
+    }
     new FileSegment(file, initialPosition, finalPosition - initialPosition)
   }
 
File: org/apache/spark/storage/FileSegment.scala
@@ -24,6 +24,8 @@ import java.io.File
  * based off an offset and a length.
  */
 private[spark] class FileSegment(val file: File, val offset: Long, val length: Long) {
+  require(offset >= 0, s"File segment offset cannot be negative (got $offset)")
+  require(length >= 0, s"File segment length cannot be negative (got $length)")
   override def toString: String = {
     "(name=%s, offset=%d, length=%d)".format(file.getName, offset, length)
   }
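For completeness, the effect of the new checks in a self-contained form (Segment here mirrors FileSegment's validation but is not Spark's class): invalid segments now fail eagerly at construction time with IllegalArgumentException instead of surfacing later as out-of-range reads.

```scala
case class Segment(offset: Long, length: Long) {
  require(offset >= 0, s"File segment offset cannot be negative (got $offset)")
  require(length >= 0, s"File segment length cannot be negative (got $length)")
}

Segment(offset = 0, length = 184)     // fine
// Segment(offset = 10, length = -1)  // throws IllegalArgumentException
```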