Skip to content

Commit

Permalink
Address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
snuyanzin committed Oct 3, 2024
1 parent 37a3992 commit 469ceff
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
import org.apache.calcite.tools.RuleSets;
import org.immutables.value.Value;

import java.util.Collections;

/**
* Traverses an event time temporal table join {@link RelNode} tree and update the right child to
* set {@link FlinkLogicalTableSourceScan}'s eventTimeSnapshot property to true which will prevent
Expand Down Expand Up @@ -144,7 +142,7 @@ private RelNode transmitSnapshotRequirement(RelNode node) {
return wma.copy(
wma.getTraitSet(),
newChild,
Collections.emptyList(),
wma.getHints(),
wma.rowtimeFieldIndex(),
wma.watermarkExpr());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ class StreamPhysicalWatermarkAssigner(
}

override def withHints(hintList: util.List[RelHint]): RelNode = {

new StreamPhysicalWatermarkAssigner(
cluster,
traitSet,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,31 @@ void before() {
+ " 'connector' = 'values'\n"
+ ")");

util.tableEnv()
.executeSql(
"CREATE TABLE tableWithWatermark1 (\n"
+ " a INT,\n"
+ " b BIGINT,\n"
+ " c TIMESTAMP(3),"
+ " WATERMARK FOR c AS c"
+ ") WITH ("
+ " 'connector' = 'values',\n"
+ " 'bounded' = 'false'\n"
+ ")");

util.tableEnv()
.executeSql(
"CREATE TABLE tableWithWatermark2 ("
+ " a int,\n"
+ " b BIGINT,\n"
+ " c ROW<c1 TIMESTAMP(3)>,\n"
+ " d AS c.c1 + INTERVAL '5' SECOND,\n"
+ " WATERMARK FOR d as d - INTERVAL '5' second"
+ ") WITH ("
+ " 'connector' = 'values',\n"
+ " 'bounded' = 'false'\n"
+ ")");

util.tableEnv().executeSql("CREATE View V4 as select a3 as a4, b3 as b4 from T3");

util.tableEnv()
Expand Down Expand Up @@ -297,6 +322,26 @@ void testAggStateTtlWithEmptyKV() {
"Invalid STATE_TTL hint, expecting at least one key-value options specified.");
}

@Test
void testWatermarkAssigner() {
String sql =
"\n"
+ "SELECT /*+ STATE_TTL('tableWithWatermark1'='1d', 'tww2' = '3d') */ tableWithWatermark1.* FROM tableWithWatermark1\n"
+ "LEFT JOIN(SELECT DISTINCT b FROM tableWithWatermark2) tww2\n"
+ "ON tableWithWatermark1.b = tww2.b WHERE tww2.b IS NOT NULL";
verify(sql);
}

@Test
void testWatermarkAssignerWithAliases() {
String sql =
"\n"
+ "SELECT /*+ STATE_TTL('tww1'='1d', 'tww2' = '3d') */ tww1.* FROM tableWithWatermark1 tww1\n"
+ "LEFT JOIN(SELECT DISTINCT b FROM tableWithWatermark2) tww2\n"
+ "ON tww1.b = tww2.b WHERE tww2.b IS NOT NULL";
verify(sql);
}

