diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java index 41cdf78cf896ae..d7695850d2a5ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java @@ -278,7 +278,7 @@ public PhysicalProperties visitPhysicalHashJoin( case RIGHT_SEMI_JOIN: case RIGHT_ANTI_JOIN: case RIGHT_OUTER_JOIN: - if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec)) { + if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec, hashJoin.getHashJoinConjuncts())) { return new PhysicalProperties(rightHashSpec); } else { // retain left shuffle type, since coordinator use left most node to schedule fragment diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java index 0fa2d87f92f135..d17d7485cce930 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java @@ -249,7 +249,7 @@ public Boolean visitPhysicalHashJoin(PhysicalHashJoin updatedForLeft = Optional.empty(); Optional updatedForRight = Optional.empty(); - if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec)) { + if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec, hashJoin.getHashJoinConjuncts())) { // check colocate join with scan return true; } else if (couldNotRightBucketShuffleJoin(hashJoin.getJoinType())) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java index 43436355ae17aa..472d2e169dba8b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java @@ -83,8 +83,8 @@ private DistributionSpec convertDistribution(LogicalOlapScan olapScan) { List output = olapScan.getOutput(); List baseOutput = olapScan.getOutputByIndex(olapScan.getTable().getBaseIndexId()); List hashColumns = Lists.newArrayList(); - for (Slot slot : output) { - for (Column column : hashDistributionInfo.getDistributionColumns()) { + for (Column column : hashDistributionInfo.getDistributionColumns()) { + for (Slot slot : output) { if (((SlotReference) slot).getColumn().get().getNameWithoutMvPrefix() .equals(column.getName())) { hashColumns.add(slot.getExprId()); @@ -92,8 +92,8 @@ private DistributionSpec convertDistribution(LogicalOlapScan olapScan) { } } if (hashColumns.size() != hashDistributionInfo.getDistributionColumns().size()) { - for (Slot slot : baseOutput) { - for (Column column : hashDistributionInfo.getDistributionColumns()) { + for (Column column : hashDistributionInfo.getDistributionColumns()) { + for (Slot slot : baseOutput) { // If the length of the column in the bucket key changes after DDL, the length cannot be // determined. As a result, some bucket fields are lost in the query execution plan. // So here we use the column name to avoid this problem @@ -109,8 +109,8 @@ private DistributionSpec convertDistribution(LogicalOlapScan olapScan) { HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo; List output = olapScan.getOutput(); List hashColumns = Lists.newArrayList(); - for (Slot slot : output) { - for (Column column : hashDistributionInfo.getDistributionColumns()) { + for (Column column : hashDistributionInfo.getDistributionColumns()) { + for (Slot slot : output) { // If the length of the column in the bucket key changes after DDL, the length cannot be // determined. As a result, some bucket fields are lost in the query execution plan. // So here we use the column name to avoid this problem diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java index 25f84c096c8e07..9a5e56b41af48f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java @@ -29,6 +29,7 @@ import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.Not; import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContains; import org.apache.doris.nereids.trees.plans.JoinType; import org.apache.doris.nereids.trees.plans.Plan; @@ -44,6 +45,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -214,13 +216,14 @@ public static boolean shouldColocateJoin(AbstractPhysicalJoin conjuncts) { if (ConnectContext.get() == null || ConnectContext.get().getSessionVariable().isDisableColocatePlan()) { return false; @@ -242,12 +245,47 @@ public static boolean couldColocateJoin(DistributionSpecHash leftHashSpec, Distr boolean noNeedCheckColocateGroup = hitSameIndex && (leftTablePartitions.equals(rightTablePartitions)) && (leftTablePartitions.size() <= 1); ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex(); - if (noNeedCheckColocateGroup - || (colocateIndex.isSameGroup(leftTableId, rightTableId) - && !colocateIndex.isGroupUnstable(colocateIndex.getGroup(leftTableId)))) { + if (noNeedCheckColocateGroup) { return true; } - return false; + if (!colocateIndex.isSameGroup(leftTableId, rightTableId) + || colocateIndex.isGroupUnstable(colocateIndex.getGroup(leftTableId))) { + return false; + } + + Set equalIndices = new HashSet<>(); + for (Expression expr : conjuncts) { + // only simple equal predicate can use colocate join + if (!(expr instanceof EqualPredicate)) { + return false; + } + Expression leftChild = ((EqualPredicate) expr).left(); + Expression rightChild = ((EqualPredicate) expr).right(); + if (!(leftChild instanceof SlotReference) || !(rightChild instanceof SlotReference)) { + return false; + } + + SlotReference leftSlot = (SlotReference) leftChild; + SlotReference rightSlot = (SlotReference) rightChild; + Integer leftIndex = leftHashSpec.getExprIdToEquivalenceSet().get(leftSlot.getExprId()); + Integer rightIndex = rightHashSpec.getExprIdToEquivalenceSet().get(rightSlot.getExprId()); + if (leftIndex == null) { + leftIndex = rightHashSpec.getExprIdToEquivalenceSet().get(leftSlot.getExprId()); + rightIndex = leftHashSpec.getExprIdToEquivalenceSet().get(rightSlot.getExprId()); + } + if (!Objects.equals(leftIndex, rightIndex)) { + return false; + } + if (leftIndex != null) { + equalIndices.add(leftIndex); + } + } + // on conditions must contain all distributed columns + if (equalIndices.containsAll(leftHashSpec.getExprIdToEquivalenceSet().values())) { + return true; + } else { + return false; + } } public static Set getJoinOutputExprIdSet(Plan left, Plan right) { diff --git a/regression-test/suites/correctness_p0/test_colocate_join_of_column_order.groovy b/regression-test/suites/correctness_p0/test_colocate_join_of_column_order.groovy new file mode 100644 index 00000000000000..efef99695065aa --- /dev/null +++ b/regression-test/suites/correctness_p0/test_colocate_join_of_column_order.groovy @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_colocate_join_of_column_order") { + sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t1`; """ + // distributed by k1,k2 + sql """ + CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_t1` ( + `k1` varchar(64) NULL, + `k2` varchar(64) NULL, + `v` int NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`,`k2`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`,`k2`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "colocate_with" = "group_column_order" + ); + """ + sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t2`; """ + // distributed by k2,k1 + sql """ + CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_t2` ( + `k1` varchar(64) NULL, + `k2` varchar(64) NULL, + `v` int NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`,`k2`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k2`,`k1`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "colocate_with" = "group_column_order" + ); + """ + sql """insert into test_colocate_join_of_column_order_t1 values('k1','k2',11);""" + sql """insert into test_colocate_join_of_column_order_t2 values('k1','k2',11);""" + + sql """set enable_nereids_planner=true; """ + explain { + sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k1 and a.k2=b.k2;") + notContains "COLOCATE" + } + explain { + sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k2;") + notContains "COLOCATE" + } + explain { + sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k1;") + notContains "COLOCATE" + } + explain { + sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k2 and a.v=b.v;") + notContains "COLOCATE" + } + explain { + sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k2 and a.k2=b.k1;") + contains "COLOCATE" + } + explain { + sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k2 and a.k2=b.k1 and a.v=b.v;") + contains "COLOCATE" + } + + sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t1`; """ + sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t2`; """ + + // multi tables + sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_ta`; """ + sql """ + CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_ta` ( `c1` bigint NULL, `c2` bigint NULL) + DISTRIBUTED BY HASH(c1) PROPERTIES ( "replication_num" = "1", "colocate_with" = "group_column_order3"); + """ + sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_tb`; """ + sql """ + CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_tb` ( `c1` bigint NULL, `c2` bigint NULL) + DISTRIBUTED BY HASH(c1) PROPERTIES ( "replication_num" = "1", "colocate_with" = "group_column_order3"); + """ + sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_tc`; """ + sql """ + CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_tc` ( `c1` bigint NULL, `c2` bigint NULL) + DISTRIBUTED BY HASH(c1) PROPERTIES ( "replication_num" = "1", "colocate_with" = "group_column_order3"); + """ + sql """insert into test_colocate_join_of_column_order_ta values(1,1);""" + sql """insert into test_colocate_join_of_column_order_tb values(1,1);""" + sql """insert into test_colocate_join_of_column_order_tc values(1,1);""" + + explain { + sql("""select /*+ set_var(disable_join_reorder=true) */ * from test_colocate_join_of_column_order_ta join [shuffle] (select cast((c2 + 1) as bigint) c2 from test_colocate_join_of_column_order_tb) test_colocate_join_of_column_order_tb on test_colocate_join_of_column_order_ta.c1 = test_colocate_join_of_column_order_tb.c2 join [shuffle] test_colocate_join_of_column_order_tc on test_colocate_join_of_column_order_tb.c2 = test_colocate_join_of_column_order_tc.c1;"""); + contains "COLOCATE" + } + + sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_ta`; """ + sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_tb`; """ + sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_tc`; """ +}