Skip to content

Commit

Permalink
[pipeline](exec) disable shared scan in default and disable shared sc…
Browse files Browse the repository at this point in the history
…an in limit with where scan (#25952) (#26815)
  • Loading branch information
HappenLee authored Nov 13, 2023
1 parent 2f74832 commit d25f02c
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1592,6 +1592,9 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static boolean disable_shared_scan = false;

@ConfField(mutable = true, expType = ExperimentalType.EXPERIMENTAL)
public static boolean enable_cpu_hard_limit = false;

@ConfField(mutable = false, masterOnly = true)
public static int backend_rpc_timeout_ms = 60000; // 1 min

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1255,11 +1255,9 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
public int getNumInstances() {
// In pipeline exec engine, the instance num equals be_num * parallel instance.
// so here we need count distinct be_num to do the work. make sure get right instance
if (ConnectContext.get().getSessionVariable().getEnablePipelineEngine()) {
int parallelInstance = ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
long numBackend = scanRangeLocations.stream().flatMap(rangeLoc -> rangeLoc.getLocations().stream())
.map(loc -> loc.backend_id).distinct().count();
return (int) (parallelInstance * numBackend);
if (ConnectContext.get().getSessionVariable().getEnablePipelineEngine()
&& ConnectContext.get().getSessionVariable().getEnableSharedScan()) {
return ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
}
return scanRangeLocations.size();
}
Expand Down Expand Up @@ -1316,7 +1314,7 @@ public boolean getShouldColoScan() {
// If scan is key search, should not enable the shared scan opt to prevent the performance problem
// 1. where contain the eq or in expr of key column slot
// 2. key column slot is distribution column and first column
public boolean isKeySearch() {
protected boolean isKeySearch() {
List<SlotRef> whereSlot = Lists.newArrayList();
for (Expr conjunct : conjuncts) {
if (conjunct instanceof BinaryPredicate) {
Expand Down
23 changes: 21 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.doris.common.UserException;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.external.FederationBackendPolicy;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.statistics.query.StatsDelta;
Expand Down Expand Up @@ -146,6 +147,13 @@ protected Expr castToSlot(SlotDescriptor slotDesc, Expr expr) throws UserExcepti
*/
public abstract List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength);

// If scan is key search, should not enable the shared scan opt to prevent the performance problem
// 1. where contain the eq or in expr of key column slot
// 2. key column slot is distribution column and first column
protected boolean isKeySearch() {
return false;
}

/**
* Update required_slots in scan node contexts. This is called after Nereids planner do the projection.
* In the projection process, some slots may be removed. So call this to update the slots info.
Expand Down Expand Up @@ -643,7 +651,18 @@ public static TScanRangeLocations createSingleScanRangeLocations(FederationBacke
return scanRangeLocation;
}

public boolean isKeySearch() {
return false;
// some scan should not enable the shared scan opt to prevent the performance problem
// 1. is key search
// 2. session variable not enable_shared_scan
public boolean shouldDisableSharedScan() {
boolean enableShardScan = false;
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
enableShardScan = ConnectContext.get().getSessionVariable().getEnableSharedScan();
}
return isKeySearch() || !enableShardScan;
}

public boolean haveLimitAndConjunts() {
return hasLimit() && !conjuncts.isEmpty();
}
}
27 changes: 18 additions & 9 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1724,28 +1724,37 @@ private void computeFragmentHosts() throws Exception {
}).findFirst();

// disable shared scan optimization if one of conditions below is met:
// 1. Use non-pipeline engine
// 2. Number of scan ranges is larger than instances
// 3. This fragment has a colocated scan node
// 4. This fragment has a FileScanNode
// 5. Disable shared scan optimization by session variable
if (!enablePipelineEngine || perNodeScanRanges.size() > parallelExecInstanceNum
|| (node.isPresent() && node.get().getShouldColoScan())
// 1. Use non-pipeline or pipelineX engine
// 2. This fragment has a colocated scan node
// 3. This fragment has a FileScanNode
// 4. Disable shared scan optimization by session variable
if (!enablePipelineEngine || (node.isPresent() && node.get().getShouldColoScan())
|| (node.isPresent() && node.get() instanceof FileScanNode)
|| (node.isPresent() && node.get().isKeySearch())
|| Config.disable_shared_scan) {
|| (node.isPresent() && node.get().shouldDisableSharedScan())) {
int expectedInstanceNum = 1;
if (parallelExecInstanceNum > 1) {
//the scan instance num should not larger than the tablets num
expectedInstanceNum = Math.min(perNodeScanRanges.size(), parallelExecInstanceNum);
}
// if have limit and conjunts, only need 1 instance to save cpu and
// mem resource
if (node.isPresent() && node.get().haveLimitAndConjunts()) {
expectedInstanceNum = 1;
}

perInstanceScanRanges = ListUtil.splitBySize(perNodeScanRanges,
expectedInstanceNum);
sharedScanOpts = Collections.nCopies(perInstanceScanRanges.size(), false);
} else {
int expectedInstanceNum = Math.min(parallelExecInstanceNum,
leftMostNode.getNumInstances());
expectedInstanceNum = Math.max(expectedInstanceNum, 1);
// if have limit and conjunts, only need 1 instance to save cpu and
// mem resource
if (node.isPresent() && node.get().haveLimitAndConjunts()) {
expectedInstanceNum = 1;
}

perInstanceScanRanges = Collections.nCopies(expectedInstanceNum, perNodeScanRanges);
sharedScanOpts = Collections.nCopies(perInstanceScanRanges.size(), true);
}
Expand Down
10 changes: 10 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ public class SessionVariable implements Serializable, Writable {

public static final String ENABLE_PIPELINE_ENGINE = "enable_pipeline_engine";

public static final String ENABLE_SHARED_SCAN = "enable_shared_scan";

public static final String ENABLE_AGG_STATE = "enable_agg_state";

public static final String ENABLE_RPC_OPT_FOR_PIPELINE = "enable_rpc_opt_for_pipeline";
Expand Down Expand Up @@ -696,6 +698,10 @@ public class SessionVariable implements Serializable, Writable {
expType = ExperimentalType.EXPERIMENTAL)
private boolean enablePipelineEngine = true;

@VariableMgr.VarAttr(name = ENABLE_SHARED_SCAN, fuzzy = false, expType = ExperimentalType.EXPERIMENTAL,
needForward = true)
private boolean enableSharedScan = false;

@VariableMgr.VarAttr(name = ENABLE_AGG_STATE, fuzzy = false, expType = ExperimentalType.EXPERIMENTAL)
public boolean enableAggState = false;

Expand Down Expand Up @@ -2617,6 +2623,10 @@ public boolean getEnablePipelineEngine() {
return enablePipelineEngine;
}

public boolean getEnableSharedScan() {
return enableSharedScan;
}

public static boolean enablePipelineEngine() {
ConnectContext connectContext = ConnectContext.get();
if (connectContext == null) {
Expand Down

0 comments on commit d25f02c

Please sign in to comment.