From 167f1868f3235af795b89fb73ba844a5b9d2938b Mon Sep 17 00:00:00 2001 From: WangJPLeo Date: Tue, 26 Apr 2022 14:13:01 +0800 Subject: [PATCH] A task instance that normally queries the serial wait state. --- .../dolphinscheduler/dao/mapper/ProcessInstanceMapper.java | 2 +- .../server/master/runner/WorkflowExecuteThread.java | 2 +- .../dolphinscheduler/service/process/ProcessService.java | 2 +- .../dolphinscheduler/service/process/ProcessServiceImpl.java | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java index bc4ea7cacaf8..1a639df8b47b 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java @@ -255,5 +255,5 @@ int updateGlobalParamsById(@Param("globalParams") String globalParams, boolean updateNextProcessIdById(@Param("thisInstanceId") int thisInstanceId, @Param("runningInstanceId") int runningInstanceId); - ProcessInstance loadNextProcess4Serial(@Param("processDefinitionCode") Long processDefinitionCode, @Param("state") int state); + ProcessInstance loadNextProcess4Serial(@Param("processDefinitionCode") Long processDefinitionCode, @Param("state") int state, @Param("id") int id); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 4d1cb296a7fa..49c1f83e9c23 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -823,7 +823,7 @@ private void endProcess() { public void checkSerialProcess(ProcessDefinition processDefinition) { int nextInstanceId = processInstance.getNextProcessInstanceId(); if (nextInstanceId == 0) { - ProcessInstance nextProcessInstance = this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(), ExecutionStatus.SERIAL_WAIT.getCode()); + ProcessInstance nextProcessInstance = this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(), ExecutionStatus.SERIAL_WAIT.getCode(), processInstance.getId()); if (nextProcessInstance == null) { return; } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 395aa3166850..bf48ab9cea65 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -295,5 +295,5 @@ TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId, void sendStartTask2Master(ProcessInstance processInstance, int taskId, org.apache.dolphinscheduler.remote.command.CommandType taskType); - ProcessInstance loadNextProcess4Serial(long code, int state); + ProcessInstance loadNextProcess4Serial(long code, int state, int id); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index e06d84287f7b..58183f2c88fc 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -3031,8 +3031,8 @@ public void sendStartTask2Master(ProcessInstance processInstance, int taskId, } @Override - public ProcessInstance loadNextProcess4Serial(long code, int state) { - return this.processInstanceMapper.loadNextProcess4Serial(code, state); + public ProcessInstance loadNextProcess4Serial(long code, int state, int id) { + return this.processInstanceMapper.loadNextProcess4Serial(code, state, id); } protected void deleteCommandWithCheck(int commandId) {