Skip to content

Commit

Permalink
Merge e83e6c0 into 688f844
Browse files Browse the repository at this point in the history
  • Loading branch information
sdhzwc authored Oct 25, 2023
2 parents 688f844 + e83e6c0 commit a9c3b1a
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,15 @@
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
Expand Down Expand Up @@ -141,6 +143,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
@Autowired
private ProcessDefinitionService processDefinitionService;

@Autowired
private ProcessDefinitionLogMapper processDefinitionLogMapper;

/**
* create task definition
*
Expand Down Expand Up @@ -781,20 +786,59 @@ private TaskDefinitionLog updateTask(User loginUser, long projectCode, long task
projectCode, taskCode, taskDefinitionToUpdate.getVersion());
// update process task relation
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper
.queryByTaskCode(taskDefinitionToUpdate.getCode());
.queryProcessTaskRelationByTaskCodeAndTaskVersion(taskDefinitionToUpdate.getCode(),
taskDefinition.getVersion());
if (CollectionUtils.isNotEmpty(processTaskRelations)) {
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
if (taskCode == processTaskRelation.getPreTaskCode()) {
processTaskRelation.setPreTaskVersion(version);
} else if (taskCode == processTaskRelation.getPostTaskCode()) {
processTaskRelation.setPostTaskVersion(version);
Map<Long, List<ProcessTaskRelation>> processTaskRelationGroupList = processTaskRelations.stream()
.collect(Collectors.groupingBy(ProcessTaskRelation::getProcessDefinitionCode));
for (Map.Entry<Long, List<ProcessTaskRelation>> processTaskRelationMap : processTaskRelationGroupList
.entrySet()) {
Long processDefinitionCode = processTaskRelationMap.getKey();
int processDefinitionVersion =
processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinitionCode)
+ 1;
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMap.getValue();
for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
if (taskCode == processTaskRelation.getPreTaskCode()) {
processTaskRelation.setPreTaskVersion(version);
} else if (taskCode == processTaskRelation.getPostTaskCode()) {
processTaskRelation.setPostTaskVersion(version);
}
processTaskRelation.setProcessDefinitionVersion(processDefinitionVersion);
int updateProcessDefinitionVersionCount =
processTaskRelationMapper.updateProcessTaskRelationTaskVersion(processTaskRelation);
if (updateProcessDefinitionVersionCount != 1) {
log.error("batch update process task relation error, projectCode:{}, taskDefinitionCode:{}.",
projectCode, taskCode);
putMsg(result, Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
throw new ServiceException(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
}
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
processTaskRelationLog.setOperator(loginUser.getId());
processTaskRelationLog.setId(null);
processTaskRelationLog.setOperateTime(now);
int insertProcessTaskRelationLogCount = processTaskRelationLogDao.insert(processTaskRelationLog);
if (insertProcessTaskRelationLogCount != 1) {
log.error("batch update process task relation error, projectCode:{}, taskDefinitionCode:{}.",
projectCode, taskCode);
putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_LOG_ERROR);
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_LOG_ERROR);
}
}
int count = processTaskRelationMapper.updateProcessTaskRelationTaskVersion(processTaskRelation);
if (count != 1) {
log.error("batch update process task relation error, projectCode:{}, taskDefinitionCode:{}.",
projectCode, taskCode);
putMsg(result, Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
throw new ServiceException(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
processDefinition.setVersion(processDefinitionVersion);
processDefinition.setUpdateTime(now);
processDefinition.setUserId(loginUser.getId());
// update process definition
int updateProcessDefinitionCount = processDefinitionMapper.updateById(processDefinition);
ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
processDefinitionLog.setOperateTime(now);
processDefinitionLog.setId(null);
processDefinitionLog.setOperator(loginUser.getId());
int insertProcessDefinitionLogCount = processDefinitionLogMapper.insert(processDefinitionLog);
if ((updateProcessDefinitionCount & insertProcessDefinitionLogCount) != 1) {
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ public class TaskDefinitionServiceImplTest {
@Mock
private ProcessTaskRelationLogDao processTaskRelationLogDao;

@Mock
private ProcessDefinitionLogMapper processDefinitionLogMapper;

private static final String TASK_PARAMETER =
"{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}";;
private static final long PROJECT_CODE = 1L;
Expand Down Expand Up @@ -188,9 +191,14 @@ public void updateTaskDefinition() {
Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(new TaskDefinition());
Mockito.when(taskDefinitionMapper.updateById(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.insert(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
Mockito.when(processTaskRelationLogDao.insert(Mockito.any(ProcessTaskRelationLog.class))).thenReturn(1);
Mockito.when(processDefinitionMapper.queryByCode(2L)).thenReturn(new ProcessDefinition());
Mockito.when(processDefinitionMapper.updateById(Mockito.any(ProcessDefinition.class))).thenReturn(1);
Mockito.when(processDefinitionLogMapper.insert(Mockito.any(ProcessDefinitionLog.class))).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(1);
Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true);
Mockito.when(processTaskRelationMapper.queryByTaskCode(3)).thenReturn(getProcessTaskRelationList2());
Mockito.when(processTaskRelationMapper.queryProcessTaskRelationByTaskCodeAndTaskVersion(TASK_CODE, 0))
.thenReturn(getProcessTaskRelationList2());
Mockito.when(processTaskRelationMapper
.updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(1);
result = taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,14 @@ Long queryTaskCodeByTaskName(@Param("workflowCode") Long workflowCode,

void deleteByWorkflowDefinitionCode(@Param("workflowDefinitionCode") long workflowDefinitionCode,
@Param("workflowDefinitionVersion") int workflowDefinitionVersion);

/**
* process task relation by taskCode and postTaskVersion
*
* @param taskCode taskCode
* @param postTaskVersion postTaskVersion
* @return ProcessTaskRelation
*/
List<ProcessTaskRelation> queryProcessTaskRelationByTaskCodeAndTaskVersion(@Param("taskCode") long taskCode,
@Param("postTaskVersion") long postTaskVersion);
}
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@
<update id="updateProcessTaskRelationTaskVersion">
update t_ds_process_task_relation
set pre_task_version=#{processTaskRelation.preTaskVersion},
post_task_version=#{processTaskRelation.postTaskVersion}
post_task_version=#{processTaskRelation.postTaskVersion},
process_definition_version=#{processTaskRelation.processDefinitionVersion}
where id = #{processTaskRelation.id}
</update>

Expand All @@ -240,4 +241,19 @@
from t_ds_process_task_relation
where process_definition_code = #{workflowDefinitionCode} and process_definition_version = #{workflowDefinitionVersion}
</delete>

<select id="queryProcessTaskRelationByTaskCodeAndTaskVersion" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
select
<include refid="baseSql"/>
from t_ds_process_task_relation
WHERE process_definition_code in (
SELECT
process_definition_code
FROM
t_ds_process_task_relation
WHERE
post_task_code = #{taskCode}
and post_task_version = #{postTaskVersion}
)
</select>
</mapper>

0 comments on commit a9c3b1a

Please sign in to comment.