Skip to content

Commit

Permalink
[Feature-#8373][MasterServer] Dependent tasks can re-run automaticall…
Browse files Browse the repository at this point in the history
…y in the case of complement (#8496)

* first add feature_8373

* fix code smell

* add blank line

* fix some problems

* fix unit test error
  • Loading branch information
SbloodyS authored Feb 25, 2022
1 parent e174d7a commit 891a002
Show file tree
Hide file tree
Showing 15 changed files with 443 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.RunMode;
Expand Down Expand Up @@ -107,7 +108,9 @@ public class ExecutorController extends BaseController {
@ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String", example = "default"),
@ApiImplicitParam(name = "environmentCode", value = "ENVIRONMENT_CODE", dataType = "Long", example = "-1"),
@ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8")
@ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataType = "Int" , example = "8"),
@ApiImplicitParam(name = "dryRun", value = "DRY_RUN", dataType = "Int", example = "0"),
@ApiImplicitParam(name = "complementDependentMode", value = "COMPLEMENT_DEPENDENT_MODE", dataType = "complementDependentMode")
})
@PostMapping(value = "start-process-instance")
@ResponseStatus(HttpStatus.OK)
Expand All @@ -130,7 +133,8 @@ public Result startProcessInstance(@ApiIgnore @RequestAttribute(value = Constant
@RequestParam(value = "timeout", required = false) Integer timeout,
@RequestParam(value = "startParams", required = false) String startParams,
@RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber,
@RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun) {
@RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun,
@RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode) {

if (timeout == null) {
timeout = Constants.MAX_TASK_TIMEOUT;
Expand All @@ -139,8 +143,15 @@ public Result startProcessInstance(@ApiIgnore @RequestAttribute(value = Constant
if (startParams != null) {
startParamMap = JSONUtils.toMap(startParams);
}
Map<String, Object> result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode, scheduleTime, execType, failureStrategy,
startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode,timeout, startParamMap, expectedParallelismNumber, dryRun);

if (complementDependentMode == null) {
complementDependentMode = ComplementDependentMode.OFF_MODE;
}

Map<String, Object> result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode,
scheduleTime, execType, failureStrategy,
startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority,
workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun, complementDependentMode);
return returnDataList(result);
}

Expand Down Expand Up @@ -181,7 +192,9 @@ public Result startProcessInstance(@ApiIgnore @RequestAttribute(value = Constant
@ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String", example = "default"),
@ApiImplicitParam(name = "environmentCode", value = "ENVIRONMENT_CODE", dataType = "Long", example = "-1"),
@ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8")
@ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8"),
@ApiImplicitParam(name = "dryRun", value = "DRY_RUN", dataType = "Int", example = "0"),
@ApiImplicitParam(name = "complementDependentMode", value = "COMPLEMENT_DEPENDENT_MODE", dataType = "complementDependentMode")
})
@PostMapping(value = "batch-start-process-instance")
@ResponseStatus(HttpStatus.OK)
Expand All @@ -204,7 +217,8 @@ public Result batchStartProcessInstance(@ApiIgnore @RequestAttribute(value = Con
@RequestParam(value = "timeout", required = false) Integer timeout,
@RequestParam(value = "startParams", required = false) String startParams,
@RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber,
@RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun) {
@RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun,
@RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode) {

if (timeout == null) {
timeout = Constants.MAX_TASK_TIMEOUT;
Expand All @@ -215,6 +229,10 @@ public Result batchStartProcessInstance(@ApiIgnore @RequestAttribute(value = Con
startParamMap = JSONUtils.toMap(startParams);
}

if (complementDependentMode == null) {
complementDependentMode = ComplementDependentMode.OFF_MODE;
}

Map<String, Object> result = new HashMap<>();
List<String> processDefinitionCodeArray = Arrays.asList(processDefinitionCodes.split(Constants.COMMA));
List<String> startFailedProcessDefinitionCodeList = new ArrayList<>();
Expand All @@ -224,7 +242,9 @@ public Result batchStartProcessInstance(@ApiIgnore @RequestAttribute(value = Con
for (String strProcessDefinitionCode : processDefinitionCodeArray) {
long processDefinitionCode = Long.parseLong(strProcessDefinitionCode);
result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode, scheduleTime, execType, failureStrategy,
startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun);
startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority,
workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun,
complementDependentMode);

if (!Status.SUCCESS.equals(result.get(Constants.STATUS))) {
startFailedProcessDefinitionCodeList.add(String.valueOf(processDefinitionCode));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.RunMode;
Expand Down Expand Up @@ -63,7 +64,8 @@ Map<String, Object> execProcessInstance(User loginUser, long projectCode,
RunMode runMode,
Priority processInstancePriority, String workerGroup, Long environmentCode, Integer timeout,
Map<String, String> startParams, Integer expectedParallelismNumber,
int dryRun);
int dryRun,
ComplementDependentMode complementDependentMode);

/**
* check whether the process definition can be executed
Expand Down
Loading

0 comments on commit 891a002

Please sign in to comment.