From bc1a8205fd0f1254966dcfbb337ef2ee05d83186 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 25 May 2015 20:31:54 -0700
Subject: [PATCH] Fix bug when aggregator is used but map-side combine is disabled

---
 .../org/apache/spark/shuffle/sort/SortShuffleWriter.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 6161ad8cbbbcc..db25316adbd26 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -55,7 +55,7 @@ private[spark] class SortShuffleWriter[K, V, C](
       new ExternalSorter[K, V, C](
         dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
     } else if (SortShuffleWriter.shouldBypassMergeSort(
-        SparkEnv.get.conf, dep.partitioner.numPartitions, dep.aggregator, dep.keyOrdering)) {
+        SparkEnv.get.conf, dep.partitioner.numPartitions, aggregator = None, dep.keyOrdering)) {
       // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
       // need local aggregation and sorting, write numPartitions files directly and just concatenate
       // them at the end. This avoids doing serialization and deserialization twice to merge
@@ -67,7 +67,8 @@ private[spark] class SortShuffleWriter[K, V, C](
       // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
       // care whether the keys get sorted in each partition; that will be done on the reduce side
       // if the operation being run is sortByKey.
-      new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer)
+      new ExternalSorter[K, V, V](
+        aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
     }
     sorter.insertAll(records)
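
The condition fixed above matters for any shuffle whose dependency defines an aggregator while leaving map-side combine off: this branch is only reached when dep.mapSideCombine is false, so no map-side aggregation will ever run, and passing dep.aggregator into shouldBypassMergeSort wrongly disqualified the bypass path. One real operation that produces such a dependency is groupByKey, which defines an aggregator on its ShuffleDependency but sets mapSideCombine = false. The sketch below is illustrative rather than part of the patch; the object name, master URL, and sample data are hypothetical, and only the groupByKey behavior is taken from Spark itself:

    // Illustrative sketch, not part of the patch: groupByKey builds a
    // ShuffleDependency with dep.aggregator defined but mapSideCombine ==
    // false, the exact combination this commit fixes. Object name, master
    // URL, and data are hypothetical.
    import org.apache.spark.{SparkConf, SparkContext}

    object AggregatorWithoutMapSideCombine {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("repro-sketch")
        val sc = new SparkContext(conf)
        try {
          // Two reduce partitions is well under the default
          // spark.shuffle.sort.bypassMergeThreshold of 200, so the bypass
          // path should apply once the writer ignores the (reduce-side only)
          // aggregator, as the patched condition does.
          val grouped = sc.parallelize(Seq(1 -> "a", 1 -> "b", 2 -> "c"), 2)
            .groupByKey(2)
          grouped.collect().foreach(println)
        } finally {
          sc.stop()
        }
      }
    }

With the patched condition, a job like this can take the concatenate-files fast path; before the fix it fell through to the full ExternalSorter even though that sorter was constructed with aggregator = None anyway.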