-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-4479][SQL] Avoids unnecessary defensive copies when sort based shuffle is on #3422
Conversation
Test build #23766 has started for PR 3422 at commit
|
Test build #23766 has finished for PR 3422 at commit
|
Test FAILed. |
@@ -350,7 +359,7 @@ private[spark] class ExternalSorter[K, V, C]( | |||
} | |||
} | |||
|
|||
val it = collection.iterator // No need to sort stuff, just write each element out | |||
val it = iterator // No need to sort stuff, just write each element out |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe just use iterator
in the code below instead of giving it a different name here
Change to ExternalSorter looks okay to me, though it might now create files in cases where none might've been needed. But it seems fine as a way to fix this. The main place where it could be less efficient is on the reduce side, where you'd call iterator() instead of writePartitionedFile(), but in that case you always have aggregator or ordering set in the current code, so bypassMergeSort would be false. |
@@ -132,7 +132,7 @@ private[spark] class ExternalSorter[K, V, C]( | |||
// files open at a time and thus more memory allocated to buffers. | |||
private val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200) | |||
private val bypassMergeSort = | |||
(numPartitions <= bypassMergeThreshold && aggregator.isEmpty && ordering.isEmpty) | |||
numPartitions <= bypassMergeThreshold && aggregator.isEmpty && ordering.isEmpty |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you mind reverting some of these style changes (e.g. heap empty below). I'd like to be as surgical as possible with this patch and keep the surface area of the changes minimal.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.
(Maybe I should disable some more IDEA inspections, just couldn't help to make these changes as they are highlighted there and shouting at me...)
Test build #23789 has started for PR 3422 at commit
|
Test build #23789 has finished for PR 3422 at commit
|
Test PASSed. |
LGTM from the sort side. |
Thanks Cheng! I'm going to merge this into branch 1.2 and master. |
… shuffle is on This PR is a workaround for SPARK-4479. Two changes are introduced: when merge sort is bypassed in `ExternalSorter`, 1. also bypass RDD elements buffering as buffering is the reason that `MutableRow` backed row objects must be copied, and 2. avoids defensive copies in `Exchange` operator <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/3422) <!-- Reviewable:end --> Author: Cheng Lian <lian@databricks.com> Closes #3422 from liancheng/avoids-defensive-copies and squashes the following commits: 591f2e9 [Cheng Lian] Passes all shuffle suites 0c3c91e [Cheng Lian] Fixes shuffle write metrics when merge sort is bypassed ed5df3c [Cheng Lian] Fixes styling changes f75089b [Cheng Lian] Avoids unnecessary defensive copies when sort based shuffle is on (cherry picked from commit a6d7b61) Signed-off-by: Michael Armbrust <michael@databricks.com>
@@ -748,6 +759,12 @@ private[spark] class ExternalSorter[K, V, C]( | |||
|
|||
context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled | |||
context.taskMetrics.diskBytesSpilled += diskBytesSpilled | |||
context.taskMetrics.shuffleWriteMetrics.filter(_ => bypassMergeSort).foreach { m => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was reading through ExternalSorter
to try to understand how shuffle write time metrics are calculated and came across this line. This style is confusing to a casual reader: it looks like the logic here is "if shuffle write metrics are defined and merge sort is bypassed, then run this block", but this is slightly obfuscated by the fact that we're filtering an option with a filter function that doesn't depend on that option's value.
For next time, I think we should just use a simple if
statement instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, sorry for that. "Learned" this from Michael, won't do this again, lol
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lies 😜
I don't think I've ever written a .filter(_ => ...
though I will admit guilt for merging all but one instance in the codebase...
@@ -205,6 +205,13 @@ private[spark] class ExternalSorter[K, V, C]( | |||
map.changeValue((getPartition(kv._1), kv._1), update) | |||
maybeSpillCollection(usingMap = true) | |||
} | |||
} else if (bypassMergeSort) { | |||
// SPARK-4479: Also bypass buffering if merge sort is bypassed to avoid defensive copies |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Skipping this buffering seems to make it so that much of the rest of the bypassMergeSort
-handling code is no longer needed. For example, if we don't buffer then we won't need to spill, so we can remove the code that deals with merging spills in the bypassMergeSort
case. Based on this, I've opened #6397 to remove all of this now-unused code and to move the handling of the bypassMergeSort
path into its own file. It would be great if this PR's reviewers could look at that PR to double-check my reasoning.
… component Spark's `ExternalSorter` writes shuffle output files during sort-based shuffle. Sort-shuffle contains a configuration, `spark.shuffle.sort.bypassMergeThreshold`, which causes ExternalSorter to skip sorting and merging and simply write separate files per partition, which are then concatenated together to form the final map output file. The code paths used during this bypass are almost completely separate from ExternalSorter's other code paths, so refactoring them into a separate file can significantly simplify the code. In addition to re-arranging code, this patch deletes a bunch of dead code. The main entry point into ExternalSorter is `insertAll()` and in SPARK-4479 / #3422 this method was modified to completely bypass in-memory buffering of records when `bypassMergeSort` takes effect. As a result, some of the spilling and merging code paths will no longer be called when `bypassMergeSort` is used, so we should be able to safely remove that code. There's an open JIRA ([SPARK-6026](https://issues.apache.org/jira/browse/SPARK-6026)) for removing the `bypassMergeThreshold` parameter and code paths; I have not done that here, but the changes in this patch will make removing that parameter significantly easier if we ever decide to do that. This patch also makes several improvements to shuffle-related tests and adds more defensive checks to certain shuffle classes: - DiskBlockObjectWriter now throws an exception if `fileSegment()` is called before `commitAndClose()` has been called. - DiskBlockObjectWriter's close methods are now idempotent, so calling any of the close methods twice in a row will no longer result in incorrect shuffle write metrics changes. Calling `revertPartialWritesAndClose()` on a closed DiskBlockObjectWriter now has no effect (before, it might mess up the metrics). - The end-to-end shuffle record count metrics tests have been moved from InputOutputMetricsSuite to ShuffleSuite. This means that these tests will now be run against all shuffle implementations rather than just the default shuffle configuration. - The end-to-end metrics tests now include a test of a job which performs aggregation in the shuffle. - Our tests now check that `shuffleBytesWritten == totalShuffleBytesRead`. - FileSegment now throws IllegalArgumentException if it is constructed with a negative length or offset. Author: Josh Rosen <joshrosen@databricks.com> Closes #6397 from JoshRosen/external-sorter-bypass-cleanup and squashes the following commits: bf3f3f6 [Josh Rosen] Merge remote-tracking branch 'origin/master' into external-sorter-bypass-cleanup 8b216c4 [Josh Rosen] Guard against negative offsets and lengths in FileSegment 03f35a4 [Josh Rosen] Minor fix to cleanup logic. b5cc35b [Josh Rosen] Move shuffle metrics tests to ShuffleSuite. 8b8fb9e [Josh Rosen] Add more tests + defensive programming to DiskBlockObjectWriter. 16564eb [Josh Rosen] Guard against calling fileSegment() before commitAndClose() has been called. 96811b4 [Josh Rosen] Remove confusing taskMetrics.shuffleWriteMetrics() optional call 8522b6a [Josh Rosen] Do not perform a map-side sort unless we're also doing map-side aggregation 08e40f3 [Josh Rosen] Remove excessively clever (and wrong) implementation of newBuffer() d7f9938 [Josh Rosen] Add missing overrides; fix compilation 71d76ff [Josh Rosen] Update Javadoc bf0d98f [Josh Rosen] Add comment to clarify confusing factory code 5197f73 [Josh Rosen] Add missing private[this] 30ef2c8 [Josh Rosen] Convert BypassMergeSortShuffleWriter to Java bc1a820 [Josh Rosen] Fix bug when aggregator is used but map-side combine is disabled 0d3dcc0 [Josh Rosen] Remove unnecessary overloaded methods 25b964f [Josh Rosen] Rename SortShuffleSorter to SortShuffleFileWriter 0d9848c [Josh Rosen] Make it more clear that curWriteMetrics is now only used for spill metrics 7af7aea [Josh Rosen] Combine spill() and spillToMergeableFile() 6320112 [Josh Rosen] Add missing negation in deletion success check. d267e0d [Josh Rosen] Fix style issue 7f15f7b [Josh Rosen] Back out extra cleanup-handling code, since this is already covered in stop() 25aa3bd [Josh Rosen] Make sure to delete outputFile after errors. 931ca68 [Josh Rosen] Refactor tests. 6a35716 [Josh Rosen] Refactor logic for deciding when to bypass 4b03539 [Josh Rosen] Move conf prior to first use 1265b25 [Josh Rosen] Fix some style errors and comments. 02355ef [Josh Rosen] More simplification d4cb536 [Josh Rosen] Delete more unused code bb96678 [Josh Rosen] Add missing interface file b6cc1eb [Josh Rosen] Realize that bypass never buffers; proceed to delete tons of code 6185ee2 [Josh Rosen] WIP towards moving bypass code into own file. 8d0678c [Josh Rosen] Move diskBytesSpilled getter next to variable 19bccd6 [Josh Rosen] Remove duplicated buffer creation code. 18959bb [Josh Rosen] Move comparator methods closer together.
… component Spark's `ExternalSorter` writes shuffle output files during sort-based shuffle. Sort-shuffle contains a configuration, `spark.shuffle.sort.bypassMergeThreshold`, which causes ExternalSorter to skip sorting and merging and simply write separate files per partition, which are then concatenated together to form the final map output file. The code paths used during this bypass are almost completely separate from ExternalSorter's other code paths, so refactoring them into a separate file can significantly simplify the code. In addition to re-arranging code, this patch deletes a bunch of dead code. The main entry point into ExternalSorter is `insertAll()` and in SPARK-4479 / apache#3422 this method was modified to completely bypass in-memory buffering of records when `bypassMergeSort` takes effect. As a result, some of the spilling and merging code paths will no longer be called when `bypassMergeSort` is used, so we should be able to safely remove that code. There's an open JIRA ([SPARK-6026](https://issues.apache.org/jira/browse/SPARK-6026)) for removing the `bypassMergeThreshold` parameter and code paths; I have not done that here, but the changes in this patch will make removing that parameter significantly easier if we ever decide to do that. This patch also makes several improvements to shuffle-related tests and adds more defensive checks to certain shuffle classes: - DiskBlockObjectWriter now throws an exception if `fileSegment()` is called before `commitAndClose()` has been called. - DiskBlockObjectWriter's close methods are now idempotent, so calling any of the close methods twice in a row will no longer result in incorrect shuffle write metrics changes. Calling `revertPartialWritesAndClose()` on a closed DiskBlockObjectWriter now has no effect (before, it might mess up the metrics). - The end-to-end shuffle record count metrics tests have been moved from InputOutputMetricsSuite to ShuffleSuite. This means that these tests will now be run against all shuffle implementations rather than just the default shuffle configuration. - The end-to-end metrics tests now include a test of a job which performs aggregation in the shuffle. - Our tests now check that `shuffleBytesWritten == totalShuffleBytesRead`. - FileSegment now throws IllegalArgumentException if it is constructed with a negative length or offset. Author: Josh Rosen <joshrosen@databricks.com> Closes apache#6397 from JoshRosen/external-sorter-bypass-cleanup and squashes the following commits: bf3f3f6 [Josh Rosen] Merge remote-tracking branch 'origin/master' into external-sorter-bypass-cleanup 8b216c4 [Josh Rosen] Guard against negative offsets and lengths in FileSegment 03f35a4 [Josh Rosen] Minor fix to cleanup logic. b5cc35b [Josh Rosen] Move shuffle metrics tests to ShuffleSuite. 8b8fb9e [Josh Rosen] Add more tests + defensive programming to DiskBlockObjectWriter. 16564eb [Josh Rosen] Guard against calling fileSegment() before commitAndClose() has been called. 96811b4 [Josh Rosen] Remove confusing taskMetrics.shuffleWriteMetrics() optional call 8522b6a [Josh Rosen] Do not perform a map-side sort unless we're also doing map-side aggregation 08e40f3 [Josh Rosen] Remove excessively clever (and wrong) implementation of newBuffer() d7f9938 [Josh Rosen] Add missing overrides; fix compilation 71d76ff [Josh Rosen] Update Javadoc bf0d98f [Josh Rosen] Add comment to clarify confusing factory code 5197f73 [Josh Rosen] Add missing private[this] 30ef2c8 [Josh Rosen] Convert BypassMergeSortShuffleWriter to Java bc1a820 [Josh Rosen] Fix bug when aggregator is used but map-side combine is disabled 0d3dcc0 [Josh Rosen] Remove unnecessary overloaded methods 25b964f [Josh Rosen] Rename SortShuffleSorter to SortShuffleFileWriter 0d9848c [Josh Rosen] Make it more clear that curWriteMetrics is now only used for spill metrics 7af7aea [Josh Rosen] Combine spill() and spillToMergeableFile() 6320112 [Josh Rosen] Add missing negation in deletion success check. d267e0d [Josh Rosen] Fix style issue 7f15f7b [Josh Rosen] Back out extra cleanup-handling code, since this is already covered in stop() 25aa3bd [Josh Rosen] Make sure to delete outputFile after errors. 931ca68 [Josh Rosen] Refactor tests. 6a35716 [Josh Rosen] Refactor logic for deciding when to bypass 4b03539 [Josh Rosen] Move conf prior to first use 1265b25 [Josh Rosen] Fix some style errors and comments. 02355ef [Josh Rosen] More simplification d4cb536 [Josh Rosen] Delete more unused code bb96678 [Josh Rosen] Add missing interface file b6cc1eb [Josh Rosen] Realize that bypass never buffers; proceed to delete tons of code 6185ee2 [Josh Rosen] WIP towards moving bypass code into own file. 8d0678c [Josh Rosen] Move diskBytesSpilled getter next to variable 19bccd6 [Josh Rosen] Remove duplicated buffer creation code. 18959bb [Josh Rosen] Move comparator methods closer together.
… component Spark's `ExternalSorter` writes shuffle output files during sort-based shuffle. Sort-shuffle contains a configuration, `spark.shuffle.sort.bypassMergeThreshold`, which causes ExternalSorter to skip sorting and merging and simply write separate files per partition, which are then concatenated together to form the final map output file. The code paths used during this bypass are almost completely separate from ExternalSorter's other code paths, so refactoring them into a separate file can significantly simplify the code. In addition to re-arranging code, this patch deletes a bunch of dead code. The main entry point into ExternalSorter is `insertAll()` and in SPARK-4479 / apache#3422 this method was modified to completely bypass in-memory buffering of records when `bypassMergeSort` takes effect. As a result, some of the spilling and merging code paths will no longer be called when `bypassMergeSort` is used, so we should be able to safely remove that code. There's an open JIRA ([SPARK-6026](https://issues.apache.org/jira/browse/SPARK-6026)) for removing the `bypassMergeThreshold` parameter and code paths; I have not done that here, but the changes in this patch will make removing that parameter significantly easier if we ever decide to do that. This patch also makes several improvements to shuffle-related tests and adds more defensive checks to certain shuffle classes: - DiskBlockObjectWriter now throws an exception if `fileSegment()` is called before `commitAndClose()` has been called. - DiskBlockObjectWriter's close methods are now idempotent, so calling any of the close methods twice in a row will no longer result in incorrect shuffle write metrics changes. Calling `revertPartialWritesAndClose()` on a closed DiskBlockObjectWriter now has no effect (before, it might mess up the metrics). - The end-to-end shuffle record count metrics tests have been moved from InputOutputMetricsSuite to ShuffleSuite. This means that these tests will now be run against all shuffle implementations rather than just the default shuffle configuration. - The end-to-end metrics tests now include a test of a job which performs aggregation in the shuffle. - Our tests now check that `shuffleBytesWritten == totalShuffleBytesRead`. - FileSegment now throws IllegalArgumentException if it is constructed with a negative length or offset. Author: Josh Rosen <joshrosen@databricks.com> Closes apache#6397 from JoshRosen/external-sorter-bypass-cleanup and squashes the following commits: bf3f3f6 [Josh Rosen] Merge remote-tracking branch 'origin/master' into external-sorter-bypass-cleanup 8b216c4 [Josh Rosen] Guard against negative offsets and lengths in FileSegment 03f35a4 [Josh Rosen] Minor fix to cleanup logic. b5cc35b [Josh Rosen] Move shuffle metrics tests to ShuffleSuite. 8b8fb9e [Josh Rosen] Add more tests + defensive programming to DiskBlockObjectWriter. 16564eb [Josh Rosen] Guard against calling fileSegment() before commitAndClose() has been called. 96811b4 [Josh Rosen] Remove confusing taskMetrics.shuffleWriteMetrics() optional call 8522b6a [Josh Rosen] Do not perform a map-side sort unless we're also doing map-side aggregation 08e40f3 [Josh Rosen] Remove excessively clever (and wrong) implementation of newBuffer() d7f9938 [Josh Rosen] Add missing overrides; fix compilation 71d76ff [Josh Rosen] Update Javadoc bf0d98f [Josh Rosen] Add comment to clarify confusing factory code 5197f73 [Josh Rosen] Add missing private[this] 30ef2c8 [Josh Rosen] Convert BypassMergeSortShuffleWriter to Java bc1a820 [Josh Rosen] Fix bug when aggregator is used but map-side combine is disabled 0d3dcc0 [Josh Rosen] Remove unnecessary overloaded methods 25b964f [Josh Rosen] Rename SortShuffleSorter to SortShuffleFileWriter 0d9848c [Josh Rosen] Make it more clear that curWriteMetrics is now only used for spill metrics 7af7aea [Josh Rosen] Combine spill() and spillToMergeableFile() 6320112 [Josh Rosen] Add missing negation in deletion success check. d267e0d [Josh Rosen] Fix style issue 7f15f7b [Josh Rosen] Back out extra cleanup-handling code, since this is already covered in stop() 25aa3bd [Josh Rosen] Make sure to delete outputFile after errors. 931ca68 [Josh Rosen] Refactor tests. 6a35716 [Josh Rosen] Refactor logic for deciding when to bypass 4b03539 [Josh Rosen] Move conf prior to first use 1265b25 [Josh Rosen] Fix some style errors and comments. 02355ef [Josh Rosen] More simplification d4cb536 [Josh Rosen] Delete more unused code bb96678 [Josh Rosen] Add missing interface file b6cc1eb [Josh Rosen] Realize that bypass never buffers; proceed to delete tons of code 6185ee2 [Josh Rosen] WIP towards moving bypass code into own file. 8d0678c [Josh Rosen] Move diskBytesSpilled getter next to variable 19bccd6 [Josh Rosen] Remove duplicated buffer creation code. 18959bb [Josh Rosen] Move comparator methods closer together.
This PR is a workaround for SPARK-4479. Two changes are introduced: when merge sort is bypassed in
ExternalSorter
,MutableRow
backed row objects must be copied, andExchange
operator