From 988f61d70f4584e5612e710c6d5181c1c311cedc Mon Sep 17 00:00:00 2001 From: Sergey Nuyanzin Date: Sun, 3 Mar 2024 17:42:33 +0100 Subject: [PATCH] [FLINK-34596][table] Migrate RemoveRedundantLocalHashAggRule to java --- .../RemoveRedundantLocalHashAggRule.java | 104 ++++++++++++++++++ .../RemoveRedundantLocalHashAggRule.scala | 63 ----------- 2 files changed, 104 insertions(+), 63 deletions(-) create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.java delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.scala diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.java new file mode 100644 index 00000000000000..e1cbdd8da13ee8 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.physical.batch; + +import org.apache.flink.table.planner.plan.nodes.FlinkConventions; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalHashAggregate; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalLocalHashAggregate; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.immutables.value.Value; + +/** + * There maybe exist a subTree like localHashAggregate -> globalHashAggregate which the middle + * shuffle is removed. The rule could remove redundant localHashAggregate node. + */ +@Value.Enclosing +public class RemoveRedundantLocalHashAggRule + extends RelRule { + + public static final RemoveRedundantLocalHashAggRule INSTANCE = + RemoveRedundantLocalHashAggRule.RemoveRedundantLocalHashAggRuleConfig.DEFAULT.toRule(); + + protected RemoveRedundantLocalHashAggRule(RemoveRedundantLocalHashAggRuleConfig config) { + super(config); + } + + public void onMatch(RelOptRuleCall call) { + BatchPhysicalHashAggregate globalAgg = call.rel(0); + BatchPhysicalLocalHashAggregate localAgg = call.rel(1); + RelNode inputOfLocalAgg = localAgg.getInput(); + BatchPhysicalHashAggregate newGlobalAgg = + new BatchPhysicalHashAggregate( + globalAgg.getCluster(), + globalAgg.getTraitSet(), + inputOfLocalAgg, + globalAgg.getRowType(), + inputOfLocalAgg.getRowType(), + inputOfLocalAgg.getRowType(), + localAgg.grouping(), + localAgg.auxGrouping(), + // Use the localAgg agg calls because the global agg call filters was + // removed, + // see BatchPhysicalHashAggRule for details. + localAgg.getAggCallToAggFunction(), + false); + call.transformTo(newGlobalAgg); + } + + /** Configuration for {@link RemoveRedundantLocalHashAggRule}. */ + @Value.Immutable(singleton = false) + public interface RemoveRedundantLocalHashAggRuleConfig extends RelRule.Config { + RemoveRedundantLocalHashAggRule.RemoveRedundantLocalHashAggRuleConfig DEFAULT = + ImmutableRemoveRedundantLocalHashAggRule.RemoveRedundantLocalHashAggRuleConfig + .builder() + .build() + .withOperandSupplier( + b0 -> + b0.operand(BatchPhysicalHashAggregate.class) + .oneInput( + b1 -> + b1.operand( + BatchPhysicalLocalHashAggregate + .class) + .oneInput( + b2 -> + b2.operand( + RelNode + .class) + .oneInput( + b3 -> + b3.operand( + FlinkConventions + .BATCH_PHYSICAL() + .getInterface()) + .noInputs())))) + .withDescription("RemoveRedundantLocalHashAggRule") + .as( + RemoveRedundantLocalHashAggRule + .RemoveRedundantLocalHashAggRuleConfig.class); + + @Override + default RemoveRedundantLocalHashAggRule toRule() { + return new RemoveRedundantLocalHashAggRule(this); + } + } +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.scala deleted file mode 100644 index 7378a71b29ba03..00000000000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.planner.plan.rules.physical.batch - -import org.apache.flink.table.planner.plan.nodes.FlinkConventions -import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalHashAggregate, BatchPhysicalLocalHashAggregate} - -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} -import org.apache.calcite.plan.RelOptRule._ -import org.apache.calcite.rel.RelNode - -/** - * There maybe exist a subTree like localHashAggregate -> globalHashAggregate which the middle - * shuffle is removed. The rule could remove redundant localHashAggregate node. - */ -class RemoveRedundantLocalHashAggRule - extends RelOptRule( - operand( - classOf[BatchPhysicalHashAggregate], - operand( - classOf[BatchPhysicalLocalHashAggregate], - operand(classOf[RelNode], FlinkConventions.BATCH_PHYSICAL, any))), - "RemoveRedundantLocalHashAggRule") { - - override def onMatch(call: RelOptRuleCall): Unit = { - val globalAgg: BatchPhysicalHashAggregate = call.rel(0) - val localAgg: BatchPhysicalLocalHashAggregate = call.rel(1) - val inputOfLocalAgg = localAgg.getInput - val newGlobalAgg = new BatchPhysicalHashAggregate( - globalAgg.getCluster, - globalAgg.getTraitSet, - inputOfLocalAgg, - globalAgg.getRowType, - inputOfLocalAgg.getRowType, - inputOfLocalAgg.getRowType, - localAgg.grouping, - localAgg.auxGrouping, - // Use the localAgg agg calls because the global agg call filters was removed, - // see BatchPhysicalHashAggRule for details. - localAgg.getAggCallToAggFunction, - isMerge = false) - call.transformTo(newGlobalAgg) - } -} - -object RemoveRedundantLocalHashAggRule { - val INSTANCE = new RemoveRedundantLocalHashAggRule -}