From c0ab96ce83499849e8bd7e287fd2316e94f33cec Mon Sep 17 00:00:00 2001 From: Sergey Nuyanzin Date: Sun, 3 Mar 2024 16:25:29 +0100 Subject: [PATCH] [FLINK-34598][table] Migrate RemoveRedundantLocalRankRule to java --- .../batch/RemoveRedundantLocalRankRule.java | 97 +++++++++++++++++++ .../batch/RemoveRedundantLocalRankRule.scala | 62 ------------ 2 files changed, 97 insertions(+), 62 deletions(-) create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalRankRule.java delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalRankRule.scala diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalRankRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalRankRule.java new file mode 100644 index 0000000000000..2d1aefe60dd89 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalRankRule.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.planner.plan.nodes.FlinkConventions; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRank; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.immutables.value.Value; + +import java.util.Collections; + +/** + * Planner rule that matches a global {@link BatchPhysicalRank} on a local {@link + * BatchPhysicalRank}, and merge them into a global {@link BatchPhysicalRank}. + */ +@Value.Enclosing +public class RemoveRedundantLocalRankRule + extends RelRule { + + public static final RemoveRedundantLocalRankRule INSTANCE = + RemoveRedundantLocalRankRuleConfig.DEFAULT.toRule(); + + public RemoveRedundantLocalRankRule(RemoveRedundantLocalRankRuleConfig config) { + super(config); + } + + public boolean matches(RelOptRuleCall call) { + BatchPhysicalRank globalRank = call.rel(0); + BatchPhysicalRank localRank = call.rel(1); + return globalRank.isGlobal() + && !localRank.isGlobal() + && globalRank.rankType() == localRank.rankType() + && globalRank.partitionKey() == localRank.partitionKey() + && globalRank.orderKey() == globalRank.orderKey() + && globalRank.rankEnd() == localRank.rankEnd(); + } + + public void onMatch(RelOptRuleCall call) { + BatchPhysicalRank globalRank = call.rel(0); + RelNode inputOfLocalRank = call.rel(2); + RelNode newGlobalRank = + globalRank.copy( + globalRank.getTraitSet(), Collections.singletonList(inputOfLocalRank)); + call.transformTo(newGlobalRank); + } + + /** Configuration for {@link RemoveRedundantLocalRankRule}. */ + @Value.Immutable(singleton = false) + public interface RemoveRedundantLocalRankRuleConfig extends RelRule.Config { + RemoveRedundantLocalRankRule.RemoveRedundantLocalRankRuleConfig DEFAULT = + ImmutableRemoveRedundantLocalRankRule.RemoveRedundantLocalRankRuleConfig.builder() + .build() + .withOperandSupplier( + b0 -> + b0.operand(BatchPhysicalRank.class) + .oneInput( + b1 -> + b1.operand(BatchPhysicalRank.class) + .oneInput( + b2 -> + b2.operand( + RelNode + .class) + .oneInput( + b3 -> + b3.operand( + FlinkConventions + .BATCH_PHYSICAL() + .getInterface()) + .noInputs())))) + .as(RemoveRedundantLocalRankRuleConfig.class); + + @Override + default RemoveRedundantLocalRankRule toRule() { + return new RemoveRedundantLocalRankRule(this); + } + } +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalRankRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalRankRule.scala deleted file mode 100644 index ea0cd193ed710..0000000000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalRankRule.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.planner.plan.rules.physical.batch - -import org.apache.flink.table.planner.plan.nodes.FlinkConventions -import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRank - -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} -import org.apache.calcite.plan.RelOptRule._ -import org.apache.calcite.rel.RelNode - -import scala.collection.JavaConversions._ - -/** - * Planner rule that matches a global [[BatchPhysicalRank]] on a local [[BatchPhysicalRank]], and - * merge them into a global [[BatchPhysicalRank]]. - */ -class RemoveRedundantLocalRankRule - extends RelOptRule( - operand( - classOf[BatchPhysicalRank], - operand( - classOf[BatchPhysicalRank], - operand(classOf[RelNode], FlinkConventions.BATCH_PHYSICAL, any))), - "RemoveRedundantLocalRankRule") { - - override def matches(call: RelOptRuleCall): Boolean = { - val globalRank: BatchPhysicalRank = call.rel(0) - val localRank: BatchPhysicalRank = call.rel(1) - globalRank.isGlobal && !localRank.isGlobal && - globalRank.rankType == localRank.rankType && - globalRank.partitionKey == localRank.partitionKey && - globalRank.orderKey == globalRank.orderKey && - globalRank.rankEnd == localRank.rankEnd - } - - override def onMatch(call: RelOptRuleCall): Unit = { - val globalRank: BatchPhysicalRank = call.rel(0) - val inputOfLocalRank: RelNode = call.rel(2) - val newGlobalRank = globalRank.copy(globalRank.getTraitSet, List(inputOfLocalRank)) - call.transformTo(newGlobalRank) - } -} - -object RemoveRedundantLocalRankRule { - val INSTANCE: RelOptRule = new RemoveRedundantLocalRankRule -}