From 8c8677df82361ff724e2d75525df063e3fdb6a18 Mon Sep 17 00:00:00 2001 From: Stalary Date: Thu, 8 Sep 2022 15:08:10 +0800 Subject: [PATCH] cherry-pick [Bug](dependent) Dependent downstream trigger error when schedule cycle not day. --- .../api/service/impl/ExecutorServiceImpl.java | 17 +++++++++++------ .../dao/entity/DependentProcessDefinition.java | 17 +++++++++++++++-- .../dao/mapper/WorkFlowLineageMapper.xml | 1 + 3 files changed, 27 insertions(+), 8 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index da9f9a742ecb..3582ad9bddd8 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -804,6 +804,7 @@ protected int createComplementDependentCommand(List schedules, Command dependentCommand.setTaskDependType(TaskDependType.TASK_POST); for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) { dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode()); + dependentCommand.setProcessDefinitionVersion(dependentProcessDefinition.getProcessDefinitionVersion()); dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup()); Map cmdParam = JSONUtils.toMap(dependentCommand.getCommandParam()); cmdParam.put(CMD_PARAM_START_NODES, String.valueOf(dependentProcessDefinition.getTaskDefinitionCode())); @@ -823,7 +824,8 @@ private List getComplementDependentDefinitionList(lo List dependentProcessDefinitionList = processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode); - return checkDependentProcessDefinitionValid(dependentProcessDefinitionList,processDefinitionCycle,workerGroup); + return checkDependentProcessDefinitionValid(dependentProcessDefinitionList, processDefinitionCycle, + workerGroup, processDefinitionCode); } /** @@ -831,9 +833,11 @@ private List getComplementDependentDefinitionList(lo * the dependent process definition and if there is no worker group in the schedule, use the complement selection's * worker group */ - private List checkDependentProcessDefinitionValid(List dependentProcessDefinitionList, - CycleEnum processDefinitionCycle, - String workerGroup) { + private List checkDependentProcessDefinitionValid( + List dependentProcessDefinitionList, + CycleEnum processDefinitionCycle, + String workerGroup, + long upstreamProcessDefinitionCode) { List validDependentProcessDefinitionList = new ArrayList<>(); List processDefinitionCodeList = dependentProcessDefinitionList.stream() @@ -843,8 +847,9 @@ private List checkDependentProcessDefinitionValid(Li Map processDefinitionWorkerGroupMap = processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList); for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) { - if (dependentProcessDefinition.getDependentCycle() == processDefinitionCycle) { - if (processDefinitionWorkerGroupMap.get(dependentProcessDefinition.getProcessDefinitionCode()) == null) { + if (dependentProcessDefinition.getDependentCycle(upstreamProcessDefinitionCode) == processDefinitionCycle) { + if (processDefinitionWorkerGroupMap + .get(dependentProcessDefinition.getProcessDefinitionCode()) == null) { dependentProcessDefinition.setWorkerGroup(workerGroup); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java index 9de57dff3388..87bb3d4234d9 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java @@ -41,6 +41,11 @@ public class DependentProcessDefinition { */ private String processDefinitionName; + /** + * process definition version + **/ + private int processDefinitionVersion; + /** * task definition name */ @@ -60,14 +65,14 @@ public class DependentProcessDefinition { * get dependent cycle * @return CycleEnum */ - public CycleEnum getDependentCycle() { + public CycleEnum getDependentCycle(long upstreamProcessDefinitionCode) { DependentParameters dependentParameters = this.getDependentParameters(); List dependentTaskModelList = dependentParameters.getDependTaskList(); for (DependentTaskModel dependentTaskModel : dependentTaskModelList) { List dependentItemList = dependentTaskModel.getDependItemList(); for (DependentItem dependentItem : dependentItemList) { - if (this.getProcessDefinitionCode() == dependentItem.getDefinitionCode()) { + if (upstreamProcessDefinitionCode == dependentItem.getDefinitionCode()) { return cycle2CycleEnum(dependentItem.getCycle()); } } @@ -122,6 +127,14 @@ public void setProcessDefinitionCode(long code) { this.processDefinitionCode = code; } + public int getProcessDefinitionVersion() { + return processDefinitionVersion; + } + + public void setProcessDefinitionVersion(int processDefinitionVersion) { + this.processDefinitionVersion = processDefinitionVersion; + } + public long getTaskDefinitionCode() { return this.taskDefinitionCode; } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml index 6b5e487c0ada..1815cfcd78c5 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml @@ -149,6 +149,7 @@ SELECT c.code AS process_definition_code ,c.name AS process_definition_name + ,c.version as process_definition_version ,a.code AS task_definition_code ,a.task_params FROM