diff --git a/docs/docs/en/guide/task/dependent.md b/docs/docs/en/guide/task/dependent.md index c034e6c04441..4ecd33408d3e 100644 --- a/docs/docs/en/guide/task/dependent.md +++ b/docs/docs/en/guide/task/dependent.md @@ -30,6 +30,8 @@ The Dependent node provides a logical judgment function, which can detect the ex Two dependency modes are supported, including workflow-dependent and task-dependent. The task-dependent mode is divided into two cases: depend on all tasks in the workflow and depend on a single task. The workflow-dependent mode checks the status of the dependent workflow; the all-task-dependent mode checks the status of all tasks in the workflow; and the single-task-dependent mode checks the status of the dependent task. +When the dependent result is success and the parameter passing option is true, the Dependent node will output the output parameters of the dependency to the downstream task. When the parameter names of multiple dependencies are the same, it involves the priority of the parameters. See also [Parameter Priority](../parameter/priority.md) + For example, process A is a weekly task, processes B and C are daily tasks, and task A requires tasks B and C to be successfully executed last week. ![dependent_task01](../../../../img/tasks/demo/dependent_task01.png) diff --git a/docs/docs/zh/guide/task/dependent.md b/docs/docs/zh/guide/task/dependent.md index 0f260f494440..dbc94be1dc7e 100644 --- a/docs/docs/zh/guide/task/dependent.md +++ b/docs/docs/zh/guide/task/dependent.md @@ -31,6 +31,8 @@ Dependent 节点提供了逻辑判断功能,可以按照逻辑来检测所依 依赖工作流的模式会检查所依赖的工作流的状态;依赖所有任务的模式会检查工作流中所有任务的状态; 依赖单个任务的模式会检查所依赖的任务的状态。 +当 Dependent 节点结果为 success 且参数传递选项为 true 时,Dependent 节点会将该依赖项的输出参数输出给下游任务。当多个依赖项的参数名称相同时涉及到参数的优先级问题,详见[参数优先级](../parameter/priority.md) + 例如,A 流程为周报任务,B、C 流程为天任务,A 任务需要 B、C 任务在上周执行成功,如图示: ![dependent_task01](../../../../img/tasks/demo/dependent_task01.png) diff --git a/docs/img/tasks/demo/dependent_task01.png b/docs/img/tasks/demo/dependent_task01.png index 9bdb05ba2456..2001598eb184 100644 Binary files a/docs/img/tasks/demo/dependent_task01.png and b/docs/img/tasks/demo/dependent_task01.png differ diff --git a/docs/img/tasks/demo/dependent_task02.png b/docs/img/tasks/demo/dependent_task02.png index 3a424314b3db..9c2866dffe8d 100644 Binary files a/docs/img/tasks/demo/dependent_task02.png and b/docs/img/tasks/demo/dependent_task02.png differ diff --git a/docs/img/tasks/demo/dependent_task03.png b/docs/img/tasks/demo/dependent_task03.png index 9dd130fbfdcd..d19cc27c4b60 100644 Binary files a/docs/img/tasks/demo/dependent_task03.png and b/docs/img/tasks/demo/dependent_task03.png differ diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dependent/DependentAsyncTaskExecuteFunction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dependent/DependentAsyncTaskExecuteFunction.java index 2a51f5d5bc6e..57f356e2b5ff 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dependent/DependentAsyncTaskExecuteFunction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dependent/DependentAsyncTaskExecuteFunction.java @@ -20,6 +20,7 @@ import static org.apache.dolphinscheduler.common.constants.Constants.DEPENDENT_SPLIT; import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Project; @@ -34,6 +35,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem; import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils; import org.apache.dolphinscheduler.server.master.runner.execute.AsyncTaskExecuteFunction; @@ -69,6 +71,7 @@ public class DependentAsyncTaskExecuteFunction implements AsyncTaskExecuteFuncti private final Date dependentDate; private final List dependentTaskList; private final Map dependResultMap; + private final Map dependVarPoolPropertyMap; public DependentAsyncTaskExecuteFunction(TaskExecutionContext taskExecutionContext, DependentParameters dependentParameters, @@ -89,6 +92,7 @@ public DependentAsyncTaskExecuteFunction(TaskExecutionContext taskExecutionConte this.dependentTaskList = initializeDependentTaskList(); log.info("Initialized dependent task list successfully"); this.dependResultMap = new HashMap<>(); + this.dependVarPoolPropertyMap = new HashMap<>(); } @Override @@ -97,8 +101,13 @@ public DependentAsyncTaskExecuteFunction(TaskExecutionContext taskExecutionConte log.info("All dependent task finished, will calculate the dependent result"); DependResult dependResult = calculateDependResult(); log.info("The Dependent result is: {}", dependResult); - return dependResult == DependResult.SUCCESS ? AsyncTaskExecutionStatus.SUCCESS - : AsyncTaskExecutionStatus.FAILED; + if (dependResult == DependResult.SUCCESS) { + dependentParameters.setVarPool(JSONUtils.toJsonString(dependVarPoolPropertyMap.values())); + log.info("Set dependentParameters varPool: {}", dependentParameters.getVarPool()); + return AsyncTaskExecutionStatus.SUCCESS; + } else { + return AsyncTaskExecutionStatus.FAILED; + } } return AsyncTaskExecutionStatus.RUNNING; } @@ -189,9 +198,16 @@ private List initializeDependentTaskList() { private DependResult calculateDependResult() { List dependResultList = new ArrayList<>(); + Map dependVarPoolEndTimeMap = new HashMap<>(); for (DependentExecute dependentExecute : dependentTaskList) { DependResult dependResult = dependentExecute.getModelDependResult(dependentDate, processInstance.getTestFlag()); + if (dependResult == DependResult.SUCCESS) { + Map varPoolPropertyMap = dependentExecute.getDependTaskVarPoolPropertyMap(); + Map varPoolEndTimeMap = dependentExecute.getDependTaskVarPoolEndTimeMap(); + DependentUtils.addTaskVarPool(varPoolPropertyMap, varPoolEndTimeMap, dependVarPoolPropertyMap, + dependVarPoolEndTimeMap); + } dependResultList.add(dependResult); } return DependentUtils.getDependResultForRelation(this.dependentParameters.getRelation(), diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java index b71468547840..195212ef34ac 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.TaskExecuteType; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; @@ -33,9 +34,11 @@ import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation; +import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval; import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -48,6 +51,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Function; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -97,6 +101,14 @@ public class DependentExecute { */ private final TaskDefinitionDao taskDefinitionDao = SpringApplicationContext.getBean(TaskDefinitionDao.class); + private Map dependTaskVarPoolPropertyMap = new HashMap<>(); + + private Map dependTaskVarPoolEndTimeMap = new HashMap<>(); + + private Map dependItemVarPoolPropertyMap = new HashMap<>(); + + private Map dependItemVarPoolEndTimeMap = new HashMap<>(); + /** * constructor * @@ -168,6 +180,7 @@ private DependResult dependResultByProcessInstance(ProcessInstance processInstan return DependResult.WAITING; } if (processInstance.getState().isSuccess()) { + addItemVarPool(processInstance.getVarPool(), processInstance.getEndTime().getTime()); return DependResult.SUCCESS; } log.warn( @@ -221,46 +234,12 @@ private DependResult dependResultByAllTaskOfProcessInstance(ProcessInstance proc } } } + addItemVarPool(processInstance.getVarPool(), processInstance.getEndTime().getTime()); return DependResult.SUCCESS; } return DependResult.FAILED; } - /** - * get depend task result - * - * @param taskCode - * @param processInstance - * @return - */ - private DependResult getDependTaskResult(long taskCode, ProcessInstance processInstance, int testFlag) { - DependResult result; - TaskInstance taskInstance = null; - List taskInstanceList = - taskInstanceDao.queryValidTaskListByWorkflowInstanceId(processInstance.getId(), testFlag); - - for (TaskInstance task : taskInstanceList) { - if (task.getTaskCode() == taskCode) { - taskInstance = task; - break; - } - } - - if (taskInstance == null) { - // cannot find task in the process instance - // maybe because process instance is running or failed. - if (processInstance.getState().isFinished()) { - result = DependResult.FAILED; - } else { - return DependResult.WAITING; - } - } else { - result = getDependResultByState(taskInstance.getState()); - } - - return result; - } - /** * depend type = depend_task * @@ -303,12 +282,31 @@ private DependResult dependResultBySingleTaskInstance(ProcessInstance processIns log.info( "The dependent task is a streaming task, so return depend success. Task code: {}, task name: {}.", taskInstance.getTaskCode(), taskInstance.getName()); + addItemVarPool(taskInstance.getVarPool(), taskInstance.getEndTime().getTime()); return DependResult.SUCCESS; } return getDependResultByState(taskInstance.getState()); } } + /** + * add varPool to dependItemVarPoolMap + * + * @param varPoolStr + * @param endTime + */ + private void addItemVarPool(String varPoolStr, Long endTime) { + List varPool = new ArrayList<>(JSONUtils.toList(varPoolStr, Property.class)); + if (!varPool.isEmpty()) { + Map varPoolPropertyMap = varPool.stream().filter(p -> p.getDirect().equals(Direct.OUT)) + .collect(Collectors.toMap(Property::getProp, Function.identity())); + Map varPoolEndTimeMap = varPool.stream().filter(p -> p.getDirect().equals(Direct.OUT)) + .collect(Collectors.toMap(Property::getProp, d -> endTime)); + dependItemVarPoolPropertyMap.putAll(varPoolPropertyMap); + dependItemVarPoolEndTimeMap.putAll(varPoolEndTimeMap); + } + } + /** * find the last one process instance that : * 1. manual run and finish between the interval @@ -399,7 +397,13 @@ public DependResult getModelDependResult(Date currentTime, int testFlag) { DependResult dependResult = getDependResultForItem(dependentItem, currentTime, testFlag); if (dependResult != DependResult.WAITING && dependResult != DependResult.FAILED) { dependResultMap.put(dependentItem.getKey(), dependResult); + if (dependentItem.getParameterPassing() && !dependItemVarPoolPropertyMap.isEmpty()) { + DependentUtils.addTaskVarPool(dependItemVarPoolPropertyMap, dependItemVarPoolEndTimeMap, + dependTaskVarPoolPropertyMap, dependTaskVarPoolEndTimeMap); + } } + dependItemVarPoolPropertyMap.clear(); + dependItemVarPoolEndTimeMap.clear(); dependResultList.add(dependResult); } return DependentUtils.getDependResultForRelation(this.relation, dependResultList); @@ -424,6 +428,14 @@ public Map getDependResultMap() { return dependResultMap; } + public Map getDependTaskVarPoolPropertyMap() { + return dependTaskVarPoolPropertyMap; + } + + public Map getDependTaskVarPoolEndTimeMap() { + return dependTaskVarPoolEndTimeMap; + } + /** * check for self-dependent * @param dependentItem diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DependentItem.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DependentItem.java index e19d1c1a5198..360fac42b61c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DependentItem.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DependentItem.java @@ -35,6 +35,7 @@ public class DependentItem { private String dateValue; private DependResult dependResult; private TaskExecutionStatus status; + private Boolean parameterPassing; public String getKey() { return String.format("%d-%d-%s-%s", diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/DependentUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/DependentUtils.java index 3a34634752a1..ab2b25b65333 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/DependentUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/DependentUtils.java @@ -20,10 +20,12 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation; import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.Map; public class DependentUtils { @@ -151,4 +153,30 @@ public static List getDateIntervalList(Date businessDate, String d return result; } + /** + * add varPool from dependItemVarPoolMap to dependTaskVarPoolMap + * + * @param dependItemVarPoolPropertyMap dependItemVarPoolPropertyMap + * @param dependItemVarPoolEndTimeMap dependItemVarPoolEndTimeMap + * @param dependTaskVarPoolPropertyMap dependTaskVarPoolPropertyMap + * @param dependTaskVarPoolEndTimeMap dependTaskVarPoolEndTimeMap + */ + public static void addTaskVarPool(Map dependItemVarPoolPropertyMap, + Map dependItemVarPoolEndTimeMap, + Map dependTaskVarPoolPropertyMap, + Map dependTaskVarPoolEndTimeMap) { + dependItemVarPoolPropertyMap.forEach((prop, property) -> { + Long itemEndTime = dependItemVarPoolEndTimeMap.get(prop); + if (dependTaskVarPoolPropertyMap.containsKey(prop)) { + if (itemEndTime < dependTaskVarPoolEndTimeMap.get(prop)) { + dependTaskVarPoolPropertyMap.put(prop, property); + dependTaskVarPoolEndTimeMap.put(prop, itemEndTime); + } + } else { + dependTaskVarPoolPropertyMap.put(prop, property); + dependTaskVarPoolEndTimeMap.put(prop, itemEndTime); + } + }); + } + } diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts index ee33434ad2eb..dff7371ce63e 100644 --- a/dolphinscheduler-ui/src/locales/en_US/project.ts +++ b/dolphinscheduler-ui/src/locales/en_US/project.ts @@ -644,6 +644,7 @@ export default { hour: 'hour', add_dependency: 'Add dependency', waiting_dependent_start: 'Waiting Dependent start', + dependent_task_parameter_passing: 'Parameter Passing', check_interval: 'Check interval', check_interval_tips: 'Check interval must be a positive integer', waiting_dependent_complete: 'Waiting Dependent complete', diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts index 1470c6154ea6..ce588c7e7e05 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts @@ -633,6 +633,7 @@ export default { hour: '时', add_dependency: '添加依赖', waiting_dependent_start: '等待依赖启动', + dependent_task_parameter_passing: '参数传递', check_interval: '检查间隔', check_interval_tips: '检查间隔必须为正整数', waiting_dependent_complete: '等待依赖完成', diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dependent.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dependent.ts index 10a6fcedfd47..b7b5ca6deb43 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dependent.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dependent.ts @@ -357,6 +357,7 @@ export function useDependent(model: { [field: string]: any }): IJsonItem[] { selectOptions.value[i] = options item.depTaskCode = null item.definitionCode = null + item.parameterPassing = false } }, options: projectList, @@ -475,6 +476,13 @@ export function useDependent(model: { [field: string]: any }): IJsonItem[] { } } }), + (j = 0) => ({ + type: 'switch', + field: 'parameterPassing', + span: 20, + name: t('project.node.dependent_task_parameter_passing'), + path: `dependTaskList.${i}.dependItemList.${j}.parameterPassing` + }), (j = 0) => ({ type: 'custom', field: 'state', diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts index 86eec11682a8..9c48fea5e07f 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts @@ -101,6 +101,7 @@ interface IDependentItem { cycle?: 'month' | 'week' | 'day' | 'hour' dateValue?: string dependentType?: 'DEPENDENT_ON_WORKFLOW' | 'DEPENDENT_ON_TASK' + parameterPassing?: boolean } interface IDependTask {