[Improvement](scan) Refine ignoring data distribution without join (#37282)

Gabriel39 authored Jul 5, 2024
1 parent 41501d9 commit 560e352
Showing 1 changed file with 5 additions and 113 deletions.
118 changes: 5 additions & 113 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -65,7 +65,6 @@
 import org.apache.doris.planner.RuntimeFilterId;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.planner.SetOperationNode;
-import org.apache.doris.planner.SortNode;
 import org.apache.doris.planner.UnionNode;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.InternalService.PExecPlanFragmentResult;
@@ -89,7 +88,6 @@
 import org.apache.doris.thrift.TDescriptorTable;
 import org.apache.doris.thrift.TErrorTabletInfo;
 import org.apache.doris.thrift.TEsScanRange;
-import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TExternalScanRange;
 import org.apache.doris.thrift.TFileScanRange;
 import org.apache.doris.thrift.TFileScanRangeParams;
@@ -103,7 +101,6 @@
 import org.apache.doris.thrift.TPipelineWorkloadGroup;
 import org.apache.doris.thrift.TPlanFragment;
 import org.apache.doris.thrift.TPlanFragmentDestination;
-import org.apache.doris.thrift.TPlanFragmentExecParams;
 import org.apache.doris.thrift.TQueryGlobals;
 import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.thrift.TQueryType;
@@ -625,7 +622,7 @@ public TPipelineFragmentParams getStreamLoadPlan() throws Exception {
         LOG.info("dispatch load job: {} to {}", DebugUtil.printId(queryId), addressToBackendID.keySet());
 
         Map<TNetworkAddress, TPipelineFragmentParams> tExecPlanFragmentParams
-                = ((FragmentExecParams) this.fragmentExecParamsMap.values().toArray()[0]).toTPipelineParams(0);
+                = ((FragmentExecParams) this.fragmentExecParamsMap.values().toArray()[0]).toThrift(0);
         TPipelineFragmentParams fragmentParams = tExecPlanFragmentParams.values().stream().findFirst().get();
         return fragmentParams;
     }
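
This call site changes only because of a rename: the legacy toThrift overload returning List<TExecPlanFragmentParams> is deleted at the bottom of this diff, which frees the shorter name for the pipeline variant previously called toTPipelineParams. Below is a minimal, runnable sketch of what the stream-load path does with the result of the renamed method; Address and FragmentParams are hypothetical stand-ins for TNetworkAddress and TPipelineFragmentParams, not the real Thrift structs.

import java.util.LinkedHashMap;
import java.util.Map;

// Minimal sketch of the stream-load call site above: serialize the single
// fragment and hand back the first backend's params. Address and
// FragmentParams are hypothetical stand-ins, not Doris Thrift structs.
public class FirstFragmentSketch {
    record Address(String host, int port) {}
    record FragmentParams(String fragmentId) {}

    // Stand-in for FragmentExecParams.toThrift(int backendNum): one entry
    // per backend address that hosts instances of this fragment.
    static Map<Address, FragmentParams> toThrift(int backendNum) {
        Map<Address, FragmentParams> out = new LinkedHashMap<>();
        out.put(new Address("10.0.0.1", 9060), new FragmentParams("F0"));
        out.put(new Address("10.0.0.2", 9060), new FragmentParams("F0"));
        return out;
    }

    public static void main(String[] args) {
        // Mirrors getStreamLoadPlan(): a stream load plans one fragment, so
        // taking the first fragment's first backend entry is safe there.
        FragmentParams first = toThrift(0).values().stream().findFirst().orElseThrow();
        System.out.println(first); // FragmentParams[fragmentId=F0]
    }
}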
@@ -781,7 +778,7 @@ private void sendPipelineCtx() throws TException, RpcException, UserException {
             // 1. set up exec states
             int instanceNum = params.instanceExecParams.size();
             Preconditions.checkState(instanceNum > 0);
-            Map<TNetworkAddress, TPipelineFragmentParams> tParams = params.toTPipelineParams(backendIdx);
+            Map<TNetworkAddress, TPipelineFragmentParams> tParams = params.toThrift(backendIdx);
 
             boolean needCheckBackendState = false;
             if (queryOptions.getQueryType() == TQueryType.LOAD && profileFragmentId == 0) {
@@ -1855,8 +1852,8 @@ protected void computeFragmentHosts() throws Exception {
                     leftMostNode.getNumInstances());
             boolean forceToLocalShuffle = context != null
                     && context.getSessionVariable().isForceToLocalShuffle();
-            boolean ignoreStorageDataDistribution = forceToLocalShuffle || (scanNodes.stream().allMatch(
-                    scanNode -> scanNode.ignoreStorageDataDistribution(context, addressToBackendID.size()))
+            boolean ignoreStorageDataDistribution = forceToLocalShuffle || (node.isPresent()
+                    && node.get().ignoreStorageDataDistribution(context, addressToBackendID.size())
                     && useNereids);
             if (node.isPresent() && ignoreStorageDataDistribution) {
                 expectedInstanceNum = Math.max(expectedInstanceNum, 1);
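
This hunk is the substance of the commit. Previously, storage data distribution was ignored only if every scan node in the fragment agreed; since a fragment without a join has at most one scan node feeding its left-most path, the check now consults only that left-most node. The following self-contained sketch contrasts the old and new predicates (the forceToLocalShuffle short-circuit is omitted for brevity); ScanPlan and its constructor flag are hypothetical stand-ins for the real ScanNode.ignoreStorageDataDistribution(context, backendCount).

import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a scan node; it models only the one decision
// this hunk changes, not the real org.apache.doris.planner.ScanNode API.
final class ScanPlan {
    private final boolean ignoresDistribution;

    ScanPlan(boolean ignoresDistribution) {
        this.ignoresDistribution = ignoresDistribution;
    }

    boolean ignoreStorageDataDistribution(int backendCount) {
        return ignoresDistribution;
    }
}

public class IgnoreDistributionSketch {
    // Old rule: every scan node in the fragment must agree.
    static boolean oldRule(List<ScanPlan> scanNodes, int backendCount, boolean useNereids) {
        return scanNodes.stream().allMatch(s -> s.ignoreStorageDataDistribution(backendCount))
                && useNereids;
    }

    // New rule: only the left-most scan node is consulted; a fragment
    // without a join has at most one scan node on that path.
    static boolean newRule(Optional<ScanPlan> leftMostNode, int backendCount, boolean useNereids) {
        return leftMostNode.isPresent()
                && leftMostNode.get().ignoreStorageDataDistribution(backendCount)
                && useNereids;
    }

    public static void main(String[] args) {
        // One node disagrees: the old rule rejects, the new rule never sees it.
        System.out.println(oldRule(List.of(new ScanPlan(true), new ScanPlan(false)), 3, true)); // false
        System.out.println(newRule(Optional.of(new ScanPlan(true)), 3, true));  // true
        System.out.println(newRule(Optional.empty(), 3, true));                 // false
    }
}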
@@ -3028,112 +3025,7 @@ public FragmentExecParams(PlanFragment fragment) {
             this.fragment = fragment;
         }
 
-        List<TExecPlanFragmentParams> toThrift(int backendNum) {
-            List<TExecPlanFragmentParams> paramsList = Lists.newArrayList();
-            Set<SortNode> topnSortNodes = scanNodes.stream()
-                    .filter(scanNode -> scanNode instanceof OlapScanNode)
-                    .flatMap(scanNode -> scanNode.getTopnFilterSortNodes().stream()).collect(Collectors.toSet());
-            topnSortNodes.forEach(SortNode::setHasRuntimePredicate);
-            Set<Integer> topnFilterSources = topnSortNodes.stream().map(
-                    sort -> sort.getId().asInt()).collect(Collectors.toSet());
-            for (int i = 0; i < instanceExecParams.size(); ++i) {
-                final FInstanceExecParam instanceExecParam = instanceExecParams.get(i);
-                TExecPlanFragmentParams params = new TExecPlanFragmentParams();
-                params.setIsNereids(context != null ? context.getState().isNereids() : false);
-                params.setProtocolVersion(PaloInternalServiceVersion.V1);
-                params.setFragment(fragment.toThrift());
-                params.setDescTbl(descTable);
-                params.setParams(new TPlanFragmentExecParams());
-                params.params.setQueryId(queryId);
-                params.params.setFragmentInstanceId(instanceExecParam.instanceId);
-
-                Map<Integer, List<TScanRangeParams>> scanRanges = instanceExecParam.perNodeScanRanges;
-                if (scanRanges == null) {
-                    scanRanges = Maps.newHashMap();
-                }
-                if (!topnFilterSources.isEmpty()) {
-                    // topn_filter_source_node_ids is used by nereids not by legacy planner.
-                    // if there is no topnFilterSources, do not set it.
-                    // topn_filter_source_node_ids=null means legacy planner
-                    params.params.topn_filter_source_node_ids = Lists.newArrayList(topnFilterSources);
-                }
-                params.params.setPerNodeScanRanges(scanRanges);
-                params.params.setPerExchNumSenders(perExchNumSenders);
-
-                if (tWorkloadGroups != null) {
-                    params.setWorkloadGroups(tWorkloadGroups);
-                }
-                params.params.setDestinations(destinations);
-                params.params.setSenderId(i);
-                params.params.setNumSenders(instanceExecParams.size());
-                params.setCoord(coordAddress);
-                params.setCurrentConnectFe(currentConnectFE);
-                params.setBackendNum(backendNum++);
-                params.setQueryGlobals(queryGlobals);
-                params.setQueryOptions(queryOptions);
-                params.params.setSendQueryStatisticsWithEveryBatch(
-                        fragment.isTransferQueryStatisticsWithEveryBatch());
-                params.params.setRuntimeFilterParams(new TRuntimeFilterParams());
-                params.params.runtime_filter_params.setRuntimeFilterMergeAddr(runtimeFilterMergeAddr);
-                if (instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) {
-                    Set<Integer> broadCastRf = assignedRuntimeFilters.stream().filter(RuntimeFilter::isBroadcast)
-                            .map(r -> r.getFilterId().asInt()).collect(Collectors.toSet());
-
-                    for (RuntimeFilter rf : assignedRuntimeFilters) {
-                        if (!ridToTargetParam.containsKey(rf.getFilterId())) {
-                            continue;
-                        }
-                        List<FRuntimeFilterTargetParam> fParams = ridToTargetParam.get(rf.getFilterId());
-                        if (rf.hasRemoteTargets()) {
-                            Map<TNetworkAddress, TRuntimeFilterTargetParamsV2> targetParamsV2 = new HashMap<>();
-                            for (FRuntimeFilterTargetParam targetParam : fParams) {
-                                if (targetParamsV2.containsKey(targetParam.targetFragmentInstanceAddr)) {
-                                    targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
-                                            .target_fragment_instance_ids
-                                            .add(targetParam.targetFragmentInstanceId);
-                                } else {
-                                    targetParamsV2.put(targetParam.targetFragmentInstanceAddr,
-                                            new TRuntimeFilterTargetParamsV2());
-                                    targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
-                                            .target_fragment_instance_addr
-                                            = targetParam.targetFragmentInstanceAddr;
-                                    targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
-                                            .target_fragment_instance_ids
-                                            = new ArrayList<>();
-                                    targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
-                                            .target_fragment_instance_ids
-                                            .add(targetParam.targetFragmentInstanceId);
-                                }
-                            }
-                            params.params.runtime_filter_params.putToRidToTargetParamv2(rf.getFilterId().asInt(),
-                                    new ArrayList<TRuntimeFilterTargetParamsV2>(targetParamsV2.values()));
-                        } else {
-                            List<TRuntimeFilterTargetParams> targetParams = Lists.newArrayList();
-                            for (FRuntimeFilterTargetParam targetParam : fParams) {
-                                targetParams.add(new TRuntimeFilterTargetParams(targetParam.targetFragmentInstanceId,
-                                        targetParam.targetFragmentInstanceAddr));
-                            }
-                            params.params.runtime_filter_params.putToRidToTargetParam(rf.getFilterId().asInt(),
-                                    targetParams);
-                        }
-                    }
-                    for (Map.Entry<RuntimeFilterId, Integer> entry : ridToBuilderNum.entrySet()) {
-                        params.params.runtime_filter_params.putToRuntimeFilterBuilderNum(
-                                entry.getKey().asInt(), broadCastRf.contains(entry.getKey().asInt())
-                                        ? 1 : entry.getValue());
-                    }
-                    for (RuntimeFilter rf : assignedRuntimeFilters) {
-                        params.params.runtime_filter_params.putToRidToRuntimeFilter(
-                                rf.getFilterId().asInt(), rf.toThrift());
-                    }
-                }
-                params.setFileScanParams(fileScanRangeParamsMap);
-                paramsList.add(params);
-            }
-            return paramsList;
-        }
-
-        Map<TNetworkAddress, TPipelineFragmentParams> toTPipelineParams(int backendNum) {
+        Map<TNetworkAddress, TPipelineFragmentParams> toThrift(int backendNum) {
             long memLimit = queryOptions.getMemLimit();
             // 2. update memory limit for colocate join
             if (colocateFragmentIds.contains(fragment.getFragmentId().asInt())) {
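
The deleted method above was the only remaining use of TExecPlanFragmentParams in this file (its import is removed near the top of the diff), so the pipeline variant that groups fragment instances by backend address becomes the sole serialization entry point and inherits the toThrift name. The sketch below illustrates the shape difference between the two, a per-instance list versus a per-backend map; the Instance record and plain string addresses are illustrative assumptions, not the real Doris types.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the shape change behind this deletion: the removed legacy
// method produced one params object per instance, while the surviving
// pipeline method groups instances per backend address. The Instance
// record and string addresses are illustrative, not the real Doris types.
public class GroupByBackendSketch {
    record Instance(String id, String backendAddr) {}

    public static void main(String[] args) {
        List<Instance> instances = List.of(
                new Instance("i0", "10.0.0.1:9060"),
                new Instance("i1", "10.0.0.1:9060"),
                new Instance("i2", "10.0.0.2:9060"));

        // Legacy shape: one entry per instance, like List<TExecPlanFragmentParams>.
        List<String> perInstance = new ArrayList<>();
        for (Instance ins : instances) {
            perInstance.add(ins.id());
        }

        // Pipeline shape: one entry per backend, carrying its local instances,
        // like Map<TNetworkAddress, TPipelineFragmentParams>.
        Map<String, List<String>> perBackend = new LinkedHashMap<>();
        for (Instance ins : instances) {
            perBackend.computeIfAbsent(ins.backendAddr(), k -> new ArrayList<>()).add(ins.id());
        }

        System.out.println(perInstance); // [i0, i1, i2]
        System.out.println(perBackend);  // {10.0.0.1:9060=[i0, i1], 10.0.0.2:9060=[i2]}
    }
}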
