diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java index 71129e26d658e3..60a0c897eeeac7 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java @@ -253,7 +253,8 @@ public RelBuilder windowAggregate( public RelBuilder watermark(int rowtimeFieldIndex, RexNode watermarkExpr) { final RelNode input = build(); final RelNode relNode = - LogicalWatermarkAssigner.create(cluster, input, rowtimeFieldIndex, watermarkExpr); + LogicalWatermarkAssigner.create( + cluster, input, Collections.emptyList(), rowtimeFieldIndex, watermarkExpr); return push(relNode); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java index 89feca8a2b6826..bae001f0dc52c2 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java @@ -41,6 +41,8 @@ import org.apache.calcite.tools.RuleSets; import org.immutables.value.Value; +import java.util.Collections; + /** * Traverses an event time temporal table join {@link RelNode} tree and update the right child to * set {@link FlinkLogicalTableSourceScan}'s eventTimeSnapshot property to true which will prevent @@ -140,7 +142,11 @@ private RelNode transmitSnapshotRequirement(RelNode node) { final RelNode newChild = transmitSnapshotRequirement(child); if (newChild != child) { return wma.copy( - wma.getTraitSet(), newChild, wma.rowtimeFieldIndex(), wma.watermarkExpr()); + wma.getTraitSet(), + newChild, + Collections.emptyList(), + wma.rowtimeFieldIndex(), + wma.watermarkExpr()); } return wma; } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java index 15d8f485994230..51963d79eb3acb 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java @@ -288,6 +288,7 @@ private RelNode pushDownTransformedWatermark( watermark.copy( watermark.getTraitSet().plus(FlinkRelDistribution.DEFAULT()), exchange.getInput(), + Collections.emptyList(), newRowTimeFieldIndex, newWatermarkExpr); final RelNode newChangelogNormalize = diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWatermarkAssigner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWatermarkAssigner.scala index 74006a6fc35e25..c9d3443f526cfe 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWatermarkAssigner.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWatermarkAssigner.scala @@ -17,12 +17,13 @@ */ package org.apache.flink.table.planner.plan.nodes.calcite -import com.google.common.collect.ImmutableList import org.apache.calcite.plan._ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rex.RexNode +import java.util + /** * Sub-class of [[WatermarkAssigner]] that is a relational operator which generates * [[org.apache.flink.streaming.api.watermark.Watermark]]. This class corresponds to Calcite logical @@ -32,16 +33,22 @@ final class LogicalWatermarkAssigner( cluster: RelOptCluster, traits: RelTraitSet, input: RelNode, + hints: util.List[RelHint], rowtimeFieldIndex: Int, watermarkExpr: RexNode) - extends WatermarkAssigner(cluster, traits, input, rowtimeFieldIndex, watermarkExpr) { + extends WatermarkAssigner(cluster, traits, input, hints, rowtimeFieldIndex, watermarkExpr) { override def copy( traitSet: RelTraitSet, input: RelNode, + hints: util.List[RelHint], rowtime: Int, watermark: RexNode): RelNode = { - new LogicalWatermarkAssigner(cluster, traitSet, input, rowtime, watermark) + new LogicalWatermarkAssigner(cluster, traitSet, input, hints, rowtime, watermark) + } + + override def withHints(hintList: util.List[RelHint]): RelNode = { + new LogicalWatermarkAssigner(cluster, traits, input, hintList, rowtimeFieldIndex, watermarkExpr) } } @@ -50,9 +57,10 @@ object LogicalWatermarkAssigner { def create( cluster: RelOptCluster, input: RelNode, + hints: util.List[RelHint], rowtimeFieldIndex: Int, watermarkExpr: RexNode): LogicalWatermarkAssigner = { val traits = cluster.traitSetOf(Convention.NONE) - new LogicalWatermarkAssigner(cluster, traits, input, rowtimeFieldIndex, watermarkExpr) + new LogicalWatermarkAssigner(cluster, traits, input, hints, rowtimeFieldIndex, watermarkExpr) } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala index 6fe7d31a3e3e2f..58137c195ad31e 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala @@ -17,16 +17,19 @@ */ package org.apache.flink.table.planner.plan.nodes.calcite -import com.google.common.collect.ImmutableList import org.apache.flink.table.planner.calcite.FlinkTypeFactory + +import com.google.common.collect.ImmutableList import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl} -import org.apache.calcite.rel.hint.{Hintable, RelHint} import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.calcite.rel.hint.{Hintable, RelHint} import org.apache.calcite.rex.RexNode import org.apache.calcite.sql.`type`.SqlTypeName import java.util +import java.util.ArrayList + import scala.collection.JavaConversions._ /** Relational operator that generates [[org.apache.flink.streaming.api.watermark.Watermark]]. */ @@ -34,9 +37,11 @@ abstract class WatermarkAssigner( cluster: RelOptCluster, traits: RelTraitSet, inputRel: RelNode, + val hints: util.List[RelHint], val rowtimeFieldIndex: Int, val watermarkExpr: RexNode) - extends SingleRel(cluster, traits, inputRel) with Hintable { + extends SingleRel(cluster, traits, inputRel) + with Hintable { override def deriveRowType(): RelDataType = { val inputRowType = inputRel.getRowType @@ -68,18 +73,21 @@ abstract class WatermarkAssigner( } override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { - copy(traitSet, inputs.get(0), rowtimeFieldIndex, watermarkExpr) + copy(traitSet, inputs.get(0), hints, rowtimeFieldIndex, watermarkExpr) } /** Copies a new WatermarkAssigner. */ - def copy(traitSet: RelTraitSet, input: RelNode, rowtime: Int, watermark: RexNode): RelNode + def copy( + traitSet: RelTraitSet, + input: RelNode, + hints: util.List[RelHint], + rowtime: Int, + watermark: RexNode): RelNode override def getHints: ImmutableList[RelHint] = { - inputRel match { - case hintable: Hintable => - hintable.getHints - case _ => - ImmutableList.of() - } + val arrayHints = hints.toArray(new Array[RelHint](0)) + ImmutableList.copyOf(arrayHints) } + + def withHints(hintList: util.List[RelHint]): RelNode } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala index 795304a9f38bf4..7468c83142fd48 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala @@ -24,8 +24,12 @@ import org.apache.calcite.plan._ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.convert.ConverterRule.Config +import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rex.RexNode +import java.util +import java.util.Collections + /** * Sub-class of [[WatermarkAssigner]] that is a relational operator which generates * [[org.apache.flink.streaming.api.watermark.Watermark]]. @@ -34,20 +38,31 @@ class FlinkLogicalWatermarkAssigner( cluster: RelOptCluster, traits: RelTraitSet, input: RelNode, + hints: util.List[RelHint], rowtimeFieldIndex: Int, watermarkExpr: RexNode) - extends WatermarkAssigner(cluster, traits, input, rowtimeFieldIndex, watermarkExpr) + extends WatermarkAssigner(cluster, traits, input, hints, rowtimeFieldIndex, watermarkExpr) with FlinkLogicalRel { /** Copies a new WatermarkAssigner. */ override def copy( traitSet: RelTraitSet, input: RelNode, + hints: util.List[RelHint], rowtime: Int, watermark: RexNode): RelNode = { - new FlinkLogicalWatermarkAssigner(cluster, traitSet, input, rowtime, watermark) + new FlinkLogicalWatermarkAssigner(cluster, traitSet, input, hints, rowtime, watermark) } + override def withHints(hintList: util.List[RelHint]): RelNode = { + new FlinkLogicalWatermarkAssigner( + cluster, + traitSet, + input, + hints, + rowtimeFieldIndex, + watermarkExpr) + } } class FlinkLogicalWatermarkAssignerConverter(config: Config) extends ConverterRule(config) { @@ -76,6 +91,12 @@ object FlinkLogicalWatermarkAssigner { watermarkExpr: RexNode): FlinkLogicalWatermarkAssigner = { val cluster = input.getCluster val traitSet = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify() - new FlinkLogicalWatermarkAssigner(cluster, traitSet, input, rowtimeFieldIndex, watermarkExpr) + new FlinkLogicalWatermarkAssigner( + cluster, + traitSet, + input, + Collections.emptyList(), + rowtimeFieldIndex, + watermarkExpr) } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWatermarkAssigner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWatermarkAssigner.scala index 69973a8e6a3663..bc509cddc62110 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWatermarkAssigner.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWatermarkAssigner.scala @@ -26,8 +26,11 @@ import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.{RelNode, RelWriter} +import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rex.RexNode +import java.util + import scala.collection.JavaConversions._ /** Stream physical RelNode for [[WatermarkAssigner]]. */ @@ -35,9 +38,10 @@ class StreamPhysicalWatermarkAssigner( cluster: RelOptCluster, traits: RelTraitSet, inputRel: RelNode, + hints: util.List[RelHint], rowtimeFieldIndex: Int, watermarkExpr: RexNode) - extends WatermarkAssigner(cluster, traits, inputRel, rowtimeFieldIndex, watermarkExpr) + extends WatermarkAssigner(cluster, traits, inputRel, hints, rowtimeFieldIndex, watermarkExpr) with StreamPhysicalRel { override def requireWatermark: Boolean = false @@ -45,9 +49,10 @@ class StreamPhysicalWatermarkAssigner( override def copy( traitSet: RelTraitSet, input: RelNode, + hints: util.List[RelHint], rowtime: Int, watermark: RexNode): RelNode = { - new StreamPhysicalWatermarkAssigner(cluster, traitSet, input, rowtime, watermark) + new StreamPhysicalWatermarkAssigner(cluster, traitSet, input, hints, rowtime, watermark) } /** Fully override this method to have a better display name of this RelNode. */ @@ -75,4 +80,15 @@ class StreamPhysicalWatermarkAssigner( FlinkTypeFactory.toLogicalRowType(getRowType), getRelDetailedDescription) } + + override def withHints(hintList: util.List[RelHint]): RelNode = { + + new StreamPhysicalWatermarkAssigner( + cluster, + traitSet, + input, + hints, + rowtimeFieldIndex, + watermarkExpr) + } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWatermarkAssignerRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWatermarkAssignerRule.scala index c6b62b0d09e7d6..ba2e10f99d4048 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWatermarkAssignerRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWatermarkAssignerRule.scala @@ -26,6 +26,8 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.convert.ConverterRule.Config +import java.util.Collections + /** Rule that converts [[FlinkLogicalWatermarkAssigner]] to [[StreamPhysicalWatermarkAssigner]]. */ class StreamPhysicalWatermarkAssignerRule(config: Config) extends ConverterRule(config) { @@ -39,6 +41,7 @@ class StreamPhysicalWatermarkAssignerRule(config: Config) extends ConverterRule( watermarkAssigner.getCluster, traitSet, convertInput, + Collections.emptyList(), watermarkAssigner.rowtimeFieldIndex, watermarkAssigner.watermarkExpr ) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.java index 5e73b3205bafcf..111a24e0fdce7f 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.java @@ -105,7 +105,6 @@ void setup() { util.tableEnv().executeSql(ddl4); } - @Test void simpleTranspose() { util.verifyRelPlan("SELECT a, c FROM SimpleTable"); @@ -113,7 +112,8 @@ void simpleTranspose() { @Test void simpleTranspose2() { - util.verifyRelPlan("SELECT /*+ STATE_TTL('st'='1d', 'vt' = '3d') */ st.* FROM SimpleTable st LEFT JOIN(SELECT DISTINCT b FROM VirtualTable) vt ON st.b = vt.b WHERE vt.b IS NOT NULL"); + util.verifyRelPlan( + "SELECT /*+ STATE_TTL('st'='1d', 'vt' = '3d') */ st.* FROM SimpleTable st LEFT JOIN(SELECT DISTINCT b FROM VirtualTable) vt ON st.b = vt.b WHERE vt.b IS NOT NULL"); } @Test diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.xml index bf91fe3884142c..81d75cbd8a26d8 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.xml @@ -185,13 +185,13 @@ LogicalProject(a=[$0]) - +