diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ConstantRankNumberColumnRemoveRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ConstantRankNumberColumnRemoveRule.java new file mode 100644 index 0000000000000..51c38046c4f07 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ConstantRankNumberColumnRemoveRule.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank; +import org.apache.flink.table.runtime.operators.rank.ConstantRankRange; +import org.apache.flink.table.runtime.operators.rank.RankRange; +import org.apache.flink.table.runtime.operators.rank.RankType; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.rex.RexProgramBuilder; +import org.immutables.value.Value; + +import java.math.BigDecimal; +import java.util.List; + +/** + * Planner rule that removes the output column of rank number iff there is an equality condition for + * the rank column. + */ +@Value.Enclosing +public class ConstantRankNumberColumnRemoveRule + extends RelRule< + ConstantRankNumberColumnRemoveRule.ConstantRankNumberColumnRemoveRuleConfig> { + + public static final ConstantRankNumberColumnRemoveRule INSTANCE = + ConstantRankNumberColumnRemoveRule.ConstantRankNumberColumnRemoveRuleConfig.DEFAULT + .toRule(); + + public ConstantRankNumberColumnRemoveRule(ConstantRankNumberColumnRemoveRuleConfig config) { + super(config); + } + + @Override + public boolean matches(RelOptRuleCall call) { + FlinkLogicalRank rank = call.rel(0); + boolean isRowNumber = rank.rankType() == RankType.ROW_NUMBER; + boolean constantRowNumber = false; + RankRange range = rank.rankRange(); + if (range instanceof ConstantRankRange) { + constantRowNumber = + ((ConstantRankRange) range).getRankStart() + == ((ConstantRankRange) range).getRankEnd(); + } + return isRowNumber && constantRowNumber && rank.outputRankNumber(); + } + + @Override + public void onMatch(RelOptRuleCall call) { + FlinkLogicalRank rank = call.rel(0); + long rowNumber = ((ConstantRankRange) rank.rankRange()).getRankStart(); + FlinkLogicalRank newRank = + new FlinkLogicalRank( + rank.getCluster(), + rank.getTraitSet(), + rank.getInput(), + rank.partitionKey(), + rank.orderKey(), + rank.rankType(), + rank.rankRange(), + rank.rankNumberType(), + false); + + RexBuilder rexBuilder = rank.getCluster().getRexBuilder(); + RexProgramBuilder programBuilder = new RexProgramBuilder(newRank.getRowType(), rexBuilder); + int fieldCount = rank.getRowType().getFieldCount(); + List fieldNames = rank.getRowType().getFieldNames(); + for (int i = 0; i < fieldCount; i++) { + if (i < fieldCount - 1) { + programBuilder.addProject(i, i, fieldNames.get(i)); + } else { + RexLiteral rowNumberLiteral = + rexBuilder.makeBigintLiteral(BigDecimal.valueOf(rowNumber)); + programBuilder.addProject(i, rowNumberLiteral, fieldNames.get(i)); + } + } + + RexProgram rexProgram = programBuilder.getProgram(); + RelNode calc = FlinkLogicalCalc.create(newRank, rexProgram); + call.transformTo(calc); + } + + /** Rule configuration. */ + @Value.Immutable(singleton = false) + public interface ConstantRankNumberColumnRemoveRuleConfig extends RelRule.Config { + ConstantRankNumberColumnRemoveRule.ConstantRankNumberColumnRemoveRuleConfig DEFAULT = + ImmutableConstantRankNumberColumnRemoveRule.ConstantRankNumberColumnRemoveRuleConfig + .builder() + .operandSupplier(b0 -> b0.operand(FlinkLogicalRank.class).anyInputs()) + .description("ConstantRankNumberColumnRemoveRule") + .build(); + + @Override + default ConstantRankNumberColumnRemoveRule toRule() { + return new ConstantRankNumberColumnRemoveRule(this); + } + } +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/ConstantRankNumberColumnRemoveRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/ConstantRankNumberColumnRemoveRule.scala deleted file mode 100644 index a1f68e3d6c8b5..0000000000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/ConstantRankNumberColumnRemoveRule.scala +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.planner.plan.rules.logical - -import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalRank} -import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, RankType} - -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} -import org.apache.calcite.plan.RelOptRule.{any, operand} -import org.apache.calcite.rex.RexProgramBuilder - -import java.math.{BigDecimal => JBigDecimal} - -/** - * Planner rule that removes the output column of rank number iff there is a equality condition for - * the rank column. - */ -class ConstantRankNumberColumnRemoveRule - extends RelOptRule( - operand(classOf[FlinkLogicalRank], any()), - "ConstantRankNumberColumnRemoveRule") { - - override def matches(call: RelOptRuleCall): Boolean = { - val rank: FlinkLogicalRank = call.rel(0) - val isRowNumber = rank.rankType == RankType.ROW_NUMBER - val constantRowNumber = rank.rankRange match { - case range: ConstantRankRange => range.getRankStart == range.getRankEnd - case _ => false - } - isRowNumber && constantRowNumber && rank.outputRankNumber - } - - override def onMatch(call: RelOptRuleCall): Unit = { - val rank: FlinkLogicalRank = call.rel(0) - val rowNumber = rank.rankRange.asInstanceOf[ConstantRankRange].getRankStart - val newRank = new FlinkLogicalRank( - rank.getCluster, - rank.getTraitSet, - rank.getInput, - rank.partitionKey, - rank.orderKey, - rank.rankType, - rank.rankRange, - rank.rankNumberType, - outputRankNumber = false) - - val rexBuilder = rank.getCluster.getRexBuilder - val programBuilder = new RexProgramBuilder(newRank.getRowType, rexBuilder) - val fieldCount = rank.getRowType.getFieldCount - val fieldNames = rank.getRowType.getFieldNames - for (i <- 0 until fieldCount) { - if (i < fieldCount - 1) { - programBuilder.addProject(i, i, fieldNames.get(i)) - } else { - val rowNumberLiteral = rexBuilder.makeBigintLiteral(new JBigDecimal(rowNumber)) - programBuilder.addProject(i, rowNumberLiteral, fieldNames.get(i)) - } - } - - val rexProgram = programBuilder.getProgram - val calc = FlinkLogicalCalc.create(newRank, rexProgram) - call.transformTo(calc) - } -} - -object ConstantRankNumberColumnRemoveRule { - val INSTANCE = new ConstantRankNumberColumnRemoveRule -}