From 08e40f34119d974ba1fd026316a837ba8b46ed15 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 27 May 2015 09:22:26 -0700 Subject: [PATCH] Remove excessively clever (and wrong) implementation of newBuffer() --- .../spark/util/collection/ExternalSorter.scala | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index de06afb7be44e..ef2dbb7ff0ae0 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -130,18 +130,16 @@ private[spark] class ExternalSorter[K, V, C]( // grow internal data structures by growing + copying every time the number of objects doubles. private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 10000) - // The idea here is to reduce minimize the visibility of variables like `kvChunkSize` and - // `useSerializedPairBuffer` without recomputing them on each call to `newBuffer()`. - private def newBuffer: () => WritablePartitionedPairCollection[K, C] with SizeTracker = { - val useSerializedPairBuffer = - ordering.isEmpty && - conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) && - ser.supportsRelocationOfSerializedObjects + private val useSerializedPairBuffer = + ordering.isEmpty && + conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) && + ser.supportsRelocationOfSerializedObjects + private val kvChunkSize = conf.getInt("spark.shuffle.sort.kvChunkSize", 1 << 22) // 4 MB + private def newBuffer(): WritablePartitionedPairCollection[K, C] with SizeTracker = { if (useSerializedPairBuffer) { - val kvChunkSize = conf.getInt("spark.shuffle.sort.kvChunkSize", 1 << 22) // 4 MB - () => new PartitionedSerializedPairBuffer(metaInitialRecords = 256, kvChunkSize, serInstance) + new PartitionedSerializedPairBuffer(metaInitialRecords = 256, kvChunkSize, serInstance) } else { - () => new PartitionedPairBuffer[K, C] + new PartitionedPairBuffer[K, C] } } // Data structures to store in-memory objects before we spill. Depending on whether we have an