Skip to content

Commit

Permalink
Merge 8bc89bd into 1d13ef0
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun authored Apr 25, 2024
2 parents 1d13ef0 + 8bc89bd commit 1ad9011
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ public interface CommandMapper extends BaseMapper<Command> {

/**
* count command state
* @param startTime startTime
* @param endTime endTime
*
* @param startTime startTime
* @param endTime endTime
* @param projectCodes projectCodes
* @return CommandCount list
*/
Expand All @@ -46,15 +47,19 @@ List<CommandCount> countCommandState(

/**
* query command page
*
* @return
*/
List<Command> queryCommandPage(@Param("limit") int limit, @Param("offset") int offset);

/**
* query command page by slot
*
* @return command list
*/
List<Command> queryCommandPageBySlot(@Param("limit") int limit,
@Param("masterCount") int masterCount,
@Param("thisMasterSlot") int thisMasterSlot);

void deleteByWorkflowInstanceIds(@Param("workflowInstanceIds") List<Integer> workflowInstanceIds);
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,11 @@
order by process_instance_priority, id asc
limit #{limit}
</select>
<delete id="deleteByWorkflowInstanceIds" >
delete from t_ds_command
where process_instance_id in
<foreach collection="workflowInstanceIds" index="index" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</delete>
</mapper>
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.dolphinscheduler.dao.mapper;

import static com.google.common.truth.Truth.assertThat;

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
Expand Down Expand Up @@ -173,6 +175,14 @@ public void testQueryCommandPageBySlot() {
toTestQueryCommandPageBySlot(masterCount, thisMasterSlot);
}

@Test
void deleteByWorkflowInstanceIds() {
Command command = createCommand();
assertThat(commandMapper.selectList(null)).isNotEmpty();
commandMapper.deleteByWorkflowInstanceIds(Lists.newArrayList(command.getProcessInstanceId()));
assertThat(commandMapper.selectList(null)).isEmpty();
}

private boolean toTestQueryCommandPageBySlot(int masterCount, int thisMasterSlot) {
Command command = createCommand();
Integer id = command.getId();
Expand Down Expand Up @@ -280,5 +290,4 @@ private Command createCommand(CommandType commandType, long processDefinitionCod

return command;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -252,12 +252,61 @@ private List<DynamicInputParameter> getDynamicInputParameters() {
@Override
public void kill() {
try {
changeRunningSubprocessInstancesToStop(WorkflowExecutionStatus.READY_STOP);
doKillSubWorkflowInstances();
} catch (MasterTaskExecuteException e) {
log.error("kill {} error", taskInstance.getName(), e);
}
}

private void doKillSubWorkflowInstances() throws MasterTaskExecuteException {
List<ProcessInstance> existsSubProcessInstanceList =
subWorkflowService.getAllDynamicSubWorkflow(processInstance.getId(), taskInstance.getTaskCode());
if (CollectionUtils.isEmpty(existsSubProcessInstanceList)) {
return;
}

commandMapper.deleteByWorkflowInstanceIds(
existsSubProcessInstanceList.stream().map(ProcessInstance::getId).collect(Collectors.toList()));

List<ProcessInstance> runningSubProcessInstanceList =
subWorkflowService.filterRunningProcessInstances(existsSubProcessInstanceList);
doKillRunningSubWorkflowInstances(runningSubProcessInstanceList);

List<ProcessInstance> waitToRunProcessInstances =
subWorkflowService.filterWaitToRunProcessInstances(existsSubProcessInstanceList);
doKillWaitToRunSubWorkflowInstances(waitToRunProcessInstances);

this.haveBeenCanceled = true;
}

private void doKillRunningSubWorkflowInstances(List<ProcessInstance> runningSubProcessInstanceList) throws MasterTaskExecuteException {
for (ProcessInstance subProcessInstance : runningSubProcessInstanceList) {
subProcessInstance.setState(WorkflowExecutionStatus.READY_STOP);
processInstanceDao.updateById(subProcessInstance);
if (subProcessInstance.getState().isFinished()) {
log.info("The process instance [{}] is finished, no need to stop", subProcessInstance.getId());
continue;
}
try {
sendToSubProcess(taskExecutionContext, subProcessInstance);
log.info("Success send [{}] request to SubWorkflow's master: {}", WorkflowExecutionStatus.READY_STOP,
subProcessInstance.getHost());
} catch (Exception e) {
throw new MasterTaskExecuteException(
String.format("Send stop request to SubWorkflow's master: %s failed",
subProcessInstance.getHost()),
e);
}
}
}

private void doKillWaitToRunSubWorkflowInstances(List<ProcessInstance> waitToRunWorkflowInstances) {
for (ProcessInstance subProcessInstance : waitToRunWorkflowInstances) {
subProcessInstance.setState(WorkflowExecutionStatus.STOP);
processInstanceDao.updateById(subProcessInstance);
}
}

private void changeRunningSubprocessInstancesToStop(WorkflowExecutionStatus stopStatus) throws MasterTaskExecuteException {
this.haveBeenCanceled = true;
List<ProcessInstance> existsSubProcessInstanceList =
Expand Down

0 comments on commit 1ad9011

Please sign in to comment.