[SPARK-8319] [CORE] [SQL] Update logic related to key orderings in shuffle dependencies #6773

Closed
wants to merge 4 commits into from

JoshRosen
Contributor

This patch updates two pieces of logic that are related to handling of keyOrderings in ShuffleDependencies:

  • The Tungsten ShuffleManager falls back to regular SortShuffleManager whenever the shuffle dependency specifies a key ordering, but technically we only need to fall back when an aggregator is also specified. This patch updates the fallback logic to reflect this so that the Tungsten optimizations can apply to more workloads.

  • The SQL Exchange operator performs defensive copying of shuffle inputs when a key ordering is specified, but this is unnecessary. The copying was added to guard against cases where ExternalSorter would buffer non-serialized records in memory. When ExternalSorter is configured without an aggregator, it uses the following logic to determine whether to buffer records in a serialized or deserialized format:

     private val useSerializedPairBuffer =
        ordering.isEmpty &&
        conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) &&
        ser.supportsRelocationOfSerializedObjects

    The newOrdering.isDefined branch in Exchange.needToCopyObjectsBeforeShuffle, removed by this patch, was unnecessary for two reasons:

    • It was checked even if we weren't using sort-based shuffle, but this was unnecessary because only SortShuffleManager performs map-side sorting.
    • Map-side sorting during shuffle writing is only performed for shuffles that also perform map-side aggregation as part of the shuffle (to see this, look at how SortShuffleWriter constructs ExternalSorter). Since SQL never pushes aggregation into Spark's shuffle, both the aggregator and the ordering passed to ExternalSorter will be empty. Spark SQL also always uses serializers that support relocation, so sort-shuffle will use the serialized pair buffer unless the user has explicitly disabled it via the spark.shuffle.sort.serializeMapOutputs SparkConf flag. Therefore, I think my optimization in Exchange should be safe.
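
To make the reasoning above easier to check, here is a minimal, self-contained sketch of the decision. It is not Spark's actual code: `SorterConfig`, `SerializedBufferSketch`, and `needsDefensiveCopy` are hypothetical names introduced for illustration, and treating "has an aggregator" as "always buffers deserialized records" is an assumption drawn from the description rather than from the source.

```scala
// Hypothetical, simplified model of the buffering decision; not Spark's real classes.
final case class SorterConfig(
    hasAggregator: Boolean,                // is map-side aggregation pushed into the shuffle?
    hasOrdering: Boolean,                  // was an ordering handed to the sorter?
    serializeMapOutputs: Boolean,          // spark.shuffle.sort.serializeMapOutputs
    serializerSupportsRelocation: Boolean) // ser.supportsRelocationOfSerializedObjects

object SerializedBufferSketch {
  // Same shape as the `useSerializedPairBuffer` condition quoted above,
  // with the "no aggregator" precondition made explicit (an assumption).
  def useSerializedPairBuffer(c: SorterConfig): Boolean =
    !c.hasAggregator &&
      !c.hasOrdering &&
      c.serializeMapOutputs &&
      c.serializerSupportsRelocation

  // A defensive copy only matters if records may be buffered in deserialized form.
  def needsDefensiveCopy(c: SorterConfig): Boolean =
    !useSerializedPairBuffer(c)
}
```

Under these assumptions, the Spark SQL case (no aggregator, no ordering reaching the sorter, relocatable serializer, flag left at its default) never needs the copy, while disabling the flag brings the copy back.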

@JoshRosen
Contributor Author

/cc @yhuai for Exchange-related changes.

// If a new ordering is required, then records will be sorted with Spark's `ExternalSorter`,
// which requires a defensive copy.
true
} else if (sortBasedShuffleOn) {
Contributor

Just trying to understand the context: is this if (newOrdering.nonEmpty) part unnecessary because of our recent change?

Contributor Author

See the comment in the PR description: even if an ordering were defined, I don't think it would be used to sort on the map side, because we don't do map-side sorting in shuffle unless an aggregator is also specified.
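
As a rough illustration of that point, the sketch below models how a map-side writer might decide what to hand to its sorter. It is a simplified stand-in, not the real SortShuffleWriter: `DepSketch`, `SorterArgs`, and `sorterArgsFor` are hypothetical names, and the rule that the key ordering is forwarded only when map-side combine is enabled is taken from the PR description.

```scala
// Hypothetical stand-ins for a shuffle dependency and the sorter's constructor arguments.
final case class DepSketch[K](
    mapSideCombine: Boolean,
    hasAggregator: Boolean,
    keyOrdering: Option[Ordering[K]])

final case class SorterArgs[K](useAggregator: Boolean, ordering: Option[Ordering[K]])

object MapSideSorterSketch {
  // The key ordering only reaches the map-side sorter when map-side combine is on.
  def sorterArgsFor[K](dep: DepSketch[K]): SorterArgs[K] =
    if (dep.mapSideCombine) {
      require(dep.hasAggregator, "map-side combine requires an aggregator")
      SorterArgs(useAggregator = true, ordering = dep.keyOrdering)
    } else {
      // No map-side aggregation: the dependency's key ordering is ignored here
      // and any sorting by key is left to the reduce side.
      SorterArgs(useAggregator = false, ordering = None)
    }
}
```

So a dependency that defines a key ordering but no aggregator still yields an empty ordering on the map side in this model, which is why the ordering check alone should not force a copy.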

@SparkQA

SparkQA commented Jun 12, 2015

Test build #34738 has finished for PR 6773 at commit 85a4628.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Jun 12, 2015

Test build #34751 has finished for PR 6773 at commit 85a4628.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai
Contributor

yhuai commented Jun 12, 2015

Ah, I see. The Exchange-related part looks good to me.

@JoshRosen
Contributor Author

This patch was pretty tricky to explain, so I've revised the description and have slightly extended the code comments. I think it should be good-to-go after the next Jenkins run, so I'll merge it unless there are any objections.

@@ -108,9 +108,12 @@ case class Exchange(
// both cases, we must copy.
true
}
- } else {
+ } else if (shuffleManager.isInstanceOf[HashShuffleManager]) {
Contributor Author

This check isn't fixing any bugs / correctness issues yet, but I thought that it might guard against hard-to-find future bugs if someone defines a new shuffle manager without updating the logic here. It's extremely unlikely that anyone would do this, but in light of proposals like ParquetShuffleManager it seemed like the safest option.
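
For readers skimming the thread, the idea of the guard can be sketched as a match with a conservative catch-all. This is an illustration only: `ShuffleManagerKind`, its cases, and `needToCopy` are hypothetical names, and the assumption that hash-based shuffle never needs the copy is mine, not something stated in this comment.

```scala
// Hypothetical classification of shuffle managers; not Spark's real types.
sealed trait ShuffleManagerKind
case object SortBased extends ShuffleManagerKind
case object HashBased extends ShuffleManagerKind
final case class Unknown(className: String) extends ShuffleManagerKind

object CopyDecisionSketch {
  def needToCopy(kind: ShuffleManagerKind, mayBufferDeserialized: Boolean): Boolean =
    kind match {
      // Sort-based shuffle: copy only if records may be buffered in deserialized form.
      case SortBased => mayBufferDeserialized
      // Assumption: hash-based shuffle writes records out without buffering them.
      case HashBased => false
      // Catch-all for shuffle managers this logic does not know about: copy to be safe.
      case Unknown(_) => true
    }
}
```

The catch-all costs an extra copy for unrecognized managers, but it means a new implementation cannot silently skip a copy it turns out to need.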

@SparkQA

SparkQA commented Jun 13, 2015

Test build #34830 has finished for PR 6773 at commit 7a14129.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Alright, I'm going to merge this into master.

@asfgit asfgit closed this in af31335 Jun 13, 2015
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015
…uffle dependencies

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#6773 from JoshRosen/SPARK-8319 and squashes the following commits:

7a14129 [Josh Rosen] Revise comments; add handler to guard against future ShuffleManager implementations
07bb2c9 [Josh Rosen] Update comment to clarify circumstances under which shuffle operates on serialized records
269089a [Josh Rosen] Avoid unnecessary copy in SQL Exchange
34e526e [Josh Rosen] Enable Tungsten shuffle for non-agg shuffles w/ key orderings