From 8c9f0cd21bccb026df9c3ecbd686697475180aa4 Mon Sep 17 00:00:00 2001 From: Sergey Nuyanzin Date: Tue, 1 Oct 2024 13:19:52 +0200 Subject: [PATCH] fix --- .../flink/table/planner/hint/FlinkHints.java | 4 ++- .../calcite/LogicalWatermarkAssigner.scala | 2 ++ .../nodes/calcite/WatermarkAssigner.scala | 14 ++++++-- ...ectWatermarkAssignerTransposeRuleTest.java | 6 ++++ ...jectWatermarkAssignerTransposeRuleTest.xml | 34 +++++++++++++++++++ 5 files changed, 56 insertions(+), 4 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java index 18f13d7547247c..bcbc1dca533815 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.hint; import org.apache.flink.table.api.TableException; +import org.apache.flink.table.planner.plan.nodes.calcite.WatermarkAssigner; import org.apache.flink.table.planner.plan.rules.logical.WrapJsonAggFunctionArgumentsRule; import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase; @@ -119,7 +120,8 @@ public static Optional getTableAlias(RelNode node) { public static boolean canTransposeToTableScan(RelNode node) { return node instanceof LogicalProject // computed column on table || node instanceof LogicalFilter - || node instanceof LogicalSnapshot; + || node instanceof LogicalSnapshot + || node instanceof WatermarkAssigner; } /** Returns the qualified name of a table scan, otherwise returns empty. */ diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWatermarkAssigner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWatermarkAssigner.scala index 89a9cb7e3c4928..74006a6fc35e25 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWatermarkAssigner.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWatermarkAssigner.scala @@ -17,8 +17,10 @@ */ package org.apache.flink.table.planner.plan.nodes.calcite +import com.google.common.collect.ImmutableList import org.apache.calcite.plan._ import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rex.RexNode /** diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala index e4baf13567a2ef..6fe7d31a3e3e2f 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala @@ -17,16 +17,16 @@ */ package org.apache.flink.table.planner.plan.nodes.calcite +import com.google.common.collect.ImmutableList import org.apache.flink.table.planner.calcite.FlinkTypeFactory - import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl} +import org.apache.calcite.rel.hint.{Hintable, RelHint} import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} import org.apache.calcite.rex.RexNode import org.apache.calcite.sql.`type`.SqlTypeName import java.util - import scala.collection.JavaConversions._ /** Relational operator that generates [[org.apache.flink.streaming.api.watermark.Watermark]]. */ @@ -36,7 +36,7 @@ abstract class WatermarkAssigner( inputRel: RelNode, val rowtimeFieldIndex: Int, val watermarkExpr: RexNode) - extends SingleRel(cluster, traits, inputRel) { + extends SingleRel(cluster, traits, inputRel) with Hintable { override def deriveRowType(): RelDataType = { val inputRowType = inputRel.getRowType @@ -74,4 +74,12 @@ abstract class WatermarkAssigner( /** Copies a new WatermarkAssigner. */ def copy(traitSet: RelTraitSet, input: RelNode, rowtime: Int, watermark: RexNode): RelNode + override def getHints: ImmutableList[RelHint] = { + inputRel match { + case hintable: Hintable => + hintable.getHints + case _ => + ImmutableList.of() + } + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.java index d8968d95ed4ed9..5e73b3205bafcf 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.java @@ -105,11 +105,17 @@ void setup() { util.tableEnv().executeSql(ddl4); } + @Test void simpleTranspose() { util.verifyRelPlan("SELECT a, c FROM SimpleTable"); } + @Test + void simpleTranspose2() { + util.verifyRelPlan("SELECT /*+ STATE_TTL('st'='1d', 'vt' = '3d') */ st.* FROM SimpleTable st LEFT JOIN(SELECT DISTINCT b FROM VirtualTable) vt ON st.b = vt.b WHERE vt.b IS NOT NULL"); + } + @Test void transposeWithReorder() { util.verifyRelPlan("SELECT b, a FROM SimpleTable"); diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.xml index 261f438fb83f18..bf91fe3884142c 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.xml @@ -183,6 +183,40 @@ LogicalProject(a=[$0]) ]]> + + + + + + + + + + +