-
Notifications
You must be signed in to change notification settings - Fork 51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
create and use palantir-hadoop profile #51
Conversation
0a0151f
to
ffb7ef9
Compare
Apparently the build failed because of |
@@ -2543,6 +2543,16 @@ | |||
</profile> | |||
|
|||
<profile> | |||
<id>palantir-hadoop</id> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
call this hadoop-palantir
to match the other profiles hadoop-X
format
@sjrand yes I believe that env variables are only available for forks coming from the github.com/palantir repo, not personal repos. Change your PR to be from |
I don't think I can make the PR from |
(In other words can I have write perms?) |
@sjrand added |
Closed in favor of #54, which is the same change, but from a branch within this repo. |
… more scenarios such as PartitioningCollection ### What changes were proposed in this pull request? This PR proposes to improve `EnsureRquirement.reorderJoinKeys` to handle the following scenarios: 1. If the keys cannot be reordered to match the left-side `HashPartitioning`, consider the right-side `HashPartitioning`. 2. Handle `PartitioningCollection`, which may contain `HashPartitioning` ### Why are the changes needed? 1. For the scenario 1), the current behavior matches either the left-side `HashPartitioning` or the right-side `HashPartitioning`. This means that if both sides are `HashPartitioning`, it will try to match only the left side. The following will not consider the right-side `HashPartitioning`: ``` val df1 = (0 until 10).map(i => (i % 5, i % 13)).toDF("i1", "j1") val df2 = (0 until 10).map(i => (i % 7, i % 11)).toDF("i2", "j2") df1.write.format("parquet").bucketBy(4, "i1", "j1").saveAsTable("t1")df2.write.format("parquet").bucketBy(4, "i2", "j2").saveAsTable("t2") val t1 = spark.table("t1") val t2 = spark.table("t2") val join = t1.join(t2, t1("i1") === t2("j2") && t1("i1") === t2("i2")) join.explain == Physical Plan == *(5) SortMergeJoin [i1#26, i1#26], [j2#31, i2#30], Inner :- *(2) Sort [i1#26 ASC NULLS FIRST, i1#26 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(i1#26, i1#26, 4), true, [id=#69] : +- *(1) Project [i1#26, j1#27] : +- *(1) Filter isnotnull(i1#26) : +- *(1) ColumnarToRow : +- FileScan parquet default.t1[i1#26,j1#27] Batched: true, DataFilters: [isnotnull(i1#26)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 4 out of 4 +- *(4) Sort [j2#31 ASC NULLS FIRST, i2#30 ASC NULLS FIRST], false, 0. +- Exchange hashpartitioning(j2#31, i2#30, 4), true, [id=#79]. <===== This can be removed +- *(3) Project [i2#30, j2#31] +- *(3) Filter (((j2#31 = i2#30) AND isnotnull(j2#31)) AND isnotnull(i2#30)) +- *(3) ColumnarToRow +- FileScan parquet default.t2[i2#30,j2#31] Batched: true, DataFilters: [(j2#31 = i2#30), isnotnull(j2#31), isnotnull(i2#30)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(j2), IsNotNull(i2)], ReadSchema: struct<i2:int,j2:int>, SelectedBucketsCount: 4 out of 4 ``` 2. For the scenario 2), the current behavior does not handle `PartitioningCollection`: ``` val df1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1") val df2 = (0 until 100).map(i => (i % 7, i % 11)).toDF("i2", "j2") val df3 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i3", "j3") val join = df1.join(df2, df1("i1") === df2("i2") && df1("j1") === df2("j2")) // PartitioningCollection val join2 = join.join(df3, join("j1") === df3("j3") && join("i1") === df3("i3")) join2.explain == Physical Plan == *(9) SortMergeJoin [j1#8, i1#7], [j3#30, i3#29], Inner :- *(6) Sort [j1#8 ASC NULLS FIRST, i1#7 ASC NULLS FIRST], false, 0. <===== This can be removed : +- Exchange hashpartitioning(j1#8, i1#7, 5), true, [id=#58] <===== This can be removed : +- *(5) SortMergeJoin [i1#7, j1#8], [i2#18, j2#19], Inner : :- *(2) Sort [i1#7 ASC NULLS FIRST, j1#8 ASC NULLS FIRST], false, 0 : : +- Exchange hashpartitioning(i1#7, j1#8, 5), true, [id=#45] : : +- *(1) Project [_1#2 AS i1#7, _2#3 AS j1#8] : : +- *(1) LocalTableScan [_1#2, _2#3] : +- *(4) Sort [i2#18 ASC NULLS FIRST, j2#19 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(i2#18, j2#19, 5), true, [id=#51] : +- *(3) Project [_1#13 AS i2#18, _2#14 AS j2#19] : +- *(3) LocalTableScan [_1#13, _2#14] +- *(8) Sort [j3#30 ASC NULLS FIRST, i3#29 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(j3#30, i3#29, 5), true, [id=#64] +- *(7) Project [_1#24 AS i3#29, _2#25 AS j3#30] +- *(7) LocalTableScan [_1#24, _2#25] ``` ### Does this PR introduce _any_ user-facing change? Yes, now from the above examples, the shuffle/sort nodes pointed by `This can be removed` are now removed: 1. Senario 1): ``` == Physical Plan == *(4) SortMergeJoin [i1#26, i1#26], [i2#30, j2#31], Inner :- *(2) Sort [i1#26 ASC NULLS FIRST, i1#26 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(i1#26, i1#26, 4), true, [id=#67] : +- *(1) Project [i1#26, j1#27] : +- *(1) Filter isnotnull(i1#26) : +- *(1) ColumnarToRow : +- FileScan parquet default.t1[i1#26,j1#27] Batched: true, DataFilters: [isnotnull(i1#26)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 4 out of 4 +- *(3) Sort [i2#30 ASC NULLS FIRST, j2#31 ASC NULLS FIRST], false, 0 +- *(3) Project [i2#30, j2#31] +- *(3) Filter (((j2#31 = i2#30) AND isnotnull(j2#31)) AND isnotnull(i2#30)) +- *(3) ColumnarToRow +- FileScan parquet default.t2[i2#30,j2#31] Batched: true, DataFilters: [(j2#31 = i2#30), isnotnull(j2#31), isnotnull(i2#30)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(j2), IsNotNull(i2)], ReadSchema: struct<i2:int,j2:int>, SelectedBucketsCount: 4 out of 4 ``` 2. Scenario 2): ``` == Physical Plan == *(8) SortMergeJoin [i1#7, j1#8], [i3#29, j3#30], Inner :- *(5) SortMergeJoin [i1#7, j1#8], [i2#18, j2#19], Inner : :- *(2) Sort [i1#7 ASC NULLS FIRST, j1#8 ASC NULLS FIRST], false, 0 : : +- Exchange hashpartitioning(i1#7, j1#8, 5), true, [id=#43] : : +- *(1) Project [_1#2 AS i1#7, _2#3 AS j1#8] : : +- *(1) LocalTableScan [_1#2, _2#3] : +- *(4) Sort [i2#18 ASC NULLS FIRST, j2#19 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(i2#18, j2#19, 5), true, [id=#49] : +- *(3) Project [_1#13 AS i2#18, _2#14 AS j2#19] : +- *(3) LocalTableScan [_1#13, _2#14] +- *(7) Sort [i3#29 ASC NULLS FIRST, j3#30 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(i3#29, j3#30, 5), true, [id=#58] +- *(6) Project [_1#24 AS i3#29, _2#25 AS j3#30] +- *(6) LocalTableScan [_1#24, _2#25] ``` ### How was this patch tested? Added tests. Closes apache#29074 from imback82/reorder_keys. Authored-by: Terry Kim <yuminkim@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@robert3005 @pwoody @ash211