diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java index 91a29eacba9d..6aa48ea12ddf 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java @@ -31,6 +31,13 @@ public interface ProcessInstanceDao extends IDao { */ void upsertProcessInstance(ProcessInstance processInstance); + /** + * performs an "upsert" operation (update or insert) on a ProcessInstance object within a new transaction + * + * @param processInstance processInstance + */ + void performTransactionalUpsert(ProcessInstance processInstance); + /** * find last scheduler process instance in the date interval * diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java index fd0aea9ee5a7..a668af8efbe0 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java @@ -57,6 +57,12 @@ public void upsertProcessInstance(@NonNull ProcessInstance processInstance) { } } + @Override + @Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class) + public void performTransactionalUpsert(ProcessInstance processInstance) { + this.upsertProcessInstance(processInstance); + } + /** * find last scheduler process instance in the date interval * diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index d9c1fb99d7dd..9a0b686b3829 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -1813,7 +1813,7 @@ private void updateWorkflowInstanceStatesToDB(WorkflowExecutionStatus newStates) workflowInstance.setEndTime(new Date()); } try { - processInstanceDao.upsertProcessInstance(workflowInstance); + processInstanceDao.performTransactionalUpsert(workflowInstance); } catch (Exception ex) { // recover the status workflowInstance.setStateWithDesc(originStates, "recover state by DB error"); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index a47fdac8a520..da3fbfe0cfe2 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -343,7 +343,7 @@ public class ProcessServiceImpl implements ProcessService { protected void saveSerialProcess(ProcessInstance processInstance, ProcessDefinition processDefinition) { processInstance.setStateWithDesc(WorkflowExecutionStatus.SERIAL_WAIT, "wait by serial_wait strategy"); - processInstanceDao.upsertProcessInstance(processInstance); + processInstanceDao.performTransactionalUpsert(processInstance); // serial wait // when we get the running instance(or waiting instance) only get the priority instance(by id) if (processDefinition.getExecutionType().typeIsSerialWait()) { @@ -356,7 +356,7 @@ protected void saveSerialProcess(ProcessInstance processInstance, ProcessDefinit if (CollectionUtils.isEmpty(runningProcessInstances)) { processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, "submit from serial_wait strategy"); - processInstanceDao.upsertProcessInstance(processInstance); + processInstanceDao.performTransactionalUpsert(processInstance); } } else if (processDefinition.getExecutionType().typeIsSerialDiscard()) { List runningProcessInstances = @@ -367,12 +367,12 @@ protected void saveSerialProcess(ProcessInstance processInstance, ProcessDefinit processInstance.getId()); if (CollectionUtils.isNotEmpty(runningProcessInstances)) { processInstance.setStateWithDesc(WorkflowExecutionStatus.STOP, "stop by serial_discard strategy"); - processInstanceDao.upsertProcessInstance(processInstance); + processInstanceDao.performTransactionalUpsert(processInstance); return; } processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, "submit from serial_discard strategy"); - processInstanceDao.upsertProcessInstance(processInstance); + processInstanceDao.performTransactionalUpsert(processInstance); } else if (processDefinition.getExecutionType().typeIsSerialPriority()) { List runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId( @@ -402,7 +402,7 @@ protected void saveSerialProcess(ProcessInstance processInstance, ProcessDefinit } processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, "submit by serial_priority strategy"); - processInstanceDao.upsertProcessInstance(processInstance); + processInstanceDao.performTransactionalUpsert(processInstance); } }