Skip to content

Commit

Permalink
[opt](Nereids) support defer materialized with project
Browse files Browse the repository at this point in the history
  • Loading branch information
morrySnow committed Feb 11, 2025
1 parent cbcebb1 commit 9c0511a
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -3071,8 +3072,10 @@ public AutoIncrementGenerator getAutoIncrementGenerator() {
* @param selectedIndexId the id of the index to scan
*/
public TFetchOption generateTwoPhaseReadOption(long selectedIndexId) {
boolean useStoreRow = this.storeRowColumn()
&& CollectionUtils.isEmpty(getTableProperty().getCopiedRowStoreColumns());
TFetchOption fetchOption = new TFetchOption();
fetchOption.setFetchRowStore(this.storeRowColumn());
fetchOption.setFetchRowStore(useStoreRow);
fetchOption.setUseTwoPhaseFetch(true);

// get backend by tag
Expand All @@ -3096,7 +3099,7 @@ public TFetchOption generateTwoPhaseReadOption(long selectedIndexId) {

fetchOption.setNodesInfo(nodesInfo);

if (!this.storeRowColumn()) {
if (!useStoreRow) {
List<TColumn> columnsDesc = Lists.newArrayList();
getColumnDesc(selectedIndexId, columnsDesc, null, null);
fetchOption.setColumnDesc(columnsDesc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1900,7 +1900,13 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project
List<Expr> allProjectionExprs = Lists.newArrayList();
List<Slot> slots = null;
// TODO FE/BE do not support multi-layer-project on MultiDataSink now.
if (project.hasMultiLayerProjection() && !(inputFragment instanceof MultiCastPlanFragment)) {
if (project.hasMultiLayerProjection()
&& !(inputFragment instanceof MultiCastPlanFragment)
// TODO support for two phase read with project, remove it after refactor
&& !(project.child() instanceof PhysicalDeferMaterializeTopN)
&& !(project.child() instanceof PhysicalDeferMaterializeOlapScan
|| (project.child() instanceof PhysicalFilter
&& ((PhysicalFilter<?>) project.child()).child() instanceof PhysicalDeferMaterializeOlapScan))) {
int layerCount = project.getMultiLayerProjects().size();
for (int i = 0; i < layerCount; i++) {
List<NamedExpression> layer = project.getMultiLayerProjects().get(i);
Expand Down Expand Up @@ -2008,37 +2014,28 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project
}

if (inputPlanNode instanceof ScanNode) {
TupleDescriptor projectionTuple = null;
// slotIdsByOrder is used to ensure the ScanNode's output order is same with current Project
// if we change the output order in translate project, the upper node will receive wrong order
// tuple, since they get the order from project.getOutput() not scan.getOutput().
projectionTuple = generateTupleDesc(slots,
((ScanNode) inputPlanNode).getTupleDesc().getTable(), context);
inputPlanNode.setProjectList(projectionExprs);
inputPlanNode.setOutputTupleDesc(projectionTuple);

// TODO: this is a temporary scheme to support two phase read when has project.
// we need to refactor all topn opt into rbo stage.
// TODO support for two phase read with project, remove this if after refactor
if (!(project.child() instanceof PhysicalDeferMaterializeOlapScan
|| (project.child() instanceof PhysicalFilter
&& ((PhysicalFilter<?>) project.child()).child() instanceof PhysicalDeferMaterializeOlapScan))) {
TupleDescriptor projectionTuple = generateTupleDesc(slots,
((ScanNode) inputPlanNode).getTupleDesc().getTable(), context);
inputPlanNode.setProjectList(projectionExprs);
inputPlanNode.setOutputTupleDesc(projectionTuple);
}
if (inputPlanNode instanceof OlapScanNode) {
ArrayList<SlotDescriptor> olapScanSlots =
context.getTupleDesc(inputPlanNode.getTupleIds().get(0)).getSlots();
SlotDescriptor lastSlot = olapScanSlots.get(olapScanSlots.size() - 1);
if (lastSlot.getColumn() != null
&& lastSlot.getColumn().getName().equals(Column.ROWID_COL)) {
injectRowIdColumnSlot(projectionTuple);
SlotRef slotRef = new SlotRef(lastSlot);
inputPlanNode.getProjectList().add(slotRef);
requiredByProjectSlotIdSet.add(lastSlot.getId());
requiredSlotIdSet.add(lastSlot.getId());
}
((OlapScanNode) inputPlanNode).updateRequiredSlots(context, requiredByProjectSlotIdSet);
}
updateScanSlotsMaterialization((ScanNode) inputPlanNode, requiredSlotIdSet,
requiredByProjectSlotIdSet, context);
} else {
TupleDescriptor tupleDescriptor = generateTupleDesc(slots, null, context);
inputPlanNode.setProjectList(projectionExprs);
inputPlanNode.setOutputTupleDesc(tupleDescriptor);
if (project.child() instanceof PhysicalDeferMaterializeTopN) {
inputFragment.setOutputExprs(projectionExprs);
} else {
TupleDescriptor tupleDescriptor = generateTupleDesc(slots, null, context);
inputPlanNode.setProjectList(projectionExprs);
inputPlanNode.setOutputTupleDesc(tupleDescriptor);
}
}
return inputFragment;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
import org.apache.doris.qe.ConnectContext;
Expand All @@ -44,6 +45,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Rewrite a simple top-n query to defer materialization of slots that are not used for sort or predicate.
Expand All @@ -54,34 +56,90 @@ public class DeferMaterializeTopNResult implements RewriteRuleFactory {
public List<Rule> buildRules() {
return ImmutableList.of(
RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build(
logicalResultSink(logicalTopN(logicalOlapScan()))
.when(r -> r.child().getLimit() < getTopNOptLimitThreshold())
.whenNot(r -> r.child().getOrderKeys().isEmpty())
.when(r -> r.child().getOrderKeys().stream().map(OrderKey::getExpr)
.allMatch(Expression::isColumnFromTable))
.when(r -> r.child().child().getTable().getEnableLightSchemaChange())
.when(r -> r.child().child().getTable().isDupKeysOrMergeOnWrite())
.then(r -> deferMaterialize(r, r.child(), Optional.empty(), r.child().child()))
logicalResultSink(
logicalTopN(
logicalOlapScan()
.when(s -> s.getTable().getEnableLightSchemaChange())
.when(s -> s.getTable().isDupKeysOrMergeOnWrite())
).when(t -> t.getLimit() < getTopNOptLimitThreshold())
.whenNot(t -> t.getOrderKeys().isEmpty())
.when(t -> t.getOrderKeys().stream()
.map(OrderKey::getExpr)
.allMatch(Expression::isColumnFromTable))
).then(r -> deferMaterialize(r, r.child(),
Optional.empty(), Optional.empty(), r.child().child()))
),
RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build(
logicalResultSink(logicalTopN(logicalFilter(logicalOlapScan())))
.when(r -> r.child().getLimit() < getTopNOptLimitThreshold())
.whenNot(r -> r.child().getOrderKeys().isEmpty())
.when(r -> r.child().getOrderKeys().stream().map(OrderKey::getExpr)
.allMatch(Expression::isColumnFromTable))
.when(r -> r.child().child().child().getTable().getEnableLightSchemaChange())
.when(r -> r.child().child().child().getTable().isDupKeysOrMergeOnWrite())
.then(r -> {
LogicalFilter<LogicalOlapScan> filter = r.child().child();
return deferMaterialize(r, r.child(), Optional.of(filter), filter.child());
})
logicalResultSink(
logicalTopN(
logicalFilter(
logicalOlapScan()
.when(s -> s.getTable().getEnableLightSchemaChange())
.when(s -> s.getTable().isDupKeysOrMergeOnWrite())
)
).when(t -> t.getLimit() < getTopNOptLimitThreshold())
.whenNot(t -> t.getOrderKeys().isEmpty())
.when(t -> t.getOrderKeys().stream()
.map(OrderKey::getExpr)
.allMatch(Expression::isColumnFromTable))
).then(r -> {
LogicalFilter<LogicalOlapScan> filter = r.child().child();
return deferMaterialize(r, r.child(), Optional.empty(),
Optional.of(filter), filter.child());
})
),
RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build(
logicalResultSink(
logicalTopN(
logicalProject(
logicalFilter(
logicalOlapScan()
.when(s -> s.getTable().getEnableLightSchemaChange())
.when(s -> s.getTable().isDupKeysOrMergeOnWrite())
)
)
).when(t -> t.getLimit() < getTopNOptLimitThreshold())
.whenNot(t -> t.getOrderKeys().isEmpty())
.when(t -> t.getOrderKeys().stream()
.map(OrderKey::getExpr)
.allMatch(Expression::isColumnFromTable))
).then(r -> {
LogicalProject<LogicalFilter<LogicalOlapScan>> project = r.child().child();
LogicalFilter<LogicalOlapScan> filter = project.child();
return deferMaterialize(r, r.child(), Optional.of(project),
Optional.of(filter), filter.child());
})
),
RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build(
logicalResultSink(logicalProject(
logicalTopN(
logicalProject(logicalFilter(
logicalOlapScan()
.when(s -> s.getTable().getEnableLightSchemaChange())
.when(s -> s.getTable().isDupKeysOrMergeOnWrite())
)
)
).when(t -> t.getLimit() < getTopNOptLimitThreshold())
.whenNot(t -> t.getOrderKeys().isEmpty())
.when(t -> t.getOrderKeys().stream()
.map(OrderKey::getExpr)
.allMatch(Expression::isColumnFromTable))
)).then(r -> {
LogicalProject<?> upperProject = r.child();
LogicalProject<LogicalFilter<LogicalOlapScan>> bottomProject = r.child().child().child();
List<NamedExpression> projections = upperProject.mergeProjections(bottomProject);
LogicalProject<?> project = upperProject.withProjects(projections);
LogicalFilter<LogicalOlapScan> filter = bottomProject.child();
return deferMaterialize(r, r.child().child(), Optional.of(project),
Optional.of(filter), filter.child());
})
)
);
}

private Plan deferMaterialize(LogicalResultSink<? extends Plan> logicalResultSink,
LogicalTopN<? extends Plan> logicalTopN, Optional<LogicalFilter<? extends Plan>> logicalFilter,
LogicalOlapScan logicalOlapScan) {
LogicalTopN<? extends Plan> logicalTopN, Optional<LogicalProject<? extends Plan>> logicalProject,
Optional<LogicalFilter<? extends Plan>> logicalFilter, LogicalOlapScan logicalOlapScan) {
Column rowId = new Column(Column.ROWID_COL, Type.STRING, false, null, false, "", "rowid column");
SlotReference columnId = SlotReference.fromColumn(
logicalOlapScan.getTable(), rowId, logicalOlapScan.getQualifier());
Expand All @@ -94,11 +152,24 @@ private Plan deferMaterialize(LogicalResultSink<? extends Plan> logicalResultSin
.map(NamedExpression::getExprId)
.filter(Objects::nonNull)
.forEach(deferredMaterializedExprIds::remove);
if (logicalProject.isPresent()) {
deferredMaterializedExprIds.retainAll(logicalProject.get().getInputSlots().stream()
.map(NamedExpression::getExprId).collect(Collectors.toSet()));
}
LogicalDeferMaterializeOlapScan deferOlapScan = new LogicalDeferMaterializeOlapScan(
logicalOlapScan, deferredMaterializedExprIds, columnId);
Plan root = logicalFilter.map(f -> f.withChildren(deferOlapScan)).orElse(deferOlapScan);
if (logicalProject.isPresent()) {
ImmutableList.Builder<NamedExpression> requiredSlots = ImmutableList.builder();
logicalProject.get().getInputSlots().forEach(requiredSlots::add);
requiredSlots.add(columnId);
root = new LogicalProject<>(requiredSlots.build(), root);
}
root = new LogicalDeferMaterializeTopN<>((LogicalTopN<? extends Plan>) logicalTopN.withChildren(root),
deferredMaterializedExprIds, columnId);
if (logicalProject.isPresent()) {
root = logicalProject.get().withChildren(root);
}
root = logicalResultSink.withChildren(root);
return new LogicalDeferMaterializeResultSink<>((LogicalResultSink<? extends Plan>) root,
logicalOlapScan.getTable(), logicalOlapScan.getSelectedIndexId());
Expand Down

0 comments on commit 9c0511a

Please sign in to comment.