Skip to content

Commit

Permalink
[Fix](ScanNode) Move the finalize phase of ScanNode to after the end …
Browse files Browse the repository at this point in the history
…of the Physical Translate phase. (#37565)

## Proposed changes

Currently, Doris first obtains splits and then performs projection.
After column pruning, it calls `updateRequiredSlots` to update the
scanRange information. However, the Trino connector's column pruning
pushdown needs to be completed before obtaining splits.

Therefore, we move the finalize phase of `ScanNode` to after the end of
the `Physical Translate` phase, so that `createScanRangeLocations` can
use the final columns which have been pruning.
  • Loading branch information
BePPPower authored and dataroaring committed Jul 24, 2024
1 parent fd2f06e commit 5b9e491
Show file tree
Hide file tree
Showing 8 changed files with 17 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.TableSample;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
Expand All @@ -40,7 +39,6 @@
import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.hive.source.HiveSplit;
import org.apache.doris.datasource.iceberg.source.IcebergSplit;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
Expand Down Expand Up @@ -80,7 +78,6 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* FileQueryScanNode for querying the file access type of catalog, now only support
Expand Down Expand Up @@ -183,16 +180,6 @@ protected void initSchemaParams() throws UserException {
params.setSrcTupleId(-1);
}

/**
* Reset required_slots in contexts. This is called after Nereids planner do the projection.
* In the projection process, some slots may be removed. So call this to update the slots info.
*/
@Override
public void updateRequiredSlots(PlanTranslatorContext planTranslatorContext,
Set<SlotId> requiredByProjectSlotIdSet) throws UserException {
updateRequiredSlots();
}

private void updateRequiredSlots() throws UserException {
params.unsetRequiredSlots();
for (SlotDescriptor slot : desc.getSlots()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.doris.analysis.ExprSubstitutionMap;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
Expand All @@ -39,7 +38,6 @@
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalScanNode;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
Expand All @@ -59,7 +57,6 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class JdbcScanNode extends ExternalScanNode {
Expand Down Expand Up @@ -259,12 +256,6 @@ public void finalizeForNereids() throws UserException {
createScanRangeLocations();
}

@Override
public void updateRequiredSlots(PlanTranslatorContext context, Set<SlotId> requiredByProjectSlotIdSet)
throws UserException {
createJdbcColumns();
}

@Override
protected void createScanRangeLocations() throws UserException {
scanRangeLocations = Lists.newArrayList(createSingleScanRangeLocations(backendPolicy));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.doris.analysis.ExprSubstitutionMap;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
Expand All @@ -33,7 +32,6 @@
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalScanNode;
import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
Expand All @@ -53,7 +51,6 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -118,12 +115,6 @@ public void finalizeForNereids() throws UserException {
createScanRangeLocations();
}

@Override
public void updateRequiredSlots(PlanTranslatorContext context, Set<SlotId> requiredByProjectSlotIdSet)
throws UserException {
createOdbcColumns();
}

@Override
protected void createScanRangeLocations() throws UserException {
scanRangeLocations = Lists.newArrayList(createSingleScanRangeLocations(backendPolicy));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.doris.datasource.paimon.source;

import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
Expand All @@ -27,7 +26,6 @@
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalTable;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
Expand All @@ -38,7 +36,6 @@
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TPaimonDeletionFileDesc;
import org.apache.doris.thrift.TPaimonFileDesc;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TTableFormatFileDesc;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -279,22 +276,6 @@ private boolean supportNativeReader() {
}
}

//When calling 'setPaimonParams' and 'getSplits', the column trimming has not been performed yet,
// Therefore, paimon_column_names is temporarily reset here
@Override
public void updateRequiredSlots(PlanTranslatorContext planTranslatorContext,
Set<SlotId> requiredByProjectSlotIdSet) throws UserException {
super.updateRequiredSlots(planTranslatorContext, requiredByProjectSlotIdSet);
String cols = desc.getSlots().stream().map(slot -> slot.getColumn().getName())
.collect(Collectors.joining(","));
for (TScanRangeLocations tScanRangeLocations : scanRangeLocations) {
List<TFileRangeDesc> ranges = tScanRangeLocations.scan_range.ext_scan_range.file_scan_range.ranges;
for (TFileRangeDesc tFileRangeDesc : ranges) {
tFileRangeDesc.table_format_params.paimon_params.setPaimonColumnNames(cols);
}
}
}

@Override
public TFileType getLocationType() throws DdlException, MetaNotFoundException {
return getLocationType(((FileStoreTable) source.getPaimonTable()).location().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.doris.datasource.trinoconnector.source;

import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
Expand All @@ -28,15 +27,13 @@
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.datasource.trinoconnector.TrinoConnectorPluginLoader;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TTableFormatFileDesc;
import org.apache.doris.thrift.TTrinoConnectorFileDesc;
import org.apache.doris.trinoconnector.TrinoColumnMetadata;
Expand Down Expand Up @@ -321,31 +318,6 @@ private <T> String encodeObjectToString(T t, ObjectMapperProvider objectMapperPr
}
}

// When calling 'setTrinoConnectorParams' and 'getSplits', the column trimming has not been performed yet,
// Therefore, trino_connector_column_names is temporarily reset here
@Override
public void updateRequiredSlots(PlanTranslatorContext planTranslatorContext,
Set<SlotId> requiredByProjectSlotIdSet) throws UserException {
super.updateRequiredSlots(planTranslatorContext, requiredByProjectSlotIdSet);
Map<String, ColumnMetadata> columnMetadataMap = source.getTargetTable().getColumnMetadataMap();
Map<String, ColumnHandle> columnHandleMap = source.getTargetTable().getColumnHandleMap();
List<ColumnHandle> columnHandles = new ArrayList<>();
for (SlotDescriptor slotDescriptor : desc.getSlots()) {
String colName = slotDescriptor.getColumn().getName();
if (columnMetadataMap.containsKey(colName)) {
columnHandles.add(columnHandleMap.get(colName));
}
}

for (TScanRangeLocations tScanRangeLocations : scanRangeLocations) {
List<TFileRangeDesc> ranges = tScanRangeLocations.scan_range.ext_scan_range.file_scan_range.ranges;
for (TFileRangeDesc tFileRangeDesc : ranges) {
tFileRangeDesc.table_format_params.trino_connector_params.setTrinoConnectorColumnHandles(
encodeObjectToString(columnHandles, objectMapperProvider));
}
}
}

@Override
public TFileType getLocationType() throws DdlException, MetaNotFoundException {
return getLocationType("");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.es.source.EsScanNode;
import org.apache.doris.datasource.hive.HMSExternalTable;
Expand Down Expand Up @@ -276,6 +274,9 @@ public PlanFragment translatePlan(PhysicalPlan physicalPlan) {
throw new AnalysisException("tables with unknown column stats: " + builder);
}
}
for (ScanNode scanNode : context.getScanNodes()) {
Utils.execWithUncheckedException(scanNode::finalizeForNereids);
}
return rootFragment;
}

Expand Down Expand Up @@ -648,7 +649,6 @@ public PlanFragment visitPhysicalEsScan(PhysicalEsScan esScan, PlanTranslatorCon
)
);
context.getTopnFilterContext().translateTarget(esScan, esScanNode, context);
Utils.execWithUncheckedException(esScanNode::finalizeForNereids);
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), esScanNode, dataPartition);
context.addPlanFragment(planFragment);
Expand Down Expand Up @@ -699,7 +699,6 @@ private PlanFragment getPlanFragmentForPhysicalFileScan(PhysicalFileScan fileSca
)
);
context.getTopnFilterContext().translateTarget(fileScan, scanNode, context);
Utils.execWithUncheckedException(scanNode::finalizeForNereids);
// Create PlanFragment
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = createPlanFragment(scanNode, dataPartition, fileScan);
Expand All @@ -725,7 +724,6 @@ public PlanFragment visitPhysicalJdbcScan(PhysicalJdbcScan jdbcScan, PlanTransla
)
);
context.getTopnFilterContext().translateTarget(jdbcScan, jdbcScanNode, context);
Utils.execWithUncheckedException(jdbcScanNode::finalizeForNereids);
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), jdbcScanNode, dataPartition);
context.addPlanFragment(planFragment);
Expand All @@ -750,7 +748,6 @@ public PlanFragment visitPhysicalOdbcScan(PhysicalOdbcScan odbcScan, PlanTransla
)
);
context.getTopnFilterContext().translateTarget(odbcScan, odbcScanNode, context);
Utils.execWithUncheckedException(odbcScanNode::finalizeForNereids);
context.getTopnFilterContext().translateTarget(odbcScan, odbcScanNode, context);
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), odbcScanNode, dataPartition);
Expand Down Expand Up @@ -829,8 +826,6 @@ public PlanFragment visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanTransla
);
context.getTopnFilterContext().translateTarget(olapScan, olapScanNode, context);
olapScanNode.setPushDownAggNoGrouping(context.getRelationPushAggOp(olapScan.getRelationId()));
// TODO: we need to remove all finalizeForNereids
olapScanNode.finalizeForNereids();
// Create PlanFragment
// TODO: use a util function to convert distribution to DataPartition
DataPartition dataPartition = DataPartition.RANDOM;
Expand Down Expand Up @@ -917,7 +912,6 @@ public PlanFragment visitPhysicalSchemaScan(PhysicalSchemaScan schemaScan, PlanT
.translateRuntimeFilterTarget(expr, finalScanNode, context)
)
);
Utils.execWithUncheckedException(scanNode::finalizeForNereids);
context.addScanNode(scanNode, schemaScan);
PlanFragment planFragment = createPlanFragment(scanNode, DataPartition.RANDOM, schemaScan);
context.addPlanFragment(planFragment);
Expand All @@ -939,7 +933,6 @@ public PlanFragment visitPhysicalTVFRelation(PhysicalTVFRelation tvfRelation, Pl
.forEach(expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, scanNode, context)
)
);
Utils.execWithUncheckedException(scanNode::finalizeForNereids);
context.addScanNode(scanNode, tvfRelation);

