Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug-13951 ][dolphinscheduler-service] StartParams is not applied when task is failover(RECOVER_TOLERANCE_FAULT_PROCESS CommandType) #13958

Merged
merged 1 commit into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_EMPTY_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_PARAMS;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
Expand Down Expand Up @@ -770,8 +771,9 @@ private Boolean checkCmdParam(Command command, Map<String, String> cmdParam) {
}

CommandType commandTypeIfComplement = getCommandTypeIfComplement(processInstance, command);
// reset global params while repeat running is needed by cmdParam
if (commandTypeIfComplement == CommandType.REPEAT_RUNNING) {
// reset global params while repeat running and recover tolerance fault process is needed by cmdParam
if (commandTypeIfComplement == CommandType.REPEAT_RUNNING ||
commandTypeIfComplement == CommandType.RECOVER_TOLERANCE_FAULT_PROCESS) {
setGlobalParamIfCommanded(processDefinition, cmdParam);
}

Expand Down Expand Up @@ -1593,8 +1595,7 @@ public void processNeedFailoverProcessInstances(ProcessInstance processInstance)
cmd.setProcessDefinitionCode(processInstance.getProcessDefinitionCode());
cmd.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
cmd.setProcessInstanceId(processInstance.getId());
cmd.setCommandParam(
String.format("{\"%s\":%d}", CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
cmd.setCommandParam(JSONUtils.toJsonString(createCommandParams(processInstance)));
cmd.setExecutorId(processInstance.getExecutorId());
cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
cmd.setProcessInstancePriority(processInstance.getProcessInstancePriority());
Expand Down Expand Up @@ -2608,4 +2609,15 @@ public void saveCommandTrigger(Integer commandId, Integer processInstanceId) {
triggerRelationService.saveCommandTrigger(commandId, processInstanceId);
}

private Map<String, Object> createCommandParams(ProcessInstance processInstance) {
Map<String, Object> commandMap =
JSONUtils.parseObject(processInstance.getCommandParam(), new TypeReference<Map<String, Object>>() {
});
Map<String, Object> recoverFailoverCommandParams = new HashMap<>();
Optional.ofNullable(MapUtils.getObject(commandMap, CMD_PARAM_START_PARAMS))
.ifPresent(startParams -> recoverFailoverCommandParams.put(CMD_PARAM_START_PARAMS, startParams));
recoverFailoverCommandParams.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId());
return recoverFailoverCommandParams;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,31 @@ public void testHandleCommand() throws CronParseException, CodeGenerateUtils.Cod
Mockito.when(commandMapper.deleteById(9)).thenReturn(1);
ProcessInstance processInstance10 = processService.handleCommand(host, command9);
Assertions.assertNotNull(processInstance10);

// build command same as processService.processNeedFailoverProcessInstances(processInstance);
Command command12 = new Command();
command12.setId(12);
command12.setProcessDefinitionCode(definitionCode);
command12.setProcessDefinitionVersion(definitionVersion);
command12.setProcessInstanceId(processInstanceId);
command12.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
HashMap<String, String> startParams12 = new HashMap<>();
startParams12.put("startParam11", "testStartParam11");
HashMap<String, String> commandParams12 = new HashMap<>();
commandParams12.put(CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(startParams12));
commandParams12.put("ProcessInstanceId", "222");
command12.setCommandParam(JSONUtils.toJsonString(commandParams12));
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
Mockito.when(commandMapper.deleteById(12)).thenReturn(1);
Mockito.when(curingGlobalParamsService.curingGlobalParams(222,
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
CommandType.RECOVER_TOLERANCE_FAULT_PROCESS,
processInstance.getScheduleTime(), null)).thenReturn("\"testStartParam11\"");
ProcessInstance processInstance13 = processService.handleCommand(host, command12);
Assertions.assertNotNull(processInstance13);
Assertions.assertNotNull(processInstance13.getGlobalParams());
Assertions.assertTrue(processInstance13.getGlobalParams().contains("\"testStartParam11\""));
}

@Test
Expand Down