diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java index a8490cbef7c2b..9fb664322764f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java @@ -34,8 +34,9 @@ public interface CommandMapper extends BaseMapper { /** * count command state - * @param startTime startTime - * @param endTime endTime + * + * @param startTime startTime + * @param endTime endTime * @param projectCodes projectCodes * @return CommandCount list */ @@ -46,15 +47,19 @@ List countCommandState( /** * query command page + * * @return */ List queryCommandPage(@Param("limit") int limit, @Param("offset") int offset); /** * query command page by slot + * * @return command list */ List queryCommandPageBySlot(@Param("limit") int limit, @Param("masterCount") int masterCount, @Param("thisMasterSlot") int thisMasterSlot); + + void deleteByWorkflowInstanceIds(@Param("workflowInstanceIds") List workflowInstanceIds); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml index c950f664138a5..56db890ef07ae 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml @@ -47,4 +47,11 @@ order by process_instance_priority, id asc limit #{limit} + + delete from t_ds_command + where process_instance_id in + + #{i} + + diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java index 3d45477d858b9..2d367e46e4c4f 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.dao.mapper; +import static com.google.common.truth.Truth.assertThat; + import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.FailureStrategy; @@ -173,6 +175,14 @@ public void testQueryCommandPageBySlot() { toTestQueryCommandPageBySlot(masterCount, thisMasterSlot); } + @Test + void deleteByWorkflowInstanceIds() { + Command command = createCommand(); + assertThat(commandMapper.selectList(null)).isNotEmpty(); + commandMapper.deleteByWorkflowInstanceIds(Lists.newArrayList(command.getProcessInstanceId())); + assertThat(commandMapper.selectList(null)).isEmpty(); + } + private boolean toTestQueryCommandPageBySlot(int masterCount, int thisMasterSlot) { Command command = createCommand(); Integer id = command.getId(); @@ -280,5 +290,4 @@ private Command createCommand(CommandType commandType, long processDefinitionCod return command; } - } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java index 3baa10b343ef1..12cae5c53ea0f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java @@ -252,12 +252,61 @@ private List getDynamicInputParameters() { @Override public void kill() { try { - changeRunningSubprocessInstancesToStop(WorkflowExecutionStatus.READY_STOP); + doKillSubWorkflowInstances(); } catch (MasterTaskExecuteException e) { log.error("kill {} error", taskInstance.getName(), e); } } + private void doKillSubWorkflowInstances() throws MasterTaskExecuteException { + List existsSubProcessInstanceList = + subWorkflowService.getAllDynamicSubWorkflow(processInstance.getId(), taskInstance.getTaskCode()); + if (CollectionUtils.isEmpty(existsSubProcessInstanceList)) { + return; + } + + commandMapper.deleteByWorkflowInstanceIds( + existsSubProcessInstanceList.stream().map(ProcessInstance::getId).collect(Collectors.toList())); + + List runningSubProcessInstanceList = + subWorkflowService.filterRunningProcessInstances(existsSubProcessInstanceList); + doKillRunningSubWorkflowInstances(runningSubProcessInstanceList); + + List waitToRunProcessInstances = + subWorkflowService.filterWaitToRunProcessInstances(existsSubProcessInstanceList); + doKillWaitToRunSubWorkflowInstances(waitToRunProcessInstances); + + this.haveBeenCanceled = true; + } + + private void doKillRunningSubWorkflowInstances(List runningSubProcessInstanceList) throws MasterTaskExecuteException { + for (ProcessInstance subProcessInstance : runningSubProcessInstanceList) { + subProcessInstance.setState(WorkflowExecutionStatus.READY_STOP); + processInstanceDao.updateById(subProcessInstance); + if (subProcessInstance.getState().isFinished()) { + log.info("The process instance [{}] is finished, no need to stop", subProcessInstance.getId()); + continue; + } + try { + sendToSubProcess(taskExecutionContext, subProcessInstance); + log.info("Success send [{}] request to SubWorkflow's master: {}", WorkflowExecutionStatus.READY_STOP, + subProcessInstance.getHost()); + } catch (Exception e) { + throw new MasterTaskExecuteException( + String.format("Send stop request to SubWorkflow's master: %s failed", + subProcessInstance.getHost()), + e); + } + } + } + + private void doKillWaitToRunSubWorkflowInstances(List waitToRunWorkflowInstances) { + for (ProcessInstance subProcessInstance : waitToRunWorkflowInstances) { + subProcessInstance.setState(WorkflowExecutionStatus.STOP); + processInstanceDao.updateById(subProcessInstance); + } + } + private void changeRunningSubprocessInstancesToStop(WorkflowExecutionStatus stopStatus) throws MasterTaskExecuteException { this.haveBeenCanceled = true; List existsSubProcessInstanceList =