Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
snuyanzin committed Oct 1, 2024
1 parent 8412dea commit 8c9f0cd
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.planner.hint;

import org.apache.flink.table.api.TableException;
import org.apache.flink.table.planner.plan.nodes.calcite.WatermarkAssigner;
import org.apache.flink.table.planner.plan.rules.logical.WrapJsonAggFunctionArgumentsRule;
import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;

Expand Down Expand Up @@ -119,7 +120,8 @@ public static Optional<String> getTableAlias(RelNode node) {
public static boolean canTransposeToTableScan(RelNode node) {
return node instanceof LogicalProject // computed column on table
|| node instanceof LogicalFilter
|| node instanceof LogicalSnapshot;
|| node instanceof LogicalSnapshot
|| node instanceof WatermarkAssigner;
}

/** Returns the qualified name of a table scan, otherwise returns empty. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
*/
package org.apache.flink.table.planner.plan.nodes.calcite

import com.google.common.collect.ImmutableList
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rex.RexNode

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@
*/
package org.apache.flink.table.planner.plan.nodes.calcite

import com.google.common.collect.ImmutableList
import org.apache.flink.table.planner.calcite.FlinkTypeFactory

import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl}
import org.apache.calcite.rel.hint.{Hintable, RelHint}
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.calcite.rex.RexNode
import org.apache.calcite.sql.`type`.SqlTypeName

import java.util

import scala.collection.JavaConversions._

/** Relational operator that generates [[org.apache.flink.streaming.api.watermark.Watermark]]. */
Expand All @@ -36,7 +36,7 @@ abstract class WatermarkAssigner(
inputRel: RelNode,
val rowtimeFieldIndex: Int,
val watermarkExpr: RexNode)
extends SingleRel(cluster, traits, inputRel) {
extends SingleRel(cluster, traits, inputRel) with Hintable {

override def deriveRowType(): RelDataType = {
val inputRowType = inputRel.getRowType
Expand Down Expand Up @@ -74,4 +74,12 @@ abstract class WatermarkAssigner(
/** Copies a new WatermarkAssigner. */
def copy(traitSet: RelTraitSet, input: RelNode, rowtime: Int, watermark: RexNode): RelNode

override def getHints: ImmutableList[RelHint] = {
inputRel match {
case hintable: Hintable =>
hintable.getHints
case _ =>
ImmutableList.of()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,17 @@ void setup() {
util.tableEnv().executeSql(ddl4);
}


@Test
void simpleTranspose() {
util.verifyRelPlan("SELECT a, c FROM SimpleTable");
}

@Test
void simpleTranspose2() {
util.verifyRelPlan("SELECT /*+ STATE_TTL('st'='1d', 'vt' = '3d') */ st.* FROM SimpleTable st LEFT JOIN(SELECT DISTINCT b FROM VirtualTable) vt ON st.b = vt.b WHERE vt.b IS NOT NULL");
}

@Test
void transposeWithReorder() {
util.verifyRelPlan("SELECT b, a FROM SimpleTable");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,40 @@ LogicalProject(a=[$0])
]]>
</Resource>
</TestCase>
<TestCase name="simpleTranspose2">
<Resource name="sql">
<![CDATA[SELECT /*+ STATE_TTL('SimpleTable'='1d', 'vt' = '3d') */ st.* FROM SimpleTable st LEFT JOIN(SELECT DISTINCT b FROM VirtualTable) vt ON st.b = vt.b WHERE vt.b IS NOT NULL]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
+- LogicalFilter(condition=[IS NOT NULL($4)])
+- LogicalJoin(condition=[=($1, $4)], joinType=[left], stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{SimpleTable=1d, vt=3d}]]])
:- LogicalWatermarkAssigner(rowtime=[c], watermark=[$2])
: +- LogicalTableScan(table=[[default_catalog, default_database, SimpleTable]])
+- LogicalAggregate(group=[{0}])
+- LogicalProject(b=[$1])
+- LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3, 5000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2.c1, 5000:INTERVAL SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database, VirtualTable]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
+- LogicalFilter(condition=[IS NOT NULL($4)])
+- LogicalJoin(condition=[=($1, $4)], joinType=[left], stateTtlHints=[[[STATE_TTL options:{LEFT=1d, RIGHT=3d}]]])
:- LogicalWatermarkAssigner(rowtime=[c], watermark=[$2])
: +- LogicalTableScan(table=[[default_catalog, default_database, SimpleTable]])
+- LogicalAggregate(group=[{0}])
+- LogicalProject(b=[$0])
+- LogicalWatermarkAssigner(rowtime=[d], watermark=[-($1, 5000:INTERVAL SECOND)])
+- LogicalProject(b=[$1], d=[$3])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2.c1, 5000:INTERVAL SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database, VirtualTable]])
]]>
</Resource>
</TestCase>
<TestCase name="transposeWithIncludeComputedRowTime">
<Resource name="sql">
<![CDATA[SELECT a, b, d FROM VirtualTable]]>
Expand Down

0 comments on commit 8c9f0cd

Please sign in to comment.