diff --git a/fe/src/main/java/org/apache/doris/common/util/ListUtil.java b/fe/src/main/java/org/apache/doris/common/util/ListUtil.java new file mode 100644 index 00000000000000..d5dc38cc8be9d7 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/common/util/ListUtil.java @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.util; + +import com.google.common.base.Preconditions; + +import java.util.ArrayList; +import java.util.List; + +public class ListUtil { + /** + * split a list to multi expected number sublist + * for example: + * + * list is : [1, 2, 3, 4, 5, 6, 7] + * expectedSize is : 3 + * + * return : + * [1, 4, 7] + * [2, 5] + * [3, 6] + */ + public static List> splitBySize(List list, int expectedSize) + throws NullPointerException, IllegalArgumentException { + Preconditions.checkNotNull(list, "list must not be null"); + Preconditions.checkArgument(expectedSize > 0, "expectedSize must larger than 0"); + + int splitSize = Math.min(expectedSize, list.size()); + + List> result = new ArrayList>(splitSize); + for (int i = 0; i < splitSize; i++) { + result.add(new ArrayList<>()); + } + + int index = 0; + for (T t : list) { + result.get(index).add(t); + index = (index + 1) % splitSize; + } + + return result; + } +} diff --git a/fe/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/src/main/java/org/apache/doris/planner/HashJoinNode.java index fa976c55a4724d..cf2079b45da30d 100644 --- a/fe/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -294,6 +294,10 @@ public int getNumInstances() { return Math.max(children.get(0).getNumInstances(), children.get(1).getNumInstances()); } + public boolean isShuffleJoin() { + return distrMode == DistributionMode.PARTITIONED; + } + enum DistributionMode { NONE("NONE"), BROADCAST("BROADCAST"), diff --git a/fe/src/main/java/org/apache/doris/planner/Planner.java b/fe/src/main/java/org/apache/doris/planner/Planner.java index bbcd1bdc8539a7..a0cb5fb82f50db 100644 --- a/fe/src/main/java/org/apache/doris/planner/Planner.java +++ b/fe/src/main/java/org/apache/doris/planner/Planner.java @@ -145,15 +145,8 @@ public void createPlanFragments(StatementBase statment, Analyzer analyzer, TQuer singleNodePlanner = new SingleNodePlanner(plannerContext); PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan(); - singleNodePlanner.validatePlan(singleNodePlan); - List resultExprs = queryStmt.getResultExprs(); if (statment instanceof InsertStmt) { - if (queryOptions.isSetMt_dop() && queryOptions.mt_dop > 0) { - throw new NotImplementedException( - "MT_DOP not supported for plans with insert."); - } - InsertStmt insertStmt = (InsertStmt) statment; if (insertStmt.getOlapTuple() != null && !insertStmt.isStreaming()) { singleNodePlan = new OlapRewriteNode(plannerContext.getNextNodeId(), singleNodePlan, insertStmt); diff --git a/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 6f7b186d6689e9..67e2621ebb490d 100644 --- a/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -150,31 +150,6 @@ public PlanNode createSingleNodePlan() throws UserException, AnalysisException { return singleNodePlan; } - /** - * Checks that the given single-node plan is executable: - * - It may not contain right or full outer joins with no equi-join conjuncts that - * are not inside the right child of a SubplanNode. - * - MT_DOP > 0 is not supported for plans with base table joins or table sinks. - * Throws a NotImplementedException if plan validation fails. - */ - public void validatePlan(PlanNode planNode) throws NotImplementedException { - if (ctx_.getQueryOptions().isSetMt_dop() && ctx_.getQueryOptions().mt_dop > 0 - && (planNode instanceof HashJoinNode || planNode instanceof CrossJoinNode)) { - throw new NotImplementedException( - "MT_DOP not supported for plans with base table joins or table sinks."); - } - - // As long as MT_DOP is unset or 0 any join can run in a single-node plan. - if (ctx_.isSingleNodeExec() && - (!ctx_.getQueryOptions().isSetMt_dop() || ctx_.getQueryOptions().mt_dop == 0)) { - return; - } - - for (PlanNode child : planNode.getChildren()) { - validatePlan(child); - } - } - /** * Creates an EmptyNode that 'materializes' the tuples of the given stmt. * Marks all collection-typed slots referenced in stmt as non-materialized because diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index 9110d16cedcb5e..b8b8e95271c492 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -26,6 +26,7 @@ import org.apache.doris.common.Status; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.common.util.ListUtil; import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.load.LoadErrorHub; import org.apache.doris.planner.DataPartition; @@ -33,7 +34,6 @@ import org.apache.doris.planner.DataStreamSink; import org.apache.doris.planner.ExchangeNode; import org.apache.doris.planner.HashJoinNode; -import org.apache.doris.planner.MysqlScanNode; import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanFragmentId; @@ -56,7 +56,6 @@ import org.apache.doris.thrift.TLoadErrorHubInfo; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPaloScanRange; -import org.apache.doris.thrift.TPartitionType; import org.apache.doris.thrift.TPlanFragmentDestination; import org.apache.doris.thrift.TPlanFragmentExecParams; import org.apache.doris.thrift.TQueryGlobals; @@ -78,7 +77,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -87,7 +85,6 @@ import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -365,13 +362,7 @@ public void exec() throws Exception { // compute Fragment Instance computeScanRangeAssignment(); - // if mt_dop <= 1 - if (queryOptions.mt_dop <= 1) { - computeFragmentExecParams(); - } else { - computeFragmentExecParamsForParallelExec(); - validate(); - } + computeFragmentExecParams(); traceInstance(); @@ -701,7 +692,7 @@ private void computeFragmentExecParams() throws Exception { // assign instance ids numBackends = 0; for (FragmentExecParams params : fragmentExecParamsMap.values()) { - LOG.debug("parameter has instances.{}", params.instanceExecParams.size()); + LOG.debug("fragment {} has instances {}", params.fragment.getFragmentId(), params.instanceExecParams.size()); for (int j = 0; j < params.instanceExecParams.size(); ++j) { // we add instance_num to query_id.lo to create a // globally-unique instance id @@ -856,15 +847,28 @@ private void computeFragmentHosts() throws Exception { } else { //normat fragment Iterator iter = fragmentExecParamsMap.get(fragment.getFragmentId()).scanRangeAssignment.entrySet().iterator(); + int parallelExecInstanceNum = ConnectContext.get().getSessionVariable().getParallelExecInstanceNum(); while (iter.hasNext()) { Map.Entry entry = (Map.Entry) iter.next(); TNetworkAddress key = (TNetworkAddress) entry.getKey(); Map> value = (Map>) entry.getValue(); - FInstanceExecParam instanceParam = new FInstanceExecParam(null, key, 0, params); + for (Integer planNodeId : value.keySet()) { - instanceParam.perNodeScanRanges.put(planNodeId, value.get(planNodeId)); + List perNodeScanRanges = value.get(planNodeId); + int expectedInstanceNum = 1; + if (parallelExecInstanceNum > 1) { + //the scan instance num should not larger than the tablets num + expectedInstanceNum = Math.min(perNodeScanRanges.size(), parallelExecInstanceNum); + } + List> perInstanceScanRanges = ListUtil.splitBySize(perNodeScanRanges, + expectedInstanceNum); + + for (List scanRangeParams : perInstanceScanRanges) { + FInstanceExecParam instanceParam = new FInstanceExecParam(null, key, 0, params); + instanceParam.perNodeScanRanges.put(planNodeId, scanRangeParams); + params.instanceExecParams.add(instanceParam); + } } - params.instanceExecParams.add(instanceParam); } } @@ -912,296 +916,6 @@ private boolean isColocateJoin(PlanNode node) { return false; } - private void computeFragmentExecParamsForParallelExec() throws Exception { - // create exec params and set instance_id, host, per_node_scan_ranges - computeFragmentInstances(fragmentExecParamsMap.get(fragments.get(0).getFragmentId())); - - // Set destinations, per_exch_num_senders, sender_id. - for (PlanFragment srcFragment : fragments) { - if (!(srcFragment.getSink() instanceof DataStreamSink)) { - continue; - } - final PlanFragmentId desFragmentId = srcFragment.getDestFragment().getFragmentId(); - final FragmentExecParams srcParams = fragmentExecParamsMap.get(srcFragment.getFragmentId()); - final FragmentExecParams destParams = fragmentExecParamsMap.get(desFragmentId); - - // populate src_params->destinations - for (int i = 0; i < destParams.instanceExecParams.size(); i++) { - TPlanFragmentDestination dest = new TPlanFragmentDestination(); - dest.setFragment_instance_id(destParams.instanceExecParams.get(i).instanceId); - dest.setServer(toRpcHost(destParams.instanceExecParams.get(i).host)); - dest.setBrpc_server(toBrpcHost(destParams.instanceExecParams.get(i).host)); - srcParams.destinations.add(dest); - } - - final DataSink sinker = srcFragment.getSink(); - Preconditions.checkState( - sinker.getOutputPartition().getType() == TPartitionType.HASH_PARTITIONED - || sinker.getOutputPartition().getType() == TPartitionType.UNPARTITIONED - || sinker.getOutputPartition().getType() == TPartitionType.RANDOM); - - PlanNodeId exchId = sinker.getExchNodeId(); - Integer senderIdBase = destParams.perExchNumSenders.get(exchId); - if (senderIdBase == null) { - destParams.perExchNumSenders.put(exchId.asInt(), srcParams.instanceExecParams.size()); - senderIdBase = 0; - } else { - destParams.perExchNumSenders.put(exchId.asInt(), - senderIdBase + srcParams.instanceExecParams.size()); - } - - for (int i = 0; i < srcParams.instanceExecParams.size(); i++) { - FInstanceExecParam srcInstanceParam = srcParams.instanceExecParams.get(i); - srcInstanceParam.senderId = senderIdBase + i; - } - } - } - - // compute instances from fragment - private void computeFragmentInstances(FragmentExecParams params) throws Exception { - // // traverse input fragments - for (PlanFragmentId fragmentId : params.inputFragments) { - computeFragmentInstances(fragmentExecParamsMap.get(fragmentId)); - } - - // case 1: single instance executed at coordinator - final PlanFragment fragment = params.fragment; - if (fragment.getDataPartition() == DataPartition.UNPARTITIONED) { - Reference backendIdRef = new Reference(); - TNetworkAddress execHostport = SimpleScheduler.getHost(this.idToBackend, backendIdRef); - if (execHostport == null) { - LOG.warn("DataPartition UNPARTITIONED, no scanNode Backend"); - throw new UserException("there is no scanNode Backend"); - } - TUniqueId instanceId = getNextInstanceId(); - FInstanceExecParam instanceParam = new FInstanceExecParam(instanceId, execHostport, - 0, params); - params.instanceExecParams.add(instanceParam); - this.addressToBackendID.put(execHostport, backendIdRef.getRef()); - return; - } - - if (containsUnionNode(fragment.getPlanRoot())) { - createUnionInstance(params); - return; - } - - PlanNode leftPlanNode = findLeftmostNode(fragment.getPlanRoot()); - if (leftPlanNode instanceof MysqlScanNode - || leftPlanNode instanceof OlapScanNode) { - // case 2: leaf fragment with leftmost scan - // TODO: check that there's only one scan in this fragment - createScanInstance(leftPlanNode.getId(), params); - } else { - // case 3: interior fragment without leftmost scan - // we assign the same hosts as those of our leftmost input fragment (so that a - // merge aggregation fragment runs on the hosts that provide the input data) - createCollocatedInstance(params); - } - } - - private List findScanNodes(PlanNode plan) { - List result = Lists.newArrayList(); - List nodeList = Lists.newArrayList(); - getAllNodes(plan, nodeList); - for (PlanNode node : nodeList) { - if (node instanceof MysqlScanNode - || node instanceof OlapScanNode) { - result.add(node.getId()); - } - } - return result; - } - - private void getAllNodes(PlanNode plan, List nodeList) { - if (plan.getChildren().size() > 0) { - nodeList.addAll(plan.getChildren()); - for (PlanNode child : plan.getChildren()) { - getAllNodes(child, nodeList); - } - } - nodeList.add(plan); - } - - private Set getScanHosts(PlanNodeId id, FragmentExecParams fragmentExecParams) { - Set result = Sets.newHashSet(); - for (TNetworkAddress host : fragmentExecParams.scanRangeAssignment.keySet()) { - Map> planNodeToScanRangeParams - = fragmentExecParams.scanRangeAssignment.get(host); - for (Integer planNodeId : planNodeToScanRangeParams.keySet()) { - if (id.asInt() == planNodeId) { - result.add(host); - } - } - } - - return result; - } - - private void createScanInstance(PlanNodeId leftMostScanId, FragmentExecParams fragmentExecParams) - throws UserException { - int maxNumInstance = queryOptions.mt_dop; - if (maxNumInstance == 0) { - maxNumInstance = 1; - } - - if (fragmentExecParams.scanRangeAssignment.isEmpty()) { - // this scan doesn't have any scan ranges: run a single instance on the random backend - Reference backendIdRef = new Reference(); - TNetworkAddress execHostport = SimpleScheduler.getHost(this.idToBackend, backendIdRef); - if (execHostport == null) { - throw new UserException("there is no scanNode Backend"); - } - FInstanceExecParam instanceParam = new FInstanceExecParam(getNextInstanceId(), execHostport, 0, - fragmentExecParams); - fragmentExecParams.instanceExecParams.add(instanceParam); - return; - } - - final int leftMostScanIdInt = leftMostScanId.asInt(); - int perFragmentInstanceIdx = 0; - for (TNetworkAddress host : fragmentExecParams.scanRangeAssignment.keySet()) { - // evenly divide up the scan ranges of the leftmost scan between at most - // instances - final Map> scanMap = fragmentExecParams.scanRangeAssignment.get(host); - final List scanRangesList = scanMap.get(leftMostScanIdInt); - Preconditions.checkState(scanRangesList != null); - // try to load-balance scan ranges by assigning just beyond the average number of - // bytes to each instance - // TODO: fix shortcomings introduced by uneven split sizes, - // this could end up assigning 0 scan ranges to an instance - final int numInstance = Math.min(maxNumInstance, scanRangesList.size()); - Preconditions.checkState(numInstance != 0); - final List perHostInstanceExecParams = Lists.newArrayList(); - // create FInstanceExecParam in one host - for (int i = 0; i < numInstance; i++) { - final FInstanceExecParam instanceParam = new FInstanceExecParam(getNextInstanceId(), - host, perFragmentInstanceIdx++, fragmentExecParams); - fragmentExecParams.instanceExecParams.add(instanceParam); - perHostInstanceExecParams.add(instanceParam); - List paramList = instanceParam.perNodeScanRanges.get(leftMostScanIdInt); - if (paramList == null) { - paramList = Lists.newArrayList(); - instanceParam.perNodeScanRanges.put(leftMostScanIdInt, paramList); - } - } - - // assign tablet - Collections.shuffle(scanRangesList); - for (int i = 0; i < scanRangesList.size(); i++) { - final TScanRangeParams scanRangeParams = scanRangesList.get(i); - final int position = i % numInstance; - perHostInstanceExecParams.get(position).perNodeScanRanges.get(leftMostScanIdInt).add(scanRangeParams); - } - } - } - - private void validate() { - int numFragments = 0; - for (PlanFragment fragment : fragments) { - // TODO chenhao fragment' id produced in palo may larger than fragment sizes, - // need to update this after merge latest impala plan codes - //Preconditions.checkState(fragment.getFragmentId().asInt() <= fragments.size()); - Preconditions.checkState(fragment.getFragmentId() - == fragmentExecParamsMap.get(fragment.getFragmentId()).fragment.getFragmentId()); - ++numFragments; - } - - Preconditions.checkState(numFragments == fragmentExecParamsMap.size()); - - // we assigned the correct number of scan ranges per (host, node id): - // assemble a map from host -> (map from node id -> #scan ranges) - Map> countMap = Maps.newHashMap(); - for (FragmentExecParams fragmentExecParams : fragmentExecParamsMap.values()) { - for (FInstanceExecParam instanceExecParam : fragmentExecParams.instanceExecParams) { - Map planNodeIdToCount = countMap.get(instanceExecParam.host); - if (planNodeIdToCount == null) { - planNodeIdToCount = Maps.newHashMap(); - countMap.put(instanceExecParam.host, planNodeIdToCount); - } - - for (Integer planNodeId : instanceExecParam.perNodeScanRanges.keySet()) { - Integer count = planNodeIdToCount.get(planNodeId); - if (count == null) { - planNodeIdToCount.put(planNodeId, 0); - count = 0; - } - int lastCount = planNodeIdToCount.get(planNodeId); - planNodeIdToCount.put(planNodeId, lastCount + - instanceExecParam.perNodeScanRanges.get(planNodeId).size()); - } - } - } - - for (FragmentExecParams fragmentExecParams : fragmentExecParamsMap.values()) { - for (TNetworkAddress host : fragmentExecParams.scanRangeAssignment.keySet()) { - Preconditions.checkState(countMap.get(host).size() != 0); - final Map nodeCountMap = countMap.get(host); - Map> planNodeIdToScanRangeList - = fragmentExecParams.scanRangeAssignment.get(host); - for (Integer planNodeId : planNodeIdToScanRangeList.keySet()) { - Preconditions.checkState(nodeCountMap.get(planNodeId) > 0); - Preconditions.checkState(nodeCountMap.get(planNodeId) - == planNodeIdToScanRangeList.get(planNodeId).size()); - } - } - } - // TODO: add validation for BackendExecParams - } - - // create collocated instance according to inputFragments - private void createCollocatedInstance(FragmentExecParams fragmentExecParams) { - Preconditions.checkState(fragmentExecParams.inputFragments.size() >= 1); - final FragmentExecParams inputFragmentParams = fragmentExecParamsMap.get(fragmentExecParams. - inputFragments.get(0)); - int perFragmentInstanceIdx = 0; - for (FInstanceExecParam inputInstanceParams : inputFragmentParams.instanceExecParams) { - FInstanceExecParam instanceParam = new FInstanceExecParam(getNextInstanceId(), - inputInstanceParams.host, perFragmentInstanceIdx++, fragmentExecParams); - fragmentExecParams.instanceExecParams.add(instanceParam); - } - } - - private TUniqueId getNextInstanceId() { - TUniqueId result = nextInstanceId.deepCopy(); - nextInstanceId.lo++; - return result; - } - - - private void createUnionInstance(FragmentExecParams fragmentExecParams) { - final PlanFragment fragment = fragmentExecParams.fragment; - // Add hosts of scan nodes - List scanNodeIds = findScanNodes(fragment.getPlanRoot()); - - Set hostsSets = Sets.newHashSet(); - for(PlanNodeId id: scanNodeIds) { - hostsSets.addAll(getScanHosts(id, fragmentExecParams)); - } - - // UnionNode's child is not ScanNode - for (PlanFragmentId inputFragmentId : fragmentExecParams.inputFragments) { - FragmentExecParams inputeExecParams = fragmentExecParamsMap.get(inputFragmentId); - for (FInstanceExecParam instanceParam : inputeExecParams.instanceExecParams) { - hostsSets.add(instanceParam.host); - } - } - - // create a single instance per host - // TODO-MT: figure out how to parallelize Union - int perFragmentIdx = 0; - for (TNetworkAddress host : hostsSets) { - FInstanceExecParam instanceParam = new FInstanceExecParam(getNextInstanceId(), host, - perFragmentIdx++, fragmentExecParams); - // assign all scan ranges - fragmentExecParams.instanceExecParams.add(instanceParam); - if (fragmentExecParams.scanRangeAssignment.get(host) != null - && fragmentExecParams.scanRangeAssignment.get(host).size() > 0) { - instanceParam.perNodeScanRanges = fragmentExecParams.scanRangeAssignment.get(host); - } - } - } - // Returns the id of the leftmost node of any of the gives types in 'plan_root', // or INVALID_PLAN_NODE_ID if no such node present. private PlanNode findLeftmostNode(PlanNode plan) { diff --git a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java index 4fa1eda8754004..fd8658bf1ac2a2 100644 --- a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -67,7 +67,9 @@ public class SessionVariable implements Serializable, Writable { public static final String BATCH_SIZE = "batch_size"; public static final String DISABLE_STREAMING_PREAGGREGATIONS = "disable_streaming_preaggregations"; public static final String DISABLE_COLOCATE_JOIN = "disable_colocate_join"; - public static final String MT_DOP = "mt_dop"; + public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num"; + public static final int MIN_EXEC_INSTANCE_NUM = 1; + public static final int MAX_EXEC_INSTANCE_NUM = 32; // max memory used on every backend. @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT) @@ -163,11 +165,7 @@ public class SessionVariable implements Serializable, Writable { // if true, need report to coordinator when plan fragment execute successfully. @VariableMgr.VarAttr(name = CODEGEN_LEVEL) - private int codegenLevel = 0; - - // multithreaded degree of intra-node parallelism - @VariableMgr.VarAttr(name = MT_DOP) - private int mtDop = 0; + private int codegenLevel = 0; @VariableMgr.VarAttr(name = BATCH_SIZE) private int batchSize = 1024; @@ -178,6 +176,13 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = DISABLE_COLOCATE_JOIN) private boolean disableColocateJoin = false; + /* + * the parallel exec instance num for one Fragment in one BE + * 1 means disable this feature + */ + @VariableMgr.VarAttr(name = PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM) + private int parallelExecInstanceNum = 1; + public long getMaxExecMemByte() { return maxExecMemByte; } @@ -402,14 +407,6 @@ public void setResourceGroup(String resourceGroup) { this.resourceGroup = resourceGroup; } - public int getMtDop() { - return this.mtDop; - } - - public void setMtDop(int mtDop) { - this.mtDop = mtDop; - } - public boolean isDisableColocateJoin() { return disableColocateJoin; } @@ -418,7 +415,21 @@ public void setDisableColocateJoin(boolean disableColocateJoin) { this.disableColocateJoin = disableColocateJoin; } - // Serialize to thrift object + public int getParallelExecInstanceNum() { + return parallelExecInstanceNum; + } + + public void setParallelExecInstanceNum(int parallelExecInstanceNum) { + if (parallelExecInstanceNum < MIN_EXEC_INSTANCE_NUM) { + this.parallelExecInstanceNum = MIN_EXEC_INSTANCE_NUM; + } else if (parallelExecInstanceNum > MAX_EXEC_INSTANCE_NUM) { + this.parallelExecInstanceNum = MAX_EXEC_INSTANCE_NUM; + } else { + this.parallelExecInstanceNum = parallelExecInstanceNum; + } + } + + // Serialize to thrift object TQueryOptions toThrift() { TQueryOptions tResult = new TQueryOptions(); tResult.setMem_limit(maxExecMemByte); @@ -435,7 +446,6 @@ TQueryOptions toThrift() { tResult.setBatch_size(batchSize); tResult.setDisable_stream_preaggregations(disableStreamPreaggregations); - tResult.setMt_dop(mtDop); return tResult; } @@ -470,7 +480,7 @@ public void write(DataOutput out) throws IOException { Text.writeString(out, collationServer); out.writeInt(batchSize); out.writeBoolean(disableStreamPreaggregations); - out.writeInt(mtDop); + out.writeInt(parallelExecInstanceNum); } @Override @@ -507,7 +517,7 @@ public void readFields(DataInput in) throws IOException { if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_38) { batchSize = in.readInt(); disableStreamPreaggregations = in.readBoolean(); - mtDop = in.readInt(); + parallelExecInstanceNum = in.readInt(); } } } diff --git a/fe/src/test/java/org/apache/doris/common/util/ListUtilTest.java b/fe/src/test/java/org/apache/doris/common/util/ListUtilTest.java new file mode 100644 index 00000000000000..75858189efb722 --- /dev/null +++ b/fe/src/test/java/org/apache/doris/common/util/ListUtilTest.java @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.util; + +import com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.List; + +public class ListUtilTest { + + @Rule + public ExpectedException expectedEx = ExpectedException.none(); + + @Test + public void testSplitBySizeNormal() { + List lists = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7); + int expectSize = 3; + + List> splitLists = ListUtil.splitBySize(lists, expectSize); + + Assert.assertEquals(splitLists.size(), 3); + Assert.assertEquals(splitLists.get(0).size(), 3); + Assert.assertEquals(splitLists.get(1).size(), 2); + Assert.assertEquals(splitLists.get(2).size(), 2); + } + + @Test + public void testSplitBySizeNormal2() { + List lists = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7); + int expectSize = 1; + + List> splitLists = ListUtil.splitBySize(lists, expectSize); + + Assert.assertEquals(splitLists.size(), 1); + Assert.assertEquals(lists, splitLists.get(0)); + } + + @Test + public void testSplitBySizeWithLargeExpectSize() { + List lists = Lists.newArrayList(1, 2, 3); + int expectSize = 10; + + List> splitLists = ListUtil.splitBySize(lists, expectSize); + + Assert.assertEquals(splitLists.size(), lists.size()); + Assert.assertTrue( splitLists.get(0).get(0) == 1); + Assert.assertTrue( splitLists.get(1).get(0) == 2); + Assert.assertTrue( splitLists.get(2).get(0) == 3); + } + + @Test + public void testSplitBySizeWithEmptyList() { + List lists = Lists.newArrayList(); + int expectSize = 10; + + List> splitLists = ListUtil.splitBySize(lists, expectSize); + + Assert.assertEquals(splitLists.size(), lists.size()); + } + + @Test + public void testSplitBySizeWithNullList() { + List lists = null; + int expectSize = 10; + + expectedEx.expect(NullPointerException.class); + expectedEx.expectMessage("list must not be null"); + + ListUtil.splitBySize(lists, expectSize); + } + + @Test + public void testSplitBySizeWithNegativeSize() { + List lists = Lists.newArrayList(1, 2, 3); + int expectSize = -1; + + expectedEx.expect(IllegalArgumentException.class); + expectedEx.expectMessage("expectedSize must larger than 0"); + + ListUtil.splitBySize(lists, expectSize); + } +}