Remove excessively clever (and wrong) implementation of newBuffer()
JoshRosen committed May 27, 2015
1 parent d7f9938 commit 08e40f3
Showing 1 changed file with 8 additions and 10 deletions.
@@ -130,18 +130,16 @@ private[spark] class ExternalSorter[K, V, C](
   // grow internal data structures by growing + copying every time the number of objects doubles.
   private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 10000)
 
-  // The idea here is to reduce minimize the visibility of variables like `kvChunkSize` and
-  // `useSerializedPairBuffer` without recomputing them on each call to `newBuffer()`.
-  private def newBuffer: () => WritablePartitionedPairCollection[K, C] with SizeTracker = {
-    val useSerializedPairBuffer =
-      ordering.isEmpty &&
-        conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) &&
-        ser.supportsRelocationOfSerializedObjects
+  private val useSerializedPairBuffer =
+    ordering.isEmpty &&
+      conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) &&
+      ser.supportsRelocationOfSerializedObjects
+  private val kvChunkSize = conf.getInt("spark.shuffle.sort.kvChunkSize", 1 << 22) // 4 MB
+  private def newBuffer(): WritablePartitionedPairCollection[K, C] with SizeTracker = {
     if (useSerializedPairBuffer) {
-      val kvChunkSize = conf.getInt("spark.shuffle.sort.kvChunkSize", 1 << 22) // 4 MB
-      () => new PartitionedSerializedPairBuffer(metaInitialRecords = 256, kvChunkSize, serInstance)
+      new PartitionedSerializedPairBuffer(metaInitialRecords = 256, kvChunkSize, serInstance)
     } else {
-      () => new PartitionedPairBuffer[K, C]
+      new PartitionedPairBuffer[K, C]
     }
   }
   // Data structures to store in-memory objects before we spill. Depending on whether we have an
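
For context on why the removed version was both "clever" and "wrong": `newBuffer` was a parameterless `def` that returned a `() => Buffer` closure, and a Scala `def` re-evaluates its entire body on every access. The config lookups that the deleted comment claimed to compute "without recomputing them on each call" were therefore recomputed on each call anyway. The sketch below is a minimal standalone illustration, not Spark code; `readConfig`, `cleverNewBuffer`, and `NewBufferPitfall` are hypothetical stand-ins.

// Minimal sketch of the pitfall: a `def` that returns a closure re-runs its
// whole body on every access, so any work "hoisted" into that body is redone
// per call. `readConfig` stands in for conf.getBoolean(...).
object NewBufferPitfall {
  private var configReads = 0
  private def readConfig(): Boolean = { configReads += 1; true }

  // The removed pattern: a parameterless def returning a closure. Each use
  // re-reads the config before building a fresh closure, defeating the
  // stated goal of computing the flag once.
  private def cleverNewBuffer: () => String = {
    val useSerialized = readConfig()
    () => if (useSerialized) "serialized buffer" else "deserialized buffer"
  }

  def main(args: Array[String]): Unit = {
    // `cleverNewBuffer()` parses as cleverNewBuffer.apply(): the def body
    // (including the config read) runs, then the fresh closure is invoked.
    cleverNewBuffer()
    cleverNewBuffer()
    println(s"def-returning-closure pattern: $configReads config reads") // 2

    // The committed fix, in miniature: read the config once into a val and
    // make newBuffer an ordinary method that consults it.
    configReads = 0
    val useSerialized = readConfig()
    def newBuffer(): String =
      if (useSerialized) "serialized buffer" else "deserialized buffer"
    newBuffer()
    newBuffer()
    println(s"val + plain method pattern: $configReads config reads") // 1
  }
}

This mirrors the committed change: `useSerializedPairBuffer` and `kvChunkSize` become class-level `private val`s, computed once when the `ExternalSorter` is constructed, and `newBuffer()` becomes an ordinary factory method that returns the buffer directly instead of a closure.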
