Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
snuyanzin committed Oct 1, 2024
1 parent 8c9f0cd commit 4e0a0af
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ public RelBuilder windowAggregate(
public RelBuilder watermark(int rowtimeFieldIndex, RexNode watermarkExpr) {
final RelNode input = build();
final RelNode relNode =
LogicalWatermarkAssigner.create(cluster, input, rowtimeFieldIndex, watermarkExpr);
LogicalWatermarkAssigner.create(
cluster, input, Collections.emptyList(), rowtimeFieldIndex, watermarkExpr);
return push(relNode);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.apache.calcite.tools.RuleSets;
import org.immutables.value.Value;

import java.util.Collections;

/**
* Traverses an event time temporal table join {@link RelNode} tree and update the right child to
* set {@link FlinkLogicalTableSourceScan}'s eventTimeSnapshot property to true which will prevent
Expand Down Expand Up @@ -140,7 +142,11 @@ private RelNode transmitSnapshotRequirement(RelNode node) {
final RelNode newChild = transmitSnapshotRequirement(child);
if (newChild != child) {
return wma.copy(
wma.getTraitSet(), newChild, wma.rowtimeFieldIndex(), wma.watermarkExpr());
wma.getTraitSet(),
newChild,
Collections.emptyList(),
wma.rowtimeFieldIndex(),
wma.watermarkExpr());
}
return wma;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ private RelNode pushDownTransformedWatermark(
watermark.copy(
watermark.getTraitSet().plus(FlinkRelDistribution.DEFAULT()),
exchange.getInput(),
Collections.emptyList(),
newRowTimeFieldIndex,
newWatermarkExpr);
final RelNode newChangelogNormalize =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
*/
package org.apache.flink.table.planner.plan.nodes.calcite

import com.google.common.collect.ImmutableList
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rex.RexNode

import java.util

/**
* Sub-class of [[WatermarkAssigner]] that is a relational operator which generates
* [[org.apache.flink.streaming.api.watermark.Watermark]]. This class corresponds to Calcite logical
Expand All @@ -32,16 +33,22 @@ final class LogicalWatermarkAssigner(
cluster: RelOptCluster,
traits: RelTraitSet,
input: RelNode,
hints: util.List[RelHint],
rowtimeFieldIndex: Int,
watermarkExpr: RexNode)
extends WatermarkAssigner(cluster, traits, input, rowtimeFieldIndex, watermarkExpr) {
extends WatermarkAssigner(cluster, traits, input, hints, rowtimeFieldIndex, watermarkExpr) {

override def copy(
traitSet: RelTraitSet,
input: RelNode,
hints: util.List[RelHint],
rowtime: Int,
watermark: RexNode): RelNode = {
new LogicalWatermarkAssigner(cluster, traitSet, input, rowtime, watermark)
new LogicalWatermarkAssigner(cluster, traitSet, input, hints, rowtime, watermark)
}

override def withHints(hintList: util.List[RelHint]): RelNode = {
new LogicalWatermarkAssigner(cluster, traits, input, hintList, rowtimeFieldIndex, watermarkExpr)
}
}

Expand All @@ -50,9 +57,10 @@ object LogicalWatermarkAssigner {
def create(
cluster: RelOptCluster,
input: RelNode,
hints: util.List[RelHint],
rowtimeFieldIndex: Int,
watermarkExpr: RexNode): LogicalWatermarkAssigner = {
val traits = cluster.traitSetOf(Convention.NONE)
new LogicalWatermarkAssigner(cluster, traits, input, rowtimeFieldIndex, watermarkExpr)
new LogicalWatermarkAssigner(cluster, traits, input, hints, rowtimeFieldIndex, watermarkExpr)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,31 @@
*/
package org.apache.flink.table.planner.plan.nodes.calcite

import com.google.common.collect.ImmutableList
import org.apache.flink.table.planner.calcite.FlinkTypeFactory

import com.google.common.collect.ImmutableList
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl}
import org.apache.calcite.rel.hint.{Hintable, RelHint}
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.calcite.rel.hint.{Hintable, RelHint}
import org.apache.calcite.rex.RexNode
import org.apache.calcite.sql.`type`.SqlTypeName

import java.util
import java.util.ArrayList

import scala.collection.JavaConversions._

/** Relational operator that generates [[org.apache.flink.streaming.api.watermark.Watermark]]. */
abstract class WatermarkAssigner(
cluster: RelOptCluster,
traits: RelTraitSet,
inputRel: RelNode,
val hints: util.List[RelHint],
val rowtimeFieldIndex: Int,
val watermarkExpr: RexNode)
extends SingleRel(cluster, traits, inputRel) with Hintable {
extends SingleRel(cluster, traits, inputRel)
with Hintable {

override def deriveRowType(): RelDataType = {
val inputRowType = inputRel.getRowType
Expand Down Expand Up @@ -68,18 +73,21 @@ abstract class WatermarkAssigner(
}

override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
copy(traitSet, inputs.get(0), rowtimeFieldIndex, watermarkExpr)
copy(traitSet, inputs.get(0), hints, rowtimeFieldIndex, watermarkExpr)
}

/** Copies a new WatermarkAssigner. */
def copy(traitSet: RelTraitSet, input: RelNode, rowtime: Int, watermark: RexNode): RelNode
def copy(
traitSet: RelTraitSet,
input: RelNode,
hints: util.List[RelHint],
rowtime: Int,
watermark: RexNode): RelNode

override def getHints: ImmutableList[RelHint] = {
inputRel match {
case hintable: Hintable =>
hintable.getHints
case _ =>
ImmutableList.of()
}
val arrayHints = hints.toArray(new Array[RelHint](0))
ImmutableList.copyOf(arrayHints)
}

def withHints(hintList: util.List[RelHint]): RelNode
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rex.RexNode

import java.util
import java.util.Collections

/**
* Sub-class of [[WatermarkAssigner]] that is a relational operator which generates
* [[org.apache.flink.streaming.api.watermark.Watermark]].
Expand All @@ -34,20 +38,31 @@ class FlinkLogicalWatermarkAssigner(
cluster: RelOptCluster,
traits: RelTraitSet,
input: RelNode,
hints: util.List[RelHint],
rowtimeFieldIndex: Int,
watermarkExpr: RexNode)
extends WatermarkAssigner(cluster, traits, input, rowtimeFieldIndex, watermarkExpr)
extends WatermarkAssigner(cluster, traits, input, hints, rowtimeFieldIndex, watermarkExpr)
with FlinkLogicalRel {

/** Copies a new WatermarkAssigner. */
override def copy(
traitSet: RelTraitSet,
input: RelNode,
hints: util.List[RelHint],
rowtime: Int,
watermark: RexNode): RelNode = {
new FlinkLogicalWatermarkAssigner(cluster, traitSet, input, rowtime, watermark)
new FlinkLogicalWatermarkAssigner(cluster, traitSet, input, hints, rowtime, watermark)
}

override def withHints(hintList: util.List[RelHint]): RelNode = {
new FlinkLogicalWatermarkAssigner(
cluster,
traitSet,
input,
hints,
rowtimeFieldIndex,
watermarkExpr)
}
}

class FlinkLogicalWatermarkAssignerConverter(config: Config) extends ConverterRule(config) {
Expand Down Expand Up @@ -76,6 +91,12 @@ object FlinkLogicalWatermarkAssigner {
watermarkExpr: RexNode): FlinkLogicalWatermarkAssigner = {
val cluster = input.getCluster
val traitSet = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
new FlinkLogicalWatermarkAssigner(cluster, traitSet, input, rowtimeFieldIndex, watermarkExpr)
new FlinkLogicalWatermarkAssigner(
cluster,
traitSet,
input,
Collections.emptyList(),
rowtimeFieldIndex,
watermarkExpr)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,33 @@ import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig

import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rex.RexNode

import java.util

import scala.collection.JavaConversions._

/** Stream physical RelNode for [[WatermarkAssigner]]. */
class StreamPhysicalWatermarkAssigner(
cluster: RelOptCluster,
traits: RelTraitSet,
inputRel: RelNode,
hints: util.List[RelHint],
rowtimeFieldIndex: Int,
watermarkExpr: RexNode)
extends WatermarkAssigner(cluster, traits, inputRel, rowtimeFieldIndex, watermarkExpr)
extends WatermarkAssigner(cluster, traits, inputRel, hints, rowtimeFieldIndex, watermarkExpr)
with StreamPhysicalRel {

override def requireWatermark: Boolean = false

override def copy(
traitSet: RelTraitSet,
input: RelNode,
hints: util.List[RelHint],
rowtime: Int,
watermark: RexNode): RelNode = {
new StreamPhysicalWatermarkAssigner(cluster, traitSet, input, rowtime, watermark)
new StreamPhysicalWatermarkAssigner(cluster, traitSet, input, hints, rowtime, watermark)
}

/** Fully override this method to have a better display name of this RelNode. */
Expand Down Expand Up @@ -75,4 +80,15 @@ class StreamPhysicalWatermarkAssigner(
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription)
}

override def withHints(hintList: util.List[RelHint]): RelNode = {

new StreamPhysicalWatermarkAssigner(
cluster,
traitSet,
input,
hints,
rowtimeFieldIndex,
watermarkExpr)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.convert.ConverterRule.Config

import java.util.Collections

/** Rule that converts [[FlinkLogicalWatermarkAssigner]] to [[StreamPhysicalWatermarkAssigner]]. */
class StreamPhysicalWatermarkAssignerRule(config: Config) extends ConverterRule(config) {

Expand All @@ -39,6 +41,7 @@ class StreamPhysicalWatermarkAssignerRule(config: Config) extends ConverterRule(
watermarkAssigner.getCluster,
traitSet,
convertInput,
Collections.emptyList(),
watermarkAssigner.rowtimeFieldIndex,
watermarkAssigner.watermarkExpr
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,15 @@ void setup() {
util.tableEnv().executeSql(ddl4);
}


@Test
void simpleTranspose() {
util.verifyRelPlan("SELECT a, c FROM SimpleTable");
}

@Test
void simpleTranspose2() {
util.verifyRelPlan("SELECT /*+ STATE_TTL('st'='1d', 'vt' = '3d') */ st.* FROM SimpleTable st LEFT JOIN(SELECT DISTINCT b FROM VirtualTable) vt ON st.b = vt.b WHERE vt.b IS NOT NULL");
util.verifyRelPlan(
"SELECT /*+ STATE_TTL('st'='1d', 'vt' = '3d') */ st.* FROM SimpleTable st LEFT JOIN(SELECT DISTINCT b FROM VirtualTable) vt ON st.b = vt.b WHERE vt.b IS NOT NULL");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,13 @@ LogicalProject(a=[$0])
</TestCase>
<TestCase name="simpleTranspose2">
<Resource name="sql">
<![CDATA[SELECT /*+ STATE_TTL('SimpleTable'='1d', 'vt' = '3d') */ st.* FROM SimpleTable st LEFT JOIN(SELECT DISTINCT b FROM VirtualTable) vt ON st.b = vt.b WHERE vt.b IS NOT NULL]]>
<![CDATA[SELECT /*+ STATE_TTL('st'='1d', 'vt' = '3d') */ st.* FROM SimpleTable st LEFT JOIN(SELECT DISTINCT b FROM VirtualTable) vt ON st.b = vt.b WHERE vt.b IS NOT NULL]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
+- LogicalFilter(condition=[IS NOT NULL($4)])
+- LogicalJoin(condition=[=($1, $4)], joinType=[left], stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{SimpleTable=1d, vt=3d}]]])
+- LogicalJoin(condition=[=($1, $4)], joinType=[left], stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{st=1d, vt=3d}]]])
:- LogicalWatermarkAssigner(rowtime=[c], watermark=[$2])
: +- LogicalTableScan(table=[[default_catalog, default_database, SimpleTable]])
+- LogicalAggregate(group=[{0}])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1321,6 +1321,7 @@ case class StreamTableTestUtil(
sourceRel.getCluster,
sourceRel.getTraitSet,
sourceRel,
Collections.emptyList(),
rowtimeFieldIdx,
expr
)
Expand Down

0 comments on commit 4e0a0af

Please sign in to comment.