[SPARK-8319] [CORE] [SQL] Update logic related to key orderings in shuffle dependencies #6773

Closed · wants to merge 4 commits
@@ -56,9 +56,6 @@ private[spark] object UnsafeShuffleManager extends Logging {
     } else if (dependency.aggregator.isDefined) {
       log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because an aggregator is defined")
       false
-    } else if (dependency.keyOrdering.isDefined) {
-      log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because a key ordering is defined")
-      false
     } else if (dependency.partitioner.numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS) {
       log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because it has more than " +
         s"$MAX_SHUFFLE_OUTPUT_PARTITIONS partitions")
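
For readers skimming the hunk above, the eligibility rule after this change can be restated as a small, self-contained sketch. The object and parameter names below are invented for illustration and are not the actual Spark identifiers; the real check is UnsafeShuffleManager.canUseUnsafeShuffle.

object UnsafeShuffleEligibilitySketch {
  // Illustrative stand-in for MAX_SHUFFLE_OUTPUT_PARTITIONS; the real constant comes from
  // the packed record pointer format.
  val MaxShuffleOutputPartitions: Int = 1 << 24

  // A shuffle qualifies for the serialized (unsafe) path when its serializer supports
  // relocating serialized records, no aggregator is defined, and the partition count fits
  // the limit. After this patch, a key ordering on its own no longer disqualifies it.
  def canUseUnsafeShuffle(
      serializerSupportsRelocation: Boolean,
      aggregatorDefined: Boolean,
      keyOrderingDefined: Boolean,
      numPartitions: Int): Boolean = {
    // keyOrderingDefined is deliberately unused: orderings are applied on the reduce side.
    serializerSupportsRelocation &&
      !aggregatorDefined &&
      numPartitions <= MaxShuffleOutputPartitions
  }
}
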
@@ -76,6 +76,15 @@ class UnsafeShuffleManagerSuite extends SparkFunSuite with Matchers {
       mapSideCombine = false
     )))
 
+    // Shuffles with key orderings are supported as long as no aggregator is specified
+    assert(canUseUnsafeShuffle(shuffleDep(
+      partitioner = new HashPartitioner(2),
+      serializer = kryo,
+      keyOrdering = Some(mock(classOf[Ordering[Any]])),
+      aggregator = None,
+      mapSideCombine = false
+    )))
+
   }
 
   test("unsupported shuffle dependencies") {
@@ -100,22 +109,14 @@ class UnsafeShuffleManagerSuite extends SparkFunSuite with Matchers {
       mapSideCombine = false
     )))
 
-    // We do not support shuffles that perform any kind of aggregation or sorting of keys
-    assert(!canUseUnsafeShuffle(shuffleDep(
-      partitioner = new HashPartitioner(2),
-      serializer = kryo,
-      keyOrdering = Some(mock(classOf[Ordering[Any]])),
-      aggregator = None,
-      mapSideCombine = false
-    )))
+    // We do not support shuffles that perform aggregation
     assert(!canUseUnsafeShuffle(shuffleDep(
       partitioner = new HashPartitioner(2),
       serializer = kryo,
       keyOrdering = None,
       aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])),
       mapSideCombine = false
     )))
-    // We do not support shuffles that perform any kind of aggregation or sorting of keys
     assert(!canUseUnsafeShuffle(shuffleDep(
       partitioner = new HashPartitioner(2),
       serializer = kryo,
@@ -21,6 +21,7 @@ import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.{RDD, ShuffledRDD}
 import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.hash.HashShuffleManager
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
 import org.apache.spark.sql.catalyst.errors.attachTree
@@ -81,11 +82,7 @@ case class Exchange(
       shuffleManager.isInstanceOf[UnsafeShuffleManager]
     val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
     val serializeMapOutputs = conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true)
-    if (newOrdering.nonEmpty) {
-      // If a new ordering is required, then records will be sorted with Spark's `ExternalSorter`,
-      // which requires a defensive copy.
-      true
-    } else if (sortBasedShuffleOn) {
+    if (sortBasedShuffleOn) {
       val bypassIsSupported = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
       if (bypassIsSupported && partitioner.numPartitions <= bypassMergeThreshold) {
         // If we're using the original SortShuffleManager and the number of output partitions is

Contributor:
Just trying to understand the context: is this if (newOrdering.nonEmpty) branch no longer necessary because of our recent change?

Contributor (Author):
See the comment in the PR description; even if an ordering is defined, I don't think it would be used to sort on the map side, because we don't do a map-side sort in a shuffle unless an aggregator is specified.
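
To make the claim above concrete, here is a minimal, self-contained sketch of the rule being described. The case class and field names are invented for illustration and are not Spark APIs: the map side only sorts or combines records when map-side combine (and therefore an aggregator) is requested, while a bare key ordering is deferred to the reduce side.

object MapSideSortSketch {
  // Illustrative shape of a shuffle dependency; field names are assumptions for this sketch.
  final case class DepInfo(
      aggregatorDefined: Boolean,
      mapSideCombine: Boolean,
      keyOrderingDefined: Boolean)

  // Records are sorted/combined on the map side only when map-side combine is requested,
  // which requires an aggregator; a key ordering by itself is handled after the shuffle.
  def mapSideSortsByKey(dep: DepInfo): Boolean =
    dep.mapSideCombine && dep.aggregatorDefined
}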

@@ -96,8 +93,11 @@ case class Exchange(
       } else if (serializeMapOutputs && serializer.supportsRelocationOfSerializedObjects) {
         // SPARK-4550 extended sort-based shuffle to serialize individual records prior to sorting
         // them. This optimization is guarded by a feature-flag and is only applied in cases where
-        // shuffle dependency does not specify an ordering and the record serializer has certain
-        // properties. If this optimization is enabled, we can safely avoid the copy.
+        // shuffle dependency does not specify an aggregator or ordering and the record serializer
+        // has certain properties. If this optimization is enabled, we can safely avoid the copy.
         //
+        // Exchange never configures its ShuffledRDDs with aggregators or key orderings, so we only
+        // need to check whether the optimization is enabled and supported by our serializer.
+        //
         // This optimization also applies to UnsafeShuffleManager (added in SPARK-7081).
         false
@@ -108,9 +108,12 @@ case class Exchange(
         // both cases, we must copy.
         true
       }
-    } else {
+    } else if (shuffleManager.isInstanceOf[HashShuffleManager]) {

Contributor (Author):
This check isn't fixing any bugs or correctness issues yet, but I thought it might guard against hard-to-find future bugs if someone defines a new shuffle manager without updating the logic here. It's extremely unlikely that anyone would do this, but in light of proposals like ParquetShuffleManager it seemed like the safest option.

       // We're using hash-based shuffle, so we don't need to copy.
       false
+    } else {
+      // Catch-all case to safely handle any future ShuffleManager implementations.
+      true
     }
   }
 
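
Putting the Exchange changes together, the copy decision after this patch reduces to roughly the following. This is a condensed, illustrative restatement with assumed parameter names, not the actual needToCopyObjectsBeforeShuffle implementation:

object CopyBeforeShuffleSketch {
  def needToCopy(
      sortBasedShuffle: Boolean,          // SortShuffleManager or UnsafeShuffleManager in use
      bypassMergeSupported: Boolean,      // only the original SortShuffleManager supports the bypass path
      numPartitions: Int,
      bypassMergeThreshold: Int,
      serializeMapOutputs: Boolean,
      serializerSupportsRelocation: Boolean,
      hashBasedShuffle: Boolean): Boolean = {
    if (sortBasedShuffle) {
      if (bypassMergeSupported && numPartitions <= bypassMergeThreshold) {
        false // records go straight to per-partition files, nothing is buffered deserialized
      } else if (serializeMapOutputs && serializerSupportsRelocation) {
        false // records are serialized up front, so object reuse in the sorter is harmless
      } else {
        true  // ExternalSorter may buffer deserialized records, so copy defensively
      }
    } else if (hashBasedShuffle) {
      false   // hash-based shuffle writes records out as it receives them
    } else {
      true    // catch-all for future ShuffleManager implementations: copy to be safe
    }
  }
}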