Skip to content

Commit

Permalink
[FLINK-36007][table] Apply changes from FLINK-36277 to AdaptiveJoin
Browse files Browse the repository at this point in the history
This closes #25895

--------------------

Co-authored-by: xuyang <xyzhong188@163.com>
  • Loading branch information
snuyanzin and xuyangzhong committed Jan 4, 2025
1 parent ec169a8 commit f08e5ec
Showing 1 changed file with 35 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ LogicalProject(EXPR$0=[$1])
+- LogicalFilter(condition=[AND(=($0, $4), =($0, $8))])
+- LogicalJoin(condition=[true], joinType=[inner])
:- LogicalJoin(condition=[true], joinType=[inner])
: :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]])
: +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2, d2)]]])
+- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3, d3)]]])
: :- LogicalTableScan(table=[[default_catalog, default_database, T1]])
: +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+- LogicalTableScan(table=[[default_catalog, default_database, T3]])
]]>
</Resource>
<Resource name="optimized exec plan">
Expand All @@ -52,13 +52,13 @@ Calc(select=[EXPR$0])
: +- HashJoin(joinType=[InnerJoin], where=[(a1 = a2)], select=[a1, b1, a2], build=[right])
: :- Exchange(distribution=[hash[a1]])
: : +- Calc(select=[a1, b1])
: : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1])
: : +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1, c1, d1])
: +- Exchange(distribution=[hash[a2]])
: +- Calc(select=[a2])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2, d2)]]], fields=[a2, b2, c2, d2])
: +- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2, c2, d2])
+- Exchange(distribution=[hash[a3]])
+- Calc(select=[a3])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3, d3)]]], fields=[a3, b3, c3, d3])
+- TableSourceScan(table=[[default_catalog, default_database, T3]], fields=[a3, b3, c3, d3])
]]>
</Resource>
</TestCase>
Expand All @@ -71,17 +71,17 @@ Calc(select=[EXPR$0])
LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], a2=[$4], b2=[$5], c2=[$6], d2=[$7])
+- LogicalFilter(condition=[=($0, $4)])
+- LogicalJoin(condition=[true], joinType=[inner])
:- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]])
+- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2, d2)]]])
:- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+- LogicalTableScan(table=[[default_catalog, default_database, T2]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
AdaptiveJoin(originalJoin=[SortMergeJoin], joinType=[InnerJoin], where=[(a1 = a2)], select=[a1, b1, c1, d1, a2, b2, c2, d2])
:- Exchange(distribution=[hash[a1]])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1])
: +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1, c1, d1])
+- Exchange(distribution=[hash[a2]])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2, d2)]]], fields=[a2, b2, c2, d2])
+- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2, c2, d2])
]]>
</Resource>
</TestCase>
Expand All @@ -94,17 +94,17 @@ AdaptiveJoin(originalJoin=[SortMergeJoin], joinType=[InnerJoin], where=[(a1 = a2
LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], a2=[$4], b2=[$5], c2=[$6], d2=[$7])
+- LogicalFilter(condition=[=($0, $4)])
+- LogicalJoin(condition=[true], joinType=[inner])
:- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]])
+- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2, d2)]]])
:- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+- LogicalTableScan(table=[[default_catalog, default_database, T2]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
AdaptiveJoin(originalJoin=[ShuffleHashJoin], joinType=[InnerJoin], where=[(a1 = a2)], select=[a1, b1, c1, d1, a2, b2, c2, d2], build=[left])
:- Exchange(distribution=[hash[a1]])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1])
: +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1, c1, d1])
+- Exchange(distribution=[hash[a2]])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2, d2)]]], fields=[a2, b2, c2, d2])
+- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2, c2, d2])
]]>
</Resource>
</TestCase>
Expand All @@ -117,17 +117,17 @@ AdaptiveJoin(originalJoin=[ShuffleHashJoin], joinType=[InnerJoin], where=[(a1 =
LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], a2=[$4], b2=[$5], c2=[$6], d2=[$7])
+- LogicalFilter(condition=[=($0, $4)])
+- LogicalJoin(condition=[true], joinType=[inner])
:- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]])
+- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2, d2)]]])
:- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+- LogicalTableScan(table=[[default_catalog, default_database, T2]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
AdaptiveJoin(originalJoin=[ShuffleHashJoin], joinType=[InnerJoin], where=[(a1 = a2)], select=[a1, b1, c1, d1, a2, b2, c2, d2], build=[left])
:- Exchange(distribution=[hash[a1]])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1])
: +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1, c1, d1])
+- Exchange(distribution=[hash[a2]])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2, d2)]]], fields=[a2, b2, c2, d2])
+- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2, c2, d2])
]]>
</Resource>
</TestCase>
Expand All @@ -140,16 +140,16 @@ AdaptiveJoin(originalJoin=[ShuffleHashJoin], joinType=[InnerJoin], where=[(a1 =
LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], a2=[$4], b2=[$5], c2=[$6], d2=[$7])
+- LogicalFilter(condition=[=($0, $4)])
+- LogicalJoin(condition=[true], joinType=[inner])
:- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]])
+- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2, d2)]]])
:- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+- LogicalTableScan(table=[[default_catalog, default_database, T2]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
HashJoin(joinType=[InnerJoin], where=[(a1 = a2)], select=[a1, b1, c1, d1, a2, b2, c2, d2], isBroadcast=[true], build=[right])
:- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1])
:- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1, c1, d1])
+- Exchange(distribution=[broadcast])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2, d2)]]], fields=[a2, b2, c2, d2])
+- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2, c2, d2])
]]>
</Resource>
</TestCase>
Expand All @@ -168,10 +168,10 @@ LogicalProject(a=[$0], a0=[$1], b=[$2], c=[$3], d=[$4])
:- LogicalUnion(all=[true])
: :- LogicalProject(a=[$0])
: : +- LogicalProject(a=[$0])
: : +- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]])
: : +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
: +- LogicalProject(a=[$0])
: +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2, d2)]]])
+- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]])
: +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+- LogicalTableScan(table=[[default_catalog, default_database, T]])
]]>
</Resource>
<Resource name="optimized exec plan">
Expand All @@ -180,11 +180,11 @@ AdaptiveJoin(originalJoin=[ShuffleHashJoin], joinType=[LeftOuterJoin], where=[(a
:- Exchange(distribution=[hash[a]])
: +- Union(all=[true], union=[a])
: :- Calc(select=[a1 AS a])
: : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1])
: : +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1, c1, d1])
: +- Calc(select=[a2 AS a])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2, d2)]]], fields=[a2, b2, c2, d2])
: +- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2, c2, d2])
+- Exchange(distribution=[hash[a]])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[a, b, c, d])
]]>
</Resource>
</TestCase>
Expand All @@ -204,27 +204,27 @@ LogicalProject(a=[$0], d2=[$1])
+- LogicalJoin(condition=[=($0, $1)], joinType=[inner])
:- LogicalProject(a=[$4])
: +- LogicalJoin(condition=[=($4, $0)], joinType=[inner])
: :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]])
: +- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]])
: :- LogicalTableScan(table=[[default_catalog, default_database, T1]])
: +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+- LogicalProject(d2=[$7])
+- LogicalJoin(condition=[=($7, $0)], joinType=[inner])
:- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]])
+- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2, d2)]]])
:- LogicalTableScan(table=[[default_catalog, default_database, T]])
+- LogicalTableScan(table=[[default_catalog, default_database, T2]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
MultipleInput(readOrder=[0,1,1,2], members=[\nHashJoin(joinType=[InnerJoin], where=[(a = d2)], select=[a, d2], build=[left])\n:- Calc(select=[a])\n: +- HashJoin(joinType=[InnerJoin], where=[(a = a1)], select=[a1, a], build=[left])\n: :- [#1] Exchange(distribution=[hash[a1]])\n: +- [#2] Exchange(distribution=[hash[a]])\n+- Calc(select=[d2])\n +- HashJoin(joinType=[InnerJoin], where=[(d2 = a)], select=[a, d2], build=[left])\n :- [#2] Exchange(distribution=[hash[a]])\n +- [#4] Exchange(distribution=[hash[d2]])\n])
:- Exchange(distribution=[hash[a1]])
: +- Calc(select=[a1])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1])
: +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1, c1, d1])
:- Exchange(distribution=[hash[a]])(reuse_id=[1])
: +- Calc(select=[a])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
: +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[a, b, c, d])
:- Reused(reference_id=[1])
+- Exchange(distribution=[hash[d2]])
+- Calc(select=[d2])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2, d2)]]], fields=[a2, b2, c2, d2])
+- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2, c2, d2])
]]>
</Resource>
</TestCase>
Expand Down

0 comments on commit f08e5ec

Please sign in to comment.