
[SPARK-7855] Move bypassMergeSort-handling from ExternalSorter to own component #6397

Closed

Conversation

JoshRosen
Contributor

Spark's ExternalSorter writes shuffle output files during sort-based shuffle. Sort-based shuffle has a configuration, spark.shuffle.sort.bypassMergeThreshold: when the number of output partitions is at or below this threshold, ExternalSorter skips sorting and merging and simply writes a separate file per partition; these files are then concatenated together to form the final map output file.
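
A minimal Scala sketch of the bypass decision (hedged: the helper name and signature are illustrative, and the real check also requires that no map-side aggregation or ordering is needed):

  import org.apache.spark.SparkConf

  // Bypass only when the partition count is small and no map-side combine is
  // required; 200 is the threshold's default value.
  def shouldBypassMergeSort(conf: SparkConf, numPartitions: Int, mapSideCombine: Boolean): Boolean = {
    val threshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    !mapSideCombine && numPartitions <= threshold
  }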

The code paths used during this bypass are almost completely separate from ExternalSorter's other code paths, so refactoring them into a separate file can significantly simplify the code.

In addition to re-arranging code, this patch deletes a bunch of dead code. The main entry point into ExternalSorter is insertAll() and in SPARK-4479 / #3422 this method was modified to completely bypass in-memory buffering of records when bypassMergeSort takes effect. As a result, some of the spilling and merging code paths will no longer be called when bypassMergeSort is used, so we should be able to safely remove that code.

There's an open JIRA (SPARK-6026) for removing the bypassMergeThreshold parameter and code paths; I have not done that here, but the changes in this patch will make removing that parameter significantly easier if we ever decide to do that.

This patch also makes several improvements to shuffle-related tests and adds more defensive checks to certain shuffle classes:

  • DiskBlockObjectWriter now throws an exception if fileSegment() is called before commitAndClose() has been called.
  • DiskBlockObjectWriter's close methods are now idempotent, so calling any of the close methods twice in a row will no longer result in incorrect shuffle write metrics changes. Calling revertPartialWritesAndClose() on a closed DiskBlockObjectWriter now has no effect (before, it might mess up the metrics).
  • The end-to-end shuffle record count metrics tests have been moved from InputOutputMetricsSuite to ShuffleSuite. This means that these tests will now be run against all shuffle implementations rather than just the default shuffle configuration.
  • The end-to-end metrics tests now include a test of a job which performs aggregation in the shuffle.
  • Our tests now check that shuffleBytesWritten == totalShuffleBytesRead.
  • FileSegment now throws IllegalArgumentException if it is constructed with a negative length or offset.


// Write metrics for current spill
private var curWriteMetrics: ShuffleWriteMetrics = _
JoshRosen (Contributor Author)

In the old code, it was pretty hard to trace through usages of this variable to figure out where it was reset / updated. After removing the hash bypass code, I noticed that this variable's scope can be reduced to a local variable in spillToMergeableFile.
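
For context, a hedged sketch of the after-state (simplified; the real method also takes the collection being spilled as a parameter):

  import org.apache.spark.executor.ShuffleWriteMetrics

  // curWriteMetrics becomes a local val created per spill, rather than a
  // mutable field that must be reset between spills.
  def spillToMergeableFile(): ShuffleWriteMetrics = {
    val curWriteMetrics = new ShuffleWriteMetrics()
    // ... write the spill file, routing write metrics into curWriteMetrics ...
    curWriteMetrics
  }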

@SparkQA

SparkQA commented May 25, 2015

Test build #33475 has finished for PR 6397 at commit 02355ef.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 25, 2015

Test build #33476 has finished for PR 6397 at commit 931ca68.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 25, 2015

Test build #33478 has started for PR 6397 at commit d267e0d.

@JoshRosen
Contributor Author

Whoa, looks like the test log is being flooded with errors about being unable to delete partition files:

13:26:02.102 ERROR org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter: Unable to delete file for partition i. 
13:26:02.102 ERROR org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter: Unable to delete file for partition i. 
13:26:02.102 ERROR org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter: Unable to delete file for partition i. 
13:26:02.102 ERROR org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter: Unable to delete file for partition i. 

It looks like I had the if condition negated; will fix now.
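
A minimal Scala sketch of the fix (commit 6320112 adds the missing negation; System.err stands in here for Spark's logging):

  import java.io.File

  // Log an error only when deletion actually fails; without the negation,
  // the message was emitted on every successful delete.
  def deletePartitionFile(file: File, partitionId: Int): Unit = {
    if (!file.delete()) {
      System.err.println(s"Unable to delete file for partition $partitionId")
    }
  }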

@JoshRosen
Contributor Author

Alright, I think this is ready for review, so I'm going to remove the WIP tag. I'd like feedback on testing: I think the tests here, combined with the implicit coverage this path gets from other shuffle write paths, should be enough to show that I haven't seriously broken anything while porting the bypass code into its own file. It's worth calling out what I don't have good unit tests for, though: we don't have any tests which explicitly try to read per-partition output from the combined output file. AFAIK ExternalSorterSuite didn't explicitly test this either. I might end up adding a separate test utility that emulates how IndexShuffleBlockResolver reads chunks of the output and index files in order to serve shuffle blocks. This is part of an effort to avoid writing new end-to-end tests for basic shuffle functionality, since those tend to be slow and aren't great for testing things like metrics.

@SparkQA

SparkQA commented May 27, 2015

Test build #33597 has finished for PR 6397 at commit 8522b6a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Hmm, interesting test failure:

org.apache.spark.metrics.InputOutputMetricsSuite.shuffle records written metrics
 Error Details
0 did not equal 100000

@JoshRosen
Contributor Author

Ah, I found the problem: we're closing the partition writers in the wrong place. We should be calling commitAndClose() at the end of insertAll(). The current usage of DiskBlockObjectWriter isn't legal because we call fileSegment() before we've called commitAndClose().

According to BlockObjectWriter's API contract:

  /**
   * Returns the file segment of committed data that this Writer has written.
   * This is only valid after commitAndClose() has been called.
   */
  def fileSegment(): FileSegment

However, DiskBlockObjectWriter doesn't have assertions to catch fileSegment() being called at the wrong time. I'll add those assertions, then fix my new writer code.

I also noticed that InputOutputMetricsSuite is one of the only places with an end-to-end test of the shuffle records written metric. This test probably belongs in ShuffleSuite, since it should be tested for each shuffle manager. We should also add another test to cover these metrics for shuffles which perform aggregation, since these shuffles may use a different code path.

@JoshRosen
Contributor Author

Actually, I spoke slightly too soon: we do call commitAndClose() in writePartitionedFile(), but then we turn around and call revertPartialWritesAndClose() in stop() on a DiskBlockObjectWriter that's already closed. This ends up rolling back the shuffle metrics for the block that we've already committed. We probably shouldn't be calling revertPartialWritesAndClose() here anyway, but I think BlockObjectWriter should also guard against reverting a writer that's already been closed by making that operation a no-op.
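
To make the intended guards concrete, here's a hedged, self-contained Scala sketch; the names and structure are illustrative, not DiskBlockObjectWriter's actual internals:

  import java.io.File

  class GuardedWriter(file: File) {
    private var committed = false
    private var closed = false
    private var bytesWritten = 0L

    def write(bytes: Array[Byte]): Unit = {
      require(!closed, "Cannot write to a closed writer")
      bytesWritten += bytes.length
    }

    // Closing twice in a row has no effect on metrics.
    def commitAndClose(): Unit = {
      if (!closed) {
        committed = true
        closed = true
      }
    }

    // Throws unless commitAndClose() has been called, per the API contract quoted above.
    def fileSegment(): (File, Long) = {
      require(committed, "fileSegment() is only valid after commitAndClose()")
      (file, bytesWritten)
    }

    // A no-op on an already-closed writer, so committed metrics are never rolled back.
    def revertPartialWritesAndClose(): Unit = {
      if (!closed) {
        bytesWritten = 0L
        closed = true
      }
    }
  }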

@JoshRosen
Contributor Author

I added a bunch more tests to DiskBlockObjectWriter to ensure that all of the close methods are idempotent. I've also moved the metrics test from InputOutputMetricsSuite into ShuffleSuite so that they're run against all shuffle managers and have added a test to check that the metrics are correct when shuffles perform aggregation. I also added some additional checks to ensure that shuffleBytesRead == shuffleBytesWritten. /cc @ksakellis, since this touches some of your code in InputOutputMetricsSuite.

@JoshRosen
Contributor Author

In case reviewers missed this in the updated PR description, here's a better summary of the new changes:

This patch also makes several improvements to shuffle-related tests and adds more defensive checks to certain shuffle classes:

  • DiskBlockObjectWriter now throws an exception if fileSegment() is called before commitAndClose() has been called.
  • DiskBlockObjectWriter's close methods are now idempotent, so calling any of the close methods twice in a row will no longer result in incorrect shuffle write metrics changes. Calling revertPartialWritesAndClose() on a closed DiskBlockObjectWriter now has no effect (before, it might mess up the metrics).
  • The end-to-end shuffle record count metrics tests have been moved from InputOutputMetricsSuite to ShuffleSuite. This means that these tests will now be run against all shuffle implementations rather than just the default shuffle configuration.
  • The end-to-end metrics tests now include a test of a job which performs aggregation in the shuffle.
  • Our tests now check that shuffleBytesWritten == totalShuffleBytesRead.

@JoshRosen
Contributor Author

@sryza, this might also be of interest to you.

@SparkQA

SparkQA commented May 28, 2015

Test build #33681 has finished for PR 6397 at commit 03f35a4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Hmm, looks like this fails nearly all of the HashShuffleSuite tests. I found the problem: after 8b8fb9e, the finalPosition in DiskBlockObjectWriter was no longer being set when calling commitAndClose() on writers that hadn't ever been opened. This caused fileSegment() to return a segment with a negative length, causing problems during shuffle fetching / serving. I've fixed this problem and added requires to FileSegment to guard against negative offsets and lengths.
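
A minimal Scala sketch of the new FileSegment validation (hedged: the class shown here is illustrative and the messages are made up):

  import java.io.File

  class FileSegment(val file: File, val offset: Long, val length: Long) {
    require(offset >= 0, s"File segment offset cannot be negative (got $offset)")
    require(length >= 0, s"File segment length cannot be negative (got $length)")
  }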

@JoshRosen
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented May 29, 2015

Test build #33716 has finished for PR 6397 at commit 8b216c4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 29, 2015

Test build #33723 has finished for PR 6397 at commit 8b216c4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

while (records.hasNext()) {
  final Product2<K, V> record = records.next();
  final K key = record._1();
  partitionWriters[partitioner.getPartition(key)].write(key, record._2());
}
Member

partitioner.getPartition(key) is different from the previous code. The previous code called a getPartition helper which skips partitioner.getPartition(key) when there is only 1 partition. I'm not sure whether that optimization matters.

JoshRosen (Contributor Author)

Thanks for noticing this. I left this out on purpose, but should have probably commented on the diff to explain why.

I think the reason ExternalSorter skips the partitioner.getPartition(key) call when there is only one partition is that ExternalSorter is also used in non-shuffle contexts where we don't define a partitioner (such as the reduce-side sort in sortByKey()). In those cases, we obviously want to avoid unnecessary hashing.

BypassMergeSortShuffleWriter is only used for shuffles, though, and I expect that it's extremely rare to have shuffles that shuffle everything to a single partition (collecting results to the driver is handled by different code). Therefore, I chose to leave out that check.
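
For reference, a hedged sketch of the ExternalSorter shortcut under discussion (simplified; the real helper also handles the case where no partitioner is defined):

  import org.apache.spark.Partitioner

  // Skip hashing entirely when everything lands in a single partition.
  def getPartition[K](partitioner: Partitioner, key: K): Int =
    if (partitioner.numPartitions > 1) partitioner.getPartition(key) else 0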

Member

I see. Fair enough. LGTM now.

@zsxwing
Member

zsxwing commented May 29, 2015

LGTM except the partitioner.getPartition(key) issue.

Merge remote-tracking branch 'origin/master' into external-sorter-bypass-cleanup

Conflicts:
	core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
	core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@SparkQA

SparkQA commented May 29, 2015

Test build #33775 has finished for PR 6397 at commit bf3f3f6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

/**
 * Spill the current in-memory collection to disk, adding a new file to spills, and clear it.
 */
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
  if (bypassMergeSort) {
JoshRosen (Contributor Author)

To clarify a little further: since we no longer buffered records in memory when bypassMergeSort was true, this branch would never be taken.

@rxin
Contributor

rxin commented May 30, 2015

LGTM.

@asfgit closed this in a643002 on May 30, 2015
@JoshRosen deleted the external-sorter-bypass-cleanup branch on June 4, 2015 at 22:24
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request Jun 12, 2015
[SPARK-7855] Move bypassMergeSort-handling from ExternalSorter to own component

Spark's `ExternalSorter` writes shuffle output files during sort-based shuffle. Sort-shuffle contains a configuration, `spark.shuffle.sort.bypassMergeThreshold`, which causes ExternalSorter to skip sorting and merging and simply write separate files per partition, which are then concatenated together to form the final map output file.

The code paths used during this bypass are almost completely separate from ExternalSorter's other code paths, so refactoring them into a separate file can significantly simplify the code.

In addition to re-arranging code, this patch deletes a bunch of dead code.  The main entry point into ExternalSorter is `insertAll()` and in SPARK-4479 / apache#3422 this method was modified to completely bypass in-memory buffering of records when `bypassMergeSort` takes effect. As a result, some of the spilling and merging code paths will no longer be called when `bypassMergeSort` is used, so we should be able to safely remove that code.

There's an open JIRA ([SPARK-6026](https://issues.apache.org/jira/browse/SPARK-6026)) for removing the `bypassMergeThreshold` parameter and code paths; I have not done that here, but the changes in this patch will make removing that parameter significantly easier if we ever decide to do that.

This patch also makes several improvements to shuffle-related tests and adds more defensive checks to certain shuffle classes:

- DiskBlockObjectWriter now throws an exception if `fileSegment()` is called before `commitAndClose()` has been called.
- DiskBlockObjectWriter's close methods are now idempotent, so calling any of the close methods twice in a row will no longer result in incorrect shuffle write metrics changes.  Calling `revertPartialWritesAndClose()` on a closed DiskBlockObjectWriter now has no effect (before, it might mess up the metrics).
- The end-to-end shuffle record count metrics tests have been moved from InputOutputMetricsSuite to ShuffleSuite.  This means that these tests will now be run against all shuffle implementations rather than just the default shuffle configuration.
- The end-to-end metrics tests now include a test of a job which performs aggregation in the shuffle.
- Our tests now check that `shuffleBytesWritten == totalShuffleBytesRead`.
- FileSegment now throws IllegalArgumentException if it is constructed with a negative length or offset.

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#6397 from JoshRosen/external-sorter-bypass-cleanup and squashes the following commits:

bf3f3f6 [Josh Rosen] Merge remote-tracking branch 'origin/master' into external-sorter-bypass-cleanup
8b216c4 [Josh Rosen] Guard against negative offsets and lengths in FileSegment
03f35a4 [Josh Rosen] Minor fix to cleanup logic.
b5cc35b [Josh Rosen] Move shuffle metrics tests to ShuffleSuite.
8b8fb9e [Josh Rosen] Add more tests + defensive programming to DiskBlockObjectWriter.
16564eb [Josh Rosen] Guard against calling fileSegment() before commitAndClose() has been called.
96811b4 [Josh Rosen] Remove confusing taskMetrics.shuffleWriteMetrics() optional call
8522b6a [Josh Rosen] Do not perform a map-side sort unless we're also doing map-side aggregation
08e40f3 [Josh Rosen] Remove excessively clever (and wrong) implementation of newBuffer()
d7f9938 [Josh Rosen] Add missing overrides; fix compilation
71d76ff [Josh Rosen] Update Javadoc
bf0d98f [Josh Rosen] Add comment to clarify confusing factory code
5197f73 [Josh Rosen] Add missing private[this]
30ef2c8 [Josh Rosen] Convert BypassMergeSortShuffleWriter to Java
bc1a820 [Josh Rosen] Fix bug when aggregator is used but map-side combine is disabled
0d3dcc0 [Josh Rosen] Remove unnecessary overloaded methods
25b964f [Josh Rosen] Rename SortShuffleSorter to SortShuffleFileWriter
0d9848c [Josh Rosen] Make it more clear that curWriteMetrics is now only used for spill metrics
7af7aea [Josh Rosen] Combine spill() and spillToMergeableFile()
6320112 [Josh Rosen] Add missing negation in deletion success check.
d267e0d [Josh Rosen] Fix style issue
7f15f7b [Josh Rosen] Back out extra cleanup-handling code, since this is already covered in stop()
25aa3bd [Josh Rosen] Make sure to delete outputFile after errors.
931ca68 [Josh Rosen] Refactor tests.
6a35716 [Josh Rosen] Refactor logic for deciding when to bypass
4b03539 [Josh Rosen] Move conf prior to first use
1265b25 [Josh Rosen] Fix some style errors and comments.
02355ef [Josh Rosen] More simplification
d4cb536 [Josh Rosen] Delete more unused code
bb96678 [Josh Rosen] Add missing interface file
b6cc1eb [Josh Rosen] Realize that bypass never buffers; proceed to delete tons of code
6185ee2 [Josh Rosen] WIP towards moving bypass code into own file.
8d0678c [Josh Rosen] Move diskBytesSpilled getter next to variable
19bccd6 [Josh Rosen] Remove duplicated buffer creation code.
18959bb [Josh Rosen] Move comparator methods closer together.
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015