// TODO: it is weird update label in this way
Expand Down Expand Up @@ -2003,6 +1996,7 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project
}
requiredSlotIdSet.add(lastSlot.getId());
}
((OlapScanNode) inputPlanNode).updateRequiredSlots(context, requiredByProjectSlotIdSet);
}
updateScanSlotsMaterialization((ScanNode) inputPlanNode, requiredSlotIdSet,
requiredByProjectSlotIdSet, slotIdsByOrder, context);
Expand Down Expand Up @@ -2452,22 +2446,16 @@ private void updateScanSlotsMaterialization(ScanNode scanNode,
if (scanNode.getTupleDesc().getSlots().isEmpty()) {
scanNode.getTupleDesc().getSlots().add(smallest);
}
try {
if (context.getSessionVariable() != null
&& context.getSessionVariable().forbidUnknownColStats
&& !StatisticConstants.isSystemTable(scanNode.getTupleDesc().getTable())) {
for (SlotId slotId : requiredByProjectSlotIdSet) {
if (context.isColumnStatsUnknown(scanNode, slotId)) {
String colName = scanNode.getTupleDesc().getSlot(slotId.asInt()).getColumn().getName();
throw new AnalysisException("meet unknown column stats: " + colName);
}
if (context.getSessionVariable() != null
&& context.getSessionVariable().forbidUnknownColStats
&& !StatisticConstants.isSystemTable(scanNode.getTupleDesc().getTable())) {
for (SlotId slotId : requiredByProjectSlotIdSet) {
if (context.isColumnStatsUnknown(scanNode, slotId)) {
String colName = scanNode.getTupleDesc().getSlot(slotId.asInt()).getColumn().getName();
throw new AnalysisException("meet unknown column stats: " + colName);
}
context.removeScanFromStatsUnknownColumnsMap(scanNode);
}
scanNode.updateRequiredSlots(context, requiredByProjectSlotIdSet);
} catch (UserException e) {
Util.logAndThrowRuntimeException(LOG,
"User Exception while reset external file scan node contexts.", e);
context.removeScanFromStatsUnknownColumnsMap(scanNode);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1749,7 +1749,11 @@ Set<String> getDistributionColumnNames() {
: Sets.newTreeSet();
}

@Override
/**
* Update required_slots in scan node contexts. This is called after Nereids planner do the projection.
* In the projection process, some slots may be removed. So call this to update the slots info.
* Currently, it is only used by ExternalFileScanNode, add the interface here to keep the Nereids code clean.
*/
public void updateRequiredSlots(PlanTranslatorContext context,
Set<SlotId> requiredByProjectSlotIdSet) {
outputColumnUniqueIds.clear();
Expand Down
10 changes: 0 additions & 10 deletions fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.doris.datasource.SplitAssignment;
import org.apache.doris.datasource.SplitGenerator;
import org.apache.doris.datasource.SplitSource;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.statistics.StatisticalType;
Expand Down Expand Up @@ -177,15 +176,6 @@ protected boolean isKeySearch() {
return false;
}

/**
* Update required_slots in scan node contexts. This is called after Nereids planner do the projection.
* In the projection process, some slots may be removed. So call this to update the slots info.
* Currently, it is only used by ExternalFileScanNode, add the interface here to keep the Nereids code clean.
*/
public void updateRequiredSlots(PlanTranslatorContext context,
Set<SlotId> requiredByProjectSlotIdSet) throws UserException {
}

private void computeColumnFilter(Column column, SlotDescriptor slotDesc, PartitionInfo partitionsInfo) {
// Set `columnFilters` all the time because `DistributionPruner` also use this.
// Maybe we could use `columnNameToRange` for `DistributionPruner` and
Expand Down

0 comments on commit 5b9e491

Please sign in to comment.