From 1041caf76770d44a2264934b16f12e9e0f1f6190 Mon Sep 17 00:00:00 2001 From: wangbo Date: Fri, 22 Dec 2023 12:09:51 +0800 Subject: [PATCH] [improvement](executor) Add tvf and regression test for Workload Scheduler (#28733) 1 Add select workload schedule policy tvf 2 Add reg test --- be/src/vec/exec/scan/vmeta_scanner.cpp | 20 ++ be/src/vec/exec/scan/vmeta_scanner.h | 2 + .../CreateWorkloadSchedPolicyStmt.java | 34 +++ .../workloadgroup/WorkloadGroupMgr.java | 13 ++ .../WorkloadActionMeta.java | 16 ++ .../WorkloadConditionCompareUtils.java | 18 ++ .../WorkloadConditionMeta.java | 2 +- .../WorkloadQueryInfo.java | 2 +- .../WorkloadSchedPolicy.java | 11 +- .../WorkloadSchedPolicyMgr.java | 67 +++--- .../tablefunction/MetadataGenerator.java | 30 +++ .../MetadataTableValuedFunction.java | 2 + .../tablefunction/TableValuedFunctionIf.java | 2 + ...orkloadSchedPolicyTableValuedFunction.java | 88 ++++++++ .../doris/resource/WorkloadSchedTest.java | 197 ++++++++++++++++++ gensrc/thrift/Types.thrift | 1 + .../test_workload_sched_policy.out | 9 + .../test_workload_sched_policy.groovy | 168 +++++++++++++++ 18 files changed, 637 insertions(+), 45 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadSchedPolicyTableValuedFunction.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/resource/WorkloadSchedTest.java create mode 100644 regression-test/data/workload_manager_p0/test_workload_sched_policy.out create mode 100644 regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp b/be/src/vec/exec/scan/vmeta_scanner.cpp index 1f4dcd8593de9a0..22545fa4dcefc51 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.cpp +++ b/be/src/vec/exec/scan/vmeta_scanner.cpp @@ -238,6 +238,9 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) { case TMetadataType::WORKLOAD_GROUPS: RETURN_IF_ERROR(_build_workload_groups_metadata_request(meta_scan_range, &request)); break; + case TMetadataType::WORKLOAD_SCHED_POLICY: + RETURN_IF_ERROR(_build_workload_sched_policy_metadata_request(meta_scan_range, &request)); + break; case TMetadataType::CATALOGS: RETURN_IF_ERROR(_build_catalogs_metadata_request(meta_scan_range, &request)); break; @@ -379,6 +382,23 @@ Status VMetaScanner::_build_workload_groups_metadata_request( return Status::OK(); } +Status VMetaScanner::_build_workload_sched_policy_metadata_request( + const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request) { + VLOG_CRITICAL << "VMetaScanner::_build_workload_sched_policy_metadata_request"; + + // create request + request->__set_cluster_name(""); + request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); + + // create TMetadataTableRequestParams + TMetadataTableRequestParams metadata_table_params; + metadata_table_params.__set_metadata_type(TMetadataType::WORKLOAD_SCHED_POLICY); + metadata_table_params.__set_current_user_ident(_user_identity); + + request->__set_metada_table_params(metadata_table_params); + return Status::OK(); +} + Status VMetaScanner::_build_catalogs_metadata_request(const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request) { VLOG_CRITICAL << "VMetaScanner::_build_catalogs_metadata_request"; diff --git a/be/src/vec/exec/scan/vmeta_scanner.h b/be/src/vec/exec/scan/vmeta_scanner.h index 7c4a1f2b2deff5e..59bd55dc2d8653f 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.h +++ b/be/src/vec/exec/scan/vmeta_scanner.h @@ -79,6 +79,8 @@ class VMetaScanner : public VScanner { TFetchSchemaTableDataRequest* request); Status _build_workload_groups_metadata_request(const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request); + Status _build_workload_sched_policy_metadata_request(const TMetaScanRange& meta_scan_range, + TFetchSchemaTableDataRequest* request); Status _build_catalogs_metadata_request(const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request); Status _build_materialized_views_metadata_request(const TMetaScanRange& meta_scan_range, diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadSchedPolicyStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadSchedPolicyStmt.java index ee82b57822f9c0a..001068476d40d7c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadSchedPolicyStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadSchedPolicyStmt.java @@ -88,4 +88,38 @@ public List getActions() { public Map getProperties() { return properties; } + + @Override + public String toSql() { + String str = ""; + str = str + "CREAYE "; + str = str + "WORKLOAD SCHEDULE POLICY " + policyName + " "; + + str = str + " CONDITIONS( "; + if (conditions != null) { + for (WorkloadConditionMeta wcm : conditions) { + str += wcm.toString() + ","; + } + } + str = str.substring(0, str.length() - 1); + str = str + ")"; + + str = str + " ACTIONS( "; + if (actions != null) { + for (WorkloadActionMeta wam : actions) { + str = str + wam.toString() + ","; + } + } + str = str.substring(0, str.length() - 1); + str = str + ")"; + + str = str + " PROPERTIES("; + for (Map.Entry entry : properties.entrySet()) { + str = str + "\"" + entry.getKey() + "\"" + "=" + "\"" + entry.getValue() + "\","; + } + str = str.substring(0, str.length() - 1); + str = str + ")"; + + return str; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java index 2285db603b074f0..c8b49a5ebaa2675 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java @@ -445,6 +445,19 @@ public Long getWorkloadGroupIdByName(String name) { } } + public String getWorkloadGroupNameById(Long id) { + readLock(); + try { + WorkloadGroup wg = idToWorkloadGroup.get(id); + if (wg == null) { + return null; + } + return wg.getName(); + } finally { + readUnlock(); + } + } + // for ut public Map getNameToWorkloadGroup() { return nameToWorkloadGroup; diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java index 776c0bccfdc4ed2..57f6ba379937e98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java @@ -17,9 +17,11 @@ package org.apache.doris.resource.workloadschedpolicy; +import org.apache.doris.catalog.Env; import org.apache.doris.common.UserException; import com.google.gson.annotations.SerializedName; +import org.apache.commons.lang3.StringUtils; public class WorkloadActionMeta { @@ -44,4 +46,18 @@ static WorkloadActionType getWorkloadActionType(String strType) throws UserExcep } throw new UserException("invalid action type " + strType); } + + public String toString() { + if (StringUtils.isEmpty(actionArgs)) { + return action.toString(); + } else { + String retActionArgs = actionArgs; + if (WorkloadActionType.MOVE_QUERY_TO_GROUP.equals(action)) { + retActionArgs = Env.getCurrentEnv().getWorkloadGroupMgr() + .getWorkloadGroupNameById(Long.valueOf(actionArgs)); + } + retActionArgs = retActionArgs == null ? "-1" : retActionArgs; + return action + " \"" + retActionArgs + "\""; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionCompareUtils.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionCompareUtils.java index 8aa53a6f340abcb..ac4c51acdff0ee5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionCompareUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionCompareUtils.java @@ -39,6 +39,24 @@ static WorkloadConditionOperator getOperator(String op) throws UserException { } } + // used for select tvf + static String getOperatorStr(WorkloadConditionOperator op) { + switch (op) { + case EQUAL: + return "="; + case GREATER: + return ">"; + case GREATER_EQUAL: + return ">="; + case LESS: + return "<"; + case LESS_EQUAl: + return "<="; + default: + throw new RuntimeException("unexpected compare operator " + op); + } + } + static boolean compareInteger(WorkloadConditionOperator operator, long firstArgs, long secondArgs) { switch (operator) { case EQUAL: diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java index c6bfb526b9b0350..d5d2f922f3fdf96 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java @@ -49,6 +49,6 @@ private static WorkloadMetricType getMetricType(String metricStr) throws UserExc } public String toString() { - return metricName + " " + op + " " + value; + return metricName + " " + WorkloadConditionCompareUtils.getOperatorStr(op) + " " + value; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadQueryInfo.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadQueryInfo.java index 27d821c32c0d11a..b6a98633c583d8b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadQueryInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadQueryInfo.java @@ -26,5 +26,5 @@ public class WorkloadQueryInfo { String queryId = null; TUniqueId tUniqueId = null; ConnectContext context = null; - Map metricMap; + public Map metricMap; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java index 7186d4409a55cb6..827c23671338358 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java @@ -62,6 +62,15 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable { private List workloadConditionList; private List workloadActionList; + // for ut + public WorkloadSchedPolicy() { + } + + // for ut + public void setWorkloadConditionList(List workloadConditionList) { + this.workloadConditionList = workloadConditionList; + } + public WorkloadSchedPolicy(long id, String name, List workloadConditionList, List workloadActionList, Map properties) throws UserException { this.id = id; @@ -77,7 +86,7 @@ public WorkloadSchedPolicy(long id, String name, List workloa // return false, // 1 metric not match // 2 condition value not match query info's value - boolean isMatch(WorkloadQueryInfo queryInfo) { + public boolean isMatch(WorkloadQueryInfo queryInfo) { for (WorkloadCondition condition : workloadConditionList) { WorkloadMetricType metricType = condition.getMetricType(); String value = queryInfo.metricMap.get(metricType); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java index 9e2e4cd91afae70..346e34796c7aa20 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java @@ -34,12 +34,12 @@ import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.service.ExecuteEnv; +import org.apache.doris.thrift.TUserIdentity; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; -import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -69,7 +69,7 @@ public class WorkloadSchedPolicyMgr implements Writable, GsonPostProcessable { public static final ImmutableList WORKLOAD_SCHED_POLICY_NODE_TITLE_NAMES = new ImmutableList.Builder() - .add("Id").add("Name").add("ItemName").add("ItemValue") + .add("Id").add("Name").add("Condition").add("Action").add("Priority").add("Enabled").add("Version") .build(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -349,7 +349,8 @@ private void checkProperties(Map properties) throws UserExceptio throw new UserException("policy's priority can only between 0 ~ 100"); } } catch (NumberFormatException e) { - throw new UserException("policy's priority must be a number, input value=" + priorityStr); + throw new UserException( + "invalid priority property value, it must be a number, input value=" + priorityStr); } } } @@ -448,6 +449,11 @@ public List> getShowPolicyInfo() { return policyProcNode.fetchResult(currentUserIdentity).getRows(); } + public List> getWorkloadSchedPolicyTvfInfo(TUserIdentity tcurrentUserIdentity) { + UserIdentity currentUserIdentity = UserIdentity.fromThrift(tcurrentUserIdentity); + return policyProcNode.fetchResult(currentUserIdentity).getRows(); + } + public class PolicyProcNode { public ProcResult fetchResult(UserIdentity currentUserIdentity) { BaseProcResult result = new BaseProcResult(); @@ -460,54 +466,31 @@ public ProcResult fetchResult(UserIdentity currentUserIdentity) { continue; } - String pId = String.valueOf(policy.getId()); + List row = new ArrayList<>(); String pName = policy.getName(); + row.add(String.valueOf(policy.getId())); + row.add(pName); List conditionList = policy.getConditionMetaList(); + StringBuilder cmStr = new StringBuilder(); for (WorkloadConditionMeta cm : conditionList) { - List condRow = new ArrayList<>(); - condRow.add(pId); - condRow.add(pName); - condRow.add("condition"); - condRow.add(cm.toString()); - result.addRow(condRow); + cmStr.append(cm.toString()).append(";"); } + String retStr = cmStr.toString().toLowerCase(); + row.add(retStr.substring(0, retStr.length() - 1)); List actionList = policy.getActionMetaList(); - for (WorkloadActionMeta workloadActionMeta : actionList) { - List actionRow = new ArrayList<>(); - actionRow.add(pId); - actionRow.add(pName); - actionRow.add("action"); - if (StringUtils.isEmpty(workloadActionMeta.actionArgs)) { - actionRow.add(workloadActionMeta.action.toString()); - } else { - actionRow.add(workloadActionMeta.action + " " + workloadActionMeta.actionArgs); - } - result.addRow(actionRow); + StringBuilder actionStr = new StringBuilder(); + for (WorkloadActionMeta am : actionList) { + actionStr.append(am.toString()).append(";"); } + String retStr2 = actionStr.toString().toLowerCase(); + row.add(retStr2.substring(0, retStr2.length() - 1)); - List prioRow = new ArrayList<>(); - prioRow.add(pId); - prioRow.add(pName); - prioRow.add("priority"); - prioRow.add(String.valueOf(policy.getPriority())); - result.addRow(prioRow); - - List enabledRow = new ArrayList<>(); - enabledRow.add(pId); - enabledRow.add(pName); - enabledRow.add("enabled"); - enabledRow.add(String.valueOf(policy.isEnabled())); - result.addRow(enabledRow); - - - List versionRow = new ArrayList<>(); - versionRow.add(pId); - versionRow.add(pName); - versionRow.add("version"); - versionRow.add(String.valueOf(policy.getVersion())); - result.addRow(versionRow); + row.add(String.valueOf(policy.getPriority())); + row.add(String.valueOf(policy.isEnabled())); + row.add(String.valueOf(policy.getVersion())); + result.addRow(row); } } finally { readUnlock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index e8620b105b6a341..9c773b37dca50da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -116,6 +116,9 @@ public static TFetchSchemaTableDataResult getMetadataTable(TFetchSchemaTableData case QUERIES: result = queriesMetadataResult(params, request); break; + case WORKLOAD_SCHED_POLICY: + result = workloadSchedPolicyMetadataResult(params); + break; default: return errorResult("Metadata table params is not set."); } @@ -383,6 +386,33 @@ private static TFetchSchemaTableDataResult workloadGroupsMetadataResult(TMetadat return result; } + private static TFetchSchemaTableDataResult workloadSchedPolicyMetadataResult(TMetadataTableRequestParams params) { + if (!params.isSetCurrentUserIdent()) { + return errorResult("current user ident is not set."); + } + + TUserIdentity tcurrentUserIdentity = params.getCurrentUserIdent(); + List> workloadPolicyList = Env.getCurrentEnv().getWorkloadSchedPolicyMgr() + .getWorkloadSchedPolicyTvfInfo(tcurrentUserIdentity); + TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); + List dataBatch = Lists.newArrayList(); + for (List policyRow : workloadPolicyList) { + TRow trow = new TRow(); + trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(policyRow.get(0)))); // id + trow.addToColumnValue(new TCell().setStringVal(policyRow.get(1))); // name + trow.addToColumnValue(new TCell().setStringVal(policyRow.get(2))); // condition + trow.addToColumnValue(new TCell().setStringVal(policyRow.get(3))); // action + trow.addToColumnValue(new TCell().setIntVal(Integer.valueOf(policyRow.get(4)))); // priority + trow.addToColumnValue(new TCell().setBoolVal(Boolean.valueOf(policyRow.get(5)))); // enabled + trow.addToColumnValue(new TCell().setIntVal(Integer.valueOf(policyRow.get(6)))); // version + dataBatch.add(trow); + } + + result.setDataBatch(dataBatch); + result.setStatus(new TStatus(TStatusCode.OK)); + return result; + } + private static TFetchSchemaTableDataResult queriesMetadataResult(TMetadataTableRequestParams params, TFetchSchemaTableDataRequest parentRequest) { if (!params.isSetQueriesMetadataParams()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java index d2c3278314efc25..53a0b7ee5b801e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java @@ -51,6 +51,8 @@ public static Integer getColumnIndexFromColumnName(TMetadataType type, String co return TasksTableValuedFunction.getColumnIndexFromColumnName(columnName, params); case QUERIES: return QueriesTableValuedFunction.getColumnIndexFromColumnName(columnName); + case WORKLOAD_SCHED_POLICY: + return WorkloadSchedPolicyTableValuedFunction.getColumnIndexFromColumnName(columnName); default: throw new AnalysisException("Unknown Metadata TableValuedFunction type"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java index b14a09769cb13d7..c9547c91bd229c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java @@ -76,6 +76,8 @@ public static TableValuedFunctionIf getTableFunction(String funcName, Map SCHEMA = ImmutableList.of( + new Column("Id", ScalarType.createType(PrimitiveType.BIGINT)), + new Column("Name", ScalarType.createStringType()), + new Column("Condition", ScalarType.createType(PrimitiveType.STRING)), + new Column("Action", ScalarType.createType(PrimitiveType.STRING)), + new Column("Priority", ScalarType.createType(PrimitiveType.INT)), + new Column("Enabled", ScalarType.createType(PrimitiveType.BOOLEAN)), + new Column("Version", ScalarType.createType(PrimitiveType.INT))); + + private static final ImmutableMap COLUMN_TO_INDEX; + + static { + ImmutableMap.Builder builder = new ImmutableMap.Builder(); + for (int i = 0; i < SCHEMA.size(); i++) { + builder.put(SCHEMA.get(i).getName().toLowerCase(), i); + } + COLUMN_TO_INDEX = builder.build(); + } + + public static Integer getColumnIndexFromColumnName(String columnName) { + return COLUMN_TO_INDEX.get(columnName.toLowerCase()); + } + + public WorkloadSchedPolicyTableValuedFunction(Map params) { + if (params.size() > 0) { + throw new org.apache.doris.nereids.exceptions.AnalysisException( + "workload schedule policy table-valued-function does not support any params"); + } + } + + @Override + public TMetadataType getMetadataType() { + return TMetadataType.WORKLOAD_SCHED_POLICY; + } + + @Override + public TMetaScanRange getMetaScanRange() { + TMetaScanRange metaScanRange = new TMetaScanRange(); + metaScanRange.setMetadataType(TMetadataType.WORKLOAD_SCHED_POLICY); + return metaScanRange; + } + + @Override + public String getTableName() { + return "WorkloadSchedPolicyTableValuedFunction"; + } + + @Override + public List getTableColumns() throws AnalysisException { + return SCHEMA; + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/resource/WorkloadSchedTest.java b/fe/fe-core/src/test/java/org/apache/doris/resource/WorkloadSchedTest.java new file mode 100644 index 000000000000000..11c00eca234161e --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/resource/WorkloadSchedTest.java @@ -0,0 +1,197 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource; + +import org.apache.doris.resource.workloadschedpolicy.WorkloadCondition; +import org.apache.doris.resource.workloadschedpolicy.WorkloadConditionOperator; +import org.apache.doris.resource.workloadschedpolicy.WorkloadConditionQueryTime; +import org.apache.doris.resource.workloadschedpolicy.WorkloadConditionUsername; +import org.apache.doris.resource.workloadschedpolicy.WorkloadMetricType; +import org.apache.doris.resource.workloadschedpolicy.WorkloadQueryInfo; +import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicy; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +public class WorkloadSchedTest { + + @Test + public void testPolicyCondition() { + // 1 test compare operator + // 1.1 > + { + List operatorList = new ArrayList<>(); + WorkloadCondition intCondition = new WorkloadConditionQueryTime(WorkloadConditionOperator.GREATER, 100); + operatorList.add(intCondition); + + WorkloadSchedPolicy workloadSchedPolicy1 = new WorkloadSchedPolicy(); + workloadSchedPolicy1.setWorkloadConditionList(operatorList); + + WorkloadQueryInfo queryInfo = new WorkloadQueryInfo(); + queryInfo.metricMap = new HashMap<>(); + queryInfo.metricMap.put(WorkloadMetricType.QUERY_TIME, "101"); + + // match + Assert.assertTrue(workloadSchedPolicy1.isMatch(queryInfo)); + + // not match + queryInfo.metricMap.put(WorkloadMetricType.QUERY_TIME, "100"); + Assert.assertFalse(workloadSchedPolicy1.isMatch(queryInfo)); + } + + // 1.2 >= + { + List operatorList = new ArrayList<>(); + WorkloadCondition intCondition = new WorkloadConditionQueryTime(WorkloadConditionOperator.GREATER_EQUAL, 100); + operatorList.add(intCondition); + + WorkloadSchedPolicy workloadSchedPolicy1 = new WorkloadSchedPolicy(); + workloadSchedPolicy1.setWorkloadConditionList(operatorList); + + WorkloadQueryInfo queryInfo = new WorkloadQueryInfo(); + queryInfo.metricMap = new HashMap<>(); + queryInfo.metricMap.put(WorkloadMetricType.QUERY_TIME, "100"); + + // match + Assert.assertTrue(workloadSchedPolicy1.isMatch(queryInfo)); + + // not match + queryInfo.metricMap.put(WorkloadMetricType.QUERY_TIME, "10"); + Assert.assertFalse(workloadSchedPolicy1.isMatch(queryInfo)); + } + + // 1.3 = + { + List operatorList = new ArrayList<>(); + WorkloadCondition intCondition = new WorkloadConditionQueryTime(WorkloadConditionOperator.EQUAL, 100); + operatorList.add(intCondition); + + WorkloadSchedPolicy workloadSchedPolicy1 = new WorkloadSchedPolicy(); + workloadSchedPolicy1.setWorkloadConditionList(operatorList); + + WorkloadQueryInfo queryInfo = new WorkloadQueryInfo(); + queryInfo.metricMap = new HashMap<>(); + queryInfo.metricMap.put(WorkloadMetricType.QUERY_TIME, "100"); + + // match + Assert.assertTrue(workloadSchedPolicy1.isMatch(queryInfo)); + + // not match + queryInfo.metricMap.put(WorkloadMetricType.QUERY_TIME, "10"); + Assert.assertFalse(workloadSchedPolicy1.isMatch(queryInfo)); + } + + // 1.4 < + { + List operatorList = new ArrayList<>(); + WorkloadCondition intCondition = new WorkloadConditionQueryTime(WorkloadConditionOperator.LESS, 100); + operatorList.add(intCondition); + + WorkloadSchedPolicy workloadSchedPolicy1 = new WorkloadSchedPolicy(); + workloadSchedPolicy1.setWorkloadConditionList(operatorList); + + WorkloadQueryInfo queryInfo = new WorkloadQueryInfo(); + queryInfo.metricMap = new HashMap<>(); + queryInfo.metricMap.put(WorkloadMetricType.QUERY_TIME, "99"); + + // match + Assert.assertTrue(workloadSchedPolicy1.isMatch(queryInfo)); + + // not match + queryInfo.metricMap.put(WorkloadMetricType.QUERY_TIME, "100"); + Assert.assertFalse(workloadSchedPolicy1.isMatch(queryInfo)); + } + + // 1.5 <= + { + List operatorList = new ArrayList<>(); + WorkloadCondition intCondition = new WorkloadConditionQueryTime(WorkloadConditionOperator.LESS_EQUAl, 100); + operatorList.add(intCondition); + + WorkloadSchedPolicy workloadSchedPolicy1 = new WorkloadSchedPolicy(); + workloadSchedPolicy1.setWorkloadConditionList(operatorList); + + WorkloadQueryInfo queryInfo = new WorkloadQueryInfo(); + queryInfo.metricMap = new HashMap<>(); + queryInfo.metricMap.put(WorkloadMetricType.QUERY_TIME, "100"); + + // match + Assert.assertTrue(workloadSchedPolicy1.isMatch(queryInfo)); + + // not match + queryInfo.metricMap.put(WorkloadMetricType.QUERY_TIME, "101"); + Assert.assertFalse(workloadSchedPolicy1.isMatch(queryInfo)); + } + + // 2 string compare + { + List operatorList = new ArrayList<>(); + WorkloadCondition strCondition = new WorkloadConditionUsername(WorkloadConditionOperator.EQUAL, "root"); + operatorList.add(strCondition); + + WorkloadSchedPolicy workloadSchedPolicy1 = new WorkloadSchedPolicy(); + workloadSchedPolicy1.setWorkloadConditionList(operatorList); + + WorkloadQueryInfo queryInfo = new WorkloadQueryInfo(); + queryInfo.metricMap = new HashMap<>(); + queryInfo.metricMap.put(WorkloadMetricType.USERNAME, "root"); + + // match + Assert.assertTrue(workloadSchedPolicy1.isMatch(queryInfo)); + + // not match + queryInfo.metricMap.put(WorkloadMetricType.USERNAME, "abc"); + Assert.assertFalse(workloadSchedPolicy1.isMatch(queryInfo)); + } + + // 3 mixed condition + { + List operatorList = new ArrayList<>(); + WorkloadCondition strCondition = new WorkloadConditionUsername(WorkloadConditionOperator.EQUAL, "root"); + operatorList.add(strCondition); + + WorkloadCondition intCondition = new WorkloadConditionQueryTime(WorkloadConditionOperator.EQUAL, 100); + operatorList.add(intCondition); + + WorkloadSchedPolicy workloadSchedPolicy1 = new WorkloadSchedPolicy(); + workloadSchedPolicy1.setWorkloadConditionList(operatorList); + + WorkloadQueryInfo queryInfo = new WorkloadQueryInfo(); + queryInfo.metricMap = new HashMap<>(); + queryInfo.metricMap.put(WorkloadMetricType.USERNAME, "root"); + queryInfo.metricMap.put(WorkloadMetricType.QUERY_TIME, "100"); + + // match + Assert.assertTrue(workloadSchedPolicy1.isMatch(queryInfo)); + + // not match 1 + queryInfo.metricMap.remove(WorkloadMetricType.USERNAME); + Assert.assertFalse(workloadSchedPolicy1.isMatch(queryInfo)); + + // not match 2 + queryInfo.metricMap.put(WorkloadMetricType.USERNAME, "abc"); + Assert.assertFalse(workloadSchedPolicy1.isMatch(queryInfo)); + } + + } + +} diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 4f101f1177e121f..2d0f380dbceca70 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -702,6 +702,7 @@ enum TMetadataType { JOBS, TASKS, QUERIES, + WORKLOAD_SCHED_POLICY } enum TIcebergQueryType { diff --git a/regression-test/data/workload_manager_p0/test_workload_sched_policy.out b/regression-test/data/workload_manager_p0/test_workload_sched_policy.out new file mode 100644 index 000000000000000..4e8482384c46a2f --- /dev/null +++ b/regression-test/data/workload_manager_p0/test_workload_sched_policy.out @@ -0,0 +1,9 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_policy_tvf -- +full_policy_policy query_time > 10;username = root set_session_variable "workload_group=normal" 10 false 0 +move_action_policy username = root move_query_to_group "normal" 0 true 0 +set_action_policy query_time > 10;username = root set_session_variable "workload_group=normal" 0 true 0 +test_cancel_policy query_time > 10 cancel_query 0 false 0 + +-- !select_policy_tvf_after_drop -- + diff --git a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy new file mode 100644 index 000000000000000..be4d7411e3d127c --- /dev/null +++ b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_workload_sched_policy") { + + sql "set experimental_enable_nereids_planner = false;" + + sql "drop workload schedule policy if exists full_policy_policy;" + sql "drop workload schedule policy if exists set_action_policy;" + sql "drop workload schedule policy if exists move_action_policy;" + sql "drop workload schedule policy if exists test_cancel_policy;" + sql "drop workload schedule policy if exists test_set_var_policy;" + sql "drop workload schedule policy if exists test_set_var_policy2;" + + // 1 create cancel policy + sql "create workload schedule policy test_cancel_policy " + + " conditions(query_time > 10) " + + " actions(cancel_query) properties('enabled'='false'); " + + // 2 create cancel policy + sql "create workload schedule policy move_action_policy " + + "conditions(username='root') " + + "actions(move_query_to_group 'normal');" + + // 3 create set policy + sql "create workload schedule policy set_action_policy " + + "conditions(query_time > 10, username='root') " + + "actions(set_session_variable 'workload_group=normal');" + + // 4 create policy with property + sql "create workload schedule policy full_policy_policy " + + "conditions(query_time > 10, username='root') " + + "actions(set_session_variable 'workload_group=normal') " + + "properties( " + + "'enabled' = 'false', " + + "'priority'='10' " + + ");" + + qt_select_policy_tvf "select name,condition,action,priority,enabled,version from workload_schedule_policy() order by name;" + + // test_alter + sql "alter workload schedule policy full_policy_policy properties('priority'='2', 'enabled'='false');" + + // create failed check + try { + sql "create workload schedule policy failed_policy " + + "conditions(abc > 123) actions(cancel_query);" + } catch(Exception e) { + assertTrue(e.getMessage().contains("invalid metric name")) + } + + try { + sql "create workload schedule policy failed_policy " + + "conditions(query_time > 123) actions(abc);" + } catch(Exception e) { + assertTrue(e.getMessage().contains("invalid action type")) + } + + try { + sql "alter workload schedule policy full_policy_policy properties('priority'='abc');" + } catch (Exception e) { + assertTrue(e.getMessage().contains("invalid priority property value")) + } + + try { + sql "alter workload schedule policy full_policy_policy properties('enabled'='abc');" + } catch (Exception e) { + assertTrue(e.getMessage().contains("invalid enabled property value")) + } + + try { + sql "alter workload schedule policy full_policy_policy properties('priority'='10000');" + } catch (Exception e) { + assertTrue(e.getMessage().contains("priority can only between")) + } + + try { + sql "create workload schedule policy conflict_policy " + + "conditions (username = 'root')" + + "actions(cancel_query, move_query_to_group 'normal');" + } catch (Exception e) { + assertTrue(e.getMessage().contains("can not exist in one policy at same time")) + } + + try { + sql "create workload schedule policy conflict_policy " + + "conditions (username = 'root') " + + "actions(cancel_query, cancel_query);" + } catch (Exception e) { + assertTrue(e.getMessage().contains("duplicate action in one policy")) + } + + try { + sql "create workload schedule policy conflict_policy " + + "conditions (username = 'root') " + + "actions(set_session_variable 'workload_group=normal', set_session_variable 'workload_group=abc');" + } catch (Exception e) { + assertTrue(e.getMessage().contains("duplicate set_session_variable action args one policy")) + } + + // drop + sql "drop workload schedule policy full_policy_policy;" + sql "drop workload schedule policy set_action_policy;" + sql "drop workload schedule policy move_action_policy;" + sql "drop workload schedule policy test_cancel_policy;" + + qt_select_policy_tvf_after_drop "select name,condition,action,priority,enabled,version from workload_schedule_policy() order by name;" + + // test workload schedule policy + sql "ADMIN SET FRONTEND CONFIG ('workload_sched_policy_interval_ms' = '500');" + sql """drop user if exists test_workload_sched_user""" + sql """create user test_workload_sched_user identified by '12345'""" + sql """grant ADMIN_PRIV on *.*.* to test_workload_sched_user""" + + // 1 create test_set_var_policy + sql "create workload schedule policy test_set_var_policy conditions(username='test_workload_sched_user')" + + "actions(set_session_variable 'parallel_pipeline_task_num=33');" + def result1 = connect(user = 'test_workload_sched_user', password = '12345', url = context.config.jdbcUrl) { + logger.info("begin sleep 15s to wait") + Thread.sleep(15000) + sql "show variables like '%parallel_pipeline_task_num%';" + } + assertEquals("parallel_pipeline_task_num", result1[0][0]) + assertEquals("33", result1[0][1]) + + // 2 create test_set_var_policy2 with higher priority + sql "create workload schedule policy test_set_var_policy2 conditions(username='test_workload_sched_user') " + + "actions(set_session_variable 'parallel_pipeline_task_num=22') properties('priority'='10');" + def result2 = connect(user = 'test_workload_sched_user', password = '12345', url = context.config.jdbcUrl) { + Thread.sleep(3000) + sql "show variables like '%parallel_pipeline_task_num%';" + } + assertEquals("parallel_pipeline_task_num", result2[0][0]) + assertEquals("22", result2[0][1]) + + // 3 disable test_set_var_policy2 + sql "alter workload schedule policy test_set_var_policy2 properties('enabled'='false');" + def result3 = connect(user = 'test_workload_sched_user', password = '12345', url = context.config.jdbcUrl) { + Thread.sleep(3000) + sql "show variables like '%parallel_pipeline_task_num%';" + } + assertEquals("parallel_pipeline_task_num", result3[0][0]) + assertEquals("33", result3[0][1]) + + sql "ADMIN SET FRONTEND CONFIG ('workload_sched_policy_interval_ms' = '10000');" + + sql "drop workload schedule policy if exists full_policy_policy;" + sql "drop workload schedule policy if exists set_action_policy;" + sql "drop workload schedule policy if exists move_action_policy;" + sql "drop workload schedule policy if exists test_cancel_policy;" + sql "drop workload schedule policy if exists test_set_var_policy;" + sql "drop workload schedule policy if exists test_set_var_policy2;" + +} \ No newline at end of file