private void verify(String sql) {
util.doVerifyPlan(
sql,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,4 @@ void transposeWithDuplicateColumns() {
void transposeWithWatermarkWithMultipleInput() {
util.verifyRelPlan("SELECT a FROM UdfTable");
}

@Test
void transposeWithHints() {
util.verifyRelPlan(
"SELECT /*+ STATE_TTL('st'='1d', 'vt' = '3d') */ st.* FROM SimpleTable st "
+ "LEFT JOIN(SELECT DISTINCT b FROM VirtualTable) vt "
+ "ON st.b = vt.b WHERE vt.b IS NOT NULL");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -494,4 +494,82 @@ GroupAggregate(groupBy=[b1], select=[b1, SUM(a1) AS EXPR$1])
]]>
</Resource>
</TestCase>
<TestCase name="testWatermarkAssigner">
<Resource name="sql">
<![CDATA[
SELECT /*+ STATE_TTL('tableWithWatermark1'='1d', 'tww2' = '3d') */ tableWithWatermark1.* FROM tableWithWatermark1
LEFT JOIN(SELECT DISTINCT b FROM tableWithWatermark2) tww2
ON tableWithWatermark1.b = tww2.b WHERE tww2.b IS NOT NULL]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalFilter(condition=[IS NOT NULL($3)])
+- LogicalJoin(condition=[=($1, $3)], joinType=[left], stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{tww2=3d, tableWithWatermark1=1d}]]])
:- LogicalWatermarkAssigner(rowtime=[c], watermark=[$2], hints=[[[ALIAS options:[tableWithWatermark1]]]])
: +- LogicalTableScan(table=[[default_catalog, default_database, tableWithWatermark1]])
+- LogicalAggregate(group=[{0}], hints=[[[ALIAS options:[tww2]]]])
+- LogicalProject(b=[$1])
+- LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3, 5000:INTERVAL SECOND)], hints=[[[ALIAS options:[tableWithWatermark2]]]])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2.c1, 5000:INTERVAL SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database, tableWithWatermark2]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, c])
+- Join(joinType=[InnerJoin], where=[=(b, b0)], select=[a, b, c, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], stateTtlHints=[[[STATE_TTL options:{LEFT=1d, RIGHT=3d}]]])
:- Exchange(distribution=[hash[b]])
: +- Calc(select=[a, b, CAST(c AS TIMESTAMP(3)) AS c], where=[IS NOT NULL(b)])
: +- WatermarkAssigner(rowtime=[c], watermark=[c])
: +- TableSourceScan(table=[[default_catalog, default_database, tableWithWatermark1]], fields=[a, b, c])
+- Exchange(distribution=[hash[b]])
+- GroupAggregate(groupBy=[b], select=[b])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[b], where=[IS NOT NULL(b)])
+- WatermarkAssigner(rowtime=[d], watermark=[-(d, 5000:INTERVAL SECOND)])
+- Calc(select=[b, +(c.c1, 5000:INTERVAL SECOND) AS d])
+- TableSourceScan(table=[[default_catalog, default_database, tableWithWatermark2, project=[b, c], metadata=[]]], fields=[b, c])
]]>
</Resource>
</TestCase>
<TestCase name="testWatermarkAssignerWithAliases">
<Resource name="sql">
<![CDATA[
SELECT /*+ STATE_TTL('tww1'='1d', 'tww2' = '3d') */ tww1.* FROM tableWithWatermark1 tww1
LEFT JOIN(SELECT DISTINCT b FROM tableWithWatermark2) tww2
ON tww1.b = tww2.b WHERE tww2.b IS NOT NULL]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalFilter(condition=[IS NOT NULL($3)])
+- LogicalJoin(condition=[=($1, $3)], joinType=[left], stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{tww2=3d, tww1=1d}]]])
:- LogicalWatermarkAssigner(rowtime=[c], watermark=[$2], hints=[[[ALIAS options:[tww1]]]])
: +- LogicalTableScan(table=[[default_catalog, default_database, tableWithWatermark1]])
+- LogicalAggregate(group=[{0}], hints=[[[ALIAS options:[tww2]]]])
+- LogicalProject(b=[$1])
+- LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3, 5000:INTERVAL SECOND)], hints=[[[ALIAS options:[tableWithWatermark2]]]])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2.c1, 5000:INTERVAL SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database, tableWithWatermark2]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, c])
+- Join(joinType=[InnerJoin], where=[=(b, b0)], select=[a, b, c, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], stateTtlHints=[[[STATE_TTL options:{LEFT=1d, RIGHT=3d}]]])
:- Exchange(distribution=[hash[b]])
: +- Calc(select=[a, b, CAST(c AS TIMESTAMP(3)) AS c], where=[IS NOT NULL(b)])
: +- WatermarkAssigner(rowtime=[c], watermark=[c])
: +- TableSourceScan(table=[[default_catalog, default_database, tableWithWatermark1]], fields=[a, b, c])
+- Exchange(distribution=[hash[b]])
+- GroupAggregate(groupBy=[b], select=[b])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[b], where=[IS NOT NULL(b)])
+- WatermarkAssigner(rowtime=[d], watermark=[-(d, 5000:INTERVAL SECOND)])
+- Calc(select=[b, +(c.c1, 5000:INTERVAL SECOND) AS d])
+- TableSourceScan(table=[[default_catalog, default_database, tableWithWatermark2, project=[b, c], metadata=[]]], fields=[b, c])
]]>
</Resource>
</TestCase>
</Root>
Original file line number Diff line number Diff line change
Expand Up @@ -180,40 +180,6 @@ LogicalProject(a=[$0])
+- LogicalWatermarkAssigner(rowtime=[b], watermark=[myFunc($1, $2)])
+- LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, UdfTable]])
]]>
</Resource>
</TestCase>
<TestCase name="transposeWithHints">
<Resource name="sql">
<![CDATA[SELECT /*+ STATE_TTL('st'='1d', 'vt' = '3d') */ st.* FROM SimpleTable st LEFT JOIN(SELECT DISTINCT b FROM VirtualTable) vt ON st.b = vt.b WHERE vt.b IS NOT NULL]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
+- LogicalFilter(condition=[IS NOT NULL($4)])
+- LogicalJoin(condition=[=($1, $4)], joinType=[left], stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{st=1d, vt=3d}]]])
:- LogicalWatermarkAssigner(rowtime=[c], watermark=[$2])
: +- LogicalTableScan(table=[[default_catalog, default_database, SimpleTable]])
+- LogicalAggregate(group=[{0}])
+- LogicalProject(b=[$1])
+- LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3, 5000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2.c1, 5000:INTERVAL SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database, VirtualTable]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
+- LogicalFilter(condition=[IS NOT NULL($4)])
+- LogicalJoin(condition=[=($1, $4)], joinType=[left], stateTtlHints=[[[STATE_TTL options:{LEFT=1d, RIGHT=3d}]]])
:- LogicalWatermarkAssigner(rowtime=[c], watermark=[$2])
: +- LogicalTableScan(table=[[default_catalog, default_database, SimpleTable]])
+- LogicalAggregate(group=[{0}])
+- LogicalProject(b=[$0])
+- LogicalWatermarkAssigner(rowtime=[d], watermark=[-($1, 5000:INTERVAL SECOND)])
+- LogicalProject(b=[$1], d=[$3])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2.c1, 5000:INTERVAL SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database, VirtualTable]])
]]>
</Resource>
</TestCase>
Expand Down

0 comments on commit 469ceff

Please sign in to comment.