Skip to content

Commit

Permalink
[fix)(colocate join) fix wrong use of colocate join (#37361)(#37729) (#…
Browse files Browse the repository at this point in the history
…37937)

cherry-pick from master #37361 #37729
  • Loading branch information
cambyzju authored Jul 17, 2024
1 parent 5f5ec1c commit 3bb5291
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ public PhysicalProperties visitPhysicalHashJoin(
case RIGHT_SEMI_JOIN:
case RIGHT_ANTI_JOIN:
case RIGHT_OUTER_JOIN:
if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec)) {
if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec, hashJoin.getHashJoinConjuncts())) {
return new PhysicalProperties(rightHashSpec);
} else {
// retain left shuffle type, since coordinator use left most node to schedule fragment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ public Boolean visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends
Optional<PhysicalProperties> updatedForLeft = Optional.empty();
Optional<PhysicalProperties> updatedForRight = Optional.empty();

if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec)) {
if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec, hashJoin.getHashJoinConjuncts())) {
// check colocate join with scan
return true;
} else if (couldNotRightBucketShuffleJoin(hashJoin.getJoinType())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,17 @@ private DistributionSpec convertDistribution(LogicalOlapScan olapScan) {
List<Slot> output = olapScan.getOutput();
List<Slot> baseOutput = olapScan.getOutputByIndex(olapScan.getTable().getBaseIndexId());
List<ExprId> hashColumns = Lists.newArrayList();
for (Slot slot : output) {
for (Column column : hashDistributionInfo.getDistributionColumns()) {
for (Column column : hashDistributionInfo.getDistributionColumns()) {
for (Slot slot : output) {
if (((SlotReference) slot).getColumn().get().getNameWithoutMvPrefix()
.equals(column.getName())) {
hashColumns.add(slot.getExprId());
}
}
}
if (hashColumns.size() != hashDistributionInfo.getDistributionColumns().size()) {
for (Slot slot : baseOutput) {
for (Column column : hashDistributionInfo.getDistributionColumns()) {
for (Column column : hashDistributionInfo.getDistributionColumns()) {
for (Slot slot : baseOutput) {
// If the length of the column in the bucket key changes after DDL, the length cannot be
// determined. As a result, some bucket fields are lost in the query execution plan.
// So here we use the column name to avoid this problem
Expand All @@ -109,8 +109,8 @@ private DistributionSpec convertDistribution(LogicalOlapScan olapScan) {
HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
List<Slot> output = olapScan.getOutput();
List<ExprId> hashColumns = Lists.newArrayList();
for (Slot slot : output) {
for (Column column : hashDistributionInfo.getDistributionColumns()) {
for (Column column : hashDistributionInfo.getDistributionColumns()) {
for (Slot slot : output) {
// If the length of the column in the bucket key changes after DDL, the length cannot be
// determined. As a result, some bucket fields are lost in the query execution plan.
// So here we use the column name to avoid this problem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Not;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContains;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
Expand All @@ -44,6 +45,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -214,13 +216,14 @@ public static boolean shouldColocateJoin(AbstractPhysicalJoin<PhysicalPlan, Phys
return false;
}
return couldColocateJoin((DistributionSpecHash) leftDistributionSpec,
(DistributionSpecHash) rightDistributionSpec);
(DistributionSpecHash) rightDistributionSpec, join.getHashJoinConjuncts());
}

/**
* could do colocate join with left and right child distribution spec.
*/
public static boolean couldColocateJoin(DistributionSpecHash leftHashSpec, DistributionSpecHash rightHashSpec) {
public static boolean couldColocateJoin(DistributionSpecHash leftHashSpec, DistributionSpecHash rightHashSpec,
List<Expression> conjuncts) {
if (ConnectContext.get() == null
|| ConnectContext.get().getSessionVariable().isDisableColocatePlan()) {
return false;
Expand All @@ -242,12 +245,47 @@ public static boolean couldColocateJoin(DistributionSpecHash leftHashSpec, Distr
boolean noNeedCheckColocateGroup = hitSameIndex && (leftTablePartitions.equals(rightTablePartitions))
&& (leftTablePartitions.size() <= 1);
ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex();
if (noNeedCheckColocateGroup
|| (colocateIndex.isSameGroup(leftTableId, rightTableId)
&& !colocateIndex.isGroupUnstable(colocateIndex.getGroup(leftTableId)))) {
if (noNeedCheckColocateGroup) {
return true;
}
return false;
if (!colocateIndex.isSameGroup(leftTableId, rightTableId)
|| colocateIndex.isGroupUnstable(colocateIndex.getGroup(leftTableId))) {
return false;
}

Set<Integer> equalIndices = new HashSet<>();
for (Expression expr : conjuncts) {
// only simple equal predicate can use colocate join
if (!(expr instanceof EqualPredicate)) {
return false;
}
Expression leftChild = ((EqualPredicate) expr).left();
Expression rightChild = ((EqualPredicate) expr).right();
if (!(leftChild instanceof SlotReference) || !(rightChild instanceof SlotReference)) {
return false;
}

SlotReference leftSlot = (SlotReference) leftChild;
SlotReference rightSlot = (SlotReference) rightChild;
Integer leftIndex = leftHashSpec.getExprIdToEquivalenceSet().get(leftSlot.getExprId());
Integer rightIndex = rightHashSpec.getExprIdToEquivalenceSet().get(rightSlot.getExprId());
if (leftIndex == null) {
leftIndex = rightHashSpec.getExprIdToEquivalenceSet().get(leftSlot.getExprId());
rightIndex = leftHashSpec.getExprIdToEquivalenceSet().get(rightSlot.getExprId());
}
if (!Objects.equals(leftIndex, rightIndex)) {
return false;
}
if (leftIndex != null) {
equalIndices.add(leftIndex);
}
}
// on conditions must contain all distributed columns
if (equalIndices.containsAll(leftHashSpec.getExprIdToEquivalenceSet().values())) {
return true;
} else {
return false;
}
}

public static Set<ExprId> getJoinOutputExprIdSet(Plan left, Plan right) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_colocate_join_of_column_order") {
sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t1`; """
// distributed by k1,k2
sql """
CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_t1` (
`k1` varchar(64) NULL,
`k2` varchar(64) NULL,
`v` int NULL
) ENGINE=OLAP
DUPLICATE KEY(`k1`,`k2`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`k1`,`k2`) BUCKETS 1
PROPERTIES (
"replication_num" = "1",
"colocate_with" = "group_column_order"
);
"""
sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t2`; """
// distributed by k2,k1
sql """
CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_t2` (
`k1` varchar(64) NULL,
`k2` varchar(64) NULL,
`v` int NULL
) ENGINE=OLAP
DUPLICATE KEY(`k1`,`k2`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`k2`,`k1`) BUCKETS 1
PROPERTIES (
"replication_num" = "1",
"colocate_with" = "group_column_order"
);
"""
sql """insert into test_colocate_join_of_column_order_t1 values('k1','k2',11);"""
sql """insert into test_colocate_join_of_column_order_t2 values('k1','k2',11);"""

sql """set enable_nereids_planner=true; """
explain {
sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k1 and a.k2=b.k2;")
notContains "COLOCATE"
}
explain {
sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k2;")
notContains "COLOCATE"
}
explain {
sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k1;")
notContains "COLOCATE"
}
explain {
sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k2 and a.v=b.v;")
notContains "COLOCATE"
}
explain {
sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k2 and a.k2=b.k1;")
contains "COLOCATE"
}
explain {
sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k2 and a.k2=b.k1 and a.v=b.v;")
contains "COLOCATE"
}

sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t1`; """
sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t2`; """

// multi tables
sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_ta`; """
sql """
CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_ta` ( `c1` bigint NULL, `c2` bigint NULL)
DISTRIBUTED BY HASH(c1) PROPERTIES ( "replication_num" = "1", "colocate_with" = "group_column_order3");
"""
sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_tb`; """
sql """
CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_tb` ( `c1` bigint NULL, `c2` bigint NULL)
DISTRIBUTED BY HASH(c1) PROPERTIES ( "replication_num" = "1", "colocate_with" = "group_column_order3");
"""
sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_tc`; """
sql """
CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_tc` ( `c1` bigint NULL, `c2` bigint NULL)
DISTRIBUTED BY HASH(c1) PROPERTIES ( "replication_num" = "1", "colocate_with" = "group_column_order3");
"""
sql """insert into test_colocate_join_of_column_order_ta values(1,1);"""
sql """insert into test_colocate_join_of_column_order_tb values(1,1);"""
sql """insert into test_colocate_join_of_column_order_tc values(1,1);"""

explain {
sql("""select /*+ set_var(disable_join_reorder=true) */ * from test_colocate_join_of_column_order_ta join [shuffle] (select cast((c2 + 1) as bigint) c2 from test_colocate_join_of_column_order_tb) test_colocate_join_of_column_order_tb on test_colocate_join_of_column_order_ta.c1 = test_colocate_join_of_column_order_tb.c2 join [shuffle] test_colocate_join_of_column_order_tc on test_colocate_join_of_column_order_tb.c2 = test_colocate_join_of_column_order_tc.c1;""");
contains "COLOCATE"
}

sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_ta`; """
sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_tb`; """
sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_tc`; """
}

0 comments on commit 3bb5291

Please sign in to comment.