Skip to content

Commit

Permalink
[Bug][Master]serial_wait strategy workflow unable to wake up (apache#…
Browse files Browse the repository at this point in the history
…15270)

* fix: serial_wait strategy workflow unable to wake up

Signed-off-by: Gallardot <gallardot@apache.org>

* fix: serial_wait strategy workflow unable to wake up

Signed-off-by: Gallardot <gallardot@apache.org>

---------

Signed-off-by: Gallardot <gallardot@apache.org>
Co-authored-by: fuchanghai <changhaifu@apache.org>
  • Loading branch information
Gallardot and fuchanghai committed Jan 16, 2024
1 parent 73f2c53 commit f349212
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ public interface ProcessInstanceDao extends IDao<ProcessInstance> {
*/
void upsertProcessInstance(ProcessInstance processInstance);

/**
* performs an "upsert" operation (update or insert) on a ProcessInstance object within a new transaction
*
* @param processInstance processInstance
*/
void performTransactionalUpsert(ProcessInstance processInstance);

/**
* find last scheduler process instance in the date interval
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ public void upsertProcessInstance(@NonNull ProcessInstance processInstance) {
}
}

@Override
@Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)
public void performTransactionalUpsert(ProcessInstance processInstance) {
this.upsertProcessInstance(processInstance);
}

/**
* find last scheduler process instance in the date interval
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1813,7 +1813,7 @@ private void updateWorkflowInstanceStatesToDB(WorkflowExecutionStatus newStates)
workflowInstance.setEndTime(new Date());
}
try {
processInstanceDao.upsertProcessInstance(workflowInstance);
processInstanceDao.performTransactionalUpsert(workflowInstance);
} catch (Exception ex) {
// recover the status
workflowInstance.setStateWithDesc(originStates, "recover state by DB error");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ public class ProcessServiceImpl implements ProcessService {

protected void saveSerialProcess(ProcessInstance processInstance, ProcessDefinition processDefinition) {
processInstance.setStateWithDesc(WorkflowExecutionStatus.SERIAL_WAIT, "wait by serial_wait strategy");
processInstanceDao.upsertProcessInstance(processInstance);
processInstanceDao.performTransactionalUpsert(processInstance);
// serial wait
// when we get the running instance(or waiting instance) only get the priority instance(by id)
if (processDefinition.getExecutionType().typeIsSerialWait()) {
Expand All @@ -356,7 +356,7 @@ protected void saveSerialProcess(ProcessInstance processInstance, ProcessDefinit
if (CollectionUtils.isEmpty(runningProcessInstances)) {
processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION,
"submit from serial_wait strategy");
processInstanceDao.upsertProcessInstance(processInstance);
processInstanceDao.performTransactionalUpsert(processInstance);
}
} else if (processDefinition.getExecutionType().typeIsSerialDiscard()) {
List<ProcessInstance> runningProcessInstances =
Expand All @@ -367,12 +367,12 @@ protected void saveSerialProcess(ProcessInstance processInstance, ProcessDefinit
processInstance.getId());
if (CollectionUtils.isNotEmpty(runningProcessInstances)) {
processInstance.setStateWithDesc(WorkflowExecutionStatus.STOP, "stop by serial_discard strategy");
processInstanceDao.upsertProcessInstance(processInstance);
processInstanceDao.performTransactionalUpsert(processInstance);
return;
}
processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION,
"submit from serial_discard strategy");
processInstanceDao.upsertProcessInstance(processInstance);
processInstanceDao.performTransactionalUpsert(processInstance);
} else if (processDefinition.getExecutionType().typeIsSerialPriority()) {
List<ProcessInstance> runningProcessInstances =
this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(
Expand Down Expand Up @@ -402,7 +402,7 @@ protected void saveSerialProcess(ProcessInstance processInstance, ProcessDefinit
}
processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION,
"submit by serial_priority strategy");
processInstanceDao.upsertProcessInstance(processInstance);
processInstanceDao.performTransactionalUpsert(processInstance);
}
}

Expand Down

0 comments on commit f349212

Please sign in to comment.