From 74738891942ae501282430f6d2ef98c608f5fd6f Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Mon, 24 Jun 2024 20:27:25 +0800 Subject: [PATCH] Fix task cannot use workflow's environment (#16199) (cherry picked from commit b34fe4604423ff209e42a6a65ac2e75770625143) --- .../api/dto/task/TaskCreateRequest.java | 4 +- .../api/service/WorkerGroupService.java | 8 --- .../impl/ProcessDefinitionServiceImpl.java | 4 +- ...ProjectWorkerGroupRelationServiceImpl.java | 3 +- .../service/impl/WorkerGroupServiceImpl.java | 27 +------ .../service/ExecuteFunctionServiceTest.java | 29 ++++---- .../service/ProcessDefinitionServiceTest.java | 3 +- .../api/service/WorkerGroupServiceTest.java | 44 ------------ .../common/constants/Constants.java | 5 -- .../dao/utils/EnvironmentUtils.java | 54 ++++++++++++++ .../dao/utils/WorkerGroupUtils.java | 45 ++++++++++++ .../dao/mapper/CommandMapperTest.java | 4 +- .../repository/impl/CommandDaoImplTest.java | 4 +- .../dao/utils/EnvironmentUtilsTest.java | 72 +++++++++++++++++++ .../dao/utils/WorkerGroupUtilsTest.java | 70 ++++++++++++++++++ .../e2e/cases/WorkflowJavaTaskE2ETest.java | 4 +- .../dispatch/context/ExecutionContext.java | 57 --------------- .../master/registry/ServerNodeManager.java | 10 ++- .../runner/StreamTaskExecuteRunnable.java | 13 ++-- .../runner/WorkflowExecuteRunnable.java | 26 +++---- .../utils/WorkflowInstanceUtilsTest.java | 3 +- .../scheduler/quartz/ProcessScheduleTask.java | 7 +- .../service/process/ProcessServiceImpl.java | 8 +-- 23 files changed, 305 insertions(+), 199 deletions(-) create mode 100644 dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/EnvironmentUtils.java create mode 100644 dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtils.java create mode 100644 dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/EnvironmentUtilsTest.java create mode 100644 dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtilsTest.java delete mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskCreateRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskCreateRequest.java index 1e651ee6e345..bb362962fa9b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskCreateRequest.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskCreateRequest.java @@ -19,11 +19,11 @@ import static org.apache.dolphinscheduler.common.constants.Constants.VERSION_FIRST; -import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import java.util.Date; @@ -107,7 +107,7 @@ public TaskDefinition convert2TaskDefinition() { taskDefinition.setProjectCode(this.projectCode); taskDefinition.setTaskType(this.taskType); taskDefinition.setTaskParams(this.taskParams); - taskDefinition.setWorkerGroup(this.workerGroup == null ? Constants.DEFAULT_WORKER_GROUP : this.workerGroup); + taskDefinition.setWorkerGroup(WorkerGroupUtils.getWorkerGroupOrDefault(workerGroup)); taskDefinition.setEnvironmentCode(this.environmentCode); taskDefinition.setFailRetryTimes(this.failRetryTimes); taskDefinition.setFailRetryInterval(this.failRetryInterval); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java index 2c87e4be2ef3..b85d3912cb7d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.api.service; import org.apache.dolphinscheduler.api.utils.Result; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import java.util.List; @@ -77,13 +76,6 @@ Map saveWorkerGroup(User loginUser, int id, String name, String */ Map getWorkerAddressList(); - /** - * Get task instance's worker group - * @param taskInstance task instance - * @return worker group - */ - String getTaskWorkerGroup(TaskInstance taskInstance); - /** * Query worker group by process definition codes * @param processDefinitionCodeList processDefinitionCodeList diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index e6893b04fbf1..6a4251cef1f5 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -34,7 +34,6 @@ import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; import static org.apache.dolphinscheduler.common.constants.Constants.COPY_SUFFIX; import static org.apache.dolphinscheduler.common.constants.Constants.DATA_LIST; -import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_WORKER_GROUP; import static org.apache.dolphinscheduler.common.constants.Constants.GLOBAL_PARAMS; import static org.apache.dolphinscheduler.common.constants.Constants.IMPORT_SUFFIX; import static org.apache.dolphinscheduler.common.constants.Constants.LOCAL_PARAMS; @@ -109,6 +108,7 @@ import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao; import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; +import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.plugin.task.api.enums.SqlType; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; @@ -1390,7 +1390,7 @@ private TaskDefinitionLog buildNormalSqlTaskDefinition(String taskName, DataSour taskDefinition.setFailRetryTimes(0); taskDefinition.setFailRetryInterval(0); taskDefinition.setTimeoutFlag(TimeoutFlag.CLOSE); - taskDefinition.setWorkerGroup(DEFAULT_WORKER_GROUP); + taskDefinition.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup()); taskDefinition.setTaskPriority(Priority.MEDIUM); taskDefinition.setEnvironmentCode(-1); taskDefinition.setTimeout(0); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java index 31dc113dbb1d..c11915667fc4 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java @@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; +import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections4.SetUtils; @@ -118,7 +119,7 @@ public Result assignWorkerGroupsToProject(User loginUser, Long projectCode, List workerGroupMapper.queryAllWorkerGroup().stream().map(WorkerGroup::getName).collect( Collectors.toSet()); - workerGroupNames.add(Constants.DEFAULT_WORKER_GROUP); + workerGroupNames.add(WorkerGroupUtils.getDefaultWorkerGroup()); Set assignedWorkerGroupNames = new HashSet<>(workerGroups); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java index 9c9888074043..465234798749 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java @@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper; @@ -41,6 +40,7 @@ import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; +import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -357,12 +357,12 @@ private List getWorkerGroups(List ids) { workerGroups = workerGroupMapper.queryAllWorkerGroup(); } boolean containDefaultWorkerGroups = workerGroups.stream() - .anyMatch(workerGroup -> Constants.DEFAULT_WORKER_GROUP.equals(workerGroup.getName())); + .anyMatch(workerGroup -> WorkerGroupUtils.isWorkerGroupEmpty(workerGroup.getName())); if (!containDefaultWorkerGroups) { // there doesn't exist a default WorkerGroup, we will add all worker to the default worker group. Set activeWorkerNodes = registryClient.getServerNodeSet(RegistryNodeType.WORKER); WorkerGroup defaultWorkerGroup = new WorkerGroup(); - defaultWorkerGroup.setName(Constants.DEFAULT_WORKER_GROUP); + defaultWorkerGroup.setName(WorkerGroupUtils.getDefaultWorkerGroup()); defaultWorkerGroup.setAddrList(String.join(Constants.COMMA, activeWorkerNodes)); defaultWorkerGroup.setCreateTime(new Date()); defaultWorkerGroup.setUpdateTime(new Date()); @@ -431,27 +431,6 @@ public Map getWorkerAddressList() { return result; } - @Override - public String getTaskWorkerGroup(TaskInstance taskInstance) { - if (taskInstance == null) { - return null; - } - - String workerGroup = taskInstance.getWorkerGroup(); - - if (StringUtils.isNotEmpty(workerGroup)) { - return workerGroup; - } - int processInstanceId = taskInstance.getProcessInstanceId(); - ProcessInstance processInstance = processService.findProcessInstanceById(processInstanceId); - - if (processInstance != null) { - return processInstance.getWorkerGroup(); - } - log.info("task : {} will use default worker group", taskInstance.getId()); - return Constants.DEFAULT_WORKER_GROUP; - } - @Override public Map queryWorkerGroupByProcessDefinitionCodes(List processDefinitionCodeList) { List processDefinitionScheduleList = diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java index 8f1869c1fbc4..04dabf5eadcb 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java @@ -67,6 +67,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.service.command.CommandService; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -271,7 +272,7 @@ public void testNoComplement() { null, null, null, null, null, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 10, null, null, + Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(), tenantCode, 100L, 10, null, null, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE, null, @@ -298,7 +299,7 @@ public void testComplementWithStartNodeList() { null, "123456789,987654321", null, null, null, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, null, + Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(), tenantCode, 100L, 110, null, null, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE, null, @@ -323,7 +324,7 @@ public void testComplementWithOldStartNodeList() { null, "1123456789,987654321", null, null, null, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 0, + Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(), tenantCode, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE, null, @@ -354,14 +355,14 @@ public void testComplementWithDependentMode() { dependentProcessDefinition.setProcessDefinitionCode(2); dependentProcessDefinition.setProcessDefinitionVersion(1); dependentProcessDefinition.setTaskDefinitionCode(1); - dependentProcessDefinition.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP); + dependentProcessDefinition.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup()); dependentProcessDefinition.setTaskParams( "{\"localParams\":[],\"resourceList\":[],\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"depTaskCode\":2,\"status\":\"SUCCESS\"}]}]},\"conditionResult\":{\"successNode\":[1],\"failedNode\":[1]}}"); Mockito.when(processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode)) .thenReturn(Lists.newArrayList(dependentProcessDefinition)); Map processDefinitionWorkerGroupMap = new HashMap<>(); - processDefinitionWorkerGroupMap.put(1L, Constants.DEFAULT_WORKER_GROUP); + processDefinitionWorkerGroupMap.put(1L, WorkerGroupUtils.getDefaultWorkerGroup()); Mockito.when(workerGroupService.queryWorkerGroupByProcessDefinitionCodes(Lists.newArrayList(1L))) .thenReturn(processDefinitionWorkerGroupMap); @@ -370,7 +371,7 @@ public void testComplementWithDependentMode() { command.setCommandType(CommandType.COMPLEMENT_DATA); command.setCommandParam( "{\"StartNodeList\":\"1\",\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}"); - command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP); + command.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup()); command.setProcessDefinitionCode(processDefinitionCode); command.setExecutorId(1); @@ -383,7 +384,7 @@ public void testComplementWithDependentMode() { childDependent.setProcessDefinitionCode(3); childDependent.setProcessDefinitionVersion(1); childDependent.setTaskDefinitionCode(4); - childDependent.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP); + childDependent.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup()); childDependent.setTaskParams( "{\"localParams\":[],\"resourceList\":[],\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"depTaskCode\":3,\"status\":\"SUCCESS\"}]}]},\"conditionResult\":{\"successNode\":[1],\"failedNode\":[1]}}"); Mockito.when(processService.queryDependentProcessDefinitionByProcessDefinitionCode( @@ -409,7 +410,8 @@ public void testDateError() { null, null, null, null, null, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 2, Constants.DRY_RUN_FLAG_NO, + Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(), tenantCode, 100L, 110, null, 2, + Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE, null, false, @@ -434,7 +436,7 @@ public void testSerial() { null, null, null, null, null, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, null, + Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(), tenantCode, 100L, 110, null, null, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE, null, @@ -460,7 +462,8 @@ public void testParallelWithOutSchedule() { null, null, null, null, null, RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 2, Constants.DRY_RUN_FLAG_NO, + Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(), tenantCode, 100L, 110, null, 2, + Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE, null, false, @@ -486,7 +489,7 @@ public void testParallelWithSchedule() { null, null, null, null, null, RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 15, + Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(), tenantCode, 100L, 110, null, 15, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE, null, @@ -514,7 +517,7 @@ public void testNoMasterServers() { null, RunMode.RUN_MODE_PARALLEL, Priority.LOW, - Constants.DEFAULT_WORKER_GROUP, + WorkerGroupUtils.getDefaultWorkerGroup(), tenantCode, 100L, 110, @@ -553,7 +556,7 @@ public void testOfTestRun() { null, null, null, null, 0, RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 15, + Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(), tenantCode, 100L, 110, null, 15, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_YES, ComplementDependentMode.OFF_MODE, null, diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index ed3a5f639b2c..37af53dbbd56 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -72,6 +72,7 @@ import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao; import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; +import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.spi.enums.DbType; @@ -1143,7 +1144,7 @@ private Schedule getSchedule() { schedule.setProcessInstancePriority(Priority.MEDIUM); schedule.setWarningType(WarningType.NONE); schedule.setWarningGroupId(1); - schedule.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP); + schedule.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup()); return schedule; } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java index 08a541c5bb94..fce9aa3f1c08 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java @@ -30,7 +30,6 @@ import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper; @@ -65,8 +64,6 @@ @MockitoSettings(strictness = Strictness.LENIENT) public class WorkerGroupServiceTest { - private static final Logger logger = LoggerFactory.getLogger(WorkerGroupServiceTest.class); - private static final Logger baseServiceLogger = LoggerFactory.getLogger(BaseServiceImpl.class); private static final Logger serviceLogger = LoggerFactory.getLogger(WorkerGroupService.class); @@ -288,47 +285,6 @@ public void testQueryAllGroupWithDefault() { Assertions.assertEquals("default", workerGroups.toArray()[0]); } - @Test - public void giveNull_whenGetTaskWorkerGroup_expectNull() { - String nullWorkerGroup = workerGroupService.getTaskWorkerGroup(null); - Assertions.assertNull(nullWorkerGroup); - } - - @Test - public void giveCorrectTaskInstance_whenGetTaskWorkerGroup_expectTaskWorkerGroup() { - TaskInstance taskInstance = new TaskInstance(); - taskInstance.setId(1); - taskInstance.setWorkerGroup("cluster1"); - - String workerGroup = workerGroupService.getTaskWorkerGroup(taskInstance); - Assertions.assertEquals("cluster1", workerGroup); - } - - @Test - public void giveNullWorkerGroup_whenGetTaskWorkerGroup_expectProcessWorkerGroup() { - TaskInstance taskInstance = new TaskInstance(); - taskInstance.setId(1); - taskInstance.setProcessInstanceId(1); - ProcessInstance processInstance = new ProcessInstance(); - processInstance.setId(1); - processInstance.setWorkerGroup("cluster1"); - Mockito.when(processService.findProcessInstanceById(1)).thenReturn(processInstance); - - String workerGroup = workerGroupService.getTaskWorkerGroup(taskInstance); - Assertions.assertEquals("cluster1", workerGroup); - } - - @Test - public void giveNullTaskAndProcessWorkerGroup_whenGetTaskWorkerGroup_expectDefault() { - TaskInstance taskInstance = new TaskInstance(); - taskInstance.setId(1); - taskInstance.setProcessInstanceId(1); - Mockito.when(processService.findProcessInstanceById(1)).thenReturn(null); - - String defaultWorkerGroup = workerGroupService.getTaskWorkerGroup(taskInstance); - Assertions.assertEquals(Constants.DEFAULT_WORKER_GROUP, defaultWorkerGroup); - } - /** * get Group */ diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java index 5c56818fa0d1..d98dfdea8189 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java @@ -532,14 +532,9 @@ private Constants() { * session timeout */ public static final int SESSION_TIME_OUT = 7200; - public static final int MAX_FILE_SIZE = 1024 * 1024 * 1024; public static final String UDF = "UDF"; public static final String CLASS = "class"; - /** - * default worker group - */ - public static final String DEFAULT_WORKER_GROUP = "default"; /** * authorize writable perm */ diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/EnvironmentUtils.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/EnvironmentUtils.java new file mode 100644 index 000000000000..89ea647f75bd --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/EnvironmentUtils.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.utils; + +public class EnvironmentUtils { + + private static final long EMPTY_ENVIRONMENT_CODE = -1L; + + /** + * Check if the environment code is empty (we should use null instead of -1, this is used to comply with the original code) + * + * @return true if the environment code is empty, false otherwise + */ + public static boolean isEnvironmentCodeEmpty(Long environmentCode) { + return environmentCode == null || environmentCode <= 0; + } + + /** + * Get the empty environment code + */ + public static Long getDefaultEnvironmentCode() { + return EMPTY_ENVIRONMENT_CODE; + } + + /** + * Get the environment code or the default environment code if the environment code is empty + */ + public static Long getEnvironmentCodeOrDefault(Long environmentCode) { + return getEnvironmentCodeOrDefault(environmentCode, getDefaultEnvironmentCode()); + } + + /** + * Get the environment code or the default environment code if the environment code is empty + */ + public static Long getEnvironmentCodeOrDefault(Long environmentCode, Long defaultEnvironmentCode) { + return isEnvironmentCodeEmpty(environmentCode) ? defaultEnvironmentCode : environmentCode; + } + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtils.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtils.java new file mode 100644 index 000000000000..2436d45a027e --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtils.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.utils; + +import org.apache.commons.lang3.StringUtils; + +public class WorkerGroupUtils { + + private static final String DEFAULT_WORKER_GROUP = "default"; + + /** + * Check if the worker group is empty, if the worker group is default, it is considered empty + */ + public static boolean isWorkerGroupEmpty(String workerGroup) { + return StringUtils.isEmpty(workerGroup) || getDefaultWorkerGroup().equals(workerGroup); + } + + public static String getWorkerGroupOrDefault(String workerGroup) { + return getWorkerGroupOrDefault(workerGroup, getDefaultWorkerGroup()); + } + + public static String getWorkerGroupOrDefault(String workerGroup, String defaultWorkerGroup) { + return isWorkerGroupEmpty(workerGroup) ? defaultWorkerGroup : workerGroup; + } + + public static String getDefaultWorkerGroup() { + return DEFAULT_WORKER_GROUP; + } + +} diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java index 2105885fa390..3f4b0768aa9f 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java @@ -19,7 +19,6 @@ import static com.google.common.truth.Truth.assertThat; -import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Flag; @@ -32,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.CommandCount; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; import java.util.Date; import java.util.HashMap; @@ -303,7 +303,7 @@ private Command createCommand(CommandType commandType, long processDefinitionCod command.setProcessInstancePriority(Priority.MEDIUM); command.setStartTime(DateUtils.stringToDate("2019-12-29 10:10:00")); command.setUpdateTime(DateUtils.stringToDate("2019-12-29 10:10:00")); - command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP); + command.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup()); command.setProcessInstanceId(0); command.setProcessDefinitionVersion(0); commandMapper.insert(command); diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java index 85867ef3b591..1bc55f819183 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java @@ -19,7 +19,6 @@ import static com.google.common.truth.Truth.assertThat; -import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Priority; @@ -29,6 +28,7 @@ import org.apache.dolphinscheduler.dao.BaseDaoTest; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.repository.CommandDao; +import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; import org.apache.commons.lang3.RandomUtils; @@ -80,7 +80,7 @@ private void createCommand(CommandType commandType, int processDefinitionCode) { command.setProcessInstancePriority(Priority.MEDIUM); command.setStartTime(DateUtils.stringToDate("2019-12-29 10:10:00")); command.setUpdateTime(DateUtils.stringToDate("2019-12-29 10:10:00")); - command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP); + command.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup()); command.setProcessInstanceId(0); command.setProcessDefinitionVersion(0); commandDao.insert(command); diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/EnvironmentUtilsTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/EnvironmentUtilsTest.java new file mode 100644 index 000000000000..dd5356169d4e --- /dev/null +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/EnvironmentUtilsTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.utils; + +import static com.google.common.truth.Truth.assertThat; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; + +class EnvironmentUtilsTest { + + @ParameterizedTest + @ValueSource(longs = {0, -1}) + void testIsEnvironmentCodeEmpty_emptyEnvironmentCode(Long environmentCode) { + assertThat(EnvironmentUtils.isEnvironmentCodeEmpty(environmentCode)).isTrue(); + } + + @ParameterizedTest + @ValueSource(longs = {123}) + void testIsEnvironmentCodeEmpty_nonEmptyEnvironmentCode(Long environmentCode) { + assertThat(EnvironmentUtils.isEnvironmentCodeEmpty(environmentCode)).isFalse(); + } + + @Test + void testGetDefaultEnvironmentCode() { + assertThat(EnvironmentUtils.getDefaultEnvironmentCode()).isEqualTo(-1L); + } + + @ParameterizedTest + @ValueSource(longs = {0, -1}) + void testGetEnvironmentCodeOrDefault_emptyEnvironmentCode(Long environmentCode) { + assertThat(EnvironmentUtils.getEnvironmentCodeOrDefault(environmentCode)).isEqualTo(-1L); + } + + @ParameterizedTest + @ValueSource(longs = {123}) + void testGetEnvironmentCodeOrDefault_nonEmptyEnvironmentCode(Long environmentCode) { + assertThat(EnvironmentUtils.getEnvironmentCodeOrDefault(environmentCode)).isEqualTo(environmentCode); + } + + @ParameterizedTest + @CsvSource(value = {",123", "-1,123"}) + void testGetEnvironmentCodeOrDefault_withDefaultValue_emptyEnvironmentCode(Long environmentCode, + Long defaultValue) { + assertThat(EnvironmentUtils.getEnvironmentCodeOrDefault(environmentCode, defaultValue)).isEqualTo(defaultValue); + } + + @ParameterizedTest + @CsvSource(value = {"1,123"}) + void testGetEnvironmentCodeOrDefault_withDefaultValue_nonEmptyEnvironmentCode(Long environmentCode, + Long defaultValue) { + assertThat(EnvironmentUtils.getEnvironmentCodeOrDefault(environmentCode, defaultValue)) + .isEqualTo(environmentCode); + } +} diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtilsTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtilsTest.java new file mode 100644 index 000000000000..60bf74695dce --- /dev/null +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtilsTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.utils; + +import static com.google.common.truth.Truth.assertThat; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; + +class WorkerGroupUtilsTest { + + @ParameterizedTest + @ValueSource(strings = {"", "default"}) + void testIsWorkerGroupEmpty_emptyWorkerGroup(String workerGroup) { + assertThat(WorkerGroupUtils.isWorkerGroupEmpty(workerGroup)).isTrue(); + } + + @ParameterizedTest + @ValueSource(strings = {"123", "default1"}) + void testIsWorkerGroupEmpty_nonEmptyWorkerGroup(String workerGroup) { + assertThat(WorkerGroupUtils.isWorkerGroupEmpty(workerGroup)).isFalse(); + } + + @ParameterizedTest + @ValueSource(strings = {"", "default"}) + void testGetWorkerGroupOrDefault_emptyWorkerGroup(String workerGroup) { + assertThat(WorkerGroupUtils.getWorkerGroupOrDefault(workerGroup)) + .isEqualTo(WorkerGroupUtils.getDefaultWorkerGroup()); + } + + @ParameterizedTest + @ValueSource(strings = {"test"}) + void testGetWorkerGroupOrDefault_nonEmptyWorkerGroup(String workerGroup) { + assertThat(WorkerGroupUtils.getWorkerGroupOrDefault(workerGroup)).isEqualTo(workerGroup); + } + + @ParameterizedTest + @CsvSource(value = {",test", "default,test"}) + void testGetWorkerGroupOrDefault_withDefaultValue_emptyWorkerGroup(String workerGroup, String defaultValue) { + assertThat(WorkerGroupUtils.getWorkerGroupOrDefault(workerGroup, defaultValue)).isEqualTo(defaultValue); + } + + @ParameterizedTest + @CsvSource(value = {"test1,test"}) + void testGetWorkerGroupOrDefault_withDefaultValue_nonEmptyWorkerGroup(String workerGroup, String defaultValue) { + assertThat(WorkerGroupUtils.getWorkerGroupOrDefault(workerGroup)).isEqualTo(workerGroup); + } + + @Test + void getDefaultWorkerGroup() { + assertThat(WorkerGroupUtils.getDefaultWorkerGroup()).isEqualTo("default"); + } +} diff --git a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/WorkflowJavaTaskE2ETest.java b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/WorkflowJavaTaskE2ETest.java index d61a9ddd2a40..77c4e554bc72 100644 --- a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/WorkflowJavaTaskE2ETest.java +++ b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/WorkflowJavaTaskE2ETest.java @@ -92,8 +92,8 @@ public static void setup() { .goToNav(SecurityPage.class) .goToTab(UserPage.class); - new WebDriverWait(userPage.driver(), Duration.ofSeconds(20)).until(ExpectedConditions.visibilityOfElementLocated( - new By.ByClassName("name"))); + new WebDriverWait(userPage.driver(), Duration.ofSeconds(20)) + .until(ExpectedConditions.visibilityOfElementLocated(new By.ByClassName("name"))); userPage.update(user, user, email, phone, tenant) .goToNav(ProjectPage.class) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java deleted file mode 100644 index 8ad401386827..000000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.dispatch.context; - -import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_WORKER_GROUP; - -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.extract.base.utils.Host; -import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -@Data -@Builder -@NoArgsConstructor -@AllArgsConstructor -public class ExecutionContext { - - private Host host; - - private TaskInstance taskInstance; - - private ExecutorType executorType; - - /** - * worker group - */ - private String workerGroup; - - public ExecutionContext(ExecutorType executorType, TaskInstance taskInstance) { - this(executorType, DEFAULT_WORKER_GROUP, taskInstance); - } - - public ExecutionContext(ExecutorType executorType, String workerGroup, TaskInstance taskInstance) { - this.executorType = executorType; - this.workerGroup = workerGroup; - this.taskInstance = taskInstance; - } -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java index f066f0403d8d..d532168b2ad6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; +import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; import org.apache.dolphinscheduler.registry.api.Event; import org.apache.dolphinscheduler.registry.api.Event.Type; import org.apache.dolphinscheduler.registry.api.RegistryClient; @@ -36,7 +37,6 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.ArrayUtils; -import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; import java.util.Arrays; @@ -273,8 +273,8 @@ private void updateWorkerGroupMappings() { .filter(workerNodeInfo::containsKey).collect(Collectors.toSet()); tmpWorkerGroupMappings.put(workerGroupName, activeWorkerNodes); } - if (!tmpWorkerGroupMappings.containsKey(Constants.DEFAULT_WORKER_GROUP)) { - tmpWorkerGroupMappings.put(Constants.DEFAULT_WORKER_GROUP, workerNodeInfo.keySet()); + if (!tmpWorkerGroupMappings.containsKey(WorkerGroupUtils.getDefaultWorkerGroup())) { + tmpWorkerGroupMappings.put(WorkerGroupUtils.getDefaultWorkerGroup(), workerNodeInfo.keySet()); } } finally { workerNodeInfoReadLock.unlock(); @@ -307,9 +307,7 @@ public Map> getWorkerGroupNodes() { public Set getWorkerGroupNodes(String workerGroup) throws WorkerGroupNotFoundException { workerGroupReadLock.lock(); try { - if (StringUtils.isEmpty(workerGroup)) { - workerGroup = Constants.DEFAULT_WORKER_GROUP; - } + workerGroup = WorkerGroupUtils.getWorkerGroupOrDefault(workerGroup); Set nodes = workerGroupNodes.get(workerGroup); if (nodes == null) { throw new WorkerGroupNotFoundException(String.format("WorkerGroup: %s is invalidated", workerGroup)); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java index 2f61507e727b..b09d9f1b1a8b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java @@ -17,8 +17,6 @@ package org.apache.dolphinscheduler.server.master.runner; -import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_WORKER_GROUP; - import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Priority; @@ -31,6 +29,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; +import org.apache.dolphinscheduler.dao.utils.EnvironmentUtils; +import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory; import org.apache.dolphinscheduler.extract.master.transportor.StreamingTaskTriggerRequest; import org.apache.dolphinscheduler.extract.worker.ITaskInstanceExecutionEventAckListener; @@ -270,12 +270,11 @@ public TaskInstance newTaskInstance(TaskDefinition taskDefinition) { // task dry run flag taskInstance.setDryRun(taskExecuteStartMessage.getDryRun()); - taskInstance.setWorkerGroup(StringUtils.isBlank(taskDefinition.getWorkerGroup()) ? DEFAULT_WORKER_GROUP - : taskDefinition.getWorkerGroup()); - taskInstance.setEnvironmentCode( - taskDefinition.getEnvironmentCode() == 0 ? -1 : taskDefinition.getEnvironmentCode()); + taskInstance.setWorkerGroup(WorkerGroupUtils.getWorkerGroupOrDefault(taskDefinition.getWorkerGroup())); + taskInstance + .setEnvironmentCode(EnvironmentUtils.getEnvironmentCodeOrDefault(taskDefinition.getEnvironmentCode())); - if (!taskInstance.getEnvironmentCode().equals(-1L)) { + if (!EnvironmentUtils.isEnvironmentCodeEmpty(taskInstance.getEnvironmentCode())) { Environment environment = processService.findEnvironmentByCode(taskInstance.getEnvironmentCode()); if (Objects.nonNull(environment) && StringUtils.isNotEmpty(environment.getConfig())) { taskInstance.setEnvironmentConfig(environment.getConfig()); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 31779471862e..6cccef88e159 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -25,8 +25,9 @@ import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_NODES; import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_PARAMS; import static org.apache.dolphinscheduler.common.constants.Constants.COMMA; -import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_WORKER_GROUP; import static org.apache.dolphinscheduler.common.constants.DateConstants.YYYY_MM_DD_HH_MM_SS; +import static org.apache.dolphinscheduler.dao.utils.EnvironmentUtils.getEnvironmentCodeOrDefault; +import static org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils.getWorkerGroupOrDefault; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING; import org.apache.dolphinscheduler.common.constants.Constants; @@ -51,7 +52,9 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; +import org.apache.dolphinscheduler.dao.utils.EnvironmentUtils; import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils; +import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory; import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator; import org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostRequest; @@ -1119,25 +1122,22 @@ public TaskInstance newTaskInstance(ProcessInstance processInstance, TaskNode ta taskInstance.setTaskInstancePriority(taskNode.getTaskInstancePriority()); } - String processWorkerGroup = processInstance.getWorkerGroup(); - processWorkerGroup = StringUtils.isBlank(processWorkerGroup) ? DEFAULT_WORKER_GROUP : processWorkerGroup; - String taskWorkerGroup = - StringUtils.isBlank(taskNode.getWorkerGroup()) ? processWorkerGroup : taskNode.getWorkerGroup(); + String processWorkerGroup = getWorkerGroupOrDefault(processInstance.getWorkerGroup()); + String taskWorkerGroup = getWorkerGroupOrDefault(taskNode.getWorkerGroup()); - Long processEnvironmentCode = - Objects.isNull(processInstance.getEnvironmentCode()) ? -1 : processInstance.getEnvironmentCode(); - Long taskEnvironmentCode = - Objects.isNull(taskNode.getEnvironmentCode()) ? processEnvironmentCode : taskNode.getEnvironmentCode(); + Long processEnvironmentCode = getEnvironmentCodeOrDefault(processInstance.getEnvironmentCode()); + Long taskEnvironmentCode = getEnvironmentCodeOrDefault(taskNode.getEnvironmentCode()); - if (!processWorkerGroup.equals(DEFAULT_WORKER_GROUP) && taskWorkerGroup.equals(DEFAULT_WORKER_GROUP)) { + if (WorkerGroupUtils.isWorkerGroupEmpty(taskWorkerGroup)) { + // If the task workerGroup is empty, then use the workflow workerGroup/environment taskInstance.setWorkerGroup(processWorkerGroup); - taskInstance.setEnvironmentCode(processEnvironmentCode); + taskInstance.setEnvironmentCode(getEnvironmentCodeOrDefault(taskEnvironmentCode, processEnvironmentCode)); } else { taskInstance.setWorkerGroup(taskWorkerGroup); - taskInstance.setEnvironmentCode(taskEnvironmentCode); + taskInstance.setEnvironmentCode(getEnvironmentCodeOrDefault(taskEnvironmentCode, processEnvironmentCode)); } - if (!taskInstance.getEnvironmentCode().equals(-1L)) { + if (!EnvironmentUtils.isEnvironmentCodeEmpty(taskInstance.getEnvironmentCode())) { Environment environment = processService.findEnvironmentByCode(taskInstance.getEnvironmentCode()); if (Objects.nonNull(environment) && StringUtils.isNotEmpty(environment.getConfig())) { taskInstance.setEnvironmentConfig(environment.getConfig()); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtilsTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtilsTest.java index d52c436add35..5b4bc18ca16a 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtilsTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtilsTest.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import java.sql.Date; @@ -52,7 +53,7 @@ public void testLogWorkflowInstanceInDetails() { workflowInstance.setDryRun(0); workflowInstance.setTenantCode("default"); workflowInstance.setRestartTime(Date.valueOf("2023-08-01")); - workflowInstance.setWorkerGroup("default"); + workflowInstance.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup()); workflowInstance.setStartTime(Date.valueOf("2023-08-01")); workflowInstance.setEndTime(Date.valueOf("2023-08-01")); Assertions.assertEquals("\n" diff --git a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java index c6bd4cf93fea..8ce6480c4563 100644 --- a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java +++ b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java @@ -17,17 +17,15 @@ package org.apache.dolphinscheduler.scheduler.quartz; -import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.Schedule; +import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; import org.apache.dolphinscheduler.service.command.CommandService; import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.commons.lang3.StringUtils; - import java.util.Date; import lombok.extern.slf4j.Slf4j; @@ -93,8 +91,7 @@ protected void executeInternal(JobExecutionContext context) { command.setScheduleTime(scheduledFireTime); command.setStartTime(fireTime); command.setWarningGroupId(schedule.getWarningGroupId()); - String workerGroup = StringUtils.isEmpty(schedule.getWorkerGroup()) ? Constants.DEFAULT_WORKER_GROUP - : schedule.getWorkerGroup(); + String workerGroup = WorkerGroupUtils.getWorkerGroupOrDefault(schedule.getWorkerGroup()); command.setWorkerGroup(workerGroup); command.setTenantCode(schedule.getTenantCode()); command.setEnvironmentCode(schedule.getEnvironmentCode()); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 635948fc860f..470226bec0a0 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -107,6 +107,8 @@ import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.utils.DqRuleUtils; +import org.apache.dolphinscheduler.dao.utils.EnvironmentUtils; +import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory; import org.apache.dolphinscheduler.extract.common.ILogService; import org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener; @@ -573,10 +575,8 @@ private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefi // set process instance priority processInstance.setProcessInstancePriority(command.getProcessInstancePriority()); - String workerGroup = StringUtils.defaultIfEmpty(command.getWorkerGroup(), Constants.DEFAULT_WORKER_GROUP); - processInstance.setWorkerGroup(workerGroup); - processInstance - .setEnvironmentCode(Objects.isNull(command.getEnvironmentCode()) ? -1 : command.getEnvironmentCode()); + processInstance.setWorkerGroup(WorkerGroupUtils.getWorkerGroupOrDefault(command.getWorkerGroup())); + processInstance.setEnvironmentCode(EnvironmentUtils.getEnvironmentCodeOrDefault(command.getEnvironmentCode())); processInstance.setTimeout(processDefinition.getTimeout()); processInstance.setTenantCode(command.getTenantCode()); return processInstance;