Fix bug when aggregator is used but map-side combine is disabled
JoshRosen committed May 26, 2015
1 parent 0d3dcc0 commit bc1a820
Showing 1 changed file with 3 additions and 2 deletions.
core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala

```diff
@@ -55,7 +55,7 @@ private[spark] class SortShuffleWriter[K, V, C](
       new ExternalSorter[K, V, C](
         dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
     } else if (SortShuffleWriter.shouldBypassMergeSort(
-        SparkEnv.get.conf, dep.partitioner.numPartitions, dep.aggregator, dep.keyOrdering)) {
+        SparkEnv.get.conf, dep.partitioner.numPartitions, aggregator = None, dep.keyOrdering)) {
       // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
       // need local aggregation and sorting, write numPartitions files directly and just concatenate
       // them at the end. This avoids doing serialization and deserialization twice to merge
@@ -67,7 +67,8 @@ private[spark] class SortShuffleWriter[K, V, C](
       // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
       // care whether the keys get sorted in each partition; that will be done on the reduce side
       // if the operation being run is sortByKey.
-      new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer)
+      new ExternalSorter[K, V, V](
+        aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
     }
     sorter.insertAll(records)
```
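For context, the predicate the patched call invokes lives in SortShuffleWriter's companion object. The sketch below is reconstructed from the call site and the comment in the diff rather than copied from the source; the body and the 200-partition default for spark.shuffle.sort.bypassMergeThreshold match my reading of the Spark 1.4-era code, but treat it as an approximation.

```scala
package org.apache.spark.shuffle.sort

import org.apache.spark.{Aggregator, SparkConf}

private[spark] object SortShuffleWriter {
  // Bypassing merge-sort is only safe when the map side neither aggregates
  // nor sorts: each map task can then write one file per reduce partition
  // and concatenate them, exactly as the comment in the diff describes.
  def shouldBypassMergeSort(
      conf: SparkConf,
      numPartitions: Int,
      aggregator: Option[Aggregator[_, _, _]],
      keyOrdering: Option[Ordering[_]]): Boolean = {
    val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    numPartitions <= bypassMergeThreshold && aggregator.isEmpty && keyOrdering.isEmpty
  }
}
```

Read against that predicate, the one-argument change makes sense: this branch is only reached when dep.mapSideCombine is false, and the [K, V, V] sorter constructed in the final branch is built with aggregator = None anyway, so the bypass check should not let an aggregator that will never run on the map side steer (or contradict) that decision.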

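To make the affected scenario concrete, here is a hypothetical job that exercises this combination: combineByKey defines an Aggregator on the resulting ShuffleDependency, while mapSideCombine = false tells Spark not to apply it on the map side, which is precisely the case named in the commit title (groupByKey sets up its shuffle the same way). All names below are illustrative, not taken from the commit.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object MapSideCombineDisabledDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("map-side-combine-disabled").setMaster("local[2]"))
    val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

    // An aggregator is defined (the three functions below), but
    // mapSideCombine = false means map tasks must not apply it. With only
    // 4 output partitions (below spark.shuffle.sort.bypassMergeThreshold)
    // and no key ordering, the fixed code can take the bypass path.
    val combined = pairs.combineByKey[List[Int]](
      (v: Int) => List(v),                     // createCombiner
      (acc: List[Int], v: Int) => v :: acc,    // mergeValue
      (a: List[Int], b: List[Int]) => a ::: b, // mergeCombiners
      new HashPartitioner(4),
      mapSideCombine = false)

    combined.collect().foreach(println)
    sc.stop()
  }
}
```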