Skip to content

Commit

Permalink
Add comment explaining use of createTempShuffleBlock().
Browse files: browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Oct 22, 2014
1 parent 2c687b9 commit c8dd8f2
Showing 1 changed file with 6 additions and 0 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -264,6 +264,9 @@ private[spark] class ExternalSorter[K, V, C](
private def spillToMergeableFile(collection: SizeTrackingPairCollection[(Int, K), C]): Unit = {
assert(!bypassMergeSort)

// Because these files may be read during shuffle, they must be compressed using
// spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
// createTempShuffleBlock here; see SPARK-3426 for more context.
val (blockId, file) = diskBlockManager.createTempShuffleBlock()
curWriteMetrics = new ShuffleWriteMetrics()
var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
Expand Down Expand Up @@ -343,6 +346,9 @@ private[spark] class ExternalSorter[K, V, C](
if (partitionWriters == null) {
curWriteMetrics = new ShuffleWriteMetrics()
partitionWriters = Array.fill(numPartitions) {
// Because these files may be read during shuffle, they must be compressed using
// spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
// createTempShuffleBlock here; see SPARK-3426 for more context.
val (blockId, file) = diskBlockManager.createTempShuffleBlock()
blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics).open()
}
Expand Down

0 comments on commit c8dd8f2

Please sign in to comment.