[SPARK-2213][SQL] Sort Merge Join #3173

Closed
wants to merge 8 commits into from


Ishiihara
Contributor

This PR adds a MergeJoin operator to Spark SQL. Its semantics are similar to those of Hive's sort-merge bucket join.

The MergeJoin operator relies on SortBasedShuffle to create partitions that are sorted by the join key. Within each partition, we merge the two child iterators. The tricky part of the merge step is handling duplicate join keys. To handle them, we use a buffer that stores all elements of the right iterator matching the current join key. The buffer is reused to generate join tuples when the next left element has the same join key as the current one.
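
To make the buffering step concrete, here is a minimal sketch in plain Scala. It is not the code from this PR: `MergeJoinSketch` and `mergeJoin` are hypothetical names, rows are modeled as simple key/value pairs, and both inputs are assumed to be already sorted by key.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical illustration only; not the operator added by this PR.
object MergeJoinSketch {
  /** Inner-joins two iterators that are already sorted by key.
    * Only the right-side rows for the current key are held in memory.
    */
  def mergeJoin[K, L, R](left: Iterator[(K, L)], right: Iterator[(K, R)])
                        (implicit ord: Ordering[K]): Iterator[(K, L, R)] = {
    val r = right.buffered                 // lets us peek at the next right row
    val buffer = ArrayBuffer.empty[R]      // right values matching bufferKey
    var bufferKey: Option[K] = None

    left.flatMap { case (k, l) =>
      // Refill the buffer only when the left key differs from the buffered key.
      if (!bufferKey.exists(ord.equiv(_, k))) {
        buffer.clear()
        // Skip right rows whose key is smaller than the current left key.
        while (r.hasNext && ord.lt(r.head._1, k)) r.next()
        // Buffer every right row whose key equals the current left key.
        while (r.hasNext && ord.equiv(r.head._1, k)) buffer += r.next()._2
        bufferKey = Some(k)
      }
      // Emit one joined tuple per buffered right value.
      buffer.iterator.map(v => (k, l, v))
    }
  }
}
```

With left rows keyed 1, 2, 2, 4 and right rows keyed 2, 2, 3, 4, the two left rows with key 2 each pair with both buffered right rows, and the buffer is refilled only when the left key moves on to 4.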

MergeJoin reduces memory consumption: in the current implementation, it only needs enough memory to hold the elements of the key that has the most duplicates in the right iterator.

For query optimization, we may resolve to MergeJoin when both relations are large and neither of them fits in memory. This heuristic has not yet been added to the optimizer. I would appreciate comments on how to resolve to MergeJoin in the optimizer.

Currently, MergeJoin only supports inner joins. It can be extended to support outer joins, which I will handle in separate PRs.

@SparkQA

SparkQA commented Nov 9, 2014

Test build #23112 has started for PR 3173 at commit 5cb98c3.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Nov 9, 2014

Test build #23112 has finished for PR 3173 at commit 5cb98c3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClusteredOrderedDistribution(clustering: Seq[Expression]) extends Distribution
    • case class HashSortedPartitioning(expressions: Seq[Expression], numPartitions: Int)
    • case class MergeJoin(

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23112/

val mergeJoin = joins.MergeJoin(leftKeys, rightKeys, planLater(left), planLater(right))
condition.map(Filter(_, mergeJoin)).getOrElse(mergeJoin) :: Nil

case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
Contributor

Just passing through, but is this right? It appears to be the same case as the one above, so I'm not sure when it would ever be matched.

Contributor Author

You are right, the hash inner join case will never be matched. I am trying to resolve inner joins to merge join, and I am still figuring out the best way to add merge join to the query optimizer.
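
The shadowing discussed here can be reproduced with a small self-contained sketch. `Join` and `InnerEquiJoin` below are hypothetical stand-ins for a logical join node and an extractor such as `ExtractEquiJoinKeys`; they are not Spark classes, and the operator names are just strings.

```scala
// Hypothetical illustration of why a repeated pattern is never matched.
object DuplicateCaseSketch {
  final case class Join(joinType: String, leftKey: String, rightKey: String)

  // Stand-in for an extractor object; it matches inner joins only.
  object InnerEquiJoin {
    def unapply(j: Join): Option[(String, String)] =
      if (j.joinType == "inner") Some((j.leftKey, j.rightKey)) else None
  }

  // Cases are tried top to bottom, so the first matching case always wins.
  def plan(j: Join): String = j match {
    case InnerEquiJoin(l, r) => s"MergeJoin($l, $r)"
    case InnerEquiJoin(l, r) => s"ShuffledHashJoin($l, $r)" // never reached
    case _                   => "other"
  }
}
```

Because the extractor's logic is opaque to the compiler, it may not warn about the second case, which is one reason this kind of duplication can slip through review.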

@SparkQA

SparkQA commented Nov 9, 2014

Test build #23113 has started for PR 3173 at commit cc4647d.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Nov 9, 2014

Test build #23113 has finished for PR 3173 at commit cc4647d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClusteredOrderedDistribution(clustering: Seq[Expression]) extends Distribution
    • case class HashSortedPartitioning(expressions: Seq[Expression], numPartitions: Int)
    • case class MergeJoin(

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23113/

@chenghao-intel
Contributor

It's really nice to have Sort-Merge-Join, as we have run into join queries that could not run to completion in real cases.

One high-level comment: can we also keep ShuffleHashJoin? It can still be faster than Sort-Merge-Join in some cases; all we need is a configuration or strategy to map to the different join operators.

BTW, do you have any performance comparison results you can share with us?

@marmbrus
Contributor

marmbrus commented Dec 2, 2014

/cc @yhuai for the changes to our partitioning API.

I also agree with @chenghao-intel that we probably want to keep ShuffleHashJoin and also that we need some performance comparison information.

* [[Expression Expressions]] will be co-located. Based on the context, this
* can mean such tuples are either co-located in the same partition or they will be contiguous
* within a single partition.
*/
Contributor

It seems this comment needs to be updated.

@marmbrus
Contributor

Thanks for working on this! However, since there are a few questions to address about how to select this operator, I suggest we move discussion to JIRA and close this issue for now. Please reopen once it is passing tests and ready for review.

@asfgit asfgit closed this in ef84dab Dec 12, 2014

@justinuang

Hi, this looks great! Is there a reason why sort based join is not in spark core, only in spark SQL?

@lianhuiwang
Contributor

@justinuang I think you are interested in SPARK-5763.

asfgit pushed a commit that referenced this pull request Apr 15, 2015
Thanks for the initial work from Ishiihara in #3173

This PR introduces a new join method, sort merge join, which first ensures that rows with the same key are in the same partition and that, inside each partition, the rows are sorted by key. We can then walk down both sides together and find matching rows using [sort merge join](http://en.wikipedia.org/wiki/Sort-merge_join). This way, we don't have to hold a whole hash table for one side as hash join does, so memory usage is lower. This PR would also benefit from #3438, which makes the sorting phase much more efficient.
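
The preparation step described above (co-locate equal keys, then sort each partition) might be sketched as follows. This is plain Scala over in-memory sequences, not the shuffle code from the commit; `PartitionAndSortSketch`, `prepare`, and the hash-modulo partitioning are assumptions made for illustration.

```scala
// Hypothetical illustration only; a real shuffle does this across machines.
object PartitionAndSortSketch {
  /** Groups rows into partitions by key hash, then sorts each partition by key.
    * Rows with equal keys always land in the same partition.
    */
  def prepare[K, V](rows: Seq[(K, V)], numPartitions: Int)
                   (implicit ord: Ordering[K]): Map[Int, Seq[(K, V)]] =
    rows
      // floorMod keeps the partition index non-negative for negative hashes.
      .groupBy { case (k, _) => java.lang.Math.floorMod(k.hashCode, numPartitions) }
      // sortBy is stable, so rows with equal keys keep their input order.
      .map { case (partition, rs) => partition -> rs.sortBy(_._1) }
}
```

If both sides of the join are prepared with the same function and the same `numPartitions`, matching keys end up in partitions with the same index, and each pair of partitions can be merged independently.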

We introduced a new configuration, "spark.sql.planner.sortMergeJoin", to switch between this (`true`) and ShuffledHashJoin (`false`); we probably want its default value to be `false` at first.
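
As a rough sketch of how such a flag could drive operator selection, assuming the configuration is available as a plain key/value map: only the key name comes from this message, while `JoinSelectionSketch`, `chooseJoin`, and the `JoinOperator` types are hypothetical and not Spark's planner API.

```scala
// Hypothetical illustration only; Spark's planner is structured differently.
object JoinSelectionSketch {
  sealed trait JoinOperator
  case object SortMergeJoin extends JoinOperator
  case object ShuffledHashJoin extends JoinOperator

  // Key name taken from the commit message above.
  val SortMergeJoinKey = "spark.sql.planner.sortMergeJoin"

  /** Picks the join operator; an unset key is treated as "false". */
  def chooseJoin(conf: Map[String, String]): JoinOperator =
    if (conf.getOrElse(SortMergeJoinKey, "false").toBoolean) SortMergeJoin
    else ShuffledHashJoin
}
```

Defaulting to `false` keeps the existing hash-based behavior unless a user opts in, which matches the cautious default suggested in the message.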

Author: Daoyuan Wang <daoyuan.wang@intel.com>
Author: Michael Armbrust <michael@databricks.com>

This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <michael@databricks.com>

Closes #5208 from adrian-wang/smj and squashes the following commits:

2493b9f [Daoyuan Wang] fix style
5049d88 [Daoyuan Wang] propagate rowOrdering for RangePartitioning
f91a2ae [Daoyuan Wang] yin's comment: use external sort if option is enabled, add comments
f515cd2 [Daoyuan Wang] yin's comment: outputOrdering, join suite refine
ec8061b [Daoyuan Wang] minor change
413fd24 [Daoyuan Wang] Merge pull request #3 from marmbrus/pr/5208
952168a [Michael Armbrust] add type
5492884 [Michael Armbrust] copy when ordering
7ddd656 [Michael Armbrust] Cleanup addition of ordering requirements
b198278 [Daoyuan Wang] inherit ordering in project
c8e82a3 [Daoyuan Wang] fix style
6e897dd [Daoyuan Wang] hide boundReference from manually construct RowOrdering for key compare in smj
8681d73 [Daoyuan Wang] refactor Exchange and fix copy for sorting
2875ef2 [Daoyuan Wang] fix changed configuration
61d7f49 [Daoyuan Wang] add omitted comment
00a4430 [Daoyuan Wang] fix bug
078d69b [Daoyuan Wang] address comments: add comments, do sort in shuffle, and others
3af6ba5 [Daoyuan Wang] use buffer for only one side
171001f [Daoyuan Wang] change default outputordering
47455c9 [Daoyuan Wang] add apache license ...
a28277f [Daoyuan Wang] fix style
645c70b [Daoyuan Wang] address comments using sort
068c35d [Daoyuan Wang] fix new style and add some tests
925203b [Daoyuan Wang] address comments
07ce92f [Daoyuan Wang] fix ArrayIndexOutOfBound
42fca0e [Daoyuan Wang] code clean
e3ec096 [Daoyuan Wang] fix comment style..
2edd235 [Daoyuan Wang] fix outputpartitioning
57baa40 [Daoyuan Wang] fix sort eval bug
303b6da [Daoyuan Wang] fix several errors
95db7ad [Daoyuan Wang] fix brackets for if-statement
4464f16 [Daoyuan Wang] fix error
880d8e9 [Daoyuan Wang] sort merge join for spark sql
@Knight-Wu Knight-Wu mentioned this pull request Jul 15, 2018