diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index d31a3c3806b3cc..bd8d0b9c93a0fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -101,6 +101,7 @@ import com.google.gson.annotations.SerializedName; import lombok.Getter; import lombok.Setter; +import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -3071,8 +3072,10 @@ public AutoIncrementGenerator getAutoIncrementGenerator() { * @param selectedIndexId the index want to scan */ public TFetchOption generateTwoPhaseReadOption(long selectedIndexId) { + boolean useStoreRow = this.storeRowColumn() + && CollectionUtils.isEmpty(getTableProperty().getCopiedRowStoreColumns()); TFetchOption fetchOption = new TFetchOption(); - fetchOption.setFetchRowStore(this.storeRowColumn()); + fetchOption.setFetchRowStore(useStoreRow); fetchOption.setUseTwoPhaseFetch(true); // get backend by tag @@ -3096,7 +3099,7 @@ public TFetchOption generateTwoPhaseReadOption(long selectedIndexId) { fetchOption.setNodesInfo(nodesInfo); - if (!this.storeRowColumn()) { + if (!useStoreRow) { List columnsDesc = Lists.newArrayList(); getColumnDesc(selectedIndexId, columnsDesc, null, null); fetchOption.setColumnDesc(columnsDesc); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 91cef4aae530e0..bb65624cde461b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -1900,7 +1900,13 @@ public PlanFragment visitPhysicalProject(PhysicalProject project List allProjectionExprs = Lists.newArrayList(); List slots = null; // TODO FE/BE do not support multi-layer-project on MultiDataSink now. - if (project.hasMultiLayerProjection() && !(inputFragment instanceof MultiCastPlanFragment)) { + if (project.hasMultiLayerProjection() + && !(inputFragment instanceof MultiCastPlanFragment) + // TODO support for two phase read with project, remove it after refactor + && !(project.child() instanceof PhysicalDeferMaterializeTopN) + && !(project.child() instanceof PhysicalDeferMaterializeOlapScan + || (project.child() instanceof PhysicalFilter + && ((PhysicalFilter) project.child()).child() instanceof PhysicalDeferMaterializeOlapScan))) { int layerCount = project.getMultiLayerProjects().size(); for (int i = 0; i < layerCount; i++) { List layer = project.getMultiLayerProjects().get(i); @@ -2008,37 +2014,28 @@ public PlanFragment visitPhysicalProject(PhysicalProject project } if (inputPlanNode instanceof ScanNode) { - TupleDescriptor projectionTuple = null; - // slotIdsByOrder is used to ensure the ScanNode's output order is same with current Project - // if we change the output order in translate project, the upper node will receive wrong order - // tuple, since they get the order from project.getOutput() not scan.getOutput()./ - projectionTuple = generateTupleDesc(slots, - ((ScanNode) inputPlanNode).getTupleDesc().getTable(), context); - inputPlanNode.setProjectList(projectionExprs); - inputPlanNode.setOutputTupleDesc(projectionTuple); - - // TODO: this is a temporary scheme to support two phase read when has project. - // we need to refactor all topn opt into rbo stage. + // TODO support for two phase read with project, remove this if after refactor + if (!(project.child() instanceof PhysicalDeferMaterializeOlapScan + || (project.child() instanceof PhysicalFilter + && ((PhysicalFilter) project.child()).child() instanceof PhysicalDeferMaterializeOlapScan))) { + TupleDescriptor projectionTuple = generateTupleDesc(slots, + ((ScanNode) inputPlanNode).getTupleDesc().getTable(), context); + inputPlanNode.setProjectList(projectionExprs); + inputPlanNode.setOutputTupleDesc(projectionTuple); + } if (inputPlanNode instanceof OlapScanNode) { - ArrayList olapScanSlots = - context.getTupleDesc(inputPlanNode.getTupleIds().get(0)).getSlots(); - SlotDescriptor lastSlot = olapScanSlots.get(olapScanSlots.size() - 1); - if (lastSlot.getColumn() != null - && lastSlot.getColumn().getName().equals(Column.ROWID_COL)) { - injectRowIdColumnSlot(projectionTuple); - SlotRef slotRef = new SlotRef(lastSlot); - inputPlanNode.getProjectList().add(slotRef); - requiredByProjectSlotIdSet.add(lastSlot.getId()); - requiredSlotIdSet.add(lastSlot.getId()); - } ((OlapScanNode) inputPlanNode).updateRequiredSlots(context, requiredByProjectSlotIdSet); } updateScanSlotsMaterialization((ScanNode) inputPlanNode, requiredSlotIdSet, requiredByProjectSlotIdSet, context); } else { - TupleDescriptor tupleDescriptor = generateTupleDesc(slots, null, context); - inputPlanNode.setProjectList(projectionExprs); - inputPlanNode.setOutputTupleDesc(tupleDescriptor); + if (project.child() instanceof PhysicalDeferMaterializeTopN) { + inputFragment.setOutputExprs(projectionExprs); + } else { + TupleDescriptor tupleDescriptor = generateTupleDesc(slots, null, context); + inputPlanNode.setProjectList(projectionExprs); + inputPlanNode.setOutputTupleDesc(tupleDescriptor); + } } return inputFragment; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DeferMaterializeTopNResult.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DeferMaterializeTopNResult.java index 6d90d81349d08f..818721926c56ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DeferMaterializeTopNResult.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DeferMaterializeTopNResult.java @@ -33,6 +33,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeTopN; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalProject; import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink; import org.apache.doris.nereids.trees.plans.logical.LogicalTopN; import org.apache.doris.qe.ConnectContext; @@ -44,6 +45,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; /** * rewrite simple top n query to defer materialize slot not use for sort or predicate @@ -54,51 +56,124 @@ public class DeferMaterializeTopNResult implements RewriteRuleFactory { public List buildRules() { return ImmutableList.of( RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build( - logicalResultSink(logicalTopN(logicalOlapScan())) - .when(r -> r.child().getLimit() < getTopNOptLimitThreshold()) - .whenNot(r -> r.child().getOrderKeys().isEmpty()) - .when(r -> r.child().getOrderKeys().stream().map(OrderKey::getExpr) - .allMatch(Expression::isColumnFromTable)) - .when(r -> r.child().child().getTable().getEnableLightSchemaChange()) - .when(r -> r.child().child().getTable().isDupKeysOrMergeOnWrite()) - .then(r -> deferMaterialize(r, r.child(), Optional.empty(), r.child().child())) + logicalResultSink( + logicalTopN( + logicalOlapScan() + .when(s -> s.getTable().getEnableLightSchemaChange()) + .when(s -> s.getTable().isDupKeysOrMergeOnWrite()) + ).when(t -> t.getLimit() < getTopNOptLimitThreshold()) + .whenNot(t -> t.getOrderKeys().isEmpty()) + .when(t -> t.getOrderKeys().stream() + .map(OrderKey::getExpr) + .allMatch(Expression::isColumnFromTable)) + ).then(r -> deferMaterialize(r, r.child(), + Optional.empty(), Optional.empty(), r.child().child())) ), RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build( - logicalResultSink(logicalTopN(logicalFilter(logicalOlapScan()))) - .when(r -> r.child().getLimit() < getTopNOptLimitThreshold()) - .whenNot(r -> r.child().getOrderKeys().isEmpty()) - .when(r -> r.child().getOrderKeys().stream().map(OrderKey::getExpr) - .allMatch(Expression::isColumnFromTable)) - .when(r -> r.child().child().child().getTable().getEnableLightSchemaChange()) - .when(r -> r.child().child().child().getTable().isDupKeysOrMergeOnWrite()) - .then(r -> { - LogicalFilter filter = r.child().child(); - return deferMaterialize(r, r.child(), Optional.of(filter), filter.child()); - }) + logicalResultSink( + logicalTopN( + logicalFilter( + logicalOlapScan() + .when(s -> s.getTable().getEnableLightSchemaChange()) + .when(s -> s.getTable().isDupKeysOrMergeOnWrite()) + ) + ).when(t -> t.getLimit() < getTopNOptLimitThreshold()) + .whenNot(t -> t.getOrderKeys().isEmpty()) + .when(t -> t.getOrderKeys().stream() + .map(OrderKey::getExpr) + .allMatch(Expression::isColumnFromTable)) + ).then(r -> { + LogicalFilter filter = r.child().child(); + return deferMaterialize(r, r.child(), Optional.empty(), + Optional.of(filter), filter.child()); + }) + ), + RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build( + logicalResultSink( + logicalTopN( + logicalProject( + logicalFilter( + logicalOlapScan() + .when(s -> s.getTable().getEnableLightSchemaChange()) + .when(s -> s.getTable().isDupKeysOrMergeOnWrite()) + ) + ) + ).when(t -> t.getLimit() < getTopNOptLimitThreshold()) + .whenNot(t -> t.getOrderKeys().isEmpty()) + .when(t -> t.getOrderKeys().stream() + .map(OrderKey::getExpr) + .allMatch(Expression::isColumnFromTable)) + ).then(r -> { + LogicalProject> project = r.child().child(); + LogicalFilter filter = project.child(); + return deferMaterialize(r, r.child(), Optional.of(project), + Optional.of(filter), filter.child()); + }) + ), + RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build( + logicalResultSink(logicalProject( + logicalTopN( + logicalProject(logicalFilter( + logicalOlapScan() + .when(s -> s.getTable().getEnableLightSchemaChange()) + .when(s -> s.getTable().isDupKeysOrMergeOnWrite()) + ) + ) + ).when(t -> t.getLimit() < getTopNOptLimitThreshold()) + .whenNot(t -> t.getOrderKeys().isEmpty()) + .when(t -> t.getOrderKeys().stream() + .map(OrderKey::getExpr) + .allMatch(Expression::isColumnFromTable)) + )).then(r -> { + LogicalProject upperProject = r.child(); + LogicalProject> bottomProject = r.child().child().child(); + List projections = upperProject.mergeProjections(bottomProject); + LogicalProject project = upperProject.withProjects(projections); + LogicalFilter filter = bottomProject.child(); + return deferMaterialize(r, r.child().child(), Optional.of(project), + Optional.of(filter), filter.child()); + }) ) ); } private Plan deferMaterialize(LogicalResultSink logicalResultSink, - LogicalTopN logicalTopN, Optional> logicalFilter, - LogicalOlapScan logicalOlapScan) { + LogicalTopN logicalTopN, Optional> logicalProject, + Optional> logicalFilter, LogicalOlapScan logicalOlapScan) { Column rowId = new Column(Column.ROWID_COL, Type.STRING, false, null, false, "", "rowid column"); SlotReference columnId = SlotReference.fromColumn( logicalOlapScan.getTable(), rowId, logicalOlapScan.getQualifier()); + Set orderKeys = Sets.newHashSet(); Set deferredMaterializedExprIds = Sets.newHashSet(logicalOlapScan.getOutputExprIdSet()); logicalFilter.ifPresent(filter -> filter.getConjuncts() .forEach(e -> deferredMaterializedExprIds.removeAll(e.getInputSlotExprIds()))); logicalTopN.getOrderKeys().stream() .map(OrderKey::getExpr) .map(Slot.class::cast) + .peek(orderKeys::add) .map(NamedExpression::getExprId) .filter(Objects::nonNull) .forEach(deferredMaterializedExprIds::remove); + if (logicalProject.isPresent()) { + deferredMaterializedExprIds.retainAll(logicalProject.get().getInputSlots().stream() + .map(NamedExpression::getExprId).collect(Collectors.toSet())); + } LogicalDeferMaterializeOlapScan deferOlapScan = new LogicalDeferMaterializeOlapScan( logicalOlapScan, deferredMaterializedExprIds, columnId); Plan root = logicalFilter.map(f -> f.withChildren(deferOlapScan)).orElse(deferOlapScan); + if (logicalProject.isPresent()) { + + ImmutableList.Builder requiredSlots = ImmutableList.builder(); + logicalProject.get().getInputSlots().forEach(requiredSlots::add); + requiredSlots.addAll(orderKeys); + requiredSlots.add(columnId); + root = new LogicalProject<>(requiredSlots.build(), root); + } root = new LogicalDeferMaterializeTopN<>((LogicalTopN) logicalTopN.withChildren(root), deferredMaterializedExprIds, columnId); + if (logicalProject.isPresent()) { + root = logicalProject.get().withChildren(root); + } root = logicalResultSink.withChildren(root); return new LogicalDeferMaterializeResultSink<>((LogicalResultSink) root, logicalOlapScan.getTable(), logicalOlapScan.getSelectedIndexId());