Skip to content

Commit

Permalink
[bugfix](iceberg)revert count(*) directly returned by fe (#38566)
Browse files Browse the repository at this point in the history
## Proposed changes

related #34928
  • Loading branch information
wuwenchi authored Aug 2, 2024
1 parent 9111da8 commit 2459a3e
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public abstract class FileScanNode extends ExternalScanNode {
protected long totalFileSize = 0;
protected long totalPartitionNum = 0;
protected long fileSplitSize;
public long rowCount = 0;

public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType,
boolean needCheckColumnPriv) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -209,12 +210,6 @@ private List<Split> doGetSplits() throws UserException {
HashSet<String> partitionPathSet = new HashSet<>();
boolean isPartitionedTable = icebergTable.spec().isPartitioned();

long rowCount = getCountFromSnapshot();
if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT) && rowCount >= 0) {
this.rowCount = rowCount;
return new ArrayList<>();
}

CloseableIterable<FileScanTask> fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), splitSize);
try (CloseableIterable<CombinedScanTask> combinedScanTasks =
TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) {
Expand Down Expand Up @@ -268,6 +263,12 @@ private List<Split> doGetSplits() throws UserException {
throw new UserException(e.getMessage(), e.getCause());
}

TPushAggOp aggOp = getPushDownAggNoGroupingOp();
if (aggOp.equals(TPushAggOp.COUNT) && getCountFromSnapshot() >= 0) {
// we can create a special empty split and skip the plan process
return splits.isEmpty() ? splits : Collections.singletonList(splits.get(0));
}

selectedPartitionNum = partitionPathSet.size();

return splits;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
import org.apache.doris.mysql.FieldInfo;
import org.apache.doris.nereids.CascadesContext.Lock;
import org.apache.doris.nereids.exceptions.AnalysisException;
Expand Down Expand Up @@ -58,20 +57,16 @@
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache;
import org.apache.doris.nereids.trees.plans.physical.TopnFilter;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.RuntimeFilter;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.CommonResultSet;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.qe.ResultSetMetaData;
import org.apache.doris.qe.SessionVariable;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -81,7 +76,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
Expand Down Expand Up @@ -633,23 +627,7 @@ public Optional<ResultSet> handleQueryInFe(StatementBase parsedStmt) {
}
}

if (physicalPlan instanceof PhysicalResultSink
&& physicalPlan.child(0) instanceof PhysicalHashAggregate && !getScanNodes().isEmpty()
&& getScanNodes().get(0) instanceof IcebergScanNode) {
List<Column> columns = Lists.newArrayList();
NamedExpression output = physicalPlan.getOutput().get(0);
columns.add(new Column(output.getName(), output.getDataType().toCatalogDataType()));
if (((IcebergScanNode) getScanNodes().get(0)).rowCount > 0) {
ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns);
ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(
Lists.newArrayList(String.valueOf(((IcebergScanNode) getScanNodes().get(0)).rowCount))));
// only support one iceberg scan node and one count, e.g. select count(*) from icetbl;
return Optional.of(resultSet);
}
return Optional.empty();
} else {
return Optional.empty();
}
return Optional.empty();
}

private void setFormatOptions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.FormatOptions;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
import org.apache.doris.qe.CommonResultSet;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ResultSet;
Expand Down Expand Up @@ -637,25 +636,13 @@ public Optional<ResultSet> handleQueryInFe(StatementBase parsedStmt) {
return Optional.empty();
}
SelectStmt parsedSelectStmt = (SelectStmt) parsedStmt;
if (!parsedSelectStmt.getTableRefs().isEmpty()) {
return Optional.empty();
}
List<SelectListItem> selectItems = parsedSelectStmt.getSelectList().getItems();
List<Column> columns = new ArrayList<>(selectItems.size());
List<String> columnLabels = parsedSelectStmt.getColLabels();
List<String> data = new ArrayList<>();
if ((singleNodePlanner.getScanNodes().size() > 0 && singleNodePlanner.getScanNodes().get(0)
instanceof IcebergScanNode) && (((IcebergScanNode) getScanNodes().get(0)).rowCount > 0)) {
SelectListItem item = selectItems.get(0);
Expr expr = item.getExpr();
String columnName = columnLabels.get(0);
columns.add(new Column(columnName, expr.getType()));
data.add(String.valueOf(((IcebergScanNode) getScanNodes().get(0)).rowCount));
ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns);
ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(data));
// only support one iceberg scan node and one count, e.g. select count(*) from icetbl;
return Optional.of(resultSet);
}
if (!parsedSelectStmt.getTableRefs().isEmpty()) {
return Optional.empty();
}
FormatOptions options = FormatOptions.getDefault();
for (int i = 0; i < selectItems.size(); i++) {
SelectListItem item = selectItems.get(i);
Expand Down

0 comments on commit 2459a3e

Please sign in to comment.