Skip to content

Commit

Permalink
[fix](planner) separate table's isPartitioned() method (apache#28163)
Browse files Browse the repository at this point in the history
This PR apache#27515 change the logic if Table's `isPartitioned()` method.
But this method has 2 usages:

1. To check whether a table is range or list partitioned, for some DML operation such as Alter, Export.

    For this case, it should return true if the table is range or list partitioned. even if it has only
    one partition and one buckets.

2. To check whether the data is distributed (either by partitions or by buckets), for query planner.

    For this case, it should return true if table has more than one bucket. Even if this table is not
    range or list partitioned, if it has more than one bucket, it should return true.

So we should separate this method into 2, for different usages.
Otherwise, it may cause some unreasonable plan shape
  • Loading branch information
morningman authored and 胥剑旭 committed Dec 14, 2023
1 parent ecf6d6e commit fecdbd5
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1939,7 +1939,7 @@ public int getAsInt() {
BuildIndexClause buildIndexClause = (BuildIndexClause) alterClause;
IndexDef indexDef = buildIndexClause.getIndexDef();
Index index = buildIndexClause.getIndex();
if (!olapTable.isPartitioned()) {
if (!olapTable.isPartitionedTable()) {
List<String> specifiedPartitions = indexDef.getPartitionNames();
if (!specifiedPartitions.isEmpty()) {
throw new DdlException("table " + olapTable.getName()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ private void checkPartitions(Env env) throws AnalysisException {
table.readLock();
try {
// check table
if (!table.isPartitioned()) {
if (!table.isPartitionedTable()) {
throw new AnalysisException("Table[" + tblName.getTbl() + "] is not partitioned.");
}
Table.TableType tblType = table.getType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -878,15 +878,15 @@ private void analyzePlanHints() throws AnalysisException {
}
for (String hint : planHints) {
if (SHUFFLE_HINT.equalsIgnoreCase(hint)) {
if (!targetTable.isPartitioned()) {
if (!targetTable.isPartitionedTable()) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_INSERT_HINT_NOT_SUPPORT);
}
if (isRepartition != null && !isRepartition) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_PLAN_HINT_CONFILT, hint);
}
isRepartition = Boolean.TRUE;
} else if (NOSHUFFLE_HINT.equalsIgnoreCase(hint)) {
if (!targetTable.isPartitioned()) {
if (!targetTable.isPartitionedTable()) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_INSERT_HINT_NOT_SUPPORT);
}
if (isRepartition != null && isRepartition) {
Expand Down
15 changes: 14 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1307,10 +1307,23 @@ public Status getIntersectPartNamesWith(OlapTable anotherTbl, List<String> inter
}

@Override
public boolean isPartitioned() {
public boolean isPartitionedTable() {
return !PartitionType.UNPARTITIONED.equals(partitionInfo.getType());
}

// Return true if data is distributed by one more partitions or buckets.
@Override
public boolean isPartitionDistributed() {
int numSegs = 0;
for (Partition part : getPartitions()) {
numSegs += part.getDistributionInfo().getBucketNum();
if (numSegs > 1) {
return true;
}
}
return false;
}

@Override
public void write(DataOutput out) throws IOException {
super.write(out);
Expand Down
8 changes: 7 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -496,8 +496,14 @@ public void readFields(DataInput in) throws IOException {
}

// return if this table is partitioned.
// For OlapTable, return true only if its partition type is RANGE or HASH
public boolean isPartitionedTable() {
return false;
}

// return if this table is partitioned, for planner.
// For OlapTable ture when is partitioned, or distributed by hash when no partition
public boolean isPartitioned() {
public boolean isPartitionDistributed() {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ private void checkPartitions(ConnectContext ctx, TableName tblName) throws Analy
+ tblType + " type, do not support EXPORT.");
}
// check table
if (!table.isPartitioned()) {
if (!table.isPartitionedTable()) {
throw new AnalysisException("Table[" + tblName.getTbl() + "] is not partitioned.");
}
for (String partitionName : this.partitionsNames) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ isPartialUpdate, isFromNativeInsertStmt, groupExpression, getLogicalProperties()
* get output physical properties
*/
public PhysicalProperties getRequirePhysicalProperties() {
if (targetTable.isPartitioned()) {
if (targetTable.isPartitionDistributed()) {
DistributionInfo distributionInfo = targetTable.getDefaultDistributionInfo();
if (distributionInfo instanceof HashDistributionInfo) {
HashDistributionInfo hashDistributionInfo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ PlanFragment createInsertFragment(
boolean needRepartition = false;
boolean needMerge = false;
if (isFragmentPartitioned(inputFragment)) {
if (targetTable.isPartitioned()) {
if (targetTable.isPartitionDistributed()) {
if (stmt.getDataPartition().getType() == TPartitionType.RANDOM) {
return inputFragment;
}
Expand All @@ -138,7 +138,7 @@ PlanFragment createInsertFragment(
needMerge = true;
}
} else {
if (targetTable.isPartitioned()) {
if (targetTable.isPartitionDistributed()) {
if (isRepart != null && isRepart) {
needRepartition = true;
} else {
Expand Down
33 changes: 33 additions & 0 deletions fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
Expand Down Expand Up @@ -682,4 +683,36 @@ public void testEliminatingSortNode() throws Exception {
Assertions.assertFalse(plan1.contains("order by:"));
}
}

@Test
public void testInsertPlan() throws Exception {
FeConstants.runningUnitTest = true;
// 1. should not contains exchange node in old planner
boolean v = connectContext.getSessionVariable().isEnableNereidsPlanner();
try {
connectContext.getSessionVariable().setEnableNereidsPlanner(false);
String sql1 = "explain insert into db1.tbl1 select * from db1.tbl1";
StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1);
stmtExecutor1.execute();
Planner planner1 = stmtExecutor1.planner();
String plan1 = planner1.getExplainString(new ExplainOptions(false, false, false));
Assertions.assertFalse(plan1.contains("VEXCHANGE"));
} finally {
connectContext.getSessionVariable().setEnableNereidsPlanner(v);
}

// 1. should not contains exchange node in new planner
v = connectContext.getSessionVariable().isEnableNereidsPlanner();
try {
connectContext.getSessionVariable().setEnableNereidsPlanner(true);
String sql1 = "explain insert into db1.tbl1 select * from db1.tbl1";
StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1);
stmtExecutor1.execute();
Planner planner1 = stmtExecutor1.planner();
String plan1 = planner1.getExplainString(new ExplainOptions(false, false, false));
Assertions.assertFalse(plan1.contains("VEXCHANGE"));
} finally {
connectContext.getSessionVariable().setEnableNereidsPlanner(v);
}
}
}

0 comments on commit fecdbd5

Please sign in to comment.