fix 1
BePPPower committed Jul 12, 2024
1 parent bca8b07 commit d494308
Showing 8 changed files with 20 additions and 107 deletions.
Changed file 1 of 8
@@ -183,16 +183,6 @@ protected void initSchemaParams() throws UserException {
params.setSrcTupleId(-1);
}

/**
* Reset required_slots in contexts. This is called after Nereids planner do the projection.
* In the projection process, some slots may be removed. So call this to update the slots info.
*/
@Override
public void updateRequiredSlots(PlanTranslatorContext planTranslatorContext,
Set<SlotId> requiredByProjectSlotIdSet) throws UserException {
updateRequiredSlots();
}

private void updateRequiredSlots() throws UserException {
params.unsetRequiredSlots();
for (SlotDescriptor slot : desc.getSlots()) {
Changed file 2 of 8
@@ -259,12 +259,6 @@ public void finalizeForNereids() throws UserException {
createScanRangeLocations();
}

@Override
public void updateRequiredSlots(PlanTranslatorContext context, Set<SlotId> requiredByProjectSlotIdSet)
throws UserException {
createJdbcColumns();
}

@Override
protected void createScanRangeLocations() throws UserException {
scanRangeLocations = Lists.newArrayList(createSingleScanRangeLocations(backendPolicy));
Changed file 3 of 8
@@ -118,12 +118,6 @@ public void finalizeForNereids() throws UserException {
createScanRangeLocations();
}

@Override
public void updateRequiredSlots(PlanTranslatorContext context, Set<SlotId> requiredByProjectSlotIdSet)
throws UserException {
createOdbcColumns();
}

@Override
protected void createScanRangeLocations() throws UserException {
scanRangeLocations = Lists.newArrayList(createSingleScanRangeLocations(backendPolicy));
Changed file 4 of 8
@@ -279,22 +279,6 @@ private boolean supportNativeReader() {
}
}

//When calling 'setPaimonParams' and 'getSplits', the column trimming has not been performed yet,
// Therefore, paimon_column_names is temporarily reset here
@Override
public void updateRequiredSlots(PlanTranslatorContext planTranslatorContext,
Set<SlotId> requiredByProjectSlotIdSet) throws UserException {
super.updateRequiredSlots(planTranslatorContext, requiredByProjectSlotIdSet);
String cols = desc.getSlots().stream().map(slot -> slot.getColumn().getName())
.collect(Collectors.joining(","));
for (TScanRangeLocations tScanRangeLocations : scanRangeLocations) {
List<TFileRangeDesc> ranges = tScanRangeLocations.scan_range.ext_scan_range.file_scan_range.ranges;
for (TFileRangeDesc tFileRangeDesc : ranges) {
tFileRangeDesc.table_format_params.paimon_params.setPaimonColumnNames(cols);
}
}
}

@Override
public TFileType getLocationType() throws DdlException, MetaNotFoundException {
return getLocationType(((FileStoreTable) source.getPaimonTable()).location().toString());
Changed file 5 of 8
@@ -321,31 +321,6 @@ private <T> String encodeObjectToString(T t, ObjectMapperProvider objectMapperPr
}
}

// When calling 'setTrinoConnectorParams' and 'getSplits', the column trimming has not been performed yet,
// Therefore, trino_connector_column_names is temporarily reset here
@Override
public void updateRequiredSlots(PlanTranslatorContext planTranslatorContext,
Set<SlotId> requiredByProjectSlotIdSet) throws UserException {
super.updateRequiredSlots(planTranslatorContext, requiredByProjectSlotIdSet);
Map<String, ColumnMetadata> columnMetadataMap = source.getTargetTable().getColumnMetadataMap();
Map<String, ColumnHandle> columnHandleMap = source.getTargetTable().getColumnHandleMap();
List<ColumnHandle> columnHandles = new ArrayList<>();
for (SlotDescriptor slotDescriptor : desc.getSlots()) {
String colName = slotDescriptor.getColumn().getName();
if (columnMetadataMap.containsKey(colName)) {
columnHandles.add(columnHandleMap.get(colName));
}
}

for (TScanRangeLocations tScanRangeLocations : scanRangeLocations) {
List<TFileRangeDesc> ranges = tScanRangeLocations.scan_range.ext_scan_range.file_scan_range.ranges;
for (TFileRangeDesc tFileRangeDesc : ranges) {
tFileRangeDesc.table_format_params.trino_connector_params.setTrinoConnectorColumnHandles(
encodeObjectToString(columnHandles, objectMapperProvider));
}
}
}

@Override
public TFileType getLocationType() throws DdlException, MetaNotFoundException {
return getLocationType("");
Changed file 6 of 8
@@ -46,8 +46,6 @@
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.es.source.EsScanNode;
import org.apache.doris.datasource.hive.HMSExternalTable;
@@ -275,6 +273,9 @@ public PlanFragment translatePlan(PhysicalPlan physicalPlan) {
throw new AnalysisException("tables with unknown column stats: " + builder);
}
}
for (ScanNode scanNode : context.getScanNodes()) {
Utils.execWithUncheckedException(scanNode::finalizeForNereids);
}
return rootFragment;
}
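With the loop added above, finalizeForNereids is now invoked once from translatePlan, after projection has trimmed each scan node's tuple descriptor, rather than from the individual visitPhysical*Scan methods (whose per-node calls are removed further down in this file). A simplified sketch of the resulting tail of translatePlan, for orientation only; the accept(...) call and the omission of the unknown-column-stats check shown above are simplifications, not part of this diff:

    // Sketch only: simplified tail of translatePlan() after this change.
    // Translating the plan builds the scan nodes and, during projection,
    // trims the slots each scan node actually needs; every scan node is
    // then finalized exactly once against the trimmed descriptors.
    PlanFragment rootFragment = physicalPlan.accept(this, context); // assumed entry call
    for (ScanNode scanNode : context.getScanNodes()) {
        Utils.execWithUncheckedException(scanNode::finalizeForNereids);
    }
    return rootFragment;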

@@ -646,7 +647,6 @@ public PlanFragment visitPhysicalEsScan(PhysicalEsScan esScan, PlanTranslatorCon
)
);
context.getTopnFilterContext().translateTarget(esScan, esScanNode, context);
Utils.execWithUncheckedException(esScanNode::finalizeForNereids);
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), esScanNode, dataPartition);
context.addPlanFragment(planFragment);
@@ -697,7 +697,6 @@ private PlanFragment getPlanFragmentForPhysicalFileScan(PhysicalFileScan fileSca
)
);
context.getTopnFilterContext().translateTarget(fileScan, scanNode, context);
Utils.execWithUncheckedException(scanNode::finalizeForNereids);
// Create PlanFragment
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = createPlanFragment(scanNode, dataPartition, fileScan);
@@ -723,7 +722,6 @@ public PlanFragment visitPhysicalJdbcScan(PhysicalJdbcScan jdbcScan, PlanTransla
)
);
context.getTopnFilterContext().translateTarget(jdbcScan, jdbcScanNode, context);
Utils.execWithUncheckedException(jdbcScanNode::finalizeForNereids);
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), jdbcScanNode, dataPartition);
context.addPlanFragment(planFragment);
@@ -748,7 +746,6 @@ public PlanFragment visitPhysicalOdbcScan(PhysicalOdbcScan odbcScan, PlanTransla
)
);
context.getTopnFilterContext().translateTarget(odbcScan, odbcScanNode, context);
Utils.execWithUncheckedException(odbcScanNode::finalizeForNereids);
context.getTopnFilterContext().translateTarget(odbcScan, odbcScanNode, context);
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), odbcScanNode, dataPartition);
@@ -827,8 +824,6 @@ public PlanFragment visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanTransla
);
context.getTopnFilterContext().translateTarget(olapScan, olapScanNode, context);
olapScanNode.setPushDownAggNoGrouping(context.getRelationPushAggOp(olapScan.getRelationId()));
// TODO: we need to remove all finalizeForNereids
olapScanNode.finalizeForNereids();
// Create PlanFragment
// TODO: use a util function to convert distribution to DataPartition
DataPartition dataPartition = DataPartition.RANDOM;
@@ -915,7 +910,6 @@ public PlanFragment visitPhysicalSchemaScan(PhysicalSchemaScan schemaScan, PlanT
.translateRuntimeFilterTarget(expr, finalScanNode, context)
)
);
Utils.execWithUncheckedException(scanNode::finalizeForNereids);
context.addScanNode(scanNode, schemaScan);
PlanFragment planFragment = createPlanFragment(scanNode, DataPartition.RANDOM, schemaScan);
context.addPlanFragment(planFragment);
@@ -937,7 +931,6 @@ public PlanFragment visitPhysicalTVFRelation(PhysicalTVFRelation tvfRelation, Pl
.forEach(expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, scanNode, context)
)
);
Utils.execWithUncheckedException(scanNode::finalizeForNereids);
context.addScanNode(scanNode, tvfRelation);

// TODO: it is weird update label in this way
@@ -2446,22 +2439,16 @@ private void updateScanSlotsMaterialization(ScanNode scanNode,
if (scanNode.getTupleDesc().getSlots().isEmpty()) {
scanNode.getTupleDesc().getSlots().add(smallest);
}
-     try {
-         if (context.getSessionVariable() != null
-                 && context.getSessionVariable().forbidUnknownColStats
-                 && !StatisticConstants.isSystemTable(scanNode.getTupleDesc().getTable())) {
-             for (SlotId slotId : requiredByProjectSlotIdSet) {
-                 if (context.isColumnStatsUnknown(scanNode, slotId)) {
-                     String colName = scanNode.getTupleDesc().getSlot(slotId.asInt()).getColumn().getName();
-                     throw new AnalysisException("meet unknown column stats: " + colName);
-                 }
-             }
-             context.removeScanFromStatsUnknownColumnsMap(scanNode);
-         }
-         scanNode.updateRequiredSlots(context, requiredByProjectSlotIdSet);
-     } catch (UserException e) {
-         Util.logAndThrowRuntimeException(LOG,
-                 "User Exception while reset external file scan node contexts.", e);
-     }
+     if (context.getSessionVariable() != null
+             && context.getSessionVariable().forbidUnknownColStats
+             && !StatisticConstants.isSystemTable(scanNode.getTupleDesc().getTable())) {
+         for (SlotId slotId : requiredByProjectSlotIdSet) {
+             if (context.isColumnStatsUnknown(scanNode, slotId)) {
+                 String colName = scanNode.getTupleDesc().getSlot(slotId.asInt()).getColumn().getName();
+                 throw new AnalysisException("meet unknown column stats: " + colName);
+             }
+         }
+         context.removeScanFromStatsUnknownColumnsMap(scanNode);
+     }
}

Changed file 7 of 8
@@ -74,6 +74,7 @@
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileScanSlotInfo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TOlapScanNode;
import org.apache.doris.thrift.TOlapTableIndex;
@@ -1738,6 +1739,14 @@ public void finalizeForNereids() {
computeStatsForNereids();
// NOTICE: must call here to get selected tablet row count to let block rules work well.
mockRowCountInStatistic();

outputColumnUniqueIds.clear();
for (SlotDescriptor slot : desc.getSlots()) {
if (!slot.isMaterialized()) {
continue;
}
outputColumnUniqueIds.add(slot.getColumn().getUniqueId());
}
}

private void computeStatsForNereids() {
@@ -1756,17 +1765,6 @@ Set<String> getDistributionColumnNames() {
: Sets.newTreeSet();
}

@Override
public void updateRequiredSlots(PlanTranslatorContext context,
Set<SlotId> requiredByProjectSlotIdSet) {
outputColumnUniqueIds.clear();
for (SlotDescriptor slot : context.getTupleDesc(this.getTupleId()).getSlots()) {
if (requiredByProjectSlotIdSet.contains(slot.getId()) && slot.getColumn() != null) {
outputColumnUniqueIds.add(slot.getColumn().getUniqueId());
}
}
}

@Override
public StatsDelta genStatsDelta() throws AnalysisException {
return new StatsDelta(Env.getCurrentEnv().getCurrentCatalog().getId(),
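For OlapScanNode, the filtering that the removed updateRequiredSlots override did against requiredByProjectSlotIdSet now happens in the loop added to finalizeForNereids earlier in this file: because finalizeForNereids runs after projection, the materialized slots left in desc are exactly the slots the projection kept. A stream-style rewrite of that added loop, shown purely for illustration and assuming outputColumnUniqueIds is a Set<Integer>:

    // Illustration only: equivalent to the loop added to finalizeForNereids().
    outputColumnUniqueIds = desc.getSlots().stream()
            .filter(SlotDescriptor::isMaterialized)
            .map(slot -> slot.getColumn().getUniqueId())
            .collect(Collectors.toSet());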
Changed file 8 of 8
@@ -174,15 +174,6 @@ protected boolean isKeySearch() {
return false;
}

/**
* Update required_slots in scan node contexts. This is called after Nereids planner do the projection.
* In the projection process, some slots may be removed. So call this to update the slots info.
* Currently, it is only used by ExternalFileScanNode, add the interface here to keep the Nereids code clean.
*/
public void updateRequiredSlots(PlanTranslatorContext context,
Set<SlotId> requiredByProjectSlotIdSet) throws UserException {
}

private void computeColumnFilter(Column column, SlotDescriptor slotDesc, PartitionInfo partitionsInfo) {
// Set `columnFilters` all the time because `DistributionPruner` also use this.
// Maybe we could use `columnNameToRange` for `DistributionPruner` and
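This last file removes the updateRequiredSlots hook from the ScanNode base class, so there is no post-projection callback left for scan nodes to override. A node that previously relied on the hook would now do the same work inside finalizeForNereids, which this commit arranges to run after projection. A hedged migration sketch; the subclass and the buildRequiredColumns helper are hypothetical, while desc, SlotDescriptor, and createScanRangeLocations are taken from the diff:

    // Hypothetical subclass migrating off the removed updateRequiredSlots hook.
    @Override
    public void finalizeForNereids() throws UserException {
        // desc has already been trimmed by the Nereids projection at this point,
        // so only the surviving, materialized slots are visible here.
        List<String> requiredColumns = desc.getSlots().stream()
                .filter(SlotDescriptor::isMaterialized)
                .map(slot -> slot.getColumn().getName())
                .collect(Collectors.toList());
        buildRequiredColumns(requiredColumns); // hypothetical helper
        createScanRangeLocations();
    }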
