From c8dd8f28be4382d02594607b50d694448027f8f9 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 22 Oct 2014 10:31:10 -0700
Subject: [PATCH] Add comment explaining use of createTempShuffleBlock().

---
 .../org/apache/spark/util/collection/ExternalSorter.scala | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 6621bf8772abd..dae88b6365a8a 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -264,6 +264,9 @@ private[spark] class ExternalSorter[K, V, C](
   private def spillToMergeableFile(collection: SizeTrackingPairCollection[(Int, K), C]): Unit = {
     assert(!bypassMergeSort)
 
+    // Because these files may be read during shuffle, they must be compressed using
+    // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
+    // createTempShuffleBlock here; see SPARK-3426 for more context.
     val (blockId, file) = diskBlockManager.createTempShuffleBlock()
     curWriteMetrics = new ShuffleWriteMetrics()
     var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
@@ -343,6 +346,9 @@ private[spark] class ExternalSorter[K, V, C](
     if (partitionWriters == null) {
       curWriteMetrics = new ShuffleWriteMetrics()
       partitionWriters = Array.fill(numPartitions) {
+        // Because these files may be read during shuffle, they must be compressed using
+        // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
+        // createTempShuffleBlock here; see SPARK-3426 for more context.
        val (blockId, file) = diskBlockManager.createTempShuffleBlock()
         blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics).open()
       }
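
For readers without the SPARK-3426 context: the block type matters because the
disk store decides which compression setting applies from the type of the block
being written, so a spill file that shuffle fetches will later read must carry a
shuffle block type to pick up spark.shuffle.compress. The sketch below is a
minimal, self-contained Scala model of that dispatch; the names here
(BlockId, CompressionSettings, shouldCompress) are illustrative stand-ins, not
Spark's actual API.

    // Simplified, hypothetical model of selecting a compression setting by
    // block type, in the spirit of SPARK-3426. Not Spark's real classes.
    sealed trait BlockId
    final case class TempShuffleBlockId(id: Long) extends BlockId // read by shuffle fetches
    final case class TempLocalBlockId(id: Long) extends BlockId   // node-local spill only

    final class CompressionSettings(compressShuffle: Boolean, compressSpill: Boolean) {
      // Shuffle-visible blocks follow spark.shuffle.compress; purely local
      // spill blocks follow spark.shuffle.spill.compress.
      def shouldCompress(blockId: BlockId): Boolean = blockId match {
        case _: TempShuffleBlockId => compressShuffle
        case _: TempLocalBlockId   => compressSpill
      }
    }

    object CompressionSketch {
      def main(args: Array[String]): Unit = {
        // With the two settings configured differently, the block type alone
        // determines which codec decision a reader and writer agree on.
        val settings = new CompressionSettings(compressShuffle = true, compressSpill = false)
        println(settings.shouldCompress(TempShuffleBlockId(0L))) // true
        println(settings.shouldCompress(TempLocalBlockId(0L)))   // false
      }
    }

Under this model, writing a shuffle-readable spill as a TempLocalBlockId would
let the writer and the remote reader disagree about compression, which is the
failure mode the added comments guard against.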