From 34e526e4cc7f6b45ca302a187591b654ba0f2d8d Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 11 Jun 2015 18:02:08 -0700 Subject: [PATCH 1/4] Enable Tungsten shuffle for non-agg shuffles w/ key orderings --- .../shuffle/unsafe/UnsafeShuffleManager.scala | 3 --- .../unsafe/UnsafeShuffleManagerSuite.scala | 19 ++++++++++--------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala index f2bfef376d3ca..df7bbd64247dd 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala @@ -56,9 +56,6 @@ private[spark] object UnsafeShuffleManager extends Logging { } else if (dependency.aggregator.isDefined) { log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because an aggregator is defined") false - } else if (dependency.keyOrdering.isDefined) { - log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because a key ordering is defined") - false } else if (dependency.partitioner.numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS) { log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because it has more than " + s"$MAX_SHUFFLE_OUTPUT_PARTITIONS partitions") diff --git a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala index a73e94e05575e..6727934d8c7ca 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala @@ -76,6 +76,15 @@ class UnsafeShuffleManagerSuite extends SparkFunSuite with Matchers { mapSideCombine = false ))) + // Shuffles with key orderings are supported as long as no aggregator is specified + assert(canUseUnsafeShuffle(shuffleDep( + partitioner = new HashPartitioner(2), + serializer = kryo, + keyOrdering = Some(mock(classOf[Ordering[Any]])), + aggregator = None, + mapSideCombine = false + ))) + } test("unsupported shuffle dependencies") { @@ -100,14 +109,7 @@ class UnsafeShuffleManagerSuite extends SparkFunSuite with Matchers { mapSideCombine = false ))) - // We do not support shuffles that perform any kind of aggregation or sorting of keys - assert(!canUseUnsafeShuffle(shuffleDep( - partitioner = new HashPartitioner(2), - serializer = kryo, - keyOrdering = Some(mock(classOf[Ordering[Any]])), - aggregator = None, - mapSideCombine = false - ))) + // We do not support shuffles that perform aggregation assert(!canUseUnsafeShuffle(shuffleDep( partitioner = new HashPartitioner(2), serializer = kryo, @@ -115,7 +117,6 @@ class UnsafeShuffleManagerSuite extends SparkFunSuite with Matchers { aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])), mapSideCombine = false ))) - // We do not support shuffles that perform any kind of aggregation or sorting of keys assert(!canUseUnsafeShuffle(shuffleDep( partitioner = new HashPartitioner(2), serializer = kryo, From 269089a9fea215be11ade867d5553e80481db140 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 11 Jun 2015 18:09:16 -0700 Subject: [PATCH 2/4] Avoid unnecessary copy in SQL Exchange --- .../scala/org/apache/spark/sql/execution/Exchange.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 6fa7ccc6cc89b..d8a40fe5a7d5c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -81,11 +81,7 @@ case class Exchange( shuffleManager.isInstanceOf[UnsafeShuffleManager] val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200) val serializeMapOutputs = conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) - if (newOrdering.nonEmpty) { - // If a new ordering is required, then records will be sorted with Spark's `ExternalSorter`, - // which requires a defensive copy. - true - } else if (sortBasedShuffleOn) { + if (sortBasedShuffleOn) { val bypassIsSupported = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] if (bypassIsSupported && partitioner.numPartitions <= bypassMergeThreshold) { // If we're using the original SortShuffleManager and the number of output partitions is From 07bb2c95f3b7ddd931574726d577d10db9df972e Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 11 Jun 2015 18:13:12 -0700 Subject: [PATCH 3/4] Update comment to clarify circumstances under which shuffle operates on serialized records --- .../main/scala/org/apache/spark/sql/execution/Exchange.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index d8a40fe5a7d5c..4c90eb24b9abf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -92,7 +92,7 @@ case class Exchange( } else if (serializeMapOutputs && serializer.supportsRelocationOfSerializedObjects) { // SPARK-4550 extended sort-based shuffle to serialize individual records prior to sorting // them. This optimization is guarded by a feature-flag and is only applied in cases where - // shuffle dependency does not specify an ordering and the record serializer has certain + // shuffle dependency does not specify an aggregator and the record serializer has certain // properties. If this optimization is enabled, we can safely avoid the copy. // // This optimization also applies to UnsafeShuffleManager (added in SPARK-7081). From 7a1412982bcd4ec11733376888ab8370173e7977 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sat, 13 Jun 2015 10:52:46 -0700 Subject: [PATCH 4/4] Revise comments; add handler to guard against future ShuffleManager implementations --- .../org/apache/spark/sql/execution/Exchange.scala | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 4c90eb24b9abf..29227dd72ca9b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -21,6 +21,7 @@ import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEn import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.{RDD, ShuffledRDD} import org.apache.spark.serializer.Serializer +import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.shuffle.unsafe.UnsafeShuffleManager import org.apache.spark.sql.catalyst.errors.attachTree @@ -92,8 +93,11 @@ case class Exchange( } else if (serializeMapOutputs && serializer.supportsRelocationOfSerializedObjects) { // SPARK-4550 extended sort-based shuffle to serialize individual records prior to sorting // them. This optimization is guarded by a feature-flag and is only applied in cases where - // shuffle dependency does not specify an aggregator and the record serializer has certain - // properties. If this optimization is enabled, we can safely avoid the copy. + // shuffle dependency does not specify an aggregator or ordering and the record serializer + // has certain properties. If this optimization is enabled, we can safely avoid the copy. + // + // Exchange never configures its ShuffledRDDs with aggregators or key orderings, so we only + // need to check whether the optimization is enabled and supported by our serializer. // // This optimization also applies to UnsafeShuffleManager (added in SPARK-7081). false @@ -104,9 +108,12 @@ case class Exchange( // both cases, we must copy. true } - } else { + } else if (shuffleManager.isInstanceOf[HashShuffleManager]) { // We're using hash-based shuffle, so we don't need to copy. false + } else { + // Catch-all case to safely handle any future ShuffleManager implementations. + true } }