diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java index 45bfecc50aff..10c617792420 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java @@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.ComplementDependentMode; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.RunMode; @@ -107,7 +108,9 @@ public class ExecutorController extends BaseController { @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String", example = "default"), @ApiImplicitParam(name = "environmentCode", value = "ENVIRONMENT_CODE", dataType = "Long", example = "-1"), @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int", example = "100"), - @ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8") + @ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataType = "Int" , example = "8"), + @ApiImplicitParam(name = "dryRun", value = "DRY_RUN", dataType = "Int", example = "0"), + @ApiImplicitParam(name = "complementDependentMode", value = "COMPLEMENT_DEPENDENT_MODE", dataType = "complementDependentMode") }) @PostMapping(value = "start-process-instance") @ResponseStatus(HttpStatus.OK) @@ -130,7 +133,8 @@ public Result startProcessInstance(@ApiIgnore @RequestAttribute(value = Constant @RequestParam(value = "timeout", required = false) Integer timeout, @RequestParam(value = "startParams", required = false) String startParams, @RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber, - @RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun) { + @RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun, + @RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode) { if (timeout == null) { timeout = Constants.MAX_TASK_TIMEOUT; @@ -139,8 +143,15 @@ public Result startProcessInstance(@ApiIgnore @RequestAttribute(value = Constant if (startParams != null) { startParamMap = JSONUtils.toMap(startParams); } - Map result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode, scheduleTime, execType, failureStrategy, - startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode,timeout, startParamMap, expectedParallelismNumber, dryRun); + + if (complementDependentMode == null) { + complementDependentMode = ComplementDependentMode.OFF_MODE; + } + + Map result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode, + scheduleTime, execType, failureStrategy, + startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority, + workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun, complementDependentMode); return returnDataList(result); } @@ -181,7 +192,9 @@ public Result startProcessInstance(@ApiIgnore @RequestAttribute(value = Constant @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String", example = "default"), @ApiImplicitParam(name = "environmentCode", value = "ENVIRONMENT_CODE", dataType = "Long", example = "-1"), @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int", example = "100"), - @ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8") + @ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8"), + @ApiImplicitParam(name = "dryRun", value = "DRY_RUN", dataType = "Int", example = "0"), + @ApiImplicitParam(name = "complementDependentMode", value = "COMPLEMENT_DEPENDENT_MODE", dataType = "complementDependentMode") }) @PostMapping(value = "batch-start-process-instance") @ResponseStatus(HttpStatus.OK) @@ -204,7 +217,8 @@ public Result batchStartProcessInstance(@ApiIgnore @RequestAttribute(value = Con @RequestParam(value = "timeout", required = false) Integer timeout, @RequestParam(value = "startParams", required = false) String startParams, @RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber, - @RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun) { + @RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun, + @RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode) { if (timeout == null) { timeout = Constants.MAX_TASK_TIMEOUT; @@ -215,6 +229,10 @@ public Result batchStartProcessInstance(@ApiIgnore @RequestAttribute(value = Con startParamMap = JSONUtils.toMap(startParams); } + if (complementDependentMode == null) { + complementDependentMode = ComplementDependentMode.OFF_MODE; + } + Map result = new HashMap<>(); List processDefinitionCodeArray = Arrays.asList(processDefinitionCodes.split(Constants.COMMA)); List startFailedProcessDefinitionCodeList = new ArrayList<>(); @@ -224,7 +242,9 @@ public Result batchStartProcessInstance(@ApiIgnore @RequestAttribute(value = Con for (String strProcessDefinitionCode : processDefinitionCodeArray) { long processDefinitionCode = Long.parseLong(strProcessDefinitionCode); result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode, scheduleTime, execType, failureStrategy, - startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun); + startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority, + workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun, + complementDependentMode); if (!Status.SUCCESS.equals(result.get(Constants.STATUS))) { startFailedProcessDefinitionCodeList.add(String.valueOf(processDefinitionCode)); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index 2fa065bca216..1087d595eb75 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -19,6 +19,7 @@ import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.ComplementDependentMode; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.RunMode; @@ -63,7 +64,8 @@ Map execProcessInstance(User loginUser, long projectCode, RunMode runMode, Priority processInstancePriority, String workerGroup, Long environmentCode, Integer timeout, Map startParams, Integer expectedParallelismNumber, - int dryRun); + int dryRun, + ComplementDependentMode complementDependentMode); /** * check whether the process definition can be executed diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 5991e79f0a5e..6dc8ab72f409 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -24,6 +24,7 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS; import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT; +import org.apache.commons.beanutils.BeanUtils; import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.ExecutorService; @@ -31,6 +32,8 @@ import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.ComplementDependentMode; +import org.apache.dolphinscheduler.common.enums.CycleEnum; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Flag; @@ -44,6 +47,7 @@ import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Project; @@ -66,9 +70,9 @@ import java.util.ArrayList; import java.util.Date; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,11 +101,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ @Autowired private MonitorService monitorService; - @Autowired private ProcessInstanceMapper processInstanceMapper; - @Autowired private ProcessService processService; @@ -138,7 +140,8 @@ public Map execProcessInstance(User loginUser, long projectCode, RunMode runMode, Priority processInstancePriority, String workerGroup, Long environmentCode,Integer timeout, Map startParams, Integer expectedParallelismNumber, - int dryRun) { + int dryRun, + ComplementDependentMode complementDependentMode) { Project project = projectMapper.queryByCode(projectCode); //check user access for project Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); @@ -175,7 +178,8 @@ public Map execProcessInstance(User loginUser, long projectCode, */ int create = this.createCommand(commandType, processDefinition.getCode(), taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(), - warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode, startParams, expectedParallelismNumber, dryRun); + warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode, startParams, + expectedParallelismNumber, dryRun, complementDependentMode); if (create > 0) { processDefinition.setWarningGroupId(warningGroupId); @@ -536,7 +540,7 @@ private int createCommand(CommandType commandType, long processDefineCode, String startNodeList, String schedule, WarningType warningType, int executorId, int warningGroupId, RunMode runMode, Priority processInstancePriority, String workerGroup, Long environmentCode, - Map startParams, Integer expectedParallelismNumber, int dryRun) { + Map startParams, Integer expectedParallelismNumber, int dryRun, ComplementDependentMode complementDependentMode) { /** * instantiate command schedule instance @@ -599,7 +603,7 @@ private int createCommand(CommandType commandType, long processDefineCode, if (start == null || end == null) { return 0; } - return createComplementCommandList(start, end, runMode, command, expectedParallelismNumber); + return createComplementCommandList(start, end, runMode, command, expectedParallelismNumber, complementDependentMode); } else { command.setCommandParam(JSONUtils.toJsonString(cmdParam)); return processService.createCommand(command); @@ -608,15 +612,18 @@ private int createCommand(CommandType commandType, long processDefineCode, /** * create complement command - * close left open right + * close left and close right * * @param start * @param end * @param runMode * @return */ - private int createComplementCommandList(Date start, Date end, RunMode runMode, Command command, Integer expectedParallelismNumber) { + private int createComplementCommandList(Date start, Date end, RunMode runMode, Command command, + Integer expectedParallelismNumber, ComplementDependentMode complementDependentMode) { int createCount = 0; + int dependentProcessDefinitionCreateCount = 0; + runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode; Map cmdParam = JSONUtils.toMap(command.getCommandParam()); switch (runMode) { @@ -629,6 +636,17 @@ private int createComplementCommandList(Date start, Date end, RunMode runMode, C cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end)); command.setCommandParam(JSONUtils.toJsonString(cmdParam)); createCount = processService.createCommand(command); + + // dependent process definition + List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode()); + + if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) { + logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip " + + "dependent complement data", command.getProcessDefinitionCode()); + } else { + dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command); + } + break; } case RUN_MODE_PARALLEL: { @@ -637,7 +655,7 @@ private int createComplementCommandList(Date start, Date end, RunMode runMode, C break; } - LinkedList listDate = new LinkedList<>(); + List listDate = new ArrayList<>(); List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode()); listDate.addAll(CronUtils.getSelfFireDateList(start, end, schedules)); int listDateSize = listDate.size(); @@ -650,7 +668,7 @@ private int createComplementCommandList(Date start, Date end, RunMode runMode, C } } logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount); - + // Distribute the number of tasks equally to each command. // The last command with insufficient quantity will be assigned to the remaining tasks. int itemsPerCommand = (listDateSize / createCount); @@ -673,6 +691,13 @@ private int createComplementCommandList(Date start, Date end, RunMode runMode, C cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(endDateIndex))); command.setCommandParam(JSONUtils.toJsonString(cmdParam)); processService.createCommand(command); + + if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) { + logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip " + + "dependent complement data", command.getProcessDefinitionCode()); + } else { + dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command); + } } } break; @@ -680,7 +705,81 @@ private int createComplementCommandList(Date start, Date end, RunMode runMode, C default: break; } - logger.info("create complement command count: {}", createCount); + logger.info("create complement command count: {}, create dependent complement command count: {}", createCount + , dependentProcessDefinitionCreateCount); return createCount; } + + /** + * create complement dependent command + */ + private int createComplementDependentCommand(List schedules, Command command) { + int dependentProcessDefinitionCreateCount = 0; + Command dependentCommand; + + try { + dependentCommand = (Command) BeanUtils.cloneBean(command); + } catch (Exception e) { + logger.error("copy dependent command error: ", e); + return dependentProcessDefinitionCreateCount; + } + + List dependentProcessDefinitionList = + getComplementDependentDefinitionList(dependentCommand.getProcessDefinitionCode(), + CronUtils.getMaxCycle(schedules.get(0).getCrontab()), + dependentCommand.getWorkerGroup()); + + dependentCommand.setTaskDependType(TaskDependType.TASK_POST); + for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) { + dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode()); + dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup()); + Map cmdParam = JSONUtils.toMap(dependentCommand.getCommandParam()); + cmdParam.put(CMD_PARAM_START_NODES, String.valueOf(dependentProcessDefinition.getTaskDefinitionCode())); + dependentCommand.setCommandParam(JSONUtils.toJsonString(cmdParam)); + dependentProcessDefinitionCreateCount += processService.createCommand(dependentCommand); + } + + return dependentProcessDefinitionCreateCount; + } + + /** + * get complement dependent process definition list + */ + private List getComplementDependentDefinitionList(long processDefinitionCode, + CycleEnum processDefinitionCycle, + String workerGroup) { + List dependentProcessDefinitionList = + processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode); + + return checkDependentProcessDefinitionValid(dependentProcessDefinitionList,processDefinitionCycle,workerGroup); + } + + /** + * Check whether the dependency cycle of the dependent node is consistent with the schedule cycle of + * the dependent process definition and if there is no worker group in the schedule, use the complement selection's + * worker group + */ + private List checkDependentProcessDefinitionValid(List dependentProcessDefinitionList, + CycleEnum processDefinitionCycle, + String workerGroup) { + List validDependentProcessDefinitionList = new ArrayList<>(); + + List processDefinitionCodeList = dependentProcessDefinitionList.stream() + .map(DependentProcessDefinition::getProcessDefinitionCode) + .collect(Collectors.toList()); + + Map processDefinitionWorkerGroupMap = processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList); + + for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) { + if (dependentProcessDefinition.getDependentCycle() == processDefinitionCycle) { + if (processDefinitionWorkerGroupMap.get(dependentProcessDefinition.getProcessDefinitionCode()) == null) { + dependentProcessDefinition.setWorkerGroup(workerGroup); + } + + validDependentProcessDefinitionList.add(dependentProcessDefinition); + } + } + + return validDependentProcessDefinitionList; + } } diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties index 5c568ae1abbd..752313eeee18 100644 --- a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties +++ b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties @@ -179,6 +179,8 @@ PROCESS_INSTANCE_END_TIME=process instance end time PROCESS_INSTANCE_SIZE=process instance size PROCESS_INSTANCE_PRIORITY=process instance priority EXPECTED_PARALLELISM_NUMBER=custom parallelism to set the complement task threads +DRY_RUN=dry run +COMPLEMENT_DEPENDENT_MODE=complement dependent mode UPDATE_SCHEDULE_NOTES=update schedule SCHEDULE_ID=schedule id ONLINE_SCHEDULE_NOTES=online schedule diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties index 033621c0214e..9492795b119f 100644 --- a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties +++ b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties @@ -165,6 +165,8 @@ RECEIVERS_CC=收件人(抄送) WORKER_GROUP_ID=Worker Server分组ID PROCESS_INSTANCE_PRIORITY=流程实例优先级 EXPECTED_PARALLELISM_NUMBER=补数任务自定义并行度 +DRY_RUN=是否空跑 +COMPLEMENT_DEPENDENT_MODE=补数依赖的类型 UPDATE_SCHEDULE_NOTES=更新定时 SCHEDULE_ID=定时ID ONLINE_SCHEDULE_NOTES=定时上线 diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java index c7c4b821b661..2d5c03a83a13 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java @@ -66,6 +66,7 @@ public class ExecutorControllerTest extends AbstractControllerTest { final ImmutableMap startParams = ImmutableMap.of("start", "params"); final Integer expectedParallelismNumber = 6; final int dryRun = 7; + final ComplementDependentMode complementDependentMode = ComplementDependentMode.OFF_MODE; final JsonObject expectResponseContent = gson .fromJson("{\"code\":0,\"msg\":\"success\",\"data\":\"Test Data\",\"success\":true,\"failed\":false}" @@ -102,7 +103,7 @@ public void testStartProcessInstanceWithFullParams() throws Exception { when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode), eq(scheduleTime), eq(execType), eq(failureStrategy), eq(startNodeList), eq(taskDependType), eq(warningType), eq(warningGroupId), eq(runMode), eq(processInstancePriority), eq(workerGroup), eq(environmentCode), - eq(timeout), eq(startParams), eq(expectedParallelismNumber), eq(dryRun))) + eq(timeout), eq(startParams), eq(expectedParallelismNumber), eq(dryRun), eq(complementDependentMode))) .thenReturn(executeServiceResult); //When @@ -141,7 +142,8 @@ public void testStartProcessInstanceWithoutTimeout() throws Exception { when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode), eq(scheduleTime), eq(execType), eq(failureStrategy), eq(startNodeList), eq(taskDependType), eq(warningType), eq(warningGroupId), eq(runMode), eq(processInstancePriority), eq(workerGroup), eq(environmentCode), - eq(Constants.MAX_TASK_TIMEOUT), eq(startParams), eq(expectedParallelismNumber), eq(dryRun))).thenReturn(executeServiceResult); + eq(Constants.MAX_TASK_TIMEOUT), eq(startParams), eq(expectedParallelismNumber), eq(dryRun), + eq(complementDependentMode))).thenReturn(executeServiceResult); //When final MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/executors/start-process-instance", projectCode) @@ -178,7 +180,8 @@ public void testStartProcessInstanceWithoutStartParams() throws Exception { when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode), eq(scheduleTime), eq(execType), eq(failureStrategy), eq(startNodeList), eq(taskDependType), eq(warningType), eq(warningGroupId), eq(runMode), eq(processInstancePriority), eq(workerGroup), eq(environmentCode), - eq(timeout), eq(null), eq(expectedParallelismNumber), eq(dryRun))).thenReturn(executeServiceResult); + eq(timeout), eq(null), eq(expectedParallelismNumber), eq(dryRun), + eq(complementDependentMode))).thenReturn(executeServiceResult); //When final MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/executors/start-process-instance", projectCode) @@ -203,7 +206,8 @@ public void testStartProcessInstanceWithRequiredParams() throws Exception { when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode), eq(null), eq(null), eq(failureStrategy), eq(null), eq(null), eq(warningType), eq(0), eq(null), eq(null), eq("default"), eq(-1L), - eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), eq(0))).thenReturn(executeServiceResult); + eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), eq(0), + eq(complementDependentMode))).thenReturn(executeServiceResult); //When final MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/executors/start-process-instance", projectCode) diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java index 90fb17301732..2e4c92f14a0d 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java @@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.ComplementDependentMode; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ReleaseState; @@ -166,7 +167,8 @@ public void testNoComplement() { null, null, null, null, 0, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null, 0, Constants.DRY_RUN_FLAG_NO); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null, 0, Constants.DRY_RUN_FLAG_NO, + ComplementDependentMode.OFF_MODE); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); @@ -184,7 +186,8 @@ public void testComplementWithStartNodeList() { null, "n1,n2", null, null, 0, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 0, Constants.DRY_RUN_FLAG_NO); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 0, Constants.DRY_RUN_FLAG_NO, + ComplementDependentMode.OFF_MODE); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); @@ -202,7 +205,8 @@ public void testDateError() { null, null, null, null, 0, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, + ComplementDependentMode.OFF_MODE); Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS)); verify(processService, times(0)).createCommand(any(Command.class)); } @@ -219,7 +223,8 @@ public void testSerial() { null, null, null, null, 0, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, + ComplementDependentMode.OFF_MODE); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); } @@ -236,7 +241,8 @@ public void testParallelWithOutSchedule() { null, null, null, null, 0, RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, + ComplementDependentMode.OFF_MODE); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(31)).createCommand(any(Command.class)); @@ -254,7 +260,8 @@ public void testParallelWithSchedule() { null, null, null, null, 0, RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 15, Constants.DRY_RUN_FLAG_NO); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 15, Constants.DRY_RUN_FLAG_NO, + ComplementDependentMode.OFF_MODE); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(15)).createCommand(any(Command.class)); @@ -269,7 +276,8 @@ public void testNoMasterServers() { null, null, null, null, 0, RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 0, Constants.DRY_RUN_FLAG_NO); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 0, Constants.DRY_RUN_FLAG_NO, + ComplementDependentMode.OFF_MODE); Assert.assertEquals(result.get(Constants.STATUS), Status.MASTER_NOT_EXISTS); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ComplementDependentMode.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ComplementDependentMode.java new file mode 100644 index 000000000000..68f8e573d22c --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ComplementDependentMode.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.enums; + +import com.baomidou.mybatisplus.annotation.EnumValue; + +/** + * task node depend type + */ +public enum ComplementDependentMode { + /** + * 0 off mode + * 1 run complement data with all dependent process + */ + OFF_MODE(0,"off mode"), + ALL_DEPENDENT(1,"all dependent"); + + ComplementDependentMode(int code, String desc) { + this.code = code; + this.desc = desc; + } + + @EnumValue + private final int code; + private final String desc; + + public int getCode() { + return code; + } + + public String getDesc() { + return desc; + } +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java new file mode 100644 index 000000000000..3952b40b23e9 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.entity; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.CycleEnum; +import org.apache.dolphinscheduler.common.model.DependentItem; +import org.apache.dolphinscheduler.common.model.DependentTaskModel; +import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; +import org.apache.dolphinscheduler.common.utils.JSONUtils; + +import java.util.List; + +/** + * dependent process definition + */ +public class DependentProcessDefinition { + + /** + * process definition code + */ + private long processDefinitionCode; + + /** + * process definition name + */ + private String processDefinitionName; + + /** + * task definition name + */ + private long taskDefinitionCode; + + /** + * task definition params + */ + private String taskParams; + + /** + * schedule worker group + */ + private String workerGroup; + + /** + * get dependent cycle + * @return CycleEnum + */ + public CycleEnum getDependentCycle() { + DependentParameters dependentParameters = this.getDependentParameters(); + List dependentTaskModelList = dependentParameters.getDependTaskList(); + + for (DependentTaskModel dependentTaskModel : dependentTaskModelList) { + List dependentItemList = dependentTaskModel.getDependItemList(); + for (DependentItem dependentItem : dependentItemList) { + if (this.getProcessDefinitionCode() == dependentItem.getDefinitionCode()) { + return cycle2CycleEnum(dependentItem.getCycle()); + } + } + } + + return CycleEnum.DAY; + } + + public CycleEnum cycle2CycleEnum(String cycle) { + CycleEnum cycleEnum = null; + + switch (cycle) { + case "day": + cycleEnum = CycleEnum.DAY; + break; + case "hour": + cycleEnum = CycleEnum.HOUR; + break; + case "week": + cycleEnum = CycleEnum.WEEK; + break; + case "month": + cycleEnum = CycleEnum.MONTH; + break; + default: + break; + } + return cycleEnum; + } + + public DependentParameters getDependentParameters() { + return JSONUtils.parseObject(getDependence(), DependentParameters.class); + } + + public String getDependence() { + return JSONUtils.getNodeString(this.taskParams, Constants.DEPENDENCE); + } + + public String getProcessDefinitionName() { + return this.processDefinitionName; + } + + public void setprocessDefinitionName(String name) { + this.processDefinitionName = name; + } + + public long getProcessDefinitionCode() { + return this.processDefinitionCode; + } + + public void setProcessDefinitionCode(long code) { + this.processDefinitionCode = code; + } + + public long getTaskDefinitionCode() { + return this.taskDefinitionCode; + } + + public void setTaskDefinitionCode(long code) { + this.taskDefinitionCode = code; + } + + public String getTaskParams() { + return this.taskParams; + } + + public void setTaskParams(String taskParams) { + this.taskParams = taskParams; + } + + public String getWorkerGroup() { + return this.workerGroup; + } + + public void setWorkerGroup(String workerGroup) { + this.workerGroup = workerGroup; + } + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java index d0b2d326be94..2af88fcde0ed 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java @@ -86,4 +86,12 @@ IPage queryByProcessDefineCodePaging(IPage page, * @return schedule */ Schedule queryByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode); + + /** + * query worker group list by process definition code + * + * @param processDefinitionCodeList processDefinitionCodeList + * @return schedule + */ + List querySchedulesByProcessDefinitionCodes(@Param("processDefinitionCodeList") List processDefinitionCodeList); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java index 249e42afecd9..314f542b06e4 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.dao.mapper; +import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessLineage; import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; @@ -69,4 +70,12 @@ public interface WorkFlowLineageMapper { */ List queryProcessLineageByCode(@Param("projectCode") long projectCode, @Param("processDefinitionCode") long processDefinitionCode); + + + /** + * query process definition by name + * + * @return dependent process definition + */ + List queryDependentProcessDefinitionByProcessDefinitionCode(@Param("code") long code); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml index 85a26dd400d9..b22ac9f5a46c 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml @@ -70,6 +70,17 @@ from t_ds_schedules where process_definition_code = #{processDefinitionCode} + + + + + diff --git a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java index cea26c77fb15..9fa1366e695f 100644 --- a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java +++ b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java @@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.api.service.UsersService; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ComplementDependentMode; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; @@ -94,6 +95,7 @@ public class PythonGatewayServer extends SpringBootServletInitializer { private static final TaskDependType DEFAULT_TASK_DEPEND_TYPE = TaskDependType.TASK_POST; private static final RunMode DEFAULT_RUN_MODE = RunMode.RUN_MODE_SERIAL; private static final int DEFAULT_DRY_RUN = 0; + private static final ComplementDependentMode COMPLEMENT_DEPENDENT_MODE = ComplementDependentMode.OFF_MODE; @Autowired private ProcessDefinitionMapper processDefinitionMapper; @@ -341,7 +343,8 @@ public void execProcessInstance(String userName, timeout, null, null, - DEFAULT_DRY_RUN + DEFAULT_DRY_RUN, + COMPLEMENT_DEPENDENT_MODE ); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 43714e7d7972..8fabaf98e4d9 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -60,6 +60,7 @@ import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.DagData; import org.apache.dolphinscheduler.dao.entity.DataSource; +import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition; import org.apache.dolphinscheduler.dao.entity.DqComparisonType; import org.apache.dolphinscheduler.dao.entity.DqExecuteResult; import org.apache.dolphinscheduler.dao.entity.DqRule; @@ -114,6 +115,7 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; +import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper; import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.dao.utils.DqRuleUtils; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; @@ -142,6 +144,7 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.Set; +import java.util.stream.Collector; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -256,6 +259,9 @@ public class ProcessService { @Autowired private TaskGroupMapper taskGroupMapper; + @Autowired + private WorkFlowLineageMapper workFlowLineageMapper; + /** * handle Command (construct ProcessInstance from Command) , wrapped in transaction * @@ -1900,6 +1906,28 @@ public List queryReleaseSchedulerListByProcessDefinitionCode(long proc return scheduleMapper.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode); } + /** + * query Schedule by processDefinitionCode + * + * @param processDefinitionCodeList processDefinitionCodeList + * @see Schedule + */ + public Map queryWorkerGroupByProcessDefinitionCodes(List processDefinitionCodeList) { + List processDefinitionScheduleList = scheduleMapper.querySchedulesByProcessDefinitionCodes(processDefinitionCodeList); + return processDefinitionScheduleList.stream().collect(Collectors.toMap(Schedule::getProcessDefinitionCode, + Schedule::getWorkerGroup)); + } + + /** + * query dependent process definition by process definition code + * + * @param processDefinitionCode processDefinitionCode + * @see DependentProcessDefinition + */ + public List queryDependentProcessDefinitionByProcessDefinitionCode(long processDefinitionCode) { + return workFlowLineageMapper.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode); + } + /** * query need failover process instance *